diff --git a/clearml_agent/backend_api/session/defs.py b/clearml_agent/backend_api/session/defs.py index d96102f..fbf716f 100644 --- a/clearml_agent/backend_api/session/defs.py +++ b/clearml_agent/backend_api/session/defs.py @@ -6,6 +6,8 @@ ENV_WEB_HOST = EnvEntry("CLEARML_WEB_HOST", "TRAINS_WEB_HOST") ENV_FILES_HOST = EnvEntry("CLEARML_FILES_HOST", "TRAINS_FILES_HOST") ENV_ACCESS_KEY = EnvEntry("CLEARML_API_ACCESS_KEY", "TRAINS_API_ACCESS_KEY") ENV_SECRET_KEY = EnvEntry("CLEARML_API_SECRET_KEY", "TRAINS_API_SECRET_KEY") +ENV_AUTH_TOKEN = EnvEntry("CLEARML_AUTH_TOKEN") ENV_VERBOSE = EnvEntry("CLEARML_API_VERBOSE", "TRAINS_API_VERBOSE", type=bool, default=False) ENV_HOST_VERIFY_CERT = EnvEntry("CLEARML_API_HOST_VERIFY_CERT", "TRAINS_API_HOST_VERIFY_CERT", type=bool, default=True) ENV_CONDA_ENV_PACKAGE = EnvEntry("CLEARML_CONDA_ENV_PACKAGE", "TRAINS_CONDA_ENV_PACKAGE") +ENV_NO_DEFAULT_SERVER = EnvEntry("CLEARML_NO_DEFAULT_SERVER", "TRAINS_NO_DEFAULT_SERVER", type=bool, default=False) diff --git a/clearml_agent/backend_api/session/session.py b/clearml_agent/backend_api/session/session.py index fae184b..757b9b6 100644 --- a/clearml_agent/backend_api/session/session.py +++ b/clearml_agent/backend_api/session/session.py @@ -11,7 +11,8 @@ from pyhocon import ConfigTree from requests.auth import HTTPBasicAuth from .callresult import CallResult -from .defs import ENV_VERBOSE, ENV_HOST, ENV_ACCESS_KEY, ENV_SECRET_KEY, ENV_WEB_HOST, ENV_FILES_HOST +from .defs import ENV_VERBOSE, ENV_HOST, ENV_ACCESS_KEY, ENV_SECRET_KEY, ENV_WEB_HOST, ENV_FILES_HOST, ENV_AUTH_TOKEN, \ + ENV_NO_DEFAULT_SERVER from .request import Request, BatchRequest from .token_manager import TokenManager from ..config import load @@ -45,6 +46,7 @@ class Session(TokenManager): _write_session_timeout = (30.0, 30.) api_version = '2.1' + feature_set = 'basic' default_host = "https://demoapi.demo.clear.ml" default_web = "https://demoapp.demo.clear.ml" default_files = "https://demofiles.demo.clear.ml" @@ -103,30 +105,42 @@ class Session(TokenManager): "auth.token_expiration_threshold_sec", 60 ) + self._verbose = verbose if verbose is not None else ENV_VERBOSE.get() + self._logger = logger + self.__auth_token = None + + if ENV_AUTH_TOKEN.get( + value_cb=lambda key, value: print("Using environment access token {}=********".format(key)) + ): + self._set_auth_token(ENV_AUTH_TOKEN.get()) + # if we use a token we override make sure we are at least 3600 seconds (1 hour) + # away from the token expiration date, ask for a new one. + token_expiration_threshold_sec = max(token_expiration_threshold_sec, 3600) + else: + self.__access_key = api_key or ENV_ACCESS_KEY.get( + default=(self.config.get("api.credentials.access_key", None) or self.default_key), + value_cb=lambda key, value: print("Using environment access key {}={}".format(key, value)) + ) + if not self.access_key: + raise ValueError( + "Missing access_key. Please set in configuration file or pass in session init." + ) + + self.__secret_key = secret_key or ENV_SECRET_KEY.get( + default=(self.config.get("api.credentials.secret_key", None) or self.default_secret), + value_cb=lambda key, value: print("Using environment secret key {}=********".format(key)) + ) + if not self.secret_key: + raise ValueError( + "Missing secret_key. Please set in configuration file or pass in session init." + ) + super(Session, self).__init__( token_expiration_threshold_sec=token_expiration_threshold_sec, **kwargs ) - self._verbose = verbose if verbose is not None else ENV_VERBOSE.get() - self._logger = logger - - self.__access_key = api_key or ENV_ACCESS_KEY.get( - default=(self.config.get("api.credentials.access_key", None) or self.default_key), - value_cb=lambda key, value: print("Using environment access key {}={}".format(key, value)) - ) - if not self.access_key: - raise ValueError( - "Missing access_key. Please set in configuration file or pass in session init." - ) - - self.__secret_key = secret_key or ENV_SECRET_KEY.get( - default=(self.config.get("api.credentials.secret_key", None) or self.default_secret), - value_cb=lambda key, value: print("Using environment secret key {}=********".format(key)) - ) - if not self.secret_key: - raise ValueError( - "Missing secret_key. Please set in configuration file or pass in session init." - ) + if self.access_key == self.default_key and self.secret_key == self.default_secret: + print("Using built-in ClearML default key/secret") host = host or self.get_api_server_host(config=self.config) if not host: @@ -163,6 +177,7 @@ class Session(TokenManager): api_version = '2.2' if token_dict.get('env', '') == 'prod' else Session.api_version Session.api_version = str(api_version) + Session.feature_set = str(token_dict.get('feature_set', self.feature_set) or "basic") except (jwt.DecodeError, ValueError): pass @@ -244,6 +259,14 @@ class Session(TokenManager): headers[self._AUTHORIZATION_HEADER] = "Bearer {}".format(self.token) return headers + def _set_auth_token(self, auth_token): + self.__access_key = self.__secret_key = None + self.__auth_token = auth_token + + def set_auth_token(self, auth_token): + self._set_auth_token(auth_token) + self.refresh_token() + def send_request( self, service, @@ -441,8 +464,11 @@ class Session(TokenManager): if not config: return None - return ENV_HOST.get(default=(config.get("api.api_server", None) or - config.get("api.host", None) or cls.default_host)) + default = config.get("api.api_server", None) or config.get("api.host", None) + if not ENV_NO_DEFAULT_SERVER.get(): + default = default or cls.default_host + + return ENV_HOST.get(default=default) @classmethod def get_app_server_host(cls, config=None): @@ -522,7 +548,13 @@ class Session(TokenManager): ) ) - auth = HTTPBasicAuth(self.access_key, self.secret_key) + headers = None + # use token only once (the second time the token is already built into the http session) + if self.__auth_token: + headers = dict(Authorization="Bearer {}".format(self.__auth_token)) + self.__auth_token = None + + auth = HTTPBasicAuth(self.access_key, self.secret_key) if self.access_key and self.secret_key else None res = None try: data = {"expiration_sec": exp} if exp else {} @@ -531,6 +563,7 @@ class Session(TokenManager): action="login", auth=auth, json=data, + headers=headers, refresh_token_if_unauthorized=False, ) try: diff --git a/clearml_agent/commands/base.py b/clearml_agent/commands/base.py index 7ef34ba..b198649 100644 --- a/clearml_agent/commands/base.py +++ b/clearml_agent/commands/base.py @@ -118,11 +118,13 @@ class ServiceCommandSection(BaseCommandSection): """ The name of the REST service used by this command """ pass - def get(self, endpoint, *args, **kwargs): - return self._session.get(service=self.service, action=endpoint, *args, **kwargs) + def get(self, endpoint, *args, session=None, **kwargs): + session = session or self._session + return session.get(service=self.service, action=endpoint, *args, **kwargs) - def post(self, endpoint, *args, **kwargs): - return self._session.post(service=self.service, action=endpoint, *args, **kwargs) + def post(self, endpoint, *args, session=None, **kwargs): + session = session or self._session + return session.post(service=self.service, action=endpoint, *args, **kwargs) def get_with_act_as(self, endpoint, *args, **kwargs): return self._session.get_with_act_as(service=self.service, action=endpoint, *args, **kwargs) diff --git a/clearml_agent/commands/events.py b/clearml_agent/commands/events.py index d1abb52..e10c6e5 100644 --- a/clearml_agent/commands/events.py +++ b/clearml_agent/commands/events.py @@ -21,14 +21,16 @@ class Events(ServiceCommandSection): """ Events command service endpoint """ return 'events' - def send_events(self, list_events): + def send_events(self, list_events, session=None): def send_packet(jsonlines): if not jsonlines: return 0 num_lines = len(jsonlines) jsonlines = '\n'.join(jsonlines) - new_events = self.post('add_batch', data=jsonlines, headers={'Content-type': 'application/json-lines'}) + new_events = self.post( + 'add_batch', data=jsonlines, headers={'Content-type': 'application/json-lines'}, session=session + ) if new_events['added'] != num_lines: print('Error (%s) sending events only %d of %d registered' % (new_events['errors'], new_events['added'], num_lines)) @@ -57,7 +59,7 @@ class Events(ServiceCommandSection): # print('Sending events done: %d / %d events sent' % (sent_events, len(list_events))) return sent_events - def send_log_events(self, worker_id, task_id, lines, level='DEBUG'): + def send_log_events(self, worker_id, task_id, lines, level='DEBUG', session=None): log_events = [] base_timestamp = int(time.time() * 1000) base_log_items = { @@ -94,4 +96,4 @@ class Events(ServiceCommandSection): log_events.append(get_event(count)) # now send the events - return self.send_events(list_events=log_events) + return self.send_events(list_events=log_events, session=session) diff --git a/clearml_agent/commands/worker.py b/clearml_agent/commands/worker.py index 40fbe93..e452dca 100644 --- a/clearml_agent/commands/worker.py +++ b/clearml_agent/commands/worker.py @@ -6,17 +6,18 @@ import logging import os import os.path import re +import shlex +import shutil import signal import subprocess import sys -import shutil import traceback -import shlex from collections import defaultdict from copy import deepcopy, copy from datetime import datetime from distutils.spawn import find_executable -from functools import partial, cmp_to_key +from distutils.util import strtobool +from functools import partial from itertools import chain from tempfile import mkdtemp, NamedTemporaryFile from time import sleep, time @@ -25,15 +26,16 @@ from typing import Text, Optional, Any, Tuple, List import attr import psutil import six -from clearml_agent.backend_api.services import queues as queues_api -from clearml_agent.backend_api.services import tasks as tasks_api -from clearml_agent.backend_api.services import workers as workers_api from pathlib2 import Path from pyhocon import ConfigTree, ConfigFactory from six.moves.urllib.parse import quote +from clearml_agent.backend_api.services import auth as auth_api +from clearml_agent.backend_api.services import queues as queues_api +from clearml_agent.backend_api.services import tasks as tasks_api +from clearml_agent.backend_api.services import workers as workers_api +from clearml_agent.backend_api.session import CallResult from clearml_agent.backend_config.defs import UptimeConf -from clearml_agent.helper.check_update import start_check_update_daemon from clearml_agent.commands.base import resolve_names, ServiceCommandSection from clearml_agent.definitions import ( ENVIRONMENT_SDK_PARAMS, @@ -46,12 +48,16 @@ from clearml_agent.definitions import ( ENV_AGENT_GIT_USER, ENV_AGENT_GIT_PASS, ENV_WORKER_ID, + ENV_WORKER_TAGS, ENV_DOCKER_SKIP_GPUS_FLAG, ENV_AGENT_SECRET_KEY, + ENV_AGENT_AUTH_TOKEN, ENV_AWS_SECRET_KEY, ENV_AZURE_ACCOUNT_KEY, ENV_AGENT_DISABLE_SSH_MOUNT, ENV_SSH_AUTH_SOCK, + ENV_AGENT_SKIP_PIP_VENV_INSTALL, + ENV_EXTRA_DOCKER_ARGS, ) from clearml_agent.definitions import WORKING_REPOSITORY_DIR, PIP_EXTRA_INDICES from clearml_agent.errors import APIError, CommandFailedError, Sigterm @@ -79,15 +85,17 @@ from clearml_agent.helper.base import ( add_python_path, safe_remove_tree, ) +from clearml_agent.helper.check_update import start_check_update_daemon from clearml_agent.helper.console import ensure_text, print_text, decode_binary_lines from clearml_agent.helper.os.daemonize import daemonize_process from clearml_agent.helper.package.base import PackageManager from clearml_agent.helper.package.conda_api import CondaAPI -from clearml_agent.helper.package.post_req import PostRequirement from clearml_agent.helper.package.external_req import ExternalRequirements, OnlyExternalRequirements from clearml_agent.helper.package.pip_api.system import SystemPip from clearml_agent.helper.package.pip_api.venv import VirtualenvPip from clearml_agent.helper.package.poetry_api import PoetryConfig, PoetryAPI +from clearml_agent.helper.package.post_req import PostRequirement +from clearml_agent.helper.package.priority_req import PriorityPackageRequirement, PackageCollectorRequirement from clearml_agent.helper.package.pytorch import PytorchRequirement from clearml_agent.helper.package.requirements import RequirementsManager from clearml_agent.helper.package.venv_update_api import VenvUpdateAPI @@ -106,13 +114,11 @@ from clearml_agent.helper.process import ( check_if_command_exists, terminate_all_child_processes, ) -from clearml_agent.helper.package.priority_req import PriorityPackageRequirement, PackageCollectorRequirement from clearml_agent.helper.repo import clone_repository_cached, RepoInfo, VCS, fix_package_import_diff_patch from clearml_agent.helper.resource_monitor import ResourceMonitor from clearml_agent.helper.runtime_verification import check_runtime, print_uptime_properties -from clearml_agent.session import Session from clearml_agent.helper.singleton import Singleton - +from clearml_agent.session import Session from .events import Events DOCKER_ROOT_CONF_FILE = "/root/clearml.conf" @@ -222,16 +228,68 @@ def get_repo_auth_string(user, password): CONCAT_CMD = select_for_platform(linux=" && ", windows=" & ") +class TaskNotFoundError(APIError): + pass + + class TaskStopReason(object): no_stop = 0 # type: TaskStopReason stopped = 1 # type: TaskStopReason reset = 2 # type: TaskStopReason status_changed = 3 # type: TaskStopReason exception = 4 # type: TaskStopReason + not_found = 5 # type: TaskStopReason -def get_task(session, task_id, *args, **kwargs): - return session.api_client.tasks.get_all(id=[task_id], *args, **kwargs)[0] +def get_task(session, task_id, **kwargs): + """Use manual api call so that we can pass 'search_hidden' param from api v2.14""" + # return session.api_client.tasks.get_all(id=[task_id], **kwargs)[0] + res = session.send_request( + service='tasks', + action='get_all', + version='2.14', + json={"id": [task_id], "search_hidden": True, **kwargs}, + method='get', + async_enable=False, + ) + result = CallResult.from_result( + res=res, + request_cls=tasks_api.GetAllRequest, + logger=session._logger, + service='tasks', + action='get_all', + session=session, + ) + if not result.ok(): + raise APIError(result) + if not result.response: + raise APIError(result, extra_info="Invalid response") + if not result.response.tasks: + raise TaskNotFoundError(result) + return result.response.tasks[0] + + +def get_next_task(session, queue, get_task_info=False): + """ + Returns dict that contains next task and its additional info (company, user) + """ + request = {'queue': queue} + if get_task_info: + request["get_task_info"] = True + result = session.send_request( + service='queues', + action='get_next_task', + version='2.14', + json=request, + method='get', + async_enable=False, + ) + if not result.ok: + raise APIError(result) + data = result.json().get('data') + if data is None: + raise APIError(result, extra_info="Invalid response") + return data def get_task_container(session, task_id): @@ -242,8 +300,8 @@ def get_task_container(session, task_id): result = session.send_request( service='tasks', action='get_all', - version='2.13', - json={'id': [task_id], 'only_fields': ['container']}, + version='2.14', + json={'id': [task_id], 'only_fields': ['container'], 'search_hidden': True}, method='get', async_enable=False, ) @@ -334,6 +392,8 @@ class TaskStopSignal(object): """ try: return self._test() + except TaskNotFoundError: + return TaskStopReason.not_found except Exception as ex: self.command.log_traceback(ex) # make sure we break nothing @@ -413,6 +473,11 @@ class Worker(ServiceCommandSection): # last message before passing control to the actual task _task_logging_pass_control_message = "Running task id [{}]:" + # label with worker id for worker agent docker in services mode + _worker_label = "clearml-worker-id={}" + # label with parent worker id for worker agent docker in services mode + _parent_worker_label = "clearml-parent-worker-id={}" + _run_as_user_home = '/clearml_agent_home' _docker_fixed_user_cache = '/clearml_agent_cache' _temp_cleanup_list = [] @@ -480,12 +545,16 @@ class Worker(ServiceCommandSection): self._docker_image = None self._docker_arguments = None PackageManager.set_pip_version(self._session.config.get("agent.package_manager.pip_version", None)) - self._extra_docker_arguments = self._session.config.get("agent.extra_docker_arguments", None) + self._extra_docker_arguments = ( + ENV_EXTRA_DOCKER_ARGS.get() or self._session.config.get("agent.extra_docker_arguments", None) + ) self._extra_shell_script = self._session.config.get("agent.extra_docker_shell_script", None) self._docker_force_pull = self._session.config.get("agent.docker_force_pull", False) self._daemon_foreground = None self._standalone_mode = None self._services_mode = None + self._impersonate_as_task_owner = None + self._worker_tags = None self._dynamic_gpus = None self._force_current_version = None self._redirected_stdout_file_no = None @@ -529,36 +598,39 @@ class Worker(ServiceCommandSection): ) return requirements_manager - def handle_user_abort(self, task_id): + def handle_user_abort(self, task_id, session=None): """ Set task status to appropriate value on user abort. """ + session = session or self._session try: - task_status = self._session.send_api( + task_status = session.send_api( tasks_api.GetByIdRequest(task_id) ).task.status if task_status == tasks_api.TaskStatusEnum.in_progress: print("\nUser abort - stopping task {}".format(task_id)) - self._session.send_api(tasks_api.StoppedRequest(task_id)) + session.send_api(tasks_api.StoppedRequest(task_id)) except Exception: pass - def run_one_task(self, queue, task_id, worker_args, docker=None): + def run_one_task(self, queue, task_id, worker_args, docker=None, task_session=None): # type: (Text, Text, WorkerParams, Optional[Text]) -> () """ Run one task pulled from queue. :param queue: ID of queue that task was pulled from :param task_id: ID of task to run :param worker_args: Worker command line arguments + :params task_session: The session for running operations on the passed task :param docker: Docker image in which the execution task will run """ # start new process and execute task id # "Running task '{}'".format(task_id) print(self._task_logging_start_message.format(task_id)) - + task_session = task_session or self._session # set task status to in_progress so we know it was popped from the queue + # noinspection PyBroadException try: - self._session.send_api(tasks_api.StartedRequest(task=task_id, force=True)) + task_session.send_api(tasks_api.StartedRequest(task=task_id, status_message="pulled by agent", force=True)) except Exception: print("Warning: Could not start task id '{}', skipping".format(task_id)) return @@ -581,7 +653,7 @@ class Worker(ServiceCommandSection): if self.docker_image_func: # noinspection PyBroadException try: - task_container = get_task_container(self._session, task_id) + task_container = get_task_container(task_session, task_id) except Exception: task_container = {} @@ -598,31 +670,36 @@ class Worker(ServiceCommandSection): task_id, "default " if default_docker else '', docker_image, docker_arguments or [])] + (['custom_setup_bash_script:\n{}'.format(docker_setup_script)] if docker_setup_script else []), - level="INFO") + level="INFO", + session=task_session, + ) # Update docker command + docker_params = dict( + docker_image=docker_image, + docker_arguments=docker_arguments, + docker_bash_setup_script=docker_setup_script, + ) + if self._impersonate_as_task_owner: + docker_params["auth_token"] = task_session.token + if self._worker_tags: + docker_params["worker_tags"] = self._worker_tags if self._services_mode: # if this is services mode, give the docker a unique worker id, as it will register itself. - full_docker_cmd = self.docker_image_func( - worker_id=worker_id, - docker_image=docker_image, - docker_arguments=docker_arguments, - docker_bash_setup_script=docker_setup_script) - else: - full_docker_cmd = self.docker_image_func( - docker_image=docker_image, - docker_arguments=docker_arguments, - docker_bash_setup_script=docker_setup_script) + docker_params["worker_id"] = worker_id + full_docker_cmd = self.docker_image_func(**docker_params) # if we are using the default docker, update back the Task: if default_docker: # noinspection PyBroadException try: set_task_container( - self._session, task_id, + task_session, + task_id=task_id, docker_image=docker_image, docker_arguments=docker_arguments, - docker_bash_script=docker_setup_script) + docker_bash_script=docker_setup_script, + ) except Exception: pass @@ -637,7 +714,12 @@ class Worker(ServiceCommandSection): display_docker_command = self._sanitize_docker_command(full_docker_cmd) # send the actual used command line to the backend - self.send_logs(task_id=task_id, lines=['Executing: {}\n'.format(display_docker_command)], level="INFO") + self.send_logs( + task_id=task_id, + lines=['Executing: {}\n'.format(display_docker_command)], + level="INFO", + session=task_session, + ) cmd = Argv(*full_docker_cmd, display_argv=display_docker_command) @@ -652,7 +734,7 @@ class Worker(ServiceCommandSection): events_service = self.get_service(Events) stop_signal = TaskStopSignal( command=self, - session=self._session, + session=task_session, events_service=events_service, task_id=task_id, ) @@ -672,8 +754,15 @@ class Worker(ServiceCommandSection): stderr_path=temp_stderr_name, daemon=True, stop_signal=stop_signal, + session=task_session, ) + env = os.environ.copy() + if self._impersonate_as_task_owner: + env[ENV_AGENT_AUTH_TOKEN.vars[0]] = task_session.token + if self._worker_tags: + env[ENV_WORKER_TAGS.vars[0]] = " ".join(shlex.quote(t) for t in self._worker_tags) + status, stop_signal_status = self._log_command_output( task_id=task_id, cmd=cmd, @@ -681,6 +770,8 @@ class Worker(ServiceCommandSection): stderr_path=temp_stderr_name, daemon=True, stop_signal=stop_signal, + env=env, + session=task_session, ) errors = temp_stderr_name and Path(temp_stderr_name).read_text() if errors: @@ -696,13 +787,13 @@ class Worker(ServiceCommandSection): else: print("DONE: Running task '{}', exit status {}".format(task_id, status)) except KeyboardInterrupt: - self.handle_user_abort(task_id) + self.handle_user_abort(task_id, session=task_session) status = ExitStatus.interrupted finally: if self._services_mode and stop_signal_status is None: print('Service started, docker running in the background') else: - self.handle_task_termination(task_id, status, stop_signal_status) + self.handle_task_termination(task_id, status, stop_signal_status, session=task_session) # remove temp files after we sent everything to the backend if self.docker_image_func: shutdown_docker_process(docker_cmd_contains='--id {}\'\"'.format(task_id)) @@ -712,6 +803,32 @@ class Worker(ServiceCommandSection): # unregister this worker, it was killed self._unregister() + def get_task_session(self, user, company): + """ + Get task session for the user by cloning the agent session + and replacing the session credentials with the task owner auth token + In case the task is not from the user company proceed with another login + to get the auth token for the tenant and return the session for it + Requires that agent session credentials will allow impersonation as task user + """ + def get_new_session(session, headers): + result = session.send(auth_api.LoginRequest(), headers=headers) + if not (result.ok() and result.response): + return + new_session = copy(session) + new_session.set_auth_token(result.response.token) + return new_session + + task_session = get_new_session(self._session, headers={"X-Clearml-Impersonate-As": user}) + if not task_session: + return + + token = task_session.get_decoded_token(task_session.token) + if token.get("tenant") == company: + return task_session + + return get_new_session(task_session, headers={"X-Clearml-Tenant": company}) + def run_tasks_loop(self, queues, worker_params, priority_order=True, gpu_indexes=None, gpu_queues=None): """ :summary: Pull and run tasks from queues. @@ -760,10 +877,17 @@ class Worker(ServiceCommandSection): if max_num_instances and max_num_instances > 0: # make sure we do not have too many instances to run - if len(Singleton.get_running_pids()) >= max_num_instances: + if self.docker_image_func: + running_count = self._get_child_agents_count_for_worker() + else: + running_count = len(Singleton.get_running_pids()) + if running_count >= max_num_instances: if self._daemon_foreground or worker_params.debug: - print("Reached max number of services, sleeping for {:.1f} seconds".format( - self._polling_interval)) + print( + "Reached max number of services {}, sleeping for {:.1f} seconds".format( + max_num_instances, self._polling_interval + ) + ) sleep(self._polling_interval) continue @@ -808,8 +932,8 @@ class Worker(ServiceCommandSection): # get next task in queue try: - response = self._session.send_api( - queues_api.GetNextTaskRequest(queue=queue) + response = get_next_task( + self._session, queue=queue, get_task_info=self._impersonate_as_task_owner ) except Exception as e: print( @@ -820,8 +944,8 @@ class Worker(ServiceCommandSection): continue else: try: - task_id = response.entry.task - except AttributeError: + task_id = response["entry"]["task"] + except (KeyError, TypeError, AttributeError): if self._daemon_foreground or worker_params.debug: print("No tasks in queue {}".format(queue)) continue @@ -836,6 +960,24 @@ class Worker(ServiceCommandSection): except: pass + task_session = None + if self._impersonate_as_task_owner: + try: + task_user = response["task_info"]["user"] + task_company = response["task_info"]["company"] + except (KeyError, TypeError, AttributeError): + print("Error: cannot retrieve owner user for the task '{}', skipping".format(task_id)) + continue + + task_session = self.get_task_session(task_user, task_company) + if not task_session: + print( + "Error: Could not login as the user '{}' for the task '{}', skipping".format( + task_user, task_id + ) + ) + continue + self.report_monitor(ResourceMonitor.StatusReport(queues=queues, queue=queue, task=task_id)) org_gpus = os.environ.get('NVIDIA_VISIBLE_DEVICES') @@ -858,9 +1000,10 @@ class Worker(ServiceCommandSection): task_id=task_id, lines=["task {} pulled from {} by worker {}\n".format(task_id, queue, self.worker_id)], level="INFO", + session=task_session, ) - self.run_one_task(queue, task_id, worker_params) + self.run_one_task(queue, task_id, worker_params, task_session=task_session) if gpu_queues: self.worker_id = dynamic_gpus_worker_id @@ -982,12 +1125,12 @@ class Worker(ServiceCommandSection): try: self.post("set_runtime_properties", json={ - 'runtime_properties': [{'key': key, 'value': value}], + 'runtime_properties': [{'key': key, 'value': str(value)}], 'worker': self.worker_id}) # definitely supported self._runtime_props_support = True return True - except APIError: + except APIError as ex: self._runtime_props_support = self._session.api_version except Exception as ex: # not sure what happened @@ -1051,7 +1194,7 @@ class Worker(ServiceCommandSection): self ) ) - self.dump_config(self.temp_config_path) + self.dump_config(self.temp_config_path, clean_api_credentials=self._impersonate_as_task_owner) def check(self, **_): try: @@ -1086,6 +1229,13 @@ class Worker(ServiceCommandSection): if docker not in (False, None) and not check_if_command_exists("docker"): raise ValueError("Running in Docker mode, 'docker' command was not found") + self._worker_tags = kwargs.get('child_report_tags', None) + self._impersonate_as_task_owner = kwargs.get('use_owner_token', False) + if self._impersonate_as_task_owner: + if not self._session.check_min_api_version("2.14"): + raise ValueError("Apiserver does not support --use-owner-token option. The apiserver version is too low") + if self._session.feature_set == "basic": + raise ValueError("Apiserver does not support --use-owner-token option") self._standalone_mode = kwargs.get('standalone_mode', False) self._services_mode = kwargs.get('services_mode', False) # must have docker in services_mode @@ -1162,9 +1312,9 @@ class Worker(ServiceCommandSection): # print docker image if docker is not False and docker is not None: self._force_current_version = kwargs.get('force_current_version', False) - self.set_docker_variables(docker) + self.set_docker_variables(docker, clean_api_credentials=self._impersonate_as_task_owner) else: - self.dump_config(self.temp_config_path) + self.dump_config(self.temp_config_path, clean_api_credentials=self._impersonate_as_task_owner) # only in none docker we have to make sure we have CUDA setup # make sure we have CUDA set if we have --gpus @@ -1324,17 +1474,17 @@ class Worker(ServiceCommandSection): def new_monitor(self, report=None): self.stop_monitor() self.monitor = ResourceMonitor( - session=self._session, worker_id=self.worker_id, + session=self._session, + worker_id=self.worker_id, first_report_sec=3.0, - report_frequency_sec=self._machine_update_interval) + report_frequency_sec=self._machine_update_interval, + worker_tags=None if self._services_mode else self._worker_tags, + ) self.monitor.set_report(report) self.monitor.start() return self.monitor - def dump_config(self, filename, config=None): - def to_json(config): - return json.dumps(config.as_plain_ordered_dict(), cls=HOCONEncoder, indent=4) - + def dump_config(self, filename, config=None, clean_api_credentials=False): # noinspection PyBroadException try: current_content = Path(filename).read_text() @@ -1343,7 +1493,13 @@ class Worker(ServiceCommandSection): # noinspection PyBroadException try: - new_content = six.text_type(self._session.to_json() if config is None else to_json(config)) + config_data = self._session.config.as_plain_ordered_dict() if config is None else config.as_plain_ordered_dict() + if clean_api_credentials: + api = config_data.get("api") + if api: + api.pop("credentials", None) + + new_content = six.text_type(json.dumps(config_data, cls=HOCONEncoder, indent=4)) # Overwrite file only if the content is different, because we are mounting the same file into # multiple containers in services mode, and we don't want to change it if we do not have to. if new_content != current_content: @@ -1361,6 +1517,7 @@ class Worker(ServiceCommandSection): daemon=False, # type: bool cwd=None, # type: Text stop_signal=None, # type: Optional[TaskStopSignal] + session=None, **kwargs # type: Any ): # type: (...) -> Tuple[Optional[int], Optional[TaskStopReason]] @@ -1411,6 +1568,7 @@ class Worker(ServiceCommandSection): task_id=task_id, lines=["User aborted: stopping task ({})\n".format(str(stop_reason))], level="ERROR", + session=session, ) kill_all_child_processes(process.pid) else: @@ -1433,10 +1591,10 @@ class Worker(ServiceCommandSection): if status is not None: stop_reason = 'Service started' - stdout_line_count += self.send_logs(task_id, printed_lines) + stdout_line_count += self.send_logs(task_id, printed_lines, session=session) if stderr_path: printed_lines, stderr_pos_count = _print_file(stderr_path, stderr_pos_count) - stderr_line_count += self.send_logs(task_id, printed_lines) + stderr_line_count += self.send_logs(task_id, printed_lines, session=session) except subprocess.CalledProcessError as ex: # non zero return code @@ -1450,10 +1608,10 @@ class Worker(ServiceCommandSection): except Exception: # we should not get here, but better safe than sorry printed_lines, stdout_pos_count = _print_file(stdout_path, stdout_pos_count) - stdout_line_count += self.send_logs(task_id, printed_lines) + stdout_line_count += self.send_logs(task_id, printed_lines, session=session) if stderr_path: printed_lines, stderr_pos_count = _print_file(stderr_path, stderr_pos_count) - stderr_line_count += self.send_logs(task_id, printed_lines) + stderr_line_count += self.send_logs(task_id, printed_lines, session=session) stop_reason = TaskStopReason.exception status = -1 @@ -1472,10 +1630,10 @@ class Worker(ServiceCommandSection): # Send last lines printed_lines, stdout_pos_count = _print_file(stdout_path, stdout_pos_count) - stdout_line_count += self.send_logs(task_id, printed_lines) + stdout_line_count += self.send_logs(task_id, printed_lines, session=session) if stderr_path: printed_lines, stderr_pos_count = _print_file(stderr_path, stderr_pos_count) - stderr_line_count += self.send_logs(task_id, printed_lines) + stderr_line_count += self.send_logs(task_id, printed_lines, session=session) return status, stop_reason @@ -1492,7 +1650,7 @@ class Worker(ServiceCommandSection): return filter_lines, service_mode_internal_agent_started, None - def send_logs(self, task_id, lines, level="DEBUG"): + def send_logs(self, task_id, lines, level="DEBUG", session=None): """ Send output lines as log events to backend :param task_id: ID of task to send logs for @@ -1514,7 +1672,7 @@ class Worker(ServiceCommandSection): events_service = self.get_service(Events) try: events_service.send_log_events( - self.worker_id, task_id=task_id, lines=lines, level=level + self.worker_id, task_id=task_id, lines=lines, level=level, session=session ) return len(lines) except Exception as e: @@ -1656,7 +1814,7 @@ class Worker(ServiceCommandSection): return 0 - def _build_docker(self, docker, target, task_id, entry_point=None, standalone_mode=True): + def _build_docker(self, docker, target, task_id, entry_point=None): self.temp_config_path = safe_mkstemp( suffix=".cfg", prefix=".clearml_agent.", text=True, name_only=True @@ -1845,6 +2003,7 @@ class Worker(ServiceCommandSection): task=current_task.id, status_reason="worker started execution", status_message=self._task_status_change_message, + force=True, ) if not disable_monitoring: @@ -1872,8 +2031,11 @@ class Worker(ServiceCommandSection): python_ver = None venv_folder, requirements_manager, is_cached = self.install_virtualenv( - standalone_mode=standalone_mode, requested_python_version=python_ver, - execution_info=execution, cached_requirements=requirements) + standalone_mode=standalone_mode, + requested_python_version=python_ver, + execution_info=execution, + cached_requirements=requirements, + ) if not is_cached and not standalone_mode: if self._default_pip: @@ -2002,6 +2164,13 @@ class Worker(ServiceCommandSection): else: use_execv = is_linux_platform() and not isinstance(self.package_api, (PoetryAPI, CondaAPI)) + self._session.api_client.tasks.started( + task=current_task.id, + status_reason="worker starting task execution", + status_message=self._task_status_change_message, + force=True, + ) + print("Starting Task Execution:\n".format(current_task.id)) exit_code = -1 try: @@ -2075,9 +2244,9 @@ class Worker(ServiceCommandSection): except Exception: return None - def set_docker_variables(self, docker): - temp_config, docker_image_func = self.get_docker_config_cmd(docker) - self.dump_config(self.temp_config_path, config=temp_config) + def set_docker_variables(self, docker, clean_api_credentials=False): + temp_config, docker_image_func = self.get_docker_config_cmd(docker, clean_api_credentials=clean_api_credentials) + self.dump_config(self.temp_config_path, config=temp_config, clean_api_credentials=clean_api_credentials) self.docker_image_func = docker_image_func def get_execution_info(self, current_task): @@ -2176,22 +2345,25 @@ class Worker(ServiceCommandSection): ) return vcs, repo_info - def handle_task_termination(self, task_id, exit_code, stop_reason): + def handle_task_termination(self, task_id, exit_code, stop_reason, session=None): # type: (Text, int, TaskStopReason) -> None + session = session or self._session try: if stop_reason == TaskStopReason.stopped: self.log("Stopping - tasks.stop was called for task") - self.send_logs(task_id, ["Process aborted by user"]) - self._session.api_client.tasks.stopped( - task=task_id, - status_reason="task was stopped by tasks.stop", - status_message=self._task_status_change_message, + self.send_logs(task_id, ["Process aborted by user"], session=session) + session.send_api( + tasks_api.StoppedRequest( + task=task_id, + status_reason="task was stopped by tasks.stop", + status_message=self._task_status_change_message, + ) ) elif stop_reason == TaskStopReason.status_changed: try: task_status = get_task( - self._session, task_id, only_fields=["status"] + session, task_id, only_fields=["status"] ).status self.log( "Task status changed unexpectedly (status: {}), " @@ -2208,7 +2380,11 @@ class Worker(ServiceCommandSection): self.log("Task was reset unexpectedly") elif stop_reason == TaskStopReason.no_stop: - self.handle_task_process_termination(task_id, exit_code) + self.handle_task_process_termination(task_id, exit_code, session=session) + + elif stop_reason == TaskStopReason.not_found: + self.log("Task not found") + else: self.log( "INTERNAL ERROR: unidentified task stop reason: {}".format( @@ -2225,33 +2401,40 @@ class Worker(ServiceCommandSection): ) self.log_traceback(e) - def handle_task_process_termination(self, task_id, exit_code): + def handle_task_process_termination(self, task_id, exit_code, session=None): # type: (Text, int) -> None + session = session or self._session self.log("Task process terminated") if exit_code == COMMAND_SUCCESS: self.log("Task success: completing") - self.send_logs(task_id, ["Process completed successfully"]) - self._session.api_client.tasks.completed( - task=task_id, - status_reason="worker execution done", - status_message=self._task_status_change_message, + self.send_logs(task_id, ["Process completed successfully"], session=session) + session.send_api( + tasks_api.CompletedRequest( + task=task_id, + status_reason="worker execution done", + status_message=self._task_status_change_message, + ) ) elif exit_code in (ExitStatus.interrupted, 256+ExitStatus.interrupted): self.log("Task interrupted: stopping") - self.send_logs(task_id, ["Process terminated by user"]) - self._session.api_client.tasks.stopped( - task=task_id, - status_reason="user abort", - status_message=self._task_status_change_message, + self.send_logs(task_id, ["Process terminated by user"], session=session) + session.send_api( + tasks_api.StoppedRequest( + task=task_id, + status_reason="user abort", + status_message=self._task_status_change_message, + ) ) else: self.log("Task failure: setting status to 'failed'") - self.send_logs(task_id, ["Process failed, exit code {}".format(exit_code)]) - self._session.api_client.tasks.failed( - task=task_id, - status_reason="worker execution exit code {}".format(exit_code), - status_message=self._task_status_change_message, + self.send_logs(task_id, ["Process failed, exit code {}".format(exit_code)], session=session) + session.send_api( + tasks_api.FailedRequest( + task=task_id, + status_reason="worker execution exit code {}".format(exit_code), + status_message=self._task_status_change_message, + ) ) def freeze_task_environment(self, task_id=None, requirements_manager=None, @@ -2476,6 +2659,24 @@ class Worker(ServiceCommandSection): if self._session.debug_mode: print("clearml_agent: {}".format(message)) + @staticmethod + def _get_python_version_suffix(executable_path): + # type: (Text) -> Text + """ + Platform independent function that returns version substring from the python executable + """ + + def rreplace(s, old, new, count): + return (s[::-1].replace(old[::-1], new[::-1], count))[::-1] + + if is_windows_platform(): + return rreplace( + rreplace(executable_path.split(os.path.sep)[-1].lower(), 'python', '', 1), + '.exe', '', 1 + ) + + return rreplace(executable_path.split(os.path.sep)[-1], 'python', '', 1) + def find_python_executable_for_version(self, config_version): # type: (Text) -> Tuple[Text, Text, Text] """ @@ -2501,27 +2702,12 @@ class Worker(ServiceCommandSection): for i in range(len(it) + 1): yield it[:i] - def rreplace(s, old, new, count): - return (s[::-1].replace(old[::-1], new[::-1], count))[::-1] - - if is_windows_platform(): - python_executables = [ - (version, config_version if os.path.sep in config_version else 'python{}'.format(version)) - for version in map( - ".".join, reversed(list(suffixes( - rreplace( - rreplace(config_version.split(os.path.sep)[-1].lower(), 'python', '', 1), - '.exe', '', 1).split(".")))) - ) - ] - else: - python_executables = [ - (version, config_version if os.path.sep in config_version else 'python{}'.format(version)) - for version in map( - ".".join, reversed(list(suffixes( - rreplace(config_version.split(os.path.sep)[-1], 'python', '', 1).split(".")))) - ) - ] + python_executables = [ + (version, config_version if os.path.sep in config_version else 'python{}'.format(version)) + for version in map( + ".".join, reversed(list(suffixes(self._get_python_version_suffix(config_version).split(".")))) + ) + ] for version, executable in python_executables: self.log.debug("Searching for {}".format(executable)) @@ -2548,13 +2734,28 @@ class Worker(ServiceCommandSection): ) def install_virtualenv( - self, venv_dir=None, requested_python_version=None, standalone_mode=False, - execution_info=None, cached_requirements=None): + self, + venv_dir=None, + requested_python_version=None, + standalone_mode=False, + execution_info=None, + cached_requirements=None, + ): # type: (str, str, bool, ExecutionInfo, dict) -> Tuple[Path, RequirementsManager, bool] """ Install a new python virtual environment, removing the old one if exists + If CLEARML_SKIP_PIP_VENV_INSTALL is set then an emtpy virtual env folder is created + and package manager is configured to work with the global python interpreter (the interpreter + path itself can be passed in this variable) :return: virtualenv directory, requirements manager to use with task, True if there is a cached venv entry """ + skip_pip_venv_install = ENV_AGENT_SKIP_PIP_VENV_INSTALL.get() if self._session.feature_set != "basic" else None + if skip_pip_venv_install: + try: + skip_pip_venv_install = bool(strtobool(skip_pip_venv_install)) + except ValueError: + pass + if self._session.config.get("agent.ignore_requested_python_version", None): requested_python_version = '' @@ -2568,18 +2769,34 @@ class Worker(ServiceCommandSection): requested_python_version[max(requested_python_version.find('python'), 0):].replace('python', '') executable_name = 'python' else: - try: - executable_version, executable_version_suffix, executable_name = \ - self.find_python_executable_for_version(requested_python_version) - except Exception: - def_python_version = Text(self._session.config.get("agent.python_binary", None)) or \ - Text(self._session.config.get("agent.default_python", None)) - print('Warning: could not locate requested Python version {}, reverting to version {}'.format( - requested_python_version, def_python_version)) - executable_version, executable_version_suffix, executable_name = \ - self.find_python_executable_for_version(def_python_version) - self._session.config.put("agent.default_python", executable_version) - self._session.config.put("agent.python_binary", executable_name) + override_interpreter_path = None + if skip_pip_venv_install and isinstance(skip_pip_venv_install, str): + if find_executable(skip_pip_venv_install): + override_interpreter_path = skip_pip_venv_install + else: + print( + "Warning: interpreter {} could not be found. Reverting to the default interpreter resolution".format( + skip_pip_venv_install + ) + ) + if override_interpreter_path: + print("Python interpreter {} is set from environment var".format(override_interpreter_path)) + executable_name = override_interpreter_path + executable_version_suffix = self._get_python_version_suffix(executable_name) + else: + try: + executable_version, executable_version_suffix, executable_name = \ + self.find_python_executable_for_version(requested_python_version) + except Exception: + def_python_version = Text(self._session.config.get("agent.python_binary", None)) or \ + Text(self._session.config.get("agent.default_python", None)) + print('Warning: could not locate requested Python version {}, reverting to version {}'.format( + requested_python_version, def_python_version)) + executable_version, executable_version_suffix, executable_name = \ + self.find_python_executable_for_version(def_python_version) + + self._session.config.put("agent.default_python", executable_version) + self._session.config.put("agent.python_binary", executable_name) venv_dir = Path(venv_dir) if venv_dir else \ Path(self._session.config["agent.venvs_dir"], executable_version_suffix) @@ -2613,50 +2830,53 @@ class Worker(ServiceCommandSection): call_package_manager_create = False - if not self.is_conda and standalone_mode: - # pip with standalone mode - get_pip = partial(VirtualenvPip, **package_manager_params) - self.package_api = get_pip() - self.global_package_api = SystemPip(**global_package_manager_params) - elif not self.is_conda: - if self.is_venv_update: - self.package_api = VenvUpdateAPI( - url=self._session.config["agent.venv_update.url"] or DEFAULT_VENV_UPDATE_URL, - **package_manager_params - ) + if not self.is_conda: + if standalone_mode or skip_pip_venv_install: + # pip with standalone mode + self.global_package_api = SystemPip(**global_package_manager_params) + if standalone_mode: + self.package_api = VirtualenvPip(**package_manager_params) + else: + self.package_api = self.global_package_api else: - self.package_api = VirtualenvPip(**package_manager_params) - if first_time: + if self.is_venv_update: + self.package_api = VenvUpdateAPI( + url=self._session.config["agent.venv_update.url"] or DEFAULT_VENV_UPDATE_URL, + **package_manager_params + ) + else: + self.package_api = VirtualenvPip(**package_manager_params) + if first_time: + self.package_api.remove() + call_package_manager_create = True + self.global_package_api = SystemPip(**global_package_manager_params) + else: + if standalone_mode: + # conda with standalone mode + get_conda = partial(CondaAPI, **package_manager_params) + self.package_api = get_conda() + else: + get_conda = partial(CondaAPI, **package_manager_params) + self.package_api = get_conda() + # no support for reusing Conda environments self.package_api.remove() call_package_manager_create = True - self.global_package_api = SystemPip(**global_package_manager_params) - elif standalone_mode: - # conda with standalone mode - get_conda = partial(CondaAPI, **package_manager_params) - self.package_api = get_conda() - else: - get_conda = partial(CondaAPI, **package_manager_params) - self.package_api = get_conda() - # no support for reusing Conda environments - self.package_api.remove() - call_package_manager_create = True - - if venv_dir.exists(): - timestamp = datetime.utcnow().strftime("%Y-%m-%d-%H-%M-%S") - new_venv_folder = venv_dir.with_name( - "{}_{}".format(venv_dir.name, timestamp) - ) - self.warning( - 'Path "{}" exists, using "{}" instead'.format( - venv_dir, new_venv_folder + if venv_dir.exists(): + timestamp = datetime.utcnow().strftime("%Y-%m-%d-%H-%M-%S") + new_venv_folder = venv_dir.with_name( + "{}_{}".format(venv_dir.name, timestamp) ) - ) - venv_dir = new_venv_folder - self.package_api = get_conda(path=venv_dir) + self.warning( + 'Path "{}" exists, using "{}" instead'.format( + venv_dir, new_venv_folder + ) + ) + venv_dir = new_venv_folder + self.package_api = get_conda(path=venv_dir) # check if we have a cached folder - if cached_requirements and self.package_api.get_cached_venv( + if cached_requirements and not skip_pip_venv_install and self.package_api.get_cached_venv( requirements=cached_requirements, docker_cmd=execution_info.docker_cmd if execution_info else None, python_version=package_manager_params['python'], @@ -2667,8 +2887,12 @@ class Worker(ServiceCommandSection): return venv_dir, requirements_manager, True # create the initial venv - if call_package_manager_create: - self.package_api.create() + if not skip_pip_venv_install: + if call_package_manager_create: + self.package_api.create() + else: + if not venv_dir.exists(): + venv_dir.mkdir(parents=True, exist_ok=True) return venv_dir, requirements_manager, False @@ -2688,7 +2912,7 @@ class Worker(ServiceCommandSection): requirements_manager.translator.enabled = False print(requirements_manager.replace(contents)) - def get_docker_config_cmd(self, docker_args): + def get_docker_config_cmd(self, docker_args, clean_api_credentials=False): docker_image = str(ENV_DOCKER_IMAGE.get() or self._session.config.get("agent.default_docker.image", "nvidia/cuda")) \ if not docker_args else docker_args[0] @@ -2745,9 +2969,11 @@ class Worker(ServiceCommandSection): self._host_ssh_cache = mkdtemp(prefix='clearml_agent.ssh.') self._temp_cleanup_list.append(self._host_ssh_cache) - return temp_config, partial(self._get_docker_config_cmd, temp_config=temp_config) + return temp_config, partial( + self._get_docker_config_cmd, temp_config=temp_config, clean_api_credentials=clean_api_credentials + ) - def _get_docker_config_cmd(self, temp_config, **kwargs): + def _get_docker_config_cmd(self, temp_config, clean_api_credentials=False, **kwargs): host_cache = Path(os.path.expandvars( self._session.config["sdk.storage.cache.default_base_dir"])).expanduser().as_posix() host_pip_dl = Path(os.path.expandvars( @@ -2820,11 +3046,12 @@ class Worker(ServiceCommandSection): mounted_venvs_cache = temp_config.get("agent.venvs_cache.path", "") # Make sure we have created the configuration file for the executor - if not self.dump_config(self.temp_config_path, config=temp_config): + if not self.dump_config(self.temp_config_path, config=temp_config, clean_api_credentials=clean_api_credentials): self.log.warning('Could not update docker configuration file {}'.format(self.temp_config_path)) docker_cmd = dict( worker_id=self.worker_id, + parent_worker_id=self.worker_id, # docker_image=docker_image, # docker_arguments=docker_arguments, extra_docker_arguments=self._extra_docker_arguments, @@ -2848,9 +3075,32 @@ class Worker(ServiceCommandSection): docker_cmd.update(kwargs) return self._get_docker_cmd(**docker_cmd) - @staticmethod + def _get_child_agents_count_for_worker(self): + """Get the amount of running child agents. In case of any error return 0""" + parent_worker_label = self._parent_worker_label.format(self.worker_id) + cmd = [ + 'docker', + 'ps', + '--filter', + 'label={}'.format(parent_worker_label), + '--format', + # get some fields for debugging + '{"ID":"{{ .ID }}", "Image": "{{ .Image }}", "Names":"{{ .Names }}", "Labels":"{{ .Labels }}"}' + ] + try: + output = Argv(*cmd).get_output( + stderr=subprocess.STDOUT + ) + except subprocess.CalledProcessError as ex: + self.log.warning("error getting child agents: %s", ex) + return 0 + + return len(output.splitlines()) if output else 0 + + @classmethod def _get_docker_cmd( - worker_id, + cls, + worker_id, parent_worker_id, docker_image, docker_arguments, python_version, conf_file, @@ -2867,6 +3117,8 @@ class Worker(ServiceCommandSection): preprocess_bash_script=None, install_opencv_libs=None, docker_bash_setup_script=None, + auth_token=None, + worker_tags=None, ): docker = 'docker' @@ -2900,6 +3152,10 @@ class Worker(ServiceCommandSection): if isinstance(extra_docker_arguments, six.string_types) else extra_docker_arguments base_cmd += [str(a) for a in extra_docker_arguments if a] + # set docker labels + base_cmd += ['-l', cls._worker_label.format(worker_id)] + base_cmd += ['-l', cls._parent_worker_label.format(parent_worker_id)] + # check if running inside a kubernetes if ENV_DOCKER_HOST_MOUNT.get() or (os.environ.get('KUBERNETES_SERVICE_HOST') and os.environ.get('KUBERNETES_PORT')): @@ -2954,6 +3210,17 @@ class Worker(ServiceCommandSection): # update the docker image, so the system knows where it runs base_cmd += ['-e', 'CLEARML_DOCKER_IMAGE={} {}'.format(docker_image, ' '.join(docker_arguments or [])).strip()] + if auth_token: + # if auth token is passed then put it in the env var + base_cmd += ['-e', '{}={}'.format(ENV_AGENT_AUTH_TOKEN.vars[0], auth_token)] + + if worker_tags: + base_cmd += ['-e', '{}={}'.format(ENV_WORKER_TAGS.vars[0], " ".join(shlex.quote(t) for t in worker_tags))] + + skip_pip_venv_install = ENV_AGENT_SKIP_PIP_VENV_INSTALL.get() + if skip_pip_venv_install: + base_cmd += ['-e', '{}={}'.format(ENV_AGENT_SKIP_PIP_VENV_INSTALL.vars[0], skip_pip_venv_install)] + # if we are running a RC version, install the same version in the docker # because the default latest, will be a release version (not RC) specify_version = '' @@ -2965,11 +3232,14 @@ class Worker(ServiceCommandSection): except: pass + agent_install_bash_script = [] if os.environ.get('FORCE_LOCAL_CLEARML_AGENT_WHEEL'): local_wheel = os.path.expanduser(os.environ.get('FORCE_LOCAL_CLEARML_AGENT_WHEEL')) docker_wheel = str(Path('/tmp') / Path(local_wheel).name) base_cmd += ['-v', local_wheel + ':' + docker_wheel] clearml_agent_wheel = '\"{}\"'.format(docker_wheel) + elif os.environ.get('FORCE_CLEARML_AGENT_REPO'): + clearml_agent_wheel = os.environ.get('FORCE_CLEARML_AGENT_REPO') else: # clearml-agent{specify_version} clearml_agent_wheel = 'clearml-agent{specify_version}'.format(specify_version=specify_version) @@ -2996,6 +3266,9 @@ class Worker(ServiceCommandSection): if preprocess_bash_script: bash_script = preprocess_bash_script + bash_script + if agent_install_bash_script: + bash_script += agent_install_bash_script + docker_bash_script = " ; ".join([line for line in bash_script if line]) \ if not isinstance(bash_script, str) else bash_script @@ -3209,7 +3482,8 @@ class Worker(ServiceCommandSection): ENV_AGENT_GIT_PASS.vars, ENV_AGENT_SECRET_KEY.vars, ENV_AWS_SECRET_KEY.vars, - ENV_AZURE_ACCOUNT_KEY.vars + ENV_AZURE_ACCOUNT_KEY.vars, + ENV_AGENT_AUTH_TOKEN.vars, ) result = docker_command[:] diff --git a/clearml_agent/helper/package/base.py b/clearml_agent/helper/package/base.py index 32d7e46..fc08b14 100644 --- a/clearml_agent/helper/package/base.py +++ b/clearml_agent/helper/package/base.py @@ -3,11 +3,13 @@ from __future__ import unicode_literals import abc from collections import OrderedDict from contextlib import contextmanager -from typing import Text, Iterable, Union, Optional, Dict, List -from pathlib2 import Path from hashlib import md5 +from typing import Text, Iterable, Union, Optional, Dict, List import six +from pathlib2 import Path + +from clearml_agent.definitions import ENV_VENV_CACHE_PATH from clearml_agent.helper.base import mkstemp, safe_remove_file, join_lines, select_for_platform from clearml_agent.helper.console import ensure_binary from clearml_agent.helper.os.folder_cache import FolderCache @@ -252,7 +254,7 @@ class PackageManager(object): def _get_cache_manager(self): if not self._cache_manager: - cache_folder = self.session.config.get(self._config_cache_folder, None) + cache_folder = ENV_VENV_CACHE_PATH.get() or self.session.config.get(self._config_cache_folder, None) if not cache_folder: return None diff --git a/clearml_agent/helper/package/pip_api/system.py b/clearml_agent/helper/package/pip_api/system.py index 67d27b0..99ee6a8 100644 --- a/clearml_agent/helper/package/pip_api/system.py +++ b/clearml_agent/helper/package/pip_api/system.py @@ -1,6 +1,7 @@ import os import sys from itertools import chain +from pathlib import Path from typing import Text, Optional from clearml_agent.definitions import PIP_EXTRA_INDICES, PROGRAM_NAME @@ -19,7 +20,7 @@ class SystemPip(PackageManager): Program interface to the system pip. """ super(SystemPip, self).__init__() - self._bin = interpreter or sys.executable + self._bin = Path(interpreter or sys.executable) self.session = session @property diff --git a/clearml_agent/helper/resource_monitor.py b/clearml_agent/helper/resource_monitor.py index 25b18ff..3d8f34f 100644 --- a/clearml_agent/helper/resource_monitor.py +++ b/clearml_agent/helper/resource_monitor.py @@ -2,6 +2,7 @@ from __future__ import unicode_literals, division import logging import os +import shlex from collections import deque from itertools import starmap from threading import Thread, Event @@ -12,6 +13,7 @@ import attr import psutil from pathlib2 import Path from clearml_agent.session import Session +from clearml_agent.definitions import ENV_WORKER_TAGS try: from .gpu import gpustat @@ -59,6 +61,7 @@ class ResourceMonitor(object): sample_frequency_per_sec=2.0, report_frequency_sec=30.0, first_report_sec=None, + worker_tags=None, ): self.session = session self.queue = deque(maxlen=1) @@ -76,6 +79,9 @@ class ResourceMonitor(object): self._gpustat_fail = 0 self._gpustat = gpustat self._active_gpus = None + if not worker_tags and ENV_WORKER_TAGS.get(): + worker_tags = shlex.split(ENV_WORKER_TAGS.get()) + self._worker_tags = worker_tags if os.environ.get('NVIDIA_VISIBLE_DEVICES') == 'none': # NVIDIA_VISIBLE_DEVICES set to none, marks cpu_only flag # active_gpus == False means no GPU reporting @@ -118,6 +124,7 @@ class ResourceMonitor(object): machine_stats=stats, timestamp=(int(time()) * 1000), worker=self._worker_id, + tags=self._worker_tags, **self.get_report().to_dict() ) log.debug("sending report: %s", report) diff --git a/clearml_agent/interface/worker.py b/clearml_agent/interface/worker.py index ad5da38..684b302 100644 --- a/clearml_agent/interface/worker.py +++ b/clearml_agent/interface/worker.py @@ -83,6 +83,12 @@ DAEMON_ARGS = dict({ 'type': int, 'default': None, }, + '--child-report-tags': { + 'help': 'List of tags to send with the status reports from the worker that runs a task', + 'nargs': '+', + 'type': str, + 'default': None, + }, '--create-queue': { 'help': 'Create requested queue if it does not exist already.', 'action': 'store_true', @@ -121,6 +127,10 @@ DAEMON_ARGS = dict({ 'help': 'Print the worker\'s schedule (uptime properties, server\'s runtime properties and listening queues)', 'action': 'store_true', }, + '--use-owner-token': { + 'help': 'Generate and use task owner token for the execution of the task', + 'action': 'store_true', + } }, **WORKER_ARGS) COMMANDS = {