diff --git a/clearml_agent/commands/worker.py b/clearml_agent/commands/worker.py index edfd86e..0e93183 100644 --- a/clearml_agent/commands/worker.py +++ b/clearml_agent/commands/worker.py @@ -213,6 +213,7 @@ class TaskStopReason(object): stopped = 1 # type: TaskStopReason reset = 2 # type: TaskStopReason status_changed = 3 # type: TaskStopReason + exception = 4 # type: TaskStopReason def get_task(session, task_id, *args, **kwargs): @@ -570,7 +571,7 @@ class Worker(ServiceCommandSection): task_id=task_id, ) stop_signal_status = TaskStopSignal.default - status = ExitStatus.failure + status = ExitStatus.interrupted try: # set WORKER ID ENV_WORKER_ID.set(worker_id) @@ -989,7 +990,7 @@ class Worker(ServiceCommandSection): # We are not running a daemon we are killing one. # find the pid send termination signal and leave if kwargs.get('stop', False): - return 1 if not self._kill_daemon() else 0 + return 1 if not self._kill_daemon(dynamic_gpus=dynamic_gpus) else 0 queues_info = [ q.to_dict() @@ -1311,7 +1312,7 @@ class Worker(ServiceCommandSection): except subprocess.CalledProcessError as ex: # non zero return code - stop_reason = 'Exception occurred' + stop_reason = TaskStopReason.exception status = ex.returncode except KeyboardInterrupt: # so someone else will catch us @@ -1323,7 +1324,7 @@ class Worker(ServiceCommandSection): if stderr_path: printed_lines, stderr_pos_count = _print_file(stderr_path, stderr_pos_count) stderr_line_count += self.send_logs(task_id, printed_lines) - stop_reason = 'Exception occurred' + stop_reason = TaskStopReason.exception status = -1 # if running in services mode, keep the file open @@ -2096,7 +2097,7 @@ class Worker(ServiceCommandSection): status_reason="worker execution done", status_message=self._task_status_change_message, ) - elif exit_code == ExitStatus.interrupted: + elif exit_code in (ExitStatus.interrupted, 256+ExitStatus.interrupted): self.log("Task interrupted: stopping") self._session.api_client.tasks.stopped( task=task_id, @@ -2932,19 +2933,34 @@ class Worker(ServiceCommandSection): return command, script_dir def _kill_daemon(self, dynamic_gpus=False): - ## TODO kill dynamic agents with children worker_id, worker_name = self._generate_worker_id_name(dynamic_gpus=dynamic_gpus) + success = False + # Iterate over all running process for pid, uid, slot, file in sorted(Singleton.get_running_pids(), key=lambda x: x[1] or ''): - # wither we have a match for the worker_id or we just pick the first one - if pid >= 0 and uid is not None and ( - (worker_id and uid == worker_id) or - (not worker_id and uid.startswith('{}:'.format(worker_name)))): + if pid < 0 or uid is None: + continue + + # if dynamic gpus kill all children + if dynamic_gpus and (uid.startswith('{}:gpu'.format(worker_name)) or uid == worker_id): + print('Terminating clearml-agent worker_id={} pid={}'.format(uid, pid)) + if not terminate_process(pid, timeout=10): + warning('Could not terminate process pid={}'.format(pid)) + success = True + continue + + # wither we have a match for the worker_id or we just pick the first one, and kill it. + if (worker_id and uid == worker_id) or (not worker_id and uid.startswith('{}:'.format(worker_name))): # this is us kill it print('Terminating clearml-agent worker_id={} pid={}'.format(uid, pid)) if not terminate_process(pid, timeout=10): error('Could not terminate process pid={}'.format(pid)) - return True + success = True + break + + if success: + return True + print('Could not find a running clearml-agent instance with worker_name={} worker_id={}'.format( worker_name, worker_id)) return False