From e60a6f9d14c0eb2064185b3a80e650ca050c5a60 Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Sun, 25 Apr 2021 10:46:43 +0300 Subject: [PATCH] Fix --stop support for dynamic gpus --- clearml_agent/commands/worker.py | 47 ++++++++++++++++++-------------- clearml_agent/helper/process.py | 23 ++++++++++++++-- 2 files changed, 46 insertions(+), 24 deletions(-) diff --git a/clearml_agent/commands/worker.py b/clearml_agent/commands/worker.py index c08b07f..3de4fb0 100644 --- a/clearml_agent/commands/worker.py +++ b/clearml_agent/commands/worker.py @@ -90,7 +90,7 @@ from clearml_agent.helper.process import ( get_bash_output, shutdown_docker_process, get_docker_id, - commit_docker, terminate_process, check_if_command_exists, + commit_docker, terminate_process, check_if_command_exists, terminate_all_child_processes, ) from clearml_agent.helper.package.priority_req import PriorityPackageRequirement, PackageCollectorRequirement from clearml_agent.helper.repo import clone_repository_cached, RepoInfo, VCS, fix_package_import_diff_patch @@ -685,6 +685,9 @@ class Worker(ServiceCommandSection): shutdown_docker_process(docker_cmd_contains='--id {}\'\"'.format(task_id)) safe_remove_file(temp_stdout_name) safe_remove_file(temp_stderr_name) + if self._services_mode and status == ExitStatus.interrupted: + # unregister this worker, it was killed + self._unregister() def run_tasks_loop(self, queues, worker_params, priority_order=True, gpu_indexes=None, gpu_queues=None): """ @@ -719,6 +722,7 @@ class Worker(ServiceCommandSection): # get current running instances available_gpus = None + dynamic_gpus_worker_id = None if gpu_indexes and gpu_queues: available_gpus, gpu_queues = self._setup_dynamic_gpus(gpu_queues) # multi instance support @@ -812,7 +816,7 @@ class Worker(ServiceCommandSection): self.report_monitor(ResourceMonitor.StatusReport(queues=queues, queue=queue, task=task_id)) org_gpus = os.environ.get('NVIDIA_VISIBLE_DEVICES') - worker_id = self.worker_id + dynamic_gpus_worker_id = self.worker_id # the following is only executed in dynamic gpus mode if gpu_queues and gpu_queues.get(queue): # pick the first available GPUs @@ -836,7 +840,7 @@ class Worker(ServiceCommandSection): self.run_one_task(queue, task_id, worker_params) if gpu_queues: - self.worker_id = worker_id + self.worker_id = dynamic_gpus_worker_id os.environ['CUDA_VISIBLE_DEVICES'] = \ os.environ['NVIDIA_VISIBLE_DEVICES'] = org_gpus @@ -864,18 +868,18 @@ class Worker(ServiceCommandSection): if shutdown_docker_process(docker_cmd_contains='--id {}\'\"'.format(t_id)): self.handle_task_termination(task_id=t_id, exit_code=0, stop_reason=TaskStopReason.stopped) else: + # if we are in dynamic gpus / services mode, + # we should send termination signal to all child processes + if self._services_mode: + terminate_all_child_processes(timeout=120) + # if we are here, just kill all sub processes kill_all_child_processes() - for t_id in set(list_task_gpus_ids.values()): - # check if Task is running, - task_info = get_task( - self._session, t_id, only_fields=["status"] - ) - # this is a bit risky we might have rerun it again after it already completed - # basically we are not removing completed tasks from the list, hence the issue - if str(task_info.status) == "in_progress": - self.handle_task_termination( - task_id=t_id, exit_code=0, stop_reason=TaskStopReason.stopped) + + # unregister dynamic GPU worker, if we were terminated while setting up a Task + if dynamic_gpus_worker_id: + self.worker_id = dynamic_gpus_worker_id + self._unregister() def _dynamic_gpu_get_available(self, gpu_indexes): # noinspection PyBroadException @@ -1792,15 +1796,16 @@ class Worker(ServiceCommandSection): debug=self._session.debug_mode, trace=self._session.trace, ) - self.report_monitor(ResourceMonitor.StatusReport(task=current_task.id)) - self.run_one_task(queue='', task_id=current_task.id, worker_args=worker_params, docker=docker) + try: + self.report_monitor(ResourceMonitor.StatusReport(task=current_task.id)) + self.run_one_task(queue='', task_id=current_task.id, worker_args=worker_params, docker=docker) + finally: + self.stop_monitor() + self._unregister() - self.stop_monitor() - self._unregister() - - if full_monitoring and self.temp_config_path: - safe_remove_file(self._session.config_file) - Singleton.close_pid_file() + if full_monitoring and self.temp_config_path: + safe_remove_file(self._session.config_file) + Singleton.close_pid_file() return self._session.print_configuration() diff --git a/clearml_agent/helper/process.py b/clearml_agent/helper/process.py index b8f4ef9..427d9c9 100644 --- a/clearml_agent/helper/process.py +++ b/clearml_agent/helper/process.py @@ -42,18 +42,18 @@ def get_bash_output(cmd, strip=False, stderr=subprocess.STDOUT, stdin=False): return output if not strip or not output else output.strip() -def terminate_process(pid, timeout=10.): +def terminate_process(pid, timeout=10., ignore_zombie=True): # noinspection PyBroadException try: proc = psutil.Process(pid) proc.terminate() cnt = 0 - while proc.is_running() and cnt < timeout: + while proc.is_running() and (ignore_zombie or proc.status() != 'zombie') and cnt < timeout: sleep(1.) cnt += 1 proc.terminate() cnt = 0 - while proc.is_running() and cnt < timeout: + while proc.is_running() and (ignore_zombie or proc.status() != 'zombie') and cnt < timeout: sleep(1.) cnt += 1 proc.kill() @@ -84,6 +84,23 @@ def kill_all_child_processes(pid=None): parent.kill() +def terminate_all_child_processes(pid=None, timeout=10., include_parent=True): + # get current process if pid not provided + if not pid: + pid = os.getpid() + include_parent = False + try: + parent = psutil.Process(pid) + except psutil.Error: + # could not find parent process id + return + for child in parent.children(recursive=False): + print('Terminating child process {}'.format(child.pid)) + terminate_process(child.pid, timeout=timeout, ignore_zombie=False) + if include_parent: + terminate_process(parent.pid, timeout=timeout, ignore_zombie=False) + + def get_docker_id(docker_cmd_contains): try: containers_running = get_bash_output(cmd='docker ps --no-trunc --format \"{{.ID}}: {{.Command}}\"')