Compare commits

..

9 Commits

Author SHA1 Message Date
allegroai
3a07bfe1d7 Version bump 2021-05-31 23:19:46 +03:00
allegroai
0694b9e8af Fix PyYAML supported versions 2021-05-26 18:33:35 +03:00
allegroai
742cbf5767 Add docker environment arguments log masking support (issue #67) 2021-05-25 19:31:45 +03:00
allegroai
e93384b99b Fix --stop with dynamic gpus 2021-05-20 10:58:46 +03:00
allegroai
3c4e976093 Add agent.ignore_requested_python_version to config file 2021-05-19 15:20:44 +03:00
allegroai
1e795beec8 Fix support for spaces in docker arguments (issue #358) 2021-05-19 15:20:03 +03:00
allegroai
4f7407084d Fix standalone script with pre-exiting conda venv 2021-05-12 15:46:25 +03:00
allegroai
ae3d034531 Protect against None in execution.repository 2021-05-12 15:45:31 +03:00
allegroai
a2db1f5ab5 Remove queue name from pod name in k8s glue, add queue name and ID to pod labels (issue #64) 2021-05-05 12:03:35 +03:00
9 changed files with 134 additions and 42 deletions

View File

@@ -26,6 +26,9 @@
# Example values: "/usr/bin/python3" or "/usr/local/bin/python3.6"
# The default is the python executing the clearml_agent
python_binary: ""
# ignore any requested python version (Default: False, if a Task was using a
# specific python version and the system supports multiple python the agent will use the requested python version)
# ignore_requested_python_version: true
# select python package manager:
# currently supported pip and conda
@@ -182,4 +185,16 @@
# should be detected automatically. Override with os environment CUDA_VERSION / CUDNN_VERSION
# cuda_version: 10.1
# cudnn_version: 7.6
# Hide docker environment variables containing secrets when printing out the docker command by replacing their
# values with "********". Turning this feature on will hide the following environment variables values:
# CLEARML_API_SECRET_KEY, CLEARML_AGENT_GIT_PASS, AWS_SECRET_ACCESS_KEY, AZURE_STORAGE_KEY
# To include more environment variables, add their keys to the "extra_keys" list. E.g. to make sure the value of
# your custom environment variable named MY_SPECIAL_PASSWORD will not show in the logs when included in the
# docker command, set:
# extra_keys: ["MY_SPECIAL_PASSWORD"]
hide_docker_command_env_vars {
enabled: true
extra_keys: []
}
}

View File

@@ -11,6 +11,7 @@ import subprocess
import sys
import shutil
import traceback
import shlex
from collections import defaultdict
from copy import deepcopy, copy
from datetime import datetime
@@ -19,7 +20,7 @@ from functools import partial, cmp_to_key
from itertools import chain
from tempfile import mkdtemp, NamedTemporaryFile
from time import sleep, time
from typing import Text, Optional, Any, Tuple
from typing import Text, Optional, Any, Tuple, List
import attr
import psutil
@@ -43,7 +44,13 @@ from clearml_agent.definitions import (
ENV_DOCKER_HOST_MOUNT,
ENV_TASK_EXTRA_PYTHON_PATH,
ENV_AGENT_GIT_USER,
ENV_AGENT_GIT_PASS, ENV_WORKER_ID, ENV_DOCKER_SKIP_GPUS_FLAG, )
ENV_AGENT_GIT_PASS,
ENV_WORKER_ID,
ENV_DOCKER_SKIP_GPUS_FLAG,
ENV_AGENT_SECRET_KEY,
ENV_AWS_SECRET_KEY,
ENV_AZURE_ACCOUNT_KEY,
)
from clearml_agent.definitions import WORKING_REPOSITORY_DIR, PIP_EXTRA_INDICES
from clearml_agent.errors import APIError, CommandFailedError, Sigterm
from clearml_agent.helper.base import (
@@ -67,7 +74,9 @@ from clearml_agent.helper.base import (
get_python_path,
is_linux_platform,
rm_file,
add_python_path, safe_remove_tree, )
add_python_path,
safe_remove_tree,
)
from clearml_agent.helper.console import ensure_text, print_text, decode_binary_lines
from clearml_agent.helper.os.daemonize import daemonize_process
from clearml_agent.helper.package.base import PackageManager
@@ -90,7 +99,10 @@ from clearml_agent.helper.process import (
get_bash_output,
shutdown_docker_process,
get_docker_id,
commit_docker, terminate_process, check_if_command_exists, terminate_all_child_processes,
commit_docker,
terminate_process,
check_if_command_exists,
terminate_all_child_processes,
)
from clearml_agent.helper.package.priority_req import PriorityPackageRequirement, PackageCollectorRequirement
from clearml_agent.helper.repo import clone_repository_cached, RepoInfo, VCS, fix_package_import_diff_patch
@@ -187,7 +199,7 @@ class LiteralScriptManager(object):
location = location or (repo_info and repo_info.root)
if not location:
location = Path(self.venv_folder, "code")
location.mkdir(exist_ok=True)
location.mkdir(exist_ok=True, parents=True)
log.debug("selected execution directory: %s", location)
return Text(location), self.write(task, location, execution.entry_point)
@@ -221,6 +233,9 @@ def get_task(session, task_id, *args, **kwargs):
def get_task_container(session, task_id):
"""
Returns dict with Task docker container setup {container: '', arguments: '', setup_shell_script: ''}
"""
if session.check_min_api_version("2.13"):
result = session.send_request(
service='tasks',
@@ -233,12 +248,12 @@ def get_task_container(session, task_id):
try:
container = result.json()['data']['tasks'][0]['container'] if result.ok else {}
if container.get('arguments'):
container['arguments'] = str(container.get('arguments')).split(' ')
container['arguments'] = shlex.split(str(container.get('arguments')).strip())
except (ValueError, TypeError):
container = {}
else:
response = get_task(session, task_id, only_fields=["execution.docker_cmd"])
task_docker_cmd_parts = str(response.execution.docker_cmd or '').strip().split(' ')
task_docker_cmd_parts = shlex.split(str(response.execution.docker_cmd or '').strip())
try:
container = dict(
container=task_docker_cmd_parts[0],
@@ -251,11 +266,14 @@ def get_task_container(session, task_id):
def set_task_container(session, task_id, docker_image=None, docker_arguments=None, docker_bash_script=None):
if docker_arguments and isinstance(docker_arguments, str):
docker_arguments = [docker_arguments]
if session.check_min_api_version("2.13"):
container = dict(
image=docker_image or None,
arguments=' '.join(docker_arguments) if docker_arguments else None,
setup_shell_script=docker_bash_script or None,
image=docker_image or '',
arguments=' '.join(docker_arguments) if docker_arguments else '',
setup_shell_script=docker_bash_script or '',
)
result = session.send_request(
service='tasks',
@@ -614,10 +632,13 @@ class Worker(ServiceCommandSection):
'--standalone-mode' if self._standalone_mode else '',
task_id)
# send the actual used command line to the backend
self.send_logs(task_id=task_id, lines=['Executing: {}\n'.format(full_docker_cmd)], level="INFO")
display_docker_command = self._sanitize_docker_command(full_docker_cmd)
# send the actual used command line to the backend
self.send_logs(task_id=task_id, lines=['Executing: {}\n'.format(display_docker_command)], level="INFO")
cmd = Argv(*full_docker_cmd, display_argv=display_docker_command)
cmd = Argv(*full_docker_cmd)
print('Running Docker:\n{}\n'.format(str(cmd)))
else:
cmd = worker_args.get_argv_for_command("execute") + (
@@ -871,7 +892,7 @@ class Worker(ServiceCommandSection):
# if we are in dynamic gpus / services mode,
# we should send termination signal to all child processes
if self._services_mode:
terminate_all_child_processes(timeout=120)
terminate_all_child_processes(timeout=20, include_parent=False)
# if we are here, just kill all sub processes
kill_all_child_processes()
@@ -1364,6 +1385,7 @@ class Worker(ServiceCommandSection):
service_mode_internal_agent_started = None
stopping = False
status = None
process = None
try:
_last_machine_update_ts = time()
stop_reason = None
@@ -1420,6 +1442,8 @@ class Worker(ServiceCommandSection):
status = ex.returncode
except KeyboardInterrupt:
# so someone else will catch us
if process:
kill_all_child_processes(process.pid)
raise
except Exception:
# we should not get here, but better safe than sorry
@@ -1431,6 +1455,10 @@ class Worker(ServiceCommandSection):
stop_reason = TaskStopReason.exception
status = -1
# full cleanup (just in case)
if process:
kill_all_child_processes(process.pid)
# if running in services mode, keep the file open
# in case the docker was so quick it started and finished, check the stop reason
if self._services_mode and service_mode_internal_agent_started and stop_reason == 'Service started':
@@ -1913,7 +1941,6 @@ class Worker(ServiceCommandSection):
if current_task.script.binary and current_task.script.binary.startswith('python') and \
execution.entry_point and execution.entry_point.split()[0].strip() == '-m':
# we need to split it
import shlex
extra.extend(shlex.split(execution.entry_point))
else:
extra.append(execution.entry_point)
@@ -3085,7 +3112,7 @@ class Worker(ServiceCommandSection):
warning('Could not terminate process pid={}'.format(pid))
return True
# wither we have a match for the worker_id or we just pick the first one, and kill it.
# either we have a match for the worker_id or we just pick the first one, and kill it.
if (worker_id and uid == worker_id) or (not worker_id and uid.startswith('{}:'.format(worker_name))):
# this is us kill it
print('Terminating clearml-agent worker_id={} pid={}'.format(uid, pid))
@@ -3148,6 +3175,33 @@ class Worker(ServiceCommandSection):
queue_ids.append(q_id)
return queue_ids
def _sanitize_docker_command(self, docker_command):
# type: (List[str]) -> List[str]
if not self._session.config.get('agent.hide_docker_command_env_vars.enabled', False):
return docker_command
keys = set(self._session.config.get('agent.hide_docker_command_env_vars.extra_keys', []))
keys.update(
ENV_AGENT_GIT_PASS.vars,
ENV_AGENT_SECRET_KEY.vars,
ENV_AWS_SECRET_KEY.vars,
ENV_AZURE_ACCOUNT_KEY.vars
)
result = docker_command[:]
for i, item in enumerate(docker_command):
try:
if item not in ("-e", "--env"):
continue
key, sep, _ = result[i + 1].partition("=")
if key not in keys or not sep:
continue
result[i + 1] = "{}={}".format(key, "********")
except KeyError:
pass
return result
if __name__ == "__main__":
pass

View File

@@ -62,6 +62,10 @@ class EnvironmentConfig(object):
return None
ENV_AGENT_SECRET_KEY = EnvironmentConfig("CLEARML_API_SECRET_KEY", "TRAINS_API_SECRET_KEY")
ENV_AWS_SECRET_KEY = EnvironmentConfig("AWS_SECRET_ACCESS_KEY")
ENV_AZURE_ACCOUNT_KEY = EnvironmentConfig("AZURE_STORAGE_KEY")
ENVIRONMENT_CONFIG = {
"api.api_server": EnvironmentConfig("CLEARML_API_HOST", "TRAINS_API_HOST", ),
"api.files_server": EnvironmentConfig("CLEARML_FILES_HOST", "TRAINS_FILES_HOST", ),
@@ -69,9 +73,7 @@ ENVIRONMENT_CONFIG = {
"api.credentials.access_key": EnvironmentConfig(
"CLEARML_API_ACCESS_KEY", "TRAINS_API_ACCESS_KEY",
),
"api.credentials.secret_key": EnvironmentConfig(
"CLEARML_API_SECRET_KEY", "TRAINS_API_SECRET_KEY",
),
"api.credentials.secret_key": ENV_AGENT_SECRET_KEY,
"agent.worker_name": EnvironmentConfig("CLEARML_WORKER_NAME", "TRAINS_WORKER_NAME", ),
"agent.worker_id": EnvironmentConfig("CLEARML_WORKER_ID", "TRAINS_WORKER_ID", ),
"agent.cuda_version": EnvironmentConfig(
@@ -84,10 +86,10 @@ ENVIRONMENT_CONFIG = {
names=("CLEARML_CPU_ONLY", "TRAINS_CPU_ONLY", "CPU_ONLY"), type=bool
),
"sdk.aws.s3.key": EnvironmentConfig("AWS_ACCESS_KEY_ID"),
"sdk.aws.s3.secret": EnvironmentConfig("AWS_SECRET_ACCESS_KEY"),
"sdk.aws.s3.secret": ENV_AWS_SECRET_KEY,
"sdk.aws.s3.region": EnvironmentConfig("AWS_DEFAULT_REGION"),
"sdk.azure.storage.containers.0": {'account_name': EnvironmentConfig("AZURE_STORAGE_ACCOUNT"),
'account_key': EnvironmentConfig("AZURE_STORAGE_KEY")},
'account_key': ENV_AZURE_ACCOUNT_KEY},
"sdk.google.storage.credentials_json": EnvironmentConfig("GOOGLE_APPLICATION_CREDENTIALS"),
}

View File

@@ -34,7 +34,7 @@ class K8sIntegration(Worker):
KUBECTL_APPLY_CMD = "kubectl apply --namespace={namespace} -f"
KUBECTL_RUN_CMD = "kubectl run clearml-{queue_name}-id-{task_id} " \
KUBECTL_RUN_CMD = "kubectl run clearml-id-{task_id} " \
"--image {docker_image} " \
"--restart=Never " \
"--namespace={namespace}"
@@ -307,10 +307,6 @@ class K8sIntegration(Worker):
except Exception:
queue_name = 'k8s'
# conform queue name to k8s standards
safe_queue_name = queue_name.lower().strip()
safe_queue_name = re.sub(r'\W+', '', safe_queue_name).replace('_', '').replace('-', '')
# Search for a free pod number
pod_count = 0
pod_number = self.base_pod_num
@@ -374,6 +370,8 @@ class K8sIntegration(Worker):
pod_count += 1
labels = ([self.LIMIT_POD_LABEL.format(pod_number=pod_number)] if self.ports_mode else []) + [self.AGENT_LABEL]
labels.append("clearml-agent-queue={}".format(self._safe_k8s_label_value(queue)))
labels.append("clearml-agent-queue-name={}".format(self._safe_k8s_label_value(queue_name)))
if self.ports_mode:
print("Kubernetes scheduling task id={} on pod={} (pod_count={})".format(task_id, pod_number, pod_count))
@@ -384,13 +382,13 @@ class K8sIntegration(Worker):
output, error = self._kubectl_apply(
create_clearml_conf=create_clearml_conf,
labels=labels, docker_image=docker_image, docker_args=docker_args,
task_id=task_id, queue=queue, queue_name=safe_queue_name)
task_id=task_id, queue=queue)
else:
output, error = self._kubectl_run(
create_clearml_conf=create_clearml_conf,
labels=labels, docker_image=docker_cmd,
task_data=task_data,
task_id=task_id, queue=queue, queue_name=safe_queue_name)
task_id=task_id, queue=queue)
error = '' if not error else (error if isinstance(error, str) else error.decode('utf-8'))
output = '' if not output else (output if isinstance(output, str) else output.decode('utf-8'))
@@ -437,12 +435,12 @@ class K8sIntegration(Worker):
self.log.warning('skipping docker argument {} (only -e --env supported)'.format(cmd))
return {'env': kube_args} if kube_args else {}
def _kubectl_apply(self, create_clearml_conf, docker_image, docker_args, labels, queue, task_id, queue_name):
def _kubectl_apply(self, create_clearml_conf, docker_image, docker_args, labels, queue, task_id):
template = deepcopy(self.template_dict)
template.setdefault('apiVersion', 'v1')
template['kind'] = 'Pod'
template.setdefault('metadata', {})
name = 'clearml-{queue}-id-{task_id}'.format(queue=queue_name, task_id=task_id)
name = 'clearml-id-{task_id}'.format(task_id=task_id)
template['metadata']['name'] = name
template.setdefault('spec', {})
template['spec'].setdefault('containers', [])
@@ -508,12 +506,11 @@ class K8sIntegration(Worker):
return output, error
def _kubectl_run(self, create_clearml_conf, docker_image, labels, queue, task_data, task_id, queue_name):
def _kubectl_run(self, create_clearml_conf, docker_image, labels, queue, task_data, task_id):
if callable(self.kubectl_cmd):
kubectl_cmd = self.kubectl_cmd(task_id, docker_image, queue, task_data, queue_name)
kubectl_cmd = self.kubectl_cmd(task_id, docker_image, queue, task_data)
else:
kubectl_cmd = self.kubectl_cmd.format(
queue_name=queue_name,
task_id=task_id,
docker_image=docker_image,
queue_id=queue,
@@ -641,3 +638,13 @@ class K8sIntegration(Worker):
return merge_dicts(
c1, c2, custom_merge_func=merge_env
)
@staticmethod
def _safe_k8s_label_value(value):
""" Conform string to k8s standards for a label value """
value = value.lower().strip()
value = re.sub(r'^[^A-Za-z0-9]+', '', value) # strip leading non-alphanumeric chars
value = re.sub(r'[^A-Za-z0-9]+$', '', value) # strip trailing non-alphanumeric chars
value = re.sub(r'\W+', '-', value) # allow only word chars (this removed "." which is supported, but nvm)
value = re.sub(r'-+', '-', value) # don't leave messy "--" after replacing previous chars
return value[:63]

View File

@@ -42,20 +42,31 @@ def get_bash_output(cmd, strip=False, stderr=subprocess.STDOUT, stdin=False):
return output if not strip or not output else output.strip()
def terminate_process(pid, timeout=10., ignore_zombie=True):
def terminate_process(pid, timeout=10., ignore_zombie=True, include_children=False):
# noinspection PyBroadException
try:
proc = psutil.Process(pid)
children = proc.children(recursive=True) if include_children else []
proc.terminate()
cnt = 0
while proc.is_running() and (ignore_zombie or proc.status() != 'zombie') and cnt < timeout:
sleep(1.)
cnt += 1
proc.terminate()
# terminate children
for c in children:
c.terminate()
cnt = 0
while proc.is_running() and (ignore_zombie or proc.status() != 'zombie') and cnt < timeout:
sleep(1.)
cnt += 1
# kill children
for c in children:
c.kill()
proc.kill()
except Exception:
pass
@@ -66,9 +77,8 @@ def terminate_process(pid, timeout=10., ignore_zombie=True):
return True
def kill_all_child_processes(pid=None):
def kill_all_child_processes(pid=None, include_parent=True):
# get current process if pid not provided
include_parent = True
if not pid:
pid = os.getpid()
include_parent = False
@@ -96,7 +106,7 @@ def terminate_all_child_processes(pid=None, timeout=10., include_parent=True):
return
for child in parent.children(recursive=False):
print('Terminating child process {}'.format(child.pid))
terminate_process(child.pid, timeout=timeout, ignore_zombie=False)
terminate_process(child.pid, timeout=timeout, ignore_zombie=False, include_children=True)
if include_parent:
terminate_process(parent.pid, timeout=timeout, ignore_zombie=False)
@@ -211,6 +221,7 @@ class Argv(Executable):
"""
self.argv = argv
self._log = kwargs.pop("log", None)
self._display_argv = kwargs.pop("display_argv", argv)
if not self._log:
self._log = logging.getLogger(__name__)
self._log.propagate = False
@@ -235,10 +246,10 @@ class Argv(Executable):
return self.argv
def __repr__(self):
return "<Argv{}>".format(self.argv)
return "<Argv{}>".format(self._display_argv)
def __str__(self):
return "Executing: {}".format(self.argv)
return "Executing: {}".format(self._display_argv)
def __iter__(self):
if is_windows_platform():

View File

@@ -591,7 +591,7 @@ def clone_repository_cached(session, execution, destination):
# mock lock
repo_lock = Lock()
repo_lock_timeout_sec = 300
repo_url = execution.repository # type: str
repo_url = execution.repository or '' # type: str
parsed_url = furl(repo_url)
no_password_url = parsed_url.copy().remove(password=True).url

View File

@@ -1 +1 @@
__version__ = '1.0.0'
__version__ = '1.0.1rc0'

View File

@@ -42,6 +42,9 @@ agent {
# Example values: "/usr/bin/python3" or "/usr/local/bin/python3.6"
# The default is the python executing the clearml_agent
python_binary: ""
# ignore any requested python version (Default: False, if a Task was using a
# specific python version and the system supports multiple python the agent will use the requested python version)
# ignore_requested_python_version: true
# select python package manager:
# currently supported pip and conda

View File

@@ -9,7 +9,7 @@ pyhocon>=0.3.38,<0.4.0
pyparsing>=2.0.3,<2.5.0
python-dateutil>=2.4.2,<2.9.0
pyjwt>=1.6.4,<2.1.0
PyYAML>=3.12,<5.4.0
PyYAML>=3.12,<5.5.0
requests>=2.20.0,<2.26.0
six>=1.11.0,<1.16.0
typing>=3.6.4,<3.8.0