Support queue_name in tasks enqueue

This commit is contained in:
allegroai 2022-05-17 16:04:34 +03:00
parent 6c2dcb5c8a
commit b90165b4e4
6 changed files with 45 additions and 3 deletions

View File

@ -96,6 +96,7 @@ class UpdateRequest(TaskUpdateRequest):
class EnqueueRequest(UpdateRequest): class EnqueueRequest(UpdateRequest):
queue = StringField() queue = StringField()
queue_name = StringField()
class DeleteRequest(UpdateRequest): class DeleteRequest(UpdateRequest):
@ -262,6 +263,7 @@ class StopManyRequest(TaskBatchRequest):
class EnqueueManyRequest(TaskBatchRequest): class EnqueueManyRequest(TaskBatchRequest):
queue = StringField() queue = StringField()
queue_name = StringField()
validate_tasks = BoolField(default=False) validate_tasks = BoolField(default=False)

View File

@ -50,6 +50,18 @@ class QueueBLL(object):
queue.save() queue.save()
return queue return queue
def get_by_name(
self,
company_id: str,
queue_name: str,
only: Optional[Sequence[str]] = None,
) -> Queue:
qs = Queue.objects(name=queue_name, company=company_id)
if only:
qs = qs.only(*only)
return qs.first()
def get_by_id( def get_by_id(
self, company_id: str, queue_id: str, only: Optional[Sequence[str]] = None self, company_id: str, queue_id: str, only: Optional[Sequence[str]] = None
) -> Queue: ) -> Queue:

View File

@ -108,9 +108,23 @@ def enqueue_task(
queue_id: str, queue_id: str,
status_message: str, status_message: str,
status_reason: str, status_reason: str,
queue_name: str = None,
validate: bool = False, validate: bool = False,
force: bool = False, force: bool = False,
) -> Tuple[int, dict]: ) -> Tuple[int, dict]:
if queue_id and queue_name:
raise errors.bad_request.ValidationError(
"Either queue id or queue name should be provided"
)
if queue_name:
queue = queue_bll.get_by_name(
company_id=company_id, queue_name=queue_name, only=("id",)
)
if not queue:
queue = queue_bll.create(company_id=company_id, name=queue_name)
queue_id = queue.id
if not queue_id: if not queue_id:
# try to get default queue # try to get default queue
queue_id = queue_bll.get_default(company_id).id queue_id = queue_bll.get_default(company_id).id

View File

@ -1898,7 +1898,7 @@ Fails if the following parameters in the task were not filled:
] ]
properties { properties {
queue { queue {
description: "Queue id. If not provided, task is added to the default queue." description: "Queue id. If not provided and no queue name is passed then task is added to the default queue."
type: string type: string
} }
} }
@ -1914,6 +1914,12 @@ Fails if the following parameters in the task were not filled:
} }
} }
} }
"2.19": ${enqueue."1.5"} {
request.properties.queue_name {
description: The name of the queue. If the queue does not exist then it is auto-created. Cannot be used together with the queue id
type: string
}
}
} }
enqueue_many { enqueue_many {
"2.13": ${_definitions.change_many_request} { "2.13": ${_definitions.change_many_request} {
@ -1922,7 +1928,7 @@ enqueue_many {
properties { properties {
ids.description: "IDs of the tasks to enqueue" ids.description: "IDs of the tasks to enqueue"
queue { queue {
description: "Queue id. If not provided, tasks are added to the default queue." description: "Queue id. If not provided and no queue name is passed then tasks are added to the default queue."
type: string type: string
} }
validate_tasks { validate_tasks {
@ -1941,6 +1947,12 @@ enqueue_many {
} }
} }
} }
"2.19": ${enqueue_many."2.13"} {
request.properties.queue_name {
description: The name of the queue. If the queue does not exist then it is auto-created. Cannot be used together with the queue id
type: string
}
}
} }
dequeue { dequeue {
"1.5" { "1.5" {

View File

@ -861,6 +861,7 @@ def enqueue(call: APICall, company_id, request: EnqueueRequest):
queue_id=request.queue, queue_id=request.queue,
status_message=request.status_message, status_message=request.status_message,
status_reason=request.status_reason, status_reason=request.status_reason,
queue_name=request.queue_name,
force=request.force, force=request.force,
) )
call.result.data_model = EnqueueResponse(queued=queued, **res) call.result.data_model = EnqueueResponse(queued=queued, **res)
@ -879,6 +880,7 @@ def enqueue_many(call: APICall, company_id, request: EnqueueManyRequest):
queue_id=request.queue, queue_id=request.queue,
status_message=request.status_message, status_message=request.status_message,
status_reason=request.status_reason, status_reason=request.status_reason,
queue_name=request.queue_name,
validate=request.validate_tasks, validate=request.validate_tasks,
), ),
ids=request.ids, ids=request.ids,

View File

@ -20,7 +20,7 @@ class TestBatchOperations(TestService):
ids = [*tasks, missing_id] ids = [*tasks, missing_id]
# enqueue # enqueue
res = self.api.tasks.enqueue_many(ids=ids) res = self.api.tasks.enqueue_many(ids=ids, queue_name="test")
self._assert_succeeded(res, tasks) self._assert_succeeded(res, tasks)
self._assert_failed(res, [missing_id]) self._assert_failed(res, [missing_id])
data = self.api.tasks.get_all_ex(id=ids).tasks data = self.api.tasks.get_all_ex(id=ids).tasks