Merge branch 'allegroai:master' into master

This commit is contained in:
pollfly 2021-06-22 13:47:48 +03:00 committed by GitHub
commit 1caf7b104f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 282 additions and 104 deletions

View File

@ -26,6 +26,9 @@
# Example values: "/usr/bin/python3" or "/usr/local/bin/python3.6"
# The default is the python executing the clearml_agent
python_binary: ""
# ignore any requested python version (Default: False, if a Task was using a
# specific python version and the system supports multiple python the agent will use the requested python version)
# ignore_requested_python_version: true
# select python package manager:
# currently supported pip and conda
@ -182,4 +185,16 @@
# should be detected automatically. Override with os environment CUDA_VERSION / CUDNN_VERSION
# cuda_version: 10.1
# cudnn_version: 7.6
# Hide docker environment variables containing secrets when printing out the docker command by replacing their
# values with "********". Turning this feature on will hide the following environment variables values:
# CLEARML_API_SECRET_KEY, CLEARML_AGENT_GIT_PASS, AWS_SECRET_ACCESS_KEY, AZURE_STORAGE_KEY
# To include more environment variables, add their keys to the "extra_keys" list. E.g. to make sure the value of
# your custom environment variable named MY_SPECIAL_PASSWORD will not show in the logs when included in the
# docker command, set:
# extra_keys: ["MY_SPECIAL_PASSWORD"]
hide_docker_command_env_vars {
enabled: true
extra_keys: []
}
}

View File

@ -111,7 +111,8 @@ class Session(TokenManager):
self._logger = logger
self.__access_key = api_key or ENV_ACCESS_KEY.get(
default=(self.config.get("api.credentials.access_key", None) or self.default_key)
default=(self.config.get("api.credentials.access_key", None) or self.default_key),
value_cb=lambda key, value: logger.info("Using environment access key {}={}".format(key, value))
)
if not self.access_key:
raise ValueError(
@ -119,7 +120,8 @@ class Session(TokenManager):
)
self.__secret_key = secret_key or ENV_SECRET_KEY.get(
default=(self.config.get("api.credentials.secret_key", None) or self.default_secret)
default=(self.config.get("api.credentials.secret_key", None) or self.default_secret),
value_cb=lambda key, value: logger.info("Using environment secret key {}=********".format(key))
)
if not self.secret_key:
raise ValueError(

View File

@ -64,8 +64,8 @@ class Entry(object):
converter = self.default_conversions().get(self.type, self.type)
return converter(value)
def get_pair(self, default=NotSet, converter=None):
# type: (Any, Converter) -> Optional[Tuple[Text, Any]]
def get_pair(self, default=NotSet, converter=None, value_cb=None):
# type: (Any, Converter, Callable[[str, Any], None]) -> Optional[Tuple[Text, Any]]
for key in self.keys:
value = self._get(key)
if value is NotSet:
@ -75,13 +75,20 @@ class Entry(object):
except Exception as ex:
self.error("invalid value {key}={value}: {ex}".format(**locals()))
break
# noinspection PyBroadException
try:
if value_cb:
value_cb(key, value)
except Exception:
pass
return key, value
result = self.default if default is NotSet else default
return self.key, result
def get(self, default=NotSet, converter=None):
# type: (Any, Converter) -> Optional[Any]
return self.get_pair(default=default, converter=converter)[1]
def get(self, default=NotSet, converter=None, value_cb=None):
# type: (Any, Converter, Callable[[str, Any], None]) -> Optional[Any]
return self.get_pair(default=default, converter=converter, value_cb=value_cb)[1]
def set(self, value):
# type: (Any, Any) -> (Text, Any)

View File

@ -11,6 +11,7 @@ import subprocess
import sys
import shutil
import traceback
import shlex
from collections import defaultdict
from copy import deepcopy, copy
from datetime import datetime
@ -19,7 +20,7 @@ from functools import partial, cmp_to_key
from itertools import chain
from tempfile import mkdtemp, NamedTemporaryFile
from time import sleep, time
from typing import Text, Optional, Any, Tuple
from typing import Text, Optional, Any, Tuple, List
import attr
import psutil
@ -43,7 +44,14 @@ from clearml_agent.definitions import (
ENV_DOCKER_HOST_MOUNT,
ENV_TASK_EXTRA_PYTHON_PATH,
ENV_AGENT_GIT_USER,
ENV_AGENT_GIT_PASS, ENV_WORKER_ID, ENV_DOCKER_SKIP_GPUS_FLAG, )
ENV_AGENT_GIT_PASS,
ENV_WORKER_ID,
ENV_DOCKER_SKIP_GPUS_FLAG,
ENV_AGENT_SECRET_KEY,
ENV_AWS_SECRET_KEY,
ENV_AZURE_ACCOUNT_KEY,
ENV_AGENT_DISABLE_SSH_MOUNT,
)
from clearml_agent.definitions import WORKING_REPOSITORY_DIR, PIP_EXTRA_INDICES
from clearml_agent.errors import APIError, CommandFailedError, Sigterm
from clearml_agent.helper.base import (
@ -67,7 +75,9 @@ from clearml_agent.helper.base import (
get_python_path,
is_linux_platform,
rm_file,
add_python_path, safe_remove_tree, )
add_python_path,
safe_remove_tree,
)
from clearml_agent.helper.console import ensure_text, print_text, decode_binary_lines
from clearml_agent.helper.os.daemonize import daemonize_process
from clearml_agent.helper.package.base import PackageManager
@ -90,7 +100,10 @@ from clearml_agent.helper.process import (
get_bash_output,
shutdown_docker_process,
get_docker_id,
commit_docker, terminate_process, check_if_command_exists,
commit_docker,
terminate_process,
check_if_command_exists,
terminate_all_child_processes,
)
from clearml_agent.helper.package.priority_req import PriorityPackageRequirement, PackageCollectorRequirement
from clearml_agent.helper.repo import clone_repository_cached, RepoInfo, VCS, fix_package_import_diff_patch
@ -187,7 +200,7 @@ class LiteralScriptManager(object):
location = location or (repo_info and repo_info.root)
if not location:
location = Path(self.venv_folder, "code")
location.mkdir(exist_ok=True)
location.mkdir(exist_ok=True, parents=True)
log.debug("selected execution directory: %s", location)
return Text(location), self.write(task, location, execution.entry_point)
@ -221,6 +234,9 @@ def get_task(session, task_id, *args, **kwargs):
def get_task_container(session, task_id):
"""
Returns dict with Task docker container setup {container: '', arguments: '', setup_shell_script: ''}
"""
if session.check_min_api_version("2.13"):
result = session.send_request(
service='tasks',
@ -233,12 +249,12 @@ def get_task_container(session, task_id):
try:
container = result.json()['data']['tasks'][0]['container'] if result.ok else {}
if container.get('arguments'):
container['arguments'] = str(container.get('arguments')).split(' ')
container['arguments'] = shlex.split(str(container.get('arguments')).strip())
except (ValueError, TypeError):
container = {}
else:
response = get_task(session, task_id, only_fields=["execution.docker_cmd"])
task_docker_cmd_parts = str(response.execution.docker_cmd or '').strip().split(' ')
task_docker_cmd_parts = shlex.split(str(response.execution.docker_cmd or '').strip())
try:
container = dict(
container=task_docker_cmd_parts[0],
@ -251,11 +267,14 @@ def get_task_container(session, task_id):
def set_task_container(session, task_id, docker_image=None, docker_arguments=None, docker_bash_script=None):
if docker_arguments and isinstance(docker_arguments, str):
docker_arguments = [docker_arguments]
if session.check_min_api_version("2.13"):
container = dict(
image=docker_image or None,
arguments=' '.join(docker_arguments) if docker_arguments else None,
setup_shell_script=docker_bash_script or None,
image=docker_image or '',
arguments=' '.join(docker_arguments) if docker_arguments else '',
setup_shell_script=docker_bash_script or '',
)
result = session.send_request(
service='tasks',
@ -614,10 +633,13 @@ class Worker(ServiceCommandSection):
'--standalone-mode' if self._standalone_mode else '',
task_id)
# send the actual used command line to the backend
self.send_logs(task_id=task_id, lines=['Executing: {}\n'.format(full_docker_cmd)], level="INFO")
display_docker_command = self._sanitize_docker_command(full_docker_cmd)
# send the actual used command line to the backend
self.send_logs(task_id=task_id, lines=['Executing: {}\n'.format(display_docker_command)], level="INFO")
cmd = Argv(*full_docker_cmd, display_argv=display_docker_command)
cmd = Argv(*full_docker_cmd)
print('Running Docker:\n{}\n'.format(str(cmd)))
else:
cmd = worker_args.get_argv_for_command("execute") + (
@ -685,6 +707,9 @@ class Worker(ServiceCommandSection):
shutdown_docker_process(docker_cmd_contains='--id {}\'\"'.format(task_id))
safe_remove_file(temp_stdout_name)
safe_remove_file(temp_stderr_name)
if self._services_mode and status == ExitStatus.interrupted:
# unregister this worker, it was killed
self._unregister()
def run_tasks_loop(self, queues, worker_params, priority_order=True, gpu_indexes=None, gpu_queues=None):
"""
@ -719,6 +744,7 @@ class Worker(ServiceCommandSection):
# get current running instances
available_gpus = None
dynamic_gpus_worker_id = None
if gpu_indexes and gpu_queues:
available_gpus, gpu_queues = self._setup_dynamic_gpus(gpu_queues)
# multi instance support
@ -812,7 +838,7 @@ class Worker(ServiceCommandSection):
self.report_monitor(ResourceMonitor.StatusReport(queues=queues, queue=queue, task=task_id))
org_gpus = os.environ.get('NVIDIA_VISIBLE_DEVICES')
worker_id = self.worker_id
dynamic_gpus_worker_id = self.worker_id
# the following is only executed in dynamic gpus mode
if gpu_queues and gpu_queues.get(queue):
# pick the first available GPUs
@ -836,7 +862,7 @@ class Worker(ServiceCommandSection):
self.run_one_task(queue, task_id, worker_params)
if gpu_queues:
self.worker_id = worker_id
self.worker_id = dynamic_gpus_worker_id
os.environ['CUDA_VISIBLE_DEVICES'] = \
os.environ['NVIDIA_VISIBLE_DEVICES'] = org_gpus
@ -864,23 +890,23 @@ class Worker(ServiceCommandSection):
if shutdown_docker_process(docker_cmd_contains='--id {}\'\"'.format(t_id)):
self.handle_task_termination(task_id=t_id, exit_code=0, stop_reason=TaskStopReason.stopped)
else:
# if we are in dynamic gpus / services mode,
# we should send termination signal to all child processes
if self._services_mode:
terminate_all_child_processes(timeout=20, include_parent=False)
# if we are here, just kill all sub processes
kill_all_child_processes()
for t_id in set(list_task_gpus_ids.values()):
# check if Task is running,
task_info = get_task(
self._session, t_id, only_fields=["status"]
)
# this is a bit risky we might have rerun it again after it already completed
# basically we are not removing completed tasks from the list, hence the issue
if str(task_info.status) == "in_progress":
self.handle_task_termination(
task_id=t_id, exit_code=0, stop_reason=TaskStopReason.stopped)
# unregister dynamic GPU worker, if we were terminated while setting up a Task
if dynamic_gpus_worker_id:
self.worker_id = dynamic_gpus_worker_id
self._unregister()
def _dynamic_gpu_get_available(self, gpu_indexes):
# noinspection PyBroadException
try:
response = self._session.send_api(workers_api.GetAllRequest(last_seen=60))
response = self._session.send_api(workers_api.GetAllRequest(last_seen=600))
except Exception:
return None
@ -1360,6 +1386,7 @@ class Worker(ServiceCommandSection):
service_mode_internal_agent_started = None
stopping = False
status = None
process = None
try:
_last_machine_update_ts = time()
stop_reason = None
@ -1396,7 +1423,7 @@ class Worker(ServiceCommandSection):
# get diff from previous poll
printed_lines, stdout_pos_count = _print_file(stdout_path, stdout_pos_count)
if self._services_mode and not stopping and not status:
if self._services_mode and not stopping and status is None:
# if the internal agent started, we stop logging, it will take over logging.
# if the internal agent started running the task itself, it will return status==0,
# then we can quit the monitoring loop of this process
@ -1416,6 +1443,8 @@ class Worker(ServiceCommandSection):
status = ex.returncode
except KeyboardInterrupt:
# so someone else will catch us
if process:
kill_all_child_processes(process.pid)
raise
except Exception:
# we should not get here, but better safe than sorry
@ -1427,6 +1456,10 @@ class Worker(ServiceCommandSection):
stop_reason = TaskStopReason.exception
status = -1
# full cleanup (just in case)
if process:
kill_all_child_processes(process.pid)
# if running in services mode, keep the file open
# in case the docker was so quick it started and finished, check the stop reason
if self._services_mode and service_mode_internal_agent_started and stop_reason == 'Service started':
@ -1792,15 +1825,16 @@ class Worker(ServiceCommandSection):
debug=self._session.debug_mode,
trace=self._session.trace,
)
self.report_monitor(ResourceMonitor.StatusReport(task=current_task.id))
self.run_one_task(queue='', task_id=current_task.id, worker_args=worker_params, docker=docker)
try:
self.report_monitor(ResourceMonitor.StatusReport(task=current_task.id))
self.run_one_task(queue='', task_id=current_task.id, worker_args=worker_params, docker=docker)
finally:
self.stop_monitor()
self._unregister()
self.stop_monitor()
self._unregister()
if full_monitoring and self.temp_config_path:
safe_remove_file(self._session.config_file)
Singleton.close_pid_file()
if full_monitoring and self.temp_config_path:
safe_remove_file(self._session.config_file)
Singleton.close_pid_file()
return
self._session.print_configuration()
@ -1908,7 +1942,6 @@ class Worker(ServiceCommandSection):
if current_task.script.binary and current_task.script.binary.startswith('python') and \
execution.entry_point and execution.entry_point.split()[0].strip() == '-m':
# we need to split it
import shlex
extra.extend(shlex.split(execution.entry_point))
else:
extra.append(execution.entry_point)
@ -2693,8 +2726,11 @@ class Worker(ServiceCommandSection):
if temp_config.get("agent.venvs_cache.path", None):
temp_config.put("agent.venvs_cache.path", '/root/.clearml/venvs-cache')
self._host_ssh_cache = mkdtemp(prefix='clearml_agent.ssh.')
self._temp_cleanup_list.append(self._host_ssh_cache)
if ENV_AGENT_DISABLE_SSH_MOUNT.get():
self._host_ssh_cache = None
else:
self._host_ssh_cache = mkdtemp(prefix='clearml_agent.ssh.')
self._temp_cleanup_list.append(self._host_ssh_cache)
return temp_config, partial(self._get_docker_config_cmd, temp_config=temp_config)
@ -2716,24 +2752,31 @@ class Worker(ServiceCommandSection):
"agent.docker_pip_cache", '~/.clearml/pip-cache'))).expanduser().as_posix()
# make sure all folders are valid
Path(host_apt_cache).mkdir(parents=True, exist_ok=True)
Path(host_pip_cache).mkdir(parents=True, exist_ok=True)
Path(host_cache).mkdir(parents=True, exist_ok=True)
Path(host_pip_dl).mkdir(parents=True, exist_ok=True)
Path(host_vcs_cache).mkdir(parents=True, exist_ok=True)
Path(host_ssh_cache).mkdir(parents=True, exist_ok=True)
if host_apt_cache:
Path(host_apt_cache).mkdir(parents=True, exist_ok=True)
if host_pip_cache:
Path(host_pip_cache).mkdir(parents=True, exist_ok=True)
if host_cache:
Path(host_cache).mkdir(parents=True, exist_ok=True)
if host_pip_dl:
Path(host_pip_dl).mkdir(parents=True, exist_ok=True)
if host_vcs_cache:
Path(host_vcs_cache).mkdir(parents=True, exist_ok=True)
if host_ssh_cache:
Path(host_ssh_cache).mkdir(parents=True, exist_ok=True)
if host_venvs_cache:
Path(host_venvs_cache).mkdir(parents=True, exist_ok=True)
# copy the .ssh folder to a temp folder, to be mapped into docker
# noinspection PyBroadException
try:
if Path(host_ssh_cache).is_dir():
shutil.rmtree(host_ssh_cache, ignore_errors=True)
shutil.copytree(Path('~/.ssh').expanduser().as_posix(), host_ssh_cache)
except Exception:
host_ssh_cache = None
self.log.warning('Failed creating temporary copy of ~/.ssh for git credential')
if host_ssh_cache:
# copy the .ssh folder to a temp folder, to be mapped into docker
# noinspection PyBroadException
try:
if Path(host_ssh_cache).is_dir():
shutil.rmtree(host_ssh_cache, ignore_errors=True)
shutil.copytree(Path('~/.ssh').expanduser().as_posix(), host_ssh_cache)
except Exception:
host_ssh_cache = None
self.log.warning('Failed creating temporary copy of ~/.ssh for git credential')
# check if the .git credentials exist:
try:
@ -3080,7 +3123,7 @@ class Worker(ServiceCommandSection):
warning('Could not terminate process pid={}'.format(pid))
return True
# wither we have a match for the worker_id or we just pick the first one, and kill it.
# either we have a match for the worker_id or we just pick the first one, and kill it.
if (worker_id and uid == worker_id) or (not worker_id and uid.startswith('{}:'.format(worker_name))):
# this is us kill it
print('Terminating clearml-agent worker_id={} pid={}'.format(uid, pid))
@ -3143,6 +3186,33 @@ class Worker(ServiceCommandSection):
queue_ids.append(q_id)
return queue_ids
def _sanitize_docker_command(self, docker_command):
# type: (List[str]) -> List[str]
if not self._session.config.get('agent.hide_docker_command_env_vars.enabled', False):
return docker_command
keys = set(self._session.config.get('agent.hide_docker_command_env_vars.extra_keys', []))
keys.update(
ENV_AGENT_GIT_PASS.vars,
ENV_AGENT_SECRET_KEY.vars,
ENV_AWS_SECRET_KEY.vars,
ENV_AZURE_ACCOUNT_KEY.vars
)
result = docker_command[:]
for i, item in enumerate(docker_command):
try:
if item not in ("-e", "--env"):
continue
key, sep, _ = result[i + 1].partition("=")
if key not in keys or not sep:
continue
result[i + 1] = "{}={}".format(key, "********")
except KeyError:
pass
return result
if __name__ == "__main__":
pass

View File

@ -62,6 +62,10 @@ class EnvironmentConfig(object):
return None
ENV_AGENT_SECRET_KEY = EnvironmentConfig("CLEARML_API_SECRET_KEY", "TRAINS_API_SECRET_KEY")
ENV_AWS_SECRET_KEY = EnvironmentConfig("AWS_SECRET_ACCESS_KEY")
ENV_AZURE_ACCOUNT_KEY = EnvironmentConfig("AZURE_STORAGE_KEY")
ENVIRONMENT_CONFIG = {
"api.api_server": EnvironmentConfig("CLEARML_API_HOST", "TRAINS_API_HOST", ),
"api.files_server": EnvironmentConfig("CLEARML_FILES_HOST", "TRAINS_FILES_HOST", ),
@ -69,9 +73,7 @@ ENVIRONMENT_CONFIG = {
"api.credentials.access_key": EnvironmentConfig(
"CLEARML_API_ACCESS_KEY", "TRAINS_API_ACCESS_KEY",
),
"api.credentials.secret_key": EnvironmentConfig(
"CLEARML_API_SECRET_KEY", "TRAINS_API_SECRET_KEY",
),
"api.credentials.secret_key": ENV_AGENT_SECRET_KEY,
"agent.worker_name": EnvironmentConfig("CLEARML_WORKER_NAME", "TRAINS_WORKER_NAME", ),
"agent.worker_id": EnvironmentConfig("CLEARML_WORKER_ID", "TRAINS_WORKER_ID", ),
"agent.cuda_version": EnvironmentConfig(
@ -84,10 +86,10 @@ ENVIRONMENT_CONFIG = {
names=("CLEARML_CPU_ONLY", "TRAINS_CPU_ONLY", "CPU_ONLY"), type=bool
),
"sdk.aws.s3.key": EnvironmentConfig("AWS_ACCESS_KEY_ID"),
"sdk.aws.s3.secret": EnvironmentConfig("AWS_SECRET_ACCESS_KEY"),
"sdk.aws.s3.secret": ENV_AWS_SECRET_KEY,
"sdk.aws.s3.region": EnvironmentConfig("AWS_DEFAULT_REGION"),
"sdk.azure.storage.containers.0": {'account_name': EnvironmentConfig("AZURE_STORAGE_ACCOUNT"),
'account_key': EnvironmentConfig("AZURE_STORAGE_KEY")},
'account_key': ENV_AZURE_ACCOUNT_KEY},
"sdk.google.storage.credentials_json": EnvironmentConfig("GOOGLE_APPLICATION_CREDENTIALS"),
}
@ -132,6 +134,7 @@ ENV_DOCKER_SKIP_GPUS_FLAG = EnvironmentConfig('CLEARML_DOCKER_SKIP_GPUS_FLAG', '
ENV_AGENT_GIT_USER = EnvironmentConfig('CLEARML_AGENT_GIT_USER', 'TRAINS_AGENT_GIT_USER')
ENV_AGENT_GIT_PASS = EnvironmentConfig('CLEARML_AGENT_GIT_PASS', 'TRAINS_AGENT_GIT_PASS')
ENV_AGENT_GIT_HOST = EnvironmentConfig('CLEARML_AGENT_GIT_HOST', 'TRAINS_AGENT_GIT_HOST')
ENV_AGENT_DISABLE_SSH_MOUNT = EnvironmentConfig('CLEARML_AGENT_DISABLE_SSH_MOUNT', type=bool)
ENV_TASK_EXECUTE_AS_USER = EnvironmentConfig('CLEARML_AGENT_EXEC_USER', 'TRAINS_AGENT_EXEC_USER')
ENV_TASK_EXTRA_PYTHON_PATH = EnvironmentConfig('CLEARML_AGENT_EXTRA_PYTHON_PATH', 'TRAINS_AGENT_EXTRA_PYTHON_PATH')
ENV_DOCKER_HOST_MOUNT = EnvironmentConfig('CLEARML_AGENT_K8S_HOST_MOUNT', 'CLEARML_AGENT_DOCKER_HOST_MOUNT',

View File

@ -32,9 +32,9 @@ class K8sIntegration(Worker):
K8S_DEFAULT_NAMESPACE = "clearml"
KUBECTL_APPLY_CMD = "kubectl apply -f"
KUBECTL_APPLY_CMD = "kubectl apply --namespace={namespace} -f"
KUBECTL_RUN_CMD = "kubectl run clearml-{queue_name}-id-{task_id} " \
KUBECTL_RUN_CMD = "kubectl run clearml-id-{task_id} " \
"--image {docker_image} " \
"--restart=Never " \
"--namespace={namespace}"
@ -95,6 +95,7 @@ class K8sIntegration(Worker):
clearml_conf_file=None,
extra_bash_init_script=None,
namespace=None,
max_pods_limit=None,
**kwargs
):
"""
@ -122,6 +123,7 @@ class K8sIntegration(Worker):
:param str clearml_conf_file: clearml.conf file to be use by the pod itself (optional)
:param str extra_bash_init_script: Additional bash script to run before starting the Task inside the container
:param str namespace: K8S namespace to be used when creating the new pods (default: clearml)
:param int max_pods_limit: Maximum number of pods that K8S glue can run at the same time
"""
super(K8sIntegration, self).__init__()
self.k8s_pending_queue_name = k8s_pending_queue_name or self.K8S_PENDING_QUEUE
@ -147,6 +149,7 @@ class K8sIntegration(Worker):
self.namespace = namespace or self.K8S_DEFAULT_NAMESPACE
self.pod_limits = []
self.pod_requests = []
self.max_pods_limit = max_pods_limit if not self.ports_mode else None
if overrides_yaml:
with open(os.path.expandvars(os.path.expanduser(str(overrides_yaml))), 'rt') as f:
overrides = yaml.load(f, Loader=getattr(yaml, 'FullLoader', None))
@ -304,20 +307,22 @@ class K8sIntegration(Worker):
except Exception:
queue_name = 'k8s'
# conform queue name to k8s standards
safe_queue_name = queue_name.lower().strip()
safe_queue_name = re.sub(r'\W+', '', safe_queue_name).replace('_', '').replace('-', '')
# Search for a free pod number
pod_count = 0
pod_number = self.base_pod_num
while self.ports_mode:
while self.ports_mode or self.max_pods_limit:
pod_number = self.base_pod_num + pod_count
kubectl_cmd_new = "kubectl get pods -l {pod_label},{agent_label} -n {namespace}".format(
pod_label=self.LIMIT_POD_LABEL.format(pod_number=pod_number),
agent_label=self.AGENT_LABEL,
namespace=self.namespace,
)
if self.ports_mode:
kubectl_cmd_new = "kubectl get pods -l {pod_label},{agent_label} -n {namespace}".format(
pod_label=self.LIMIT_POD_LABEL.format(pod_number=pod_number),
agent_label=self.AGENT_LABEL,
namespace=self.namespace,
)
else:
kubectl_cmd_new = "kubectl get pods -l {agent_label} -n {namespace} -o json".format(
agent_label=self.AGENT_LABEL,
namespace=self.namespace,
)
process = subprocess.Popen(kubectl_cmd_new.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, error = process.communicate()
output = '' if not output else output if isinstance(output, str) else output.decode('utf-8')
@ -326,21 +331,47 @@ class K8sIntegration(Worker):
if not output:
# No such pod exist so we can use the pod_number we found
break
if pod_count >= self.num_of_services - 1:
# All pod numbers are taken, exit
if self.max_pods_limit:
try:
current_pod_count = len(json.loads(output).get("items", []))
except (ValueError, TypeError) as ex:
self.log.warning(
"K8S Glue pods monitor: Failed parsing kubectl output:\n{}\ntask '{}' "
"will be enqueued back to queue '{}'\nEx: {}".format(
output, task_id, queue, ex
)
)
self._session.api_client.tasks.reset(task_id)
self._session.api_client.tasks.enqueue(task_id, queue=queue, status_reason='kubectl parsing error')
return
max_count = self.max_pods_limit
else:
current_pod_count = pod_count
max_count = self.num_of_services - 1
if current_pod_count >= max_count:
# All pods are taken, exit
self.log.debug(
"kubectl last result: {}\n{}".format(error, output))
self.log.warning(
"kubectl last result: {}\n{}\nAll k8s services are in use, task '{}' "
"All k8s services are in use, task '{}' "
"will be enqueued back to queue '{}'".format(
error, output, task_id, queue
task_id, queue
)
)
self._session.api_client.tasks.reset(task_id)
self._session.api_client.tasks.enqueue(
task_id, queue=queue, status_reason='k8s max pod limit (no free k8s service)')
return
elif self.max_pods_limit:
# max pods limit hasn't reached yet, so we can create the pod
break
pod_count += 1
labels = ([self.LIMIT_POD_LABEL.format(pod_number=pod_number)] if self.ports_mode else []) + [self.AGENT_LABEL]
labels.append("clearml-agent-queue={}".format(self._safe_k8s_label_value(queue)))
labels.append("clearml-agent-queue-name={}".format(self._safe_k8s_label_value(queue_name)))
if self.ports_mode:
print("Kubernetes scheduling task id={} on pod={} (pod_count={})".format(task_id, pod_number, pod_count))
@ -351,13 +382,13 @@ class K8sIntegration(Worker):
output, error = self._kubectl_apply(
create_clearml_conf=create_clearml_conf,
labels=labels, docker_image=docker_image, docker_args=docker_args,
task_id=task_id, queue=queue, queue_name=safe_queue_name)
task_id=task_id, queue=queue)
else:
output, error = self._kubectl_run(
create_clearml_conf=create_clearml_conf,
labels=labels, docker_image=docker_cmd,
task_data=task_data,
task_id=task_id, queue=queue, queue_name=safe_queue_name)
task_id=task_id, queue=queue)
error = '' if not error else (error if isinstance(error, str) else error.decode('utf-8'))
output = '' if not output else (output if isinstance(output, str) else output.decode('utf-8'))
@ -404,15 +435,16 @@ class K8sIntegration(Worker):
self.log.warning('skipping docker argument {} (only -e --env supported)'.format(cmd))
return {'env': kube_args} if kube_args else {}
def _kubectl_apply(self, create_clearml_conf, docker_image, docker_args, labels, queue, task_id, queue_name):
def _kubectl_apply(self, create_clearml_conf, docker_image, docker_args, labels, queue, task_id):
template = deepcopy(self.template_dict)
template.setdefault('apiVersion', 'v1')
template['kind'] = 'Pod'
template.setdefault('metadata', {})
name = 'clearml-{queue}-id-{task_id}'.format(queue=queue_name, task_id=task_id)
name = 'clearml-id-{task_id}'.format(task_id=task_id)
template['metadata']['name'] = name
template.setdefault('spec', {})
template['spec'].setdefault('containers', [])
template['spec'].setdefault('restartPolicy', 'Never')
if labels:
labels_dict = dict(pair.split('=', 1) for pair in labels)
template['metadata'].setdefault('labels', {})
@ -474,12 +506,11 @@ class K8sIntegration(Worker):
return output, error
def _kubectl_run(self, create_clearml_conf, docker_image, labels, queue, task_data, task_id, queue_name):
def _kubectl_run(self, create_clearml_conf, docker_image, labels, queue, task_data, task_id):
if callable(self.kubectl_cmd):
kubectl_cmd = self.kubectl_cmd(task_id, docker_image, queue, task_data, queue_name)
kubectl_cmd = self.kubectl_cmd(task_id, docker_image, queue, task_data)
else:
kubectl_cmd = self.kubectl_cmd.format(
queue_name=queue_name,
task_id=task_id,
docker_image=docker_image,
queue_id=queue,
@ -607,3 +638,13 @@ class K8sIntegration(Worker):
return merge_dicts(
c1, c2, custom_merge_func=merge_env
)
@staticmethod
def _safe_k8s_label_value(value):
""" Conform string to k8s standards for a label value """
value = value.lower().strip()
value = re.sub(r'^[^A-Za-z0-9]+', '', value) # strip leading non-alphanumeric chars
value = re.sub(r'[^A-Za-z0-9]+$', '', value) # strip trailing non-alphanumeric chars
value = re.sub(r'\W+', '-', value) # allow only word chars (this removed "." which is supported, but nvm)
value = re.sub(r'-+', '-', value) # don't leave messy "--" after replacing previous chars
return value[:63]

View File

@ -42,20 +42,31 @@ def get_bash_output(cmd, strip=False, stderr=subprocess.STDOUT, stdin=False):
return output if not strip or not output else output.strip()
def terminate_process(pid, timeout=10.):
def terminate_process(pid, timeout=10., ignore_zombie=True, include_children=False):
# noinspection PyBroadException
try:
proc = psutil.Process(pid)
children = proc.children(recursive=True) if include_children else []
proc.terminate()
cnt = 0
while proc.is_running() and cnt < timeout:
while proc.is_running() and (ignore_zombie or proc.status() != 'zombie') and cnt < timeout:
sleep(1.)
cnt += 1
proc.terminate()
# terminate children
for c in children:
c.terminate()
cnt = 0
while proc.is_running() and cnt < timeout:
while proc.is_running() and (ignore_zombie or proc.status() != 'zombie') and cnt < timeout:
sleep(1.)
cnt += 1
# kill children
for c in children:
c.kill()
proc.kill()
except Exception:
pass
@ -66,9 +77,8 @@ def terminate_process(pid, timeout=10.):
return True
def kill_all_child_processes(pid=None):
def kill_all_child_processes(pid=None, include_parent=True):
# get current process if pid not provided
include_parent = True
if not pid:
pid = os.getpid()
include_parent = False
@ -84,6 +94,23 @@ def kill_all_child_processes(pid=None):
parent.kill()
def terminate_all_child_processes(pid=None, timeout=10., include_parent=True):
# get current process if pid not provided
if not pid:
pid = os.getpid()
include_parent = False
try:
parent = psutil.Process(pid)
except psutil.Error:
# could not find parent process id
return
for child in parent.children(recursive=False):
print('Terminating child process {}'.format(child.pid))
terminate_process(child.pid, timeout=timeout, ignore_zombie=False, include_children=True)
if include_parent:
terminate_process(parent.pid, timeout=timeout, ignore_zombie=False)
def get_docker_id(docker_cmd_contains):
try:
containers_running = get_bash_output(cmd='docker ps --no-trunc --format \"{{.ID}}: {{.Command}}\"')
@ -194,6 +221,7 @@ class Argv(Executable):
"""
self.argv = argv
self._log = kwargs.pop("log", None)
self._display_argv = kwargs.pop("display_argv", argv)
if not self._log:
self._log = logging.getLogger(__name__)
self._log.propagate = False
@ -218,10 +246,10 @@ class Argv(Executable):
return self.argv
def __repr__(self):
return "<Argv{}>".format(self.argv)
return "<Argv{}>".format(self._display_argv)
def __str__(self):
return "Executing: {}".format(self.argv)
return "Executing: {}".format(self._display_argv)
def __iter__(self):
if is_windows_platform():

View File

@ -591,7 +591,7 @@ def clone_repository_cached(session, execution, destination):
# mock lock
repo_lock = Lock()
repo_lock_timeout_sec = 300
repo_url = execution.repository # type: str
repo_url = execution.repository or '' # type: str
parsed_url = furl(repo_url)
no_password_url = parsed_url.copy().remove(password=True).url

View File

@ -1 +1 @@
__version__ = '0.17.2'
__version__ = '1.0.1rc1'

View File

@ -42,6 +42,9 @@ agent {
# Example values: "/usr/bin/python3" or "/usr/local/bin/python3.6"
# The default is the python executing the clearml_agent
python_binary: ""
# ignore any requested python version (Default: False, if a Task was using a
# specific python version and the system supports multiple python the agent will use the requested python version)
# ignore_requested_python_version: true
# select python package manager:
# currently supported pip and conda
@ -107,11 +110,12 @@ agent {
path: ~/.clearml/vcs-cache
},
# DEPRECATED: please use `venvs_cache` and set `venvs_cache.path`
# use venv-update in order to accelerate python virtual environment building
# Still in beta, turned off by default
venv_update: {
enabled: false,
},
# venv_update: {
# enabled: false,
# },
# cached folder for specific python package download (mostly pytorch versions)
pip_download_cache {

View File

@ -10,12 +10,15 @@ from clearml_agent.glue.k8s import K8sIntegration
def parse_args():
parser = ArgumentParser()
group = parser.add_mutually_exclusive_group()
parser.add_argument(
"--queue", type=str, help="Queue to pull tasks from"
)
parser.add_argument(
group.add_argument(
"--ports-mode", action='store_true', default=False,
help="Ports-Mode will add a label to the pod which can be used as service, in order to expose ports"
"Should not be used with max-pods"
)
parser.add_argument(
"--num-of-services", type=int, default=20,
@ -57,6 +60,11 @@ def parse_args():
"--namespace", type=str,
help="Specify the namespace in which pods will be created (default: %(default)s)", default="clearml"
)
group.add_argument(
"--max-pods", type=int,
help="Limit the maximum number of pods that this service can run at the same time."
"Should not be used with ports-mode"
)
return parser.parse_args()
@ -77,7 +85,7 @@ def main():
user_props_cb=user_props_cb, overrides_yaml=args.overrides_yaml, clearml_conf_file=args.pod_clearml_conf,
template_yaml=args.template_yaml, extra_bash_init_script=K8sIntegration.get_ssh_server_bash(
ssh_port_number=args.ssh_server_port) if args.ssh_server_port else None,
namespace=args.namespace,
namespace=args.namespace, max_pods_limit=args.max_pods or None,
)
k8s.k8s_daemon(args.queue)

View File

@ -9,7 +9,7 @@ pyhocon>=0.3.38,<0.4.0
pyparsing>=2.0.3,<2.5.0
python-dateutil>=2.4.2,<2.9.0
pyjwt>=1.6.4,<2.1.0
PyYAML>=3.12,<5.4.0
PyYAML>=3.12,<5.5.0
requests>=2.20.0,<2.26.0
six>=1.11.0,<1.16.0
typing>=3.6.4,<3.8.0