diff --git a/apiserver/bll/event/event_bll.py b/apiserver/bll/event/event_bll.py index bd47e43..73a67ad 100644 --- a/apiserver/bll/event/event_bll.py +++ b/apiserver/bll/event/event_bll.py @@ -424,8 +424,22 @@ class EventBLL(object): for k in ("value", "metric", "variant", "iter", "timestamp") if k in event } - event_data["min_value"] = min(value, last_event.get("min_value", value)) - event_data["max_value"] = max(value, last_event.get("max_value", value)) + last_event_min_value = last_event.get("min_value", value) + last_event_min_value_iter = last_event.get("min_value_iter", event_iter) + if value < last_event_min_value: + event_data["min_value"] = value + event_data["min_value_iter"] = event_iter + else: + event_data["min_value"] = last_event_min_value + event_data["min_value_iter"] = last_event_min_value_iter + last_event_max_value = last_event.get("max_value", value) + last_event_max_value_iter = last_event.get("max_value_iter", event_iter) + if value > last_event_max_value: + event_data["max_value"] = value + event_data["max_value_iter"] = event_iter + else: + event_data["max_value"] = last_event_max_value + event_data["max_value_iter"] = last_event_max_value_iter last_events[metric_hash][variant_hash] = event_data def _update_last_metric_events_for_task(self, last_events, event): @@ -800,6 +814,7 @@ class EventBLL(object): event_type=event_type, task_id=task_ids, iters=last_iter_count, + metrics=metrics, ) should = [ { @@ -1016,11 +1031,16 @@ class EventBLL(object): event_type: EventType, task_id: Union[str, Sequence[str]], iters: int, + metrics: MetricVariants = None ) -> Mapping[str, Sequence]: if check_empty_data(self.es, company_id=company_id, event_type=event_type): return {} task_ids = [task_id] if isinstance(task_id, str) else task_id + must = [{"terms": {"task": task_ids}}] + if metrics: + must.append(get_metric_variants_condition(metrics)) + es_req: dict = { "size": 0, "aggs": { @@ -1037,7 +1057,7 @@ class EventBLL(object): }, } }, - "query": {"bool": {"must": [{"terms": {"task": task_ids}}]}}, + "query": {"bool": {"must": must}}, } with translate_errors_context(): diff --git a/apiserver/bll/task/task_bll.py b/apiserver/bll/task/task_bll.py index 939f62f..31cfae1 100644 --- a/apiserver/bll/task/task_bll.py +++ b/apiserver/bll/task/task_bll.py @@ -400,6 +400,7 @@ class TaskBLL: elif last_iteration_max is not None: extra_updates.update(max__last_iteration=last_iteration_max) + raw_updates = {} if last_scalar_events is not None: max_values = config.get("services.tasks.max_last_metrics", 2000) total_metrics = set() @@ -413,6 +414,42 @@ class TaskBLL: total_metrics = set(task.unique_metrics) new_metrics = [] + + def add_last_metric_conditional_update( + metric_path: str, metric_value, iter_value: int, is_min: bool + ): + """ + Build an aggregation for an atomic update of the min or max value and the corresponding iteration + """ + if is_min: + field_prefix = "min" + op = "$gt" + else: + field_prefix = "max" + op = "$lt" + + value_field = f"{metric_path}__{field_prefix}_value".replace("__", ".") + condition = { + "$or": [ + {"$lte": [f"${value_field}", None]}, + {op: [f"${value_field}", metric_value]}, + ] + } + raw_updates[value_field] = { + "$cond": [condition, metric_value, f"${value_field}"] + } + + value_iteration_field = f"{metric_path}__{field_prefix}_value_iteration".replace( + "__", "." + ) + raw_updates[value_iteration_field] = { + "$cond": [ + condition, + iter_value, + f"${value_iteration_field}", + ] + } + for metric_key, metric_data in last_scalar_events.items(): for variant_key, variant_data in metric_data.items(): metric = ( @@ -429,10 +466,13 @@ class TaskBLL: new_metrics.append(metric) path = f"last_metrics__{metric_key}__{variant_key}" for key, value in variant_data.items(): - if key == "min_value": - extra_updates[f"min__{path}__min_value"] = value - elif key == "max_value": - extra_updates[f"max__{path}__max_value"] = value + if key in ("min_value", "max_value"): + add_last_metric_conditional_update( + metric_path=path, + metric_value=value, + iter_value=variant_data.get(f"{key}_iter", 0), + is_min=(key == "min_value"), + ) elif key in ("metric", "variant", "value"): extra_updates[f"set__{path}__{key}"] = value if new_metrics: @@ -440,10 +480,10 @@ class TaskBLL: if last_events is not None: - def events_per_type(metric_data: Dict[str, dict]) -> Dict[str, EventStats]: + def events_per_type(metric_data_: Dict[str, dict]) -> Dict[str, EventStats]: return { event_type: EventStats(last_update=event["timestamp"]) - for event_type, event in metric_data.items() + for event_type, event in metric_data_.items() } metric_stats = { @@ -454,12 +494,16 @@ class TaskBLL: } extra_updates["metric_stats"] = metric_stats - return TaskBLL.set_last_update( + ret = TaskBLL.set_last_update( task_ids=[task_id], company_id=company_id, last_update=last_update, **extra_updates, ) + if ret and raw_updates: + Task.objects(id=task_id).update_one(__raw__=[{"$set": raw_updates}]) + + return ret @classmethod def dequeue_and_change_status( diff --git a/apiserver/database/model/task/metrics.py b/apiserver/database/model/task/metrics.py index 3ac1152..f94a6f2 100644 --- a/apiserver/database/model/task/metrics.py +++ b/apiserver/database/model/task/metrics.py @@ -4,6 +4,7 @@ from mongoengine import ( DynamicField, LongField, EmbeddedDocumentField, + IntField, ) from apiserver.database.fields import SafeMapField @@ -19,7 +20,9 @@ class MetricEvent(EmbeddedDocument): variant = StringField(required=True) value = DynamicField(required=True) min_value = DynamicField() # for backwards compatibility reasons + min_value_iteration = IntField() max_value = DynamicField() # for backwards compatibility reasons + max_value_iteration = IntField() class EventStats(EmbeddedDocument): diff --git a/apiserver/schema/services/_tasks_common.conf b/apiserver/schema/services/_tasks_common.conf index b24a10b..87e1165 100644 --- a/apiserver/schema/services/_tasks_common.conf +++ b/apiserver/schema/services/_tasks_common.conf @@ -271,10 +271,18 @@ last_metrics_event { description: "Minimum value reported" type: number } + min_value_iteration { + description: "The iteration at which the minimum value was reported" + type: integer + } max_value { description: "Maximum value reported" type: number } + max_value_iteration { + description: "The iteration at which the maximum value was reported" + type: integer + } } } last_metrics_variants { diff --git a/apiserver/tests/automated/test_task_events.py b/apiserver/tests/automated/test_task_events.py index 5a9b7e1..cf0b85b 100644 --- a/apiserver/tests/automated/test_task_events.py +++ b/apiserver/tests/automated/test_task_events.py @@ -177,7 +177,12 @@ class TestTaskEvents(TestService): metric_data = first(first(task_data.last_metrics.values()).values()) self.assertEqual(iter_count - 1, metric_data.value) self.assertEqual(iter_count - 1, metric_data.max_value) + self.assertEqual(iter_count - 1, metric_data.max_value_iteration) self.assertEqual(0, metric_data.min_value) + self.assertEqual(0, metric_data.min_value_iteration) + + res = self.api.events.get_task_latest_scalar_values(task=task) + self.assertEqual(iter_count - 1, res.last_iter) def test_model_events(self): model = self._temp_model(ready=False)