mirror of
https://github.com/clearml/clearml-agent
synced 2025-06-26 18:16:15 +00:00
Compare commits
23 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
efa1f71dac | ||
|
|
692cb8cf13 | ||
|
|
ebdc215632 | ||
|
|
b2da639582 | ||
|
|
71fdb43f10 | ||
|
|
ca2791c65e | ||
|
|
dd75cedaab | ||
|
|
669fb1a6e5 | ||
|
|
5d517c91b5 | ||
|
|
6be75abc86 | ||
|
|
4c777fa2ee | ||
|
|
dc5e0033c8 | ||
|
|
3dd5973734 | ||
|
|
53d379205f | ||
|
|
57cde21c48 | ||
|
|
396abf13b6 | ||
|
|
6e7fb5f331 | ||
|
|
1d5c118b70 | ||
|
|
18612aac4d | ||
|
|
76c533a2e8 | ||
|
|
9eee213683 | ||
|
|
e4861fc0fb | ||
|
|
53ef984065 |
29
README.md
29
README.md
@@ -24,7 +24,7 @@ ML-Ops scheduler & orchestration solution supporting Linux, macOS and Windows**
|
||||
* Launch-and-Forget service containers
|
||||
* [Cloud autoscaling](https://clear.ml/docs/latest/docs/guides/services/aws_autoscaler)
|
||||
* [Customizable cleanup](https://clear.ml/docs/latest/docs/guides/services/cleanup_service)
|
||||
|
||||
*
|
||||
Advanced [pipeline building and execution](https://clear.ml/docs/latest/docs/guides/frameworks/pytorch/notebooks/table/tabular_training_pipeline)
|
||||
|
||||
It is a zero configuration fire-and-forget execution agent, providing a full ML/DL cluster solution.
|
||||
@@ -33,12 +33,12 @@ It is a zero configuration fire-and-forget execution agent, providing a full ML/
|
||||
|
||||
1. ClearML Server [self-hosted](https://github.com/allegroai/clearml-server)
|
||||
or [free tier hosting](https://app.clear.ml)
|
||||
2. `pip install clearml-agent` ([install](#installing-the-clearml-agent) the ClearML Agent on any CPU/GPU machine:
|
||||
2. `pip install clearml-agent` ([install](#installing-the-clearml-agent) the ClearML Agent on any GPU machine:
|
||||
on-premises / cloud / ...)
|
||||
3. Create a [job](https://github.com/allegroai/clearml/docs/clearml-task.md) or
|
||||
Add [ClearML](https://github.com/allegroai/clearml) to your code with just 2 lines
|
||||
4. Change the [parameters](#using-the-clearml-agent) in the UI & schedule for [execution](#using-the-clearml-agent) (or
|
||||
automate with a [pipelines](#automl-and-orchestration-pipelines-))
|
||||
automate with an [AutoML pipeline](#automl-and-orchestration-pipelines-))
|
||||
5. :chart_with_downwards_trend: :chart_with_upwards_trend: :eyes: :beer:
|
||||
|
||||
"All the Deep/Machine-Learning DevOps your research needs, and then some... Because ain't nobody got time for that"
|
||||
@@ -313,31 +313,28 @@ clearml-agent daemon --services-mode --detached --queue services --create-queue
|
||||
|
||||
**Note**: It is the user's responsibility to make sure the proper tasks are pushed into the specified queue.
|
||||
|
||||
### Orchestration and Pipelines <a name="automl-pipes"></a>
|
||||
### AutoML and Orchestration Pipelines <a name="automl-pipes"></a>
|
||||
|
||||
The ClearML Agent can also be used to orchestrate and automate Pipelines in conjunction with the
|
||||
The ClearML Agent can also be used to implement AutoML orchestration and Experiment Pipelines in conjunction with the
|
||||
ClearML package.
|
||||
|
||||
Sample automation examples can be found in the
|
||||
ClearML [pipelines](https://github.com/allegroai/clearml/tree/master/examples/pipeline) / [automation](https://github.com/allegroai/clearml/tree/master/examples/automation) folder.
|
||||
Sample AutoML & Orchestration examples can be found in the
|
||||
ClearML [example/automation](https://github.com/allegroai/clearml/tree/master/examples/automation) folder.
|
||||
|
||||
HPO examples
|
||||
AutoML examples
|
||||
|
||||
- [Toy Keras training experiment](https://github.com/allegroai/clearml/blob/master/examples/optimization/hyper-parameter-optimization/base_template_keras_simple.py)
|
||||
- In order to create an experiment-template in the system, this code must be executed once manually
|
||||
- [Manual Search over the above Keras experiment-template](https://github.com/allegroai/clearml/blob/master/examples/automation/manual_random_param_search_example.py)
|
||||
- [Random Search over the above Keras experiment-template](https://github.com/allegroai/clearml/blob/master/examples/automation/manual_random_param_search_example.py)
|
||||
- This example will create multiple copies of the Keras experiment-template, with different hyper-parameter
|
||||
combinations
|
||||
- [Optimized Bayesian search over the above Keras experiment-template](https://github.com/allegroai/clearml/blob/master/examples/optimization/hyper-parameter-optimization/hyper_parameter_optimizer.py)
|
||||
- This example will create multiple copies of the Keras experiment-template, with different hyper-parameter combinations launch them on remote machines, monitor the metric (i.e. loss) decide which one has the best potential and abort the others
|
||||
|
||||
|
||||
Experiment Pipeline examples
|
||||
|
||||
- [Build DAG from Tasks](https://github.com/allegroai/clearml/blob/master/examples/pipeline/pipeline_from_tasks.py)
|
||||
- This example will build a DAG processing flow from existing Tasks and launch them on remote machines
|
||||
- [Logic Driven Pipeline](https://github.com/allegroai/clearml/blob/master/examples/pipeline/pipeline_from_decorator.py)
|
||||
- This example will run any component function as a standalone Task on a remote machine, it will auto-parallelize jobs, cache results and automatically serialize data between remote machines.
|
||||
- [First step experiment](https://github.com/allegroai/clearml/blob/master/examples/automation/task_piping_example.py)
|
||||
- This example will "process data", and once done, will launch a copy of the 'second step' experiment-template
|
||||
- [Second step experiment](https://github.com/allegroai/clearml/blob/master/examples/automation/toy_base_task.py)
|
||||
- In order to create an experiment-template in the system, this code must be executed once manually
|
||||
|
||||
### License
|
||||
|
||||
|
||||
@@ -18,6 +18,8 @@
|
||||
# https://docs.gitlab.com/ee/user/profile/personal_access_tokens.html
|
||||
# git_user: ""
|
||||
# git_pass: ""
|
||||
# Limit credentials to a single domain, for example: github.com,
|
||||
# all other domains will use public access (no user/pass). Default: always send user/pass for any VCS domain
|
||||
# git_host: ""
|
||||
|
||||
# Force GIT protocol to use SSH regardless of the git url (Assumes GIT user/pass are blank)
|
||||
@@ -64,7 +66,7 @@
|
||||
type: pip,
|
||||
|
||||
# specify pip version to use (examples "<20.2", "==19.3.1", "", empty string will install the latest version)
|
||||
pip_version: "<21",
|
||||
pip_version: ["<20.2 ; python_version < '3.10'", "<22.3 ; python_version >= '3.10'"],
|
||||
# specify poetry version to use (examples "<2", "==1.1.1", "", empty string will install the latest version)
|
||||
# poetry_version: "<2",
|
||||
|
||||
@@ -75,7 +77,7 @@
|
||||
force_upgrade: false,
|
||||
|
||||
# additional artifact repositories to use when installing python packages
|
||||
# extra_index_url: ["https://allegroai.jfrog.io/clearmlai/api/pypi/public/simple"]
|
||||
# extra_index_url: ["https://allegroai.jfrog.io/clearml/api/pypi/public/simple"]
|
||||
|
||||
# additional conda channels to use when installing with conda package manager
|
||||
conda_channels: ["pytorch", "conda-forge", "defaults", ]
|
||||
@@ -215,8 +217,8 @@
|
||||
# default is True, report a single \r line in a sequence of consecutive lines, per 5 seconds.
|
||||
# suppress_carriage_return: true
|
||||
|
||||
# cuda versions used for solving pytorch wheel packages
|
||||
# should be detected automatically. Override with os environment CUDA_VERSION / CUDNN_VERSION
|
||||
# CUDA versions used for Conda setup & solving PyTorch wheel packages
|
||||
# Should be detected automatically. Override with os environment CUDA_VERSION / CUDNN_VERSION
|
||||
# cuda_version: 10.1
|
||||
# cudnn_version: 7.6
|
||||
|
||||
@@ -252,9 +254,9 @@
|
||||
|
||||
# Name docker containers created by the daemon using the following string format (supported from Docker 0.6.5)
|
||||
# Allowed variables are task_id, worker_id and rand_string (random lower-case letters string, up to 32 characters)
|
||||
# Note: resulting name must start with an alphanumeric character and continue with alphanumeric characters,
|
||||
# underscores (_), dots (.) and/or dashes (-)
|
||||
#docker_container_name_format: "clearml-id-{task_id}-{rand_string:.8}"
|
||||
# Note: resulting name must start with an alphanumeric character and
|
||||
# continue with alphanumeric characters, underscores (_), dots (.) and/or dashes (-)
|
||||
# docker_container_name_format: "clearml-id-{task_id}-{rand_string:.8}"
|
||||
|
||||
# Apply top-level environment section from configuration into os.environ
|
||||
apply_environment: true
|
||||
@@ -325,4 +327,14 @@
|
||||
# into the file specified in CLEARML_CUSTOM_BUILD_OUTPUT, the agent will emit a warning and continue with the
|
||||
# standard flow.
|
||||
custom_build_script: ""
|
||||
|
||||
# Crash on exception: by default when encountering an exception while running a task,
|
||||
# the agent will catch the exception, log it and continue running.
|
||||
# Set this to `true` to propagate exceptions and crash the agent.
|
||||
# crash_on_exception: true
|
||||
|
||||
# Disable task docker override. If true, the agent will use the default docker image and ignore any docker image
|
||||
# and arguments specified in the task's container section (setup shell script from the task container section will
|
||||
# be used in any case, of specified).
|
||||
disable_task_docker_override: false
|
||||
}
|
||||
|
||||
@@ -66,11 +66,16 @@ class DataModel(object):
|
||||
}
|
||||
|
||||
def validate(self, schema=None):
|
||||
jsonschema.validate(
|
||||
self.to_dict(),
|
||||
schema or self._schema,
|
||||
types=dict(array=(list, tuple), integer=six.integer_types),
|
||||
schema = schema or self._schema
|
||||
validator = jsonschema.validators.validator_for(schema)
|
||||
validator_cls = jsonschema.validators.extend(
|
||||
validator=validator,
|
||||
type_checker=validator.TYPE_CHECKER.redefine_many({
|
||||
"array": lambda s, instance: isinstance(instance, (list, tuple)),
|
||||
"integer": lambda s, instance: isinstance(instance, six.integer_types),
|
||||
}),
|
||||
)
|
||||
jsonschema.validate(self.to_dict(), schema, cls=validator_cls)
|
||||
|
||||
def __repr__(self):
|
||||
return '<{}.{}: {}>'.format(
|
||||
|
||||
@@ -114,7 +114,7 @@ class Session(TokenManager):
|
||||
|
||||
if ENV_API_DEFAULT_REQ_METHOD.get(default=None):
|
||||
# Make sure we update the config object, so we pass it into the new containers when we map them
|
||||
self.config["api.http.default_method"] = ENV_API_DEFAULT_REQ_METHOD.get()
|
||||
self.config.put("api.http.default_method", ENV_API_DEFAULT_REQ_METHOD.get())
|
||||
# notice the default setting of Request.def_method are already set by the OS environment
|
||||
elif self.config.get("api.http.default_method", None):
|
||||
def_method = str(self.config.get("api.http.default_method", None)).strip()
|
||||
|
||||
@@ -294,6 +294,9 @@ class Config(object):
|
||||
)
|
||||
return value
|
||||
|
||||
def put(self, key, value):
|
||||
self._config.put(key, value)
|
||||
|
||||
def to_dict(self):
|
||||
return self._config.as_plain_ordered_dict()
|
||||
|
||||
|
||||
@@ -14,6 +14,14 @@ except ImportError:
|
||||
ConverterType = TypeVar("ConverterType", bound=Callable[[Any], Any])
|
||||
|
||||
|
||||
def text_to_int(value, default=0):
|
||||
# type: (Any, int) -> int
|
||||
try:
|
||||
return int(value)
|
||||
except (ValueError, TypeError):
|
||||
return default
|
||||
|
||||
|
||||
def base64_to_text(value):
|
||||
# type: (Any) -> Text
|
||||
return base64.b64decode(value).decode("utf-8")
|
||||
|
||||
@@ -1,14 +1,15 @@
|
||||
from __future__ import print_function
|
||||
|
||||
from six.moves import input
|
||||
from typing import Dict, Optional
|
||||
|
||||
from pathlib2 import Path
|
||||
from six.moves import input
|
||||
from six.moves.urllib.parse import urlparse
|
||||
|
||||
from clearml_agent.external.pyhocon import ConfigFactory, ConfigMissingException
|
||||
from clearml_agent.backend_api.session import Session
|
||||
from clearml_agent.backend_api.session.defs import ENV_HOST
|
||||
from clearml_agent.backend_config.defs import LOCAL_CONFIG_FILES
|
||||
|
||||
from clearml_agent.external.pyhocon import ConfigFactory, ConfigMissingException
|
||||
|
||||
description = """
|
||||
Please create new clearml credentials through the settings page in your `clearml-server` web app,
|
||||
@@ -112,6 +113,21 @@ def main():
|
||||
print('Exiting setup without creating configuration file')
|
||||
return
|
||||
|
||||
selection = input_options(
|
||||
'Default Output URI (used to automatically store models and artifacts)',
|
||||
{'N': 'None', 'S': 'ClearML Server', 'C': 'Custom'},
|
||||
default='None'
|
||||
)
|
||||
if selection == 'Custom':
|
||||
print('Custom Default Output URI: ', end='')
|
||||
default_output_uri = input().strip()
|
||||
elif selection == "ClearML Server":
|
||||
default_output_uri = files_host
|
||||
else:
|
||||
default_output_uri = None
|
||||
|
||||
print('\nDefault Output URI: {}'.format(default_output_uri if default_output_uri else 'not set'))
|
||||
|
||||
# get GIT User/Pass for cloning
|
||||
print('Enter git username for repository cloning (leave blank for SSH key authentication): [] ', end='')
|
||||
git_user = input()
|
||||
@@ -179,6 +195,13 @@ def main():
|
||||
'agent.package_manager.extra_index_url= ' \
|
||||
'[\n{}\n]\n\n'.format("\n".join(map("\"{}\"".format, extra_index_urls)))
|
||||
f.write(extra_index_str)
|
||||
if default_output_uri:
|
||||
default_output_url_str = '# Default Task output_uri. if output_uri is not provided to Task.init, ' \
|
||||
'default_output_uri will be used instead.\n' \
|
||||
'sdk.development.default_output_uri="{}"\n' \
|
||||
'\n'.format(default_output_uri.strip('"'))
|
||||
f.write(default_output_url_str)
|
||||
default_conf = default_conf.replace('default_output_uri: ""', '# default_output_uri: ""')
|
||||
f.write(default_conf)
|
||||
except Exception:
|
||||
print('Error! Could not write configuration file at: {}'.format(str(conf_file)))
|
||||
@@ -305,6 +328,25 @@ def input_url(host_type, host=None):
|
||||
return host
|
||||
|
||||
|
||||
def input_options(message, options, default=None):
|
||||
# type: (str, Dict[str, str], Optional[str]) -> str
|
||||
options_msg = "/".join(
|
||||
"".join(('(' + c.upper() + ')') if c == o else c for c in option)
|
||||
for o, option in options.items()
|
||||
)
|
||||
if default:
|
||||
options_msg += " [{}]".format(default)
|
||||
while True:
|
||||
print('{}: {} '.format(message, options_msg), end='')
|
||||
res = input().strip()
|
||||
if not res:
|
||||
return default
|
||||
elif res.lower() in options:
|
||||
return options[res.lower()]
|
||||
elif res.upper() in options:
|
||||
return options[res.upper()]
|
||||
|
||||
|
||||
def input_host_port(host_type, parsed_host):
|
||||
print('Enter port for {} host '.format(host_type), end='')
|
||||
replace_port = input().lower()
|
||||
|
||||
@@ -41,6 +41,7 @@ from clearml_agent.backend_api.session.defs import (
|
||||
ENV_VENV_CONFIGURED, ENV_PROPAGATE_EXITCODE, )
|
||||
from clearml_agent.backend_config.defs import UptimeConf
|
||||
from clearml_agent.backend_config.utils import apply_environment, apply_files
|
||||
from clearml_agent.backend_config.converters import text_to_int
|
||||
from clearml_agent.commands.base import resolve_names, ServiceCommandSection
|
||||
from clearml_agent.commands.resolver import resolve_default_container
|
||||
from clearml_agent.definitions import (
|
||||
@@ -56,10 +57,7 @@ from clearml_agent.definitions import (
|
||||
ENV_WORKER_ID,
|
||||
ENV_WORKER_TAGS,
|
||||
ENV_DOCKER_SKIP_GPUS_FLAG,
|
||||
ENV_AGENT_SECRET_KEY,
|
||||
ENV_AGENT_AUTH_TOKEN,
|
||||
ENV_AWS_SECRET_KEY,
|
||||
ENV_AZURE_ACCOUNT_KEY,
|
||||
ENV_AGENT_DISABLE_SSH_MOUNT,
|
||||
ENV_SSH_AUTH_SOCK,
|
||||
ENV_AGENT_SKIP_PIP_VENV_INSTALL,
|
||||
@@ -70,6 +68,7 @@ from clearml_agent.definitions import (
|
||||
ENV_DEBUG_INFO,
|
||||
ENV_CHILD_AGENTS_COUNT_CMD,
|
||||
ENV_DOCKER_ARGS_FILTERS,
|
||||
ENV_FORCE_SYSTEM_SITE_PACKAGES,
|
||||
)
|
||||
from clearml_agent.definitions import WORKING_REPOSITORY_DIR, PIP_EXTRA_INDICES
|
||||
from clearml_agent.errors import (
|
||||
@@ -686,6 +685,10 @@ class Worker(ServiceCommandSection):
|
||||
else:
|
||||
self._docker_args_filters = []
|
||||
|
||||
self._task_ping_interval_sec = max(
|
||||
0, text_to_int(self._session.config.get("agent.task_ping_interval_sec", 120.0))
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def _verify_command_states(cls, kwargs):
|
||||
"""
|
||||
@@ -732,7 +735,7 @@ class Worker(ServiceCommandSection):
|
||||
pass
|
||||
|
||||
def run_one_task(self, queue, task_id, worker_args, docker=None, task_session=None):
|
||||
# type: (Text, Text, WorkerParams, Optional[Text]) -> int
|
||||
# type: (Text, Text, WorkerParams, Optional[Text], Optional[Session]) -> int
|
||||
"""
|
||||
Run one task pulled from queue.
|
||||
:param queue: ID of queue that task was pulled from
|
||||
@@ -777,10 +780,18 @@ class Worker(ServiceCommandSection):
|
||||
except Exception:
|
||||
task_container = {}
|
||||
|
||||
default_docker = not bool(task_container.get('image'))
|
||||
docker_image = task_container.get('image') or self._docker_image
|
||||
docker_arguments = task_container.get(
|
||||
'arguments', self._docker_arguments if default_docker else None)
|
||||
default_docker = (
|
||||
self._session.config.get('agent.disable_task_docker_override', False)
|
||||
or not bool(task_container.get('image'))
|
||||
)
|
||||
if default_docker:
|
||||
docker_image = self._docker_image
|
||||
docker_arguments = self._docker_arguments
|
||||
else:
|
||||
docker_image = task_container.get('image') or self._docker_image
|
||||
docker_arguments = task_container.get(
|
||||
'arguments', self._docker_arguments if default_docker else None)
|
||||
|
||||
docker_setup_script = task_container.get('setup_shell_script')
|
||||
|
||||
self.send_logs(
|
||||
@@ -958,6 +969,7 @@ class Worker(ServiceCommandSection):
|
||||
if not (result.ok() and result.response):
|
||||
return
|
||||
new_session = copy(session)
|
||||
new_session.api_client = None
|
||||
new_session.set_auth_token(result.response.token)
|
||||
return new_session
|
||||
|
||||
@@ -1166,7 +1178,7 @@ class Worker(ServiceCommandSection):
|
||||
print("No tasks in Queues, sleeping for {:.1f} seconds".format(self._polling_interval))
|
||||
sleep(self._polling_interval)
|
||||
|
||||
if self._session.config["agent.reload_config"]:
|
||||
if self._session.config.get("agent.reload_config", False):
|
||||
self.reload_config()
|
||||
finally:
|
||||
# if we are in dynamic gpus mode, shutdown all active runs
|
||||
@@ -1197,7 +1209,7 @@ class Worker(ServiceCommandSection):
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
worker_name = self._session.config["agent.worker_name"] + ':gpu'
|
||||
worker_name = self._session.config.get("agent.worker_name", "") + ':gpu'
|
||||
our_workers = [
|
||||
w.id for w in response.workers
|
||||
if w.id.startswith(worker_name) and w.id != self.worker_id]
|
||||
@@ -1548,10 +1560,14 @@ class Worker(ServiceCommandSection):
|
||||
gpu_indexes=gpu_indexes,
|
||||
gpu_queues=dynamic_gpus,
|
||||
)
|
||||
except Exception:
|
||||
except Exception as e:
|
||||
tb = six.text_type(traceback.format_exc())
|
||||
print("FATAL ERROR:")
|
||||
print(tb)
|
||||
|
||||
if self._session.config.get("agent.crash_on_exception", False):
|
||||
raise e
|
||||
|
||||
crash_file, name = safe_mkstemp(prefix=".clearml_agent-crash", suffix=".log")
|
||||
try:
|
||||
with crash_file:
|
||||
@@ -1657,7 +1673,9 @@ class Worker(ServiceCommandSection):
|
||||
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
config_data = self._session.config.as_plain_ordered_dict() if config is None else config.as_plain_ordered_dict()
|
||||
config_data = (
|
||||
self._session.config.as_plain_ordered_dict() if config is None else config.as_plain_ordered_dict()
|
||||
)
|
||||
if clean_api_credentials:
|
||||
api = config_data.get("api")
|
||||
if api:
|
||||
@@ -1730,6 +1748,7 @@ class Worker(ServiceCommandSection):
|
||||
stopping = False
|
||||
status = None
|
||||
process = None
|
||||
last_task_ping = 0
|
||||
try:
|
||||
_last_machine_update_ts = time()
|
||||
stop_reason = None
|
||||
@@ -1765,6 +1784,17 @@ class Worker(ServiceCommandSection):
|
||||
if stderr:
|
||||
stderr.flush()
|
||||
|
||||
if self._task_ping_interval_sec and time() - last_task_ping > self._task_ping_interval_sec:
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
res = (session or self._session).send(tasks_api.PingRequest(task=task_id))
|
||||
if not res:
|
||||
self.log.error("Failed sending ping for task %s: %s", task_id, res.response)
|
||||
except Exception as ex:
|
||||
self.log.error("Failed sending ping: %s", str(ex))
|
||||
finally:
|
||||
self._task_ping_interval_sec = time()
|
||||
|
||||
# get diff from previous poll
|
||||
printed_lines, stdout_pos_count = _print_file(stdout_path, stdout_pos_count)
|
||||
if self._services_mode and not stopping and status is None:
|
||||
@@ -2055,7 +2085,10 @@ class Worker(ServiceCommandSection):
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
task_container = get_task_container(self._session, task_id)
|
||||
if task_container.get('image'):
|
||||
if (
|
||||
task_container.get('image')
|
||||
and not self._session.config.get('agent.disable_task_docker_override', False)
|
||||
):
|
||||
docker_image = task_container.get('image')
|
||||
print('Ignoring default docker image, using task docker image {}'.format(docker_image))
|
||||
docker_arguments = task_container.get('arguments')
|
||||
@@ -3423,7 +3456,6 @@ class Worker(ServiceCommandSection):
|
||||
temp_config.put("sdk.storage.cache.default_base_dir", mounted_cache_dir)
|
||||
temp_config.put("agent.pip_download_cache.path", mounted_pip_dl_dir)
|
||||
temp_config.put("agent.vcs_cache.path", mounted_vcs_cache)
|
||||
temp_config.put("agent.package_manager.system_site_packages", True)
|
||||
temp_config.put("agent.package_manager.conda_env_as_base_docker", False)
|
||||
temp_config.put("agent.default_python", "")
|
||||
temp_config.put("agent.python_binary", "")
|
||||
@@ -3435,6 +3467,11 @@ class Worker(ServiceCommandSection):
|
||||
temp_config.put("agent.git_pass", (ENV_AGENT_GIT_PASS.get() or
|
||||
self._session.config.get("agent.git_pass", None)))
|
||||
|
||||
force_system_site_packages = ENV_FORCE_SYSTEM_SITE_PACKAGES.get()
|
||||
force_system_site_packages = force_system_site_packages if force_system_site_packages is not None else True
|
||||
if force_system_site_packages:
|
||||
temp_config.put("agent.package_manager.system_site_packages", True)
|
||||
|
||||
if temp_config.get("agent.venvs_cache.path", None):
|
||||
temp_config.put("agent.venvs_cache.path", '/root/.clearml/venvs-cache')
|
||||
|
||||
@@ -3792,7 +3829,6 @@ class Worker(ServiceCommandSection):
|
||||
except:
|
||||
pass
|
||||
|
||||
agent_install_bash_script = []
|
||||
if os.environ.get('FORCE_LOCAL_CLEARML_AGENT_WHEEL'):
|
||||
local_wheel = os.path.expanduser(os.environ.get('FORCE_LOCAL_CLEARML_AGENT_WHEEL'))
|
||||
docker_wheel = '/tmp/{}'.format(basename(local_wheel))
|
||||
@@ -3833,9 +3869,6 @@ class Worker(ServiceCommandSection):
|
||||
if preprocess_bash_script:
|
||||
bash_script = preprocess_bash_script + bash_script
|
||||
|
||||
if agent_install_bash_script:
|
||||
bash_script += agent_install_bash_script
|
||||
|
||||
docker_bash_script = " ; ".join([line for line in bash_script if line]) \
|
||||
if not isinstance(bash_script, str) else bash_script
|
||||
|
||||
@@ -3844,10 +3877,10 @@ class Worker(ServiceCommandSection):
|
||||
update_scheme += (
|
||||
docker_bash_script + " ; " +
|
||||
"[ ! -z $LOCAL_PYTHON ] || export LOCAL_PYTHON={python} ; " +
|
||||
"$LOCAL_PYTHON -m pip install -U \"pip{pip_version}\" ; " +
|
||||
"$LOCAL_PYTHON -m pip install -U {pip_version} ; " +
|
||||
"$LOCAL_PYTHON -m pip install -U {clearml_agent_wheel} ; ").format(
|
||||
python_single_digit=python_version.split('.')[0],
|
||||
python=python_version, pip_version=PackageManager.get_pip_version(),
|
||||
python=python_version, pip_version=" ".join(PackageManager.get_pip_versions(wrap='\"')),
|
||||
clearml_agent_wheel=clearml_agent_wheel,
|
||||
mount_ssh_ro=mount_ssh_ro, mount_ssh=mount_ssh,
|
||||
)
|
||||
|
||||
@@ -87,6 +87,7 @@ ENVIRONMENT_CONFIG = {
|
||||
"agent.cpu_only": EnvironmentConfig(
|
||||
names=("CLEARML_CPU_ONLY", "TRAINS_CPU_ONLY", "CPU_ONLY"), type=bool
|
||||
),
|
||||
"agent.crash_on_exception": EnvironmentConfig("CLEAMRL_AGENT_CRASH_ON_EXCEPTION", type=bool),
|
||||
"sdk.aws.s3.key": EnvironmentConfig("AWS_ACCESS_KEY_ID"),
|
||||
"sdk.aws.s3.secret": ENV_AWS_SECRET_KEY,
|
||||
"sdk.aws.s3.region": EnvironmentConfig("AWS_DEFAULT_REGION"),
|
||||
@@ -153,6 +154,11 @@ ENV_CHILD_AGENTS_COUNT_CMD = EnvironmentConfig('CLEARML_AGENT_CHILD_AGENTS_COUNT
|
||||
ENV_DOCKER_ARGS_FILTERS = EnvironmentConfig('CLEARML_AGENT_DOCKER_ARGS_FILTERS')
|
||||
ENV_DOCKER_ARGS_HIDE_ENV = EnvironmentConfig('CLEARML_AGENT_DOCKER_ARGS_HIDE_ENV')
|
||||
|
||||
ENV_FORCE_SYSTEM_SITE_PACKAGES = EnvironmentConfig('CLEARML_AGENT_FORCE_SYSTEM_SITE_PACKAGES', type=bool)
|
||||
""" Force system_site_packages: true when running tasks in containers (i.e. docker mode or k8s glue) """
|
||||
|
||||
|
||||
|
||||
ENV_CUSTOM_BUILD_SCRIPT = EnvironmentConfig('CLEARML_AGENT_CUSTOM_BUILD_SCRIPT')
|
||||
"""
|
||||
Specifies a custom environment setup script to be executed instead of installing a virtual environment.
|
||||
|
||||
@@ -9,26 +9,33 @@ import os
|
||||
import re
|
||||
import subprocess
|
||||
import tempfile
|
||||
from collections import defaultdict
|
||||
from copy import deepcopy
|
||||
from pathlib import Path
|
||||
from pprint import pformat
|
||||
from threading import Thread
|
||||
from time import sleep
|
||||
from typing import Text, List, Callable, Any, Collection, Optional, Union
|
||||
from time import sleep, time
|
||||
from typing import Text, List, Callable, Any, Collection, Optional, Union, Iterable, Dict, Tuple, Set
|
||||
|
||||
import yaml
|
||||
|
||||
from clearml_agent.backend_api.session import Request
|
||||
from clearml_agent.commands.events import Events
|
||||
from clearml_agent.commands.worker import Worker, get_task_container, set_task_container, get_next_task
|
||||
from clearml_agent.definitions import ENV_DOCKER_IMAGE
|
||||
from clearml_agent.definitions import (
|
||||
ENV_DOCKER_IMAGE,
|
||||
ENV_AGENT_GIT_USER,
|
||||
ENV_AGENT_GIT_PASS,
|
||||
ENV_FORCE_SYSTEM_SITE_PACKAGES,
|
||||
)
|
||||
from clearml_agent.errors import APIError
|
||||
from clearml_agent.glue.definitions import ENV_START_AGENT_SCRIPT_PATH
|
||||
from clearml_agent.helper.base import safe_remove_file
|
||||
from clearml_agent.helper.dicts import merge_dicts
|
||||
from clearml_agent.helper.process import get_bash_output
|
||||
from clearml_agent.helper.process import get_bash_output, stringify_bash_output
|
||||
from clearml_agent.helper.resource_monitor import ResourceMonitor
|
||||
from clearml_agent.interface.base import ObjectID
|
||||
from clearml_agent.backend_api.session import Request
|
||||
from clearml_agent.glue.definitions import ENV_START_AGENT_SCRIPT_PATH
|
||||
|
||||
|
||||
|
||||
class K8sIntegration(Worker):
|
||||
@@ -36,19 +43,14 @@ class K8sIntegration(Worker):
|
||||
|
||||
K8S_DEFAULT_NAMESPACE = "clearml"
|
||||
AGENT_LABEL = "CLEARML=agent"
|
||||
LIMIT_POD_LABEL = "ai.allegro.agent.serial=pod-{pod_number}"
|
||||
|
||||
KUBECTL_APPLY_CMD = "kubectl apply --namespace={namespace} -f"
|
||||
|
||||
KUBECTL_RUN_CMD = "kubectl run clearml-id-{task_id} " \
|
||||
"--image {docker_image} {docker_args} " \
|
||||
"--restart=Never " \
|
||||
"--namespace={namespace}"
|
||||
|
||||
KUBECTL_DELETE_CMD = "kubectl delete pods " \
|
||||
"--selector={selector} " \
|
||||
"-l={agent_label} " \
|
||||
"--field-selector=status.phase!=Pending,status.phase!=Running " \
|
||||
"--namespace={namespace}"
|
||||
"--namespace={namespace} " \
|
||||
"--output name"
|
||||
|
||||
BASH_INSTALL_SSH_CMD = [
|
||||
"apt-get update",
|
||||
@@ -65,6 +67,9 @@ class K8sIntegration(Worker):
|
||||
'echo "ldconfig" >> /etc/profile',
|
||||
"/usr/sbin/sshd -p {port}"]
|
||||
|
||||
DEFAULT_EXECUTION_AGENT_ARGS = os.getenv("K8S_GLUE_DEF_EXEC_AGENT_ARGS", "--full-monitoring --require-queue")
|
||||
POD_AGENT_INSTALL_ARGS = os.getenv("K8S_GLUE_POD_AGENT_INSTALL_ARGS", "")
|
||||
|
||||
CONTAINER_BASH_SCRIPT = [
|
||||
"export DEBIAN_FRONTEND='noninteractive'",
|
||||
"echo 'Binary::apt::APT::Keep-Downloaded-Packages \"true\";' > /etc/apt/apt.conf.d/docker-clean",
|
||||
@@ -77,17 +82,19 @@ class K8sIntegration(Worker):
|
||||
"[ ! -z $LOCAL_PYTHON ] || apt-get install -y python3-pip",
|
||||
"[ ! -z $LOCAL_PYTHON ] || export LOCAL_PYTHON=python3",
|
||||
"{extra_bash_init_cmd}",
|
||||
"$LOCAL_PYTHON -m pip install clearml-agent",
|
||||
"$LOCAL_PYTHON -m pip install clearml-agent{agent_install_args}",
|
||||
"{extra_docker_bash_script}",
|
||||
"$LOCAL_PYTHON -m clearml_agent execute --full-monitoring --require-queue --id {task_id}"
|
||||
"$LOCAL_PYTHON -m clearml_agent execute {default_execution_agent_args} --id {task_id}"
|
||||
]
|
||||
|
||||
DEFAULT_POD_NAME_PREFIX = "clearml-id-"
|
||||
DEFAULT_LIMIT_POD_LABEL = "ai.allegro.agent.serial=pod-{pod_number}"
|
||||
|
||||
_edit_hyperparams_version = "2.9"
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
k8s_pending_queue_name=None,
|
||||
kubectl_cmd=None,
|
||||
container_bash_script=None,
|
||||
debug=False,
|
||||
ports_mode=False,
|
||||
@@ -100,15 +107,14 @@ class K8sIntegration(Worker):
|
||||
extra_bash_init_script=None,
|
||||
namespace=None,
|
||||
max_pods_limit=None,
|
||||
pod_name_prefix=None,
|
||||
limit_pod_label=None,
|
||||
**kwargs
|
||||
):
|
||||
"""
|
||||
Initialize the k8s integration glue layer daemon
|
||||
|
||||
:param str k8s_pending_queue_name: queue name to use when task is pending in the k8s scheduler
|
||||
:param str|callable kubectl_cmd: kubectl command line str, supports formatting (default: KUBECTL_RUN_CMD)
|
||||
example: "task={task_id} image={docker_image} queue_id={queue_id}"
|
||||
or a callable function: kubectl_cmd(task_id, docker_image, docker_args, queue_id, task_data)
|
||||
:param str container_bash_script: container bash script to be executed in k8s (default: CONTAINER_BASH_SCRIPT)
|
||||
Notice this string will use format() call, if you have curly brackets they should be doubled { -> {{
|
||||
Format arguments passed: {task_id} and {extra_bash_init_cmd}
|
||||
@@ -130,12 +136,16 @@ class K8sIntegration(Worker):
|
||||
:param int max_pods_limit: Maximum number of pods that K8S glue can run at the same time
|
||||
"""
|
||||
super(K8sIntegration, self).__init__()
|
||||
self.pod_name_prefix = pod_name_prefix or self.DEFAULT_POD_NAME_PREFIX
|
||||
self.limit_pod_label = limit_pod_label or self.DEFAULT_LIMIT_POD_LABEL
|
||||
self.k8s_pending_queue_name = k8s_pending_queue_name or self.K8S_PENDING_QUEUE
|
||||
self.k8s_pending_queue_id = None
|
||||
self.kubectl_cmd = kubectl_cmd or self.KUBECTL_RUN_CMD
|
||||
self.container_bash_script = container_bash_script or self.CONTAINER_BASH_SCRIPT
|
||||
# Always do system packages, because by we will be running inside a docker
|
||||
self._session.config.put("agent.package_manager.system_site_packages", True)
|
||||
force_system_packages = ENV_FORCE_SYSTEM_SITE_PACKAGES.get()
|
||||
self._force_system_site_packages = force_system_packages if force_system_packages is not None else True
|
||||
if self._force_system_site_packages:
|
||||
# Use system packages, because by we will be running inside a docker
|
||||
self._session.config.put("agent.package_manager.system_site_packages", True)
|
||||
# Add debug logging
|
||||
if debug:
|
||||
self.log.logger.disabled = False
|
||||
@@ -156,27 +166,9 @@ class K8sIntegration(Worker):
|
||||
self.pod_limits = []
|
||||
self.pod_requests = []
|
||||
self.max_pods_limit = max_pods_limit if not self.ports_mode else None
|
||||
if overrides_yaml:
|
||||
overrides = self._load_template_file(overrides_yaml)
|
||||
if overrides:
|
||||
containers = overrides.get('spec', {}).get('containers', [])
|
||||
for c in containers:
|
||||
resources = {str(k).lower(): v for k, v in c.get('resources', {}).items()}
|
||||
if not resources:
|
||||
continue
|
||||
if resources.get('limits'):
|
||||
self.pod_limits += ['{}={}'.format(k, v) for k, v in resources['limits'].items()]
|
||||
if resources.get('requests'):
|
||||
self.pod_requests += ['{}={}'.format(k, v) for k, v in resources['requests'].items()]
|
||||
# remove double entries
|
||||
self.pod_limits = list(set(self.pod_limits))
|
||||
self.pod_requests = list(set(self.pod_requests))
|
||||
if self.pod_limits or self.pod_requests:
|
||||
self.log.warning('Found pod container requests={} limits={}'.format(
|
||||
self.pod_limits, self.pod_requests))
|
||||
if containers:
|
||||
self.log.warning('Removing containers section: {}'.format(overrides['spec'].pop('containers')))
|
||||
self.overrides_json_string = json.dumps(overrides)
|
||||
|
||||
self._load_overrides_yaml(overrides_yaml)
|
||||
|
||||
if template_yaml:
|
||||
self.template_dict = self._load_template_file(template_yaml)
|
||||
|
||||
@@ -190,6 +182,34 @@ class K8sIntegration(Worker):
|
||||
|
||||
self._monitor_hanging_pods()
|
||||
|
||||
self._min_cleanup_interval_per_ns_sec = 1.0
|
||||
self._last_pod_cleanup_per_ns = defaultdict(lambda: 0.)
|
||||
|
||||
def _load_overrides_yaml(self, overrides_yaml):
|
||||
if not overrides_yaml:
|
||||
return
|
||||
overrides = self._load_template_file(overrides_yaml)
|
||||
if not overrides:
|
||||
return
|
||||
containers = overrides.get('spec', {}).get('containers', [])
|
||||
for c in containers:
|
||||
resources = {str(k).lower(): v for k, v in c.get('resources', {}).items()}
|
||||
if not resources:
|
||||
continue
|
||||
if resources.get('limits'):
|
||||
self.pod_limits += ['{}={}'.format(k, v) for k, v in resources['limits'].items()]
|
||||
if resources.get('requests'):
|
||||
self.pod_requests += ['{}={}'.format(k, v) for k, v in resources['requests'].items()]
|
||||
# remove double entries
|
||||
self.pod_limits = list(set(self.pod_limits))
|
||||
self.pod_requests = list(set(self.pod_requests))
|
||||
if self.pod_limits or self.pod_requests:
|
||||
self.log.warning('Found pod container requests={} limits={}'.format(
|
||||
self.pod_limits, self.pod_requests))
|
||||
if containers:
|
||||
self.log.warning('Removing containers section: {}'.format(overrides['spec'].pop('containers')))
|
||||
self.overrides_json_string = json.dumps(overrides)
|
||||
|
||||
def _monitor_hanging_pods(self):
|
||||
_check_pod_thread = Thread(target=self._monitor_hanging_pods_daemon)
|
||||
_check_pod_thread.daemon = True
|
||||
@@ -209,16 +229,22 @@ class K8sIntegration(Worker):
|
||||
except (IndexError, KeyError):
|
||||
return default
|
||||
|
||||
def _get_kubectl_options(self, command, extra_labels=None):
|
||||
labels = [self._get_agent_label()] + (list(extra_labels) if extra_labels else [])
|
||||
return {
|
||||
def _get_kubectl_options(self, command, extra_labels=None, filters=None, output="json", labels=None):
|
||||
# type: (str, Iterable[str], Iterable[str], str, Iterable[str]) -> Dict
|
||||
if not labels:
|
||||
labels = [self._get_agent_label()]
|
||||
labels = list(labels) + (list(extra_labels) if extra_labels else [])
|
||||
d = {
|
||||
"-l": ",".join(labels),
|
||||
"-n": str(self.namespace),
|
||||
"-o": "json"
|
||||
"-o": output,
|
||||
}
|
||||
if filters:
|
||||
d["--field-selector"] = ",".join(filters)
|
||||
return d
|
||||
|
||||
def get_kubectl_command(self, command, extra_labels=None):
|
||||
opts = self._get_kubectl_options(command, extra_labels)
|
||||
def get_kubectl_command(self, command, output="json", **args):
|
||||
opts = self._get_kubectl_options(command, output=output, **args)
|
||||
return 'kubectl {command} {opts}'.format(
|
||||
command=command, opts=" ".join(x for item in opts.items() for x in item)
|
||||
)
|
||||
@@ -227,10 +253,9 @@ class K8sIntegration(Worker):
|
||||
last_tasks_msgs = {} # last msg updated for every task
|
||||
|
||||
while True:
|
||||
kubectl_cmd = self.get_kubectl_command("get pods")
|
||||
kubectl_cmd = self.get_kubectl_command("get pods", filters=["status.phase=Pending"])
|
||||
self.log.debug("Detecting hanging pods: {}".format(kubectl_cmd))
|
||||
output = get_bash_output(kubectl_cmd)
|
||||
output = '' if not output else output if isinstance(output, str) else output.decode('utf-8')
|
||||
output = stringify_bash_output(get_bash_output(kubectl_cmd))
|
||||
try:
|
||||
output_config = json.loads(output)
|
||||
except Exception as ex:
|
||||
@@ -240,9 +265,6 @@ class K8sIntegration(Worker):
|
||||
pods = output_config.get('items', [])
|
||||
task_ids = set()
|
||||
for pod in pods:
|
||||
if self._get_path(pod, 'status', 'phase') != "Pending":
|
||||
continue
|
||||
|
||||
pod_name = pod.get('metadata', {}).get('name', None)
|
||||
if not pod_name:
|
||||
continue
|
||||
@@ -275,8 +297,10 @@ class K8sIntegration(Worker):
|
||||
|
||||
if reason == 'ImagePullBackOff':
|
||||
delete_pod_cmd = 'kubectl delete pods {} -n {}'.format(pod_name, namespace)
|
||||
self.log.debug(" - deleting pod due to ImagePullBackOff: {}".format(delete_pod_cmd))
|
||||
get_bash_output(delete_pod_cmd)
|
||||
try:
|
||||
self.log.debug(" - Detecting hanging pods: {}".format(kubectl_cmd))
|
||||
self._session.api_client.tasks.failed(
|
||||
task=task_id,
|
||||
status_reason="K8S glue error: {}".format(msg),
|
||||
@@ -308,8 +332,8 @@ class K8sIntegration(Worker):
|
||||
last_tasks_msgs[task_id] = msg
|
||||
except Exception as ex:
|
||||
self.log.warning(
|
||||
'K8S Glue pods monitor: Failed setting status message for task "{}"\nEX: {}'.format(
|
||||
task_id, ex
|
||||
'K8S Glue pods monitor: Failed setting status message for task "{}"\nMSG: {}\nEX: {}'.format(
|
||||
task_id, msg, ex
|
||||
)
|
||||
)
|
||||
|
||||
@@ -318,7 +342,8 @@ class K8sIntegration(Worker):
|
||||
|
||||
sleep(self._polling_interval)
|
||||
|
||||
def _set_task_user_properties(self, task_id: str, **properties: str):
|
||||
def _set_task_user_properties(self, task_id: str, task_session=None, **properties: str):
|
||||
session = task_session or self._session
|
||||
if self._edit_hyperparams_support is not True:
|
||||
# either not supported or never tested
|
||||
if self._edit_hyperparams_support == self._session.api_version:
|
||||
@@ -329,7 +354,7 @@ class K8sIntegration(Worker):
|
||||
self._edit_hyperparams_support = self._session.api_version
|
||||
return
|
||||
try:
|
||||
self._session.get(
|
||||
session.get(
|
||||
service="tasks",
|
||||
action="edit_hyper_params",
|
||||
task=task_id,
|
||||
@@ -361,67 +386,93 @@ class K8sIntegration(Worker):
|
||||
return self._agent_label
|
||||
|
||||
def _get_used_pods(self):
|
||||
# type: () -> Tuple[int, Set[str]]
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
kubectl_cmd_new = self.get_kubectl_command("get pods")
|
||||
self.log.debug("Getting used pods: {}".format(kubectl_cmd_new))
|
||||
process = subprocess.Popen(kubectl_cmd_new.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
output, error = process.communicate()
|
||||
output = '' if not output else output if isinstance(output, str) else output.decode('utf-8')
|
||||
error = '' if not error else error if isinstance(error, str) else error.decode('utf-8')
|
||||
kubectl_cmd = self.get_kubectl_command(
|
||||
"get pods",
|
||||
output="jsonpath=\"{range .items[*]}{.metadata.name}{' '}{.metadata.namespace}{'\\n'}{end}\""
|
||||
)
|
||||
self.log.debug("Getting used pods: {}".format(kubectl_cmd))
|
||||
output = stringify_bash_output(get_bash_output(kubectl_cmd, raise_error=True))
|
||||
|
||||
if not output:
|
||||
# No such pod exist so we can use the pod_number we found
|
||||
return 0, {}
|
||||
return 0, set([])
|
||||
|
||||
try:
|
||||
items = json.loads(output).get("items", [])
|
||||
items = output.splitlines()
|
||||
current_pod_count = len(items)
|
||||
namespaces = {item["metadata"]["namespace"] for item in items}
|
||||
namespaces = {item.rpartition(" ")[-1] for item in items}
|
||||
self.log.debug(" - found {} pods in namespaces {}".format(current_pod_count, ", ".join(namespaces)))
|
||||
except (KeyError, ValueError, TypeError, AttributeError) as ex:
|
||||
print("Failed parsing used pods command response for cleanup: {}".format(ex))
|
||||
return -1, {}
|
||||
return -1, set([])
|
||||
|
||||
return current_pod_count, namespaces
|
||||
except Exception as ex:
|
||||
print('Failed obtaining used pods information: {}'.format(ex))
|
||||
return -2, {}
|
||||
return -2, set([])
|
||||
|
||||
def _is_same_tenant(self, task_session):
|
||||
if not task_session or task_session is self._session:
|
||||
return True
|
||||
# noinspection PyStatementEffect
|
||||
try:
|
||||
tenant = self._session.get_decoded_token(self._session.token, verify=False)["tenant"]
|
||||
task_tenant = task_session.get_decoded_token(task_session.token, verify=False)["tenant"]
|
||||
return tenant == task_tenant
|
||||
except Exception as ex:
|
||||
print("ERROR: Failed getting tenant for task session: {}".format(ex))
|
||||
|
||||
def run_one_task(self, queue: Text, task_id: Text, worker_args=None, task_session=None, **_):
|
||||
print('Pulling task {} launching on kubernetes cluster'.format(task_id))
|
||||
task_data = self._session.api_client.tasks.get_all(id=[task_id])[0]
|
||||
session = task_session or self._session
|
||||
task_data = session.api_client.tasks.get_all(id=[task_id])[0]
|
||||
|
||||
# push task into the k8s queue, so we have visibility on pending tasks in the k8s scheduler
|
||||
try:
|
||||
print('Pushing task {} into temporary pending queue'.format(task_id))
|
||||
_ = self._session.api_client.tasks.stop(task_id, force=True)
|
||||
res = self._session.api_client.tasks.enqueue(
|
||||
task_id,
|
||||
queue=self.k8s_pending_queue_id,
|
||||
status_reason='k8s pending scheduler',
|
||||
)
|
||||
if res.meta.result_code != 200:
|
||||
raise Exception(res.meta.result_msg)
|
||||
except Exception as e:
|
||||
self.log.error("ERROR: Could not push back task [{}] to k8s pending queue {} [{}], error: {}".format(
|
||||
task_id, self.k8s_pending_queue_name, self.k8s_pending_queue_id, e))
|
||||
return
|
||||
if self._is_same_tenant(task_session):
|
||||
try:
|
||||
print('Pushing task {} into temporary pending queue'.format(task_id))
|
||||
_ = session.api_client.tasks.stop(task_id, force=True)
|
||||
|
||||
container = get_task_container(self._session, task_id)
|
||||
res = self._session.api_client.tasks.enqueue(
|
||||
task_id,
|
||||
queue=self.k8s_pending_queue_id,
|
||||
status_reason='k8s pending scheduler',
|
||||
)
|
||||
if res.meta.result_code != 200:
|
||||
raise Exception(res.meta.result_msg)
|
||||
except Exception as e:
|
||||
self.log.error("ERROR: Could not push back task [{}] to k8s pending queue {} [{}], error: {}".format(
|
||||
task_id, self.k8s_pending_queue_name, self.k8s_pending_queue_id, e))
|
||||
return
|
||||
|
||||
container = get_task_container(session, task_id)
|
||||
if not container.get('image'):
|
||||
container['image'] = str(
|
||||
ENV_DOCKER_IMAGE.get() or self._session.config.get("agent.default_docker.image", "nvidia/cuda")
|
||||
ENV_DOCKER_IMAGE.get() or session.config.get("agent.default_docker.image", "nvidia/cuda")
|
||||
)
|
||||
container['arguments'] = self._session.config.get("agent.default_docker.arguments", None)
|
||||
container['arguments'] = session.config.get("agent.default_docker.arguments", None)
|
||||
set_task_container(
|
||||
self._session, task_id, docker_image=container['image'], docker_arguments=container['arguments']
|
||||
session, task_id, docker_image=container['image'], docker_arguments=container['arguments']
|
||||
)
|
||||
|
||||
# get the clearml.conf encoded file, make sure we use system packages!
|
||||
|
||||
git_user = ENV_AGENT_GIT_USER.get() or self._session.config.get("agent.git_user", None)
|
||||
git_pass = ENV_AGENT_GIT_PASS.get() or self._session.config.get("agent.git_pass", None)
|
||||
extra_config_values = [
|
||||
'agent.package_manager.system_site_packages: true' if self._force_system_site_packages else '',
|
||||
'agent.git_user: "{}"'.format(git_user) if git_user else '',
|
||||
'agent.git_pass: "{}"'.format(git_pass) if git_pass else '',
|
||||
]
|
||||
|
||||
# noinspection PyProtectedMember
|
||||
config_content = (
|
||||
self.conf_file_content or Path(self._session._config_file).read_text() or ""
|
||||
) + '\nagent.package_manager.system_site_packages=true\n'
|
||||
self.conf_file_content or Path(session._config_file).read_text() or ""
|
||||
) + '\n{}\n'.format('\n'.join(x for x in extra_config_values if x))
|
||||
|
||||
hocon_config_encoded = config_content.encode("ascii")
|
||||
|
||||
create_clearml_conf = ["echo '{}' | base64 --decode >> ~/clearml.conf".format(
|
||||
@@ -454,13 +505,13 @@ class K8sIntegration(Worker):
|
||||
|
||||
kubectl_cmd_new = self.get_kubectl_command(
|
||||
"get pods",
|
||||
extra_labels=[self.LIMIT_POD_LABEL.format(pod_number=pod_number)] if self.ports_mode else None
|
||||
extra_labels=[self.limit_pod_label.format(pod_number=pod_number)] if self.ports_mode else None
|
||||
)
|
||||
self.log.debug("Looking for a free pod/port: {}".format(kubectl_cmd_new))
|
||||
process = subprocess.Popen(kubectl_cmd_new.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
output, error = process.communicate()
|
||||
output = '' if not output else output if isinstance(output, str) else output.decode('utf-8')
|
||||
error = '' if not error else error if isinstance(error, str) else error.decode('utf-8')
|
||||
output = stringify_bash_output(output)
|
||||
error = stringify_bash_output(error)
|
||||
|
||||
try:
|
||||
items_count = len(json.loads(output).get("items", []))
|
||||
@@ -471,8 +522,12 @@ class K8sIntegration(Worker):
|
||||
output, task_id, queue, ex
|
||||
)
|
||||
)
|
||||
self._session.api_client.tasks.stop(task_id, force=True)
|
||||
self._session.api_client.tasks.enqueue(task_id, queue=queue, status_reason='kubectl parsing error')
|
||||
session.api_client.tasks.stop(task_id, force=True)
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
self._session.api_client.tasks.enqueue(task_id, queue=queue, status_reason='kubectl parsing error')
|
||||
except:
|
||||
self.log.warning("Failed enqueuing task to queue '{}'".format(queue))
|
||||
return
|
||||
|
||||
if not items_count:
|
||||
@@ -496,9 +551,14 @@ class K8sIntegration(Worker):
|
||||
task_id, queue
|
||||
)
|
||||
)
|
||||
self._session.api_client.tasks.stop(task_id, force=True)
|
||||
self._session.api_client.tasks.enqueue(
|
||||
task_id, queue=queue, status_reason='k8s max pod limit (no free k8s service)')
|
||||
session.api_client.tasks.stop(task_id, force=True)
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
self._session.api_client.tasks.enqueue(
|
||||
task_id, queue=queue, status_reason='k8s max pod limit (no free k8s service)'
|
||||
)
|
||||
except:
|
||||
self.log.warning("Failed enqueuing task to queue '{}'".format(queue))
|
||||
return
|
||||
elif self.max_pods_limit:
|
||||
# max pods limit hasn't reached yet, so we can create the pod
|
||||
@@ -507,36 +567,38 @@ class K8sIntegration(Worker):
|
||||
|
||||
labels = self._get_pod_labels(queue, queue_name)
|
||||
if self.ports_mode:
|
||||
labels.append(self.LIMIT_POD_LABEL.format(pod_number=pod_number))
|
||||
labels.append(self.limit_pod_label.format(pod_number=pod_number))
|
||||
|
||||
if self.ports_mode:
|
||||
print("Kubernetes scheduling task id={} on pod={} (pod_count={})".format(task_id, pod_number, pod_count))
|
||||
else:
|
||||
print("Kubernetes scheduling task id={}".format(task_id))
|
||||
|
||||
kubectl_kwargs = dict(
|
||||
create_clearml_conf=create_clearml_conf,
|
||||
labels=labels,
|
||||
docker_image=container['image'],
|
||||
docker_args=container['arguments'],
|
||||
docker_bash=container.get('setup_shell_script'),
|
||||
task_id=task_id,
|
||||
queue=queue
|
||||
)
|
||||
|
||||
try:
|
||||
template = self._resolve_template(task_session, task_data, queue)
|
||||
except Exception as ex:
|
||||
print("ERROR: Failed resolving template (skipping): {}".format(ex))
|
||||
return
|
||||
|
||||
if template:
|
||||
output, error = self._kubectl_apply(template=template, **kubectl_kwargs)
|
||||
else:
|
||||
output, error = self._kubectl_run(task_data=task_data, **kubectl_kwargs)
|
||||
try:
|
||||
namespace = template['metadata']['namespace'] or self.namespace
|
||||
except (KeyError, TypeError, AttributeError):
|
||||
namespace = self.namespace
|
||||
|
||||
if template:
|
||||
output, error = self._kubectl_apply(
|
||||
template=template,
|
||||
pod_number=pod_number,
|
||||
create_clearml_conf=create_clearml_conf,
|
||||
labels=labels,
|
||||
docker_image=container['image'],
|
||||
docker_args=container['arguments'],
|
||||
docker_bash=container.get('setup_shell_script'),
|
||||
task_id=task_id,
|
||||
queue=queue,
|
||||
namespace=namespace,
|
||||
)
|
||||
|
||||
error = '' if not error else (error if isinstance(error, str) else error.decode('utf-8'))
|
||||
output = '' if not output else (output if isinstance(output, str) else output.decode('utf-8'))
|
||||
print('kubectl output:\n{}\n{}'.format(error, output))
|
||||
if error:
|
||||
send_log = "Running kubectl encountered an error: {}".format(error)
|
||||
@@ -564,6 +626,7 @@ class K8sIntegration(Worker):
|
||||
if user_props:
|
||||
self._set_task_user_properties(
|
||||
task_id=task_id,
|
||||
task_session=task_session,
|
||||
**user_props
|
||||
)
|
||||
|
||||
@@ -601,19 +664,22 @@ class K8sIntegration(Worker):
|
||||
return results
|
||||
|
||||
def _kubectl_apply(
|
||||
self, create_clearml_conf, docker_image, docker_args, docker_bash, labels, queue, task_id, template=None
|
||||
self,
|
||||
create_clearml_conf,
|
||||
docker_image,
|
||||
docker_args,
|
||||
docker_bash,
|
||||
labels,
|
||||
queue,
|
||||
task_id,
|
||||
namespace,
|
||||
template=None,
|
||||
pod_number=None
|
||||
):
|
||||
template = template or deepcopy(self.template_dict)
|
||||
|
||||
try:
|
||||
namespace = template['metadata']['namespace'] or self.namespace
|
||||
except (KeyError, TypeError, AttributeError):
|
||||
namespace = self.namespace
|
||||
|
||||
template.setdefault('apiVersion', 'v1')
|
||||
template['kind'] = 'Pod'
|
||||
template.setdefault('metadata', {})
|
||||
name = 'clearml-id-{task_id}'.format(task_id=task_id)
|
||||
name = self.pod_name_prefix + str(task_id)
|
||||
template['metadata']['name'] = name
|
||||
template.setdefault('spec', {})
|
||||
template['spec'].setdefault('containers', [])
|
||||
@@ -641,7 +707,9 @@ class K8sIntegration(Worker):
|
||||
['#!/bin/bash', ] +
|
||||
[line.format(extra_bash_init_cmd=self.extra_bash_init_script or '',
|
||||
task_id=task_id,
|
||||
extra_docker_bash_script=extra_docker_bash_script)
|
||||
extra_docker_bash_script=extra_docker_bash_script,
|
||||
default_execution_agent_args=self.DEFAULT_EXECUTION_AGENT_ARGS,
|
||||
agent_install_args=self.POD_AGENT_INSTALL_ARGS)
|
||||
for line in container_bash_script])
|
||||
|
||||
extra_bash_commands = list(create_clearml_conf or [])
|
||||
@@ -701,57 +769,34 @@ class K8sIntegration(Worker):
|
||||
finally:
|
||||
safe_remove_file(yaml_file)
|
||||
|
||||
return output, error
|
||||
return stringify_bash_output(output), stringify_bash_output(error)
|
||||
|
||||
def _kubectl_run(
|
||||
self, create_clearml_conf, docker_image, docker_args, docker_bash, labels, queue, task_data, task_id
|
||||
):
|
||||
if callable(self.kubectl_cmd):
|
||||
kubectl_cmd = self.kubectl_cmd(task_id, docker_image, docker_args, queue, task_data)
|
||||
else:
|
||||
kubectl_cmd = self.kubectl_cmd.format(
|
||||
task_id=task_id,
|
||||
docker_image=docker_image,
|
||||
docker_args=" ".join(self._get_docker_args(
|
||||
docker_args, flags={"-e", "--env"}, convert=lambda env: '--env={}'.format(env))
|
||||
),
|
||||
queue_id=queue,
|
||||
namespace=self.namespace,
|
||||
)
|
||||
# make sure we provide a list
|
||||
if isinstance(kubectl_cmd, str):
|
||||
kubectl_cmd = kubectl_cmd.split()
|
||||
|
||||
if self.overrides_json_string:
|
||||
kubectl_cmd += ['--overrides=' + self.overrides_json_string]
|
||||
|
||||
if self.pod_limits:
|
||||
kubectl_cmd += ['--limits', ",".join(self.pod_limits)]
|
||||
if self.pod_requests:
|
||||
kubectl_cmd += ['--requests', ",".join(self.pod_requests)]
|
||||
|
||||
if self._docker_force_pull and not any(x.startswith("--image-pull-policy=") for x in kubectl_cmd):
|
||||
kubectl_cmd += ["--image-pull-policy='always'"]
|
||||
|
||||
container_bash_script = [self.container_bash_script] if isinstance(self.container_bash_script, str) \
|
||||
else self.container_bash_script
|
||||
container_bash_script = ' ; '.join(container_bash_script)
|
||||
|
||||
kubectl_cmd += [
|
||||
"--labels=" + ",".join(labels),
|
||||
"--command",
|
||||
"--",
|
||||
"/bin/sh",
|
||||
"-c",
|
||||
"{} ; {}".format(" ; ".join(create_clearml_conf or []), container_bash_script.format(
|
||||
extra_bash_init_cmd=self.extra_bash_init_script or "",
|
||||
extra_docker_bash_script=docker_bash or "",
|
||||
task_id=task_id
|
||||
)),
|
||||
]
|
||||
process = subprocess.Popen(kubectl_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
output, error = process.communicate()
|
||||
return output, error
|
||||
def _cleanup_old_pods(self, namespaces, extra_msg=None):
|
||||
# type: (Iterable[str], Optional[str]) -> Dict[str, List[str]]
|
||||
self.log.debug("Cleaning up pods")
|
||||
deleted_pods = defaultdict(list)
|
||||
for namespace in namespaces:
|
||||
if time() - self._last_pod_cleanup_per_ns[namespace] < self._min_cleanup_interval_per_ns_sec:
|
||||
# Do not try to cleanup the same namespace too quickly
|
||||
continue
|
||||
kubectl_cmd = self.KUBECTL_DELETE_CMD.format(namespace=namespace, agent_label=self._get_agent_label())
|
||||
self.log.debug("Deleting old/failed pods{} for ns {}: {}".format(
|
||||
extra_msg or "", namespace, kubectl_cmd
|
||||
))
|
||||
try:
|
||||
res = get_bash_output(kubectl_cmd, raise_error=True)
|
||||
lines = [
|
||||
line for line in
|
||||
(r.strip().rpartition("/")[-1] for r in res.splitlines())
|
||||
if line.startswith(self.pod_name_prefix)
|
||||
]
|
||||
self.log.debug(" - deleted pod(s) %s", ", ".join(lines))
|
||||
deleted_pods[namespace].extend(lines)
|
||||
except Exception as ex:
|
||||
self.log.error("Failed deleting old/failed pods for ns %s: %s", namespace, str(ex))
|
||||
finally:
|
||||
self._last_pod_cleanup_per_ns[namespace] = time()
|
||||
return deleted_pods
|
||||
|
||||
def run_tasks_loop(self, queues: List[Text], worker_params, **kwargs):
|
||||
"""
|
||||
@@ -780,16 +825,16 @@ class K8sIntegration(Worker):
|
||||
# Get used pods and namespaces
|
||||
current_pods, namespaces = self._get_used_pods()
|
||||
|
||||
# just in case there are no pods, make sure we look at our base namespace
|
||||
namespaces.add(self.namespace)
|
||||
|
||||
# check if have pod limit, then check if we hit it.
|
||||
if self.max_pods_limit:
|
||||
if current_pods >= self.max_pods_limit:
|
||||
print("Maximum pod limit reached {}/{}, sleeping for {:.1f} seconds".format(
|
||||
current_pods, self.max_pods_limit, self._polling_interval))
|
||||
# delete old completed / failed pods
|
||||
for namespace in namespaces:
|
||||
kubectl_cmd = self.KUBECTL_DELETE_CMD.format(namespace=namespace, selector=self._get_agent_label())
|
||||
self.log.debug("Deleting old/failed pods due to pod limit: {}".format(kubectl_cmd))
|
||||
get_bash_output(kubectl_cmd)
|
||||
self._cleanup_old_pods(namespaces, " due to pod limit")
|
||||
# go to sleep
|
||||
sleep(self._polling_interval)
|
||||
continue
|
||||
@@ -797,10 +842,7 @@ class K8sIntegration(Worker):
|
||||
# iterate over queues (priority style, queues[0] is highest)
|
||||
for queue in queues:
|
||||
# delete old completed / failed pods
|
||||
for namespace in namespaces:
|
||||
kubectl_cmd = self.KUBECTL_DELETE_CMD.format(namespace=namespace, selector=self._get_agent_label())
|
||||
self.log.debug("Deleting old/failed pods: {}".format(kubectl_cmd))
|
||||
get_bash_output(kubectl_cmd)
|
||||
self._cleanup_old_pods(namespaces)
|
||||
|
||||
# get next task in queue
|
||||
try:
|
||||
@@ -816,14 +858,6 @@ class K8sIntegration(Worker):
|
||||
except (KeyError, TypeError, AttributeError):
|
||||
print("No tasks in queue {}".format(queue))
|
||||
continue
|
||||
events_service.send_log_events(
|
||||
self.worker_id,
|
||||
task_id=task_id,
|
||||
lines="task {} pulled from {} by worker {}".format(
|
||||
task_id, queue, self.worker_id
|
||||
),
|
||||
level="INFO",
|
||||
)
|
||||
|
||||
task_session = None
|
||||
if self._impersonate_as_task_owner:
|
||||
@@ -843,6 +877,16 @@ class K8sIntegration(Worker):
|
||||
)
|
||||
continue
|
||||
|
||||
events_service.send_log_events(
|
||||
self.worker_id,
|
||||
task_id=task_id,
|
||||
lines="task {} pulled from {} by worker {}".format(
|
||||
task_id, queue, self.worker_id
|
||||
),
|
||||
level="INFO",
|
||||
session=task_session,
|
||||
)
|
||||
|
||||
self.report_monitor(ResourceMonitor.StatusReport(queues=queues, queue=queue, task=task_id))
|
||||
self.run_one_task(queue, task_id, worker_params, task_session)
|
||||
self.report_monitor(ResourceMonitor.StatusReport(queues=self.queues))
|
||||
|
||||
@@ -80,7 +80,12 @@ class PackageManager(object):
|
||||
|
||||
def upgrade_pip(self):
|
||||
result = self._install(
|
||||
select_for_platform(windows='pip{}', linux='pip{}').format(self.get_pip_version()), "--upgrade")
|
||||
*select_for_platform(
|
||||
windows=self.get_pip_versions(),
|
||||
linux=self.get_pip_versions()
|
||||
),
|
||||
"--upgrade"
|
||||
)
|
||||
packages = self.run_with_env(('list',), output=True).splitlines()
|
||||
# p.split is ('pip', 'x.y.z')
|
||||
pip = [p.split() for p in packages if len(p.split()) == 2 and p.split()[0] == 'pip']
|
||||
@@ -157,15 +162,26 @@ class PackageManager(object):
|
||||
def set_pip_version(cls, version):
|
||||
if not version:
|
||||
return
|
||||
version = version.replace(' ', '')
|
||||
if ('=' in version) or ('~' in version) or ('<' in version) or ('>' in version):
|
||||
cls._pip_version = version
|
||||
|
||||
if isinstance(version, (list, tuple)):
|
||||
versions = version
|
||||
else:
|
||||
cls._pip_version = "=="+version
|
||||
versions = [version]
|
||||
|
||||
cls._pip_version = []
|
||||
for version in versions:
|
||||
version = version.strip()
|
||||
if ('=' in version) or ('~' in version) or ('<' in version) or ('>' in version):
|
||||
cls._pip_version.append(version)
|
||||
else:
|
||||
cls._pip_version.append("==" + version)
|
||||
|
||||
@classmethod
|
||||
def get_pip_version(cls):
|
||||
return cls._pip_version or ''
|
||||
def get_pip_versions(cls, pip="pip", wrap=''):
|
||||
return [
|
||||
(wrap + pip + version + wrap)
|
||||
for version in cls._pip_version or [pip]
|
||||
]
|
||||
|
||||
def get_cached_venv(self, requirements, docker_cmd, python_version, cuda_version, destination_folder):
|
||||
# type: (Dict, Optional[Union[dict, str]], Optional[str], Optional[str], Path) -> Optional[Path]
|
||||
|
||||
@@ -135,7 +135,12 @@ class CondaAPI(PackageManager):
|
||||
if self.env_read_only:
|
||||
print('Conda environment in read-only mode, skipping pip upgrade.')
|
||||
return ''
|
||||
return self._install(select_for_platform(windows='pip{}', linux='pip{}').format(self.pip.get_pip_version()))
|
||||
return self._install(
|
||||
*select_for_platform(
|
||||
windows=self.pip.get_pip_versions(),
|
||||
linux=self.pip.get_pip_versions()
|
||||
)
|
||||
)
|
||||
|
||||
def create(self):
|
||||
"""
|
||||
|
||||
@@ -25,7 +25,7 @@ from clearml_agent.helper.base import bash_c, is_windows_platform, select_for_pl
|
||||
PathLike = Union[Text, Path]
|
||||
|
||||
|
||||
def get_bash_output(cmd, strip=False, stderr=subprocess.STDOUT, stdin=False):
|
||||
def get_bash_output(cmd, strip=False, stderr=subprocess.STDOUT, stdin=False, raise_error=False):
|
||||
try:
|
||||
output = (
|
||||
subprocess.check_output(
|
||||
@@ -37,10 +37,16 @@ def get_bash_output(cmd, strip=False, stderr=subprocess.STDOUT, stdin=False):
|
||||
.strip()
|
||||
)
|
||||
except subprocess.CalledProcessError:
|
||||
if raise_error:
|
||||
raise
|
||||
output = None
|
||||
return output if not strip or not output else output.strip()
|
||||
|
||||
|
||||
def stringify_bash_output(value):
|
||||
return '' if not value else (value if isinstance(value, str) else value.decode('utf-8'))
|
||||
|
||||
|
||||
def terminate_process(pid, timeout=10., ignore_zombie=True, include_children=False):
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
|
||||
@@ -106,7 +106,7 @@ class Session(_Session):
|
||||
if os.path.exists(os.path.expanduser(os.path.expandvars(f))):
|
||||
self._config_file = f
|
||||
break
|
||||
self.api_client = APIClient(session=self, api_version="2.5")
|
||||
self._api_client = None
|
||||
# HACK make sure we have python version to execute,
|
||||
# if nothing was specific, use the one that runs us
|
||||
def_python = ConfigValue(self.config, "agent.default_python")
|
||||
@@ -133,7 +133,7 @@ class Session(_Session):
|
||||
# override with environment variables
|
||||
# cuda_version & cudnn_version are overridden with os.environ here, and normalized in the next section
|
||||
for config_key, env_config in ENVIRONMENT_CONFIG.items():
|
||||
# check if the propery is of a list:
|
||||
# check if the property is of a list:
|
||||
if config_key.endswith('.0'):
|
||||
if all(not i.get() for i in env_config.values()):
|
||||
continue
|
||||
@@ -167,6 +167,16 @@ class Session(_Session):
|
||||
if not kwargs.get('only_load_config'):
|
||||
self.create_cache_folders()
|
||||
|
||||
@property
|
||||
def api_client(self):
|
||||
if self._api_client is None:
|
||||
self._api_client = APIClient(session=self, api_version="2.5")
|
||||
return self._api_client
|
||||
|
||||
@api_client.setter
|
||||
def api_client(self, value):
|
||||
self._api_client = value
|
||||
|
||||
@staticmethod
|
||||
def get_logger(name):
|
||||
logger = logging.getLogger(name)
|
||||
|
||||
@@ -1 +1 @@
|
||||
__version__ = '1.5.0rc0'
|
||||
__version__ = '1.5.1'
|
||||
|
||||
@@ -13,6 +13,15 @@ api {
|
||||
}
|
||||
|
||||
agent {
|
||||
# unique name of this worker, if None, created based on hostname:process_id
|
||||
# Override with os environment: CLEARML_WORKER_ID
|
||||
# worker_id: "clearml-agent-machine1:gpu0"
|
||||
worker_id: ""
|
||||
|
||||
# worker name, replaces the hostname when creating a unique name for this worker
|
||||
# Override with os environment: CLEARML_WORKER_NAME
|
||||
# worker_name: "clearml-agent-machine1"
|
||||
worker_name: ""
|
||||
# Set GIT user/pass credentials (if user/pass are set, GIT protocol will be set to https)
|
||||
# leave blank for GIT SSH credentials (set force_git_ssh_protocol=true to force SSH protocol)
|
||||
# **Notice**: GitHub personal token is equivalent to password, you can put it directly into `git_pass`
|
||||
@@ -20,11 +29,11 @@ agent {
|
||||
# https://docs.github.com/en/authentication/keeping-your-account-and-data-secure/creating-a-personal-access-token
|
||||
# https://support.atlassian.com/bitbucket-cloud/docs/app-passwords/
|
||||
# https://docs.gitlab.com/ee/user/profile/personal_access_tokens.html
|
||||
git_user=""
|
||||
git_pass=""
|
||||
# git_user: ""
|
||||
# git_pass: ""
|
||||
# Limit credentials to a single domain, for example: github.com,
|
||||
# all other domains will use public access (no user/pass). Default: always send user/pass for any VCS domain
|
||||
git_host=""
|
||||
# git_host: ""
|
||||
|
||||
# Force GIT protocol to use SSH regardless of the git url (Assumes GIT user/pass are blank)
|
||||
force_git_ssh_protocol: false
|
||||
@@ -33,16 +42,6 @@ agent {
|
||||
# Force a specific SSH username when converting http to ssh links (the default username is 'git')
|
||||
# force_git_ssh_user: git
|
||||
|
||||
# unique name of this worker, if None, created based on hostname:process_id
|
||||
# Overridden with os environment: CLEARML_WORKER_ID
|
||||
# worker_id: "clearml-agent-machine1:gpu0"
|
||||
worker_id: ""
|
||||
|
||||
# worker name, replaces the hostname when creating a unique name for this worker
|
||||
# Overridden with os environment: CLEARML_WORKER_NAME
|
||||
# worker_name: "clearml-agent-machine1"
|
||||
worker_name: ""
|
||||
|
||||
# Set the python version to use when creating the virtual environment and launching the experiment
|
||||
# Example values: "/usr/bin/python3" or "/usr/local/bin/python3.6"
|
||||
# The default is the python executing the clearml_agent
|
||||
@@ -51,6 +50,22 @@ agent {
|
||||
# specific python version and the system supports multiple python the agent will use the requested python version)
|
||||
# ignore_requested_python_version: true
|
||||
|
||||
# Force the root folder of the git repository (instead of the working directory) into the PYHTONPATH
|
||||
# default false, only the working directory will be added to the PYHTONPATH
|
||||
# force_git_root_python_path: false
|
||||
|
||||
# if set, use GIT_ASKPASS to pass user/pass when cloning / fetch repositories
|
||||
# it solves passing user/token to git submodules.
|
||||
# this is a safer way to ensure multiple users using the same repository will
|
||||
# not accidentally leak credentials
|
||||
# Only supported on Linux systems, it will be the default in future releases
|
||||
# enable_git_ask_pass: false
|
||||
|
||||
# in docker mode, if container's entrypoint automatically activated a virtual environment
|
||||
# use the activated virtual environment and install everything there
|
||||
# set to False to disable, and always create a new venv inheriting from the system_site_packages
|
||||
# docker_use_activated_venv: true
|
||||
|
||||
# select python package manager:
|
||||
# currently supported: pip, conda and poetry
|
||||
# if "pip" or "conda" are used, the agent installs the required packages
|
||||
@@ -64,7 +79,7 @@ agent {
|
||||
type: pip,
|
||||
|
||||
# specify pip version to use (examples "<20.2", "==19.3.1", "", empty string will install the latest version)
|
||||
# pip_version: "<21"
|
||||
# pip_version: ["<20.2 ; python_version < '3.10'", "<22.3 ; python_version >= '3.10'"]
|
||||
# specify poetry version to use (examples "<2", "==1.1.1", "", empty string will install the latest version)
|
||||
# poetry_version: "<2",
|
||||
|
||||
@@ -225,7 +240,7 @@ agent {
|
||||
enable_task_env: false
|
||||
|
||||
# CUDA versions used for Conda setup & solving PyTorch wheel packages
|
||||
# it Should be detected automatically. Override with os environment CUDA_VERSION / CUDNN_VERSION
|
||||
# Should be detected automatically. Override with os environment CUDA_VERSION / CUDNN_VERSION
|
||||
# cuda_version: 10.1
|
||||
# cudnn_version: 7.6
|
||||
|
||||
@@ -429,42 +444,46 @@ sdk {
|
||||
|
||||
# Apply top-level environment section from configuration into os.environ
|
||||
apply_environment: true
|
||||
# Top-level environment section is in the form of:
|
||||
# environment {
|
||||
# key: value
|
||||
# ...
|
||||
# }
|
||||
# and is applied to the OS environment as `key=value` for each key/value pair
|
||||
|
||||
# Apply top-level files section from configuration into local file system
|
||||
apply_files: true
|
||||
# Top-level files section allows auto-generating files at designated paths with a predefined contents
|
||||
# and target format. Options include:
|
||||
# contents: the target file's content, typically a string (or any base type int/float/list/dict etc.)
|
||||
# format: a custom format for the contents. Currently supported value is `base64` to automatically decode a
|
||||
# base64-encoded contents string, otherwise ignored
|
||||
# path: the target file's path, may include ~ and inplace env vars
|
||||
# target_format: format used to encode contents before writing into the target file. Supported values are json,
|
||||
# yaml, yml and bytes (in which case the file will be written in binary mode). Default is text mode.
|
||||
# overwrite: overwrite the target file in case it exists. Default is true.
|
||||
#
|
||||
# Example:
|
||||
# files {
|
||||
# myfile1 {
|
||||
# contents: "The quick brown fox jumped over the lazy dog"
|
||||
# path: "/tmp/fox.txt"
|
||||
# }
|
||||
# myjsonfile {
|
||||
# contents: {
|
||||
# some {
|
||||
# nested {
|
||||
# value: [1, 2, 3, 4]
|
||||
# }
|
||||
# }
|
||||
# }
|
||||
# path: "/tmp/test.json"
|
||||
# target_format: json
|
||||
# }
|
||||
# }
|
||||
}
|
||||
|
||||
# Environment section (top-level) is applied to the OS environment as `key=value` for each key/value pair
|
||||
# * enable/disable with `agent.apply_environment` OR `sdk.apply_environment`
|
||||
# Example:
|
||||
#
|
||||
# environment {
|
||||
# key_a: value_a
|
||||
# key_b: value_b
|
||||
# }
|
||||
|
||||
# Files section (top-level) allows auto-generating files at designated paths with
|
||||
# predefined content and target format.
|
||||
# * enable/disable with `agent.apply_files` OR `sdk.apply_files`
|
||||
# Files content options include:
|
||||
# contents: the target file's content, typically a string (or any base type int/float/list/dict etc.)
|
||||
# format: a custom format for the contents. Currently supported value is `base64` to automatically decode a
|
||||
# base64-encoded contents string, otherwise ignored
|
||||
# path: the target file's path, may include ~ and inplace env vars
|
||||
# target_format: format used to encode contents before writing into the target file. Supported values are json,
|
||||
# yaml, yml and bytes (in which case the file will be written in binary mode). Default is text mode.
|
||||
# overwrite: overwrite the target file in case it exists. Default is true.
|
||||
#
|
||||
# Example:
|
||||
# files {
|
||||
# myfile1 {
|
||||
# contents: "The quick brown fox jumped over the lazy dog"
|
||||
# path: "/tmp/fox.txt"
|
||||
# }
|
||||
# myjsonfile {
|
||||
# contents: {
|
||||
# some {
|
||||
# nested {
|
||||
# value: [1, 2, 3, 4]
|
||||
# }
|
||||
# }
|
||||
# }
|
||||
# path: "/tmp/test.json"
|
||||
# target_format: json
|
||||
# }
|
||||
# }
|
||||
|
||||
@@ -1,15 +1,15 @@
|
||||
attrs>=18.0,<20.4.0
|
||||
attrs>=18.0,<23.0.0
|
||||
enum34>=0.9,<1.2.0 ; python_version < '3.6'
|
||||
furl>=2.0.0,<2.2.0
|
||||
jsonschema>=2.6.0,<3.3.0
|
||||
jsonschema>=2.6.0,<5.0.0
|
||||
pathlib2>=2.3.0,<2.4.0
|
||||
psutil>=3.4.2,<5.10.0
|
||||
pyparsing>=2.0.3,<2.5.0
|
||||
pyparsing>=2.0.3,<3.1.0
|
||||
python-dateutil>=2.4.2,<2.9.0
|
||||
pyjwt>=2.4.0,<2.5.0
|
||||
pyjwt>=2.4.0,<2.7.0
|
||||
PyYAML>=3.12,<6.1
|
||||
requests>=2.20.0,<2.29.0
|
||||
six>=1.13.0,<1.16.0
|
||||
six>=1.13.0,<1.17.0
|
||||
typing>=3.6.4,<3.8.0 ; python_version < '3.5'
|
||||
urllib3>=1.21.1,<1.27.0
|
||||
virtualenv>=16,<21
|
||||
|
||||
Reference in New Issue
Block a user