mirror of
https://github.com/clearml/clearml-agent
synced 2025-03-03 10:42:05 +00:00
Add --stop support for dynamic gpus
Fix --stop to mark tasks as aborted (not failed, as before)
This commit is contained in:
parent
81edd2860f
commit
0e4b99351f
@ -213,6 +213,7 @@ class TaskStopReason(object):
|
|||||||
stopped = 1 # type: TaskStopReason
|
stopped = 1 # type: TaskStopReason
|
||||||
reset = 2 # type: TaskStopReason
|
reset = 2 # type: TaskStopReason
|
||||||
status_changed = 3 # type: TaskStopReason
|
status_changed = 3 # type: TaskStopReason
|
||||||
|
exception = 4 # type: TaskStopReason
|
||||||
|
|
||||||
|
|
||||||
def get_task(session, task_id, *args, **kwargs):
|
def get_task(session, task_id, *args, **kwargs):
|
||||||
@ -570,7 +571,7 @@ class Worker(ServiceCommandSection):
|
|||||||
task_id=task_id,
|
task_id=task_id,
|
||||||
)
|
)
|
||||||
stop_signal_status = TaskStopSignal.default
|
stop_signal_status = TaskStopSignal.default
|
||||||
status = ExitStatus.failure
|
status = ExitStatus.interrupted
|
||||||
try:
|
try:
|
||||||
# set WORKER ID
|
# set WORKER ID
|
||||||
ENV_WORKER_ID.set(worker_id)
|
ENV_WORKER_ID.set(worker_id)
|
||||||
@ -989,7 +990,7 @@ class Worker(ServiceCommandSection):
|
|||||||
# We are not running a daemon we are killing one.
|
# We are not running a daemon we are killing one.
|
||||||
# find the pid send termination signal and leave
|
# find the pid send termination signal and leave
|
||||||
if kwargs.get('stop', False):
|
if kwargs.get('stop', False):
|
||||||
return 1 if not self._kill_daemon() else 0
|
return 1 if not self._kill_daemon(dynamic_gpus=dynamic_gpus) else 0
|
||||||
|
|
||||||
queues_info = [
|
queues_info = [
|
||||||
q.to_dict()
|
q.to_dict()
|
||||||
@ -1311,7 +1312,7 @@ class Worker(ServiceCommandSection):
|
|||||||
|
|
||||||
except subprocess.CalledProcessError as ex:
|
except subprocess.CalledProcessError as ex:
|
||||||
# non zero return code
|
# non zero return code
|
||||||
stop_reason = 'Exception occurred'
|
stop_reason = TaskStopReason.exception
|
||||||
status = ex.returncode
|
status = ex.returncode
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
# so someone else will catch us
|
# so someone else will catch us
|
||||||
@ -1323,7 +1324,7 @@ class Worker(ServiceCommandSection):
|
|||||||
if stderr_path:
|
if stderr_path:
|
||||||
printed_lines, stderr_pos_count = _print_file(stderr_path, stderr_pos_count)
|
printed_lines, stderr_pos_count = _print_file(stderr_path, stderr_pos_count)
|
||||||
stderr_line_count += self.send_logs(task_id, printed_lines)
|
stderr_line_count += self.send_logs(task_id, printed_lines)
|
||||||
stop_reason = 'Exception occurred'
|
stop_reason = TaskStopReason.exception
|
||||||
status = -1
|
status = -1
|
||||||
|
|
||||||
# if running in services mode, keep the file open
|
# if running in services mode, keep the file open
|
||||||
@ -2096,7 +2097,7 @@ class Worker(ServiceCommandSection):
|
|||||||
status_reason="worker execution done",
|
status_reason="worker execution done",
|
||||||
status_message=self._task_status_change_message,
|
status_message=self._task_status_change_message,
|
||||||
)
|
)
|
||||||
elif exit_code == ExitStatus.interrupted:
|
elif exit_code in (ExitStatus.interrupted, 256+ExitStatus.interrupted):
|
||||||
self.log("Task interrupted: stopping")
|
self.log("Task interrupted: stopping")
|
||||||
self._session.api_client.tasks.stopped(
|
self._session.api_client.tasks.stopped(
|
||||||
task=task_id,
|
task=task_id,
|
||||||
@ -2932,19 +2933,34 @@ class Worker(ServiceCommandSection):
|
|||||||
return command, script_dir
|
return command, script_dir
|
||||||
|
|
||||||
def _kill_daemon(self, dynamic_gpus=False):
|
def _kill_daemon(self, dynamic_gpus=False):
|
||||||
## TODO kill dynamic agents with children
|
|
||||||
worker_id, worker_name = self._generate_worker_id_name(dynamic_gpus=dynamic_gpus)
|
worker_id, worker_name = self._generate_worker_id_name(dynamic_gpus=dynamic_gpus)
|
||||||
|
success = False
|
||||||
|
|
||||||
# Iterate over all running process
|
# Iterate over all running process
|
||||||
for pid, uid, slot, file in sorted(Singleton.get_running_pids(), key=lambda x: x[1] or ''):
|
for pid, uid, slot, file in sorted(Singleton.get_running_pids(), key=lambda x: x[1] or ''):
|
||||||
# whether we have a match for the worker_id or we just pick the first one
|
if pid < 0 or uid is None:
|
||||||
if pid >= 0 and uid is not None and (
|
continue
|
||||||
(worker_id and uid == worker_id) or
|
|
||||||
(not worker_id and uid.startswith('{}:'.format(worker_name)))):
|
# if dynamic gpus kill all children
|
||||||
|
if dynamic_gpus and (uid.startswith('{}:gpu'.format(worker_name)) or uid == worker_id):
|
||||||
|
print('Terminating clearml-agent worker_id={} pid={}'.format(uid, pid))
|
||||||
|
if not terminate_process(pid, timeout=10):
|
||||||
|
warning('Could not terminate process pid={}'.format(pid))
|
||||||
|
success = True
|
||||||
|
continue
|
||||||
|
|
||||||
|
# whether we have a match for the worker_id or we just pick the first one, and kill it.
|
||||||
|
if (worker_id and uid == worker_id) or (not worker_id and uid.startswith('{}:'.format(worker_name))):
|
||||||
# this is us kill it
|
# this is us kill it
|
||||||
print('Terminating clearml-agent worker_id={} pid={}'.format(uid, pid))
|
print('Terminating clearml-agent worker_id={} pid={}'.format(uid, pid))
|
||||||
if not terminate_process(pid, timeout=10):
|
if not terminate_process(pid, timeout=10):
|
||||||
error('Could not terminate process pid={}'.format(pid))
|
error('Could not terminate process pid={}'.format(pid))
|
||||||
return True
|
success = True
|
||||||
|
break
|
||||||
|
|
||||||
|
if success:
|
||||||
|
return True
|
||||||
|
|
||||||
print('Could not find a running clearml-agent instance with worker_name={} worker_id={}'.format(
|
print('Could not find a running clearml-agent instance with worker_name={} worker_id={}'.format(
|
||||||
worker_name, worker_id))
|
worker_name, worker_id))
|
||||||
return False
|
return False
|
||||||
|
Loading…
Reference in New Issue
Block a user