Fix task stats update

This commit is contained in:
allegroai 2022-09-29 19:18:22 +03:00
parent ddb91f226a
commit a4fa567be2
3 changed files with 29 additions and 27 deletions

View File

@ -41,7 +41,7 @@ from apiserver.database.model.task.task import Task, TaskStatus
from apiserver.redis_manager import redman
from apiserver.timing_context import TimingContext
from apiserver.tools import safe_get
from apiserver.utilities.dicts import flatten_nested_items, nested_get
from apiserver.utilities.dicts import nested_get
from apiserver.utilities.json import loads
# noinspection PyTypeChecker
@ -396,33 +396,17 @@ class EventBLL(object):
as the latest metric/variant scalar values reported (according to the report timestamp) and the task's last
update time.
"""
fields = {}
if iter_max is not None:
fields["last_iteration_max"] = iter_max
if last_scalar_events:
fields["last_scalar_values"] = list(
flatten_nested_items(
last_scalar_events,
nesting=2,
include_leaves=[
"value",
"min_value",
"max_value",
"metric",
"variant",
],
)
)
if last_events:
fields["last_events"] = last_events
if not fields:
if iter_max is None and not last_events and not last_scalar_events:
return False
return TaskBLL.update_statistics(task_id, company_id, last_update=now, **fields)
return TaskBLL.update_statistics(
task_id,
company_id,
last_update=now,
last_iteration_max=iter_max,
last_scalar_events=last_scalar_events,
last_events=last_events,
)
def _get_event_id(self, event):
id_values = (str(event[field]) for field in self.id_fields if field in event)

View File

@ -19,4 +19,8 @@ hyperparam_values {
# cache ttl sec
cache_ttl_sec: 86400
}
}
# the maximum amount of unique last metrics/variants combinations
# for which the last values are stored in a task
max_last_metrics: 2000

View File

@ -60,6 +60,20 @@ class TestQueues(TestService):
self.assertQueueTasks(res.queue, [task])
self.assertTaskTags(task, system_tags=[])
def test_dequeue_from_deleted_queue(self):
queue = self._temp_queue("TestTempQueue")
task_name = "TempDevTask"
task = self._temp_task(task_name)
self.api.tasks.enqueue(task=task, queue=queue)
res = self.api.tasks.get_by_id(task=task)
self.assertEqual(res.task.status, "queued")
self.api.queues.delete(queue=queue, force=True)
self.api.tasks.dequeue(task=task)
res = self.api.tasks.get_by_id(task=task)
self.assertEqual(res.task.status, "created")
def test_max_queue_entries(self):
queue = self._temp_queue("TestTempQueue")
tasks = [