mirror of
https://github.com/clearml/clearml-server
synced 2025-06-26 23:15:47 +00:00
Add remove_from_all_queues parameter to tasks.dequeue/dequeue_many endpoints
This commit is contained in:
parent
3729afe014
commit
755cc803d9
@ -96,6 +96,10 @@ class UpdateRequest(TaskUpdateRequest):
|
|||||||
status_message = StringField(default="")
|
status_message = StringField(default="")
|
||||||
|
|
||||||
|
|
||||||
|
class DequeueRequest(UpdateRequest):
|
||||||
|
remove_from_all_queues = BoolField(default=False)
|
||||||
|
|
||||||
|
|
||||||
class EnqueueRequest(UpdateRequest):
|
class EnqueueRequest(UpdateRequest):
|
||||||
queue = StringField()
|
queue = StringField()
|
||||||
queue_name = StringField()
|
queue_name = StringField()
|
||||||
@ -274,6 +278,10 @@ class StopManyRequest(TaskBatchRequest):
|
|||||||
force = BoolField(default=False)
|
force = BoolField(default=False)
|
||||||
|
|
||||||
|
|
||||||
|
class DequeueManyRequest(TaskBatchRequest):
|
||||||
|
remove_from_all_queues = BoolField(default=False)
|
||||||
|
|
||||||
|
|
||||||
class EnqueueManyRequest(TaskBatchRequest):
|
class EnqueueManyRequest(TaskBatchRequest):
|
||||||
queue = StringField()
|
queue = StringField()
|
||||||
queue_name = StringField()
|
queue_name = StringField()
|
||||||
|
@ -30,6 +30,7 @@ from apiserver.database.model.task.task import (
|
|||||||
TaskModelTypes,
|
TaskModelTypes,
|
||||||
)
|
)
|
||||||
from apiserver.database.model import EntityVisibility
|
from apiserver.database.model import EntityVisibility
|
||||||
|
from apiserver.database.model.queue import Queue
|
||||||
from apiserver.database.utils import get_company_or_none_constraint, id as create_id
|
from apiserver.database.utils import get_company_or_none_constraint, id as create_id
|
||||||
from apiserver.es_factory import es_factory
|
from apiserver.es_factory import es_factory
|
||||||
from apiserver.redis_manager import redman
|
from apiserver.redis_manager import redman
|
||||||
@ -447,6 +448,12 @@ class TaskBLL:
|
|||||||
|
|
||||||
return ret
|
return ret
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def remove_task_from_all_queues(company_id: str, task: Task) -> int:
|
||||||
|
return Queue.objects(company=company_id, entries__task=task.id).update(
|
||||||
|
pull__entries__task=task.id, last_update=datetime.utcnow()
|
||||||
|
)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def dequeue_and_change_status(
|
def dequeue_and_change_status(
|
||||||
cls,
|
cls,
|
||||||
@ -455,6 +462,7 @@ class TaskBLL:
|
|||||||
user_id: str,
|
user_id: str,
|
||||||
status_message: str,
|
status_message: str,
|
||||||
status_reason: str,
|
status_reason: str,
|
||||||
|
remove_from_all_queues=False,
|
||||||
):
|
):
|
||||||
try:
|
try:
|
||||||
cls.dequeue(task, company_id, silent_fail=True)
|
cls.dequeue(task, company_id, silent_fail=True)
|
||||||
@ -462,6 +470,9 @@ class TaskBLL:
|
|||||||
# dequeue may fail if the queue was deleted
|
# dequeue may fail if the queue was deleted
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
if remove_from_all_queues:
|
||||||
|
cls.remove_task_from_all_queues(company_id=company_id, task=task)
|
||||||
|
|
||||||
return ChangeStatusRequest(
|
return ChangeStatusRequest(
|
||||||
task=task,
|
task=task,
|
||||||
new_status=task.enqueue_status or TaskStatus.created,
|
new_status=task.enqueue_status or TaskStatus.created,
|
||||||
|
@ -61,6 +61,7 @@ def archive_task(
|
|||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
status_message=status_message,
|
status_message=status_message,
|
||||||
status_reason=status_reason,
|
status_reason=status_reason,
|
||||||
|
remove_from_all_queues=True,
|
||||||
)
|
)
|
||||||
except APIError:
|
except APIError:
|
||||||
# dequeue may fail if the task was not enqueued
|
# dequeue may fail if the task was not enqueued
|
||||||
@ -99,6 +100,7 @@ def dequeue_task(
|
|||||||
user_id: str,
|
user_id: str,
|
||||||
status_message: str,
|
status_message: str,
|
||||||
status_reason: str,
|
status_reason: str,
|
||||||
|
remove_from_all_queues: bool = False,
|
||||||
) -> Tuple[int, dict]:
|
) -> Tuple[int, dict]:
|
||||||
query = dict(id=task_id, company=company_id)
|
query = dict(id=task_id, company=company_id)
|
||||||
task = Task.get_for_writing(**query)
|
task = Task.get_for_writing(**query)
|
||||||
@ -111,6 +113,7 @@ def dequeue_task(
|
|||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
status_message=status_message,
|
status_message=status_message,
|
||||||
status_reason=status_reason,
|
status_reason=status_reason,
|
||||||
|
remove_from_all_queues=remove_from_all_queues,
|
||||||
)
|
)
|
||||||
return 1, res
|
return 1, res
|
||||||
|
|
||||||
@ -244,6 +247,7 @@ def delete_task(
|
|||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
status_message=status_message,
|
status_message=status_message,
|
||||||
status_reason=status_reason,
|
status_reason=status_reason,
|
||||||
|
remove_from_all_queues=True,
|
||||||
)
|
)
|
||||||
except APIError:
|
except APIError:
|
||||||
# dequeue may fail if the task was not enqueued
|
# dequeue may fail if the task was not enqueued
|
||||||
@ -296,6 +300,8 @@ def reset_task(
|
|||||||
# dequeue may fail if the task was not enqueued
|
# dequeue may fail if the task was not enqueued
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
TaskBLL.remove_task_from_all_queues(company_id=company_id, task=task)
|
||||||
|
|
||||||
cleaned_up = cleanup_task(
|
cleaned_up = cleanup_task(
|
||||||
company=company_id,
|
company=company_id,
|
||||||
user=user_id,
|
user=user_id,
|
||||||
|
@ -1507,6 +1507,13 @@ dequeue {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
"999.0": ${dequeue."1.5"} {
|
||||||
|
request.properties.remove_from_all_queues {
|
||||||
|
type: boolean
|
||||||
|
description: If set to 'true' then the task is searched and removed from all the queues. Otherwise only from the queue stored in the task execution parameters
|
||||||
|
default: false
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
dequeue_many {
|
dequeue_many {
|
||||||
"2.13": ${_definitions.change_many_request} {
|
"2.13": ${_definitions.change_many_request} {
|
||||||
@ -1525,6 +1532,13 @@ dequeue_many {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
"999.0": ${dequeue_many."2.13"} {
|
||||||
|
request.properties.remove_from_all_queues {
|
||||||
|
type: boolean
|
||||||
|
description: If set to 'true' then the tasks are searched and removed from all the queues. Otherwise only from the queue stored in the task execution parameters
|
||||||
|
default: false
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
set_requirements {
|
set_requirements {
|
||||||
"2.1" {
|
"2.1" {
|
||||||
|
@ -65,6 +65,8 @@ from apiserver.apimodels.tasks import (
|
|||||||
CompletedRequest,
|
CompletedRequest,
|
||||||
CompletedResponse,
|
CompletedResponse,
|
||||||
GetAllReq,
|
GetAllReq,
|
||||||
|
DequeueRequest,
|
||||||
|
DequeueManyRequest,
|
||||||
)
|
)
|
||||||
from apiserver.bll.event import EventBLL
|
from apiserver.bll.event import EventBLL
|
||||||
from apiserver.bll.model import ModelBLL
|
from apiserver.bll.model import ModelBLL
|
||||||
@ -383,7 +385,8 @@ def close(call: APICall, company_id, req_model: UpdateRequest):
|
|||||||
req_model,
|
req_model,
|
||||||
company_id=company_id,
|
company_id=company_id,
|
||||||
user_id=call.identity.user,
|
user_id=call.identity.user,
|
||||||
new_status=TaskStatus.closed)
|
new_status=TaskStatus.closed,
|
||||||
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@ -929,27 +932,24 @@ def enqueue_many(call: APICall, company_id, request: EnqueueManyRequest):
|
|||||||
|
|
||||||
|
|
||||||
@endpoint(
|
@endpoint(
|
||||||
"tasks.dequeue",
|
"tasks.dequeue", response_data_model=DequeueResponse,
|
||||||
request_data_model=UpdateRequest,
|
|
||||||
response_data_model=DequeueResponse,
|
|
||||||
)
|
)
|
||||||
def dequeue(call: APICall, company_id, request: UpdateRequest):
|
def dequeue(call: APICall, company_id, request: DequeueRequest):
|
||||||
dequeued, res = dequeue_task(
|
dequeued, res = dequeue_task(
|
||||||
task_id=request.task,
|
task_id=request.task,
|
||||||
company_id=company_id,
|
company_id=company_id,
|
||||||
user_id=call.identity.user,
|
user_id=call.identity.user,
|
||||||
status_message=request.status_message,
|
status_message=request.status_message,
|
||||||
status_reason=request.status_reason,
|
status_reason=request.status_reason,
|
||||||
|
remove_from_all_queues=request.remove_from_all_queues,
|
||||||
)
|
)
|
||||||
call.result.data_model = DequeueResponse(dequeued=dequeued, **res)
|
call.result.data_model = DequeueResponse(dequeued=dequeued, **res)
|
||||||
|
|
||||||
|
|
||||||
@endpoint(
|
@endpoint(
|
||||||
"tasks.dequeue_many",
|
"tasks.dequeue_many", response_data_model=DequeueManyResponse,
|
||||||
request_data_model=TaskBatchRequest,
|
|
||||||
response_data_model=DequeueManyResponse,
|
|
||||||
)
|
)
|
||||||
def dequeue_many(call: APICall, company_id, request: TaskBatchRequest):
|
def dequeue_many(call: APICall, company_id, request: DequeueManyRequest):
|
||||||
results, failures = run_batch_operation(
|
results, failures = run_batch_operation(
|
||||||
func=partial(
|
func=partial(
|
||||||
dequeue_task,
|
dequeue_task,
|
||||||
@ -957,6 +957,7 @@ def dequeue_many(call: APICall, company_id, request: TaskBatchRequest):
|
|||||||
user_id=call.identity.user,
|
user_id=call.identity.user,
|
||||||
status_message=request.status_message,
|
status_message=request.status_message,
|
||||||
status_reason=request.status_reason,
|
status_reason=request.status_reason,
|
||||||
|
remove_from_all_queues=request.remove_from_all_queues,
|
||||||
),
|
),
|
||||||
ids=request.ids,
|
ids=request.ids,
|
||||||
)
|
)
|
||||||
@ -1357,5 +1358,7 @@ def delete_models(call: APICall, company_id: str, request: DeleteModelsRequest):
|
|||||||
if names
|
if names
|
||||||
}
|
}
|
||||||
|
|
||||||
updated = task.update(last_change=datetime.utcnow(), last_changed_by=call.identity.user, **commands,)
|
updated = task.update(
|
||||||
|
last_change=datetime.utcnow(), last_changed_by=call.identity.user, **commands,
|
||||||
|
)
|
||||||
return {"updated": updated}
|
return {"updated": updated}
|
||||||
|
Loading…
Reference in New Issue
Block a user