From 9cb71b95268eeef80ac1530d575f0ec7ab746e8c Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Sat, 9 May 2020 19:45:14 +0300 Subject: [PATCH] Add daemon service mode to allow multiple tasks to be launched simultaneously on the same machine (--service-mode) --- trains_agent/commands/worker.py | 95 ++++++++++++++++++++++++++++---- trains_agent/interface/worker.py | 4 ++ 2 files changed, 87 insertions(+), 12 deletions(-) diff --git a/trains_agent/commands/worker.py b/trains_agent/commands/worker.py index b08c414..b93ffa9 100644 --- a/trains_agent/commands/worker.py +++ b/trains_agent/commands/worker.py @@ -306,6 +306,12 @@ class Worker(ServiceCommandSection): # machine status update intervals, seconds _machine_update_interval = 30.0 + # message printed before starting task logging, + # it will be parsed by services_mode, to identify internal docker logging start + _task_logging_start_message = "Running task '{}'" + # last message before passing control to the actual task + _task_logging_pass_control_message = "Running task id [{}]:" + @property def service(self): """ Worker command service endpoint """ @@ -371,8 +377,24 @@ class Worker(ServiceCommandSection): self._docker_force_pull = self._session.config.get("agent.docker_force_pull", False) self._daemon_foreground = None self._standalone_mode = None + self._services_mode = None self._force_current_version = None + @classmethod + def _verify_command_states(cls, kwargs): + """ + Conform and enforce command argument + This is where you can automatically turn on/off switches based on different states. + :param kwargs: + :return: kwargs + """ + if kwargs.get('services_mode'): + kwargs['cpu_only'] = True + kwargs['docker'] = kwargs.get('docker', []) + kwargs['gpus'] = None + + return kwargs + def _get_requirements_manager(self, os_override=None, base_interpreter=None): requirements_manager = RequirementsManager( self._session, base_interpreter=base_interpreter @@ -411,7 +433,9 @@ class Worker(ServiceCommandSection): :param docker: Docker image in which the execution task will run """ # start new process and execute task id - print("Running task '{}'".format(task_id)) + # "Running task '{}'".format(task_id) + print(self._task_logging_start_message.format(task_id)) + # set task status to in_progress so we know it was popped from the queue try: self._session.send_api(tasks_api.StartedRequest(task=task_id, force=True)) @@ -455,7 +479,13 @@ class Worker(ServiceCommandSection): docker_arguments = self._docker_arguments # Update docker command - full_docker_cmd = self.docker_image_func(docker_image=docker_image, docker_arguments=docker_arguments) + if self._services_mode: + # if this is services mode, give the docker a unique worker id, as it will register itself. + full_docker_cmd = self.docker_image_func( + worker_id='{}:service:{}'.format(self.worker_id, task_id), + docker_image=docker_image, docker_arguments=docker_arguments) + else: + full_docker_cmd = self.docker_image_func(docker_image=docker_image, docker_arguments=docker_arguments) try: self._session.send_api( tasks_api.EditRequest(task_id, force=True, execution=dict( @@ -463,8 +493,13 @@ class Worker(ServiceCommandSection): except Exception: pass - full_docker_cmd[-1] = full_docker_cmd[-1] + 'execute --disable-monitoring {} --id {}'.format( - '--standalone-mode' if self._standalone_mode else '', task_id) + # if this is services_mode, change the worker_id to a unique name + # abd use full-monitoring, ot it registers itself as a worker for this specific service. + # notice, the internal agent will monitor itself once the docker is up and running + full_docker_cmd[-1] = full_docker_cmd[-1] + 'execute {} {} --id {}'.format( + '--full-monitoring' if self._services_mode else '--disable-monitoring', + '--standalone-mode' if self._standalone_mode else '', + task_id) cmd = Argv(*full_docker_cmd) print('Running Docker:\n{}\n'.format(str(cmd))) else: @@ -520,12 +555,15 @@ class Worker(ServiceCommandSection): self.handle_user_abort(task_id) status = ExitStatus.interrupted finally: - self.handle_task_termination(task_id, status, stop_signal_status) - # remove temp files after we sent everything to the backend - safe_remove_file(temp_stdout_name) - safe_remove_file(temp_stderr_name) - if self.docker_image_func: - shutdown_docker_process(docker_cmd_contains='--id {}\'\"'.format(task_id)) + if self._services_mode: + print('Service started, docker running in the background') + else: + self.handle_task_termination(task_id, status, stop_signal_status) + # remove temp files after we sent everything to the backend + safe_remove_file(temp_stdout_name) + safe_remove_file(temp_stderr_name) + if self.docker_image_func: + shutdown_docker_process(docker_cmd_contains='--id {}\'\"'.format(task_id)) def run_tasks_loop(self, queues, worker_params): """ @@ -637,6 +675,7 @@ class Worker(ServiceCommandSection): start_check_update_daemon() self._standalone_mode = kwargs.get('standalone_mode', False) + self._services_mode = kwargs.get('services_mode', False) self.check(**kwargs) self.log.debug("starting resource monitor thread") @@ -802,6 +841,7 @@ class Worker(ServiceCommandSection): **kwargs ) + service_mode_internal_agent_started = None while status is None and not stopping: stop_reason = stop_signal.test() if stop_signal else TaskStopSignal.default @@ -825,13 +865,24 @@ class Worker(ServiceCommandSection): stderr.flush() # get diff from previous poll + printed_lines = _print_file(stdout_path, stdout_line_count) + if self._services_mode: + # if the internal agent started, we stop logging, it will take over logging. + # if the internal agent started running the task itself, it will return status==0, + # then we can quit the monitoring loop of this process + printed_lines, service_mode_internal_agent_started, status = self._check_if_internal_agent_started( + printed_lines, service_mode_internal_agent_started, task_id) + if status is not None: + stop_reason = 'Service started' + stdout_line_count += self.send_logs( - task_id, _print_file(stdout_path, stdout_line_count) + task_id, printed_lines ) if stderr_path: stderr_line_count += self.send_logs( task_id, _print_file(stderr_path, stderr_line_count) ) + except subprocess.CalledProcessError as ex: # non zero return code stop_reason = 'Exception occurred' @@ -847,6 +898,10 @@ class Worker(ServiceCommandSection): stop_reason = 'Exception occurred' status = -1 + # if running in services mode, keep the file open + if self._services_mode: + return status, stop_reason + stdout.close() if stderr_path: stderr.close() @@ -862,6 +917,19 @@ class Worker(ServiceCommandSection): return status, stop_reason + def _check_if_internal_agent_started(self, printed_lines, service_mode_internal_agent_started, task_id): + log_start_msg = self._task_logging_start_message.format(task_id) + log_control_end_msg = self._task_logging_pass_control_message.format(task_id) + filter_lines = printed_lines if not service_mode_internal_agent_started else [] + for i, line in enumerate(printed_lines): + if not service_mode_internal_agent_started and line.startswith(log_start_msg): + service_mode_internal_agent_started = True + filter_lines = printed_lines[:i+1] + elif line.startswith(log_control_end_msg): + return filter_lines, service_mode_internal_agent_started, 0 + + return filter_lines, service_mode_internal_agent_started, None + def send_logs(self, task_id, lines, level="DEBUG"): """ Send output lines as log events to backend @@ -1187,7 +1255,8 @@ class Worker(ServiceCommandSection): script_dir = (directory if isinstance(directory, Path) else Path(directory)).absolute().as_posix() # run code - print("Running task id [%s]:" % current_task.id) + # print("Running task id [%s]:" % current_task.id) + print(self._task_logging_pass_control_message.format(current_task.id)) extra = ['-u', ] if optimization: extra.append( @@ -1951,6 +2020,8 @@ class Worker(ServiceCommandSection): base_cmd += ['--gpus', 'device='+gpu_devices, ] # We are using --gpu, so we should not pass NVIDIA_VISIBLE_DEVICES, I think. # base_cmd += ['-e', 'NVIDIA_VISIBLE_DEVICES=' + gpu_devices, ] + elif gpu_devices.strip() == 'none': + dockers_nvidia_visible_devices = gpu_devices if docker_arguments: docker_arguments = list(docker_arguments) \ diff --git a/trains_agent/interface/worker.py b/trains_agent/interface/worker.py index 4af6285..aa4be70 100644 --- a/trains_agent/interface/worker.py +++ b/trains_agent/interface/worker.py @@ -72,6 +72,10 @@ DAEMON_ARGS = dict({ 'help': 'Do not use any network connects, assume everything is pre-installed', 'action': 'store_true', }, + '--services-mode': { + 'help': 'Launch multiple long-term docker services. Implies docker & cpu-only flags.', + 'action': 'store_true', + }, '--detached': { 'help': 'Detached mode, run agent in the background', 'action': 'store_true',