diff --git a/trains_agent/commands/worker.py b/trains_agent/commands/worker.py
index a83d23b..ff54c65 100644
--- a/trains_agent/commands/worker.py
+++ b/trains_agent/commands/worker.py
@@ -17,7 +17,7 @@ from datetime import datetime
 from distutils.spawn import find_executable
 from functools import partial
 from itertools import chain
-from tempfile import mkdtemp, gettempdir
+from tempfile import mkdtemp, NamedTemporaryFile
 from time import sleep, time
 from typing import Text, Optional, Any, Tuple
@@ -66,7 +66,7 @@ from trains_agent.helper.base import (
     get_python_path,
     is_linux_platform,
     rm_file,
-    add_python_path)
+    add_python_path, safe_remove_tree, )
 from trains_agent.helper.console import ensure_text, print_text, decode_binary_lines
 from trains_agent.helper.os.daemonize import daemonize_process
 from trains_agent.helper.package.base import PackageManager
@@ -89,7 +89,7 @@ from trains_agent.helper.process import (
     get_bash_output,
     shutdown_docker_process,
     get_docker_id,
-    commit_docker
+    commit_docker, terminate_process,
 )
 from trains_agent.helper.package.cython_req import CythonRequirement
 from trains_agent.helper.repo import clone_repository_cached, RepoInfo, VCS
@@ -319,6 +319,7 @@ class Worker(ServiceCommandSection):
     _run_as_user_home = '/trains_agent_home'
     _docker_fixed_user_cache = '/trains_agent_cache'
 
+    _temp_cleanup_list = []
 
     @property
     def service(self):
@@ -332,6 +333,8 @@
     @staticmethod
     def register_signal_handler():
         def handler(*_):
+            for f in Worker._temp_cleanup_list + [Singleton.get_pid_file()]:
+                safe_remove_tree(f)
             raise Sigterm()
 
         signal.signal(signal.SIGTERM, handler)
@@ -388,6 +391,7 @@
         self._standalone_mode = None
         self._services_mode = None
         self._force_current_version = None
+        self._redirected_stdout_file_no = None
 
     @classmethod
     def _verify_command_states(cls, kwargs):
@@ -569,12 +573,12 @@
         else:
             self.handle_task_termination(task_id, status, stop_signal_status)
         # remove temp files after we sent everything to the backend
-        safe_remove_file(temp_stdout_name)
-        safe_remove_file(temp_stderr_name)
         if self.docker_image_func:
             shutdown_docker_process(docker_cmd_contains='--id {}\'\"'.format(task_id))
+        safe_remove_file(temp_stdout_name)
+        safe_remove_file(temp_stderr_name)
 
-    def run_tasks_loop(self, queues, worker_params):
+    def run_tasks_loop(self, queues, worker_params, priority_order=True):
         """
         :summary: Pull and run tasks from queues.
         :description: 1. Go through ``queues`` by order.
@@ -584,6 +588,9 @@
         :param queues: IDs of queues to pull tasks from
         :type queues: list of ``Text``
         :param worker_params: Worker command line arguments
         :type worker_params: ``trains_agent.helper.process.WorkerParams``
+        :param priority_order: If True, pull tasks in priority order, always starting
+            from the first queue. If False, pull from each queue once, in round-robin order
+        :type priority_order: bool
         """
         if not self._daemon_foreground:
@@ -614,6 +621,16 @@
                             print("No tasks in queue {}".format(queue))
                         continue
 
+                # clear output log if we start a new Task
+                if not worker_params.debug and self._redirected_stdout_file_no is not None and \
+                        self._redirected_stdout_file_no > 2:
+                    # noinspection PyBroadException
+                    try:
+                        os.lseek(self._redirected_stdout_file_no, 0, 0)
+                        os.ftruncate(self._redirected_stdout_file_no, 0)
+                    except:
+                        pass
+
                 self.send_logs(
                     task_id=task_id,
                     lines=["task {} pulled from {} by worker {}\n".format(task_id, queue, self.worker_id)],
@@ -622,7 +639,11 @@
                 self.report_monitor(ResourceMonitor.StatusReport(queues=queues, queue=queue, task=task_id))
                 self.run_one_task(queue, task_id, worker_params)
                 self.report_monitor(ResourceMonitor.StatusReport(queues=self.queues))
-                break
+
+                # in priority order, always go back to pulling from the first queue;
+                # in round-robin order, continue pulling from the next queue
+                if priority_order:
+                    break
             else:
                 # sleep and retry polling
                 if self._daemon_foreground or worker_params.debug:
@@ -674,7 +695,7 @@
 
         self._session.print_configuration()
 
-    def daemon(self, queues, log_level, foreground=False, docker=False, detached=False, **kwargs):
+    def daemon(self, queues, log_level, foreground=False, docker=False, detached=False, order_fairness=False, **kwargs):
         # if we do not need to create queues, make sure they are valid
         # match previous behaviour when we validated queue names before everything else
         queues = self._resolve_queue_names(queues, create_if_missing=kwargs.get('create_queue', False))
@@ -686,6 +707,10 @@
         kwargs = self._verify_command_states(kwargs)
         docker = docker or kwargs.get('docker')
 
+        if kwargs.get('stop', False):
+            self._kill_daemon()
+            return
+
         # make sure we only have a single instance,
         # also make sure we set worker_id properly and cache folders
         self._singleton()
@@ -711,9 +736,8 @@
         self._register(queues)
 
         # create temp config file with current configuration
-        self.temp_config_path = safe_mkstemp(
-            suffix=".cfg", prefix=".trains_agent.", text=True, name_only=True
-        )
+        self.temp_config_path = NamedTemporaryFile(
+            suffix=".cfg", prefix=".trains_agent.", mode='w+t').name
 
         # print docker image
         if docker is not False and docker is not None:
@@ -753,6 +777,9 @@
                 )
             )
 
+            if not self._session.debug_mode:
+                self._temp_cleanup_list.append(name)
+
             if not detached:
                 # redirect std out/err to new file
                 sys.stdout = sys.stderr = out_file
@@ -760,6 +787,7 @@
                 # in detached mode
                 # fully detach stdin.stdout/stderr and leave main process, running in the background
                 daemonize_process(out_file.fileno())
+                self._redirected_stdout_file_no = out_file.fileno()
                 # make sure we update the singleton lock file to the new pid
                 Singleton.update_pid_file()
                 # reprint headers to std file (we are now inside the daemon process)
@@ -779,6 +807,7 @@
                     debug=self._session.debug_mode,
                     trace=self._session.trace,
                 ),
+                priority_order=not order_fairness,
             )
         except Exception:
             tb = six.text_type(traceback.format_exc())
@@ -2025,6 +2054,7 @@
         host_pip_cache = Path(os.path.expandvars(self._session.config.get(
             "agent.docker_pip_cache", '~/.trains/pip-cache'))).expanduser().as_posix()
         host_ssh_cache = mkdtemp(prefix='trains_agent.ssh.')
+        self._temp_cleanup_list.append(host_ssh_cache)
 
         # make sure all folders are valid
         Path(host_apt_cache).mkdir(parents=True, exist_ok=True)
@@ -2322,18 +2352,26 @@
 
         return command, script_dir
 
+    def _kill_daemon(self):
+        worker_id, worker_name = self._generate_worker_id_name()
+        # iterate over all running processes
+        for pid, uid, slot, file in Singleton.get_running_pids():
+            # either we have a match for the worker_id, or we just pick the first one
+            if pid >= 0 and (
+                    (worker_id and uid == worker_id) or
+                    (not worker_id and uid.startswith('{}:'.format(worker_name)))):
+                # this is us, kill it
+                print('Terminating trains-agent worker_id={} pid={}'.format(uid, pid))
+                if not terminate_process(pid, timeout=10):
+                    error('Could not terminate process pid={}'.format(pid))
+                return True
+        print('Could not find a running trains-agent instance with worker_name={} worker_id={}'.format(
+            worker_name, worker_id))
+        return False
+
     def _singleton(self):
         # ensure singleton
-        worker_id = self._session.config["agent.worker_id"]
-        worker_name = self._session.config["agent.worker_name"]
-        if not worker_id and os.environ.get('NVIDIA_VISIBLE_DEVICES') is not None:
-            nvidia_visible_devices = os.environ.get('NVIDIA_VISIBLE_DEVICES')
-            if nvidia_visible_devices and nvidia_visible_devices.lower() != 'none':
-                worker_id = '{}:gpu{}'.format(worker_name, nvidia_visible_devices)
-            elif nvidia_visible_devices == '':
-                pass
-            else:
-                worker_name = '{}:cpu'.format(worker_name)
+        worker_id, worker_name = self._generate_worker_id_name()
 
         # if we are running in services mode, we allow double register since
         # docker-compose will kill instances before they cleanup
@@ -2348,6 +2386,19 @@
         # update folders based on free slot
         self._session.create_cache_folders(slot_index=worker_slot)
 
+    def _generate_worker_id_name(self):
+        worker_id = self._session.config["agent.worker_id"]
+        worker_name = self._session.config["agent.worker_name"]
+        if not worker_id and os.environ.get('NVIDIA_VISIBLE_DEVICES') is not None:
+            nvidia_visible_devices = os.environ.get('NVIDIA_VISIBLE_DEVICES')
+            if nvidia_visible_devices and nvidia_visible_devices.lower() != 'none':
+                worker_id = '{}:gpu{}'.format(worker_name, nvidia_visible_devices)
+            elif nvidia_visible_devices == '':
+                pass
+            else:
+                worker_name = '{}:cpu'.format(worker_name)
+        return worker_id, worker_name
+
     def _resolve_queue_names(self, queues, create_if_missing=False):
         if not queues:
             default_queue = self._session.send_api(queues_api.GetDefaultRequest())
diff --git a/trains_agent/helper/singleton.py b/trains_agent/helper/singleton.py
index 2963e56..e51f2ac 100644
--- a/trains_agent/helper/singleton.py
+++ b/trains_agent/helper/singleton.py
@@ -37,6 +37,10 @@
             except:
                 pass
 
+    @classmethod
+    def get_lock_filename(cls):
+        return os.path.join(cls._get_temp_folder(), cls._lock_file_name)
+
     @classmethod
     def register_instance(cls, unique_worker_id=None, worker_name=None, api_client=None, allow_double=False):
         """
@@ -47,7 +51,7 @@
         :return: (str worker_id, int slot_number) Return None value on instance already running
         """
         # try to lock file
-        lock_file = os.path.join(cls._get_temp_folder(), cls._lock_file_name)
+        lock_file = cls.get_lock_filename()
         timeout = 0
         while os.path.exists(lock_file):
             if timeout > cls._lock_timeout:
@@ -79,30 +83,41 @@
         return ret
 
     @classmethod
-    def _register_instance(cls, unique_worker_id=None, worker_name=None, api_client=None, allow_double=False):
-        if cls.worker_id:
-            return cls.worker_id, cls.instance_slot
-        # make sure we have a unique name
-        instance_num = 0
+    def get_running_pids(cls):
         temp_folder = cls._get_temp_folder()
         files = glob(os.path.join(temp_folder, cls.prefix + cls.sep + '*' + cls.ext))
-        slots = {}
+        pids = []
         for file in files:
             parts = file.split(cls.sep)
+            # noinspection PyBroadException
             try:
                 pid = int(parts[1])
+                if not psutil.pid_exists(pid):
+                    pid = -1
             except Exception:
                 # something is wrong, use a non-existent pid and delete the file
                 pid = -1
             uid, slot = None, None
+            # noinspection PyBroadException
             try:
                 with open(file, 'r') as f:
                     uid, slot = str(f.read()).split('\n')
                     slot = int(slot)
             except Exception:
                 pass
+            pids.append((pid, uid, slot, file))
+        return pids
+
+    @classmethod
+    def _register_instance(cls, unique_worker_id=None, worker_name=None, api_client=None, allow_double=False):
+        if cls.worker_id:
+            return cls.worker_id, cls.instance_slot
+        # make sure we have a unique name
+        instance_num = 0
+        slots = {}
+        for pid, uid, slot, file in cls.get_running_pids():
             worker = None
             if api_client and ENV_DOCKER_HOST_MOUNT.get() and uid:
                 try:
@@ -111,7 +126,7 @@
                     worker = None
 
             # count active instances and delete dead files
-            if not worker and not psutil.pid_exists(pid):
+            if not worker and pid < 0:
                 # delete the file
                 try:
                     os.remove(os.path.join(file))
@@ -165,3 +180,9 @@
     @classmethod
     def get_slot(cls):
         return cls.instance_slot or 0
+
+    @classmethod
+    def get_pid_file(cls):
+        if not cls._pid_file:
+            return None
+        return cls._pid_file.name
diff --git a/trains_agent/interface/worker.py b/trains_agent/interface/worker.py
index 7884f95..7a3cd53 100644
--- a/trains_agent/interface/worker.py
+++ b/trains_agent/interface/worker.py
@@ -68,6 +68,10 @@
         'dest': 'queues',
         'type': foreign_object_id('queues'),
     },
+    '--order-fairness': {
+        'help': 'Pull from each queue in a round-robin order, instead of priority order.',
+        'action': 'store_true',
+    },
     '--standalone-mode': {
         'help': 'Do not use any network connects, assume everything is pre-installed',
         'action': 'store_true',
@@ -85,6 +89,10 @@
         'action': 'store_true',
         'aliases': ['-d'],
     },
+    '--stop': {
+        'help': 'Stop the running agent (based on the same set of arguments)',
+        'action': 'store_true',
+    },
 }, **WORKER_ARGS)
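
Reviewer notes (not part of the patch; illustrative Python sketches only).

The only behavioral difference introduced by the new priority_order flag in
run_tasks_loop() is whether the scan over the queues restarts from the first
queue after a task is executed. A self-contained sketch of the two pull
strategies, assuming simple in-memory queues (pull_task() here is a
hypothetical stand-in for the queues_api.GetNextTaskRequest round trip):

    from collections import deque

    def pull_task(queue):
        # hypothetical stand-in for the server-side queue pop
        return queue.popleft() if queue else None

    def run_tasks_loop(queues, priority_order=True):
        # queues are ordered highest-priority first
        while any(queues):
            for i, queue in enumerate(queues):
                task = pull_task(queue)
                if task is None:
                    continue
                print('running {} (pulled from queue #{})'.format(task, i))
                if priority_order:
                    # priority: after each task, restart the scan from queue #0
                    break
                # round robin (--order-fairness): continue on to the next queue

    queues = [deque(['a1', 'a2']), deque(['b1', 'b2'])]
    run_tasks_loop(queues)                        # a1, a2, b1, b2
    queues = [deque(['a1', 'a2']), deque(['b1', 'b2'])]
    run_tasks_loop(queues, priority_order=False)  # a1, b1, a2, b2

In daemon mode the flag is wired through as priority_order=not order_fairness,
so the default keeps draining the first queue, while --order-fairness gives
each queue one pull per sweep.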
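
Why the loop calls os.lseek() before os.ftruncate() on the redirected stdout
descriptor: truncating alone does not move the file offset, so the next write
on the same descriptor would land at the old offset and the log would begin
with a hole of NUL bytes. The `> 2` guard keeps the reset away from the real
stdin/stdout/stderr descriptors (0-2). A minimal demonstration (file name and
contents are arbitrary):

    import os
    import tempfile

    fd, path = tempfile.mkstemp(prefix='agent_log.')
    os.write(fd, b'output of the previous task\n')

    # reset the log before the next task starts (mirrors run_tasks_loop)
    os.lseek(fd, 0, 0)   # rewind to offset 0 (the final 0 is os.SEEK_SET)
    os.ftruncate(fd, 0)  # only now drop the old content

    os.write(fd, b'output of the new task\n')
    os.close(fd)
    print(open(path, 'rb').read())  # b'output of the new task\n', no leading NULs
    os.remove(path)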
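
The updated register_signal_handler() ties _temp_cleanup_list to shutdown: on
SIGTERM the handler first removes the temporary artifacts (including the
singleton pid file), then raises so the pre-existing Sigterm unwind path still
runs. A minimal sketch of the pattern, with hypothetical stand-ins for
safe_remove_tree and the cleanup list (the real helper lives in
trains_agent.helper.base):

    import os
    import shutil
    import signal

    class Sigterm(Exception):
        """Raised from the handler so exception-based shutdown still runs."""

    temp_cleanup_list = []  # stand-in for Worker._temp_cleanup_list

    def safe_remove_tree(path):
        # stand-in: remove a file or a directory tree, never raise
        if not path:
            return
        try:
            shutil.rmtree(path) if os.path.isdir(path) else os.remove(path)
        except OSError:
            pass

    def handler(*_):
        # clean up first; once the daemon process dies there is no other chance
        for f in temp_cleanup_list:
            safe_remove_tree(f)
        raise Sigterm()

    signal.signal(signal.SIGTERM, handler)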
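
_kill_daemon(), the implementation behind the new --stop flag, matches a
running agent either by exact worker_id or, when no id is configured, by the
'<worker_name>:' prefix that _generate_worker_id_name() produces
('<name>:gpu<N>', '<name>:cpu', ...). The matching predicate in isolation
(the uids below are made up):

    def matches(uid, worker_id, worker_name):
        # same condition as in _kill_daemon, minus the pid >= 0 check
        return (worker_id and uid == worker_id) or \
               (not worker_id and uid.startswith('{}:'.format(worker_name)))

    uids = ['station:gpu0', 'station:cpu', 'other:gpu0']
    print([u for u in uids if matches(u, None, 'station')])        # both 'station' uids
    print([u for u in uids if matches(u, 'other:gpu0', 'other')])  # only 'other:gpu0'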