Add Task Unique Metrics to task object

This commit is contained in:
allegroai 2022-09-29 19:16:56 +03:00
parent 7772f47773
commit ddb91f226a
4 changed files with 70 additions and 13 deletions

View File

@ -1,5 +1,5 @@
from datetime import datetime from datetime import datetime
from typing import Collection, Sequence, Tuple, Any, Optional, Dict from typing import Collection, Sequence, Tuple, Optional, Dict
import six import six
from mongoengine import Q from mongoengine import Q
@ -381,7 +381,7 @@ class TaskBLL:
last_update: datetime = None, last_update: datetime = None,
last_iteration: int = None, last_iteration: int = None,
last_iteration_max: int = None, last_iteration_max: int = None,
last_scalar_values: Sequence[Tuple[Tuple[str, ...], Any]] = None, last_scalar_events: Dict[str, Dict[str, dict]] = None,
last_events: Dict[str, Dict[str, dict]] = None, last_events: Dict[str, Dict[str, dict]] = None,
**extra_updates, **extra_updates,
): ):
@ -406,18 +406,43 @@ class TaskBLL:
elif last_iteration_max is not None: elif last_iteration_max is not None:
extra_updates.update(max__last_iteration=last_iteration_max) extra_updates.update(max__last_iteration=last_iteration_max)
if last_scalar_values is not None: if last_scalar_events is not None:
max_values = config.get("services.tasks.max_last_metrics", 2000)
total_metrics = set()
if max_values:
query = dict(id=task_id)
to_add = sum(len(v) for m, v in last_scalar_events.items())
if to_add <= max_values:
query[f"unique_metrics__{max_values-to_add}__exists"] = True
task = Task.objects(**query).only("unique_metrics").first()
if task and task.unique_metrics:
total_metrics = set(task.unique_metrics)
def op_path(op, *path): new_metrics = []
return "__".join((op, "last_metrics") + path) for metric_key, metric_data in last_scalar_events.items():
for variant_key, variant_data in metric_data.items():
metric = (
f"{variant_data.get('metric')}/{variant_data.get('variant')}"
)
if max_values:
if (
len(total_metrics) >= max_values
and metric not in total_metrics
):
continue
total_metrics.add(metric)
for path, value in last_scalar_values: new_metrics.append(metric)
if path[-1] == "min_value": path = f"last_metrics__{metric_key}__{variant_key}"
extra_updates[op_path("min", *path[:-1], "min_value")] = value for key, value in variant_data.items():
elif path[-1] == "max_value": if key == "min_value":
extra_updates[op_path("max", *path[:-1], "max_value")] = value extra_updates[f"min__{path}__min_value"] = value
else: elif key == "max_value":
extra_updates[op_path("set", *path)] = value extra_updates[f"max__{path}__max_value"] = value
elif key in ("metric", "variant", "value"):
extra_updates[f"set__{path}__{key}"] = value
if new_metrics:
extra_updates["add_to_set__unique_metrics"] = new_metrics
if last_events is not None: if last_events is not None:
@ -446,7 +471,11 @@ class TaskBLL:
def dequeue_and_change_status( def dequeue_and_change_status(
cls, task: Task, company_id: str, status_message: str, status_reason: str, cls, task: Task, company_id: str, status_message: str, status_reason: str,
): ):
cls.dequeue(task, company_id) try:
cls.dequeue(task, company_id)
except errors.bad_request.InvalidQueueOrTaskNotQueued:
# dequeue may fail if the queue was deleted
pass
return ChangeStatusRequest( return ChangeStatusRequest(
task=task, task=task,

View File

@ -278,6 +278,7 @@ def reset_task(
updates.update( updates.update(
set__last_iteration=DEFAULT_LAST_ITERATION, set__last_iteration=DEFAULT_LAST_ITERATION,
set__last_metrics={}, set__last_metrics={},
set__unique_metrics=[],
set__metric_stats={}, set__metric_stats={},
set__models__output=[], set__models__output=[],
set__runtime={}, set__runtime={},

View File

@ -259,6 +259,7 @@ class Task(AttributedDocument):
last_change = DateTimeField() last_change = DateTimeField()
last_iteration = IntField(default=DEFAULT_LAST_ITERATION) last_iteration = IntField(default=DEFAULT_LAST_ITERATION)
last_metrics = SafeMapField(field=SafeMapField(EmbeddedDocumentField(MetricEvent))) last_metrics = SafeMapField(field=SafeMapField(EmbeddedDocumentField(MetricEvent)))
unique_metrics = ListField(StringField(required=True), exclude_by_default=True)
metric_stats = SafeMapField(field=EmbeddedDocumentField(MetricEventStats)) metric_stats = SafeMapField(field=EmbeddedDocumentField(MetricEventStats))
company_origin = StringField(exclude_by_default=True) company_origin = StringField(exclude_by_default=True)
duration = IntField() # task duration in seconds duration = IntField() # task duration in seconds

View File

@ -42,6 +42,32 @@ class TestTasksFiltering(TestService):
self.assertEqual(res.total, 0) self.assertEqual(res.total, 0)
self.assertEqual(res["values"], []) self.assertEqual(res["values"], [])
def test_datetime_queries(self):
    """Filter tasks by ``last_update`` using both the explicit datetime
    comparison syntax (">="/"<=" prefixed ISO strings) and the simplified
    two-element range syntax ([start, end], where None means open-ended).
    """
    created = [self.temp_task() for _ in range(5)]
    now = datetime.utcnow()
    # Ping each task so its last_update timestamp is >= `now`.
    for t in created:
        self.api.tasks.ping(task=t)

    def returned_ids(last_update):
        # Run a get_all_ex query with the given last_update filter and
        # collect the ids of the returned tasks.
        found = self.api.tasks.get_all_ex(last_update=last_update).tasks
        return {task.id for task in found}

    minute_ago = now - timedelta(seconds=60)

    # date time syntax
    self.assertTrue(
        set(created).issubset(returned_ids(f">={now.isoformat()}"))
    )
    self.assertFalse(
        set(created).issubset(
            returned_ids(
                [f">={minute_ago.isoformat()}", f"<={now.isoformat()}"]
            )
        )
    )

    # simplified range syntax
    self.assertTrue(
        set(created).issubset(returned_ids([now.isoformat(), None]))
    )
    self.assertFalse(
        set(created).issubset(
            returned_ids([minute_ago.isoformat(), now.isoformat()])
        )
    )
def test_range_queries(self): def test_range_queries(self):
tasks = [self.temp_task() for _ in range(5)] tasks = [self.temp_task() for _ in range(5)]
now = datetime.utcnow() now = datetime.utcnow()