Fix --stop support for dynamic gpus

allegroai 2021-04-25 10:46:43 +03:00
parent 8569c02b33
commit e60a6f9d14
2 changed files with 46 additions and 24 deletions

View File

@@ -90,7 +90,7 @@ from clearml_agent.helper.process import (
     get_bash_output,
     shutdown_docker_process,
     get_docker_id,
-    commit_docker, terminate_process, check_if_command_exists,
+    commit_docker, terminate_process, check_if_command_exists, terminate_all_child_processes,
 )
 from clearml_agent.helper.package.priority_req import PriorityPackageRequirement, PackageCollectorRequirement
 from clearml_agent.helper.repo import clone_repository_cached, RepoInfo, VCS, fix_package_import_diff_patch
@@ -685,6 +685,9 @@ class Worker(ServiceCommandSection):
             shutdown_docker_process(docker_cmd_contains='--id {}\'\"'.format(task_id))
         safe_remove_file(temp_stdout_name)
         safe_remove_file(temp_stderr_name)
+        if self._services_mode and status == ExitStatus.interrupted:
+            # unregister this worker, it was killed
+            self._unregister()

     def run_tasks_loop(self, queues, worker_params, priority_order=True, gpu_indexes=None, gpu_queues=None):
         """
@@ -719,6 +722,7 @@ class Worker(ServiceCommandSection):
         # get current running instances
         available_gpus = None
+        dynamic_gpus_worker_id = None
         if gpu_indexes and gpu_queues:
             available_gpus, gpu_queues = self._setup_dynamic_gpus(gpu_queues)
             # multi instance support
@@ -812,7 +816,7 @@
             self.report_monitor(ResourceMonitor.StatusReport(queues=queues, queue=queue, task=task_id))

             org_gpus = os.environ.get('NVIDIA_VISIBLE_DEVICES')
-            worker_id = self.worker_id
+            dynamic_gpus_worker_id = self.worker_id
             # the following is only executed in dynamic gpus mode
             if gpu_queues and gpu_queues.get(queue):
                 # pick the first available GPUs
@@ -836,7 +840,7 @@
                 self.run_one_task(queue, task_id, worker_params)

             if gpu_queues:
-                self.worker_id = worker_id
+                self.worker_id = dynamic_gpus_worker_id
                 os.environ['CUDA_VISIBLE_DEVICES'] = \
                     os.environ['NVIDIA_VISIBLE_DEVICES'] = org_gpus
@@ -864,18 +868,18 @@
                 if shutdown_docker_process(docker_cmd_contains='--id {}\'\"'.format(t_id)):
                     self.handle_task_termination(task_id=t_id, exit_code=0, stop_reason=TaskStopReason.stopped)
         else:
+            # if we are in dynamic gpus / services mode,
+            # we should send termination signal to all child processes
+            if self._services_mode:
+                terminate_all_child_processes(timeout=120)
+
             # if we are here, just kill all sub processes
             kill_all_child_processes()
-            for t_id in set(list_task_gpus_ids.values()):
-                # check if Task is running,
-                task_info = get_task(
-                    self._session, t_id, only_fields=["status"]
-                )
-                # this is a bit risky we might have rerun it again after it already completed
-                # basically we are not removing completed tasks from the list, hence the issue
-                if str(task_info.status) == "in_progress":
-                    self.handle_task_termination(
-                        task_id=t_id, exit_code=0, stop_reason=TaskStopReason.stopped)
+
+        # unregister dynamic GPU worker, if we were terminated while setting up a Task
+        if dynamic_gpus_worker_id:
+            self.worker_id = dynamic_gpus_worker_id
+            self._unregister()

     def _dynamic_gpu_get_available(self, gpu_indexes):
         # noinspection PyBroadException
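
The crux of the --stop fix for dynamic GPUs is the worker-id bookkeeping above: while a Task is being set up, the daemon saves self.worker_id and temporarily rewrites it for the allocated GPU slot, so an interrupt arriving at that moment must restore the saved id before unregistering. A minimal self-contained sketch of that pattern (the class and id format are illustrative, not the agent's actual code):

    class DummyWorker:
        def __init__(self, worker_id):
            self.worker_id = worker_id

        def _unregister(self):
            # stand-in for the agent's API call that removes the worker
            print('unregistering {}'.format(self.worker_id))

    worker = DummyWorker('machine:dynamic')
    dynamic_gpus_worker_id = None
    try:
        # before launching a task the id is saved, then rewritten per GPU slot
        dynamic_gpus_worker_id = worker.worker_id
        worker.worker_id = 'machine:dynamic:gpu0,1'  # illustrative suffix
        raise KeyboardInterrupt  # simulate --stop arriving mid-setup
    except KeyboardInterrupt:
        # restore the original id so the correct worker is unregistered
        if dynamic_gpus_worker_id:
            worker.worker_id = dynamic_gpus_worker_id
            worker._unregister()
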
@@ -1792,15 +1796,16 @@
                 debug=self._session.debug_mode,
                 trace=self._session.trace,
             )
-            self.report_monitor(ResourceMonitor.StatusReport(task=current_task.id))
-            self.run_one_task(queue='', task_id=current_task.id, worker_args=worker_params, docker=docker)
-
-            self.stop_monitor()
-            self._unregister()
-
-            if full_monitoring and self.temp_config_path:
-                safe_remove_file(self._session.config_file)
-                Singleton.close_pid_file()
+            try:
+                self.report_monitor(ResourceMonitor.StatusReport(task=current_task.id))
+                self.run_one_task(queue='', task_id=current_task.id, worker_args=worker_params, docker=docker)
+            finally:
+                self.stop_monitor()
+                self._unregister()
+
+                if full_monitoring and self.temp_config_path:
+                    safe_remove_file(self._session.config_file)
+                    Singleton.close_pid_file()

             return

         self._session.print_configuration()

View File

@@ -42,18 +42,18 @@ def get_bash_output(cmd, strip=False, stderr=subprocess.STDOUT, stdin=False):
     return output if not strip or not output else output.strip()


-def terminate_process(pid, timeout=10.):
+def terminate_process(pid, timeout=10., ignore_zombie=True):
     # noinspection PyBroadException
     try:
         proc = psutil.Process(pid)
         proc.terminate()
         cnt = 0
-        while proc.is_running() and cnt < timeout:
+        while proc.is_running() and (ignore_zombie or proc.status() != 'zombie') and cnt < timeout:
             sleep(1.)
             cnt += 1

         proc.terminate()
         cnt = 0
-        while proc.is_running() and cnt < timeout:
+        while proc.is_running() and (ignore_zombie or proc.status() != 'zombie') and cnt < timeout:
             sleep(1.)
             cnt += 1

         proc.kill()
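
The new ignore_zombie flag matters because psutil treats a terminated child that has never been reaped (waited on) as still running: is_running() returns True while status() reports 'zombie', so the old wait loops could spin until the timeout expired even though the process was already dead. A small self-contained demonstration of that behavior (POSIX only, not part of this commit):

    import subprocess
    import time

    import psutil

    child = subprocess.Popen(['sleep', '0.1'])
    time.sleep(1.0)              # child has exited, but we never called wait()

    proc = psutil.Process(child.pid)
    print(proc.is_running())     # True: psutil counts zombies as running
    print(proc.status())         # 'zombie' (psutil.STATUS_ZOMBIE)

    # with ignore_zombie=False, terminate_process() stops waiting in this state
    child.wait()                 # reap the zombie
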
@@ -84,6 +84,23 @@ def kill_all_child_processes(pid=None):
         parent.kill()


+def terminate_all_child_processes(pid=None, timeout=10., include_parent=True):
+    # get current process if pid not provided
+    if not pid:
+        pid = os.getpid()
+        include_parent = False
+    try:
+        parent = psutil.Process(pid)
+    except psutil.Error:
+        # could not find parent process id
+        return
+    for child in parent.children(recursive=False):
+        print('Terminating child process {}'.format(child.pid))
+        terminate_process(child.pid, timeout=timeout, ignore_zombie=False)
+    if include_parent:
+        terminate_process(parent.pid, timeout=timeout, ignore_zombie=False)
+
+
 def get_docker_id(docker_cmd_contains):
     try:
         containers_running = get_bash_output(cmd='docker ps --no-trunc --format \"{{.ID}}: {{.Command}}\"')
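
For completeness, a hedged usage sketch of the new helper as the worker calls it on --stop in services mode: with pid=None it targets the current process, forces include_parent to False, and gracefully terminates only the children (120 seconds is the grace period passed in the worker change above). The spawned sleep processes below are placeholders for service-mode task processes:

    import subprocess

    from clearml_agent.helper.process import terminate_all_child_processes

    # pretend these are task processes spawned by the daemon
    children = [subprocess.Popen(['sleep', '60']) for _ in range(3)]

    # SIGTERM each child and wait for it to exit (zombies count as exited,
    # thanks to ignore_zombie=False), escalating to SIGKILL if necessary;
    # the current process itself is left running
    terminate_all_child_processes(timeout=120)
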