diff --git a/trains_agent/commands/worker.py b/trains_agent/commands/worker.py
index b382c0f..3f0345e 100644
--- a/trains_agent/commands/worker.py
+++ b/trains_agent/commands/worker.py
@@ -12,7 +12,7 @@ import sys
 import shutil
 import traceback
 from collections import defaultdict
-from copy import copy, deepcopy
+from copy import deepcopy
 from datetime import datetime
 from distutils.spawn import find_executable
 from functools import partial
@@ -30,14 +30,11 @@ from pathlib2 import Path
 from pyhocon import ConfigTree, ConfigFactory
 from six.moves.urllib.parse import quote
 
-from trains_agent.backend_api.services import queues
 from trains_agent.backend_config.defs import UptimeConf
 from trains_agent.helper.check_update import start_check_update_daemon
 from trains_agent.commands.base import resolve_names, ServiceCommandSection
 from trains_agent.definitions import (
-    WORKER_ALREADY_REGISTERED,
     ENVIRONMENT_SDK_PARAMS,
-    INVALID_WORKER_ID,
     PROGRAM_NAME,
     DEFAULT_VENV_UPDATE_URL,
     ENV_TASK_EXECUTE_AS_USER,
@@ -284,7 +281,7 @@ class TaskStopSignal(object):
             )
             return TaskStopReason.stopped
 
-        if status in self.unexpected_statuses:  ## and "worker" not in message:
+        if status in self.unexpected_statuses:  # ## and "worker" not in message:
             self.command.log("unexpected status change, task will terminate")
             return TaskStopReason.status_changed
 
@@ -396,7 +393,7 @@ class Worker(ServiceCommandSection):
         self.temp_config_path = None
         self.queues = ()
         self.venv_folder = None  # type: Optional[Text]
-        self.package_api = None  # type: PackageManager
+        self.package_api = None  # type: Optional[PackageManager]
         self.global_package_api = None
 
         self.is_venv_update = self._session.config.agent.venv_update.enabled
@@ -530,7 +527,7 @@ class Worker(ServiceCommandSection):
             full_docker_cmd = self.docker_image_func(docker_image=docker_image, docker_arguments=docker_arguments)
             try:
                 self._session.send_api(
-                    tasks_api.EditRequest(task_id, force=True, execution=dict(
+                    tasks_api.EditRequest(task_id, force=True, execution=dict(  # noqa
                         docker_cmd=' '.join([docker_image] + docker_arguments) if docker_arguments else docker_image)))
             except Exception:
                 pass
@@ -1005,7 +1002,7 @@ class Worker(ServiceCommandSection):
         stop_signal=None,  # type: Optional[TaskStopSignal]
         **kwargs  # type: Any
     ):
-        # type: (...) -> Tuple[Optional[int], TaskStopReason]
+        # type: (...) -> Tuple[Optional[int], Optional[TaskStopReason]]
         def _print_file(file_path, prev_pos=0):
             with open(file_path, "rb") as f:
                 f.seek(prev_pos)
@@ -1189,7 +1186,8 @@ class Worker(ServiceCommandSection):
             success = False
 
         if not success:
-            raise ValueError("Failed applying git diff:\n{}\n\nERROR! Failed applying git diff, see diff above.".format(diff))
+            raise ValueError("Failed applying git diff:\n{}\n\n"
+                             "ERROR! Failed applying git diff, see diff above.".format(diff))
 
     @resolve_names
     def build(
@@ -1389,7 +1387,8 @@ class Worker(ServiceCommandSection):
             print("Cloning task id={}".format(task_id))
             current_task = self._session.api_client.tasks.get_by_id(
                 self._session.send_api(
-                    tasks_api.CloneRequest(task=current_task.id, new_task_name='Clone of {}'.format(current_task.name))
+                    tasks_api.CloneRequest(task=current_task.id,
+                                           new_task_name='Clone of {}'.format(current_task.name))
                 ).id
             )
             print("Task cloned, new task id={}".format(current_task.id))
@@ -1867,7 +1866,8 @@ class Worker(ServiceCommandSection):
         package_api.out_of_scope_install_package('Cython')
 
         cached_requirements_failed = False
-        if cached_requirements and ('pip' in cached_requirements or 'conda' in cached_requirements):
+        if cached_requirements and (cached_requirements.get('pip') is not None or
+                                    cached_requirements.get('conda') is not None):
             self.log("Found task requirements section, trying to install")
             try:
                 package_api.load_requirements(cached_requirements)
@@ -2185,9 +2185,12 @@ class Worker(ServiceCommandSection):
         mounted_pip_dl_dir = '/root/.trains/pip-download-cache'
         mounted_vcs_cache = '/root/.trains/vcs-cache'
         mounted_venv_dir = '/root/.trains/venvs-builds'
-        host_cache = Path(os.path.expandvars(self._session.config["sdk.storage.cache.default_base_dir"])).expanduser().as_posix()
-        host_pip_dl = Path(os.path.expandvars(self._session.config["agent.pip_download_cache.path"])).expanduser().as_posix()
-        host_vcs_cache = Path(os.path.expandvars(self._session.config["agent.vcs_cache.path"])).expanduser().as_posix()
+        host_cache = Path(os.path.expandvars(
+            self._session.config["sdk.storage.cache.default_base_dir"])).expanduser().as_posix()
+        host_pip_dl = Path(os.path.expandvars(
+            self._session.config["agent.pip_download_cache.path"])).expanduser().as_posix()
+        host_vcs_cache = Path(os.path.expandvars(
+            self._session.config["agent.vcs_cache.path"])).expanduser().as_posix()
         temp_config.put("sdk.storage.cache.default_base_dir", mounted_cache_dir)
         temp_config.put("agent.pip_download_cache.path", mounted_pip_dl_dir)
         temp_config.put("agent.vcs_cache.path", mounted_vcs_cache)
@@ -2198,8 +2201,10 @@ class Worker(ServiceCommandSection):
         temp_config.put("agent.cuda_version", "")
         temp_config.put("agent.cudnn_version", "")
         temp_config.put("agent.venvs_dir", mounted_venv_dir)
-        temp_config.put("agent.git_user", (ENV_AGENT_GIT_USER.get() or self._session.config.get("agent.git_user", None)))
-        temp_config.put("agent.git_pass", (ENV_AGENT_GIT_PASS.get() or self._session.config.get("agent.git_pass", None)))
+        temp_config.put("agent.git_user", (ENV_AGENT_GIT_USER.get() or
+                                           self._session.config.get("agent.git_user", None)))
+        temp_config.put("agent.git_pass", (ENV_AGENT_GIT_PASS.get() or
+                                           self._session.config.get("agent.git_pass", None)))
 
         host_apt_cache = Path(os.path.expandvars(self._session.config.get(
             "agent.docker_apt_cache", '~/.trains/apt-cache'))).expanduser().as_posix()
@@ -2317,7 +2322,8 @@ class Worker(ServiceCommandSection):
             base_cmd += [str(a) for a in extra_docker_arguments if a]
 
         # check if running inside a kubernetes
-        if ENV_DOCKER_HOST_MOUNT.get() or (os.environ.get('KUBERNETES_SERVICE_HOST') and os.environ.get('KUBERNETES_PORT')):
+        if ENV_DOCKER_HOST_MOUNT.get() or (os.environ.get('KUBERNETES_SERVICE_HOST') and
+                                           os.environ.get('KUBERNETES_PORT')):
             # map network to sibling docker, unless we have other network argument
             if not any(a.strip().startswith('--network') for a in base_cmd):
                 try:
@@ -2465,7 +2471,8 @@ class Worker(ServiceCommandSection):
         os.setuid(self.uid)
 
         # create a home folder for our user
-        trains_agent_home = self._run_as_user_home + '{}'.format('.'+str(Singleton.get_slot()) if Singleton.get_slot() else '')
+        trains_agent_home = self._run_as_user_home + '{}'.format(
+            '.'+str(Singleton.get_slot()) if Singleton.get_slot() else '')
         try:
             home_folder = self._run_as_user_home
             rm_tree(home_folder)