diff --git a/apiserver/bll/task/task_bll.py b/apiserver/bll/task/task_bll.py index d958008..22f5552 100644 --- a/apiserver/bll/task/task_bll.py +++ b/apiserver/bll/task/task_bll.py @@ -1,5 +1,5 @@ from datetime import datetime -from typing import Collection, Sequence, Tuple, Any, Optional, Dict +from typing import Collection, Sequence, Tuple, Optional, Dict import six from mongoengine import Q @@ -381,7 +381,7 @@ class TaskBLL: last_update: datetime = None, last_iteration: int = None, last_iteration_max: int = None, - last_scalar_values: Sequence[Tuple[Tuple[str, ...], Any]] = None, + last_scalar_events: Dict[str, Dict[str, dict]] = None, last_events: Dict[str, Dict[str, dict]] = None, **extra_updates, ): @@ -406,18 +406,43 @@ class TaskBLL: elif last_iteration_max is not None: extra_updates.update(max__last_iteration=last_iteration_max) - if last_scalar_values is not None: + if last_scalar_events is not None: + max_values = config.get("services.tasks.max_last_metrics", 2000) + total_metrics = set() + if max_values: + query = dict(id=task_id) + to_add = sum(len(v) for m, v in last_scalar_events.items()) + if to_add <= max_values: + query[f"unique_metrics__{max_values-to_add}__exists"] = True + task = Task.objects(**query).only("unique_metrics").first() + if task and task.unique_metrics: + total_metrics = set(task.unique_metrics) - def op_path(op, *path): - return "__".join((op, "last_metrics") + path) + new_metrics = [] + for metric_key, metric_data in last_scalar_events.items(): + for variant_key, variant_data in metric_data.items(): + metric = ( + f"{variant_data.get('metric')}/{variant_data.get('variant')}" + ) + if max_values: + if ( + len(total_metrics) >= max_values + and metric not in total_metrics + ): + continue + total_metrics.add(metric) - for path, value in last_scalar_values: - if path[-1] == "min_value": - extra_updates[op_path("min", *path[:-1], "min_value")] = value - elif path[-1] == "max_value": - extra_updates[op_path("max", *path[:-1], "max_value")] = value - else: - extra_updates[op_path("set", *path)] = value + new_metrics.append(metric) + path = f"last_metrics__{metric_key}__{variant_key}" + for key, value in variant_data.items(): + if key == "min_value": + extra_updates[f"min__{path}__min_value"] = value + elif key == "max_value": + extra_updates[f"max__{path}__max_value"] = value + elif key in ("metric", "variant", "value"): + extra_updates[f"set__{path}__{key}"] = value + if new_metrics: + extra_updates["add_to_set__unique_metrics"] = new_metrics if last_events is not None: @@ -446,7 +471,11 @@ class TaskBLL: def dequeue_and_change_status( cls, task: Task, company_id: str, status_message: str, status_reason: str, ): - cls.dequeue(task, company_id) + try: + cls.dequeue(task, company_id) + except errors.bad_request.InvalidQueueOrTaskNotQueued: + # dequeue may fail if the queue was deleted + pass return ChangeStatusRequest( task=task, diff --git a/apiserver/bll/task/task_operations.py b/apiserver/bll/task/task_operations.py index 0be1d1b..ed107d8 100644 --- a/apiserver/bll/task/task_operations.py +++ b/apiserver/bll/task/task_operations.py @@ -278,6 +278,7 @@ def reset_task( updates.update( set__last_iteration=DEFAULT_LAST_ITERATION, set__last_metrics={}, + set__unique_metrics=[], set__metric_stats={}, set__models__output=[], set__runtime={}, diff --git a/apiserver/database/model/task/task.py b/apiserver/database/model/task/task.py index e46676e..e6277ff 100644 --- a/apiserver/database/model/task/task.py +++ b/apiserver/database/model/task/task.py @@ -259,6 +259,7 @@ class Task(AttributedDocument): last_change = DateTimeField() last_iteration = IntField(default=DEFAULT_LAST_ITERATION) last_metrics = SafeMapField(field=SafeMapField(EmbeddedDocumentField(MetricEvent))) + unique_metrics = ListField(StringField(required=True), exclude_by_default=True) metric_stats = SafeMapField(field=EmbeddedDocumentField(MetricEventStats)) company_origin = StringField(exclude_by_default=True) duration = IntField() # task duration in seconds diff --git a/apiserver/tests/automated/test_tasks_filtering.py b/apiserver/tests/automated/test_tasks_filtering.py index 9a959b2..11909cc 100644 --- a/apiserver/tests/automated/test_tasks_filtering.py +++ b/apiserver/tests/automated/test_tasks_filtering.py @@ -42,6 +42,32 @@ class TestTasksFiltering(TestService): self.assertEqual(res.total, 0) self.assertEqual(res["values"], []) + def test_datetime_queries(self): + tasks = [self.temp_task() for _ in range(5)] + now = datetime.utcnow() + for task in tasks: + self.api.tasks.ping(task=task) + + # date time syntax + res = self.api.tasks.get_all_ex(last_update=f">={now.isoformat()}").tasks + self.assertTrue(set(tasks).issubset({t.id for t in res})) + res = self.api.tasks.get_all_ex( + last_update=[ + f">={(now - timedelta(seconds=60)).isoformat()}", + f"<={now.isoformat()}", + ] + ).tasks + self.assertFalse(set(tasks).issubset({t.id for t in res})) + + # simplified range syntax + res = self.api.tasks.get_all_ex(last_update=[now.isoformat(), None]).tasks + self.assertTrue(set(tasks).issubset({t.id for t in res})) + + res = self.api.tasks.get_all_ex( + last_update=[(now - timedelta(seconds=60)).isoformat(), now.isoformat()] + ).tasks + self.assertFalse(set(tasks).issubset({t.id for t in res})) + def test_range_queries(self): tasks = [self.temp_task() for _ in range(5)] now = datetime.utcnow()