From e86b7fd24e6dc8171369a1090d8344aa9cf2b438 Mon Sep 17 00:00:00 2001 From: clearml <> Date: Thu, 5 Dec 2024 19:02:48 +0200 Subject: [PATCH] Support for first and mean value for task last scalar metrics --- apiserver/bll/event/event_bll.py | 61 ++++++------- apiserver/bll/task/task_bll.py | 4 + apiserver/bll/task/task_operations.py | 1 + apiserver/bll/task/utils.py | 90 ++++++++++++++++--- apiserver/database/model/task/metrics.py | 5 ++ apiserver/mongo/initialize/__init__.py | 2 +- apiserver/mongo/initialize/user.py | 61 ++++--------- apiserver/schema/services/_tasks_common.conf | 16 ++++ apiserver/tests/automated/test_task_events.py | 5 +- docker/docker-compose-win10.yml | 2 +- docker/docker-compose.yml | 2 +- 11 files changed, 160 insertions(+), 89 deletions(-) diff --git a/apiserver/bll/event/event_bll.py b/apiserver/bll/event/event_bll.py index 80d8392..9c5ead6 100644 --- a/apiserver/bll/event/event_bll.py +++ b/apiserver/bll/event/event_bll.py @@ -319,6 +319,7 @@ class EventBLL(object): if actions: chunk_size = 500 # TODO: replace it with helpers.parallel_bulk in the future once the parallel pool leak is fixed + # noinspection PyTypeChecker with closing( elasticsearch.helpers.streaming_bulk( self.es, @@ -438,45 +439,45 @@ class EventBLL(object): last_events contains [hashed_metric_name -> hashed_variant_name -> event]. Keys are hashed to avoid mongodb key conflicts due to invalid characters and/or long field names. """ + value = event.get("value") + if value is None: + return + metric = event.get("metric") or "" variant = event.get("variant") or "" - metric_hash = dbutils.hash_field_name(metric) variant_hash = dbutils.hash_field_name(variant) last_event = last_events[metric_hash][variant_hash] + last_event["metric"] = metric + last_event["variant"] = variant + last_event["count"] = last_event.get("count", 0) + 1 + last_event["total"] = last_event.get("total", 0) + value + event_iter = event.get("iter", 0) event_timestamp = event.get("timestamp", 0) - value = event.get("value") - if value is not None and ( - (event_iter, event_timestamp) - >= ( - last_event.get("iter", event_iter), - last_event.get("timestamp", event_timestamp), - ) + if (event_iter, event_timestamp) >= ( + last_event.get("iter", event_iter), + last_event.get("timestamp", event_timestamp), ): - event_data = { - k: event[k] - for k in ("value", "metric", "variant", "iter", "timestamp") - if k in event - } - last_event_min_value = last_event.get("min_value", value) - last_event_min_value_iter = last_event.get("min_value_iter", event_iter) - if value < last_event_min_value: - event_data["min_value"] = value - event_data["min_value_iter"] = event_iter - else: - event_data["min_value"] = last_event_min_value - event_data["min_value_iter"] = last_event_min_value_iter - last_event_max_value = last_event.get("max_value", value) - last_event_max_value_iter = last_event.get("max_value_iter", event_iter) - if value > last_event_max_value: - event_data["max_value"] = value - event_data["max_value_iter"] = event_iter - else: - event_data["max_value"] = last_event_max_value - event_data["max_value_iter"] = last_event_max_value_iter - last_events[metric_hash][variant_hash] = event_data + last_event["value"] = value + last_event["iter"] = event_iter + last_event["timestamp"] = event_timestamp + + first_value_iter = last_event.get("first_value_iter") + if first_value_iter is None or event_iter < first_value_iter: + last_event["first_value"] = value + last_event["first_value_iter"] = event_iter + + last_event_min_value = last_event.get("min_value") + if last_event_min_value is None or value < last_event_min_value: + last_event["min_value"] = value + last_event["min_value_iter"] = event_iter + + last_event_max_value = last_event.get("max_value") + if last_event_max_value is None or value > last_event_max_value: + last_event["max_value"] = value + last_event["max_value_iter"] = event_iter def _update_last_metric_events_for_task(self, last_events, event): """ diff --git a/apiserver/bll/task/task_bll.py b/apiserver/bll/task/task_bll.py index 6c99b23..a244397 100644 --- a/apiserver/bll/task/task_bll.py +++ b/apiserver/bll/task/task_bll.py @@ -454,6 +454,7 @@ class TaskBLL: status_reason: str, remove_from_all_queues=False, new_status=None, + new_status_for_aborted_task=None, ): try: cls.dequeue(task, company_id, silent_fail=True) @@ -467,6 +468,9 @@ class TaskBLL: if task.status not in [TaskStatus.queued, TaskStatus.in_progress]: return {"updated": 0} + if new_status_for_aborted_task and task.status == TaskStatus.in_progress: + new_status = new_status_for_aborted_task + return ChangeStatusRequest( task=task, new_status=new_status or task.enqueue_status or TaskStatus.created, diff --git a/apiserver/bll/task/task_operations.py b/apiserver/bll/task/task_operations.py index 75d93e3..c8a2e3c 100644 --- a/apiserver/bll/task/task_operations.py +++ b/apiserver/bll/task/task_operations.py @@ -85,6 +85,7 @@ def archive_task( status_message=status_message, status_reason=status_reason, remove_from_all_queues=True, + new_status_for_aborted_task=TaskStatus.stopped, ) except APIError: # dequeue may fail if the task was not enqueued diff --git a/apiserver/bll/task/utils.py b/apiserver/bll/task/utils.py index 4ea21b4..6e1d470 100644 --- a/apiserver/bll/task/utils.py +++ b/apiserver/bll/task/utils.py @@ -182,7 +182,7 @@ def get_many_tasks_for_writing( throw_on_forbidden: bool = True, ) -> Sequence[Task]: if only: - missing = [f for f in ("company", ) if f not in only] + missing = [f for f in ("company",) if f not in only] if missing: only = [*only, *missing] @@ -235,7 +235,7 @@ def get_task_for_update( task_id: str, identity: Identity, allow_all_statuses: bool = False, - force: bool = False + force: bool = False, ) -> Task: """ Loads only task id and return the task only if it is updatable (status == 'created') @@ -291,13 +291,62 @@ def get_last_metric_updates( new_metrics = [] + def add_last_metric_mean_update( + metric_path: str, + metric_count: int, + metric_total: float, + ): + """ + Update new mean field based on the value in db and new data + The count field is updated here too and not with inc__ so that + it will not get updated in the db earlier than the corresponding mean + """ + metric_path = metric_path.replace("__", ".") + mean_value_field = f"{metric_path}.mean_value" + count_field = f"{metric_path}.count" + raw_updates[mean_value_field] = { + "$round": [ + { + "$divide": [ + { + "$add": [ + { + "$multiply": [ + {"$ifNull": [f"${mean_value_field}", 0]}, + {"$ifNull": [f"${count_field}", 0]}, + ] + }, + metric_total, + ] + }, + { + "$add": [ + {"$ifNull": [f"${count_field}", 0]}, + metric_count, + ] + }, + ] + }, + 2, + ] + } + raw_updates[count_field] = { + "$add": [ + {"$ifNull": [f"${count_field}", 0]}, + metric_count, + ] + } + def add_last_metric_conditional_update( - metric_path: str, metric_value, iter_value: int, is_min: bool + metric_path: str, metric_value, iter_value: int, is_min: bool, is_first: bool ): """ Build an aggregation for an atomic update of the min or max value and the corresponding iteration """ - if is_min: + if is_first: + field_prefix = "first" + op = None + elif is_min: field_prefix = "min" op = "$gt" else: @@ -305,18 +354,23 @@ def get_last_metric_updates( op = "$lt" value_field = f"{metric_path}__{field_prefix}_value".replace("__", ".") - condition = { - "$or": [ - {"$lte": [f"${value_field}", None]}, - {op: [f"${value_field}", metric_value]}, - ] - } + exists = {"$lte": [f"${value_field}", None]} + if op: + condition = { + "$or": [ + exists, + {op: [f"${value_field}", metric_value]}, + ] + } + else: + condition = exists + raw_updates[value_field] = { "$cond": [condition, metric_value, f"${value_field}"] } - value_iteration_field = f"{metric_path}__{field_prefix}_value_iteration".replace( - "__", "." + value_iteration_field = ( + f"{metric_path}__{field_prefix}_value_iteration".replace("__", ".") ) raw_updates[value_iteration_field] = { "$cond": [condition, iter_value, f"${value_iteration_field}"] @@ -333,15 +387,25 @@ def get_last_metric_updates( new_metrics.append(metric) path = f"last_metrics__{metric_key}__{variant_key}" for key, value in variant_data.items(): - if key in ("min_value", "max_value"): + if key in ("min_value", "max_value", "first_value"): add_last_metric_conditional_update( metric_path=path, metric_value=value, iter_value=variant_data.get(f"{key}_iter", 0), is_min=(key == "min_value"), + is_first=(key == "first_value"), ) elif key in ("metric", "variant", "value"): extra_updates[f"set__{path}__{key}"] = value + count = variant_data.get("count") + total = variant_data.get("total") + if count is not None and total is not None: + add_last_metric_mean_update( + metric_path=path, + metric_count=count, + metric_total=total, + ) + if new_metrics: extra_updates["add_to_set__unique_metrics"] = new_metrics diff --git a/apiserver/database/model/task/metrics.py b/apiserver/database/model/task/metrics.py index f94a6f2..025b9e1 100644 --- a/apiserver/database/model/task/metrics.py +++ b/apiserver/database/model/task/metrics.py @@ -5,6 +5,7 @@ from mongoengine import ( LongField, EmbeddedDocumentField, IntField, + FloatField, ) from apiserver.database.fields import SafeMapField @@ -23,6 +24,10 @@ class MetricEvent(EmbeddedDocument): min_value_iteration = IntField() max_value = DynamicField() # for backwards compatibility reasons max_value_iteration = IntField() + first_value = FloatField() + first_value_iteration = IntField() + count = IntField() + mean_value = FloatField() class EventStats(EmbeddedDocument): diff --git a/apiserver/mongo/initialize/__init__.py b/apiserver/mongo/initialize/__init__.py index 02f6598..94fc24c 100644 --- a/apiserver/mongo/initialize/__init__.py +++ b/apiserver/mongo/initialize/__init__.py @@ -73,7 +73,7 @@ def init_mongo_data(): } internal_user_emails.add(email.lower()) revoke = fixed_mode and credentials.get("revoke_in_fixed_mode", False) - user_id = _ensure_auth_user(user_data, company_id, log=log, revoke=revoke, internal_user=True) + user_id = _ensure_auth_user(user_data, company_id, log=log, revoke=revoke) if credentials.role == Role.user: _ensure_backend_user(user_id, company_id, credentials.display_name) diff --git a/apiserver/mongo/initialize/user.py b/apiserver/mongo/initialize/user.py index 12e6d0b..90ca85f 100644 --- a/apiserver/mongo/initialize/user.py +++ b/apiserver/mongo/initialize/user.py @@ -10,12 +10,7 @@ from apiserver.service_repo.auth.fixed_user import FixedUser def _ensure_user_credentials( - user: AuthUser, - key: str, - secret: str, - log: Logger, - revoke: bool = False, - internal_user: bool = False, + user: AuthUser, key: str, secret: str, log: Logger, revoke: bool = False ) -> None: if revoke: log.info(f"Revoking credentials for existing user {user.id} ({user.name})") @@ -24,34 +19,19 @@ def _ensure_user_credentials( return if not (key and secret): - if internal_user: - log.info(f"Resetting credentials for existing user {user.id} ({user.name})") - user.credentials = [] - user.save() + log.info(f"Resetting credentials for existing user {user.id} ({user.name})") + user.credentials = [] + user.save() return new_credentials = Credentials(key=key, secret=secret) - if internal_user: - log.info(f"Setting credentials for existing user {user.id} ({user.name})") - user.credentials = [new_credentials] - user.save() - return - - if user.credentials is None: - user.credentials = [] - if not any((cred.key, cred.secret) == (key, secret) for cred in user.credentials): - log.info(f"Adding credentials for existing user {user.id} ({user.name})") - user.credentials.append(new_credentials) - user.save() + log.info(f"Setting credentials for existing user {user.id} ({user.name})") + user.credentials = [new_credentials] + user.save() + return -def _ensure_auth_user( - user_data: dict, - company_id: str, - log: Logger, - revoke: bool = False, - internal_user: bool = False, -) -> str: +def _ensure_auth_user(user_data: dict, company_id: str, log: Logger, revoke: bool = False) -> str: user_id = user_data.get("id", f"__{user_data['name']}__") role = user_data["role"] email = user_data["email"] @@ -60,15 +40,12 @@ def _ensure_auth_user( user: AuthUser = AuthUser.objects(id=user_id).first() if user: - _ensure_user_credentials( - user=user, - key=key, - secret=secret, - log=log, - revoke=revoke, - internal_user=internal_user, - ) - if user.role != role or user.email != email or user.autocreated != autocreated: + _ensure_user_credentials(user=user, key=key, secret=secret, log=log, revoke=revoke) + if ( + user.role != role + or user.email != email + or user.autocreated != autocreated + ): user.email = email user.role = role user.autocreated = autocreated @@ -77,7 +54,9 @@ def _ensure_auth_user( return user.id credentials = ( - [Credentials(key=key, secret=secret)] if not revoke and key and secret else [] + [Credentials(key=key, secret=secret)] + if not revoke and key and secret + else [] ) log.info(f"Creating user: {user_data['name']}") @@ -129,9 +108,7 @@ def ensure_fixed_user(user: FixedUser, log: Logger, emails: set): try: log.info(f"Updating user name: {user.name}") given_name, _, family_name = user.name.partition(" ") - db_user.update( - name=user.name, given_name=given_name, family_name=family_name - ) + db_user.update(name=user.name, given_name=given_name, family_name=family_name) except Exception: pass else: diff --git a/apiserver/schema/services/_tasks_common.conf b/apiserver/schema/services/_tasks_common.conf index 2afffe7..8807bb6 100644 --- a/apiserver/schema/services/_tasks_common.conf +++ b/apiserver/schema/services/_tasks_common.conf @@ -283,6 +283,22 @@ last_metrics_event { description: "The iteration at which the maximum value was reported" type: integer } + first_value { + description: "First value reported" + type: number + } + first_value_iteration { + description: "The iteration at which the first value was reported" + type: integer + } + mean_value { + description: "The mean value" + type: number + } + count { + description: "The total count of reported values" + type: integer + } } } last_metrics_variants { diff --git a/apiserver/tests/automated/test_task_events.py b/apiserver/tests/automated/test_task_events.py index 0be5d50..f8a6ca5 100644 --- a/apiserver/tests/automated/test_task_events.py +++ b/apiserver/tests/automated/test_task_events.py @@ -217,7 +217,10 @@ class TestTaskEvents(TestService): self.assertEqual(iter_count - 1, metric_data.max_value_iteration) self.assertEqual(0, metric_data.min_value) self.assertEqual(0, metric_data.min_value_iteration) - + self.assertEqual(0, metric_data.first_value_iteration) + self.assertEqual(0, metric_data.first_value) + self.assertEqual(iter_count, metric_data.count) + self.assertEqual(sum(i for i in range(iter_count)) / iter_count, metric_data.mean_value) res = self.api.events.get_task_latest_scalar_values(task=task) self.assertEqual(iter_count - 1, res.last_iter) diff --git a/docker/docker-compose-win10.yml b/docker/docker-compose-win10.yml index 6f52412..d16a8e2 100644 --- a/docker/docker-compose-win10.yml +++ b/docker/docker-compose-win10.yml @@ -87,7 +87,7 @@ services: networks: - backend container_name: clearml-mongo - image: mongo:4.4.29 + image: mongo:5.0.26 restart: unless-stopped command: --setParameter internalQueryMaxBlockingSortMemoryUsageBytes=196100200 volumes: diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 07f7f43..05d8ea0 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -88,7 +88,7 @@ services: networks: - backend container_name: clearml-mongo - image: mongo:4.4.29 + image: mongo:5.0.26 restart: unless-stopped command: --setParameter internalQueryMaxBlockingSortMemoryUsageBytes=196100200 volumes: