diff --git a/clearml_agent/commands/worker.py b/clearml_agent/commands/worker.py index cf3b307..9f6d844 100644 --- a/clearml_agent/commands/worker.py +++ b/clearml_agent/commands/worker.py @@ -658,132 +658,145 @@ class Worker(ServiceCommandSection): # multi instance support self._services_mode = True - while True: - queue_tags = None - runtime_props = None + # last 64 tasks + list_task_ids = [] + try: + while True: + queue_tags = None + runtime_props = None - if max_num_instances and max_num_instances > 0: - # make sure we do not have too many instances to run - if len(Singleton.get_running_pids()) >= max_num_instances: - if self._daemon_foreground or worker_params.debug: - print("Reached max number of services, sleeping for {:.1f} seconds".format( - self._polling_interval)) - sleep(self._polling_interval) - continue - - # update available gpus - if gpu_queues: - available_gpus = self._dynamic_gpu_get_available(gpu_indexes) - # if something went wrong or we have no free gpus - # start over from the highest priority queue - if not available_gpus: - if self._daemon_foreground or worker_params.debug: - print("All GPUs allocated, sleeping for {:.1f} seconds".format(self._polling_interval)) - sleep(self._polling_interval) - continue - - # iterate over queues (priority style, queues[0] is highest) - for queue in queues: - - if queue_tags is None or runtime_props is None: - queue_tags, runtime_props = self.get_worker_properties(queues) - - if not self.should_be_currently_active(queue_tags[queue], runtime_props): - continue - - if gpu_queues: - # peek into queue - # get next task in queue - try: - response = self._session.send_api(queues_api.GetByIdRequest(queue=queue)) - except Exception: - # if something went wrong start over from the highest priority queue - break - if not len(response.queue.entries): - continue - # check if we have enough available gpus - if gpu_queues[queue] > len(available_gpus): - # not enough available_gpus, we should sleep and start over + if max_num_instances and max_num_instances > 0: + # make sure we do not have too many instances to run + if len(Singleton.get_running_pids()) >= max_num_instances: if self._daemon_foreground or worker_params.debug: - print("Not enough free GPUs {}/{}, sleeping for {:.1f} seconds".format( - len(available_gpus), gpu_queues[queue], self._polling_interval)) + print("Reached max number of services, sleeping for {:.1f} seconds".format( + self._polling_interval)) sleep(self._polling_interval) - break - - # get next task in queue - try: - response = self._session.send_api( - queues_api.GetNextTaskRequest(queue=queue) - ) - except Exception as e: - print( - "Warning: Could not access task queue [{}], error: {}".format( - queue, e - ) - ) - continue - else: - try: - task_id = response.entry.task - except AttributeError: - if self._daemon_foreground or worker_params.debug: - print("No tasks in queue {}".format(queue)) continue - # clear output log if we start a new Task - if not worker_params.debug and self._redirected_stdout_file_no is not None and \ - self._redirected_stdout_file_no > 2: - # noinspection PyBroadException - try: - os.lseek(self._redirected_stdout_file_no, 0, 0) - os.ftruncate(self._redirected_stdout_file_no, 0) - except: - pass + # update available gpus + if gpu_queues: + available_gpus = self._dynamic_gpu_get_available(gpu_indexes) + # if something went wrong or we have no free gpus + # start over from the highest priority queue + if not available_gpus: + if self._daemon_foreground or worker_params.debug: + print("All GPUs 
allocated, sleeping for {:.1f} seconds".format(self._polling_interval)) + sleep(self._polling_interval) + continue - self.report_monitor(ResourceMonitor.StatusReport(queues=queues, queue=queue, task=task_id)) + # iterate over queues (priority style, queues[0] is highest) + for queue in queues: - org_gpus = os.environ.get('NVIDIA_VISIBLE_DEVICES') - worker_id = self.worker_id - if gpu_queues and gpu_queues.get(queue): - # pick the first available GPUs - gpus = available_gpus[:gpu_queues.get(queue)] - available_gpus = available_gpus[gpu_queues.get(queue):] - self.set_runtime_properties( - key='available_gpus', value=','.join(str(g) for g in available_gpus)) - os.environ['CUDA_VISIBLE_DEVICES'] = \ - os.environ['NVIDIA_VISIBLE_DEVICES'] = ','.join(str(g) for g in gpus) - self.worker_id = ':'.join(self.worker_id.split(':')[:-1] + ['gpu'+','.join(str(g) for g in gpus)]) + if queue_tags is None or runtime_props is None: + queue_tags, runtime_props = self.get_worker_properties(queues) - self.send_logs( - task_id=task_id, - lines=["task {} pulled from {} by worker {}\n".format(task_id, queue, self.worker_id)], - level="INFO", - ) - - self.run_one_task(queue, task_id, worker_params) + if not self.should_be_currently_active(queue_tags[queue], runtime_props): + continue if gpu_queues: - self.worker_id = worker_id - os.environ['CUDA_VISIBLE_DEVICES'] = \ - os.environ['NVIDIA_VISIBLE_DEVICES'] = org_gpus + # peek into queue + # get next task in queue + try: + response = self._session.send_api(queues_api.GetByIdRequest(queue=queue)) + except Exception: + # if something went wrong start over from the highest priority queue + break + if not len(response.queue.entries): + continue + # check if we have enough available gpus + if gpu_queues[queue] > len(available_gpus): + # not enough available_gpus, we should sleep and start over + if self._daemon_foreground or worker_params.debug: + print("Not enough free GPUs {}/{}, sleeping for {:.1f} seconds".format( + len(available_gpus), gpu_queues[queue], self._polling_interval)) + sleep(self._polling_interval) + break - self.report_monitor(ResourceMonitor.StatusReport(queues=self.queues)) + # get next task in queue + try: + response = self._session.send_api( + queues_api.GetNextTaskRequest(queue=queue) + ) + except Exception as e: + print( + "Warning: Could not access task queue [{}], error: {}".format( + queue, e + ) + ) + continue + else: + try: + task_id = response.entry.task + except AttributeError: + if self._daemon_foreground or worker_params.debug: + print("No tasks in queue {}".format(queue)) + continue - queue_tags = None - runtime_props = None + # clear output log if we start a new Task + if not worker_params.debug and self._redirected_stdout_file_no is not None and \ + self._redirected_stdout_file_no > 2: + # noinspection PyBroadException + try: + os.lseek(self._redirected_stdout_file_no, 0, 0) + os.ftruncate(self._redirected_stdout_file_no, 0) + except: + pass - # if we are using priority start pulling from the first always, - # if we are doing round robin, pull from the next one - if priority_order: - break - else: - # sleep and retry polling - if self._daemon_foreground or worker_params.debug: - print("No tasks in Queues, sleeping for {:.1f} seconds".format(self._polling_interval)) - sleep(self._polling_interval) + self.report_monitor(ResourceMonitor.StatusReport(queues=queues, queue=queue, task=task_id)) - if self._session.config["agent.reload_config"]: - self.reload_config() + org_gpus = os.environ.get('NVIDIA_VISIBLE_DEVICES') + worker_id = 
self.worker_id + if gpu_queues and gpu_queues.get(queue): + # pick the first available GPUs + gpus = available_gpus[:gpu_queues.get(queue)] + available_gpus = available_gpus[gpu_queues.get(queue):] + self.set_runtime_properties( + key='available_gpus', value=','.join(str(g) for g in available_gpus)) + os.environ['CUDA_VISIBLE_DEVICES'] = \ + os.environ['NVIDIA_VISIBLE_DEVICES'] = ','.join(str(g) for g in gpus) + self.worker_id = ':'.join(self.worker_id.split(':')[:-1] + ['gpu'+','.join(str(g) for g in gpus)]) + + self.send_logs( + task_id=task_id, + lines=["task {} pulled from {} by worker {}\n".format(task_id, queue, self.worker_id)], + level="INFO", + ) + + list_task_ids.append(task_id) + if len(list_task_ids) > 64: + list_task_ids.pop(0) + + self.run_one_task(queue, task_id, worker_params) + + if gpu_queues: + self.worker_id = worker_id + os.environ['CUDA_VISIBLE_DEVICES'] = \ + os.environ['NVIDIA_VISIBLE_DEVICES'] = org_gpus + + self.report_monitor(ResourceMonitor.StatusReport(queues=self.queues)) + + queue_tags = None + runtime_props = None + + # if we are using priority start pulling from the first always, + # if we are doing round robin, pull from the next one + if priority_order: + break + else: + # sleep and retry polling + if self._daemon_foreground or worker_params.debug: + print("No tasks in Queues, sleeping for {:.1f} seconds".format(self._polling_interval)) + sleep(self._polling_interval) + + if self._session.config["agent.reload_config"]: + self.reload_config() + finally: + # shutdown all active docker runs + if self.docker_image_func: + for t_id in reversed(list_task_ids): + if shutdown_docker_process(docker_cmd_contains='--id {}\'\"'.format(t_id)): + self.handle_task_termination(task_id=t_id, exit_code=0, stop_reason=TaskStopReason.stopped) def _dynamic_gpu_get_available(self, gpu_indexes): # noinspection PyBroadException @@ -964,7 +977,7 @@ class Worker(ServiceCommandSection): def daemon(self, queues, log_level, foreground=False, docker=False, detached=False, order_fairness=False, **kwargs): # check that we have docker command if we need it - if docker is not False and not check_if_command_exists("docker"): + if docker not in (False, None) and not check_if_command_exists("docker"): raise ValueError("Running in Docker mode, 'docker' command was not found") self._standalone_mode = kwargs.get('standalone_mode', False) @@ -2938,7 +2951,6 @@ class Worker(ServiceCommandSection): def _kill_daemon(self, dynamic_gpus=False): worker_id, worker_name = self._generate_worker_id_name(dynamic_gpus=dynamic_gpus) - success = False # Iterate over all running process for pid, uid, slot, file in sorted(Singleton.get_running_pids(), key=lambda x: x[1] or ''): @@ -2946,24 +2958,20 @@ class Worker(ServiceCommandSection): continue # if dynamic gpus kill all children - if dynamic_gpus and (uid.startswith('{}:gpu'.format(worker_name)) or uid == worker_id): + if dynamic_gpus and uid == worker_id: print('Terminating clearml-agent worker_id={} pid={}'.format(uid, pid)) - if not terminate_process(pid, timeout=10): + if not terminate_process(pid, timeout=120): warning('Could not terminate process pid={}'.format(pid)) - success = True - continue + return True # wither we have a match for the worker_id or we just pick the first one, and kill it. 
if (worker_id and uid == worker_id) or (not worker_id and uid.startswith('{}:'.format(worker_name))): # this is us kill it print('Terminating clearml-agent worker_id={} pid={}'.format(uid, pid)) - if not terminate_process(pid, timeout=10): + timeout = 120 if uid.startswith('{}:dgpu'.format(worker_name)) else 10 + if not terminate_process(pid, timeout=timeout): error('Could not terminate process pid={}'.format(pid)) - success = True - break - - if success: - return True + return True print('Could not find a running clearml-agent instance with worker_name={} worker_id={}'.format( worker_name, worker_id)) diff --git a/clearml_agent/helper/process.py b/clearml_agent/helper/process.py index 821b225..b8f4ef9 100644 --- a/clearml_agent/helper/process.py +++ b/clearml_agent/helper/process.py @@ -103,9 +103,10 @@ def shutdown_docker_process(docker_cmd_contains=None, docker_id=None): docker_id = get_docker_id(docker_cmd_contains=docker_cmd_contains) if docker_id: # we found our docker, stop it - get_bash_output(cmd='docker stop -t 1 {}'.format(docker_id)) + return get_bash_output(cmd='docker stop -t 1 {}'.format(docker_id)) except Exception: pass + return None def commit_docker(container_name, docker_cmd_contains=None, docker_id=None, apply_change=None):
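
Note on the shutdown path added above: the services-mode loop now keeps a bounded history of the last 64 task ids it launched, and the new `finally` block walks that history newest-first, stopping any docker container still running for those tasks. This works because `shutdown_docker_process` now returns the output of `docker stop` instead of discarding it, so `handle_task_termination` is only called for tasks whose container was actually found and stopped.

Below is a minimal, self-contained sketch of that pattern (Python 3, plain `docker` CLI only). The names `MAX_TRACKED_TASKS`, `track_task`, `stop_task_container` and `shutdown_cleanup` are illustrative stand-ins, not clearml-agent API; the real agent goes through its `get_docker_id` / `shutdown_docker_process` helpers shown in the second hunk.

import subprocess
from collections import deque

MAX_TRACKED_TASKS = 64   # same bound as the "last 64 tasks" list in the patch

# deque(maxlen=...) gives the same bounded history as the append() + pop(0) pair
recent_task_ids = deque(maxlen=MAX_TRACKED_TASKS)


def track_task(task_id):
    # remember a task we launched so its container can be cleaned up on shutdown
    recent_task_ids.append(task_id)


def stop_task_container(task_id):
    # Stop the docker container whose command line mentions this task id.
    # Returns True only if a matching container was found and stopped, so the
    # caller knows whether task-termination handling is still needed.
    out = subprocess.run(
        ["docker", "ps", "--no-trunc", "--format", "{{.ID}} {{.Command}}"],
        capture_output=True, text=True, check=False,
    ).stdout
    for line in out.splitlines():
        container_id, _, command = line.partition(" ")
        if task_id in command:
            subprocess.run(["docker", "stop", "-t", "1", container_id], check=False)
            return True
    return False


def shutdown_cleanup():
    # call from a finally block: stop containers for the newest tasks first
    for task_id in reversed(recent_task_ids):
        if stop_task_container(task_id):
            print("stopped container for task {}".format(task_id))

The bounded deque is equivalent to the `append` / `pop(0)` pair in the patch, and iterating in reverse mirrors `reversed(list_task_ids)`: the most recently started containers are stopped first, and only a successful stop triggers termination handling.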