From e3864afba49a1bcaf9475d4e8c36988d5e8d748a Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Thu, 28 Jul 2022 18:45:18 +0300 Subject: [PATCH] Add Task.get_num_enqueued_tasks() --- clearml/backend_interface/util.py | 8 ++++++++ clearml/task.py | 25 +++++++++++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/clearml/backend_interface/util.py b/clearml/backend_interface/util.py index 32f93c41..da5d116e 100644 --- a/clearml/backend_interface/util.py +++ b/clearml/backend_interface/util.py @@ -133,6 +133,14 @@ def get_queue_id(session, queue): return None +def get_num_enqueued_tasks(session, queue_id): + # type: ('Session', str) -> Optional[int] # noqa: F821 + res = session.send(queues.GetNumEntriesRequest(queue=queue_id)) + if res and res.response and res.response.num is not None: + return res.response.num + return None + + # Hack for supporting windows def get_epoch_beginning_of_time(timezone_info=None): return datetime(1970, 1, 1).replace(tzinfo=timezone_info if timezone_info else utc_timezone) diff --git a/clearml/task.py b/clearml/task.py index d2112874..3f4da773 100644 --- a/clearml/task.py +++ b/clearml/task.py @@ -42,6 +42,7 @@ from .backend_interface.util import ( make_message, mutually_exclusive, get_queue_id, + get_num_enqueued_tasks, get_or_create_project, ) from .binding.absl_bind import PatchAbsl @@ -1260,6 +1261,30 @@ class Task(_Task): resp = res.response return resp + @classmethod + def get_num_enqueued_tasks(cls, queue_name=None, queue_id=None): + # type: (Optional[str], Optional[str]) -> int + """ + Get the number of tasks enqueued in a given queue. + + :param queue_name: The name of the queue. If not specified, then ``queue_id`` must be specified + :param queue_id: The id of the queue. If not specified, then ``queue_name`` must be specified + + :return: The number of tasks enqueued in the given queue + """ + if not Session.check_min_api_server_version("2.20"): + raise ValueError("You version of clearml-server does not support the 'queues.get_num_entries' endpoint") + mutually_exclusive(queue_name=queue_name, queue_id=queue_id) + session = cls._get_default_session() + if not queue_id: + queue_id = get_queue_id(session, queue_name) + if not queue_id: + raise ValueError('Could not find queue named "{}"'.format(queue_name)) + result = get_num_enqueued_tasks(session, queue_id) + if result is None: + raise ValueError("Could not query the number of enqueued tasks in queue with ID {}".format(queue_id)) + return result + @classmethod def dequeue(cls, task): # type: (Union[Task, str]) -> Any