Allow enqueueing enqueued tasks

This commit is contained in:
clearml 2024-12-05 19:10:34 +02:00
parent 4df5687ecd
commit fa41e14625
2 changed files with 11 additions and 5 deletions

View File

@ -439,8 +439,11 @@ class TaskBLL:
return ret return ret
@staticmethod @staticmethod
def remove_task_from_all_queues(company_id: str, task_id: str) -> int: def remove_task_from_all_queues(company_id: str, task_id: str, exclude: str = None) -> int:
return Queue.objects(company=company_id, entries__task=task_id).update( more = {}
if exclude:
more["id__ne"] = exclude
return Queue.objects(company=company_id, entries__task=task_id, **more).update(
pull__entries__task=task_id, last_update=datetime.utcnow() pull__entries__task=task_id, last_update=datetime.utcnow()
) )

View File

@ -231,15 +231,17 @@ def enqueue_task(
if validate: if validate:
TaskBLL.validate(task) TaskBLL.validate(task)
before_enqueue_status = task.status
if task.status == TaskStatus.queued and task.enqueue_status:
before_enqueue_status = task.enqueue_status
res = ChangeStatusRequest( res = ChangeStatusRequest(
task=task, task=task,
new_status=TaskStatus.queued, new_status=TaskStatus.queued,
status_reason=status_reason, status_reason=status_reason,
status_message=status_message, status_message=status_message,
allow_same_state_transition=False,
force=force, force=force,
user_id=user_id, user_id=user_id,
).execute(enqueue_status=task.status) ).execute(enqueue_status=before_enqueue_status)
try: try:
queue_bll.add_task(company_id=company_id, queue_id=queue_id, task_id=task.id) queue_bll.add_task(company_id=company_id, queue_id=queue_id, task_id=task.id)
@ -260,7 +262,8 @@ def enqueue_task(
Task.objects(id=task_id).update(execution__queue=queue_id, multi=False) Task.objects(id=task_id).update(execution__queue=queue_id, multi=False)
else: else:
Task.objects(id=task_id).update(execution=Execution(queue=queue_id), multi=False) Task.objects(id=task_id).update(execution=Execution(queue=queue_id), multi=False)
# make sure that the task is not queued in any other queue
TaskBLL.remove_task_from_all_queues(company_id=company_id, task_id=task_id, exclude=queue_id)
nested_set(res, ("fields", "execution.queue"), queue_id) nested_set(res, ("fields", "execution.queue"), queue_id)
return 1, res return 1, res