Compare commits

20 Commits

Author SHA1 Message Date
allegroai
e3c8bd5666 Add support for agent.docker_force_pull configuration setting in k8s glue 2021-06-25 17:36:08 +03:00
allegroai
3ae1741343 Fix k8s glue task container arguments not supported in kubectl_run command
Fix k8s glue not passing required extra_docker_bash_script to string format
2021-06-25 17:35:01 +03:00
allegroai
53c106c3af Fix k8s glue task container handling fails parsing docker image
Fix k8s glue uses task container image arguments when no image is specified
2021-06-25 17:34:28 +03:00
allegroai
44fc7dffe6 Fix key/secret usage printout 2021-06-24 19:37:59 +03:00
allegroai
aaa6b32f9f Fix support for "-r requirements.txt" inside "installed packages" 2021-06-24 19:26:35 +03:00
allegroai
821a0c4a2b Fix parsing VCS links starting with "git+git@" (notice "git+git://" was already supported) 2021-06-24 19:25:41 +03:00
allegroai
176b4a4cde Fix --services-mode when the execute agent fails on startup with error code 0 2021-06-16 18:32:29 +03:00
allegroai
29bf993be7 Add printout when using key/secret from env vars 2021-06-02 21:15:48 +03:00
allegroai
eda597dea5 Version bump 2021-06-02 13:17:57 +03:00
allegroai
8c56777125 Add CLEARML_AGENT_DISABLE_SSH_MOUNT to allow disabling the automatic .ssh mount into the docker container 2021-06-02 13:16:58 +03:00
allegroai
7e90ebd5db Fix _dynamic_gpu_get_available: increase worker timeout to 10 minutes 2021-06-02 13:16:17 +03:00
allegroai
3a07bfe1d7 Version bump 2021-05-31 23:19:46 +03:00
allegroai
0694b9e8af Fix PyYAML supported versions 2021-05-26 18:33:35 +03:00
allegroai
742cbf5767 Add docker environment arguments log masking support (issue #67) 2021-05-25 19:31:45 +03:00
allegroai
e93384b99b Fix --stop with dynamic gpus 2021-05-20 10:58:46 +03:00
allegroai
3c4e976093 Add agent.ignore_requested_python_version to config file 2021-05-19 15:20:44 +03:00
allegroai
1e795beec8 Fix support for spaces in docker arguments (issue #358) 2021-05-19 15:20:03 +03:00
allegroai
4f7407084d Fix standalone script with pre-existing conda venv 2021-05-12 15:46:25 +03:00
allegroai
ae3d034531 Protect against None in execution.repository 2021-05-12 15:45:31 +03:00
allegroai
a2db1f5ab5 Remove queue name from pod name in k8s glue, add queue name and ID to pod labels (issue #64) 2021-05-05 12:03:35 +03:00
14 changed files with 284 additions and 114 deletions

View File

@@ -26,6 +26,9 @@
# Example values: "/usr/bin/python3" or "/usr/local/bin/python3.6"
# The default is the python executing the clearml_agent
python_binary: ""
# ignore any requested python version (Default: False; if a Task requested a specific python version
# and the system has multiple python versions available, the agent will use the requested python version)
# ignore_requested_python_version: true
# select python package manager:
# currently supported pip and conda
@@ -182,4 +185,16 @@
# should be detected automatically. Override with os environment CUDA_VERSION / CUDNN_VERSION
# cuda_version: 10.1
# cudnn_version: 7.6
# Hide docker environment variables containing secrets when printing out the docker command by replacing their
# values with "********". Turning this feature on will hide the following environment variables values:
# CLEARML_API_SECRET_KEY, CLEARML_AGENT_GIT_PASS, AWS_SECRET_ACCESS_KEY, AZURE_STORAGE_KEY
# To include more environment variables, add their keys to the "extra_keys" list. E.g. to make sure the value of
# your custom environment variable named MY_SPECIAL_PASSWORD will not show in the logs when included in the
# docker command, set:
# extra_keys: ["MY_SPECIAL_PASSWORD"]
hide_docker_command_env_vars {
enabled: true
extra_keys: []
}
}

View File

@@ -111,7 +111,8 @@ class Session(TokenManager):
self._logger = logger
self.__access_key = api_key or ENV_ACCESS_KEY.get(
default=(self.config.get("api.credentials.access_key", None) or self.default_key)
default=(self.config.get("api.credentials.access_key", None) or self.default_key),
value_cb=lambda key, value: print("Using environment access key {}={}".format(key, value))
)
if not self.access_key:
raise ValueError(
@@ -119,7 +120,8 @@ class Session(TokenManager):
)
self.__secret_key = secret_key or ENV_SECRET_KEY.get(
default=(self.config.get("api.credentials.secret_key", None) or self.default_secret)
default=(self.config.get("api.credentials.secret_key", None) or self.default_secret),
value_cb=lambda key, value: print("Using environment secret key {}=********".format(key))
)
if not self.secret_key:
raise ValueError(

View File

@@ -64,8 +64,8 @@ class Entry(object):
converter = self.default_conversions().get(self.type, self.type)
return converter(value)
def get_pair(self, default=NotSet, converter=None):
# type: (Any, Converter) -> Optional[Tuple[Text, Any]]
def get_pair(self, default=NotSet, converter=None, value_cb=None):
# type: (Any, Converter, Callable[[str, Any], None]) -> Optional[Tuple[Text, Any]]
for key in self.keys:
value = self._get(key)
if value is NotSet:
@@ -75,13 +75,20 @@ class Entry(object):
except Exception as ex:
self.error("invalid value {key}={value}: {ex}".format(**locals()))
break
# noinspection PyBroadException
try:
if value_cb:
value_cb(key, value)
except Exception:
pass
return key, value
result = self.default if default is NotSet else default
return self.key, result
def get(self, default=NotSet, converter=None):
# type: (Any, Converter) -> Optional[Any]
return self.get_pair(default=default, converter=converter)[1]
def get(self, default=NotSet, converter=None, value_cb=None):
# type: (Any, Converter, Callable[[str, Any], None]) -> Optional[Any]
return self.get_pair(default=default, converter=converter, value_cb=value_cb)[1]
def set(self, value):
# type: (Any, Any) -> (Text, Any)
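
The value_cb hook above lets callers observe which environment variable actually supplied a configuration value. A minimal standalone sketch of the mechanics (EnvEntry here is an illustrative stand-in, not the real clearml_agent Entry class):

import os

class EnvEntry:
    def __init__(self, *keys):
        self.keys = keys

    def get(self, default=None, value_cb=None):
        for key in self.keys:
            value = os.environ.get(key)
            if value is None:
                continue
            # noinspection PyBroadException
            try:
                if value_cb:
                    value_cb(key, value)  # report which env var supplied the value
            except Exception:
                pass  # a failing callback must never break config resolution
            return value
        return default

# Mirrors the Session change above: print the access key, mask the secret
EnvEntry("CLEARML_API_ACCESS_KEY").get(
    value_cb=lambda k, v: print("Using environment access key {}={}".format(k, v)))
EnvEntry("CLEARML_API_SECRET_KEY").get(
    value_cb=lambda k, v: print("Using environment secret key {}=********".format(k)))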

View File

@@ -11,6 +11,7 @@ import subprocess
import sys
import shutil
import traceback
import shlex
from collections import defaultdict
from copy import deepcopy, copy
from datetime import datetime
@@ -19,7 +20,7 @@ from functools import partial, cmp_to_key
from itertools import chain
from tempfile import mkdtemp, NamedTemporaryFile
from time import sleep, time
from typing import Text, Optional, Any, Tuple
from typing import Text, Optional, Any, Tuple, List
import attr
import psutil
@@ -43,7 +44,14 @@ from clearml_agent.definitions import (
ENV_DOCKER_HOST_MOUNT,
ENV_TASK_EXTRA_PYTHON_PATH,
ENV_AGENT_GIT_USER,
ENV_AGENT_GIT_PASS, ENV_WORKER_ID, ENV_DOCKER_SKIP_GPUS_FLAG, )
ENV_AGENT_GIT_PASS,
ENV_WORKER_ID,
ENV_DOCKER_SKIP_GPUS_FLAG,
ENV_AGENT_SECRET_KEY,
ENV_AWS_SECRET_KEY,
ENV_AZURE_ACCOUNT_KEY,
ENV_AGENT_DISABLE_SSH_MOUNT,
)
from clearml_agent.definitions import WORKING_REPOSITORY_DIR, PIP_EXTRA_INDICES
from clearml_agent.errors import APIError, CommandFailedError, Sigterm
from clearml_agent.helper.base import (
@@ -67,7 +75,9 @@ from clearml_agent.helper.base import (
get_python_path,
is_linux_platform,
rm_file,
add_python_path, safe_remove_tree, )
add_python_path,
safe_remove_tree,
)
from clearml_agent.helper.console import ensure_text, print_text, decode_binary_lines
from clearml_agent.helper.os.daemonize import daemonize_process
from clearml_agent.helper.package.base import PackageManager
@@ -90,7 +100,10 @@ from clearml_agent.helper.process import (
get_bash_output,
shutdown_docker_process,
get_docker_id,
commit_docker, terminate_process, check_if_command_exists, terminate_all_child_processes,
commit_docker,
terminate_process,
check_if_command_exists,
terminate_all_child_processes,
)
from clearml_agent.helper.package.priority_req import PriorityPackageRequirement, PackageCollectorRequirement
from clearml_agent.helper.repo import clone_repository_cached, RepoInfo, VCS, fix_package_import_diff_patch
@@ -187,7 +200,7 @@ class LiteralScriptManager(object):
location = location or (repo_info and repo_info.root)
if not location:
location = Path(self.venv_folder, "code")
location.mkdir(exist_ok=True)
location.mkdir(exist_ok=True, parents=True)
log.debug("selected execution directory: %s", location)
return Text(location), self.write(task, location, execution.entry_point)
@@ -221,6 +234,9 @@ def get_task(session, task_id, *args, **kwargs):
def get_task_container(session, task_id):
"""
Returns dict with Task docker container setup {container: '', arguments: '', setup_shell_script: ''}
"""
if session.check_min_api_version("2.13"):
result = session.send_request(
service='tasks',
@@ -233,12 +249,12 @@ def get_task_container(session, task_id):
try:
container = result.json()['data']['tasks'][0]['container'] if result.ok else {}
if container.get('arguments'):
container['arguments'] = str(container.get('arguments')).split(' ')
container['arguments'] = shlex.split(str(container.get('arguments')).strip())
except (ValueError, TypeError):
container = {}
else:
response = get_task(session, task_id, only_fields=["execution.docker_cmd"])
task_docker_cmd_parts = str(response.execution.docker_cmd or '').strip().split(' ')
task_docker_cmd_parts = shlex.split(str(response.execution.docker_cmd or '').strip())
try:
container = dict(
container=task_docker_cmd_parts[0],
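
Switching from str.split(' ') to shlex.split is what makes quoted container arguments survive parsing; a quick standalone comparison:

import shlex

docker_cmd = 'nvidia/cuda -e MESSAGE="hello world" --shm-size 8g'
print(docker_cmd.split(' '))
# ['nvidia/cuda', '-e', 'MESSAGE="hello', 'world"', '--shm-size', '8g']  <- pair broken
print(shlex.split(docker_cmd.strip()))
# ['nvidia/cuda', '-e', 'MESSAGE=hello world', '--shm-size', '8g']       <- quotes respected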
@@ -251,11 +267,14 @@ def get_task_container(session, task_id):
def set_task_container(session, task_id, docker_image=None, docker_arguments=None, docker_bash_script=None):
if docker_arguments and isinstance(docker_arguments, str):
docker_arguments = [docker_arguments]
if session.check_min_api_version("2.13"):
container = dict(
image=docker_image or None,
arguments=' '.join(docker_arguments) if docker_arguments else None,
setup_shell_script=docker_bash_script or None,
image=docker_image or '',
arguments=' '.join(docker_arguments) if docker_arguments else '',
setup_shell_script=docker_bash_script or '',
)
result = session.send_request(
service='tasks',
@@ -614,10 +633,13 @@ class Worker(ServiceCommandSection):
'--standalone-mode' if self._standalone_mode else '',
task_id)
# send the actual used command line to the backend
self.send_logs(task_id=task_id, lines=['Executing: {}\n'.format(full_docker_cmd)], level="INFO")
display_docker_command = self._sanitize_docker_command(full_docker_cmd)
# send the actual used command line to the backend
self.send_logs(task_id=task_id, lines=['Executing: {}\n'.format(display_docker_command)], level="INFO")
cmd = Argv(*full_docker_cmd, display_argv=display_docker_command)
cmd = Argv(*full_docker_cmd)
print('Running Docker:\n{}\n'.format(str(cmd)))
else:
cmd = worker_args.get_argv_for_command("execute") + (
@@ -871,7 +893,7 @@ class Worker(ServiceCommandSection):
# if we are in dynamic gpus / services mode,
# we should send termination signal to all child processes
if self._services_mode:
terminate_all_child_processes(timeout=120)
terminate_all_child_processes(timeout=20, include_parent=False)
# if we are here, just kill all sub processes
kill_all_child_processes()
@@ -884,7 +906,7 @@ class Worker(ServiceCommandSection):
def _dynamic_gpu_get_available(self, gpu_indexes):
# noinspection PyBroadException
try:
response = self._session.send_api(workers_api.GetAllRequest(last_seen=60))
response = self._session.send_api(workers_api.GetAllRequest(last_seen=600))
except Exception:
return None
@@ -1364,6 +1386,7 @@ class Worker(ServiceCommandSection):
service_mode_internal_agent_started = None
stopping = False
status = None
process = None
try:
_last_machine_update_ts = time()
stop_reason = None
@@ -1400,7 +1423,7 @@ class Worker(ServiceCommandSection):
# get diff from previous poll
printed_lines, stdout_pos_count = _print_file(stdout_path, stdout_pos_count)
if self._services_mode and not stopping and not status:
if self._services_mode and not stopping and status is None:
# if the internal agent started, we stop logging, it will take over logging.
# if the internal agent started running the task itself, it will return status==0,
# then we can quit the monitoring loop of this process
@@ -1420,6 +1443,8 @@ class Worker(ServiceCommandSection):
status = ex.returncode
except KeyboardInterrupt:
# so someone else will catch us
if process:
kill_all_child_processes(process.pid)
raise
except Exception:
# we should not get here, but better safe than sorry
@@ -1431,6 +1456,10 @@ class Worker(ServiceCommandSection):
stop_reason = TaskStopReason.exception
status = -1
# full cleanup (just in case)
if process:
kill_all_child_processes(process.pid)
# if running in services mode, keep the file open
# in case the docker was so quick it started and finished, check the stop reason
if self._services_mode and service_mode_internal_agent_started and stop_reason == 'Service started':
@@ -1913,7 +1942,6 @@ class Worker(ServiceCommandSection):
if current_task.script.binary and current_task.script.binary.startswith('python') and \
execution.entry_point and execution.entry_point.split()[0].strip() == '-m':
# we need to split it
import shlex
extra.extend(shlex.split(execution.entry_point))
else:
extra.append(execution.entry_point)
@@ -2342,6 +2370,10 @@ class Worker(ServiceCommandSection):
if not self.is_conda:
package_api.out_of_scope_install_package('Cython')
# add support for -r <file.txt> in requirements
if requirements_manager:
requirements_manager.set_cwd(cwd)
cached_requirements_failed = False
if cached_requirements and (cached_requirements.get('pip') is not None or
cached_requirements.get('conda') is not None):
@@ -2698,8 +2730,11 @@ class Worker(ServiceCommandSection):
if temp_config.get("agent.venvs_cache.path", None):
temp_config.put("agent.venvs_cache.path", '/root/.clearml/venvs-cache')
self._host_ssh_cache = mkdtemp(prefix='clearml_agent.ssh.')
self._temp_cleanup_list.append(self._host_ssh_cache)
if ENV_AGENT_DISABLE_SSH_MOUNT.get():
self._host_ssh_cache = None
else:
self._host_ssh_cache = mkdtemp(prefix='clearml_agent.ssh.')
self._temp_cleanup_list.append(self._host_ssh_cache)
return temp_config, partial(self._get_docker_config_cmd, temp_config=temp_config)
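
The new branch above is driven by CLEARML_AGENT_DISABLE_SSH_MOUNT; a simplified sketch of the toggle (the real check goes through ENV_AGENT_DISABLE_SSH_MOUNT, an EnvironmentConfig with type=bool, defined further down):

import os
from tempfile import mkdtemp

if os.environ.get("CLEARML_AGENT_DISABLE_SSH_MOUNT"):
    host_ssh_cache = None  # skip the automatic ~/.ssh mount into the container
else:
    host_ssh_cache = mkdtemp(prefix="clearml_agent.ssh.")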
@@ -2721,24 +2756,31 @@ class Worker(ServiceCommandSection):
"agent.docker_pip_cache", '~/.clearml/pip-cache'))).expanduser().as_posix()
# make sure all folders are valid
Path(host_apt_cache).mkdir(parents=True, exist_ok=True)
Path(host_pip_cache).mkdir(parents=True, exist_ok=True)
Path(host_cache).mkdir(parents=True, exist_ok=True)
Path(host_pip_dl).mkdir(parents=True, exist_ok=True)
Path(host_vcs_cache).mkdir(parents=True, exist_ok=True)
Path(host_ssh_cache).mkdir(parents=True, exist_ok=True)
if host_apt_cache:
Path(host_apt_cache).mkdir(parents=True, exist_ok=True)
if host_pip_cache:
Path(host_pip_cache).mkdir(parents=True, exist_ok=True)
if host_cache:
Path(host_cache).mkdir(parents=True, exist_ok=True)
if host_pip_dl:
Path(host_pip_dl).mkdir(parents=True, exist_ok=True)
if host_vcs_cache:
Path(host_vcs_cache).mkdir(parents=True, exist_ok=True)
if host_ssh_cache:
Path(host_ssh_cache).mkdir(parents=True, exist_ok=True)
if host_venvs_cache:
Path(host_venvs_cache).mkdir(parents=True, exist_ok=True)
# copy the .ssh folder to a temp folder, to be mapped into docker
# noinspection PyBroadException
try:
if Path(host_ssh_cache).is_dir():
shutil.rmtree(host_ssh_cache, ignore_errors=True)
shutil.copytree(Path('~/.ssh').expanduser().as_posix(), host_ssh_cache)
except Exception:
host_ssh_cache = None
self.log.warning('Failed creating temporary copy of ~/.ssh for git credential')
if host_ssh_cache:
# copy the .ssh folder to a temp folder, to be mapped into docker
# noinspection PyBroadException
try:
if Path(host_ssh_cache).is_dir():
shutil.rmtree(host_ssh_cache, ignore_errors=True)
shutil.copytree(Path('~/.ssh').expanduser().as_posix(), host_ssh_cache)
except Exception:
host_ssh_cache = None
self.log.warning('Failed creating temporary copy of ~/.ssh for git credential')
# check if the .git credentials exist:
try:
@@ -3085,7 +3127,7 @@ class Worker(ServiceCommandSection):
warning('Could not terminate process pid={}'.format(pid))
return True
# wither we have a match for the worker_id or we just pick the first one, and kill it.
# either we have a match for the worker_id or we just pick the first one, and kill it.
if (worker_id and uid == worker_id) or (not worker_id and uid.startswith('{}:'.format(worker_name))):
# this is us, kill it
print('Terminating clearml-agent worker_id={} pid={}'.format(uid, pid))
@@ -3148,6 +3190,33 @@ class Worker(ServiceCommandSection):
queue_ids.append(q_id)
return queue_ids
def _sanitize_docker_command(self, docker_command):
# type: (List[str]) -> List[str]
if not self._session.config.get('agent.hide_docker_command_env_vars.enabled', False):
return docker_command
keys = set(self._session.config.get('agent.hide_docker_command_env_vars.extra_keys', []))
keys.update(
ENV_AGENT_GIT_PASS.vars,
ENV_AGENT_SECRET_KEY.vars,
ENV_AWS_SECRET_KEY.vars,
ENV_AZURE_ACCOUNT_KEY.vars
)
result = docker_command[:]
for i, item in enumerate(docker_command):
try:
if item not in ("-e", "--env"):
continue
key, sep, _ = result[i + 1].partition("=")
if key not in keys or not sep:
continue
result[i + 1] = "{}={}".format(key, "********")
except KeyError:
pass
return result
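
A standalone sketch of the masking behavior implemented above (simplified: the real method also folds the ENV_* variable names and the agent.hide_docker_command_env_vars.extra_keys config list into keys):

def mask_env_values(docker_command, keys):
    result = docker_command[:]
    for i, item in enumerate(docker_command):
        if item not in ("-e", "--env") or i + 1 >= len(result):
            continue
        key, sep, _ = result[i + 1].partition("=")
        if sep and key in keys:
            result[i + 1] = "{}=********".format(key)
    return result

print(mask_env_values(
    ["docker", "run", "-e", "CLEARML_API_SECRET_KEY=s3cr3t", "-e", "MY_VAR=ok", "nvidia/cuda"],
    keys={"CLEARML_API_SECRET_KEY", "AWS_SECRET_ACCESS_KEY"}))
# ['docker', 'run', '-e', 'CLEARML_API_SECRET_KEY=********', '-e', 'MY_VAR=ok', 'nvidia/cuda']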
if __name__ == "__main__":
pass

View File

@@ -62,6 +62,10 @@ class EnvironmentConfig(object):
return None
ENV_AGENT_SECRET_KEY = EnvironmentConfig("CLEARML_API_SECRET_KEY", "TRAINS_API_SECRET_KEY")
ENV_AWS_SECRET_KEY = EnvironmentConfig("AWS_SECRET_ACCESS_KEY")
ENV_AZURE_ACCOUNT_KEY = EnvironmentConfig("AZURE_STORAGE_KEY")
ENVIRONMENT_CONFIG = {
"api.api_server": EnvironmentConfig("CLEARML_API_HOST", "TRAINS_API_HOST", ),
"api.files_server": EnvironmentConfig("CLEARML_FILES_HOST", "TRAINS_FILES_HOST", ),
@@ -69,9 +73,7 @@ ENVIRONMENT_CONFIG = {
"api.credentials.access_key": EnvironmentConfig(
"CLEARML_API_ACCESS_KEY", "TRAINS_API_ACCESS_KEY",
),
"api.credentials.secret_key": EnvironmentConfig(
"CLEARML_API_SECRET_KEY", "TRAINS_API_SECRET_KEY",
),
"api.credentials.secret_key": ENV_AGENT_SECRET_KEY,
"agent.worker_name": EnvironmentConfig("CLEARML_WORKER_NAME", "TRAINS_WORKER_NAME", ),
"agent.worker_id": EnvironmentConfig("CLEARML_WORKER_ID", "TRAINS_WORKER_ID", ),
"agent.cuda_version": EnvironmentConfig(
@@ -84,10 +86,10 @@ ENVIRONMENT_CONFIG = {
names=("CLEARML_CPU_ONLY", "TRAINS_CPU_ONLY", "CPU_ONLY"), type=bool
),
"sdk.aws.s3.key": EnvironmentConfig("AWS_ACCESS_KEY_ID"),
"sdk.aws.s3.secret": EnvironmentConfig("AWS_SECRET_ACCESS_KEY"),
"sdk.aws.s3.secret": ENV_AWS_SECRET_KEY,
"sdk.aws.s3.region": EnvironmentConfig("AWS_DEFAULT_REGION"),
"sdk.azure.storage.containers.0": {'account_name': EnvironmentConfig("AZURE_STORAGE_ACCOUNT"),
'account_key': EnvironmentConfig("AZURE_STORAGE_KEY")},
'account_key': ENV_AZURE_ACCOUNT_KEY},
"sdk.google.storage.credentials_json": EnvironmentConfig("GOOGLE_APPLICATION_CREDENTIALS"),
}
@@ -132,6 +134,7 @@ ENV_DOCKER_SKIP_GPUS_FLAG = EnvironmentConfig('CLEARML_DOCKER_SKIP_GPUS_FLAG', '
ENV_AGENT_GIT_USER = EnvironmentConfig('CLEARML_AGENT_GIT_USER', 'TRAINS_AGENT_GIT_USER')
ENV_AGENT_GIT_PASS = EnvironmentConfig('CLEARML_AGENT_GIT_PASS', 'TRAINS_AGENT_GIT_PASS')
ENV_AGENT_GIT_HOST = EnvironmentConfig('CLEARML_AGENT_GIT_HOST', 'TRAINS_AGENT_GIT_HOST')
ENV_AGENT_DISABLE_SSH_MOUNT = EnvironmentConfig('CLEARML_AGENT_DISABLE_SSH_MOUNT', type=bool)
ENV_TASK_EXECUTE_AS_USER = EnvironmentConfig('CLEARML_AGENT_EXEC_USER', 'TRAINS_AGENT_EXEC_USER')
ENV_TASK_EXTRA_PYTHON_PATH = EnvironmentConfig('CLEARML_AGENT_EXTRA_PYTHON_PATH', 'TRAINS_AGENT_EXTRA_PYTHON_PATH')
ENV_DOCKER_HOST_MOUNT = EnvironmentConfig('CLEARML_AGENT_K8S_HOST_MOUNT', 'CLEARML_AGENT_DOCKER_HOST_MOUNT',

View File

@@ -4,13 +4,14 @@ import warnings
from .requirement import Requirement
def parse(reqstr):
def parse(reqstr, cwd=None):
"""
Parse a requirements file into a list of Requirements
See: pip/req.py:parse_requirements()
:param reqstr: a string or file like object containing requirements
:param cwd: Optional current working dir for -r file.txt loading
:returns: a *generator* of Requirement objects
"""
filename = getattr(reqstr, 'name', None)
@@ -32,8 +33,8 @@ def parse(reqstr):
continue
elif line.startswith('-r') or line.startswith('--requirement'):
_, new_filename = line.split()
new_file_path = os.path.join(os.path.dirname(filename or '.'),
new_filename)
new_file_path = os.path.join(
os.path.dirname(filename or '.') if filename or not cwd else cwd, new_filename)
with open(new_file_path) as f:
for requirement in parse(f):
yield requirement
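
With the new cwd argument, a "-r extra.txt" line found in a task's installed packages can resolve relative to the task's working directory instead of a (missing) requirements file path. A hedged usage sketch, assuming this module's parse() is importable and the paths below are illustrative:

import os

os.makedirs("/tmp/task_cwd", exist_ok=True)
with open("/tmp/task_cwd/extra.txt", "w") as f:
    f.write("requests>=2.20.0\n")

reqs_text = "numpy==1.19.5\n-r extra.txt\n"
for req in parse(reqs_text, cwd="/tmp/task_cwd"):
    print(req.name)  # numpy, then requests (loaded from /tmp/task_cwd/extra.txt)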

View File

@@ -20,6 +20,15 @@ VCS_REGEX = re.compile(
r'(#(?P<fragment>\S+))?'
)
VCS_EXT_REGEX = re.compile(
r'^(?P<scheme>{0})(@)'.format(r'|'.join(
[scheme.replace('+', r'\+') for scheme in ['git+git']])) +
r'((?P<login>[^/@]+)@)?'
r'(?P<path>[^#@]+)'
r'(@(?P<revision>[^#]+))?'
r'(#(?P<fragment>\S+))?'
)
# This matches just about everything
LOCAL_REGEX = re.compile(
r'^((?P<scheme>file)://)?'
@@ -100,7 +109,7 @@ class Requirement(object):
req = cls('-e {0}'.format(line))
req.editable = True
vcs_match = VCS_REGEX.match(line)
vcs_match = VCS_REGEX.match(line) or VCS_EXT_REGEX.match(line)
local_match = LOCAL_REGEX.match(line)
if vcs_match is not None:
@@ -147,7 +156,7 @@ class Requirement(object):
req = cls(line)
vcs_match = VCS_REGEX.match(line)
vcs_match = VCS_REGEX.match(line) or VCS_EXT_REGEX.match(line)
uri_match = URI_REGEX.match(line)
local_match = LOCAL_REGEX.match(line)
@@ -226,7 +235,7 @@ class Requirement(object):
# check if the name is valid & parsed
Req.parse(name)
# if we are here, name is a valid package name, check if the vcs part is valid
if VCS_REGEX.match(uri):
if VCS_REGEX.match(uri) or VCS_EXT_REGEX.match(uri):
req = cls.parse_line(uri)
req.name = name
return req
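
The new VCS_EXT_REGEX covers the scp-like "git+git@host:repo" form that VCS_REGEX missed; reconstructed inline for a quick check:

import re

VCS_EXT_REGEX = re.compile(
    r'^(?P<scheme>git\+git)(@)'
    r'((?P<login>[^/@]+)@)?'
    r'(?P<path>[^#@]+)'
    r'(@(?P<revision>[^#]+))?'
    r'(#(?P<fragment>\S+))?'
)
m = VCS_EXT_REGEX.match('git+git@github.com:user/repo.git@v1.0#egg=mypkg')
print(m.group('scheme'), m.group('path'), m.group('revision'), m.group('fragment'))
# git+git github.com:user/repo.git v1.0 egg=mypkg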

View File

@@ -12,12 +12,12 @@ from copy import deepcopy
from pathlib import Path
from threading import Thread
from time import sleep
from typing import Text, List
from typing import Text, List, Callable, Any, Collection, Optional, Union
import yaml
from clearml_agent.commands.events import Events
from clearml_agent.commands.worker import Worker
from clearml_agent.commands.worker import Worker, get_task_container
from clearml_agent.definitions import ENV_DOCKER_IMAGE
from clearml_agent.errors import APIError
from clearml_agent.helper.base import safe_remove_file
@@ -31,16 +31,18 @@ class K8sIntegration(Worker):
K8S_PENDING_QUEUE = "k8s_scheduler"
K8S_DEFAULT_NAMESPACE = "clearml"
AGENT_LABEL = "CLEARML=agent"
LIMIT_POD_LABEL = "ai.allegro.agent.serial=pod-{pod_number}"
KUBECTL_APPLY_CMD = "kubectl apply --namespace={namespace} -f"
KUBECTL_RUN_CMD = "kubectl run clearml-{queue_name}-id-{task_id} " \
"--image {docker_image} " \
KUBECTL_RUN_CMD = "kubectl run clearml-id-{task_id} " \
"--image {docker_image} {docker_args} " \
"--restart=Never " \
"--namespace={namespace}"
KUBECTL_DELETE_CMD = "kubectl delete pods " \
"--selector=TRAINS=agent " \
"--selector={selector} " \
"--field-selector=status.phase!=Pending,status.phase!=Running " \
"--namespace={namespace}"
@@ -72,12 +74,10 @@ class K8sIntegration(Worker):
"[ ! -z $LOCAL_PYTHON ] || export LOCAL_PYTHON=python3",
"$LOCAL_PYTHON -m pip install clearml-agent",
"{extra_bash_init_cmd}",
"{extra_docker_bash_script}",
"$LOCAL_PYTHON -m clearml_agent execute --full-monitoring --require-queue --id {task_id}"
]
AGENT_LABEL = "TRAINS=agent"
LIMIT_POD_LABEL = "ai.allegro.agent.serial=pod-{pod_number}"
_edit_hyperparams_version = "2.9"
def __init__(
@@ -104,7 +104,7 @@ class K8sIntegration(Worker):
:param str k8s_pending_queue_name: queue name to use when task is pending in the k8s scheduler
:param str|callable kubectl_cmd: kubectl command line str, supports formatting (default: KUBECTL_RUN_CMD)
example: "task={task_id} image={docker_image} queue_id={queue_id}"
or a callable function: kubectl_cmd(task_id, docker_image, queue_id, task_data)
or a callable function: kubectl_cmd(task_id, docker_image, docker_args, queue_id, task_data)
:param str container_bash_script: container bash script to be executed in k8s (default: CONTAINER_BASH_SCRIPT)
Notice this string is passed through a format() call; if you have curly brackets they should be doubled { -> {{
Format arguments passed: {task_id} and {extra_bash_init_cmd}
@@ -275,16 +275,12 @@ class K8sIntegration(Worker):
task_id, self.k8s_pending_queue_name, e))
return
if task_data.execution.docker_cmd:
docker_cmd = task_data.execution.docker_cmd
else:
docker_cmd = str(ENV_DOCKER_IMAGE.get() or
self._session.config.get("agent.default_docker.image", "nvidia/cuda"))
# take the first part, this is the docker image name (not arguments)
docker_parts = docker_cmd.split()
docker_image = docker_parts[0]
docker_args = docker_parts[1:] if len(docker_parts) > 1 else []
container = get_task_container(self._session, task_id)
if not container.get('image'):
container['image'] = str(
ENV_DOCKER_IMAGE.get() or self._session.config.get("agent.default_docker.image", "nvidia/cuda")
)
container['arguments'] = self._session.config.get("agent.default_docker.arguments", None)
# get the clearml.conf encoded file
# noinspection PyProtectedMember
@@ -307,10 +303,6 @@ class K8sIntegration(Worker):
except Exception:
queue_name = 'k8s'
# conform queue name to k8s standards
safe_queue_name = queue_name.lower().strip()
safe_queue_name = re.sub(r'\W+', '', safe_queue_name).replace('_', '').replace('-', '')
# Search for a free pod number
pod_count = 0
pod_number = self.base_pod_num
@@ -374,23 +366,28 @@ class K8sIntegration(Worker):
pod_count += 1
labels = ([self.LIMIT_POD_LABEL.format(pod_number=pod_number)] if self.ports_mode else []) + [self.AGENT_LABEL]
labels.append("clearml-agent-queue={}".format(self._safe_k8s_label_value(queue)))
labels.append("clearml-agent-queue-name={}".format(self._safe_k8s_label_value(queue_name)))
if self.ports_mode:
print("Kubernetes scheduling task id={} on pod={} (pod_count={})".format(task_id, pod_number, pod_count))
else:
print("Kubernetes scheduling task id={}".format(task_id))
kubectl_kwargs = dict(
create_clearml_conf=create_clearml_conf,
labels=labels,
docker_image=container['image'],
docker_args=container['arguments'],
docker_bash=container.get('setup_shell_script'),
task_id=task_id,
queue=queue
)
if self.template_dict:
output, error = self._kubectl_apply(
create_clearml_conf=create_clearml_conf,
labels=labels, docker_image=docker_image, docker_args=docker_args,
task_id=task_id, queue=queue, queue_name=safe_queue_name)
output, error = self._kubectl_apply(**kubectl_kwargs)
else:
output, error = self._kubectl_run(
create_clearml_conf=create_clearml_conf,
labels=labels, docker_image=docker_cmd,
task_data=task_data,
task_id=task_id, queue=queue, queue_name=safe_queue_name)
output, error = self._kubectl_run(task_data=task_data, **kubectl_kwargs)
error = '' if not error else (error if isinstance(error, str) else error.decode('utf-8'))
output = '' if not output else (output if isinstance(output, str) else output.decode('utf-8'))
@@ -424,25 +421,38 @@ class K8sIntegration(Worker):
**user_props
)
def _parse_docker_args(self, docker_args):
# type: (list) -> dict
kube_args = []
while docker_args:
cmd = docker_args.pop(0).strip()
if cmd in ('-e', '--env',):
env = docker_args.pop(0).strip()
key, value = env.split('=', 1)
kube_args.append({'name': key, 'value': value})
def _get_docker_args(self, docker_args, flags, target=None, convert=None):
# type: (List[str], Collection[str], Optional[str], Callable[[str], Any]) -> Union[dict, List[str]]
"""
Get docker args matching specific flags.
:argument docker_args: List of docker argument strings (flags and values)
:argument flags: List of flags/names to intercept (e.g. "--env" etc.)
:argument target: Controls return format. If provided, returns a dict with a target field containing a list
of result strings, otherwise returns a list of result strings
:argument convert: Optional conversion function for each result string
"""
args = docker_args[:] if docker_args else []
results = []
while args:
cmd = args.pop(0).strip()
if cmd in flags:
env = args.pop(0).strip()
if convert:
env = convert(env)
results.append(env)
else:
self.log.warning('skipping docker argument {} (only -e --env supported)'.format(cmd))
return {'env': kube_args} if kube_args else {}
if target:
return {target: results} if results else {}
return results
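
A standalone re-statement of the generalized helper, showing both return shapes (illustrative values; the real method also logs a warning for flags it skips):

def get_docker_args(docker_args, flags, target=None, convert=None):
    args = list(docker_args or [])
    results = []
    while args:
        cmd = args.pop(0).strip()
        if cmd in flags and args:
            env = args.pop(0).strip()
            results.append(convert(env) if convert else env)
    if target:
        return {target: results} if results else {}
    return results

print(get_docker_args(
    ["-e", "A=1", "--env", "B=2", "--network", "host"], flags={"-e", "--env"},
    target="env",
    convert=lambda e: {"name": e.partition("=")[0], "value": e.partition("=")[2]}))
# {'env': [{'name': 'A', 'value': '1'}, {'name': 'B', 'value': '2'}]}
print(get_docker_args(["-e", "A=1"], flags={"-e", "--env"},
                      convert=lambda e: "--env={}".format(e)))
# ['--env=A=1']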
def _kubectl_apply(self, create_clearml_conf, docker_image, docker_args, labels, queue, task_id, queue_name):
def _kubectl_apply(self, create_clearml_conf, docker_image, docker_args, docker_bash, labels, queue, task_id):
template = deepcopy(self.template_dict)
template.setdefault('apiVersion', 'v1')
template['kind'] = 'Pod'
template.setdefault('metadata', {})
name = 'clearml-{queue}-id-{task_id}'.format(queue=queue_name, task_id=task_id)
name = 'clearml-id-{task_id}'.format(task_id=task_id)
template['metadata']['name'] = name
template.setdefault('spec', {})
template['spec'].setdefault('containers', [])
@@ -451,14 +461,26 @@ class K8sIntegration(Worker):
labels_dict = dict(pair.split('=', 1) for pair in labels)
template['metadata'].setdefault('labels', {})
template['metadata']['labels'].update(labels_dict)
container = self._parse_docker_args(docker_args)
container = self._get_docker_args(
docker_args,
target="env",
flags={"-e", "--env"},
convert=lambda env: {'name': env.partition("=")[0], 'value': env.partition("=")[2]},
)
container_bash_script = [self.container_bash_script] if isinstance(self.container_bash_script, str) \
else self.container_bash_script
extra_docker_bash_script = '\n'.join(self._session.config.get("agent.extra_docker_shell_script", None) or [])
if docker_bash:
extra_docker_bash_script += '\n' + str(docker_bash) + '\n'
script_encoded = '\n'.join(
['#!/bin/bash', ] +
[line.format(extra_bash_init_cmd=self.extra_bash_init_script or '', task_id=task_id)
[line.format(extra_bash_init_cmd=self.extra_bash_init_script or '',
task_id=task_id,
extra_docker_bash_script=extra_docker_bash_script)
for line in container_bash_script])
create_init_script = \
@@ -486,6 +508,10 @@ class K8sIntegration(Worker):
with open(yaml_file, 'wt') as f:
yaml.dump(template, f)
if self._docker_force_pull:
for c in template['spec']['containers']:
c.setdefault('imagePullPolicy', 'Always')
kubectl_cmd = self.KUBECTL_APPLY_CMD.format(
task_id=task_id,
docker_image=docker_image,
@@ -508,14 +534,18 @@ class K8sIntegration(Worker):
return output, error
def _kubectl_run(self, create_clearml_conf, docker_image, labels, queue, task_data, task_id, queue_name):
def _kubectl_run(
self, create_clearml_conf, docker_image, docker_args, docker_bash, labels, queue, task_data, task_id
):
if callable(self.kubectl_cmd):
kubectl_cmd = self.kubectl_cmd(task_id, docker_image, queue, task_data, queue_name)
kubectl_cmd = self.kubectl_cmd(task_id, docker_image, docker_args, queue, task_data)
else:
kubectl_cmd = self.kubectl_cmd.format(
queue_name=queue_name,
task_id=task_id,
docker_image=docker_image,
docker_args=" ".join(self._get_docker_args(
docker_args, flags={"-e", "--env"}, convert=lambda env: '--env={}'.format(env))
),
queue_id=queue,
namespace=self.namespace,
)
@@ -531,6 +561,9 @@ class K8sIntegration(Worker):
if self.pod_requests:
kubectl_cmd += ['--requests', ",".join(self.pod_requests)]
if self._docker_force_pull and not any(x.startswith("--image-pull-policy=") for x in kubectl_cmd):
kubectl_cmd += ["--image-pull-policy='always'"]
container_bash_script = [self.container_bash_script] if isinstance(self.container_bash_script, str) \
else self.container_bash_script
container_bash_script = ' ; '.join(container_bash_script)
@@ -542,7 +575,10 @@ class K8sIntegration(Worker):
"/bin/sh",
"-c",
"{} ; {}".format(create_clearml_conf, container_bash_script.format(
extra_bash_init_cmd=self.extra_bash_init_script, task_id=task_id)),
extra_bash_init_cmd=self.extra_bash_init_script or "",
extra_docker_bash_script=docker_bash or "",
task_id=task_id
)),
]
process = subprocess.Popen(kubectl_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, error = process.communicate()
@@ -575,7 +611,7 @@ class K8sIntegration(Worker):
# iterate over queues (priority style, queues[0] is highest)
for queue in queues:
# delete old completed / failed pods
get_bash_output(self.KUBECTL_DELETE_CMD.format(namespace=self.namespace))
get_bash_output(self.KUBECTL_DELETE_CMD.format(namespace=self.namespace, selector=self.AGENT_LABEL))
# get next task in queue
try:
@@ -641,3 +677,13 @@ class K8sIntegration(Worker):
return merge_dicts(
c1, c2, custom_merge_func=merge_env
)
@staticmethod
def _safe_k8s_label_value(value):
""" Conform string to k8s standards for a label value """
value = value.lower().strip()
value = re.sub(r'^[^A-Za-z0-9]+', '', value) # strip leading non-alphanumeric chars
value = re.sub(r'[^A-Za-z0-9]+$', '', value) # strip trailing non-alphanumeric chars
value = re.sub(r'\W+', '-', value)  # allow only word chars (this also removes ".", which k8s supports, but that's acceptable)
value = re.sub(r'-+', '-', value) # don't leave messy "--" after replacing previous chars
return value[:63]
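
For reference, the label conversion in action (inline copy of the staticmethod above, with illustrative inputs):

import re

def safe_k8s_label_value(value):
    value = value.lower().strip()
    value = re.sub(r'^[^A-Za-z0-9]+', '', value)
    value = re.sub(r'[^A-Za-z0-9]+$', '', value)
    value = re.sub(r'\W+', '-', value)
    return re.sub(r'-+', '-', value)[:63]

print(safe_k8s_label_value('My Queue.GPU!'))  # -> 'my-queue-gpu'
print(safe_k8s_label_value('--staging--'))    # -> 'staging'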

View File

@@ -448,10 +448,14 @@ class RequirementsManager(object):
self.translator = RequirementsTranslator(session, interpreter=base_interpreter,
cache_dir=pip_cache_dir.as_posix())
self._base_interpreter = base_interpreter
self._cwd = None
def register(self, cls): # type: (Type[RequirementSubstitution]) -> None
self.handlers.append(cls(self._session))
def set_cwd(self, cwd):
self._cwd = str(cwd) if cwd else None
def _replace_one(self, req): # type: (MarkerRequirement) -> Optional[Text]
match = re.search(r';\s*(.*)', Text(req))
if match:
@@ -466,7 +470,7 @@ class RequirementsManager(object):
def replace(self, requirements): # type: (Text) -> Text
def safe_parse(req_str):
try:
return next(parse(req_str))
return next(parse(req_str, cwd=self._cwd))
except Exception as ex:
return Requirement(req_str)

View File

@@ -42,20 +42,31 @@ def get_bash_output(cmd, strip=False, stderr=subprocess.STDOUT, stdin=False):
return output if not strip or not output else output.strip()
def terminate_process(pid, timeout=10., ignore_zombie=True):
def terminate_process(pid, timeout=10., ignore_zombie=True, include_children=False):
# noinspection PyBroadException
try:
proc = psutil.Process(pid)
children = proc.children(recursive=True) if include_children else []
proc.terminate()
cnt = 0
while proc.is_running() and (ignore_zombie or proc.status() != 'zombie') and cnt < timeout:
sleep(1.)
cnt += 1
proc.terminate()
# terminate children
for c in children:
c.terminate()
cnt = 0
while proc.is_running() and (ignore_zombie or proc.status() != 'zombie') and cnt < timeout:
sleep(1.)
cnt += 1
# kill children
for c in children:
c.kill()
proc.kill()
except Exception:
pass
@@ -66,9 +77,8 @@ def terminate_process(pid, timeout=10., ignore_zombie=True):
return True
def kill_all_child_processes(pid=None):
def kill_all_child_processes(pid=None, include_parent=True):
# get current process if pid not provided
include_parent = True
if not pid:
pid = os.getpid()
include_parent = False
@@ -96,7 +106,7 @@ def terminate_all_child_processes(pid=None, timeout=10., include_parent=True):
return
for child in parent.children(recursive=False):
print('Terminating child process {}'.format(child.pid))
terminate_process(child.pid, timeout=timeout, ignore_zombie=False)
terminate_process(child.pid, timeout=timeout, ignore_zombie=False, include_children=True)
if include_parent:
terminate_process(parent.pid, timeout=timeout, ignore_zombie=False)
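
The include_children flow added above snapshots a process's subtree before terminating it; a minimal psutil sketch of the same pattern (standalone illustration, not the clearml_agent helper itself):

import subprocess
import psutil

proc = subprocess.Popen(["sleep", "60"])
parent = psutil.Process(proc.pid)
children = parent.children(recursive=True)  # snapshot before terminate()
parent.terminate()
for c in children:
    c.terminate()
gone, alive = psutil.wait_procs([parent] + children, timeout=10)
for p in alive:
    p.kill()  # escalate, mirroring terminate_process()'s final kill()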
@@ -211,6 +221,7 @@ class Argv(Executable):
"""
self.argv = argv
self._log = kwargs.pop("log", None)
self._display_argv = kwargs.pop("display_argv", argv)
if not self._log:
self._log = logging.getLogger(__name__)
self._log.propagate = False
@@ -235,10 +246,10 @@ class Argv(Executable):
return self.argv
def __repr__(self):
return "<Argv{}>".format(self.argv)
return "<Argv{}>".format(self._display_argv)
def __str__(self):
return "Executing: {}".format(self.argv)
return "Executing: {}".format(self._display_argv)
def __iter__(self):
if is_windows_platform():
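
Taken together with the worker change above, display_argv lets logs show the sanitized command while the unmasked argv still executes; a usage sketch against the Argv class defined above (values illustrative):

full_cmd = ("docker", "run", "-e", "CLEARML_API_SECRET_KEY=s3cr3t", "nvidia/cuda")
shown_cmd = ("docker", "run", "-e", "CLEARML_API_SECRET_KEY=********", "nvidia/cuda")
cmd = Argv(*full_cmd, display_argv=shown_cmd)
print(str(cmd))  # Executing: (..., 'CLEARML_API_SECRET_KEY=********', ...)
# cmd.argv still holds the unmasked command that is actually run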

View File

@@ -591,7 +591,7 @@ def clone_repository_cached(session, execution, destination):
# mock lock
repo_lock = Lock()
repo_lock_timeout_sec = 300
repo_url = execution.repository # type: str
repo_url = execution.repository or '' # type: str
parsed_url = furl(repo_url)
no_password_url = parsed_url.copy().remove(password=True).url

View File

@@ -1 +1 @@
__version__ = '1.0.0'
__version__ = '1.0.1rc1'

View File

@@ -42,6 +42,9 @@ agent {
# Example values: "/usr/bin/python3" or "/usr/local/bin/python3.6"
# The default is the python executing the clearml_agent
python_binary: ""
# ignore any requested python version (Default: False; if a Task requested a specific python version
# and the system has multiple python versions available, the agent will use the requested python version)
# ignore_requested_python_version: true
# select python package manager:
# currently supported pip and conda

View File

@@ -9,7 +9,7 @@ pyhocon>=0.3.38,<0.4.0
pyparsing>=2.0.3,<2.5.0
python-dateutil>=2.4.2,<2.9.0
pyjwt>=1.6.4,<2.1.0
PyYAML>=3.12,<5.4.0
PyYAML>=3.12,<5.5.0
requests>=2.20.0,<2.26.0
six>=1.11.0,<1.16.0
typing>=3.6.4,<3.8.0