mirror of
https://github.com/clearml/clearml
synced 2025-01-31 09:07:00 +00:00
Add Task.get_num_enqueued_tasks()
This commit is contained in:
parent
68b57e3477
commit
e3864afba4
@ -133,6 +133,14 @@ def get_queue_id(session, queue):
|
||||
return None
|
||||
|
||||
|
||||
def get_num_enqueued_tasks(session, queue_id):
|
||||
# type: ('Session', str) -> Optional[int] # noqa: F821
|
||||
res = session.send(queues.GetNumEntriesRequest(queue=queue_id))
|
||||
if res and res.response and res.response.num is not None:
|
||||
return res.response.num
|
||||
return None
|
||||
|
||||
|
||||
# Hack for supporting windows
|
||||
def get_epoch_beginning_of_time(timezone_info=None):
|
||||
return datetime(1970, 1, 1).replace(tzinfo=timezone_info if timezone_info else utc_timezone)
|
||||
|
@ -42,6 +42,7 @@ from .backend_interface.util import (
|
||||
make_message,
|
||||
mutually_exclusive,
|
||||
get_queue_id,
|
||||
get_num_enqueued_tasks,
|
||||
get_or_create_project,
|
||||
)
|
||||
from .binding.absl_bind import PatchAbsl
|
||||
@ -1260,6 +1261,30 @@ class Task(_Task):
|
||||
resp = res.response
|
||||
return resp
|
||||
|
||||
@classmethod
|
||||
def get_num_enqueued_tasks(cls, queue_name=None, queue_id=None):
|
||||
# type: (Optional[str], Optional[str]) -> int
|
||||
"""
|
||||
Get the number of tasks enqueued in a given queue.
|
||||
|
||||
:param queue_name: The name of the queue. If not specified, then ``queue_id`` must be specified
|
||||
:param queue_id: The id of the queue. If not specified, then ``queue_name`` must be specified
|
||||
|
||||
:return: The number of tasks enqueued in the given queue
|
||||
"""
|
||||
if not Session.check_min_api_server_version("2.20"):
|
||||
raise ValueError("You version of clearml-server does not support the 'queues.get_num_entries' endpoint")
|
||||
mutually_exclusive(queue_name=queue_name, queue_id=queue_id)
|
||||
session = cls._get_default_session()
|
||||
if not queue_id:
|
||||
queue_id = get_queue_id(session, queue_name)
|
||||
if not queue_id:
|
||||
raise ValueError('Could not find queue named "{}"'.format(queue_name))
|
||||
result = get_num_enqueued_tasks(session, queue_id)
|
||||
if result is None:
|
||||
raise ValueError("Could not query the number of enqueued tasks in queue with ID {}".format(queue_id))
|
||||
return result
|
||||
|
||||
@classmethod
|
||||
def dequeue(cls, task):
|
||||
# type: (Union[Task, str]) -> Any
|
||||
|
Loading…
Reference in New Issue
Block a user