clearml-agent (mirror of https://github.com/clearml/clearml-agent)

Add stopping message on Task process termination
Fix --stop on dynamic gpus venv mode

Commit c331babf51 (parent c59d268995)
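
In short: the daemon loop now tracks which task was handed which GPUs in a dict
(list_task_gpus_ids) instead of a capped list of task ids, the shutdown path in the
finally block gains a venv-mode branch so that --stop also terminates tasks that are
not running inside docker, and handle_task_termination now writes a short
human-readable message to the task's console log before reporting the final status.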

@@ -659,7 +659,7 @@ class Worker(ServiceCommandSection):
             self._services_mode = True
 
         # last 64 tasks
-        list_task_ids = []
+        list_task_gpus_ids = {}
         try:
             while True:
                 queue_tags = None
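
Note: the hunk above replaces the bounded list_task_ids list with a dict that maps each
allocated GPU index to the task it was assigned to. A minimal standalone sketch of that
bookkeeping (made-up task ids, same update/lookup pattern that appears later in this diff):

    # hypothetical example: task "aaa" gets GPUs 0 and 1, task "bbb" gets GPU 2
    list_task_gpus_ids = {}
    for task_id, gpus in (("aaa", [0, 1]), ("bbb", [2])):
        list_task_gpus_ids.update({str(g): task_id for g in gpus})

    assert list_task_gpus_ids == {"0": "aaa", "1": "aaa", "2": "bbb"}
    # unique task ids, as consumed by the shutdown loop in the finally block
    assert set(list_task_gpus_ids.values()) == {"aaa", "bbb"}

Because the dict is keyed by GPU index its size is bounded by the number of GPUs, which is
why the manual "last 64 tasks" trimming is dropped in a later hunk.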

@@ -747,6 +747,7 @@ class Worker(ServiceCommandSection):
 
                 org_gpus = os.environ.get('NVIDIA_VISIBLE_DEVICES')
                 worker_id = self.worker_id
+                # the following is only executed in dynamic gpus mode
                 if gpu_queues and gpu_queues.get(queue):
                     # pick the first available GPUs
                     gpus = available_gpus[:gpu_queues.get(queue)]

@@ -755,6 +756,7 @@ class Worker(ServiceCommandSection):
                         key='available_gpus', value=','.join(str(g) for g in available_gpus))
                     os.environ['CUDA_VISIBLE_DEVICES'] = \
                         os.environ['NVIDIA_VISIBLE_DEVICES'] = ','.join(str(g) for g in gpus)
+                    list_task_gpus_ids.update({str(g): task_id for g in gpus})
                     self.worker_id = ':'.join(self.worker_id.split(':')[:-1] + ['gpu'+','.join(str(g) for g in gpus)])
 
                 self.send_logs(
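
Side note on the unchanged context above: the chained assignment sets CUDA_VISIBLE_DEVICES
and NVIDIA_VISIBLE_DEVICES to the same comma-separated GPU list. A tiny illustration,
assuming GPUs 0 and 2 were picked:

    import os

    gpus = [0, 2]
    # chained assignment: both variables end up as "0,2"
    os.environ['CUDA_VISIBLE_DEVICES'] = \
        os.environ['NVIDIA_VISIBLE_DEVICES'] = ','.join(str(g) for g in gpus)
    print(os.environ['CUDA_VISIBLE_DEVICES'], os.environ['NVIDIA_VISIBLE_DEVICES'])  # -> 0,2 0,2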

@@ -763,10 +765,6 @@ class Worker(ServiceCommandSection):
                     level="INFO",
                 )
 
-                list_task_ids.append(task_id)
-                if len(list_task_ids) > 64:
-                    list_task_ids.pop(0)
-
                 self.run_one_task(queue, task_id, worker_params)
 
                 if gpu_queues:

@@ -792,11 +790,24 @@ class Worker(ServiceCommandSection):
                 if self._session.config["agent.reload_config"]:
                     self.reload_config()
         finally:
-            # shutdown all active docker runs
+            # if we are in dynamic gpus mode, shutdown all active runs
            if self.docker_image_func:
-                for t_id in reversed(list_task_ids):
+                for t_id in set(list_task_gpus_ids.values()):
                     if shutdown_docker_process(docker_cmd_contains='--id {}\'\"'.format(t_id)):
                         self.handle_task_termination(task_id=t_id, exit_code=0, stop_reason=TaskStopReason.stopped)
+            else:
+                # if we are here, just kill all sub processes
+                kill_all_child_processes()
+                for t_id in set(list_task_gpus_ids.values()):
+                    # check if Task is running,
+                    task_info = get_task(
+                        self._session, t_id, only_fields=["status"]
+                    )
+                    # this is a bit risky we might have rerun it again after it already completed
+                    # basically we are not removing completed tasks from the list, hence the issue
+                    if str(task_info.status) == "in_progress":
+                        self.handle_task_termination(
+                            task_id=t_id, exit_code=0, stop_reason=TaskStopReason.stopped)
 
     def _dynamic_gpu_get_available(self, gpu_indexes):
         # noinspection PyBroadException
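
This hunk is the core of the --stop fix for venv (non-docker) dynamic-gpus mode: when
docker_image_func is not set, the agent kills its child processes and then explicitly
terminates every tracked task the server still reports as in_progress. A minimal sketch
of that status-gated termination, with a stand-in task_info object and a hypothetical
terminate() in place of handle_task_termination:

    from types import SimpleNamespace

    # hypothetical state: GPU index -> task id, plus assumed server-side statuses
    list_task_gpus_ids = {"0": "task-a", "1": "task-a", "2": "task-b"}
    statuses = {"task-a": "in_progress", "task-b": "completed"}

    def terminate(task_id):
        # stand-in for self.handle_task_termination(task_id=..., exit_code=0, stop_reason=...)
        print("stopping", task_id)

    for t_id in set(list_task_gpus_ids.values()):
        task_info = SimpleNamespace(status=statuses[t_id])
        # only tasks still marked in_progress are stopped; completed ones are left alone
        if str(task_info.status) == "in_progress":
            terminate(t_id)  # prints: stopping task-a

As the inline comments in the diff note, the dict is never pruned when a task finishes, so
the status check is what prevents stopping a task that has already completed.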

@@ -2060,6 +2071,7 @@ class Worker(ServiceCommandSection):
         try:
             if stop_reason == TaskStopReason.stopped:
                 self.log("Stopping - tasks.stop was called for task")
+                self.send_logs(task_id, ["Process aborted by user"])
                 self._session.api_client.tasks.stopped(
                     task=task_id,
                     status_reason="task was stopped by tasks.stop",

@@ -2109,6 +2121,7 @@ class Worker(ServiceCommandSection):
 
         if exit_code == COMMAND_SUCCESS:
             self.log("Task success: completing")
+            self.send_logs(task_id, ["Process completed successfully"])
             self._session.api_client.tasks.completed(
                 task=task_id,
                 status_reason="worker execution done",

@@ -2116,6 +2129,7 @@ class Worker(ServiceCommandSection):
             )
         elif exit_code in (ExitStatus.interrupted, 256+ExitStatus.interrupted):
             self.log("Task interrupted: stopping")
+            self.send_logs(task_id, ["Process terminated by user"])
             self._session.api_client.tasks.stopped(
                 task=task_id,
                 status_reason="user abort",

@@ -2123,6 +2137,7 @@ class Worker(ServiceCommandSection):
             )
         else:
             self.log("Task failure: setting status to 'failed'")
+            self.send_logs(task_id, ["Process failed, exit code {}".format(exit_code)])
             self._session.api_client.tasks.failed(
                 task=task_id,
                 status_reason="worker execution exit code {}".format(exit_code),
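
These last four hunks follow one pattern: immediately before the final status is reported
to the server, a one-line human-readable message is pushed to the task's own console log
via send_logs. A rough sketch of that message mapping, using a simplified helper and
assumed constant values (COMMAND_SUCCESS and INTERRUPTED here are stand-ins, not the actual
definitions used by the agent):

    COMMAND_SUCCESS = 0   # assumed value, for illustration only
    INTERRUPTED = 2       # stand-in for ExitStatus.interrupted

    def termination_message(exit_code, stop_requested=False):
        # mirrors the four messages added in this commit
        if stop_requested:  # tasks.stop was called for the task
            return "Process aborted by user"
        if exit_code == COMMAND_SUCCESS:
            return "Process completed successfully"
        if exit_code in (INTERRUPTED, 256 + INTERRUPTED):
            return "Process terminated by user"
        return "Process failed, exit code {}".format(exit_code)

    print(termination_message(1))  # -> Process failed, exit code 1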