Fix task that is not in queue but has 'queued' status can't be dequeued

This commit is contained in:
allegroai 2023-05-25 19:14:25 +03:00
parent 6e777e80b8
commit 4c22757002
2 changed files with 16 additions and 8 deletions

View File

@ -7,7 +7,7 @@ from redis import StrictRedis
from six import string_types from six import string_types
import apiserver.database.utils as dbutils import apiserver.database.utils as dbutils
from apiserver.apierrors import errors from apiserver.apierrors import errors, APIError
from apiserver.apimodels.tasks import TaskInputModel from apiserver.apimodels.tasks import TaskInputModel
from apiserver.bll.queue import QueueBLL from apiserver.bll.queue import QueueBLL
from apiserver.bll.organization import OrgBLL, Tags from apiserver.bll.organization import OrgBLL, Tags
@ -453,11 +453,7 @@ class TaskBLL:
"__", "." "__", "."
) )
raw_updates[value_iteration_field] = { raw_updates[value_iteration_field] = {
"$cond": [ "$cond": [condition, iter_value, f"${value_iteration_field}",]
condition,
iter_value,
f"${value_iteration_field}",
]
} }
for metric_key, metric_data in last_scalar_events.items(): for metric_key, metric_data in last_scalar_events.items():
@ -525,8 +521,8 @@ class TaskBLL:
status_reason: str, status_reason: str,
): ):
try: try:
cls.dequeue(task, company_id) cls.dequeue(task, company_id, silent_fail=True)
except errors.bad_request.InvalidQueueOrTaskNotQueued: except APIError:
# dequeue may fail if the queue was deleted # dequeue may fail if the queue was deleted
pass pass

View File

@ -81,6 +81,18 @@ class TestQueues(TestService):
self.assertQueueTasks(res.queue, [task]) self.assertQueueTasks(res.queue, [task])
self.assertTaskTags(task, system_tags=[]) self.assertTaskTags(task, system_tags=[])
def test_dequeue_not_queued_task(self):
# queue = self._temp_queue("TestTempQueue")
task_name = "TempDevTask"
task = self._temp_task(task_name)
self.api.tasks.edit(task=task, status="queued") # , execution={"queue": queue})
res = self.api.tasks.get_by_id(task=task)
self.assertEqual(res.task.status, "queued")
self.api.tasks.dequeue(task=task)
res = self.api.tasks.get_by_id(task=task)
self.assertEqual(res.task.status, "created")
def test_dequeue_from_deleted_queue(self): def test_dequeue_from_deleted_queue(self):
queue = self._temp_queue("TestTempQueue") queue = self._temp_queue("TestTempQueue")
task_name = "TempDevTask" task_name = "TempDevTask"