Add per endpoint-variable add/remove statistics logging

This commit is contained in:
allegroai 2022-06-05 16:11:17 +03:00
parent 8778f723e6
commit f2e207e2f2
2 changed files with 50 additions and 13 deletions

View File

@ -22,11 +22,17 @@ def func_metric_rm(args):
print("Serving service Task {}, Removing metrics from endpoint={}".format(
request_processor.get_id(), args.endpoint))
request_processor.deserialize(skip_sync=True)
for v in (args.variable or []):
if request_processor.remove_metric_logging(endpoint=args.endpoint, variable_name=v):
print("Removing static endpoint: {}".format(args.endpoint))
if not args.variable:
if request_processor.remove_metric_logging(endpoint=args.endpoint):
print("Removing metric endpoint: {}".format(args.endpoint))
else:
raise ValueError("Could not remove {} from endpoin {}".format(v, args.endpoint))
raise ValueError("Could not remove metric endpoint {}".format(args.endpoint))
else:
for v in args.variable:
if request_processor.remove_metric_logging(endpoint=args.endpoint, variable_name=v):
print("Removing metric endpoint: {} / {}".format(args.endpoint, v))
else:
raise ValueError("Could not remove metric {} from endpoint {}".format(v, args.endpoint))
print("Updating serving service")
request_processor.serialize()
@ -69,7 +75,7 @@ def func_metric_add(args):
print("Warning: {} defined twice".format(name))
metric.metrics[name] = dict(type="value", buckets=None)
if not request_processor.add_metric_logging(metric=metric):
if not request_processor.add_metric_logging(metric=metric, update=True):
raise ValueError("Could not add metric logging endpoint {}".format(args.endpoint))
print("Updating serving service")
@ -332,7 +338,7 @@ def cli():
parser_metrics_rm.add_argument(
'--endpoint', type=str, help='metric endpoint name including version, e.g. "model/1" or a prefix "model/*"')
parser_metrics_rm.add_argument(
'--variable', type=str, nargs='+',
'--variable', type=str, nargs='*',
help='Remove (scalar/enum) argument from the metric logger, <name> example: "x1"')
parser_metrics_rm.set_defaults(func=func_metric_rm)

View File

@ -11,6 +11,7 @@ from multiprocessing import Lock
from numpy.random import choice
from clearml import Task, Model
from clearml.utilities.dicts import merge_dicts
from clearml.storage.util import hash_dict
from .preprocess_service import BasePreprocessRequest
from .endpoints import ModelEndpoint, ModelMonitoring, CanaryEP, EndpointMetricLogging
@ -339,7 +340,7 @@ class ModelRequestProcessor(object):
self._canary_endpoints.pop(endpoint_url, None)
return True
def add_metric_logging(self, metric: Union[EndpointMetricLogging, dict]) -> bool:
def add_metric_logging(self, metric: Union[EndpointMetricLogging, dict], update: bool = False) -> bool:
"""
Add metric logging to a specific endpoint
Valid metric variable are any variables on the request or response dictionary,
@ -348,6 +349,7 @@ class ModelRequestProcessor(object):
When overwriting and existing monitored variable, output a warning.
:param metric: Metric variable to monitor
:param update: If True update the current metric with the new one, otherwise overwrite if exists
:return: True if successful
"""
if not isinstance(metric, EndpointMetricLogging):
@ -360,9 +362,15 @@ class ModelRequestProcessor(object):
raise ValueError("Metric logging \'{}\' references a nonexistent endpoint".format(name))
if name in self._metric_logging:
print("Warning: Metric logging \'{}\' overwritten".format(name))
print("Warning: Metric logging \'{}\' {}".format(name, "updated" if update else "overwritten"))
self._metric_logging[name] = metric
if update and name in self._metric_logging:
metric_dict = metric.as_dict()
cur_metric_dict = self._metric_logging[name].as_dict()
metric_dict = merge_dicts(cur_metric_dict, metric_dict)
self._metric_logging[name] = EndpointMetricLogging(**metric_dict)
else:
self._metric_logging[name] = metric
return True
def remove_metric_logging(
@ -420,6 +428,7 @@ class ModelRequestProcessor(object):
"""
Restore ModelRequestProcessor state from Task
return True if actually needed serialization, False nothing changed
:param task: Load data from Task
:param prefetch_artifacts: If True prefetch artifacts requested by the endpoints
:param skip_sync: If True do not update the canary/monitoring state
@ -429,19 +438,26 @@ class ModelRequestProcessor(object):
if not task:
task = self._task
task.reload()
configuration = task.get_parameters_as_dict().get("General") or {}
endpoints = task.get_configuration_object_as_dict(name='endpoints') or {}
canary_ep = task.get_configuration_object_as_dict(name='canary') or {}
model_monitoring = task.get_configuration_object_as_dict(name='model_monitoring') or {}
metric_logging = task.get_configuration_object_as_dict(name='metric_logging') or {}
task_artifacts = task.artifacts
artifacts_hash = [
task_artifacts[m["preprocess_artifact"]].hash
for m in list(endpoints.values()) + list(model_monitoring.values())
if m.get("preprocess_artifact") and m.get("preprocess_artifact") in task_artifacts
]
hashed_conf = hash_dict(
dict(endpoints=endpoints,
canary_ep=canary_ep,
model_monitoring=model_monitoring,
metric_logging=metric_logging,
configuration=configuration)
configuration=configuration,
artifacts_hash=artifacts_hash)
)
if self._last_update_hash == hashed_conf and not self._model_monitoring_update_request:
return False
@ -518,6 +534,13 @@ class ModelRequestProcessor(object):
return True
def reload(self) -> None:
"""
Reload the serving session state from the backend
"""
self._task.reload()
self.deserialize(prefetch_artifacts=False, skip_sync=False, update_current_task=False)
def serialize(self, task: Optional[Task] = None) -> None:
"""
Store ModelRequestProcessor state into Task
@ -871,9 +894,18 @@ class ModelRequestProcessor(object):
# clear plot if we had any
return
# noinspection PyProtectedMember
model_link_template = "{}/projects/*/models/{{model}}/".format(self._task._get_app_server().rstrip("/"))
endpoints = [e.as_dict() for e in endpoints]
table_values = [list(endpoints[0].keys())]
table_values += [[e[c] or "" for c in table_values[0]] for e in endpoints]
table_values += [
[
e.get(c) or "" if c != "model_id" else "<a href=\"{}\"> {} </a>".format(
model_link_template.format(model=e["model_id"]), e["model_id"])
for c in table_values[0]
] for e in endpoints
]
self._instance_task.get_logger().report_table(
title='Serving Endpoint Configuration', series='Details', iteration=0, table_plot=table_values,
extra_layout={"title": "Model Endpoints Details"})
@ -1063,7 +1095,6 @@ class ModelRequestProcessor(object):
@classmethod
def list_control_plane_tasks(
cls,
task_id: Optional[str] = None,
name: Optional[str] = None,
project: Optional[str] = None,
tags: Optional[List[str]] = None