Mirror of https://github.com/clearml/clearml-agent (synced 2025-04-30 02:30:56 +00:00)

Add dynamic mode terminate dockers on sig_term

commit 9e9fcb0ba9
parent f33e0b2f78
@@ -658,132 +658,145 @@ class Worker(ServiceCommandSection):
             # multi instance support
             self._services_mode = True

-        while True:
-            queue_tags = None
-            runtime_props = None
+        # last 64 tasks
+        list_task_ids = []
+        try:
+            while True:
+                queue_tags = None
+                runtime_props = None

                 if max_num_instances and max_num_instances > 0:
                     # make sure we do not have too many instances to run
                     if len(Singleton.get_running_pids()) >= max_num_instances:
                         if self._daemon_foreground or worker_params.debug:
                             print("Reached max number of services, sleeping for {:.1f} seconds".format(
                                 self._polling_interval))
                         sleep(self._polling_interval)
                         continue

                 # update available gpus
                 if gpu_queues:
                     available_gpus = self._dynamic_gpu_get_available(gpu_indexes)
                     # if something went wrong or we have no free gpus
                     # start over from the highest priority queue
                     if not available_gpus:
                         if self._daemon_foreground or worker_params.debug:
                             print("All GPUs allocated, sleeping for {:.1f} seconds".format(self._polling_interval))
                         sleep(self._polling_interval)
                         continue

                 # iterate over queues (priority style, queues[0] is highest)
                 for queue in queues:

                     if queue_tags is None or runtime_props is None:
                         queue_tags, runtime_props = self.get_worker_properties(queues)

                     if not self.should_be_currently_active(queue_tags[queue], runtime_props):
                         continue

                     if gpu_queues:
                         # peek into queue
                         # get next task in queue
                         try:
                             response = self._session.send_api(queues_api.GetByIdRequest(queue=queue))
                         except Exception:
                             # if something went wrong start over from the highest priority queue
                             break
                         if not len(response.queue.entries):
                             continue
                         # check if we have enough available gpus
                         if gpu_queues[queue] > len(available_gpus):
                             # not enough available_gpus, we should sleep and start over
                             if self._daemon_foreground or worker_params.debug:
                                 print("Not enough free GPUs {}/{}, sleeping for {:.1f} seconds".format(
                                     len(available_gpus), gpu_queues[queue], self._polling_interval))
                             sleep(self._polling_interval)
                             break

                     # get next task in queue
                     try:
                         response = self._session.send_api(
                             queues_api.GetNextTaskRequest(queue=queue)
                         )
                     except Exception as e:
                         print(
                             "Warning: Could not access task queue [{}], error: {}".format(
                                 queue, e
                             )
                         )
                         continue
                     else:
                         try:
                             task_id = response.entry.task
                         except AttributeError:
                             if self._daemon_foreground or worker_params.debug:
                                 print("No tasks in queue {}".format(queue))
                             continue

                         # clear output log if we start a new Task
                         if not worker_params.debug and self._redirected_stdout_file_no is not None and \
                                 self._redirected_stdout_file_no > 2:
                             # noinspection PyBroadException
                             try:
                                 os.lseek(self._redirected_stdout_file_no, 0, 0)
                                 os.ftruncate(self._redirected_stdout_file_no, 0)
                             except:
                                 pass

                         self.report_monitor(ResourceMonitor.StatusReport(queues=queues, queue=queue, task=task_id))

                         org_gpus = os.environ.get('NVIDIA_VISIBLE_DEVICES')
                         worker_id = self.worker_id
                         if gpu_queues and gpu_queues.get(queue):
                             # pick the first available GPUs
                             gpus = available_gpus[:gpu_queues.get(queue)]
                             available_gpus = available_gpus[gpu_queues.get(queue):]
                             self.set_runtime_properties(
                                 key='available_gpus', value=','.join(str(g) for g in available_gpus))
                             os.environ['CUDA_VISIBLE_DEVICES'] = \
                                 os.environ['NVIDIA_VISIBLE_DEVICES'] = ','.join(str(g) for g in gpus)
                             self.worker_id = ':'.join(self.worker_id.split(':')[:-1] + ['gpu'+','.join(str(g) for g in gpus)])

                         self.send_logs(
                             task_id=task_id,
                             lines=["task {} pulled from {} by worker {}\n".format(task_id, queue, self.worker_id)],
                             level="INFO",
                         )

+                        list_task_ids.append(task_id)
+                        if len(list_task_ids) > 64:
+                            list_task_ids.pop(0)
+
                         self.run_one_task(queue, task_id, worker_params)

                         if gpu_queues:
                             self.worker_id = worker_id
                             os.environ['CUDA_VISIBLE_DEVICES'] = \
                                 os.environ['NVIDIA_VISIBLE_DEVICES'] = org_gpus

                         self.report_monitor(ResourceMonitor.StatusReport(queues=self.queues))

                         queue_tags = None
                         runtime_props = None

                         # if we are using priority start pulling from the first always,
                         # if we are doing round robin, pull from the next one
                         if priority_order:
                             break
                 else:
                     # sleep and retry polling
                     if self._daemon_foreground or worker_params.debug:
                         print("No tasks in Queues, sleeping for {:.1f} seconds".format(self._polling_interval))
                     sleep(self._polling_interval)

                 if self._session.config["agent.reload_config"]:
                     self.reload_config()
+        finally:
+            # shutdown all active docker runs
+            if self.docker_image_func:
+                for t_id in reversed(list_task_ids):
+                    if shutdown_docker_process(docker_cmd_contains='--id {}\'\"'.format(t_id)):
+                        self.handle_task_termination(task_id=t_id, exit_code=0, stop_reason=TaskStopReason.stopped)

     def _dynamic_gpu_get_available(self, gpu_indexes):
         # noinspection PyBroadException
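The new try/finally only stops containers if a termination signal actually unwinds the polling loop. The agent's own signal wiring is outside this hunk; as a rough illustration of the general pattern (all names below are made up for the sketch, not clearml-agent API), converting SIGTERM into an exception is what lets a finally clause like the one above run:

import signal
import time


class Terminated(Exception):
    """Raised when the process receives SIGTERM."""


def _raise_on_sigterm(signum, frame):
    # Turn the signal into an exception so the blocking loop unwinds and
    # the try/finally cleanup (stopping spawned containers) gets to run.
    raise Terminated("signal {}".format(signum))


def main_loop():
    signal.signal(signal.SIGTERM, _raise_on_sigterm)
    started_tasks = ["task-1", "task-2"]  # stand-in for list_task_ids
    try:
        while True:
            time.sleep(1)  # stand-in for polling queues and running tasks
    finally:
        for task_id in reversed(started_tasks):
            print("stopping docker for task", task_id)  # stand-in for shutdown_docker_process()

Keeping only the last 64 task ids bounds the cleanup work: on shutdown the agent only tries to stop containers it could plausibly still own.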
@@ -964,7 +977,7 @@ class Worker(ServiceCommandSection):
     def daemon(self, queues, log_level, foreground=False, docker=False, detached=False, order_fairness=False, **kwargs):

         # check that we have docker command if we need it
-        if docker is not False and not check_if_command_exists("docker"):
+        if docker not in (False, None) and not check_if_command_exists("docker"):
             raise ValueError("Running in Docker mode, 'docker' command was not found")

         self._standalone_mode = kwargs.get('standalone_mode', False)
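The change in daemon() matters because `None is not False` evaluates to True in Python, so an unset docker argument used to be treated as Docker mode and could raise even when no docker was requested. A quick, self-contained illustration of the boolean difference:

for docker in (False, None, "nvidia/cuda:11.0-base"):
    old_check = docker is not False           # True for None and for an image name
    new_check = docker not in (False, None)   # True only when docker was explicitly requested
    print(repr(docker), old_check, new_check)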
@@ -2938,7 +2951,6 @@ class Worker(ServiceCommandSection):

     def _kill_daemon(self, dynamic_gpus=False):
         worker_id, worker_name = self._generate_worker_id_name(dynamic_gpus=dynamic_gpus)
-        success = False

         # Iterate over all running process
         for pid, uid, slot, file in sorted(Singleton.get_running_pids(), key=lambda x: x[1] or ''):
@@ -2946,24 +2958,20 @@ class Worker(ServiceCommandSection):
                 continue

             # if dynamic gpus kill all children
-            if dynamic_gpus and (uid.startswith('{}:gpu'.format(worker_name)) or uid == worker_id):
+            if dynamic_gpus and uid == worker_id:
                 print('Terminating clearml-agent worker_id={} pid={}'.format(uid, pid))
-                if not terminate_process(pid, timeout=10):
+                if not terminate_process(pid, timeout=120):
                     warning('Could not terminate process pid={}'.format(pid))
-                success = True
-                continue
+                return True

             # wither we have a match for the worker_id or we just pick the first one, and kill it.
             if (worker_id and uid == worker_id) or (not worker_id and uid.startswith('{}:'.format(worker_name))):
                 # this is us kill it
                 print('Terminating clearml-agent worker_id={} pid={}'.format(uid, pid))
-                if not terminate_process(pid, timeout=10):
+                timeout = 120 if uid.startswith('{}:dgpu'.format(worker_name)) else 10
+                if not terminate_process(pid, timeout=timeout):
                     error('Could not terminate process pid={}'.format(pid))
-                success = True
-                break
-
-        if success:
-            return True
+                return True

         print('Could not find a running clearml-agent instance with worker_name={} worker_id={}'.format(
             worker_name, worker_id))
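Raising the termination timeout to 120 seconds for the dynamic-GPU daemon gives the dying process time to reach its finally block and stop its containers before being force-killed. clearml-agent ships its own terminate_process() helper; the sketch below only illustrates the usual SIGTERM-then-SIGKILL pattern such a helper follows, assuming a POSIX system (names are illustrative, not the agent's implementation):

import os
import signal
import time


def terminate_with_timeout(pid, timeout=10.0):
    """Send SIGTERM, wait up to `timeout` seconds for exit, then escalate to SIGKILL."""
    try:
        os.kill(pid, signal.SIGTERM)
    except OSError:
        return True  # process already gone
    deadline = time.time() + timeout
    while time.time() < deadline:
        try:
            os.kill(pid, 0)  # probe only: raises OSError once the process no longer exists
        except OSError:
            return True
        time.sleep(0.5)
    try:
        os.kill(pid, signal.SIGKILL)
    except OSError:
        return True
    return False

With a 10 second budget a worker that is still stopping containers would routinely be SIGKILLed mid-cleanup; the longer budget trades shutdown latency for a clean docker teardown.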
@@ -103,9 +103,10 @@ def shutdown_docker_process(docker_cmd_contains=None, docker_id=None):
         docker_id = get_docker_id(docker_cmd_contains=docker_cmd_contains)
         if docker_id:
             # we found our docker, stop it
-            get_bash_output(cmd='docker stop -t 1 {}'.format(docker_id))
+            return get_bash_output(cmd='docker stop -t 1 {}'.format(docker_id))
     except Exception:
         pass
+    return None


 def commit_docker(container_name, docker_cmd_contains=None, docker_id=None, apply_change=None):
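Returning the `docker stop` output instead of discarding it gives callers a truthy value only when a matching container was found and stopped, which is what the new finally block in the daemon checks before reporting a task as stopped. A small usage fragment, assuming the helper is imported and `task_id` refers to a task the agent started (illustrative, not taken from the repo):

# Truthy return (the `docker stop` output) means a matching container was stopped;
# None means nothing matched or the docker command failed.
if shutdown_docker_process(docker_cmd_contains='--id {}\'\"'.format(task_id)):
    print("container for task {} stopped".format(task_id))
else:
    print("no running container matched task {}".format(task_id))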