Compare commits

...

23 Commits

Author SHA1 Message Date
allegroai
efa1f71dac Version bump to v1.5.1 2022-12-10 22:18:21 +02:00
allegroai
692cb8cf13 Update six requirements 2022-12-10 22:18:10 +02:00
allegroai
ebdc215632 Remove " from pip commands in venv 2022-12-10 20:58:30 +02:00
allegroai
b2da639582 Add CLEARML_AGENT_FORCE_SYSTEM_SITE_PACKAGES env var (default true) to allow overriding default "system_site_packages: true" behavior when running tasks in containers (docker mode and k8s-glue) 2022-12-10 20:00:46 +02:00
allegroai
71fdb43f10 Version bump to v1.5.1rc0 2022-12-07 22:09:40 +02:00
allegroai
ca2791c65e Fix pip support allowing multiple pip version constraints (by default, one for <PY3.10 and one for >=PY3.10) 2022-12-07 22:09:25 +02:00
allegroai
dd75cedaab Upgrade requirements for attrs, jsonschema, pyparsing and pyjwt 2022-12-07 22:08:15 +02:00
allegroai
669fb1a6e5 Fix using deprecated types validator argument raises an error (deprecated even before jsonschema 3.0.0 and unsupported since 4.0.0) 2022-12-07 22:07:53 +02:00
allegroai
5d517c91b5 Add agent.disable_task_docker_override configuration option to disable docker override specified in executing tasks 2022-12-07 22:07:11 +02:00
allegroai
6be75abc86 Add default output URI selection to "clearml-agent init" 2022-12-07 22:06:10 +02:00
allegroai
4c777fa2ee Version bump to v1.5.0 2022-12-05 16:42:44 +02:00
allegroai
dc5e0033c8 Remove support for kubectl run
Allow customizing pod name prefix and limit pod label
Return deleted pods from cleanup
Some refactoring
2022-12-05 11:40:19 +02:00
allegroai
3dd5973734 Filter by phase when detecting hanging pods
More debug print-outs
Use task session when possible
Push task into k8s scheduler queue only if running from the same tenant
Make sure we pass git_user/pass to the task pod
Fix cleanup command not issued when no pods exist in a multi-queue setup
2022-12-05 11:29:59 +02:00
allegroai
53d379205f Support raise_error in get_bash_output() 2022-12-05 11:26:40 +02:00
allegroai
57cde21c48 Send task.ping for executing tasks every 120 seconds (set using the agent.task_ping_interval_sec configuration option) 2022-12-05 11:22:25 +02:00
allegroai
396abf13b6 Fix get_task_session() may cause an old copy of the APIClient to be used containing a reference to the previous session 2022-12-05 11:20:32 +02:00
allegroai
6e7fb5f331 Fix sending task logs fails when agent is not running in the same tenant 2022-12-05 11:19:14 +02:00
allegroai
1d5c118b70 Fix setting CLEARML_API_DEFAULT_REQ_METHOD raises an error 2022-12-05 11:18:12 +02:00
allegroai
18612aac4d Improve configuration examples 2022-12-05 11:17:27 +02:00
allegroai
76c533a2e8 Fix access to config object 2022-11-11 13:34:17 +02:00
Niels ten Boom
9eee213683 Add option to crash agent on exception using agent.crash_on_exception configuration setting (#123) 2022-11-06 17:15:39 +02:00
allegroai
e4861fc0fb Add missing settings in clearml.conf 2022-11-06 12:36:01 +02:00
allegroai
53ef984065 Update README 2022-11-06 11:53:16 +02:00
17 changed files with 517 additions and 311 deletions

View File

@@ -24,7 +24,7 @@ ML-Ops scheduler & orchestration solution supporting Linux, macOS and Windows**
* Launch-and-Forget service containers
* [Cloud autoscaling](https://clear.ml/docs/latest/docs/guides/services/aws_autoscaler)
* [Customizable cleanup](https://clear.ml/docs/latest/docs/guides/services/cleanup_service)
* Advanced [pipeline building and execution](https://clear.ml/docs/latest/docs/guides/frameworks/pytorch/notebooks/table/tabular_training_pipeline)
It is a zero configuration fire-and-forget execution agent, providing a full ML/DL cluster solution.
@@ -33,12 +33,12 @@ It is a zero configuration fire-and-forget execution agent, providing a full ML/
1. ClearML Server [self-hosted](https://github.com/allegroai/clearml-server)
or [free tier hosting](https://app.clear.ml)
2. `pip install clearml-agent` ([install](#installing-the-clearml-agent) the ClearML Agent on any CPU/GPU machine:
2. `pip install clearml-agent` ([install](#installing-the-clearml-agent) the ClearML Agent on any GPU machine:
on-premises / cloud / ...)
3. Create a [job](https://github.com/allegroai/clearml/docs/clearml-task.md) or
Add [ClearML](https://github.com/allegroai/clearml) to your code with just 2 lines
4. Change the [parameters](#using-the-clearml-agent) in the UI & schedule for [execution](#using-the-clearml-agent) (or
automate with a [pipelines](#automl-and-orchestration-pipelines-))
automate with an [AutoML pipeline](#automl-and-orchestration-pipelines-))
5. :chart_with_downwards_trend: :chart_with_upwards_trend: :eyes: :beer:
"All the Deep/Machine-Learning DevOps your research needs, and then some... Because ain't nobody got time for that"
@@ -313,31 +313,28 @@ clearml-agent daemon --services-mode --detached --queue services --create-queue
**Note**: It is the user's responsibility to make sure the proper tasks are pushed into the specified queue.
### Orchestration and Pipelines <a name="automl-pipes"></a>
### AutoML and Orchestration Pipelines <a name="automl-pipes"></a>
The ClearML Agent can also be used to orchestrate and automate Pipelines in conjunction with the
The ClearML Agent can also be used to implement AutoML orchestration and Experiment Pipelines in conjunction with the
ClearML package.
Sample automation examples can be found in the
ClearML [pipelines](https://github.com/allegroai/clearml/tree/master/examples/pipeline) / [automation](https://github.com/allegroai/clearml/tree/master/examples/automation) folder.
Sample AutoML & Orchestration examples can be found in the
ClearML [example/automation](https://github.com/allegroai/clearml/tree/master/examples/automation) folder.
HPO examples
AutoML examples
- [Toy Keras training experiment](https://github.com/allegroai/clearml/blob/master/examples/optimization/hyper-parameter-optimization/base_template_keras_simple.py)
- In order to create an experiment-template in the system, this code must be executed once manually
- [Manual Search over the above Keras experiment-template](https://github.com/allegroai/clearml/blob/master/examples/automation/manual_random_param_search_example.py)
- [Random Search over the above Keras experiment-template](https://github.com/allegroai/clearml/blob/master/examples/automation/manual_random_param_search_example.py)
- This example will create multiple copies of the Keras experiment-template, with different hyper-parameter
combinations
- [Optimized Bayesian search over the above Keras experiment-template](https://github.com/allegroai/clearml/blob/master/examples/optimization/hyper-parameter-optimization/hyper_parameter_optimizer.py)
- This example will create multiple copies of the Keras experiment-template with different hyper-parameter combinations, launch them on remote machines, monitor the metric (i.e. loss), decide which one has the best potential, and abort the others
Experiment Pipeline examples
- [Build DAG from Tasks](https://github.com/allegroai/clearml/blob/master/examples/pipeline/pipeline_from_tasks.py)
- This example will build a DAG processing flow from existing Tasks and launch them on remote machines
- [Logic Driven Pipeline](https://github.com/allegroai/clearml/blob/master/examples/pipeline/pipeline_from_decorator.py)
- This example will run any component function as a standalone Task on a remote machine; it will auto-parallelize jobs, cache results, and automatically serialize data between remote machines.
- [First step experiment](https://github.com/allegroai/clearml/blob/master/examples/automation/task_piping_example.py)
- This example will "process data", and once done, will launch a copy of the 'second step' experiment-template
- [Second step experiment](https://github.com/allegroai/clearml/blob/master/examples/automation/toy_base_task.py)
- In order to create an experiment-template in the system, this code must be executed once manually
### License

View File

@@ -18,6 +18,8 @@
# https://docs.gitlab.com/ee/user/profile/personal_access_tokens.html
# git_user: ""
# git_pass: ""
# Limit credentials to a single domain, for example: github.com,
# all other domains will use public access (no user/pass). Default: always send user/pass for any VCS domain
# git_host: ""
# Force GIT protocol to use SSH regardless of the git url (Assumes GIT user/pass are blank)
@@ -64,7 +66,7 @@
type: pip,
# specify pip version to use (examples "<20.2", "==19.3.1", "", empty string will install the latest version)
pip_version: "<21",
pip_version: ["<20.2 ; python_version < '3.10'", "<22.3 ; python_version >= '3.10'"],
# specify poetry version to use (examples "<2", "==1.1.1", "", empty string will install the latest version)
# poetry_version: "<2",
@@ -75,7 +77,7 @@
force_upgrade: false,
# additional artifact repositories to use when installing python packages
# extra_index_url: ["https://allegroai.jfrog.io/clearmlai/api/pypi/public/simple"]
# extra_index_url: ["https://allegroai.jfrog.io/clearml/api/pypi/public/simple"]
# additional conda channels to use when installing with conda package manager
conda_channels: ["pytorch", "conda-forge", "defaults", ]
@@ -215,8 +217,8 @@
# default is True, report a single \r line in a sequence of consecutive lines, per 5 seconds.
# suppress_carriage_return: true
# cuda versions used for solving pytorch wheel packages
# should be detected automatically. Override with os environment CUDA_VERSION / CUDNN_VERSION
# CUDA versions used for Conda setup & solving PyTorch wheel packages
# Should be detected automatically. Override with os environment CUDA_VERSION / CUDNN_VERSION
# cuda_version: 10.1
# cudnn_version: 7.6
@@ -252,9 +254,9 @@
# Name docker containers created by the daemon using the following string format (supported from Docker 0.6.5)
# Allowed variables are task_id, worker_id and rand_string (random lower-case letters string, up to 32 characters)
# Note: resulting name must start with an alphanumeric character and continue with alphanumeric characters,
# underscores (_), dots (.) and/or dashes (-)
#docker_container_name_format: "clearml-id-{task_id}-{rand_string:.8}"
# Note: resulting name must start with an alphanumeric character and
# continue with alphanumeric characters, underscores (_), dots (.) and/or dashes (-)
# docker_container_name_format: "clearml-id-{task_id}-{rand_string:.8}"
# Apply top-level environment section from configuration into os.environ
apply_environment: true
@@ -325,4 +327,14 @@
# into the file specified in CLEARML_CUSTOM_BUILD_OUTPUT, the agent will emit a warning and continue with the
# standard flow.
custom_build_script: ""
# Crash on exception: by default when encountering an exception while running a task,
# the agent will catch the exception, log it and continue running.
# Set this to `true` to propagate exceptions and crash the agent.
# crash_on_exception: true
# Disable task docker override. If true, the agent will use the default docker image and ignore any docker image
# and arguments specified in the task's container section (setup shell script from the task container section will
# be used in any case, if specified).
disable_task_docker_override: false
}

View File

@@ -66,11 +66,16 @@ class DataModel(object):
}
def validate(self, schema=None):
jsonschema.validate(
self.to_dict(),
schema or self._schema,
types=dict(array=(list, tuple), integer=six.integer_types),
schema = schema or self._schema
validator = jsonschema.validators.validator_for(schema)
validator_cls = jsonschema.validators.extend(
validator=validator,
type_checker=validator.TYPE_CHECKER.redefine_many({
"array": lambda s, instance: isinstance(instance, (list, tuple)),
"integer": lambda s, instance: isinstance(instance, six.integer_types),
}),
)
jsonschema.validate(self.to_dict(), schema, cls=validator_cls)
def __repr__(self):
return '<{}.{}: {}>'.format(

View File

@@ -114,7 +114,7 @@ class Session(TokenManager):
if ENV_API_DEFAULT_REQ_METHOD.get(default=None):
# Make sure we update the config object, so we pass it into the new containers when we map them
self.config["api.http.default_method"] = ENV_API_DEFAULT_REQ_METHOD.get()
self.config.put("api.http.default_method", ENV_API_DEFAULT_REQ_METHOD.get())
# notice the default setting of Request.def_method are already set by the OS environment
elif self.config.get("api.http.default_method", None):
def_method = str(self.config.get("api.http.default_method", None)).strip()

View File

@@ -294,6 +294,9 @@ class Config(object):
)
return value
def put(self, key, value):
self._config.put(key, value)
def to_dict(self):
return self._config.as_plain_ordered_dict()
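The new Config.put() backs the Session fix above: per the commit message, item assignment on the config object raised an error, so writes now delegate to the underlying pyhocon tree. A minimal sketch, assuming a Config instance named config:

config.put("api.http.default_method", "GET")
assert config.get("api.http.default_method") == "GET"
# the override also survives serialization into child containers
assert config.to_dict()["api"]["http"]["default_method"] == "GET"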

View File

@@ -14,6 +14,14 @@ except ImportError:
ConverterType = TypeVar("ConverterType", bound=Callable[[Any], Any])
def text_to_int(value, default=0):
# type: (Any, int) -> int
try:
return int(value)
except (ValueError, TypeError):
return default
def base64_to_text(value):
# type: (Any) -> Text
return base64.b64decode(value).decode("utf-8")
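A quick sketch of the new converter's fallback behavior:

text_to_int("120")       # -> 120
text_to_int("oops", 42)  # not an int -> returns the given default: 42
text_to_int(None)        # -> 0 (the built-in default)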

View File

@@ -1,14 +1,15 @@
from __future__ import print_function
from six.moves import input
from typing import Dict, Optional
from pathlib2 import Path
from six.moves import input
from six.moves.urllib.parse import urlparse
from clearml_agent.external.pyhocon import ConfigFactory, ConfigMissingException
from clearml_agent.backend_api.session import Session
from clearml_agent.backend_api.session.defs import ENV_HOST
from clearml_agent.backend_config.defs import LOCAL_CONFIG_FILES
from clearml_agent.external.pyhocon import ConfigFactory, ConfigMissingException
description = """
Please create new clearml credentials through the settings page in your `clearml-server` web app,
@@ -112,6 +113,21 @@ def main():
print('Exiting setup without creating configuration file')
return
selection = input_options(
'Default Output URI (used to automatically store models and artifacts)',
{'N': 'None', 'S': 'ClearML Server', 'C': 'Custom'},
default='None'
)
if selection == 'Custom':
print('Custom Default Output URI: ', end='')
default_output_uri = input().strip()
elif selection == "ClearML Server":
default_output_uri = files_host
else:
default_output_uri = None
print('\nDefault Output URI: {}'.format(default_output_uri if default_output_uri else 'not set'))
# get GIT User/Pass for cloning
print('Enter git username for repository cloning (leave blank for SSH key authentication): [] ', end='')
git_user = input()
@@ -179,6 +195,13 @@ def main():
'agent.package_manager.extra_index_url= ' \
'[\n{}\n]\n\n'.format("\n".join(map("\"{}\"".format, extra_index_urls)))
f.write(extra_index_str)
if default_output_uri:
default_output_url_str = '# Default Task output_uri. if output_uri is not provided to Task.init, ' \
'default_output_uri will be used instead.\n' \
'sdk.development.default_output_uri="{}"\n' \
'\n'.format(default_output_uri.strip('"'))
f.write(default_output_url_str)
default_conf = default_conf.replace('default_output_uri: ""', '# default_output_uri: ""')
f.write(default_conf)
except Exception:
print('Error! Could not write configuration file at: {}'.format(str(conf_file)))
@@ -305,6 +328,25 @@ def input_url(host_type, host=None):
return host
def input_options(message, options, default=None):
# type: (str, Dict[str, str], Optional[str]) -> str
options_msg = "/".join(
"".join(('(' + c.upper() + ')') if c == o else c for c in option)
for o, option in options.items()
)
if default:
options_msg += " [{}]".format(default)
while True:
print('{}: {} '.format(message, options_msg), end='')
res = input().strip()
if not res:
return default
elif res.lower() in options:
return options[res.lower()]
elif res.upper() in options:
return options[res.upper()]
def input_host_port(host_type, parsed_host):
print('Enter port for {} host '.format(host_type), end='')
replace_port = input().lower()
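To illustrate the new prompt, a hypothetical call mirroring the init flow above; the rendered prompt is derived from the option-string building logic in input_options():

selection = input_options(
    'Default Output URI (used to automatically store models and artifacts)',
    {'N': 'None', 'S': 'ClearML Server', 'C': 'Custom'},
    default='None',
)
# rendered prompt:
#   Default Output URI (used to automatically store models and artifacts): (N)one/ClearML (S)erver/(C)ustom [None]
# typing "s" or "S" returns "ClearML Server"; empty input falls back to the default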

View File

@@ -41,6 +41,7 @@ from clearml_agent.backend_api.session.defs import (
ENV_VENV_CONFIGURED, ENV_PROPAGATE_EXITCODE, )
from clearml_agent.backend_config.defs import UptimeConf
from clearml_agent.backend_config.utils import apply_environment, apply_files
from clearml_agent.backend_config.converters import text_to_int
from clearml_agent.commands.base import resolve_names, ServiceCommandSection
from clearml_agent.commands.resolver import resolve_default_container
from clearml_agent.definitions import (
@@ -56,10 +57,7 @@ from clearml_agent.definitions import (
ENV_WORKER_ID,
ENV_WORKER_TAGS,
ENV_DOCKER_SKIP_GPUS_FLAG,
ENV_AGENT_SECRET_KEY,
ENV_AGENT_AUTH_TOKEN,
ENV_AWS_SECRET_KEY,
ENV_AZURE_ACCOUNT_KEY,
ENV_AGENT_DISABLE_SSH_MOUNT,
ENV_SSH_AUTH_SOCK,
ENV_AGENT_SKIP_PIP_VENV_INSTALL,
@@ -70,6 +68,7 @@ from clearml_agent.definitions import (
ENV_DEBUG_INFO,
ENV_CHILD_AGENTS_COUNT_CMD,
ENV_DOCKER_ARGS_FILTERS,
ENV_FORCE_SYSTEM_SITE_PACKAGES,
)
from clearml_agent.definitions import WORKING_REPOSITORY_DIR, PIP_EXTRA_INDICES
from clearml_agent.errors import (
@@ -686,6 +685,10 @@ class Worker(ServiceCommandSection):
else:
self._docker_args_filters = []
self._task_ping_interval_sec = max(
0, text_to_int(self._session.config.get("agent.task_ping_interval_sec", 120.0))
)
@classmethod
def _verify_command_states(cls, kwargs):
"""
@@ -732,7 +735,7 @@ class Worker(ServiceCommandSection):
pass
def run_one_task(self, queue, task_id, worker_args, docker=None, task_session=None):
# type: (Text, Text, WorkerParams, Optional[Text]) -> int
# type: (Text, Text, WorkerParams, Optional[Text], Optional[Session]) -> int
"""
Run one task pulled from queue.
:param queue: ID of queue that task was pulled from
@@ -777,10 +780,18 @@ class Worker(ServiceCommandSection):
except Exception:
task_container = {}
default_docker = not bool(task_container.get('image'))
docker_image = task_container.get('image') or self._docker_image
docker_arguments = task_container.get(
'arguments', self._docker_arguments if default_docker else None)
default_docker = (
self._session.config.get('agent.disable_task_docker_override', False)
or not bool(task_container.get('image'))
)
if default_docker:
docker_image = self._docker_image
docker_arguments = self._docker_arguments
else:
docker_image = task_container.get('image') or self._docker_image
docker_arguments = task_container.get(
'arguments', self._docker_arguments if default_docker else None)
docker_setup_script = task_container.get('setup_shell_script')
self.send_logs(
@@ -958,6 +969,7 @@ class Worker(ServiceCommandSection):
if not (result.ok() and result.response):
return
new_session = copy(session)
new_session.api_client = None
new_session.set_auth_token(result.response.token)
return new_session
@@ -1166,7 +1178,7 @@ class Worker(ServiceCommandSection):
print("No tasks in Queues, sleeping for {:.1f} seconds".format(self._polling_interval))
sleep(self._polling_interval)
if self._session.config["agent.reload_config"]:
if self._session.config.get("agent.reload_config", False):
self.reload_config()
finally:
# if we are in dynamic gpus mode, shutdown all active runs
@@ -1197,7 +1209,7 @@ class Worker(ServiceCommandSection):
except Exception:
return None
worker_name = self._session.config["agent.worker_name"] + ':gpu'
worker_name = self._session.config.get("agent.worker_name", "") + ':gpu'
our_workers = [
w.id for w in response.workers
if w.id.startswith(worker_name) and w.id != self.worker_id]
@@ -1548,10 +1560,14 @@ class Worker(ServiceCommandSection):
gpu_indexes=gpu_indexes,
gpu_queues=dynamic_gpus,
)
except Exception:
except Exception as e:
tb = six.text_type(traceback.format_exc())
print("FATAL ERROR:")
print(tb)
if self._session.config.get("agent.crash_on_exception", False):
raise e
crash_file, name = safe_mkstemp(prefix=".clearml_agent-crash", suffix=".log")
try:
with crash_file:
@@ -1657,7 +1673,9 @@ class Worker(ServiceCommandSection):
# noinspection PyBroadException
try:
config_data = self._session.config.as_plain_ordered_dict() if config is None else config.as_plain_ordered_dict()
config_data = (
self._session.config.as_plain_ordered_dict() if config is None else config.as_plain_ordered_dict()
)
if clean_api_credentials:
api = config_data.get("api")
if api:
@@ -1730,6 +1748,7 @@ class Worker(ServiceCommandSection):
stopping = False
status = None
process = None
last_task_ping = 0
try:
_last_machine_update_ts = time()
stop_reason = None
@@ -1765,6 +1784,17 @@ class Worker(ServiceCommandSection):
if stderr:
stderr.flush()
if self._task_ping_interval_sec and time() - last_task_ping > self._task_ping_interval_sec:
# noinspection PyBroadException
try:
res = (session or self._session).send(tasks_api.PingRequest(task=task_id))
if not res:
self.log.error("Failed sending ping for task %s: %s", task_id, res.response)
except Exception as ex:
self.log.error("Failed sending ping: %s", str(ex))
finally:
last_task_ping = time()
# get diff from previous poll
printed_lines, stdout_pos_count = _print_file(stdout_path, stdout_pos_count)
if self._services_mode and not stopping and status is None:
@@ -2055,7 +2085,10 @@ class Worker(ServiceCommandSection):
# noinspection PyBroadException
try:
task_container = get_task_container(self._session, task_id)
if task_container.get('image'):
if (
task_container.get('image')
and not self._session.config.get('agent.disable_task_docker_override', False)
):
docker_image = task_container.get('image')
print('Ignoring default docker image, using task docker image {}'.format(docker_image))
docker_arguments = task_container.get('arguments')
@@ -3423,7 +3456,6 @@ class Worker(ServiceCommandSection):
temp_config.put("sdk.storage.cache.default_base_dir", mounted_cache_dir)
temp_config.put("agent.pip_download_cache.path", mounted_pip_dl_dir)
temp_config.put("agent.vcs_cache.path", mounted_vcs_cache)
temp_config.put("agent.package_manager.system_site_packages", True)
temp_config.put("agent.package_manager.conda_env_as_base_docker", False)
temp_config.put("agent.default_python", "")
temp_config.put("agent.python_binary", "")
@@ -3435,6 +3467,11 @@ class Worker(ServiceCommandSection):
temp_config.put("agent.git_pass", (ENV_AGENT_GIT_PASS.get() or
self._session.config.get("agent.git_pass", None)))
force_system_site_packages = ENV_FORCE_SYSTEM_SITE_PACKAGES.get()
force_system_site_packages = force_system_site_packages if force_system_site_packages is not None else True
if force_system_site_packages:
temp_config.put("agent.package_manager.system_site_packages", True)
if temp_config.get("agent.venvs_cache.path", None):
temp_config.put("agent.venvs_cache.path", '/root/.clearml/venvs-cache')
@@ -3792,7 +3829,6 @@ class Worker(ServiceCommandSection):
except:
pass
agent_install_bash_script = []
if os.environ.get('FORCE_LOCAL_CLEARML_AGENT_WHEEL'):
local_wheel = os.path.expanduser(os.environ.get('FORCE_LOCAL_CLEARML_AGENT_WHEEL'))
docker_wheel = '/tmp/{}'.format(basename(local_wheel))
@@ -3833,9 +3869,6 @@ class Worker(ServiceCommandSection):
if preprocess_bash_script:
bash_script = preprocess_bash_script + bash_script
if agent_install_bash_script:
bash_script += agent_install_bash_script
docker_bash_script = " ; ".join([line for line in bash_script if line]) \
if not isinstance(bash_script, str) else bash_script
@@ -3844,10 +3877,10 @@ class Worker(ServiceCommandSection):
update_scheme += (
docker_bash_script + " ; " +
"[ ! -z $LOCAL_PYTHON ] || export LOCAL_PYTHON={python} ; " +
"$LOCAL_PYTHON -m pip install -U \"pip{pip_version}\" ; " +
"$LOCAL_PYTHON -m pip install -U {pip_version} ; " +
"$LOCAL_PYTHON -m pip install -U {clearml_agent_wheel} ; ").format(
python_single_digit=python_version.split('.')[0],
python=python_version, pip_version=PackageManager.get_pip_version(),
python=python_version, pip_version=" ".join(PackageManager.get_pip_versions(wrap='\"')),
clearml_agent_wheel=clearml_agent_wheel,
mount_ssh_ro=mount_ssh_ro, mount_ssh=mount_ssh,
)
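With multi-constraint pip support, the {pip_version} placeholder above expands to one quoted spec per environment marker. A sketch, assuming the default pip_version list introduced in this release:

pip_versions = ["<20.2 ; python_version < '3.10'", "<22.3 ; python_version >= '3.10'"]

specs = ['"pip{}"'.format(v) for v in pip_versions]
print("$LOCAL_PYTHON -m pip install -U " + " ".join(specs))
# $LOCAL_PYTHON -m pip install -U "pip<20.2 ; python_version < '3.10'" "pip<22.3 ; python_version >= '3.10'"
# pip evaluates each marker at install time and applies only the matching constraint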

View File

@@ -87,6 +87,7 @@ ENVIRONMENT_CONFIG = {
"agent.cpu_only": EnvironmentConfig(
names=("CLEARML_CPU_ONLY", "TRAINS_CPU_ONLY", "CPU_ONLY"), type=bool
),
"agent.crash_on_exception": EnvironmentConfig("CLEAMRL_AGENT_CRASH_ON_EXCEPTION", type=bool),
"sdk.aws.s3.key": EnvironmentConfig("AWS_ACCESS_KEY_ID"),
"sdk.aws.s3.secret": ENV_AWS_SECRET_KEY,
"sdk.aws.s3.region": EnvironmentConfig("AWS_DEFAULT_REGION"),
@@ -153,6 +154,11 @@ ENV_CHILD_AGENTS_COUNT_CMD = EnvironmentConfig('CLEARML_AGENT_CHILD_AGENTS_COUNT
ENV_DOCKER_ARGS_FILTERS = EnvironmentConfig('CLEARML_AGENT_DOCKER_ARGS_FILTERS')
ENV_DOCKER_ARGS_HIDE_ENV = EnvironmentConfig('CLEARML_AGENT_DOCKER_ARGS_HIDE_ENV')
ENV_FORCE_SYSTEM_SITE_PACKAGES = EnvironmentConfig('CLEARML_AGENT_FORCE_SYSTEM_SITE_PACKAGES', type=bool)
""" Force system_site_packages: true when running tasks in containers (i.e. docker mode or k8s glue) """
ENV_CUSTOM_BUILD_SCRIPT = EnvironmentConfig('CLEARML_AGENT_CUSTOM_BUILD_SCRIPT')
"""
Specifies a custom environment setup script to be executed instead of installing a virtual environment.
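A sketch of how the worker/glue code resolves the new boolean override (the exact string-to-bool parsing of EnvironmentConfig is assumed):

import os
os.environ["CLEARML_AGENT_FORCE_SYSTEM_SITE_PACKAGES"] = "false"  # opt out

force = ENV_FORCE_SYSTEM_SITE_PACKAGES.get()   # -> False when set as above
force = force if force is not None else True   # unset -> keeps the previous default (True)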

View File

@@ -9,26 +9,33 @@ import os
import re
import subprocess
import tempfile
from collections import defaultdict
from copy import deepcopy
from pathlib import Path
from pprint import pformat
from threading import Thread
from time import sleep
from typing import Text, List, Callable, Any, Collection, Optional, Union
from time import sleep, time
from typing import Text, List, Callable, Any, Collection, Optional, Union, Iterable, Dict, Tuple, Set
import yaml
from clearml_agent.backend_api.session import Request
from clearml_agent.commands.events import Events
from clearml_agent.commands.worker import Worker, get_task_container, set_task_container, get_next_task
from clearml_agent.definitions import ENV_DOCKER_IMAGE
from clearml_agent.definitions import (
ENV_DOCKER_IMAGE,
ENV_AGENT_GIT_USER,
ENV_AGENT_GIT_PASS,
ENV_FORCE_SYSTEM_SITE_PACKAGES,
)
from clearml_agent.errors import APIError
from clearml_agent.glue.definitions import ENV_START_AGENT_SCRIPT_PATH
from clearml_agent.helper.base import safe_remove_file
from clearml_agent.helper.dicts import merge_dicts
from clearml_agent.helper.process import get_bash_output
from clearml_agent.helper.process import get_bash_output, stringify_bash_output
from clearml_agent.helper.resource_monitor import ResourceMonitor
from clearml_agent.interface.base import ObjectID
from clearml_agent.backend_api.session import Request
from clearml_agent.glue.definitions import ENV_START_AGENT_SCRIPT_PATH
class K8sIntegration(Worker):
@@ -36,19 +43,14 @@ class K8sIntegration(Worker):
K8S_DEFAULT_NAMESPACE = "clearml"
AGENT_LABEL = "CLEARML=agent"
LIMIT_POD_LABEL = "ai.allegro.agent.serial=pod-{pod_number}"
KUBECTL_APPLY_CMD = "kubectl apply --namespace={namespace} -f"
KUBECTL_RUN_CMD = "kubectl run clearml-id-{task_id} " \
"--image {docker_image} {docker_args} " \
"--restart=Never " \
"--namespace={namespace}"
KUBECTL_DELETE_CMD = "kubectl delete pods " \
"--selector={selector} " \
"-l={agent_label} " \
"--field-selector=status.phase!=Pending,status.phase!=Running " \
"--namespace={namespace}"
"--namespace={namespace} " \
"--output name"
BASH_INSTALL_SSH_CMD = [
"apt-get update",
@@ -65,6 +67,9 @@ class K8sIntegration(Worker):
'echo "ldconfig" >> /etc/profile',
"/usr/sbin/sshd -p {port}"]
DEFAULT_EXECUTION_AGENT_ARGS = os.getenv("K8S_GLUE_DEF_EXEC_AGENT_ARGS", "--full-monitoring --require-queue")
POD_AGENT_INSTALL_ARGS = os.getenv("K8S_GLUE_POD_AGENT_INSTALL_ARGS", "")
CONTAINER_BASH_SCRIPT = [
"export DEBIAN_FRONTEND='noninteractive'",
"echo 'Binary::apt::APT::Keep-Downloaded-Packages \"true\";' > /etc/apt/apt.conf.d/docker-clean",
@@ -77,17 +82,19 @@ class K8sIntegration(Worker):
"[ ! -z $LOCAL_PYTHON ] || apt-get install -y python3-pip",
"[ ! -z $LOCAL_PYTHON ] || export LOCAL_PYTHON=python3",
"{extra_bash_init_cmd}",
"$LOCAL_PYTHON -m pip install clearml-agent",
"$LOCAL_PYTHON -m pip install clearml-agent{agent_install_args}",
"{extra_docker_bash_script}",
"$LOCAL_PYTHON -m clearml_agent execute --full-monitoring --require-queue --id {task_id}"
"$LOCAL_PYTHON -m clearml_agent execute {default_execution_agent_args} --id {task_id}"
]
DEFAULT_POD_NAME_PREFIX = "clearml-id-"
DEFAULT_LIMIT_POD_LABEL = "ai.allegro.agent.serial=pod-{pod_number}"
_edit_hyperparams_version = "2.9"
def __init__(
self,
k8s_pending_queue_name=None,
kubectl_cmd=None,
container_bash_script=None,
debug=False,
ports_mode=False,
@@ -100,15 +107,14 @@ class K8sIntegration(Worker):
extra_bash_init_script=None,
namespace=None,
max_pods_limit=None,
pod_name_prefix=None,
limit_pod_label=None,
**kwargs
):
"""
Initialize the k8s integration glue layer daemon
:param str k8s_pending_queue_name: queue name to use when task is pending in the k8s scheduler
:param str|callable kubectl_cmd: kubectl command line str, supports formatting (default: KUBECTL_RUN_CMD)
example: "task={task_id} image={docker_image} queue_id={queue_id}"
or a callable function: kubectl_cmd(task_id, docker_image, docker_args, queue_id, task_data)
:param str container_bash_script: container bash script to be executed in k8s (default: CONTAINER_BASH_SCRIPT)
Notice this string will use format() call, if you have curly brackets they should be doubled { -> {{
Format arguments passed: {task_id} and {extra_bash_init_cmd}
@@ -130,12 +136,16 @@ class K8sIntegration(Worker):
:param int max_pods_limit: Maximum number of pods that K8S glue can run at the same time
"""
super(K8sIntegration, self).__init__()
self.pod_name_prefix = pod_name_prefix or self.DEFAULT_POD_NAME_PREFIX
self.limit_pod_label = limit_pod_label or self.DEFAULT_LIMIT_POD_LABEL
self.k8s_pending_queue_name = k8s_pending_queue_name or self.K8S_PENDING_QUEUE
self.k8s_pending_queue_id = None
self.kubectl_cmd = kubectl_cmd or self.KUBECTL_RUN_CMD
self.container_bash_script = container_bash_script or self.CONTAINER_BASH_SCRIPT
# Always do system packages, because we will be running inside a docker
self._session.config.put("agent.package_manager.system_site_packages", True)
force_system_packages = ENV_FORCE_SYSTEM_SITE_PACKAGES.get()
self._force_system_site_packages = force_system_packages if force_system_packages is not None else True
if self._force_system_site_packages:
# Use system packages, because we will be running inside a docker
self._session.config.put("agent.package_manager.system_site_packages", True)
# Add debug logging
if debug:
self.log.logger.disabled = False
@@ -156,27 +166,9 @@ class K8sIntegration(Worker):
self.pod_limits = []
self.pod_requests = []
self.max_pods_limit = max_pods_limit if not self.ports_mode else None
if overrides_yaml:
overrides = self._load_template_file(overrides_yaml)
if overrides:
containers = overrides.get('spec', {}).get('containers', [])
for c in containers:
resources = {str(k).lower(): v for k, v in c.get('resources', {}).items()}
if not resources:
continue
if resources.get('limits'):
self.pod_limits += ['{}={}'.format(k, v) for k, v in resources['limits'].items()]
if resources.get('requests'):
self.pod_requests += ['{}={}'.format(k, v) for k, v in resources['requests'].items()]
# remove double entries
self.pod_limits = list(set(self.pod_limits))
self.pod_requests = list(set(self.pod_requests))
if self.pod_limits or self.pod_requests:
self.log.warning('Found pod container requests={} limits={}'.format(
self.pod_limits, self.pod_requests))
if containers:
self.log.warning('Removing containers section: {}'.format(overrides['spec'].pop('containers')))
self.overrides_json_string = json.dumps(overrides)
self._load_overrides_yaml(overrides_yaml)
if template_yaml:
self.template_dict = self._load_template_file(template_yaml)
@@ -190,6 +182,34 @@ class K8sIntegration(Worker):
self._monitor_hanging_pods()
self._min_cleanup_interval_per_ns_sec = 1.0
self._last_pod_cleanup_per_ns = defaultdict(lambda: 0.)
def _load_overrides_yaml(self, overrides_yaml):
if not overrides_yaml:
return
overrides = self._load_template_file(overrides_yaml)
if not overrides:
return
containers = overrides.get('spec', {}).get('containers', [])
for c in containers:
resources = {str(k).lower(): v for k, v in c.get('resources', {}).items()}
if not resources:
continue
if resources.get('limits'):
self.pod_limits += ['{}={}'.format(k, v) for k, v in resources['limits'].items()]
if resources.get('requests'):
self.pod_requests += ['{}={}'.format(k, v) for k, v in resources['requests'].items()]
# remove double entries
self.pod_limits = list(set(self.pod_limits))
self.pod_requests = list(set(self.pod_requests))
if self.pod_limits or self.pod_requests:
self.log.warning('Found pod container requests={} limits={}'.format(
self.pod_limits, self.pod_requests))
if containers:
self.log.warning('Removing containers section: {}'.format(overrides['spec'].pop('containers')))
self.overrides_json_string = json.dumps(overrides)
def _monitor_hanging_pods(self):
_check_pod_thread = Thread(target=self._monitor_hanging_pods_daemon)
_check_pod_thread.daemon = True
@@ -209,16 +229,22 @@ class K8sIntegration(Worker):
except (IndexError, KeyError):
return default
def _get_kubectl_options(self, command, extra_labels=None):
labels = [self._get_agent_label()] + (list(extra_labels) if extra_labels else [])
return {
def _get_kubectl_options(self, command, extra_labels=None, filters=None, output="json", labels=None):
# type: (str, Iterable[str], Iterable[str], str, Iterable[str]) -> Dict
if not labels:
labels = [self._get_agent_label()]
labels = list(labels) + (list(extra_labels) if extra_labels else [])
d = {
"-l": ",".join(labels),
"-n": str(self.namespace),
"-o": "json"
"-o": output,
}
if filters:
d["--field-selector"] = ",".join(filters)
return d
def get_kubectl_command(self, command, extra_labels=None):
opts = self._get_kubectl_options(command, extra_labels)
def get_kubectl_command(self, command, output="json", **args):
opts = self._get_kubectl_options(command, output=output, **args)
return 'kubectl {command} {opts}'.format(
command=command, opts=" ".join(x for item in opts.items() for x in item)
)
@@ -227,10 +253,9 @@ class K8sIntegration(Worker):
last_tasks_msgs = {} # last msg updated for every task
while True:
kubectl_cmd = self.get_kubectl_command("get pods")
kubectl_cmd = self.get_kubectl_command("get pods", filters=["status.phase=Pending"])
self.log.debug("Detecting hanging pods: {}".format(kubectl_cmd))
output = get_bash_output(kubectl_cmd)
output = '' if not output else output if isinstance(output, str) else output.decode('utf-8')
output = stringify_bash_output(get_bash_output(kubectl_cmd))
try:
output_config = json.loads(output)
except Exception as ex:
@@ -240,9 +265,6 @@ class K8sIntegration(Worker):
pods = output_config.get('items', [])
task_ids = set()
for pod in pods:
if self._get_path(pod, 'status', 'phase') != "Pending":
continue
pod_name = pod.get('metadata', {}).get('name', None)
if not pod_name:
continue
@@ -275,8 +297,10 @@ class K8sIntegration(Worker):
if reason == 'ImagePullBackOff':
delete_pod_cmd = 'kubectl delete pods {} -n {}'.format(pod_name, namespace)
self.log.debug(" - deleting pod due to ImagePullBackOff: {}".format(delete_pod_cmd))
get_bash_output(delete_pod_cmd)
try:
self.log.debug(" - Detecting hanging pods: {}".format(kubectl_cmd))
self._session.api_client.tasks.failed(
task=task_id,
status_reason="K8S glue error: {}".format(msg),
@@ -308,8 +332,8 @@ class K8sIntegration(Worker):
last_tasks_msgs[task_id] = msg
except Exception as ex:
self.log.warning(
'K8S Glue pods monitor: Failed setting status message for task "{}"\nEX: {}'.format(
task_id, ex
'K8S Glue pods monitor: Failed setting status message for task "{}"\nMSG: {}\nEX: {}'.format(
task_id, msg, ex
)
)
@@ -318,7 +342,8 @@ class K8sIntegration(Worker):
sleep(self._polling_interval)
def _set_task_user_properties(self, task_id: str, **properties: str):
def _set_task_user_properties(self, task_id: str, task_session=None, **properties: str):
session = task_session or self._session
if self._edit_hyperparams_support is not True:
# either not supported or never tested
if self._edit_hyperparams_support == self._session.api_version:
@@ -329,7 +354,7 @@ class K8sIntegration(Worker):
self._edit_hyperparams_support = self._session.api_version
return
try:
self._session.get(
session.get(
service="tasks",
action="edit_hyper_params",
task=task_id,
@@ -361,67 +386,93 @@ class K8sIntegration(Worker):
return self._agent_label
def _get_used_pods(self):
# type: () -> Tuple[int, Set[str]]
# noinspection PyBroadException
try:
kubectl_cmd_new = self.get_kubectl_command("get pods")
self.log.debug("Getting used pods: {}".format(kubectl_cmd_new))
process = subprocess.Popen(kubectl_cmd_new.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, error = process.communicate()
output = '' if not output else output if isinstance(output, str) else output.decode('utf-8')
error = '' if not error else error if isinstance(error, str) else error.decode('utf-8')
kubectl_cmd = self.get_kubectl_command(
"get pods",
output="jsonpath=\"{range .items[*]}{.metadata.name}{' '}{.metadata.namespace}{'\\n'}{end}\""
)
self.log.debug("Getting used pods: {}".format(kubectl_cmd))
output = stringify_bash_output(get_bash_output(kubectl_cmd, raise_error=True))
if not output:
# No such pod exist so we can use the pod_number we found
return 0, {}
return 0, set([])
try:
items = json.loads(output).get("items", [])
items = output.splitlines()
current_pod_count = len(items)
namespaces = {item["metadata"]["namespace"] for item in items}
namespaces = {item.rpartition(" ")[-1] for item in items}
self.log.debug(" - found {} pods in namespaces {}".format(current_pod_count, ", ".join(namespaces)))
except (KeyError, ValueError, TypeError, AttributeError) as ex:
print("Failed parsing used pods command response for cleanup: {}".format(ex))
return -1, {}
return -1, set([])
return current_pod_count, namespaces
except Exception as ex:
print('Failed obtaining used pods information: {}'.format(ex))
return -2, {}
return -2, set([])
def _is_same_tenant(self, task_session):
if not task_session or task_session is self._session:
return True
# noinspection PyStatementEffect
try:
tenant = self._session.get_decoded_token(self._session.token, verify=False)["tenant"]
task_tenant = task_session.get_decoded_token(task_session.token, verify=False)["tenant"]
return tenant == task_tenant
except Exception as ex:
print("ERROR: Failed getting tenant for task session: {}".format(ex))
def run_one_task(self, queue: Text, task_id: Text, worker_args=None, task_session=None, **_):
print('Pulling task {} launching on kubernetes cluster'.format(task_id))
task_data = self._session.api_client.tasks.get_all(id=[task_id])[0]
session = task_session or self._session
task_data = session.api_client.tasks.get_all(id=[task_id])[0]
# push task into the k8s queue, so we have visibility on pending tasks in the k8s scheduler
try:
print('Pushing task {} into temporary pending queue'.format(task_id))
_ = self._session.api_client.tasks.stop(task_id, force=True)
res = self._session.api_client.tasks.enqueue(
task_id,
queue=self.k8s_pending_queue_id,
status_reason='k8s pending scheduler',
)
if res.meta.result_code != 200:
raise Exception(res.meta.result_msg)
except Exception as e:
self.log.error("ERROR: Could not push back task [{}] to k8s pending queue {} [{}], error: {}".format(
task_id, self.k8s_pending_queue_name, self.k8s_pending_queue_id, e))
return
if self._is_same_tenant(task_session):
try:
print('Pushing task {} into temporary pending queue'.format(task_id))
_ = session.api_client.tasks.stop(task_id, force=True)
container = get_task_container(self._session, task_id)
res = self._session.api_client.tasks.enqueue(
task_id,
queue=self.k8s_pending_queue_id,
status_reason='k8s pending scheduler',
)
if res.meta.result_code != 200:
raise Exception(res.meta.result_msg)
except Exception as e:
self.log.error("ERROR: Could not push back task [{}] to k8s pending queue {} [{}], error: {}".format(
task_id, self.k8s_pending_queue_name, self.k8s_pending_queue_id, e))
return
container = get_task_container(session, task_id)
if not container.get('image'):
container['image'] = str(
ENV_DOCKER_IMAGE.get() or self._session.config.get("agent.default_docker.image", "nvidia/cuda")
ENV_DOCKER_IMAGE.get() or session.config.get("agent.default_docker.image", "nvidia/cuda")
)
container['arguments'] = self._session.config.get("agent.default_docker.arguments", None)
container['arguments'] = session.config.get("agent.default_docker.arguments", None)
set_task_container(
self._session, task_id, docker_image=container['image'], docker_arguments=container['arguments']
session, task_id, docker_image=container['image'], docker_arguments=container['arguments']
)
# get the clearml.conf encoded file, make sure we use system packages!
git_user = ENV_AGENT_GIT_USER.get() or self._session.config.get("agent.git_user", None)
git_pass = ENV_AGENT_GIT_PASS.get() or self._session.config.get("agent.git_pass", None)
extra_config_values = [
'agent.package_manager.system_site_packages: true' if self._force_system_site_packages else '',
'agent.git_user: "{}"'.format(git_user) if git_user else '',
'agent.git_pass: "{}"'.format(git_pass) if git_pass else '',
]
# noinspection PyProtectedMember
config_content = (
self.conf_file_content or Path(self._session._config_file).read_text() or ""
) + '\nagent.package_manager.system_site_packages=true\n'
self.conf_file_content or Path(session._config_file).read_text() or ""
) + '\n{}\n'.format('\n'.join(x for x in extra_config_values if x))
hocon_config_encoded = config_content.encode("ascii")
create_clearml_conf = ["echo '{}' | base64 --decode >> ~/clearml.conf".format(
@@ -454,13 +505,13 @@ class K8sIntegration(Worker):
kubectl_cmd_new = self.get_kubectl_command(
"get pods",
extra_labels=[self.LIMIT_POD_LABEL.format(pod_number=pod_number)] if self.ports_mode else None
extra_labels=[self.limit_pod_label.format(pod_number=pod_number)] if self.ports_mode else None
)
self.log.debug("Looking for a free pod/port: {}".format(kubectl_cmd_new))
process = subprocess.Popen(kubectl_cmd_new.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, error = process.communicate()
output = '' if not output else output if isinstance(output, str) else output.decode('utf-8')
error = '' if not error else error if isinstance(error, str) else error.decode('utf-8')
output = stringify_bash_output(output)
error = stringify_bash_output(error)
try:
items_count = len(json.loads(output).get("items", []))
@@ -471,8 +522,12 @@ class K8sIntegration(Worker):
output, task_id, queue, ex
)
)
self._session.api_client.tasks.stop(task_id, force=True)
self._session.api_client.tasks.enqueue(task_id, queue=queue, status_reason='kubectl parsing error')
session.api_client.tasks.stop(task_id, force=True)
# noinspection PyBroadException
try:
self._session.api_client.tasks.enqueue(task_id, queue=queue, status_reason='kubectl parsing error')
except:
self.log.warning("Failed enqueuing task to queue '{}'".format(queue))
return
if not items_count:
@@ -496,9 +551,14 @@ class K8sIntegration(Worker):
task_id, queue
)
)
self._session.api_client.tasks.stop(task_id, force=True)
self._session.api_client.tasks.enqueue(
task_id, queue=queue, status_reason='k8s max pod limit (no free k8s service)')
session.api_client.tasks.stop(task_id, force=True)
# noinspection PyBroadException
try:
self._session.api_client.tasks.enqueue(
task_id, queue=queue, status_reason='k8s max pod limit (no free k8s service)'
)
except:
self.log.warning("Failed enqueuing task to queue '{}'".format(queue))
return
elif self.max_pods_limit:
# max pods limit hasn't reached yet, so we can create the pod
@@ -507,36 +567,38 @@ class K8sIntegration(Worker):
labels = self._get_pod_labels(queue, queue_name)
if self.ports_mode:
labels.append(self.LIMIT_POD_LABEL.format(pod_number=pod_number))
labels.append(self.limit_pod_label.format(pod_number=pod_number))
if self.ports_mode:
print("Kubernetes scheduling task id={} on pod={} (pod_count={})".format(task_id, pod_number, pod_count))
else:
print("Kubernetes scheduling task id={}".format(task_id))
kubectl_kwargs = dict(
create_clearml_conf=create_clearml_conf,
labels=labels,
docker_image=container['image'],
docker_args=container['arguments'],
docker_bash=container.get('setup_shell_script'),
task_id=task_id,
queue=queue
)
try:
template = self._resolve_template(task_session, task_data, queue)
except Exception as ex:
print("ERROR: Failed resolving template (skipping): {}".format(ex))
return
if template:
output, error = self._kubectl_apply(template=template, **kubectl_kwargs)
else:
output, error = self._kubectl_run(task_data=task_data, **kubectl_kwargs)
try:
namespace = template['metadata']['namespace'] or self.namespace
except (KeyError, TypeError, AttributeError):
namespace = self.namespace
if template:
output, error = self._kubectl_apply(
template=template,
pod_number=pod_number,
create_clearml_conf=create_clearml_conf,
labels=labels,
docker_image=container['image'],
docker_args=container['arguments'],
docker_bash=container.get('setup_shell_script'),
task_id=task_id,
queue=queue,
namespace=namespace,
)
error = '' if not error else (error if isinstance(error, str) else error.decode('utf-8'))
output = '' if not output else (output if isinstance(output, str) else output.decode('utf-8'))
print('kubectl output:\n{}\n{}'.format(error, output))
if error:
send_log = "Running kubectl encountered an error: {}".format(error)
@@ -564,6 +626,7 @@ class K8sIntegration(Worker):
if user_props:
self._set_task_user_properties(
task_id=task_id,
task_session=task_session,
**user_props
)
@@ -601,19 +664,22 @@ class K8sIntegration(Worker):
return results
def _kubectl_apply(
self, create_clearml_conf, docker_image, docker_args, docker_bash, labels, queue, task_id, template=None
self,
create_clearml_conf,
docker_image,
docker_args,
docker_bash,
labels,
queue,
task_id,
namespace,
template=None,
pod_number=None
):
template = template or deepcopy(self.template_dict)
try:
namespace = template['metadata']['namespace'] or self.namespace
except (KeyError, TypeError, AttributeError):
namespace = self.namespace
template.setdefault('apiVersion', 'v1')
template['kind'] = 'Pod'
template.setdefault('metadata', {})
name = 'clearml-id-{task_id}'.format(task_id=task_id)
name = self.pod_name_prefix + str(task_id)
template['metadata']['name'] = name
template.setdefault('spec', {})
template['spec'].setdefault('containers', [])
@@ -641,7 +707,9 @@ class K8sIntegration(Worker):
['#!/bin/bash', ] +
[line.format(extra_bash_init_cmd=self.extra_bash_init_script or '',
task_id=task_id,
extra_docker_bash_script=extra_docker_bash_script)
extra_docker_bash_script=extra_docker_bash_script,
default_execution_agent_args=self.DEFAULT_EXECUTION_AGENT_ARGS,
agent_install_args=self.POD_AGENT_INSTALL_ARGS)
for line in container_bash_script])
extra_bash_commands = list(create_clearml_conf or [])
@@ -701,57 +769,34 @@ class K8sIntegration(Worker):
finally:
safe_remove_file(yaml_file)
return output, error
return stringify_bash_output(output), stringify_bash_output(error)
def _kubectl_run(
self, create_clearml_conf, docker_image, docker_args, docker_bash, labels, queue, task_data, task_id
):
if callable(self.kubectl_cmd):
kubectl_cmd = self.kubectl_cmd(task_id, docker_image, docker_args, queue, task_data)
else:
kubectl_cmd = self.kubectl_cmd.format(
task_id=task_id,
docker_image=docker_image,
docker_args=" ".join(self._get_docker_args(
docker_args, flags={"-e", "--env"}, convert=lambda env: '--env={}'.format(env))
),
queue_id=queue,
namespace=self.namespace,
)
# make sure we provide a list
if isinstance(kubectl_cmd, str):
kubectl_cmd = kubectl_cmd.split()
if self.overrides_json_string:
kubectl_cmd += ['--overrides=' + self.overrides_json_string]
if self.pod_limits:
kubectl_cmd += ['--limits', ",".join(self.pod_limits)]
if self.pod_requests:
kubectl_cmd += ['--requests', ",".join(self.pod_requests)]
if self._docker_force_pull and not any(x.startswith("--image-pull-policy=") for x in kubectl_cmd):
kubectl_cmd += ["--image-pull-policy='always'"]
container_bash_script = [self.container_bash_script] if isinstance(self.container_bash_script, str) \
else self.container_bash_script
container_bash_script = ' ; '.join(container_bash_script)
kubectl_cmd += [
"--labels=" + ",".join(labels),
"--command",
"--",
"/bin/sh",
"-c",
"{} ; {}".format(" ; ".join(create_clearml_conf or []), container_bash_script.format(
extra_bash_init_cmd=self.extra_bash_init_script or "",
extra_docker_bash_script=docker_bash or "",
task_id=task_id
)),
]
process = subprocess.Popen(kubectl_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, error = process.communicate()
return output, error
def _cleanup_old_pods(self, namespaces, extra_msg=None):
# type: (Iterable[str], Optional[str]) -> Dict[str, List[str]]
self.log.debug("Cleaning up pods")
deleted_pods = defaultdict(list)
for namespace in namespaces:
if time() - self._last_pod_cleanup_per_ns[namespace] < self._min_cleanup_interval_per_ns_sec:
# Do not try to cleanup the same namespace too quickly
continue
kubectl_cmd = self.KUBECTL_DELETE_CMD.format(namespace=namespace, agent_label=self._get_agent_label())
self.log.debug("Deleting old/failed pods{} for ns {}: {}".format(
extra_msg or "", namespace, kubectl_cmd
))
try:
res = get_bash_output(kubectl_cmd, raise_error=True)
lines = [
line for line in
(r.strip().rpartition("/")[-1] for r in res.splitlines())
if line.startswith(self.pod_name_prefix)
]
self.log.debug(" - deleted pod(s) %s", ", ".join(lines))
deleted_pods[namespace].extend(lines)
except Exception as ex:
self.log.error("Failed deleting old/failed pods for ns %s: %s", namespace, str(ex))
finally:
self._last_pod_cleanup_per_ns[namespace] = time()
return deleted_pods
def run_tasks_loop(self, queues: List[Text], worker_params, **kwargs):
"""
@@ -780,16 +825,16 @@ class K8sIntegration(Worker):
# Get used pods and namespaces
current_pods, namespaces = self._get_used_pods()
# just in case there are no pods, make sure we look at our base namespace
namespaces.add(self.namespace)
# check if have pod limit, then check if we hit it.
if self.max_pods_limit:
if current_pods >= self.max_pods_limit:
print("Maximum pod limit reached {}/{}, sleeping for {:.1f} seconds".format(
current_pods, self.max_pods_limit, self._polling_interval))
# delete old completed / failed pods
for namespace in namespaces:
kubectl_cmd = self.KUBECTL_DELETE_CMD.format(namespace=namespace, selector=self._get_agent_label())
self.log.debug("Deleting old/failed pods due to pod limit: {}".format(kubectl_cmd))
get_bash_output(kubectl_cmd)
self._cleanup_old_pods(namespaces, " due to pod limit")
# go to sleep
sleep(self._polling_interval)
continue
@@ -797,10 +842,7 @@ class K8sIntegration(Worker):
# iterate over queues (priority style, queues[0] is highest)
for queue in queues:
# delete old completed / failed pods
for namespace in namespaces:
kubectl_cmd = self.KUBECTL_DELETE_CMD.format(namespace=namespace, selector=self._get_agent_label())
self.log.debug("Deleting old/failed pods: {}".format(kubectl_cmd))
get_bash_output(kubectl_cmd)
self._cleanup_old_pods(namespaces)
# get next task in queue
try:
@@ -816,14 +858,6 @@ class K8sIntegration(Worker):
except (KeyError, TypeError, AttributeError):
print("No tasks in queue {}".format(queue))
continue
events_service.send_log_events(
self.worker_id,
task_id=task_id,
lines="task {} pulled from {} by worker {}".format(
task_id, queue, self.worker_id
),
level="INFO",
)
task_session = None
if self._impersonate_as_task_owner:
@@ -843,6 +877,16 @@ class K8sIntegration(Worker):
)
continue
events_service.send_log_events(
self.worker_id,
task_id=task_id,
lines="task {} pulled from {} by worker {}".format(
task_id, queue, self.worker_id
),
level="INFO",
session=task_session,
)
self.report_monitor(ResourceMonitor.StatusReport(queues=queues, queue=queue, task=task_id))
self.run_one_task(queue, task_id, worker_params, task_session)
self.report_monitor(ResourceMonitor.StatusReport(queues=self.queues))

View File

@@ -80,7 +80,12 @@ class PackageManager(object):
def upgrade_pip(self):
result = self._install(
select_for_platform(windows='pip{}', linux='pip{}').format(self.get_pip_version()), "--upgrade")
*select_for_platform(
windows=self.get_pip_versions(),
linux=self.get_pip_versions()
),
"--upgrade"
)
packages = self.run_with_env(('list',), output=True).splitlines()
# p.split is ('pip', 'x.y.z')
pip = [p.split() for p in packages if len(p.split()) == 2 and p.split()[0] == 'pip']
@@ -157,15 +162,26 @@ class PackageManager(object):
def set_pip_version(cls, version):
if not version:
return
version = version.replace(' ', '')
if ('=' in version) or ('~' in version) or ('<' in version) or ('>' in version):
cls._pip_version = version
if isinstance(version, (list, tuple)):
versions = version
else:
cls._pip_version = "=="+version
versions = [version]
cls._pip_version = []
for version in versions:
version = version.strip()
if ('=' in version) or ('~' in version) or ('<' in version) or ('>' in version):
cls._pip_version.append(version)
else:
cls._pip_version.append("==" + version)
@classmethod
def get_pip_version(cls):
return cls._pip_version or ''
def get_pip_versions(cls, pip="pip", wrap=''):
return [
(wrap + pip + version + wrap)
for version in cls._pip_version or [pip]
]
def get_cached_venv(self, requirements, docker_cmd, python_version, cuda_version, destination_folder):
# type: (Dict, Optional[Union[dict, str]], Optional[str], Optional[str], Path) -> Optional[Path]
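A sketch of the normalization set_pip_version() now performs: bare versions get an implicit ==, and get_pip_versions() prefixes each entry with the package name (optionally wrapped in quotes):

PackageManager.set_pip_version(["<20.2 ; python_version < '3.10'", "19.3.1"])

PackageManager.get_pip_versions()
# -> ["pip<20.2 ; python_version < '3.10'", "pip==19.3.1"]

PackageManager.get_pip_versions(wrap='"')
# -> ['"pip<20.2 ; python_version < \'3.10\'"', '"pip==19.3.1"']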

View File

@@ -135,7 +135,12 @@ class CondaAPI(PackageManager):
if self.env_read_only:
print('Conda environment in read-only mode, skipping pip upgrade.')
return ''
return self._install(select_for_platform(windows='pip{}', linux='pip{}').format(self.pip.get_pip_version()))
return self._install(
*select_for_platform(
windows=self.pip.get_pip_versions(),
linux=self.pip.get_pip_versions()
)
)
def create(self):
"""

View File

@@ -25,7 +25,7 @@ from clearml_agent.helper.base import bash_c, is_windows_platform, select_for_pl
PathLike = Union[Text, Path]
def get_bash_output(cmd, strip=False, stderr=subprocess.STDOUT, stdin=False):
def get_bash_output(cmd, strip=False, stderr=subprocess.STDOUT, stdin=False, raise_error=False):
try:
output = (
subprocess.check_output(
@@ -37,10 +37,16 @@ def get_bash_output(cmd, strip=False, stderr=subprocess.STDOUT, stdin=False):
.strip()
)
except subprocess.CalledProcessError:
if raise_error:
raise
output = None
return output if not strip or not output else output.strip()
def stringify_bash_output(value):
return '' if not value else (value if isinstance(value, str) else value.decode('utf-8'))
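A short usage sketch combining the two helpers (the command is illustrative):

import subprocess
from clearml_agent.helper.process import get_bash_output, stringify_bash_output

try:
    # raise_error=True now re-raises CalledProcessError instead of returning None
    raw = get_bash_output("echo hello", raise_error=True)
except subprocess.CalledProcessError:
    raw = None

# stringify_bash_output normalizes None -> '' and bytes -> str for callers
print(stringify_bash_output(raw))  # hello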
def terminate_process(pid, timeout=10., ignore_zombie=True, include_children=False):
# noinspection PyBroadException
try:

View File

@@ -106,7 +106,7 @@ class Session(_Session):
if os.path.exists(os.path.expanduser(os.path.expandvars(f))):
self._config_file = f
break
self.api_client = APIClient(session=self, api_version="2.5")
self._api_client = None
# HACK make sure we have python version to execute,
# if nothing was specific, use the one that runs us
def_python = ConfigValue(self.config, "agent.default_python")
@@ -133,7 +133,7 @@ class Session(_Session):
# override with environment variables
# cuda_version & cudnn_version are overridden with os.environ here, and normalized in the next section
for config_key, env_config in ENVIRONMENT_CONFIG.items():
# check if the propery is of a list:
# check if the property is of a list:
if config_key.endswith('.0'):
if all(not i.get() for i in env_config.values()):
continue
@@ -167,6 +167,16 @@ class Session(_Session):
if not kwargs.get('only_load_config'):
self.create_cache_folders()
@property
def api_client(self):
if self._api_client is None:
self._api_client = APIClient(session=self, api_version="2.5")
return self._api_client
@api_client.setter
def api_client(self, value):
self._api_client = value
@staticmethod
def get_logger(name):
logger = logging.getLogger(name)

View File

@@ -1 +1 @@
__version__ = '1.5.0rc0'
__version__ = '1.5.1'

View File

@@ -13,6 +13,15 @@ api {
}
agent {
# unique name of this worker, if None, created based on hostname:process_id
# Override with os environment: CLEARML_WORKER_ID
# worker_id: "clearml-agent-machine1:gpu0"
worker_id: ""
# worker name, replaces the hostname when creating a unique name for this worker
# Override with os environment: CLEARML_WORKER_NAME
# worker_name: "clearml-agent-machine1"
worker_name: ""
# Set GIT user/pass credentials (if user/pass are set, GIT protocol will be set to https)
# leave blank for GIT SSH credentials (set force_git_ssh_protocol=true to force SSH protocol)
# **Notice**: GitHub personal token is equivalent to password, you can put it directly into `git_pass`
@@ -20,11 +29,11 @@ agent {
# https://docs.github.com/en/authentication/keeping-your-account-and-data-secure/creating-a-personal-access-token
# https://support.atlassian.com/bitbucket-cloud/docs/app-passwords/
# https://docs.gitlab.com/ee/user/profile/personal_access_tokens.html
git_user=""
git_pass=""
# git_user: ""
# git_pass: ""
# Limit credentials to a single domain, for example: github.com,
# all other domains will use public access (no user/pass). Default: always send user/pass for any VCS domain
git_host=""
# git_host: ""
# Force GIT protocol to use SSH regardless of the git url (Assumes GIT user/pass are blank)
force_git_ssh_protocol: false
@@ -33,16 +42,6 @@ agent {
# Force a specific SSH username when converting http to ssh links (the default username is 'git')
# force_git_ssh_user: git
# unique name of this worker, if None, created based on hostname:process_id
# Overridden with os environment: CLEARML_WORKER_ID
# worker_id: "clearml-agent-machine1:gpu0"
worker_id: ""
# worker name, replaces the hostname when creating a unique name for this worker
# Overridden with os environment: CLEARML_WORKER_NAME
# worker_name: "clearml-agent-machine1"
worker_name: ""
# Set the python version to use when creating the virtual environment and launching the experiment
# Example values: "/usr/bin/python3" or "/usr/local/bin/python3.6"
# The default is the python executing the clearml_agent
@@ -51,6 +50,22 @@ agent {
# specific python version and the system supports multiple python the agent will use the requested python version)
# ignore_requested_python_version: true
# Force the root folder of the git repository (instead of the working directory) into the PYTHONPATH
# default false, only the working directory will be added to the PYTHONPATH
# force_git_root_python_path: false
# if set, use GIT_ASKPASS to pass user/pass when cloning / fetch repositories
# it solves passing user/token to git submodules.
# this is a safer way to ensure multiple users using the same repository will
# not accidentally leak credentials
# Only supported on Linux systems, it will be the default in future releases
# enable_git_ask_pass: false
# in docker mode, if container's entrypoint automatically activated a virtual environment
# use the activated virtual environment and install everything there
# set to False to disable, and always create a new venv inheriting from the system_site_packages
# docker_use_activated_venv: true
# select python package manager:
# currently supported: pip, conda and poetry
# if "pip" or "conda" are used, the agent installs the required packages
@@ -64,7 +79,7 @@ agent {
type: pip,
# specify pip version to use (examples "<20.2", "==19.3.1", "", empty string will install the latest version)
# pip_version: "<21"
# pip_version: ["<20.2 ; python_version < '3.10'", "<22.3 ; python_version >= '3.10'"]
# specify poetry version to use (examples "<2", "==1.1.1", "", empty string will install the latest version)
# poetry_version: "<2",
@@ -225,7 +240,7 @@ agent {
enable_task_env: false
# CUDA versions used for Conda setup & solving PyTorch wheel packages
# it Should be detected automatically. Override with os environment CUDA_VERSION / CUDNN_VERSION
# Should be detected automatically. Override with os environment CUDA_VERSION / CUDNN_VERSION
# cuda_version: 10.1
# cudnn_version: 7.6
@@ -429,42 +444,46 @@ sdk {
# Apply top-level environment section from configuration into os.environ
apply_environment: true
# Top-level environment section is in the form of:
# environment {
# key: value
# ...
# }
# and is applied to the OS environment as `key=value` for each key/value pair
# Apply top-level files section from configuration into local file system
apply_files: true
# Top-level files section allows auto-generating files at designated paths with a predefined contents
# and target format. Options include:
# contents: the target file's content, typically a string (or any base type int/float/list/dict etc.)
# format: a custom format for the contents. Currently supported value is `base64` to automatically decode a
# base64-encoded contents string, otherwise ignored
# path: the target file's path, may include ~ and inplace env vars
# target_format: format used to encode contents before writing into the target file. Supported values are json,
# yaml, yml and bytes (in which case the file will be written in binary mode). Default is text mode.
# overwrite: overwrite the target file in case it exists. Default is true.
#
# Example:
# files {
# myfile1 {
# contents: "The quick brown fox jumped over the lazy dog"
# path: "/tmp/fox.txt"
# }
# myjsonfile {
# contents: {
# some {
# nested {
# value: [1, 2, 3, 4]
# }
# }
# }
# path: "/tmp/test.json"
# target_format: json
# }
# }
}
# Environment section (top-level) is applied to the OS environment as `key=value` for each key/value pair
# * enable/disable with `agent.apply_environment` OR `sdk.apply_environment`
# Example:
#
# environment {
# key_a: value_a
# key_b: value_b
# }
# Files section (top-level) allows auto-generating files at designated paths with
# predefined content and target format.
# * enable/disable with `agent.apply_files` OR `sdk.apply_files`
# Files content options include:
# contents: the target file's content, typically a string (or any base type int/float/list/dict etc.)
# format: a custom format for the contents. Currently supported value is `base64` to automatically decode a
# base64-encoded contents string, otherwise ignored
# path: the target file's path, may include ~ and inplace env vars
# target_format: format used to encode contents before writing into the target file. Supported values are json,
# yaml, yml and bytes (in which case the file will be written in binary mode). Default is text mode.
# overwrite: overwrite the target file in case it exists. Default is true.
#
# Example:
# files {
# myfile1 {
# contents: "The quick brown fox jumped over the lazy dog"
# path: "/tmp/fox.txt"
# }
# myjsonfile {
# contents: {
# some {
# nested {
# value: [1, 2, 3, 4]
# }
# }
# }
# path: "/tmp/test.json"
# target_format: json
# }
# }

View File

@@ -1,15 +1,15 @@
attrs>=18.0,<20.4.0
attrs>=18.0,<23.0.0
enum34>=0.9,<1.2.0 ; python_version < '3.6'
furl>=2.0.0,<2.2.0
jsonschema>=2.6.0,<3.3.0
jsonschema>=2.6.0,<5.0.0
pathlib2>=2.3.0,<2.4.0
psutil>=3.4.2,<5.10.0
pyparsing>=2.0.3,<2.5.0
pyparsing>=2.0.3,<3.1.0
python-dateutil>=2.4.2,<2.9.0
pyjwt>=2.4.0,<2.5.0
pyjwt>=2.4.0,<2.7.0
PyYAML>=3.12,<6.1
requests>=2.20.0,<2.29.0
six>=1.13.0,<1.16.0
six>=1.13.0,<1.17.0
typing>=3.6.4,<3.8.0 ; python_version < '3.5'
urllib3>=1.21.1,<1.27.0
virtualenv>=16,<21