diff --git a/clearml_agent/backend_api/config/default/agent.conf b/clearml_agent/backend_api/config/default/agent.conf
index de82bf3..fa37231 100644
--- a/clearml_agent/backend_api/config/default/agent.conf
+++ b/clearml_agent/backend_api/config/default/agent.conf
@@ -26,6 +26,9 @@
     # Example values: "/usr/bin/python3" or "/usr/local/bin/python3.6"
     # The default is the python executing the clearml_agent
     python_binary: ""
+    # ignore any requested python version (Default: false; if a Task requested a specific python version
+    # and the system has multiple python versions installed, the agent will use the requested version)
+    # ignore_requested_python_version: true
 
     # select python package manager:
     # currently supported pip and conda
@@ -182,4 +185,16 @@
     # should be detected automatically. Override with os environment CUDA_VERSION / CUDNN_VERSION
     # cuda_version: 10.1
     # cudnn_version: 7.6
+
+    # Hide docker environment variables containing secrets when printing out the docker command, by replacing their
+    # values with "********". Turning this feature on will hide the values of the following environment variables:
+    # CLEARML_API_SECRET_KEY, CLEARML_AGENT_GIT_PASS, AWS_SECRET_ACCESS_KEY, AZURE_STORAGE_KEY
+    # To include more environment variables, add their keys to the "extra_keys" list. E.g. to make sure the value of
+    # your custom environment variable named MY_SPECIAL_PASSWORD will not show in the logs when included in the
+    # docker command, set:
+    # extra_keys: ["MY_SPECIAL_PASSWORD"]
+    hide_docker_command_env_vars {
+        enabled: true
+        extra_keys: []
+    }
 }
diff --git a/clearml_agent/backend_api/session/session.py b/clearml_agent/backend_api/session/session.py
index 27fc877..dbf8cf4 100644
--- a/clearml_agent/backend_api/session/session.py
+++ b/clearml_agent/backend_api/session/session.py
@@ -111,7 +111,8 @@ class Session(TokenManager):
         self._logger = logger
 
         self.__access_key = api_key or ENV_ACCESS_KEY.get(
-            default=(self.config.get("api.credentials.access_key", None) or self.default_key)
+            default=(self.config.get("api.credentials.access_key", None) or self.default_key),
+            value_cb=lambda key, value: logger.info("Using environment access key {}={}".format(key, value))
         )
         if not self.access_key:
             raise ValueError(
@@ -119,7 +120,8 @@
         )
 
         self.__secret_key = secret_key or ENV_SECRET_KEY.get(
-            default=(self.config.get("api.credentials.secret_key", None) or self.default_secret)
+            default=(self.config.get("api.credentials.secret_key", None) or self.default_secret),
+            value_cb=lambda key, value: logger.info("Using environment secret key {}=********".format(key))
         )
         if not self.secret_key:
             raise ValueError(
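Note on the two `value_cb` hooks above: both report which environment variable actually supplied the credential, but only the access-key callback echoes the value; the secret-key callback always masks it. A minimal sketch of the resulting log lines, using a plain `logging` logger (the callback names and logger setup here are illustrative, not part of the patch):

```python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("session")


def on_access_key(key, value):
    # access keys are not sensitive, so the value is printed as-is
    logger.info("Using environment access key {}={}".format(key, value))


def on_secret_key(key, value):
    # mirrors the lambda in Session.__init__: name the variable, never the secret
    logger.info("Using environment secret key {}=********".format(key))


# simulate ENV_ACCESS_KEY.get(..., value_cb=...) resolving from the environment
on_access_key("CLEARML_API_ACCESS_KEY", "KEY12345")
on_secret_key("CLEARML_API_SECRET_KEY", "s3cr3t-value")
```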
diff --git a/clearml_agent/backend_config/entry.py b/clearml_agent/backend_config/entry.py
index 1107563..489d326 100644
--- a/clearml_agent/backend_config/entry.py
+++ b/clearml_agent/backend_config/entry.py
@@ -64,8 +64,8 @@ class Entry(object):
         converter = self.default_conversions().get(self.type, self.type)
         return converter(value)
 
-    def get_pair(self, default=NotSet, converter=None):
-        # type: (Any, Converter) -> Optional[Tuple[Text, Any]]
+    def get_pair(self, default=NotSet, converter=None, value_cb=None):
+        # type: (Any, Converter, Callable[[str, Any], None]) -> Optional[Tuple[Text, Any]]
         for key in self.keys:
             value = self._get(key)
             if value is NotSet:
@@ -75,13 +75,20 @@
             except Exception as ex:
                 self.error("invalid value {key}={value}: {ex}".format(**locals()))
                 break
+            # noinspection PyBroadException
+            try:
+                if value_cb:
+                    value_cb(key, value)
+            except Exception:
+                pass
             return key, value
+
         result = self.default if default is NotSet else default
         return self.key, result
 
-    def get(self, default=NotSet, converter=None):
-        # type: (Any, Converter) -> Optional[Any]
-        return self.get_pair(default=default, converter=converter)[1]
+    def get(self, default=NotSet, converter=None, value_cb=None):
+        # type: (Any, Converter, Callable[[str, Any], None]) -> Optional[Any]
+        return self.get_pair(default=default, converter=converter, value_cb=value_cb)[1]
 
     def set(self, value):
         # type: (Any, Any) -> (Text, Any)
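For context, `get_pair()` invokes `value_cb` only when one of the entry's keys actually resolves (and after conversion succeeds); the default-value path never triggers it, and a raising callback is swallowed so it cannot break config resolution. A standalone sketch of that contract, using a simplified stand-in for `Entry` (not the real class):

```python
NOT_SET = object()


def get_pair(keys, env, converter=str, value_cb=None, default=NOT_SET):
    """Simplified stand-in for Entry.get_pair(): first key that resolves wins."""
    for key in keys:
        if key not in env:
            continue
        value = converter(env[key])
        # noinspection PyBroadException
        try:
            if value_cb:
                value_cb(key, value)
        except Exception:
            pass  # a misbehaving callback must not break config resolution
        return key, value
    # no key resolved: fall back to the default, the callback is NOT called
    return keys[0], (None if default is NOT_SET else default)


env = {"TRAINS_WORKER_NAME": "gpu-box"}
print(get_pair(("CLEARML_WORKER_NAME", "TRAINS_WORKER_NAME"), env,
               value_cb=lambda k, v: print("resolved from", k)))
# resolved from TRAINS_WORKER_NAME
# ('TRAINS_WORKER_NAME', 'gpu-box')
```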
diff --git a/clearml_agent/commands/worker.py b/clearml_agent/commands/worker.py
index c08b07f..17e0949 100644
--- a/clearml_agent/commands/worker.py
+++ b/clearml_agent/commands/worker.py
@@ -11,6 +11,7 @@ import subprocess
 import sys
 import shutil
 import traceback
+import shlex
 from collections import defaultdict
 from copy import deepcopy, copy
 from datetime import datetime
@@ -19,7 +20,7 @@ from functools import partial, cmp_to_key
 from itertools import chain
 from tempfile import mkdtemp, NamedTemporaryFile
 from time import sleep, time
-from typing import Text, Optional, Any, Tuple
+from typing import Text, Optional, Any, Tuple, List
 
 import attr
 import psutil
@@ -43,7 +44,14 @@ from clearml_agent.definitions import (
     ENV_DOCKER_HOST_MOUNT,
     ENV_TASK_EXTRA_PYTHON_PATH,
     ENV_AGENT_GIT_USER,
-    ENV_AGENT_GIT_PASS, ENV_WORKER_ID, ENV_DOCKER_SKIP_GPUS_FLAG, )
+    ENV_AGENT_GIT_PASS,
+    ENV_WORKER_ID,
+    ENV_DOCKER_SKIP_GPUS_FLAG,
+    ENV_AGENT_SECRET_KEY,
+    ENV_AWS_SECRET_KEY,
+    ENV_AZURE_ACCOUNT_KEY,
+    ENV_AGENT_DISABLE_SSH_MOUNT,
+)
 from clearml_agent.definitions import WORKING_REPOSITORY_DIR, PIP_EXTRA_INDICES
 from clearml_agent.errors import APIError, CommandFailedError, Sigterm
 from clearml_agent.helper.base import (
@@ -67,7 +75,9 @@
     get_python_path,
     is_linux_platform,
     rm_file,
-    add_python_path, safe_remove_tree, )
+    add_python_path,
+    safe_remove_tree,
+)
 from clearml_agent.helper.console import ensure_text, print_text, decode_binary_lines
 from clearml_agent.helper.os.daemonize import daemonize_process
 from clearml_agent.helper.package.base import PackageManager
@@ -90,7 +100,10 @@
     get_bash_output,
     shutdown_docker_process,
     get_docker_id,
-    commit_docker, terminate_process, check_if_command_exists,
+    commit_docker,
+    terminate_process,
+    check_if_command_exists,
+    terminate_all_child_processes,
 )
 from clearml_agent.helper.package.priority_req import PriorityPackageRequirement, PackageCollectorRequirement
 from clearml_agent.helper.repo import clone_repository_cached, RepoInfo, VCS, fix_package_import_diff_patch
@@ -187,7 +200,7 @@ class LiteralScriptManager(object):
         location = location or (repo_info and repo_info.root)
         if not location:
             location = Path(self.venv_folder, "code")
-            location.mkdir(exist_ok=True)
+            location.mkdir(exist_ok=True, parents=True)
         log.debug("selected execution directory: %s", location)
         return Text(location), self.write(task, location, execution.entry_point)
@@ -221,6 +234,9 @@ def get_task(session, task_id, *args, **kwargs):
 
 
 def get_task_container(session, task_id):
+    """
+    Returns dict with Task docker container setup {container: '', arguments: '', setup_shell_script: ''}
+    """
     if session.check_min_api_version("2.13"):
         result = session.send_request(
             service='tasks',
@@ -233,12 +249,12 @@
         try:
             container = result.json()['data']['tasks'][0]['container'] if result.ok else {}
             if container.get('arguments'):
-                container['arguments'] = str(container.get('arguments')).split(' ')
+                container['arguments'] = shlex.split(str(container.get('arguments')).strip())
         except (ValueError, TypeError):
             container = {}
     else:
         response = get_task(session, task_id, only_fields=["execution.docker_cmd"])
-        task_docker_cmd_parts = str(response.execution.docker_cmd or '').strip().split(' ')
+        task_docker_cmd_parts = shlex.split(str(response.execution.docker_cmd or '').strip())
         try:
             container = dict(
                 container=task_docker_cmd_parts[0],
@@ -251,11 +267,14 @@
 def set_task_container(session, task_id, docker_image=None, docker_arguments=None, docker_bash_script=None):
+    if docker_arguments and isinstance(docker_arguments, str):
+        docker_arguments = [docker_arguments]
+
     if session.check_min_api_version("2.13"):
         container = dict(
-            image=docker_image or None,
-            arguments=' '.join(docker_arguments) if docker_arguments else None,
-            setup_shell_script=docker_bash_script or None,
+            image=docker_image or '',
+            arguments=' '.join(docker_arguments) if docker_arguments else '',
+            setup_shell_script=docker_bash_script or '',
         )
         result = session.send_request(
             service='tasks',
@@ -614,10 +633,13 @@
                 '--standalone-mode' if self._standalone_mode else '',
                 task_id)
 
-            # send the actual used command line to the backend
-            self.send_logs(task_id=task_id, lines=['Executing: {}\n'.format(full_docker_cmd)], level="INFO")
+            display_docker_command = self._sanitize_docker_command(full_docker_cmd)
+
+            # send the actual used command line to the backend
+            self.send_logs(task_id=task_id, lines=['Executing: {}\n'.format(display_docker_command)], level="INFO")
+
+            cmd = Argv(*full_docker_cmd, display_argv=display_docker_command)
 
-            cmd = Argv(*full_docker_cmd)
             print('Running Docker:\n{}\n'.format(str(cmd)))
         else:
             cmd = worker_args.get_argv_for_command("execute") + (
@@ -685,6 +707,9 @@
             shutdown_docker_process(docker_cmd_contains='--id {}\'\"'.format(task_id))
         safe_remove_file(temp_stdout_name)
         safe_remove_file(temp_stderr_name)
+        if self._services_mode and status == ExitStatus.interrupted:
+            # unregister this worker, it was killed
+            self._unregister()
 
     def run_tasks_loop(self, queues, worker_params, priority_order=True, gpu_indexes=None, gpu_queues=None):
         """
@@ -719,6 +744,7 @@
         # get current running instances
         available_gpus = None
+        dynamic_gpus_worker_id = None
         if gpu_indexes and gpu_queues:
             available_gpus, gpu_queues = self._setup_dynamic_gpus(gpu_queues)
             # multi instance support
@@ -812,7 +838,7 @@
                 self.report_monitor(ResourceMonitor.StatusReport(queues=queues, queue=queue, task=task_id))
 
                 org_gpus = os.environ.get('NVIDIA_VISIBLE_DEVICES')
-                worker_id = self.worker_id
+                dynamic_gpus_worker_id = self.worker_id
                 # the following is only executed in dynamic gpus mode
                 if gpu_queues and gpu_queues.get(queue):
                     # pick the first available GPUs
@@ -836,7 +862,7 @@
                 self.run_one_task(queue, task_id, worker_params)
 
                 if gpu_queues:
-                    self.worker_id = worker_id
+                    self.worker_id = dynamic_gpus_worker_id
                     os.environ['CUDA_VISIBLE_DEVICES'] = \
                         os.environ['NVIDIA_VISIBLE_DEVICES'] = org_gpus
@@ -864,23 +890,23 @@
                 if shutdown_docker_process(docker_cmd_contains='--id {}\'\"'.format(t_id)):
                     self.handle_task_termination(task_id=t_id, exit_code=0, stop_reason=TaskStopReason.stopped)
         else:
+            # if we are in dynamic gpus / services mode,
+            # we should send termination signal to all child processes
+            if self._services_mode:
+                terminate_all_child_processes(timeout=20, include_parent=False)
+
             # if we are here, just kill all sub processes
             kill_all_child_processes()
-            for t_id in set(list_task_gpus_ids.values()):
-                # check if Task is running,
-                task_info = get_task(
-                    self._session, t_id, only_fields=["status"]
-                )
-                # this is a bit risky we might have rerun it again after it already completed
-                # basically we are not removing completed tasks from the list, hence the issue
-                if str(task_info.status) == "in_progress":
-                    self.handle_task_termination(
-                        task_id=t_id, exit_code=0, stop_reason=TaskStopReason.stopped)
+
+        # unregister dynamic GPU worker, if we were terminated while setting up a Task
+        if dynamic_gpus_worker_id:
+            self.worker_id = dynamic_gpus_worker_id
+            self._unregister()
 
     def _dynamic_gpu_get_available(self, gpu_indexes):
         # noinspection PyBroadException
         try:
-            response = self._session.send_api(workers_api.GetAllRequest(last_seen=60))
+            response = self._session.send_api(workers_api.GetAllRequest(last_seen=600))
         except Exception:
             return None
@@ -1360,6 +1386,7 @@
         service_mode_internal_agent_started = None
         stopping = False
         status = None
+        process = None
         try:
             _last_machine_update_ts = time()
             stop_reason = None
@@ -1396,7 +1423,7 @@
                     # get diff from previous poll
                     printed_lines, stdout_pos_count = _print_file(stdout_path, stdout_pos_count)
-                    if self._services_mode and not stopping and not status:
+                    if self._services_mode and not stopping and status is None:
                         # if the internal agent started, we stop logging, it will take over logging.
                         # if the internal agent started running the task itself, it will return status==0,
                         # then we can quit the monitoring loop of this process
@@ -1416,6 +1443,8 @@
             status = ex.returncode
         except KeyboardInterrupt:
             # so someone else will catch us
+            if process:
+                kill_all_child_processes(process.pid)
             raise
         except Exception:
             # we should not get here, but better safe than sorry
@@ -1427,6 +1456,10 @@
             stop_reason = TaskStopReason.exception
             status = -1
 
+        # full cleanup (just in case)
+        if process:
+            kill_all_child_processes(process.pid)
+
         # if running in services mode, keep the file open
         # in case the docker was so quick it started and finished, check the stop reason
         if self._services_mode and service_mode_internal_agent_started and stop_reason == 'Service started':
@@ -1792,15 +1825,16 @@
             debug=self._session.debug_mode,
             trace=self._session.trace,
         )
-        self.report_monitor(ResourceMonitor.StatusReport(task=current_task.id))
-        self.run_one_task(queue='', task_id=current_task.id, worker_args=worker_params, docker=docker)
+        try:
+            self.report_monitor(ResourceMonitor.StatusReport(task=current_task.id))
+            self.run_one_task(queue='', task_id=current_task.id, worker_args=worker_params, docker=docker)
+        finally:
+            self.stop_monitor()
+            self._unregister()
 
-        self.stop_monitor()
-        self._unregister()
-
-        if full_monitoring and self.temp_config_path:
-            safe_remove_file(self._session.config_file)
-            Singleton.close_pid_file()
+            if full_monitoring and self.temp_config_path:
+                safe_remove_file(self._session.config_file)
+                Singleton.close_pid_file()
 
         return self._session.print_configuration()
@@ -1908,7 +1942,6 @@
         if current_task.script.binary and current_task.script.binary.startswith('python') and \
                 execution.entry_point and execution.entry_point.split()[0].strip() == '-m':
             # we need to split it
-            import shlex
             extra.extend(shlex.split(execution.entry_point))
         else:
             extra.append(execution.entry_point)
@@ -2693,8 +2726,11 @@
         if temp_config.get("agent.venvs_cache.path", None):
             temp_config.put("agent.venvs_cache.path", '/root/.clearml/venvs-cache')
 
-        self._host_ssh_cache = mkdtemp(prefix='clearml_agent.ssh.')
-        self._temp_cleanup_list.append(self._host_ssh_cache)
+        if ENV_AGENT_DISABLE_SSH_MOUNT.get():
+            self._host_ssh_cache = None
+        else:
+            self._host_ssh_cache = mkdtemp(prefix='clearml_agent.ssh.')
+            self._temp_cleanup_list.append(self._host_ssh_cache)
 
         return temp_config, partial(self._get_docker_config_cmd, temp_config=temp_config)
@@ -2716,24 +2752,31 @@
             "agent.docker_pip_cache", '~/.clearml/pip-cache'))).expanduser().as_posix()
 
         # make sure all folders are valid
-        Path(host_apt_cache).mkdir(parents=True, exist_ok=True)
-        Path(host_pip_cache).mkdir(parents=True, exist_ok=True)
-        Path(host_cache).mkdir(parents=True, exist_ok=True)
-        Path(host_pip_dl).mkdir(parents=True, exist_ok=True)
-        Path(host_vcs_cache).mkdir(parents=True, exist_ok=True)
-        Path(host_ssh_cache).mkdir(parents=True, exist_ok=True)
+        if host_apt_cache:
+            Path(host_apt_cache).mkdir(parents=True, exist_ok=True)
+        if host_pip_cache:
+            Path(host_pip_cache).mkdir(parents=True, exist_ok=True)
+        if host_cache:
+            Path(host_cache).mkdir(parents=True, exist_ok=True)
+        if host_pip_dl:
+            Path(host_pip_dl).mkdir(parents=True, exist_ok=True)
+        if host_vcs_cache:
+            Path(host_vcs_cache).mkdir(parents=True, exist_ok=True)
+        if host_ssh_cache:
+            Path(host_ssh_cache).mkdir(parents=True, exist_ok=True)
         if host_venvs_cache:
             Path(host_venvs_cache).mkdir(parents=True, exist_ok=True)
 
-        # copy the .ssh folder to a temp folder, to be mapped into docker
-        # noinspection PyBroadException
-        try:
-            if Path(host_ssh_cache).is_dir():
-                shutil.rmtree(host_ssh_cache, ignore_errors=True)
-            shutil.copytree(Path('~/.ssh').expanduser().as_posix(), host_ssh_cache)
-        except Exception:
-            host_ssh_cache = None
-            self.log.warning('Failed creating temporary copy of ~/.ssh for git credential')
+        if host_ssh_cache:
+            # copy the .ssh folder to a temp folder, to be mapped into docker
+            # noinspection PyBroadException
+            try:
+                if Path(host_ssh_cache).is_dir():
+                    shutil.rmtree(host_ssh_cache, ignore_errors=True)
+                shutil.copytree(Path('~/.ssh').expanduser().as_posix(), host_ssh_cache)
+            except Exception:
+                host_ssh_cache = None
+                self.log.warning('Failed creating temporary copy of ~/.ssh for git credential')
 
         # check if the .git credentials exist:
         try:
@@ -3080,7 +3123,7 @@
                 warning('Could not terminate process pid={}'.format(pid))
                 return True
 
-        # wither we have a match for the worker_id or we just pick the first one, and kill it.
+        # either we have a match for the worker_id or we just pick the first one, and kill it.
         if (worker_id and uid == worker_id) or (not worker_id and uid.startswith('{}:'.format(worker_name))):
             # this is us kill it
             print('Terminating clearml-agent worker_id={} pid={}'.format(uid, pid))
@@ -3143,6 +3186,33 @@
                 queue_ids.append(q_id)
 
         return queue_ids
 
+    def _sanitize_docker_command(self, docker_command):
+        # type: (List[str]) -> List[str]
+        if not self._session.config.get('agent.hide_docker_command_env_vars.enabled', False):
+            return docker_command
+
+        keys = set(self._session.config.get('agent.hide_docker_command_env_vars.extra_keys', []))
+        keys.update(
+            ENV_AGENT_GIT_PASS.vars,
+            ENV_AGENT_SECRET_KEY.vars,
+            ENV_AWS_SECRET_KEY.vars,
+            ENV_AZURE_ACCOUNT_KEY.vars
+        )
+
+        result = docker_command[:]
+        for i, item in enumerate(docker_command):
+            try:
+                if item not in ("-e", "--env"):
+                    continue
+                key, sep, _ = result[i + 1].partition("=")
+                if key not in keys or not sep:
+                    continue
+                result[i + 1] = "{}={}".format(key, "********")
+            except (KeyError, IndexError):
+                pass
+
+        return result
+
 
 if __name__ == "__main__":
     pass
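For reference, `_sanitize_docker_command()` rewrites only the value part of `-e`/`--env KEY=VALUE` pairs whose key is on the deny-list; every other token of the docker command passes through untouched, and the input list is never mutated. A self-contained approximation of the same traversal (a hard-coded key set stands in for the config and `.vars` lookups):

```python
SECRET_KEYS = {"CLEARML_API_SECRET_KEY", "CLEARML_AGENT_GIT_PASS",
               "AWS_SECRET_ACCESS_KEY", "AZURE_STORAGE_KEY"}


def sanitize_docker_command(docker_command):
    # same loop as Worker._sanitize_docker_command(), minus the config lookup
    result = docker_command[:]
    for i, item in enumerate(docker_command):
        if item not in ("-e", "--env"):
            continue
        if i + 1 >= len(result):
            continue  # trailing -e with no KEY=VALUE argument after it
        key, sep, _ = result[i + 1].partition("=")
        if key not in SECRET_KEYS or not sep:
            continue
        result[i + 1] = "{}={}".format(key, "********")
    return result


cmd = ["docker", "run", "-e", "CLEARML_API_SECRET_KEY=abc123",
       "-e", "CLEARML_TASK_ID=1234", "nvidia/cuda"]
print(sanitize_docker_command(cmd))
# ['docker', 'run', '-e', 'CLEARML_API_SECRET_KEY=********',
#  '-e', 'CLEARML_TASK_ID=1234', 'nvidia/cuda']
```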
diff --git a/clearml_agent/definitions.py b/clearml_agent/definitions.py
index c71db5f..6b0d8e2 100644
--- a/clearml_agent/definitions.py
+++ b/clearml_agent/definitions.py
@@ -62,6 +62,10 @@ class EnvironmentConfig(object):
         return None
 
 
+ENV_AGENT_SECRET_KEY = EnvironmentConfig("CLEARML_API_SECRET_KEY", "TRAINS_API_SECRET_KEY")
+ENV_AWS_SECRET_KEY = EnvironmentConfig("AWS_SECRET_ACCESS_KEY")
+ENV_AZURE_ACCOUNT_KEY = EnvironmentConfig("AZURE_STORAGE_KEY")
+
 ENVIRONMENT_CONFIG = {
     "api.api_server": EnvironmentConfig("CLEARML_API_HOST", "TRAINS_API_HOST", ),
     "api.files_server": EnvironmentConfig("CLEARML_FILES_HOST", "TRAINS_FILES_HOST", ),
@@ -69,9 +73,7 @@ ENVIRONMENT_CONFIG = {
     "api.credentials.access_key": EnvironmentConfig(
         "CLEARML_API_ACCESS_KEY", "TRAINS_API_ACCESS_KEY",
     ),
-    "api.credentials.secret_key": EnvironmentConfig(
-        "CLEARML_API_SECRET_KEY", "TRAINS_API_SECRET_KEY",
-    ),
+    "api.credentials.secret_key": ENV_AGENT_SECRET_KEY,
     "agent.worker_name": EnvironmentConfig("CLEARML_WORKER_NAME", "TRAINS_WORKER_NAME", ),
     "agent.worker_id": EnvironmentConfig("CLEARML_WORKER_ID", "TRAINS_WORKER_ID", ),
     "agent.cuda_version": EnvironmentConfig(
@@ -84,10 +86,10 @@ ENVIRONMENT_CONFIG = {
         names=("CLEARML_CPU_ONLY", "TRAINS_CPU_ONLY", "CPU_ONLY"), type=bool
     ),
     "sdk.aws.s3.key": EnvironmentConfig("AWS_ACCESS_KEY_ID"),
-    "sdk.aws.s3.secret": EnvironmentConfig("AWS_SECRET_ACCESS_KEY"),
+    "sdk.aws.s3.secret": ENV_AWS_SECRET_KEY,
     "sdk.aws.s3.region": EnvironmentConfig("AWS_DEFAULT_REGION"),
     "sdk.azure.storage.containers.0": {'account_name': EnvironmentConfig("AZURE_STORAGE_ACCOUNT"),
-                                       'account_key': EnvironmentConfig("AZURE_STORAGE_KEY")},
+                                       'account_key': ENV_AZURE_ACCOUNT_KEY},
     "sdk.google.storage.credentials_json": EnvironmentConfig("GOOGLE_APPLICATION_CREDENTIALS"),
 }
@@ -132,6 +134,7 @@ ENV_DOCKER_SKIP_GPUS_FLAG = EnvironmentConfig('CLEARML_DOCKER_SKIP_GPUS_FLAG', '
 ENV_AGENT_GIT_USER = EnvironmentConfig('CLEARML_AGENT_GIT_USER', 'TRAINS_AGENT_GIT_USER')
 ENV_AGENT_GIT_PASS = EnvironmentConfig('CLEARML_AGENT_GIT_PASS', 'TRAINS_AGENT_GIT_PASS')
 ENV_AGENT_GIT_HOST = EnvironmentConfig('CLEARML_AGENT_GIT_HOST', 'TRAINS_AGENT_GIT_HOST')
+ENV_AGENT_DISABLE_SSH_MOUNT = EnvironmentConfig('CLEARML_AGENT_DISABLE_SSH_MOUNT', type=bool)
 ENV_TASK_EXECUTE_AS_USER = EnvironmentConfig('CLEARML_AGENT_EXEC_USER', 'TRAINS_AGENT_EXEC_USER')
 ENV_TASK_EXTRA_PYTHON_PATH = EnvironmentConfig('CLEARML_AGENT_EXTRA_PYTHON_PATH', 'TRAINS_AGENT_EXTRA_PYTHON_PATH')
 ENV_DOCKER_HOST_MOUNT = EnvironmentConfig('CLEARML_AGENT_K8S_HOST_MOUNT', 'CLEARML_AGENT_DOCKER_HOST_MOUNT',
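Hoisting these three `EnvironmentConfig` objects to module level means one instance now serves double duty: it resolves the config value, and its `.vars` tuple feeds the docker-command deny-list above. A rough sketch of the first-match-wins lookup such objects perform (a toy stand-in; the real class also handles typed conversion and the `names=`/`type=` keywords):

```python
import os


class EnvConfig(object):
    """Toy stand-in for EnvironmentConfig: first defined variable name wins."""

    def __init__(self, *names):
        self.vars = names  # e.g. ("CLEARML_API_SECRET_KEY", "TRAINS_API_SECRET_KEY")

    def get(self, default=None):
        for name in self.vars:
            if name in os.environ:
                return os.environ[name]
        return default


ENV_AGENT_SECRET_KEY = EnvConfig("CLEARML_API_SECRET_KEY", "TRAINS_API_SECRET_KEY")
os.environ["TRAINS_API_SECRET_KEY"] = "legacy-secret"
print(ENV_AGENT_SECRET_KEY.get())   # legacy-secret (the CLEARML_* name is not set)
print(ENV_AGENT_SECRET_KEY.vars)    # both names end up on the masking deny-list
```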
'{}'\nEx: {}".format( + output, task_id, queue, ex + ) + ) + self._session.api_client.tasks.reset(task_id) + self._session.api_client.tasks.enqueue(task_id, queue=queue, status_reason='kubectl parsing error') + return + max_count = self.max_pods_limit + else: + current_pod_count = pod_count + max_count = self.num_of_services - 1 + + if current_pod_count >= max_count: + # All pods are taken, exit + self.log.debug( + "kubectl last result: {}\n{}".format(error, output)) self.log.warning( - "kubectl last result: {}\n{}\nAll k8s services are in use, task '{}' " + "All k8s services are in use, task '{}' " "will be enqueued back to queue '{}'".format( - error, output, task_id, queue + task_id, queue ) ) self._session.api_client.tasks.reset(task_id) self._session.api_client.tasks.enqueue( task_id, queue=queue, status_reason='k8s max pod limit (no free k8s service)') return + elif self.max_pods_limit: + # max pods limit hasn't reached yet, so we can create the pod + break pod_count += 1 labels = ([self.LIMIT_POD_LABEL.format(pod_number=pod_number)] if self.ports_mode else []) + [self.AGENT_LABEL] + labels.append("clearml-agent-queue={}".format(self._safe_k8s_label_value(queue))) + labels.append("clearml-agent-queue-name={}".format(self._safe_k8s_label_value(queue_name))) if self.ports_mode: print("Kubernetes scheduling task id={} on pod={} (pod_count={})".format(task_id, pod_number, pod_count)) @@ -351,13 +382,13 @@ class K8sIntegration(Worker): output, error = self._kubectl_apply( create_clearml_conf=create_clearml_conf, labels=labels, docker_image=docker_image, docker_args=docker_args, - task_id=task_id, queue=queue, queue_name=safe_queue_name) + task_id=task_id, queue=queue) else: output, error = self._kubectl_run( create_clearml_conf=create_clearml_conf, labels=labels, docker_image=docker_cmd, task_data=task_data, - task_id=task_id, queue=queue, queue_name=safe_queue_name) + task_id=task_id, queue=queue) error = '' if not error else (error if isinstance(error, str) else error.decode('utf-8')) output = '' if not output else (output if isinstance(output, str) else output.decode('utf-8')) @@ -404,15 +435,16 @@ class K8sIntegration(Worker): self.log.warning('skipping docker argument {} (only -e --env supported)'.format(cmd)) return {'env': kube_args} if kube_args else {} - def _kubectl_apply(self, create_clearml_conf, docker_image, docker_args, labels, queue, task_id, queue_name): + def _kubectl_apply(self, create_clearml_conf, docker_image, docker_args, labels, queue, task_id): template = deepcopy(self.template_dict) template.setdefault('apiVersion', 'v1') template['kind'] = 'Pod' template.setdefault('metadata', {}) - name = 'clearml-{queue}-id-{task_id}'.format(queue=queue_name, task_id=task_id) + name = 'clearml-id-{task_id}'.format(task_id=task_id) template['metadata']['name'] = name template.setdefault('spec', {}) template['spec'].setdefault('containers', []) + template['spec'].setdefault('restartPolicy', 'Never') if labels: labels_dict = dict(pair.split('=', 1) for pair in labels) template['metadata'].setdefault('labels', {}) @@ -474,12 +506,11 @@ class K8sIntegration(Worker): return output, error - def _kubectl_run(self, create_clearml_conf, docker_image, labels, queue, task_data, task_id, queue_name): + def _kubectl_run(self, create_clearml_conf, docker_image, labels, queue, task_data, task_id): if callable(self.kubectl_cmd): - kubectl_cmd = self.kubectl_cmd(task_id, docker_image, queue, task_data, queue_name) + kubectl_cmd = self.kubectl_cmd(task_id, docker_image, queue, task_data) else: 
@@ -351,13 +382,13 @@
             output, error = self._kubectl_apply(
                 create_clearml_conf=create_clearml_conf,
                 labels=labels, docker_image=docker_image, docker_args=docker_args,
-                task_id=task_id, queue=queue, queue_name=safe_queue_name)
+                task_id=task_id, queue=queue)
         else:
             output, error = self._kubectl_run(
                 create_clearml_conf=create_clearml_conf,
                 labels=labels, docker_image=docker_cmd, task_data=task_data,
-                task_id=task_id, queue=queue, queue_name=safe_queue_name)
+                task_id=task_id, queue=queue)
 
         error = '' if not error else (error if isinstance(error, str) else error.decode('utf-8'))
         output = '' if not output else (output if isinstance(output, str) else output.decode('utf-8'))
@@ -404,15 +435,16 @@
             self.log.warning('skipping docker argument {} (only -e --env supported)'.format(cmd))
 
         return {'env': kube_args} if kube_args else {}
 
-    def _kubectl_apply(self, create_clearml_conf, docker_image, docker_args, labels, queue, task_id, queue_name):
+    def _kubectl_apply(self, create_clearml_conf, docker_image, docker_args, labels, queue, task_id):
         template = deepcopy(self.template_dict)
         template.setdefault('apiVersion', 'v1')
         template['kind'] = 'Pod'
         template.setdefault('metadata', {})
-        name = 'clearml-{queue}-id-{task_id}'.format(queue=queue_name, task_id=task_id)
+        name = 'clearml-id-{task_id}'.format(task_id=task_id)
         template['metadata']['name'] = name
         template.setdefault('spec', {})
         template['spec'].setdefault('containers', [])
+        template['spec'].setdefault('restartPolicy', 'Never')
         if labels:
             labels_dict = dict(pair.split('=', 1) for pair in labels)
             template['metadata'].setdefault('labels', {})
@@ -474,12 +506,11 @@
 
         return output, error
 
-    def _kubectl_run(self, create_clearml_conf, docker_image, labels, queue, task_data, task_id, queue_name):
+    def _kubectl_run(self, create_clearml_conf, docker_image, labels, queue, task_data, task_id):
         if callable(self.kubectl_cmd):
-            kubectl_cmd = self.kubectl_cmd(task_id, docker_image, queue, task_data, queue_name)
+            kubectl_cmd = self.kubectl_cmd(task_id, docker_image, queue, task_data)
         else:
             kubectl_cmd = self.kubectl_cmd.format(
-                queue_name=queue_name,
                 task_id=task_id,
                 docker_image=docker_image,
                 queue_id=queue,
@@ -607,3 +638,13 @@ class K8sIntegration(Worker):
         return merge_dicts(
             c1, c2, custom_merge_func=merge_env
         )
+
+    @staticmethod
+    def _safe_k8s_label_value(value):
+        """ Conform string to k8s standards for a label value """
+        value = value.lower().strip()
+        value = re.sub(r'^[^A-Za-z0-9]+', '', value)  # strip leading non-alphanumeric chars
+        value = re.sub(r'[^A-Za-z0-9]+$', '', value)  # strip trailing non-alphanumeric chars
+        value = re.sub(r'\W+', '-', value)  # allow only word chars (this also replaces ".", which is supported, but nvm)
+        value = re.sub(r'-+', '-', value)  # don't leave messy "--" after replacing previous chars
+        return value[:63]
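`_safe_k8s_label_value()` replaces the old inline queue-name mangling and is what makes the new `clearml-agent-queue*` labels legal: k8s label values must start and end with an alphanumeric character and are capped at 63 characters. The method is pure, so its behavior is easy to check in isolation (logic copied from the patch into a free function, with hypothetical sample inputs):

```python
import re


def safe_k8s_label_value(value):
    """Conform a string to k8s label-value rules, as in K8sIntegration."""
    value = value.lower().strip()
    value = re.sub(r'^[^A-Za-z0-9]+', '', value)  # strip leading non-alphanumerics
    value = re.sub(r'[^A-Za-z0-9]+$', '', value)  # strip trailing non-alphanumerics
    value = re.sub(r'\W+', '-', value)            # collapse everything else to '-'
    value = re.sub(r'-+', '-', value)             # no '--' runs left behind
    return value[:63]


print(safe_k8s_label_value("  GPU Queue #1 (EU) "))  # gpu-queue-1-eu
print(safe_k8s_label_value("__default__"))           # default
```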
diff --git a/clearml_agent/helper/process.py b/clearml_agent/helper/process.py
index b8f4ef9..fc86785 100644
--- a/clearml_agent/helper/process.py
+++ b/clearml_agent/helper/process.py
@@ -42,20 +42,31 @@ def get_bash_output(cmd, strip=False, stderr=subprocess.STDOUT, stdin=False):
     return output if not strip or not output else output.strip()
 
 
-def terminate_process(pid, timeout=10.):
+def terminate_process(pid, timeout=10., ignore_zombie=True, include_children=False):
     # noinspection PyBroadException
     try:
         proc = psutil.Process(pid)
+        children = proc.children(recursive=True) if include_children else []
         proc.terminate()
         cnt = 0
-        while proc.is_running() and cnt < timeout:
+        while proc.is_running() and (ignore_zombie or proc.status() != 'zombie') and cnt < timeout:
             sleep(1.)
             cnt += 1
         proc.terminate()
+
+        # terminate children
+        for c in children:
+            c.terminate()
+
         cnt = 0
-        while proc.is_running() and cnt < timeout:
+        while proc.is_running() and (ignore_zombie or proc.status() != 'zombie') and cnt < timeout:
             sleep(1.)
             cnt += 1
+
+        # kill children
+        for c in children:
+            c.kill()
+
         proc.kill()
     except Exception:
         pass
@@ -66,9 +77,8 @@
     return True
 
 
-def kill_all_child_processes(pid=None):
+def kill_all_child_processes(pid=None, include_parent=True):
     # get current process if pid not provided
-    include_parent = True
     if not pid:
         pid = os.getpid()
         include_parent = False
@@ -84,6 +94,23 @@
         parent.kill()
 
 
+def terminate_all_child_processes(pid=None, timeout=10., include_parent=True):
+    # get current process if pid not provided
+    if not pid:
+        pid = os.getpid()
+        include_parent = False
+    try:
+        parent = psutil.Process(pid)
+    except psutil.Error:
+        # could not find parent process id
+        return
+    for child in parent.children(recursive=False):
+        print('Terminating child process {}'.format(child.pid))
+        terminate_process(child.pid, timeout=timeout, ignore_zombie=False, include_children=True)
+    if include_parent:
+        terminate_process(parent.pid, timeout=timeout, ignore_zombie=False)
+
+
 def get_docker_id(docker_cmd_contains):
     try:
         containers_running = get_bash_output(cmd='docker ps --no-trunc --format \"{{.ID}}: {{.Command}}\"')
@@ -194,6 +221,7 @@
         """
         self.argv = argv
         self._log = kwargs.pop("log", None)
+        self._display_argv = kwargs.pop("display_argv", argv)
         if not self._log:
             self._log = logging.getLogger(__name__)
             self._log.propagate = False
@@ -218,10 +246,10 @@
         return self.argv
 
     def __repr__(self):
-        return "<Argv{}>".format(self.argv)
+        return "<Argv{}>".format(self._display_argv)
 
     def __str__(self):
-        return "Executing: {}".format(self.argv)
+        return "Executing: {}".format(self._display_argv)
 
     def __iter__(self):
         if is_windows_platform():
diff --git a/clearml_agent/helper/repo.py b/clearml_agent/helper/repo.py
index f39a4f8..551c9f3 100644
--- a/clearml_agent/helper/repo.py
+++ b/clearml_agent/helper/repo.py
@@ -591,7 +591,7 @@ def clone_repository_cached(session, execution, destination):
     # mock lock
     repo_lock = Lock()
     repo_lock_timeout_sec = 300
-    repo_url = execution.repository  # type: str
+    repo_url = execution.repository or ''  # type: str
     parsed_url = furl(repo_url)
     no_password_url = parsed_url.copy().remove(password=True).url
diff --git a/clearml_agent/version.py b/clearml_agent/version.py
index 84d3488..6dada8f 100644
--- a/clearml_agent/version.py
+++ b/clearml_agent/version.py
@@ -1 +1 @@
-__version__ = '0.17.2'
+__version__ = '1.0.1rc1'
diff --git a/docs/clearml.conf b/docs/clearml.conf
index bd0c7eb..05e0922 100644
--- a/docs/clearml.conf
+++ b/docs/clearml.conf
@@ -42,6 +42,9 @@ agent {
     # Example values: "/usr/bin/python3" or "/usr/local/bin/python3.6"
     # The default is the python executing the clearml_agent
     python_binary: ""
+    # ignore any requested python version (Default: false; if a Task requested a specific python version
+    # and the system has multiple python versions installed, the agent will use the requested version)
+    # ignore_requested_python_version: true
 
     # select python package manager:
     # currently supported pip and conda
@@ -107,11 +110,12 @@ agent {
         path: ~/.clearml/vcs-cache
     },
 
+    # DEPRECATED: please use `venvs_cache` and set `venvs_cache.path`
     # use venv-update in order to accelerate python virtual environment building
     # Still in beta, turned off by default
-    venv_update: {
-        enabled: false,
-    },
+    # venv_update: {
+    #     enabled: false,
+    # },
 
     # cached folder for specific python package download (mostly pytorch versions)
     pip_download_cache {
diff --git a/examples/k8s_glue_example.py b/examples/k8s_glue_example.py
index b368b50..dc69c37 100644
--- a/examples/k8s_glue_example.py
+++ b/examples/k8s_glue_example.py
@@ -10,12 +10,15 @@ from clearml_agent.glue.k8s import K8sIntegration
 
 def parse_args():
     parser = ArgumentParser()
+    group = parser.add_mutually_exclusive_group()
+
     parser.add_argument(
         "--queue", type=str, help="Queue to pull tasks from"
     )
-    parser.add_argument(
+    group.add_argument(
         "--ports-mode", action='store_true', default=False,
         help="Ports-Mode will add a label to the pod which can be used as service, in order to expose ports"
+             ". Should not be used with max-pods"
     )
     parser.add_argument(
         "--num-of-services", type=int, default=20,
@@ -57,6 +60,11 @@
         "--namespace", type=str,
         help="Specify the namespace in which pods will be created (default: %(default)s)", default="clearml"
     )
+    group.add_argument(
+        "--max-pods", type=int,
+        help="Limit the maximum number of pods that this service can run at the same time. "
+             "Should not be used with ports-mode"
+    )
     return parser.parse_args()
@@ -77,7 +85,7 @@
         user_props_cb=user_props_cb, overrides_yaml=args.overrides_yaml, clearml_conf_file=args.pod_clearml_conf,
         template_yaml=args.template_yaml, extra_bash_init_script=K8sIntegration.get_ssh_server_bash(
             ssh_port_number=args.ssh_server_port) if args.ssh_server_port else None,
-        namespace=args.namespace,
+        namespace=args.namespace, max_pods_limit=args.max_pods or None,
     )
 
     k8s.k8s_daemon(args.queue)
diff --git a/requirements.txt b/requirements.txt
index ba96007..5186b3f 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -9,7 +9,7 @@ pyhocon>=0.3.38,<0.4.0
 pyparsing>=2.0.3,<2.5.0
 python-dateutil>=2.4.2,<2.9.0
 pyjwt>=1.6.4,<2.1.0
-PyYAML>=3.12,<5.4.0
+PyYAML>=3.12,<5.5.0
 requests>=2.20.0,<2.26.0
 six>=1.11.0,<1.16.0
 typing>=3.6.4,<3.8.0
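A behavioral note on the `shlex.split` changes in `get_task_container()` earlier in this patch: unlike `str.split(' ')`, `shlex.split` honors shell quoting, so a docker argument containing spaces survives as a single token, and an empty command yields an empty list instead of `['']`:

```python
import shlex

args = '-e "MY_VAR=hello world" --network host'
print(args.split(' '))
# ['-e', '"MY_VAR=hello', 'world"', '--network', 'host']  <- value torn apart
print(shlex.split(args.strip()))
# ['-e', 'MY_VAR=hello world', '--network', 'host']       <- quoting honored

print(''.strip().split(' '))  # [''] -> one bogus empty argument
print(shlex.split(''))        # []
```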