From c331babf519de55bd139bd3fabe37a94e586da98 Mon Sep 17 00:00:00 2001
From: allegroai <>
Date: Wed, 7 Apr 2021 18:44:33 +0300
Subject: [PATCH] Add stopping message on Task process termination

Fix --stop on dynamic gpus venv mode
---
 clearml_agent/commands/worker.py | 29 ++++++++++++++++++++++-------
 1 file changed, 22 insertions(+), 7 deletions(-)

diff --git a/clearml_agent/commands/worker.py b/clearml_agent/commands/worker.py
index 9f6d844..edc753b 100644
--- a/clearml_agent/commands/worker.py
+++ b/clearml_agent/commands/worker.py
@@ -659,7 +659,7 @@ class Worker(ServiceCommandSection):
             self._services_mode = True

         # last 64 tasks
-        list_task_ids = []
+        list_task_gpus_ids = {}
         try:
             while True:
                 queue_tags = None
@@ -747,6 +747,7 @@
                 org_gpus = os.environ.get('NVIDIA_VISIBLE_DEVICES')
                 worker_id = self.worker_id
+                # the following is only executed in dynamic gpus mode
                 if gpu_queues and gpu_queues.get(queue):
                     # pick the first available GPUs
                     gpus = available_gpus[:gpu_queues.get(queue)]
@@ -755,6 +756,7 @@
                         key='available_gpus', value=','.join(str(g) for g in available_gpus))
                     os.environ['CUDA_VISIBLE_DEVICES'] = \
                         os.environ['NVIDIA_VISIBLE_DEVICES'] = ','.join(str(g) for g in gpus)
+                    list_task_gpus_ids.update({str(g): task_id for g in gpus})
                     self.worker_id = ':'.join(
                         self.worker_id.split(':')[:-1] + ['gpu'+','.join(str(g) for g in gpus)])
                 self.send_logs(
@@ -763,10 +765,6 @@
                     level="INFO",
                 )

-                list_task_ids.append(task_id)
-                if len(list_task_ids) > 64:
-                    list_task_ids.pop(0)
-
                 self.run_one_task(queue, task_id, worker_params)

                 if gpu_queues:
@@ -792,11 +790,24 @@
                 if self._session.config["agent.reload_config"]:
                     self.reload_config()
         finally:
-            # shutdown all active docker runs
+            # if we are in dynamic gpus mode, shut down all active runs
            if self.docker_image_func:
-                for t_id in reversed(list_task_ids):
+                for t_id in set(list_task_gpus_ids.values()):
                     if shutdown_docker_process(docker_cmd_contains='--id {}\'\"'.format(t_id)):
                         self.handle_task_termination(task_id=t_id, exit_code=0, stop_reason=TaskStopReason.stopped)
+            else:
+                # if we are here, just kill all sub processes
+                kill_all_child_processes()
+                for t_id in set(list_task_gpus_ids.values()):
+                    # check if the Task is still running
+                    task_info = get_task(
+                        self._session, t_id, only_fields=["status"]
+                    )
+                    # this is a bit risky: the task might have been rerun after it already completed,
+                    # since completed tasks are never removed from the list
+                    if str(task_info.status) == "in_progress":
+                        self.handle_task_termination(
+                            task_id=t_id, exit_code=0, stop_reason=TaskStopReason.stopped)

     def _dynamic_gpu_get_available(self, gpu_indexes):
         # noinspection PyBroadException
@@ -2060,6 +2071,7 @@ class Worker(ServiceCommandSection):
         try:
             if stop_reason == TaskStopReason.stopped:
                 self.log("Stopping - tasks.stop was called for task")
+                self.send_logs(task_id, ["Process aborted by user"])
                 self._session.api_client.tasks.stopped(
                     task=task_id,
                     status_reason="task was stopped by tasks.stop",
@@ -2109,6 +2121,7 @@
             if exit_code == COMMAND_SUCCESS:
                 self.log("Task success: completing")
+                self.send_logs(task_id, ["Process completed successfully"])
                 self._session.api_client.tasks.completed(
                     task=task_id,
                     status_reason="worker execution done",
@@ -2116,6 +2129,7 @@
             elif exit_code in (ExitStatus.interrupted, 256+ExitStatus.interrupted):
                 self.log("Task interrupted: stopping")
+                self.send_logs(task_id, ["Process terminated by user"])
                 self._session.api_client.tasks.stopped(
                     task=task_id,
                     status_reason="user abort",
@@ -2123,6 +2137,7 @@
                 )
             else:
                 self.log("Task failure: setting status to 'failed'")
+                self.send_logs(task_id, ["Process failed, exit code {}".format(exit_code)])
                 self._session.api_client.tasks.failed(
                     task=task_id,
                     status_reason="worker execution exit code {}".format(exit_code),