Support for first and mean value for task last scalar metrics

clearml committed 2024-12-05 19:02:48 +02:00
parent 50593f69f8
commit e86b7fd24e
11 changed files with 160 additions and 89 deletions

View File

@@ -319,6 +319,7 @@ class EventBLL(object):
         if actions:
             chunk_size = 500
             # TODO: replace it with helpers.parallel_bulk in the future once the parallel pool leak is fixed
+            # noinspection PyTypeChecker
             with closing(
                 elasticsearch.helpers.streaming_bulk(
                     self.es,
@@ -438,45 +439,45 @@ class EventBLL(object):
         last_events contains [hashed_metric_name -> hashed_variant_name -> event]. Keys are hashed to avoid mongodb
         key conflicts due to invalid characters and/or long field names.
         """
+        value = event.get("value")
+        if value is None:
+            return
+
         metric = event.get("metric") or ""
         variant = event.get("variant") or ""
         metric_hash = dbutils.hash_field_name(metric)
         variant_hash = dbutils.hash_field_name(variant)
         last_event = last_events[metric_hash][variant_hash]
+        last_event["metric"] = metric
+        last_event["variant"] = variant
+        last_event["count"] = last_event.get("count", 0) + 1
+        last_event["total"] = last_event.get("total", 0) + value
+
         event_iter = event.get("iter", 0)
         event_timestamp = event.get("timestamp", 0)
-        value = event.get("value")
-        if value is not None and (
-            (event_iter, event_timestamp)
-            >= (
-                last_event.get("iter", event_iter),
-                last_event.get("timestamp", event_timestamp),
-            )
+        if (event_iter, event_timestamp) >= (
+            last_event.get("iter", event_iter),
+            last_event.get("timestamp", event_timestamp),
         ):
-            event_data = {
-                k: event[k]
-                for k in ("value", "metric", "variant", "iter", "timestamp")
-                if k in event
-            }
-            last_event_min_value = last_event.get("min_value", value)
-            last_event_min_value_iter = last_event.get("min_value_iter", event_iter)
-            if value < last_event_min_value:
-                event_data["min_value"] = value
-                event_data["min_value_iter"] = event_iter
-            else:
-                event_data["min_value"] = last_event_min_value
-                event_data["min_value_iter"] = last_event_min_value_iter
-            last_event_max_value = last_event.get("max_value", value)
-            last_event_max_value_iter = last_event.get("max_value_iter", event_iter)
-            if value > last_event_max_value:
-                event_data["max_value"] = value
-                event_data["max_value_iter"] = event_iter
-            else:
-                event_data["max_value"] = last_event_max_value
-                event_data["max_value_iter"] = last_event_max_value_iter
-            last_events[metric_hash][variant_hash] = event_data
+            last_event["value"] = value
+            last_event["iter"] = event_iter
+            last_event["timestamp"] = event_timestamp
+
+        first_value_iter = last_event.get("first_value_iter")
+        if first_value_iter is None or event_iter < first_value_iter:
+            last_event["first_value"] = value
+            last_event["first_value_iter"] = event_iter
+
+        last_event_min_value = last_event.get("min_value")
+        if last_event_min_value is None or value < last_event_min_value:
+            last_event["min_value"] = value
+            last_event["min_value_iter"] = event_iter
+
+        last_event_max_value = last_event.get("max_value")
+        if last_event_max_value is None or value > last_event_max_value:
+            last_event["max_value"] = value
+            last_event["max_value_iter"] = event_iter

     def _update_last_metric_events_for_task(self, last_events, event):
         """

View File

@@ -454,6 +454,7 @@ class TaskBLL:
         status_reason: str,
         remove_from_all_queues=False,
         new_status=None,
+        new_status_for_aborted_task=None,
     ):
         try:
             cls.dequeue(task, company_id, silent_fail=True)
@@ -467,6 +468,9 @@ class TaskBLL:
         if task.status not in [TaskStatus.queued, TaskStatus.in_progress]:
             return {"updated": 0}

+        if new_status_for_aborted_task and task.status == TaskStatus.in_progress:
+            new_status = new_status_for_aborted_task
+
         return ChangeStatusRequest(
             task=task,
             new_status=new_status or task.enqueue_status or TaskStatus.created,

View File

@@ -85,6 +85,7 @@ def archive_task(
             status_message=status_message,
             status_reason=status_reason,
             remove_from_all_queues=True,
+            new_status_for_aborted_task=TaskStatus.stopped,
         )
     except APIError:
         # dequeue may fail if the task was not enqueued
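Taken together with the TaskBLL change above, archiving a task that is currently running now lands it in the stopped status instead of falling back to its pre-enqueue status. A condensed sketch of the resolution order; the helper name and plain-string statuses are hypothetical stand-ins for the server's TaskStatus enum:

def resolve_unenqueue_status(task_status, enqueue_status=None, new_status=None,
                             new_status_for_aborted_task=None):
    """Hypothetical condensation of the status resolution shown in the diff."""
    if new_status_for_aborted_task and task_status == "in_progress":
        new_status = new_status_for_aborted_task  # e.g. "stopped" when archiving
    return new_status or enqueue_status or "created"

# Archiving a running task now resolves to "stopped":
assert resolve_unenqueue_status(
    "in_progress", enqueue_status="queued", new_status_for_aborted_task="stopped"
) == "stopped"
# A merely queued task still falls back to its pre-enqueue status:
assert resolve_unenqueue_status("queued", enqueue_status="created") == "created"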

View File

@@ -182,7 +182,7 @@ def get_many_tasks_for_writing(
     throw_on_forbidden: bool = True,
 ) -> Sequence[Task]:
     if only:
-        missing = [f for f in ("company", ) if f not in only]
+        missing = [f for f in ("company",) if f not in only]
         if missing:
             only = [*only, *missing]
@@ -235,7 +235,7 @@ def get_task_for_update(
     task_id: str,
     identity: Identity,
     allow_all_statuses: bool = False,
-    force: bool = False
+    force: bool = False,
 ) -> Task:
     """
     Loads only task id and return the task only if it is updatable (status == 'created')
@@ -291,13 +291,62 @@ def get_last_metric_updates(
     new_metrics = []

+    def add_last_metric_mean_update(
+        metric_path: str,
+        metric_count: int,
+        metric_total: float,
+    ):
+        """
+        Update new mean field based on the value in db and new data
+        The count field is updated here too and not with inc__ so that
+        it will not get updated in the db earlier than the corresponding mean
+        """
+        metric_path = metric_path.replace("__", ".")
+        mean_value_field = f"{metric_path}.mean_value"
+        count_field = f"{metric_path}.count"
+        raw_updates[mean_value_field] = {
+            "$round": [
+                {
+                    "$divide": [
+                        {
+                            "$add": [
+                                {
+                                    "$multiply": [
+                                        {"$ifNull": [f"${mean_value_field}", 0]},
+                                        {"$ifNull": [f"${count_field}", 0]},
+                                    ]
+                                },
+                                metric_total,
+                            ]
+                        },
+                        {
+                            "$add": [
+                                {"$ifNull": [f"${count_field}", 0]},
+                                metric_count,
+                            ]
+                        },
+                    ]
+                },
+                2,
+            ]
+        }
+        raw_updates[count_field] = {
+            "$add": [
+                {"$ifNull": [f"${count_field}", 0]},
+                metric_count,
+            ]
+        }
+
     def add_last_metric_conditional_update(
-        metric_path: str, metric_value, iter_value: int, is_min: bool
+        metric_path: str, metric_value, iter_value: int, is_min: bool, is_first: bool
     ):
         """
         Build an aggregation for an atomic update of the min or max value and the corresponding iteration
         """
-        if is_min:
+        if is_first:
+            field_prefix = "first"
+            op = None
+        elif is_min:
             field_prefix = "min"
             op = "$gt"
         else:
@@ -305,18 +354,23 @@ def get_last_metric_updates(
             op = "$lt"

         value_field = f"{metric_path}__{field_prefix}_value".replace("__", ".")
-        condition = {
-            "$or": [
-                {"$lte": [f"${value_field}", None]},
-                {op: [f"${value_field}", metric_value]},
-            ]
-        }
+        exists = {"$lte": [f"${value_field}", None]}
+        if op:
+            condition = {
+                "$or": [
+                    exists,
+                    {op: [f"${value_field}", metric_value]},
+                ]
+            }
+        else:
+            condition = exists
         raw_updates[value_field] = {
             "$cond": [condition, metric_value, f"${value_field}"]
         }

-        value_iteration_field = f"{metric_path}__{field_prefix}_value_iteration".replace(
-            "__", "."
-        )
+        value_iteration_field = (
+            f"{metric_path}__{field_prefix}_value_iteration".replace("__", ".")
+        )
         raw_updates[value_iteration_field] = {
             "$cond": [condition, iter_value, f"${value_iteration_field}"]
         }
@@ -333,15 +387,25 @@ def get_last_metric_updates(
             new_metrics.append(metric)
         path = f"last_metrics__{metric_key}__{variant_key}"
         for key, value in variant_data.items():
-            if key in ("min_value", "max_value"):
+            if key in ("min_value", "max_value", "first_value"):
                 add_last_metric_conditional_update(
                     metric_path=path,
                     metric_value=value,
                     iter_value=variant_data.get(f"{key}_iter", 0),
                     is_min=(key == "min_value"),
+                    is_first=(key == "first_value"),
                 )
             elif key in ("metric", "variant", "value"):
                 extra_updates[f"set__{path}__{key}"] = value

+        count = variant_data.get("count")
+        total = variant_data.get("total")
+        if count is not None and total is not None:
+            add_last_metric_mean_update(
+                metric_path=path,
+                metric_count=count,
+                metric_total=total,
+            )
+
     if new_metrics:
         extra_updates["add_to_set__unique_metrics"] = new_metrics

View File

@@ -5,6 +5,7 @@ from mongoengine import (
     LongField,
     EmbeddedDocumentField,
     IntField,
+    FloatField,
 )

 from apiserver.database.fields import SafeMapField
@@ -23,6 +24,10 @@ class MetricEvent(EmbeddedDocument):
     min_value_iteration = IntField()
     max_value = DynamicField()  # for backwards compatibility reasons
     max_value_iteration = IntField()
+    first_value = FloatField()
+    first_value_iteration = IntField()
+    count = IntField()
+    mean_value = FloatField()


 class EventStats(EmbeddedDocument):

View File

@@ -73,7 +73,7 @@ def init_mongo_data():
             }
             internal_user_emails.add(email.lower())
             revoke = fixed_mode and credentials.get("revoke_in_fixed_mode", False)
-            user_id = _ensure_auth_user(user_data, company_id, log=log, revoke=revoke, internal_user=True)
+            user_id = _ensure_auth_user(user_data, company_id, log=log, revoke=revoke)
             if credentials.role == Role.user:
                 _ensure_backend_user(user_id, company_id, credentials.display_name)

View File

@@ -10,12 +10,7 @@ from apiserver.service_repo.auth.fixed_user import FixedUser

 def _ensure_user_credentials(
-    user: AuthUser,
-    key: str,
-    secret: str,
-    log: Logger,
-    revoke: bool = False,
-    internal_user: bool = False,
+    user: AuthUser, key: str, secret: str, log: Logger, revoke: bool = False
 ) -> None:
     if revoke:
         log.info(f"Revoking credentials for existing user {user.id} ({user.name})")
@@ -24,34 +19,19 @@ def _ensure_user_credentials(
         return

     if not (key and secret):
-        if internal_user:
-            log.info(f"Resetting credentials for existing user {user.id} ({user.name})")
-            user.credentials = []
-            user.save()
+        log.info(f"Resetting credentials for existing user {user.id} ({user.name})")
+        user.credentials = []
+        user.save()
         return

     new_credentials = Credentials(key=key, secret=secret)
-    if internal_user:
-        log.info(f"Setting credentials for existing user {user.id} ({user.name})")
-        user.credentials = [new_credentials]
-        user.save()
-        return
-
-    if user.credentials is None:
-        user.credentials = []
-    if not any((cred.key, cred.secret) == (key, secret) for cred in user.credentials):
-        log.info(f"Adding credentials for existing user {user.id} ({user.name})")
-        user.credentials.append(new_credentials)
-        user.save()
+    log.info(f"Setting credentials for existing user {user.id} ({user.name})")
+    user.credentials = [new_credentials]
+    user.save()
+    return


-def _ensure_auth_user(
-    user_data: dict,
-    company_id: str,
-    log: Logger,
-    revoke: bool = False,
-    internal_user: bool = False,
-) -> str:
+def _ensure_auth_user(user_data: dict, company_id: str, log: Logger, revoke: bool = False) -> str:
     user_id = user_data.get("id", f"__{user_data['name']}__")
     role = user_data["role"]
     email = user_data["email"]
@@ -60,15 +40,12 @@ def _ensure_auth_user(
     user: AuthUser = AuthUser.objects(id=user_id).first()
     if user:
-        _ensure_user_credentials(
-            user=user,
-            key=key,
-            secret=secret,
-            log=log,
-            revoke=revoke,
-            internal_user=internal_user,
-        )
-        if user.role != role or user.email != email or user.autocreated != autocreated:
+        _ensure_user_credentials(user=user, key=key, secret=secret, log=log, revoke=revoke)
+        if (
+            user.role != role
+            or user.email != email
+            or user.autocreated != autocreated
+        ):
             user.email = email
             user.role = role
             user.autocreated = autocreated
@@ -77,7 +54,9 @@ def _ensure_auth_user(
         return user.id

     credentials = (
-        [Credentials(key=key, secret=secret)] if not revoke and key and secret else []
+        [Credentials(key=key, secret=secret)]
+        if not revoke and key and secret
+        else []
     )

     log.info(f"Creating user: {user_data['name']}")
@@ -129,9 +108,7 @@ def ensure_fixed_user(user: FixedUser, log: Logger, emails: set):
     try:
         log.info(f"Updating user name: {user.name}")
         given_name, _, family_name = user.name.partition(" ")
-        db_user.update(
-            name=user.name, given_name=given_name, family_name=family_name
-        )
+        db_user.update(name=user.name, given_name=given_name, family_name=family_name)
     except Exception:
         pass
     else:

View File

@@ -283,6 +283,22 @@ last_metrics_event {
             description: "The iteration at which the maximum value was reported"
             type: integer
         }
+        first_value {
+            description: "First value reported"
+            type: number
+        }
+        first_value_iteration {
+            description: "The iteration at which the first value was reported"
+            type: integer
+        }
+        mean_value {
+            description: "The mean value"
+            type: number
+        }
+        count {
+            description: "The total count of reported values"
+            type: integer
+        }
     }
 }
 last_metrics_variants {

View File

@@ -217,7 +217,10 @@ class TestTaskEvents(TestService):
         self.assertEqual(iter_count - 1, metric_data.max_value_iteration)
         self.assertEqual(0, metric_data.min_value)
         self.assertEqual(0, metric_data.min_value_iteration)
+        self.assertEqual(0, metric_data.first_value_iteration)
+        self.assertEqual(0, metric_data.first_value)
+        self.assertEqual(iter_count, metric_data.count)
+        self.assertEqual(sum(i for i in range(iter_count)) / iter_count, metric_data.mean_value)

         res = self.api.events.get_task_latest_scalar_values(task=task)
         self.assertEqual(iter_count - 1, res.last_iter)
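For concreteness: assuming the test reports one scalar per iteration with value equal to the iteration number (which is consistent with the existing min/max assertions), the new expectations work out as below for a hypothetical iter_count of 10:

iter_count = 10
values = list(range(iter_count))        # value i reported at iteration i
assert values[0] == 0                   # first_value, seen at iteration 0
assert len(values) == iter_count        # count
assert sum(values) / iter_count == 4.5  # mean_value: 45 / 10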

View File

@@ -87,7 +87,7 @@ services:
     networks:
       - backend
     container_name: clearml-mongo
-    image: mongo:4.4.29
+    image: mongo:5.0.26
     restart: unless-stopped
     command: --setParameter internalQueryMaxBlockingSortMemoryUsageBytes=196100200
     volumes:

View File

@@ -88,7 +88,7 @@ services:
     networks:
       - backend
     container_name: clearml-mongo
-    image: mongo:4.4.29
+    image: mongo:5.0.26
     restart: unless-stopped
     command: --setParameter internalQueryMaxBlockingSortMemoryUsageBytes=196100200
     volumes: