From 5bb257c46cc472be4e37c1b752a7ef2333552edf Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Sat, 9 May 2020 19:50:53 +0300 Subject: [PATCH] Add daemon --create-queue to automatically create a queue and use it if queue name doesn't exist in server --- trains_agent/commands/worker.py | 31 +++++++++++++++++++++++-------- trains_agent/interface/worker.py | 4 ++++ 2 files changed, 27 insertions(+), 8 deletions(-) diff --git a/trains_agent/commands/worker.py b/trains_agent/commands/worker.py index b93ffa9..e281764 100644 --- a/trains_agent/commands/worker.py +++ b/trains_agent/commands/worker.py @@ -665,8 +665,11 @@ class Worker(ServiceCommandSection): self._session.print_configuration() - @resolve_names def daemon(self, queues, log_level, foreground=False, docker=False, detached=False, **kwargs): + # if we do not need to create queues, make sure they are valid + # match previous behaviour when we validated queue names before everything else + queues = self._resolve_queue_names(queues, create_if_missing=kwargs.get('create_queue', False)) + # make sure we only have a single instance, # also make sure we set worker_id properly and cache folders self._singleton() @@ -681,13 +684,6 @@ class Worker(ServiceCommandSection): self.log.debug("starting resource monitor thread") print("Worker \"{}\" - ".format(self.worker_id), end='') - if queues: - queues = return_list(queues) - queues = [self._resolve_name(q, "queues") for q in queues] - else: - default_queue = self._session.send_api(queues_api.GetDefaultRequest()) - queues = [default_queue.id] - queues_info = [ self._session.send_api( queues_api.GetByIdRequest(queue) @@ -2220,6 +2216,25 @@ class Worker(ServiceCommandSection): # update folders based on free slot self._session.create_cache_folders(slot_index=worker_slot) + def _resolve_queue_names(self, queues, create_if_missing=False): + if not queues: + default_queue = self._session.send_api(queues_api.GetDefaultRequest()) + return [default_queue.id] + + queues = return_list(queues) + if not create_if_missing: + return [self._resolve_name(q.name, "queues") for q in queues] + + queue_ids = [] + for q in queues: + try: + q_id = self._resolve_name(q.name, "queues") + except: + self._session.send_api(queues_api.CreateRequest(name=q.name)) + q_id = self._resolve_name(q.name, "queues") + queue_ids.append(q_id) + return queue_ids + if __name__ == "__main__": pass diff --git a/trains_agent/interface/worker.py b/trains_agent/interface/worker.py index aa4be70..52477b9 100644 --- a/trains_agent/interface/worker.py +++ b/trains_agent/interface/worker.py @@ -76,6 +76,10 @@ DAEMON_ARGS = dict({ 'help': 'Launch multiple long-term docker services. Implies docker & cpu-only flags.', 'action': 'store_true', }, + '--create-queue': { + 'help': 'Create requested queue if it does not exist already.', + 'action': 'store_true', + }, '--detached': { 'help': 'Detached mode, run agent in the background', 'action': 'store_true',