From f2e207e2f2b4b0b1088939e7e5ffd0fd0247526b Mon Sep 17 00:00:00 2001
From: allegroai
Date: Sun, 5 Jun 2022 16:11:17 +0300
Subject: [PATCH] Add per endpoint-variable add/remove statistics logging

---
 clearml_serving/__main__.py                   | 18 +++++---
 .../serving/model_request_processor.py        | 45 ++++++++++++++++---
 2 files changed, 50 insertions(+), 13 deletions(-)

diff --git a/clearml_serving/__main__.py b/clearml_serving/__main__.py
index 55af8a1..86a9ff6 100644
--- a/clearml_serving/__main__.py
+++ b/clearml_serving/__main__.py
@@ -22,11 +22,17 @@ def func_metric_rm(args):
     print("Serving service Task {}, Removing metrics from endpoint={}".format(
         request_processor.get_id(), args.endpoint))
     request_processor.deserialize(skip_sync=True)
-    for v in (args.variable or []):
-        if request_processor.remove_metric_logging(endpoint=args.endpoint, variable_name=v):
-            print("Removing static endpoint: {}".format(args.endpoint))
+    if not args.variable:
+        if request_processor.remove_metric_logging(endpoint=args.endpoint):
+            print("Removing metric endpoint: {}".format(args.endpoint))
         else:
-            raise ValueError("Could not remove {} from endpoin {}".format(v, args.endpoint))
+            raise ValueError("Could not remove metric endpoint {}".format(args.endpoint))
+    else:
+        for v in args.variable:
+            if request_processor.remove_metric_logging(endpoint=args.endpoint, variable_name=v):
+                print("Removing metric endpoint: {} / {}".format(args.endpoint, v))
+            else:
+                raise ValueError("Could not remove metric {} from endpoint {}".format(v, args.endpoint))
     print("Updating serving service")
     request_processor.serialize()
 
@@ -69,7 +75,7 @@ def func_metric_add(args):
             print("Warning: {} defined twice".format(name))
         metric.metrics[name] = dict(type="value", buckets=None)
 
-    if not request_processor.add_metric_logging(metric=metric):
+    if not request_processor.add_metric_logging(metric=metric, update=True):
         raise ValueError("Could not add metric logging endpoint {}".format(args.endpoint))
     print("Updating serving service")
     request_processor.serialize()
@@ -332,7 +338,7 @@ def cli():
     parser_metrics_rm.add_argument(
         '--endpoint', type=str,
         help='metric endpoint name including version, e.g. "model/1" or a prefix "model/*"')
     parser_metrics_rm.add_argument(
-        '--variable', type=str, nargs='+',
+        '--variable', type=str, nargs='*',
         help='Remove (scalar/enum) argument from the metric logger, example: "x1"')
     parser_metrics_rm.set_defaults(func=func_metric_rm)
 
diff --git a/clearml_serving/serving/model_request_processor.py b/clearml_serving/serving/model_request_processor.py
index d52d7f6..acd1877 100644
--- a/clearml_serving/serving/model_request_processor.py
+++ b/clearml_serving/serving/model_request_processor.py
@@ -11,6 +11,7 @@ from multiprocessing import Lock
 from numpy.random import choice
 
 from clearml import Task, Model
+from clearml.utilities.dicts import merge_dicts
 from clearml.storage.util import hash_dict
 from .preprocess_service import BasePreprocessRequest
 from .endpoints import ModelEndpoint, ModelMonitoring, CanaryEP, EndpointMetricLogging
@@ -339,7 +340,7 @@ class ModelRequestProcessor(object):
             self._canary_endpoints.pop(endpoint_url, None)
         return True
 
-    def add_metric_logging(self, metric: Union[EndpointMetricLogging, dict]) -> bool:
+    def add_metric_logging(self, metric: Union[EndpointMetricLogging, dict], update: bool = False) -> bool:
         """
         Add metric logging to a specific endpoint
         Valid metric variables are any variables on the request or response dictionary,
@@ -348,6 +349,7 @@
         When overwriting an existing monitored variable, output a warning.
 
         :param metric: Metric variable to monitor
+        :param update: If True, update the current metric with the new one, otherwise overwrite it if it exists
         :return: True if successful
         """
         if not isinstance(metric, EndpointMetricLogging):
@@ -360,9 +362,15 @@
             raise ValueError("Metric logging \'{}\' references a nonexistent endpoint".format(name))
 
         if name in self._metric_logging:
-            print("Warning: Metric logging \'{}\' overwritten".format(name))
+            print("Warning: Metric logging \'{}\' {}".format(name, "updated" if update else "overwritten"))
 
-        self._metric_logging[name] = metric
+        if update and name in self._metric_logging:
+            metric_dict = metric.as_dict()
+            cur_metric_dict = self._metric_logging[name].as_dict()
+            metric_dict = merge_dicts(cur_metric_dict, metric_dict)
+            self._metric_logging[name] = EndpointMetricLogging(**metric_dict)
+        else:
+            self._metric_logging[name] = metric
         return True
 
     def remove_metric_logging(
@@ -420,6 +428,7 @@
         """
         Restore ModelRequestProcessor state from Task
         return True if actually needed serialization, False nothing changed
+
        :param task: Load data from Task
        :param prefetch_artifacts: If True prefetch artifacts requested by the endpoints
        :param skip_sync: If True do not update the canary/monitoring state
@@ -429,19 +438,26 @@
         if not task:
             task = self._task
 
-        task.reload()
         configuration = task.get_parameters_as_dict().get("General") or {}
         endpoints = task.get_configuration_object_as_dict(name='endpoints') or {}
         canary_ep = task.get_configuration_object_as_dict(name='canary') or {}
         model_monitoring = task.get_configuration_object_as_dict(name='model_monitoring') or {}
         metric_logging = task.get_configuration_object_as_dict(name='metric_logging') or {}
 
+        task_artifacts = task.artifacts
+        artifacts_hash = [
+            task_artifacts[m["preprocess_artifact"]].hash
+            for m in list(endpoints.values()) + list(model_monitoring.values())
+            if m.get("preprocess_artifact") and m.get("preprocess_artifact") in task_artifacts
+        ]
+
         hashed_conf = hash_dict(
             dict(endpoints=endpoints,
                  canary_ep=canary_ep,
                  model_monitoring=model_monitoring,
                  metric_logging=metric_logging,
-                 configuration=configuration)
+                 configuration=configuration,
+                 artifacts_hash=artifacts_hash)
         )
         if self._last_update_hash == hashed_conf and not self._model_monitoring_update_request:
             return False
@@ -518,6 +534,13 @@
 
         return True
 
+    def reload(self) -> None:
+        """
+        Reload the serving session state from the backend
+        """
+        self._task.reload()
+        self.deserialize(prefetch_artifacts=False, skip_sync=False, update_current_task=False)
+
     def serialize(self, task: Optional[Task] = None) -> None:
         """
         Store ModelRequestProcessor state into Task
@@ -871,9 +894,18 @@
             # clear plot if we had any
             return
 
+        # noinspection PyProtectedMember
+        model_link_template = "{}/projects/*/models/{{model}}/".format(self._task._get_app_server().rstrip("/"))
+
         endpoints = [e.as_dict() for e in endpoints]
         table_values = [list(endpoints[0].keys())]
-        table_values += [[e[c] or "" for c in table_values[0]] for e in endpoints]
+        table_values += [
+            [
+                e.get(c) or "" if c != "model_id" else '<a href="{}"> {} </a>'.format(
+                    model_link_template.format(model=e["model_id"]), e["model_id"])
+                for c in table_values[0]
+            ] for e in endpoints
+        ]
         self._instance_task.get_logger().report_table(
             title='Serving Endpoint Configuration', series='Details', iteration=0, table_plot=table_values,
             extra_layout={"title": "Model Endpoints Details"})
@@ -1063,7 +1095,6 @@
     @classmethod
     def list_control_plane_tasks(
             cls,
-            task_id: Optional[str] = None,
             name: Optional[str] = None,
             project: Optional[str] = None,
             tags: Optional[List[str]] = None
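
Note: a minimal usage sketch of the API this patch modifies, for illustration
only (not part of the diff). The serving service Task ID and the "model/1"
endpoint below are placeholders, and constructing ModelRequestProcessor /
EndpointMetricLogging with these arguments is assumed to mirror how
clearml_serving/__main__.py instantiates them in the hunks above:

    from clearml_serving.serving.endpoints import EndpointMetricLogging
    from clearml_serving.serving.model_request_processor import ModelRequestProcessor

    # placeholder ID of the serving service (control plane) Task
    processor = ModelRequestProcessor(task_id="<serving-service-task-id>")
    processor.deserialize(skip_sync=True)

    # add/update a metric: with update=True the new definition is merged
    # (merge_dicts) into an existing EndpointMetricLogging entry instead of
    # overwriting it wholesale
    metric = EndpointMetricLogging(endpoint="model/1")  # placeholder endpoint
    metric.metrics["x1"] = dict(type="value", buckets=None)
    processor.add_metric_logging(metric=metric, update=True)

    # remove a single monitored variable from the endpoint ...
    processor.remove_metric_logging(endpoint="model/1", variable_name="x1")
    # ... or, with no variable_name, remove the whole metric endpoint
    processor.remove_metric_logging(endpoint="model/1")

    # persist the modified state back to the serving service Task
    processor.serialize()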