From 40456be948ad11dda0b9b6c48092d7c301579105 Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Sun, 5 Mar 2023 18:04:35 +0200 Subject: [PATCH] Black formatting Refactor path support --- clearml_agent/commands/worker.py | 94 +++++++++++++++++--- clearml_agent/definitions.py | 146 ++++++++++++++++++++----------- clearml_agent/glue/k8s.py | 9 -- 3 files changed, 176 insertions(+), 73 deletions(-) diff --git a/clearml_agent/commands/worker.py b/clearml_agent/commands/worker.py index 6dbf446..3c83839 100644 --- a/clearml_agent/commands/worker.py +++ b/clearml_agent/commands/worker.py @@ -1,6 +1,7 @@ from __future__ import print_function, division, unicode_literals import errno +import functools import json import logging import os @@ -735,8 +736,68 @@ class Worker(ServiceCommandSection): except Exception: pass + def _get_docker_restart_value(self, session, task_id: str): + try: + session.verify_feature_set('advanced') + except ValueError: + return + + restart = (ENV_SERVICES_DOCKER_RESTART.get() or "").strip() + if not restart: + return + + # Parse value and selector + restart_value, _, selector = restart.partition(";") + + if restart_value not in ("unless-stopped", "no", "always") and not restart_value.startswith("on-failure"): + self.log.error( + "Invalid value \"{}\" provided for {}, ignoring".format(restart, ENV_SERVICES_DOCKER_RESTART.vars[0]) + ) + return + + if not selector: + return restart_value + + path, _, expected_value = selector.partition("=") + + result = session.send_request( + service='tasks', + action='get_all', + json={'id': [task_id], 'only_fields': [path], 'search_hidden': True}, + method=Request.def_method, + ) + if not result.ok: + result_msg = self._get_path(result.json(), 'meta', 'result_msg') + self.log.error( + "Failed obtaining selector value for restart option \"{}\", ignoring: {}".format(selector, result_msg) + ) + return + + not_found = object() + try: + value = self._get_path(result.json(), 'data', 'tasks', 0, *path.split("."), default=not_found) + except (ValueError, TypeError): + return + + if value is not_found: + return + + if not expected_value: + return restart_value + + # noinspection PyBroadException + try: + if ( + (isinstance(value, bool) and value == strtobool(expected_value)) # check first - bool is also an int + or (isinstance(value, (int, float)) and value == float(expected_value)) + or (str(value) == str(expected_value)) + ): + return restart_value + except Exception as ex: + pass + def run_one_task(self, queue, task_id, worker_args, docker=None, task_session=None): - # type: (Text, Text, WorkerParams, Optional[Text], Optional[Session]) -> int + # type: (Text, Text, WorkerParams, Optional[Text], Optional[Session]) -> Optional[int] """ Run one task pulled from queue. :param queue: ID of queue that task was pulled from @@ -811,6 +872,7 @@ class Worker(ServiceCommandSection): docker_image=docker_image, docker_arguments=docker_arguments, docker_bash_setup_script=docker_setup_script, + restart=self._get_docker_restart_value(task_session, task_id), ) if self._impersonate_as_task_owner: docker_params["auth_token"] = task_session.token @@ -2110,7 +2172,8 @@ class Worker(ServiceCommandSection): print('Building Task {} inside docker image: {} {} setup_script={}\n'.format( task_id, docker_image, docker_arguments or '', docker_setup_script or '')) full_docker_cmd = self.docker_image_func( - docker_image=docker_image, docker_arguments=docker_arguments, docker_bash_setup_script=docker_setup_script) + docker_image=docker_image, docker_arguments=docker_arguments, docker_bash_setup_script=docker_setup_script + ) end_of_build_marker = "build.done=true" docker_cmd_suffix = ' build --id {task_id} --install-globally; ' \ @@ -3725,20 +3788,19 @@ class Worker(ServiceCommandSection): name=None, mount_ssh=None, mount_ssh_ro=None, mount_apt_cache=None, mount_pip_cache=None, mount_poetry_cache=None, env_task_id=None, + restart=None, ): self.debug("Constructing docker command", context="docker") docker = 'docker' base_cmd = [docker, 'run', '-t'] - - if ENV_SERVICES_DOCKER_RESTART.get(): - value = ENV_SERVICES_DOCKER_RESTART.get().strip() - if value in ("unless-stopped", "no", "always") or value.startswith("on-failure"): - base_cmd += ["--restart", value] + use_rm = True + if restart: + if restart in ("unless-stopped", "no", "always") or restart.startswith("on-failure"): + base_cmd += ["--restart", restart] + use_rm = False else: - self.log.error( - "Invalid value \"{}\" provided for {}, ignoring".format(value, ENV_SERVICES_DOCKER_RESTART.vars[0]) - ) + self.log.error("Invalid restart value \"{}\" , ignoring".format(restart)) update_scheme = "" dockers_nvidia_visible_devices = 'all' @@ -3952,7 +4014,8 @@ class Worker(ServiceCommandSection): (['-v', host_cache+':'+mounted_cache] if host_cache else []) + (['-v', host_vcs_cache+':'+mounted_vcs_cache] if host_vcs_cache else []) + (['-v', host_venvs_cache + ':' + mounted_venvs_cache] if host_venvs_cache else []) + - ['--rm', docker_image, 'bash', '-c', + (['--rm'] if use_rm else []) + + [docker_image, 'bash', '-c', update_scheme + extra_shell_script + "cp {} {} ; ".format(DOCKER_ROOT_CONF_FILE, DOCKER_DEFAULT_CONF_FILE) + @@ -4158,6 +4221,15 @@ class Worker(ServiceCommandSection): " found {})".format(role) ) + @staticmethod + def _get_path(d, *path, default=None): + try: + return functools.reduce( + lambda a, b: a[b], path, d + ) + except (IndexError, KeyError): + return default + if __name__ == "__main__": pass diff --git a/clearml_agent/definitions.py b/clearml_agent/definitions.py index 186b0da..999903c 100644 --- a/clearml_agent/definitions.py +++ b/clearml_agent/definitions.py @@ -5,9 +5,9 @@ from enum import IntEnum from os import getenv, environ from typing import Text, Optional, Union, Tuple, Any +import six from pathlib2 import Path -import six from clearml_agent.helper.base import normalize_path PROGRAM_NAME = "clearml-agent" @@ -69,42 +69,65 @@ ENV_AWS_SECRET_KEY = EnvironmentConfig("AWS_SECRET_ACCESS_KEY") ENV_AZURE_ACCOUNT_KEY = EnvironmentConfig("AZURE_STORAGE_KEY") ENVIRONMENT_CONFIG = { - "api.api_server": EnvironmentConfig("CLEARML_API_HOST", "TRAINS_API_HOST", ), - "api.files_server": EnvironmentConfig("CLEARML_FILES_HOST", "TRAINS_FILES_HOST", ), - "api.web_server": EnvironmentConfig("CLEARML_WEB_HOST", "TRAINS_WEB_HOST", ), + "api.api_server": EnvironmentConfig( + "CLEARML_API_HOST", + "TRAINS_API_HOST", + ), + "api.files_server": EnvironmentConfig( + "CLEARML_FILES_HOST", + "TRAINS_FILES_HOST", + ), + "api.web_server": EnvironmentConfig( + "CLEARML_WEB_HOST", + "TRAINS_WEB_HOST", + ), "api.credentials.access_key": EnvironmentConfig( - "CLEARML_API_ACCESS_KEY", "TRAINS_API_ACCESS_KEY", + "CLEARML_API_ACCESS_KEY", + "TRAINS_API_ACCESS_KEY", ), "api.credentials.secret_key": ENV_AGENT_SECRET_KEY, - "agent.worker_name": EnvironmentConfig("CLEARML_WORKER_NAME", "TRAINS_WORKER_NAME", ), - "agent.worker_id": EnvironmentConfig("CLEARML_WORKER_ID", "TRAINS_WORKER_ID", ), - "agent.cuda_version": EnvironmentConfig( - "CLEARML_CUDA_VERSION", "TRAINS_CUDA_VERSION", "CUDA_VERSION" + "agent.worker_name": EnvironmentConfig( + "CLEARML_WORKER_NAME", + "TRAINS_WORKER_NAME", ), - "agent.cudnn_version": EnvironmentConfig( - "CLEARML_CUDNN_VERSION", "TRAINS_CUDNN_VERSION", "CUDNN_VERSION" - ), - "agent.cpu_only": EnvironmentConfig( - names=("CLEARML_CPU_ONLY", "TRAINS_CPU_ONLY", "CPU_ONLY"), type=bool + "agent.worker_id": EnvironmentConfig( + "CLEARML_WORKER_ID", + "TRAINS_WORKER_ID", ), + "agent.cuda_version": EnvironmentConfig("CLEARML_CUDA_VERSION", "TRAINS_CUDA_VERSION", "CUDA_VERSION"), + "agent.cudnn_version": EnvironmentConfig("CLEARML_CUDNN_VERSION", "TRAINS_CUDNN_VERSION", "CUDNN_VERSION"), + "agent.cpu_only": EnvironmentConfig(names=("CLEARML_CPU_ONLY", "TRAINS_CPU_ONLY", "CPU_ONLY"), type=bool), "agent.crash_on_exception": EnvironmentConfig("CLEAMRL_AGENT_CRASH_ON_EXCEPTION", type=bool), "sdk.aws.s3.key": EnvironmentConfig("AWS_ACCESS_KEY_ID"), "sdk.aws.s3.secret": ENV_AWS_SECRET_KEY, "sdk.aws.s3.region": EnvironmentConfig("AWS_DEFAULT_REGION"), - "sdk.azure.storage.containers.0": {'account_name': EnvironmentConfig("AZURE_STORAGE_ACCOUNT"), - 'account_key': ENV_AZURE_ACCOUNT_KEY}, + "sdk.azure.storage.containers.0": { + "account_name": EnvironmentConfig("AZURE_STORAGE_ACCOUNT"), + "account_key": ENV_AZURE_ACCOUNT_KEY, + }, "sdk.google.storage.credentials_json": EnvironmentConfig("GOOGLE_APPLICATION_CREDENTIALS"), } ENVIRONMENT_SDK_PARAMS = { - "task_id": ("CLEARML_TASK_ID", "TRAINS_TASK_ID", ), - "config_file": ("CLEARML_CONFIG_FILE", "TRAINS_CONFIG_FILE", ), - "log_level": ("CLEARML_LOG_LEVEL", "TRAINS_LOG_LEVEL", ), - "log_to_backend": ("CLEARML_LOG_TASK_TO_BACKEND", "TRAINS_LOG_TASK_TO_BACKEND", ), + "task_id": ( + "CLEARML_TASK_ID", + "TRAINS_TASK_ID", + ), + "config_file": ( + "CLEARML_CONFIG_FILE", + "TRAINS_CONFIG_FILE", + ), + "log_level": ( + "CLEARML_LOG_LEVEL", + "TRAINS_LOG_LEVEL", + ), + "log_to_backend": ( + "CLEARML_LOG_TASK_TO_BACKEND", + "TRAINS_LOG_TASK_TO_BACKEND", + ), } -ENVIRONMENT_BACKWARD_COMPATIBLE = EnvironmentConfig( - names=("CLEARML_AGENT_ALG_ENV", "TRAINS_AGENT_ALG_ENV"), type=bool) +ENVIRONMENT_BACKWARD_COMPATIBLE = EnvironmentConfig(names=("CLEARML_AGENT_ALG_ENV", "TRAINS_AGENT_ALG_ENV"), type=bool) VIRTUAL_ENVIRONMENT_PATH = { "python2": normalize_path(CONFIG_DIR, "py2venv"), @@ -123,44 +146,61 @@ TOKEN_EXPIRATION_SECONDS = int(timedelta(days=2).total_seconds()) METADATA_EXTENSION = ".json" -DEFAULT_VENV_UPDATE_URL = ( - "https://raw.githubusercontent.com/Yelp/venv-update/v3.2.4/venv_update.py" -) +DEFAULT_VENV_UPDATE_URL = "https://raw.githubusercontent.com/Yelp/venv-update/v3.2.4/venv_update.py" WORKING_REPOSITORY_DIR = "task_repository" WORKING_STANDALONE_DIR = "code" DEFAULT_VCS_CACHE = normalize_path(CONFIG_DIR, "vcs-cache") -PIP_EXTRA_INDICES = [ -] +PIP_EXTRA_INDICES = [] DEFAULT_PIP_DOWNLOAD_CACHE = normalize_path(CONFIG_DIR, "pip-download-cache") -ENV_DOCKER_IMAGE = EnvironmentConfig('CLEARML_DOCKER_IMAGE', 'TRAINS_DOCKER_IMAGE') -ENV_WORKER_ID = EnvironmentConfig('CLEARML_WORKER_ID', 'TRAINS_WORKER_ID') -ENV_WORKER_TAGS = EnvironmentConfig('CLEARML_WORKER_TAGS') -ENV_AGENT_SKIP_PIP_VENV_INSTALL = EnvironmentConfig('CLEARML_AGENT_SKIP_PIP_VENV_INSTALL') -ENV_AGENT_SKIP_PYTHON_ENV_INSTALL = EnvironmentConfig('CLEARML_AGENT_SKIP_PYTHON_ENV_INSTALL', type=bool) -ENV_DOCKER_SKIP_GPUS_FLAG = EnvironmentConfig('CLEARML_DOCKER_SKIP_GPUS_FLAG', 'TRAINS_DOCKER_SKIP_GPUS_FLAG') -ENV_AGENT_GIT_USER = EnvironmentConfig('CLEARML_AGENT_GIT_USER', 'TRAINS_AGENT_GIT_USER') -ENV_AGENT_GIT_PASS = EnvironmentConfig('CLEARML_AGENT_GIT_PASS', 'TRAINS_AGENT_GIT_PASS') -ENV_AGENT_GIT_HOST = EnvironmentConfig('CLEARML_AGENT_GIT_HOST', 'TRAINS_AGENT_GIT_HOST') -ENV_AGENT_DISABLE_SSH_MOUNT = EnvironmentConfig('CLEARML_AGENT_DISABLE_SSH_MOUNT', type=bool) -ENV_SSH_AUTH_SOCK = EnvironmentConfig('SSH_AUTH_SOCK') -ENV_TASK_EXECUTE_AS_USER = EnvironmentConfig('CLEARML_AGENT_EXEC_USER', 'TRAINS_AGENT_EXEC_USER') -ENV_TASK_EXTRA_PYTHON_PATH = EnvironmentConfig('CLEARML_AGENT_EXTRA_PYTHON_PATH', 'TRAINS_AGENT_EXTRA_PYTHON_PATH') -ENV_DOCKER_HOST_MOUNT = EnvironmentConfig('CLEARML_AGENT_K8S_HOST_MOUNT', 'CLEARML_AGENT_DOCKER_HOST_MOUNT', - 'TRAINS_AGENT_K8S_HOST_MOUNT', 'TRAINS_AGENT_DOCKER_HOST_MOUNT') -ENV_VENV_CACHE_PATH = EnvironmentConfig('CLEARML_AGENT_VENV_CACHE_PATH') -ENV_EXTRA_DOCKER_ARGS = EnvironmentConfig('CLEARML_AGENT_EXTRA_DOCKER_ARGS', type=list) -ENV_DEBUG_INFO = EnvironmentConfig('CLEARML_AGENT_DEBUG_INFO') -ENV_CHILD_AGENTS_COUNT_CMD = EnvironmentConfig('CLEARML_AGENT_CHILD_AGENTS_COUNT_CMD') -ENV_DOCKER_ARGS_FILTERS = EnvironmentConfig('CLEARML_AGENT_DOCKER_ARGS_FILTERS') -ENV_DOCKER_ARGS_HIDE_ENV = EnvironmentConfig('CLEARML_AGENT_DOCKER_ARGS_HIDE_ENV') -ENV_SERVICES_DOCKER_RESTART = EnvironmentConfig('CLEARML_AGENT_SERVICES_DOCKER_RESTART') +ENV_DOCKER_IMAGE = EnvironmentConfig("CLEARML_DOCKER_IMAGE", "TRAINS_DOCKER_IMAGE") +ENV_WORKER_ID = EnvironmentConfig("CLEARML_WORKER_ID", "TRAINS_WORKER_ID") +ENV_WORKER_TAGS = EnvironmentConfig("CLEARML_WORKER_TAGS") +ENV_AGENT_SKIP_PIP_VENV_INSTALL = EnvironmentConfig("CLEARML_AGENT_SKIP_PIP_VENV_INSTALL") +ENV_AGENT_SKIP_PYTHON_ENV_INSTALL = EnvironmentConfig("CLEARML_AGENT_SKIP_PYTHON_ENV_INSTALL", type=bool) +ENV_DOCKER_SKIP_GPUS_FLAG = EnvironmentConfig("CLEARML_DOCKER_SKIP_GPUS_FLAG", "TRAINS_DOCKER_SKIP_GPUS_FLAG") +ENV_AGENT_GIT_USER = EnvironmentConfig("CLEARML_AGENT_GIT_USER", "TRAINS_AGENT_GIT_USER") +ENV_AGENT_GIT_PASS = EnvironmentConfig("CLEARML_AGENT_GIT_PASS", "TRAINS_AGENT_GIT_PASS") +ENV_AGENT_GIT_HOST = EnvironmentConfig("CLEARML_AGENT_GIT_HOST", "TRAINS_AGENT_GIT_HOST") +ENV_AGENT_DISABLE_SSH_MOUNT = EnvironmentConfig("CLEARML_AGENT_DISABLE_SSH_MOUNT", type=bool) +ENV_SSH_AUTH_SOCK = EnvironmentConfig("SSH_AUTH_SOCK") +ENV_TASK_EXECUTE_AS_USER = EnvironmentConfig("CLEARML_AGENT_EXEC_USER", "TRAINS_AGENT_EXEC_USER") +ENV_TASK_EXTRA_PYTHON_PATH = EnvironmentConfig("CLEARML_AGENT_EXTRA_PYTHON_PATH", "TRAINS_AGENT_EXTRA_PYTHON_PATH") +ENV_DOCKER_HOST_MOUNT = EnvironmentConfig( + "CLEARML_AGENT_K8S_HOST_MOUNT", + "CLEARML_AGENT_DOCKER_HOST_MOUNT", + "TRAINS_AGENT_K8S_HOST_MOUNT", + "TRAINS_AGENT_DOCKER_HOST_MOUNT", +) +ENV_VENV_CACHE_PATH = EnvironmentConfig("CLEARML_AGENT_VENV_CACHE_PATH") +ENV_EXTRA_DOCKER_ARGS = EnvironmentConfig("CLEARML_AGENT_EXTRA_DOCKER_ARGS", type=list) +ENV_DEBUG_INFO = EnvironmentConfig("CLEARML_AGENT_DEBUG_INFO") +ENV_CHILD_AGENTS_COUNT_CMD = EnvironmentConfig("CLEARML_AGENT_CHILD_AGENTS_COUNT_CMD") +ENV_DOCKER_ARGS_FILTERS = EnvironmentConfig("CLEARML_AGENT_DOCKER_ARGS_FILTERS") +ENV_DOCKER_ARGS_HIDE_ENV = EnvironmentConfig("CLEARML_AGENT_DOCKER_ARGS_HIDE_ENV") -ENV_FORCE_SYSTEM_SITE_PACKAGES = EnvironmentConfig('CLEARML_AGENT_FORCE_SYSTEM_SITE_PACKAGES', type=bool) +ENV_SERVICES_DOCKER_RESTART = EnvironmentConfig("CLEARML_AGENT_SERVICES_DOCKER_RESTART") +""" + Specify a restart value for a services agent task containers. + Note that when a restart value is provided, task containers will not be run with the '--rm' flag and will + not be cleaned up automatically when completed (this will need to be done externally using the + 'docker container prune' command to free up resources). + Value format for this env var is ";", where: + - can be any valid restart value for docker-run (see https://docs.docker.com/engine/reference/commandline/run/#restart) + - is optional, allowing to restrict this behaviour to specific tasks. The format is: + "=" where: + * is a dot-separated path to a task field (e.g. "container.image") + * is optional. If not provided, the restart policy till be applied for the task container if the + path provided exists. If provided, the restart policy will be applied if the value matches the value + obtained from the task (value parsing and comparison is based on the type of value obtained from the task) + For example: + CLEARML_AGENT_SERVICES_DOCKER_RESTART=unless-stopped + CLEARML_AGENT_SERVICES_DOCKER_RESTART=unless-stopped;container.image=some-image +""" + +ENV_FORCE_SYSTEM_SITE_PACKAGES = EnvironmentConfig("CLEARML_AGENT_FORCE_SYSTEM_SITE_PACKAGES", type=bool) """ Force system_site_packages: true when running tasks in containers (i.e. docker mode or k8s glue) """ - - -ENV_CUSTOM_BUILD_SCRIPT = EnvironmentConfig('CLEARML_AGENT_CUSTOM_BUILD_SCRIPT') +ENV_CUSTOM_BUILD_SCRIPT = EnvironmentConfig("CLEARML_AGENT_CUSTOM_BUILD_SCRIPT") """ Specifies a custom environment setup script to be executed instead of installing a virtual environment. If provided, this script is executed following Git cloning. Script command may include environment variable and diff --git a/clearml_agent/glue/k8s.py b/clearml_agent/glue/k8s.py index 32be4c9..f7c0e3b 100644 --- a/clearml_agent/glue/k8s.py +++ b/clearml_agent/glue/k8s.py @@ -219,15 +219,6 @@ class K8sIntegration(Worker): with open(os.path.expandvars(os.path.expanduser(str(path))), 'rt') as f: return yaml.load(f, Loader=getattr(yaml, 'FullLoader', None)) - @staticmethod - def _get_path(d, *path, default=None): - try: - return functools.reduce( - lambda a, b: a[b], path, d - ) - except (IndexError, KeyError): - return default - def _get_kubectl_options(self, command, extra_labels=None, filters=None, output="json", labels=None): # type: (str, Iterable[str], Iterable[str], str, Iterable[str]) -> Dict if not labels: