From eb755be001dbd88de3d7d38ee937ad25f9ac8298 Mon Sep 17 00:00:00 2001
From: clearml <>
Date: Thu, 5 Dec 2024 22:20:11 +0200
Subject: [PATCH] Add model endpoints support

---
 apiserver/apierrors/errors.conf               |   5 +
 apiserver/apimodels/serving.py                |  90 ++++
 apiserver/bll/query/builder.py                |  23 +-
 apiserver/bll/serving/__init__.py             | 358 ++++++++++++++++
 apiserver/bll/serving/stats.py                | 298 +++++++++++++
 .../config/default/services/serving.conf      |   7 +
 .../workers/serving_stats.json                |  79 ++++
 .../schema/services/_workers_common.conf      |  67 +++
 apiserver/schema/services/serving.conf        | 400 ++++++++++++++++++
 apiserver/schema/services/workers.conf        |  69 +--
 apiserver/services/serving.py                 |  69 +++
 apiserver/tests/automated/test_serving.py     | 101 +++++
 12 files changed, 1494 insertions(+), 72 deletions(-)
 create mode 100644 apiserver/apimodels/serving.py
 create mode 100644 apiserver/bll/serving/__init__.py
 create mode 100644 apiserver/bll/serving/stats.py
 create mode 100644 apiserver/config/default/services/serving.conf
 create mode 100644 apiserver/elastic/index_templates/workers/serving_stats.json
 create mode 100644 apiserver/schema/services/_workers_common.conf
 create mode 100644 apiserver/schema/services/serving.conf
 create mode 100644 apiserver/services/serving.py
 create mode 100644 apiserver/tests/automated/test_serving.py

diff --git a/apiserver/apierrors/errors.conf b/apiserver/apierrors/errors.conf
index ca78c81..90eaf60 100644
--- a/apiserver/apierrors/errors.conf
+++ b/apiserver/apierrors/errors.conf
@@ -106,6 +106,11 @@
         1004: ["worker_not_registered", "worker is not registered"]
         1005: ["worker_stats_not_found", "worker stats not found"]
 
+        # Serving
+        1050: ["invalid_container_id", "invalid container id"]
+        1051: ["container_not_registered", "container is not registered"]
+        1052: ["no_containers_for_url", "no container instances found for service url"]
+
         1104: ["invalid_scroll_id", "Invalid scroll id"]
     }
 
diff --git a/apiserver/apimodels/serving.py b/apiserver/apimodels/serving.py
new file mode 100644
index 0000000..2404085
--- /dev/null
+++ b/apiserver/apimodels/serving.py
@@ -0,0 +1,90 @@
+from enum import Enum
+
+from jsonmodels.models import Base
+from jsonmodels.fields import (
+    StringField,
+    EmbeddedField,
+    DateTimeField,
+    IntField,
+    FloatField, BoolField,
+)
+from jsonmodels.validators import Min
+
+from apiserver.apimodels import ListField, JsonSerializableMixin
+from apiserver.apimodels import ActualEnumField
+from apiserver.config_repo import config
+from .workers import MachineStats
+
+
+class ServingModel(Base):
+    container_id = StringField(required=True)
+    endpoint_name = StringField(required=True)
+    endpoint_url = StringField()  # may not exist yet at registration time
+    model_name = StringField(required=True)
+    model_source = StringField()
+    model_version = StringField()
+    preprocess_artifact = StringField()
+    input_type = StringField()
+    input_size = IntField()
+    tags = ListField(str)
+    system_tags = ListField(str)
+
+
+class RegisterRequest(ServingModel):
+    timeout = IntField(
+        default=int(config.get("services.serving.default_container_timeout_sec", 10 * 60)),
+        validators=[Min(1)]
+    )
+    """ registration timeout in seconds (default is 10min) """
+
+
+class UnregisterRequest(Base):
+    container_id = StringField(required=True)
+
+
+class StatusReportRequest(ServingModel):
+    uptime_sec = IntField()
+    requests_num = IntField()
+    requests_min = FloatField()
+    latency_ms = IntField()
+    machine_stats: MachineStats = EmbeddedField(MachineStats)
+
+
+class ServingContainerEntry(StatusReportRequest, JsonSerializableMixin):
+    key = StringField(required=True)
+    company_id = StringField(required=True)
+    ip = StringField()
+    register_time = DateTimeField(required=True)
+    register_timeout = IntField(required=True)
+    last_activity_time = DateTimeField(required=True)
+
+
+class GetEndpointDetailsRequest(Base):
+    endpoint_url = StringField(required=True)
+
+
+class MetricType(Enum):
+    requests = "requests"
+    requests_min = "requests_min"
+    latency_ms = "latency_ms"
+    cpu_count = "cpu_count"
+    gpu_count = "gpu_count"
+    cpu_util = "cpu_util"
+    gpu_util = "gpu_util"
+    ram_total = "ram_total"
+    ram_free = "ram_free"
+    gpu_ram_total = "gpu_ram_total"
+    gpu_ram_free = "gpu_ram_free"
+    network_rx = "network_rx"
+    network_tx = "network_tx"
+
+
+class GetEndpointMetricsHistoryRequest(Base):
+    from_date = FloatField(required=True, validators=Min(0))
+    to_date = FloatField(required=True, validators=Min(0))
+    interval = IntField(required=True, validators=Min(1))
+    endpoint_url = StringField(required=True)
+    metric_type = ActualEnumField(
+        MetricType, default=MetricType.requests
+    )
+    instance_charts = BoolField(default=True)
diff --git a/apiserver/bll/query/builder.py b/apiserver/bll/query/builder.py
index 0581b5d..f28ea9c 100644
--- a/apiserver/bll/query/builder.py
+++ b/apiserver/bll/query/builder.py
@@ -9,20 +9,35 @@ RANGE_IGNORE_VALUE = -1
 
 class Builder:
     @staticmethod
-    def dates_range(from_date: Union[int, float], to_date: Union[int, float]) -> dict:
+    def dates_range(
+        from_date: Optional[Union[int, float]] = None,
+        to_date: Optional[Union[int, float]] = None,
+    ) -> dict:
+        assert (
+            from_date or to_date
+        ), "range condition requires that at least one of from_date or to_date is specified"
+        conditions = {}
+        if from_date:
+            conditions["gte"] = int(from_date)
+        if to_date:
+            conditions["lte"] = int(to_date)
         return {
             "range": {
                 "timestamp": {
-                    "gte": int(from_date),
-                    "lte": int(to_date),
+                    **conditions,
                     "format": "epoch_second",
                 }
             }
         }
 
     @staticmethod
-    def terms(field: str, values: Iterable[str]) -> dict:
+    def terms(field: str, values: Iterable) -> dict:
+        # a single string is almost certainly a mistake here: use term() instead
+        assert not isinstance(values, str), "apparently 'term' should be used here"
         return {"terms": {field: list(values)}}
 
+    @staticmethod
+    def term(field: str, value) -> dict:
+        return {"term": {field: value}}
+
     @staticmethod
     def normalize_range(
diff --git a/apiserver/bll/serving/__init__.py b/apiserver/bll/serving/__init__.py
new file mode 100644
index 0000000..f8016dc
--- /dev/null
+++ b/apiserver/bll/serving/__init__.py
@@ -0,0 +1,358 @@
+from datetime import datetime, timedelta
+from enum import Enum, auto
+from operator import attrgetter
+from time import time
+from typing import Optional, Sequence, Union
+
+import attr
+from boltons.iterutils import chunked_iter, bucketize
+from pyhocon import ConfigTree
+
+from apiserver.apimodels.serving import (
+    ServingContainerEntry,
+    RegisterRequest,
+    StatusReportRequest,
+)
+from apiserver.apierrors import errors
+from apiserver.config_repo import config
+from apiserver.redis_manager import redman
+from .stats import ServingStats
+
+log = config.logger(__file__)
+
+
+class ServingBLL:
+    def __init__(self, redis=None):
+        self.conf = config.get("services.serving", ConfigTree())
+        self.redis = redis or redman.connection("workers")
+
+    @staticmethod
+    def _get_url_key(company: str, url: str):
+        return f"serving_url_{company}_{url}"
+
+    @staticmethod
+    def _get_container_key(company: str, container_id: str) -> str:
+        """Build redis key from company and container_id"""
+        return f"serving_container_{company}_{container_id}"
+
+    def _save_serving_container_entry(self, entry: ServingContainerEntry):
+        self.redis.setex(
+            entry.key, timedelta(seconds=entry.register_timeout), entry.to_json()
+        )
+
+        url_key = self._get_url_key(entry.company_id, entry.endpoint_url)
+        expiration = int(time()) + entry.register_timeout
+        container_item = {entry.key: expiration}
+        self.redis.zadd(url_key, container_item)
+        # make sure that the url set will not get stuck in redis
+        # indefinitely in case no more containers report to it
+        self.redis.expire(url_key, max(3600, entry.register_timeout))
+
+    def _get_serving_container_entry(
+        self, company_id: str, container_id: str
+    ) -> Optional[ServingContainerEntry]:
+        """
+        Get a container entry for the provided container ID.
+        """
+        key = self._get_container_key(company_id, container_id)
+        data = self.redis.get(key)
+        if not data:
+            return
+
+        try:
+            entry = ServingContainerEntry.from_json(data)
+            return entry
+        except Exception as e:
+            msg = "Failed parsing container entry"
+            log.exception(f"{msg}: {str(e)}")
+
+    def register_serving_container(
+        self,
+        company_id: str,
+        request: RegisterRequest,
+        ip: str = "",
+    ) -> ServingContainerEntry:
+        """
+        Register a serving container
+        """
+        now = datetime.utcnow()
+        key = self._get_container_key(company_id, request.container_id)
+        entry = ServingContainerEntry(
+            **request.to_struct(),
+            key=key,
+            company_id=company_id,
+            ip=ip,
+            register_time=now,
+            register_timeout=request.timeout,
+            last_activity_time=now,
+        )
+        self._save_serving_container_entry(entry)
+        return entry
+
+    def unregister_serving_container(
+        self,
+        company_id: str,
+        container_id: str,
+    ) -> None:
+        """
+        Unregister a serving container
+        """
+        entry = self._get_serving_container_entry(company_id, container_id)
+        if entry:
+            url_key = self._get_url_key(entry.company_id, entry.endpoint_url)
+            self.redis.zrem(url_key, entry.key)
+
+        key = self._get_container_key(company_id, container_id)
+        res = self.redis.delete(key)
+        if res:
+            return
+
+        if not self.conf.get("container_auto_unregister", True):
+            raise errors.bad_request.ContainerNotRegistered(container=container_id)
+
+    def container_status_report(
+        self,
+        company_id: str,
+        report: StatusReportRequest,
+        ip: str = "",
+    ) -> None:
+        """
+        Serving container status report
+        """
+        container_id = report.container_id
+        now = datetime.utcnow()
+        entry = self._get_serving_container_entry(company_id, container_id)
+        if entry:
+            ip = ip or entry.ip
+            register_time = entry.register_time
+            register_timeout = entry.register_timeout
+        else:
+            if not self.conf.get("container_auto_register", True):
+                raise errors.bad_request.ContainerNotRegistered(container=container_id)
+            # no stored entry: keep the ip reported in this call
+            register_time = now
+            register_timeout = int(
+                self.conf.get("default_container_timeout_sec", 10 * 60)
+            )
+
+        key = self._get_container_key(company_id, container_id)
+        entry = ServingContainerEntry(
+            **report.to_struct(),
+            key=key,
+            company_id=company_id,
+            ip=ip,
+            register_time=register_time,
+            register_timeout=register_timeout,
+            last_activity_time=now,
+        )
+        self._save_serving_container_entry(entry)
+        ServingStats.log_stats_to_es(entry)
+
+    def _get_all(
+        self,
+        company_id: str,
+    ) -> Sequence[ServingContainerEntry]:
+        keys = list(self.redis.scan_iter(self._get_container_key(company_id, "*")))
+        entries = []
+        for chunk in chunked_iter(keys, 1000):
+            data = self.redis.mget(chunk)
+            if not data:
+                continue
+            for d in data:
+                try:
+                    entries.append(ServingContainerEntry.from_json(d))
+                except Exception as ex:
+                    log.error(f"Failed parsing container entry {str(ex)}")
+
+        return entries
+
+    @attr.s(auto_attribs=True)
+    class Counter:
+        class AggType(Enum):
+            avg = auto()
+            max = auto()
+            total = auto()
+            count = auto()
+
+        name: str
+        field: str
+        agg_type: AggType
+        float_precision: Optional[int] = None
+
+        _max: Union[int, float, datetime] = attr.field(init=False, default=None)
+        _total: Union[int, float] = attr.field(init=False, default=0)
+        _count: int = attr.field(init=False, default=0)
+
+        def add(self, entry: ServingContainerEntry):
+            value = getattr(entry, self.field, None)
+            if value is None:
+                return
+
+            self._count += 1
+            if self.agg_type == self.AggType.max:
+                self._max = value if self._max is None else max(self._max, value)
+            else:
+                self._total += value
+
+        def __call__(self):
+            if self.agg_type == self.AggType.count:
+                return self._count
+
+            if self.agg_type == self.AggType.max:
+                return self._max
+
+            if self.agg_type == self.AggType.total:
+                return self._total
+
+            if not self._count:
+                return None
+            avg = self._total / self._count
+            return round(avg, self.float_precision) if self.float_precision else round(avg)
+
+    def _get_summary(self, entries: Sequence[ServingContainerEntry]) -> dict:
+        counters = [
+            self.Counter(
+                name="uptime_sec",
+                field="uptime_sec",
+                agg_type=self.Counter.AggType.max,
+            ),
+            self.Counter(
+                name="requests",
+                field="requests_num",
+                agg_type=self.Counter.AggType.total,
+            ),
+            self.Counter(
+                name="requests_min",
+                field="requests_min",
+                agg_type=self.Counter.AggType.avg,
+                float_precision=2,
+            ),
+            self.Counter(
+                name="latency_ms",
+                field="latency_ms",
+                agg_type=self.Counter.AggType.avg,
+            ),
+            self.Counter(
+                name="last_update",
+                field="last_activity_time",
+                agg_type=self.Counter.AggType.max,
+            ),
+        ]
+        for entry in entries:
+            for counter in counters:
+                counter.add(entry)
+
+        first_entry = entries[0]
+        ret = {
+            "endpoint": first_entry.endpoint_name,
+            "model": first_entry.model_name,
+            "url": first_entry.endpoint_url,
+            "instances": len(entries),
+            **{counter.name: counter() for counter in counters},
+        }
+        ret["last_update"] = self._naive_time(ret.get("last_update"))
+        return ret
+
+    def get_endpoints(self, company_id: str):
+        """
+        Group instances by url and return a summary for each url.
+        Do not return data for "loading" instances that have no url yet
+        """
+        entries = self._get_all(company_id)
+        by_url = bucketize(entries, key=attrgetter("endpoint_url"))
+        by_url.pop(None, None)
+        return [self._get_summary(url_entries) for url_entries in by_url.values()]
+
+    def _get_endpoint_entries(self, company_id, endpoint_url: Union[str, None]) -> Sequence[ServingContainerEntry]:
+        url_key = self._get_url_key(company_id, endpoint_url)
+        timestamp = int(time())
+        self.redis.zremrangebyscore(url_key, min=0, max=timestamp)
+        container_keys = {key.decode() for key in self.redis.zrange(url_key, 0, -1)}
+        if not container_keys:
+            return []
+
+        entries = []
+        found_keys = set()
+        data = self.redis.mget(container_keys) or []
+        for d in data:
+            try:
+                entry = ServingContainerEntry.from_json(d)
+                if entry.endpoint_url == endpoint_url:
+                    entries.append(entry)
+                    found_keys.add(entry.key)
+            except Exception as ex:
+                log.error(f"Failed parsing container entry {str(ex)}")
+
+        missing_keys = container_keys - found_keys
+        if missing_keys:
+            self.redis.zrem(url_key, *missing_keys)
+
+        return entries
+
+    def get_loading_instances(self, company_id: str):
+        entries = self._get_endpoint_entries(company_id, None)
+        return [
+            {
+                "id": entry.container_id,
+                "endpoint": entry.endpoint_name,
+                "url": entry.endpoint_url,
+                "model": entry.model_name,
+                "model_source": entry.model_source,
+                "model_version": entry.model_version,
+                "preprocess_artifact": entry.preprocess_artifact,
+                "input_type": entry.input_type,
+                "input_size": entry.input_size,
+                "uptime_sec": entry.uptime_sec,
+                "last_update": self._naive_time(entry.last_activity_time),
+            }
+            for entry in entries
+        ]
+
+    @staticmethod
+    def _naive_time(input_: datetime) -> datetime:
+        if not isinstance(input_, datetime):
+            return input_
+
+        return input_.replace(tzinfo=None)
+
+    def get_endpoint_details(self, company_id, endpoint_url: str) -> dict:
+        entries = self._get_endpoint_entries(company_id, endpoint_url)
+        if not entries:
+            raise errors.bad_request.NoContainersForUrl(url=endpoint_url)
+
+        first_entry = entries[0]
+        return {
+            "endpoint": first_entry.endpoint_name,
+            "model": first_entry.model_name,
+            "url": first_entry.endpoint_url,
+            "preprocess_artifact": first_entry.preprocess_artifact,
+            "input_type": first_entry.input_type,
+            "input_size": first_entry.input_size,
+            "model_source": first_entry.model_source,
+            "model_version": first_entry.model_version,
+            "uptime_sec": max(e.uptime_sec for e in entries),
+            "last_update": self._naive_time(max(e.last_activity_time for e in entries)),
+            "instances": [
+                {
+                    "id": entry.container_id,
+                    "uptime_sec": entry.uptime_sec,
+                    "requests": entry.requests_num,
+                    "requests_min": entry.requests_min,
+                    "latency_ms": entry.latency_ms,
+                    "last_update": self._naive_time(entry.last_activity_time),
+                }
+                for entry in entries
+            ]
+        }
diff --git a/apiserver/bll/serving/stats.py b/apiserver/bll/serving/stats.py
new file mode 100644
index 0000000..47aba59
--- /dev/null
+++ b/apiserver/bll/serving/stats.py
@@ -0,0 +1,298 @@
+from collections import defaultdict
+from datetime import datetime
+from enum import Enum
+
+from typing import Tuple, Optional, Sequence
+
+from elasticsearch import Elasticsearch
+
+from apiserver.apimodels.serving import (
+    ServingContainerEntry,
+    GetEndpointMetricsHistoryRequest,
+    MetricType,
+)
+from apiserver.apierrors import errors
+from apiserver.utilities.dicts import nested_get
+from apiserver.bll.query import Builder as QueryBuilder
+from apiserver.config_repo import config
+from apiserver.es_factory import es_factory
+
+
+class _AggregationType(Enum):
+    avg = "avg"
+    sum = "sum"
+
+
+class ServingStats:
+    min_chart_interval = config.get("services.serving.min_chart_interval_sec", 40)
+    es: Elasticsearch = es_factory.connect("workers")
+
+    @classmethod
+    def _serving_stats_prefix(cls, company_id: str) -> str:
+        """Returns the es index prefix for the company"""
+        return f"serving_stats_{company_id.lower()}_"
+
+    @staticmethod
+    def _get_es_index_suffix():
+        """Get the index name suffix for storing current month data"""
+        return datetime.utcnow().strftime("%Y-%m")
+
+    @staticmethod
+    def _get_average_value(value) -> Tuple[Optional[float], Optional[int]]:
+        if value is None:
+            return None, None
+
+        if isinstance(value, (list, tuple)):
+            count = len(value)
+            if not count:
+                return None, None
+            return sum(value) / count, count
+
+        return value, 1
+
+    @classmethod
+    def log_stats_to_es(
+        cls,
+        entry: ServingContainerEntry,
+    ) -> int:
+        """
+        Write the container statistics to Elastic
+        :return: The number of logged documents
+        """
+        company_id = entry.company_id
+        es_index = (
+            f"{cls._serving_stats_prefix(company_id)}" f"{cls._get_es_index_suffix()}"
+        )
+
+        entry_data = entry.to_struct()
+        doc = {
+            "timestamp": es_factory.get_timestamp_millis(),
+            **{
+                field: entry_data.get(field)
+                for field in (
+                    "container_id",
+                    "company_id",
+                    "endpoint_url",
+                    "requests_num",
+                    "requests_min",
+                    "uptime_sec",
+                    "latency_ms",
+                )
+            },
+        }
+
+        stats = entry_data.get("machine_stats")
+        if stats:
+            for category in ("cpu", "gpu"):
+                usage, num = cls._get_average_value(stats.get(f"{category}_usage"))
+                doc.update({f"{category}_usage": usage, f"{category}_num": num})
+
+            for category in ("memory", "gpu_memory"):
+                free, _ = cls._get_average_value(stats.get(f"{category}_free"))
+                used, _ = cls._get_average_value(stats.get(f"{category}_used"))
+                doc.update(
+                    {
+                        f"{category}_free": free,
+                        f"{category}_used": used,
+                        f"{category}_total": (free or 0) + (used or 0),
+                    }
+                )
+
+            doc.update(
+                {
+                    field: stats.get(field)
+                    for field in ("disk_free_home", "network_rx", "network_tx")
+                }
+            )
+
+        cls.es.index(index=es_index, document=doc)
+
+        return 1
+
+    @staticmethod
+    def round_series(values: Sequence, coeff=1.0) -> list:
+        return [round(v * coeff, 2) if v else 0 for v in values]
+
+    agg_fields = {
+        MetricType.requests: (
+            "requests_num",
+            "Number of Requests",
+            _AggregationType.sum,
+        ),
+        MetricType.requests_min: (
+            "requests_min",
+            "Requests per Minute",
+            _AggregationType.sum,
+        ),
+        MetricType.latency_ms: (
+            "latency_ms",
+            "Average Latency (ms)",
+            _AggregationType.avg,
+        ),
+        MetricType.cpu_count: ("cpu_num", "CPU Count", _AggregationType.sum),
+        MetricType.gpu_count: ("gpu_num", "GPU Count", _AggregationType.sum),
+        MetricType.cpu_util: (
+            "cpu_usage",
+            "Average CPU Load (%)",
+            _AggregationType.avg,
+        ),
+        MetricType.gpu_util: (
+            "gpu_usage",
+            "Average GPU Utilization (%)",
+            _AggregationType.avg,
+        ),
+        MetricType.ram_total: ("memory_total", "RAM Total (GB)", _AggregationType.sum),
+        MetricType.ram_free: ("memory_free", "RAM Free (GB)", _AggregationType.sum),
+        MetricType.gpu_ram_total: (
+            "gpu_memory_total",
+            "GPU RAM Total (GB)",
+            _AggregationType.sum,
+        ),
+        MetricType.gpu_ram_free: (
+            "gpu_memory_free",
+            "GPU RAM Free (GB)",
+            _AggregationType.sum,
+        ),
+        MetricType.network_rx: (
+            "network_rx",
+            "Network Throughput RX (MBps)",
+            _AggregationType.sum,
+        ),
+        MetricType.network_tx: (
+            "network_tx",
+            "Network Throughput TX (MBps)",
+            _AggregationType.sum,
+        ),
+    }
+
+    @classmethod
+    def get_endpoint_metrics(
+        cls,
+        company_id: str,
+        metrics_request: GetEndpointMetricsHistoryRequest,
+    ) -> dict:
+        from_date = metrics_request.from_date
+        to_date = metrics_request.to_date
+        if from_date >= to_date:
+            raise errors.bad_request.FieldsValueError(
+                "from_date must be less than to_date"
+            )
+
+        metric_type = metrics_request.metric_type
+        agg_data = cls.agg_fields.get(metric_type)
+        if not agg_data:
+            raise NotImplementedError(f"Charts for {metric_type} not implemented")
+
+        agg_field, title, agg_type = agg_data
+        if agg_type == _AggregationType.sum:
+            instance_sum_type = "sum_bucket"
+        else:
+            instance_sum_type = "avg_bucket"
+
+        interval = max(metrics_request.interval, cls.min_chart_interval)
+        endpoint_url = metrics_request.endpoint_url
+        hist_ret = {
+            "computed_interval": interval,
+            "total": {
+                "title": title,
+                "dates": [],
+                "values": [],
+            },
+            "instances": {},
+        }
+        must_conditions = [
+            QueryBuilder.term("company_id", company_id),
+            QueryBuilder.term("endpoint_url", endpoint_url),
+            QueryBuilder.dates_range(from_date, to_date),
+        ]
+        query = {"bool": {"must": must_conditions}}
+        es_index = f"{cls._serving_stats_prefix(company_id)}*"
+        res = cls.es.search(
+            index=es_index,
+            size=0,
+            query=query,
+            aggs={"instances": {"terms": {"field": "container_id"}}},
+        )
+        instance_buckets = nested_get(res, ("aggregations", "instances", "buckets"))
+        if not instance_buckets:
+            return hist_ret
+
+        instance_keys = {ib["key"] for ib in instance_buckets}
+        must_conditions.append(QueryBuilder.terms("container_id", instance_keys))
+        query = {"bool": {"must": must_conditions}}
+
+        aggs = {
+            "instances": {
+                "terms": {
+                    "field": "container_id",
+                    "size": max(len(instance_keys), 10),
+                },
+                "aggs": {
+                    "average": {"avg": {"field": agg_field}},
+                },
+            },
+            "total_instances": {
+                instance_sum_type: {
+                    "gap_policy": "insert_zeros",
+                    "buckets_path": "instances>average",
+                }
+            },
+        }
+        aggs = {
+            "dates": {
+                "date_histogram": {
+                    "field": "timestamp",
+                    "fixed_interval": f"{interval}s",
+                    "extended_bounds": {
+                        "min": int(from_date) * 1000,
+                        "max": int(to_date) * 1000,
+                    },
+                },
+                "aggs": aggs,
+            }
+        }
+
+        filter_path = None
+        if not metrics_request.instance_charts:
+            filter_path = "aggregations.dates.buckets.total_instances"
+
+        data = cls.es.search(
+            index=es_index,
+            size=0,
+            query=query,
+            aggs=aggs,
+            filter_path=filter_path,
+        )
+        agg_res = data.get("aggregations")
+        if not agg_res:
+            return hist_ret
+
+        dates_ = []
+        total = []
+        instances = defaultdict(list)
+        # remove the last interval if it is incomplete. Allow 10% tolerance
+        last_valid_timestamp = (to_date - 0.9 * interval) * 1000
+        for point in agg_res["dates"]["buckets"]:
+            date_ = point["key"]
+            if date_ > last_valid_timestamp:
+                break
+            dates_.append(date_)
+            total.append(nested_get(point, ("total_instances", "value"), 0))
+            if metrics_request.instance_charts:
+                found_keys = set()
+                for instance in nested_get(point, ("instances", "buckets"), []):
+                    instances[instance["key"]].append(
+                        nested_get(instance, ("average", "value"), 0)
+                    )
+                    found_keys.add(instance["key"])
+                for missing_key in instance_keys - found_keys:
+                    instances[missing_key].append(0)
+
+        hist_ret["total"]["dates"] = dates_
+        hist_ret["total"]["values"] = cls.round_series(total)
+        hist_ret["instances"] = {
+            key: {"title": key, "dates": dates_, "values": cls.round_series(values)}
+            for key, values in instances.items()
+        }
+
+        return hist_ret
diff --git a/apiserver/config/default/services/serving.conf b/apiserver/config/default/services/serving.conf
new file mode 100644
index 0000000..4279e00
--- /dev/null
+++ b/apiserver/config/default/services/serving.conf
@@ -0,0 +1,7 @@
+default_container_timeout_sec: 600
+# Auto-register unknown serving containers on status reports and other calls
+container_auto_register: true
+# Assume unknown serving containers have unregistered (i.e. do not raise unregistered error)
+container_auto_unregister: true
+# The minimal sampling interval for serving model monitoring charts
+min_chart_interval_sec: 40
diff --git a/apiserver/elastic/index_templates/workers/serving_stats.json b/apiserver/elastic/index_templates/workers/serving_stats.json
new file mode 100644
index 0000000..b1e255d
--- /dev/null
+++ b/apiserver/elastic/index_templates/workers/serving_stats.json
@@ -0,0 +1,79 @@
+{
+  "index_patterns": "serving_stats_*",
+  "template": {
+    "settings": {
+      "number_of_replicas": 0,
+      "number_of_shards": 1
+    },
+    "mappings": {
+      "_source": {
+        "enabled": true
+      },
+      "properties": {
+        "timestamp": {
+          "type": "date"
+        },
+        "container_id": {
+          "type": "keyword"
+        },
+        "company_id": {
+          "type": "keyword"
+        },
+        "endpoint_url": {
+          "type": "keyword"
+        },
+        "requests_num": {
+          "type": "integer"
+        },
+        "requests_min": {
+          "type": "float"
+        },
+        "uptime_sec": {
+          "type": "integer"
+        },
+        "latency_ms": {
+          "type": "integer"
+        },
+        "cpu_usage": {
+          "type": "float"
+        },
+        "cpu_num": {
+          "type": "integer"
+        },
+        "gpu_usage": {
+          "type": "float"
+        },
+        "gpu_num": {
+          "type": "integer"
+        },
+        "memory_used": {
+          "type": "float"
+        },
+        "memory_free": {
+          "type": "float"
+        },
+        "memory_total": {
+          "type": "float"
+        },
+        "gpu_memory_used": {
+          "type": "float"
+        },
+        "gpu_memory_free": {
+          "type": "float"
+        },
+        "gpu_memory_total": {
+          "type": "float"
+        },
+        "disk_free_home": {
+          "type": "float"
+        },
+        "network_rx": {
+          "type": "float"
+        },
+        "network_tx": {
+          "type": "float"
+        }
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/apiserver/schema/services/_workers_common.conf b/apiserver/schema/services/_workers_common.conf
new file mode 100644
index 0000000..a4e9bc4
--- /dev/null
+++ b/apiserver/schema/services/_workers_common.conf
@@ -0,0 +1,67 @@
+machine_stats {
+    type: object
+    properties {
+        cpu_usage {
+            description: "Average CPU usage per core"
+            type: array
+            items { type: number }
+        }
+        gpu_usage {
+            description: "Average GPU usage per GPU card"
+            type: array
+            items { type: number }
+        }
+        memory_used {
+            description: "Used memory MBs"
+            type: integer
+        }
+        memory_free {
+            description: "Free memory MBs"
+            type: integer
+        }
+        gpu_memory_free {
+            description: "GPU free memory MBs"
+            type: array
+            items { type: integer }
+        }
+        gpu_memory_used {
+            description: "GPU used memory MBs"
+            type: array
+            items { type: integer }
+        }
+        network_tx {
+            description: "Mbytes per second"
+            type: integer
+        }
+        network_rx {
+            description: "Mbytes per second"
+            type: integer
+        }
+        disk_free_home {
+            description: "Free space in % of /home drive"
+            type: integer
+        }
+        disk_free_temp {
+            description: "Free space in % of /tmp drive"
+            type: integer
+        }
+        disk_read {
+            description: "Mbytes read per second"
+            type: integer
+        }
+        disk_write {
+            description: "Mbytes write per second"
+            type: integer
+        }
+        cpu_temperature {
+            description: "CPU temperature"
+            type: array
+            items { type: number }
+        }
+        gpu_temperature {
+            description: "GPU temperature"
+            type: array
+            items { type: number }
+        }
+    }
+}
\ No newline at end of file
diff --git a/apiserver/schema/services/serving.conf b/apiserver/schema/services/serving.conf
new file mode 100644
index 0000000..53577ad
--- /dev/null
+++ b/apiserver/schema/services/serving.conf
@@ -0,0 +1,400 @@
+_description: "Serving APIs"
+_definitions {
+    include "_workers_common.conf"
+    serving_model_report {
+        type: object
+        required: [container_id, endpoint_name, model_name]
+        properties {
+            container_id {
+                type: string
+                description: Container ID
+            }
+            endpoint_name {
+                type: string
+                description: Endpoint name
+            }
+            endpoint_url {
+                type: string
+                description: Endpoint URL
+            }
+            model_name {
+                type: string
+                description: Model name
+            }
+            model_source {
+                type: string
+                description: Model source
+            }
+            model_version {
+                type: string
+                description: Model version
+            }
+            preprocess_artifact {
+                type: string
+                description: Preprocess artifact
+            }
+            input_type {
+                type: string
+                description: Input type
+            }
+            input_size {
+                type: integer
+                description: Input size in bytes
+            }
+        }
+    }
+    endpoint_stats {
+        type: object
+        properties {
+            endpoint {
+                type: string
+                description: Endpoint name
+            }
+            model {
+                type: string
+                description: Model name
+            }
+            url {
+                type: string
+                description: Model URL
+            }
+            instances {
+                type: integer
+                description: The number of model serving instances
+            }
+            uptime_sec {
+                type: integer
+                description: Maximum model instance uptime in seconds
+            }
+            requests {
+                type: integer
+                description: Total requests processed by model instances
+            }
+            requests_min {
+                type: number
+                description: Average request rate of model instances per minute
+            }
+            latency_ms {
+                type: integer
+                description: Average latency of model instances in ms
+            }
+            last_update {
+                type: string
+                format: "date-time"
+                description: The latest time when one of the model instances was updated
+            }
+        }
+    }
+    container_instance_stats {
+        type: object
+        properties {
+            id {
+                type: string
+                description: Container ID
+            }
+            uptime_sec {
+                type: integer
+                description: Uptime in seconds
+            }
+            requests {
+                type: integer
+                description: Number of requests
+            }
+            requests_min {
+                type: number
+                description: Average requests per minute
+            }
+            latency_ms {
+                type: integer
+                description: Average request latency in ms
+            }
+            last_update {
+                type: string
+                format: "date-time"
+                description: The latest time when the container instance sent an update
+            }
+        }
+    }
+    serving_model_info {
+        type: object
+        properties {
+            endpoint {
+                type: string
+                description: Endpoint name
+            }
+            model {
+                type: string
+                description: Model name
+            }
+            url {
+                type: string
+                description: Model URL
+            }
+            model_source {
+                type: string
+                description: Model source
+            }
+            model_version {
+                type: string
+                description: Model version
+            }
+            preprocess_artifact {
+                type: string
+                description: Preprocess artifact
+            }
+            input_type {
+                type: string
+                description: Input type
+            }
+            input_size {
+                type: integer
+                description: Input size in bytes
+            }
+        }
+    }
+    container_info: ${_definitions.serving_model_info} {
+        properties {
+            id {
+                type: string
+                description: Container ID
+            }
+            uptime_sec {
+                type: integer
+                description: Model instance uptime in seconds
+            }
+            last_update {
+                type: string
+                format: "date-time"
+                description: The latest time when the container instance sent an update
+            }
+        }
+    }
+    metrics_history_series {
+        type: object
+        properties {
+            title {
+                type: string
+                description: "The title of the series"
+            }
+            dates {
+                type: array
+                description: "List of timestamps (in seconds from epoch) in ascending order. The timestamps are separated by the requested interval."
+                items {type: integer}
+            }
+            values {
+                type: array
+                description: "List of values corresponding to the timestamps in the dates list."
+                items {type: number}
+            }
+        }
+    }
+}
+register_container {
+    "2.31" {
+        description: Register container
+        request = ${_definitions.serving_model_report} {
+            properties {
+                timeout {
+                    description: "Registration timeout in seconds. If timeout seconds have passed since the serving container's last call to register or status_report, the container is automatically removed from the list of registered containers."
+                    type: integer
+                    default: 600
+                }
+            }
+        }
+        response {
+            type: object
+            additionalProperties: false
+        }
+    }
+}
+unregister_container {
+    "2.31" {
+        description: Unregister container
+        request {
+            type: object
+            required: [container_id]
+            properties {
+                container_id {
+                    type: string
+                    description: Container ID
+                }
+            }
+        }
+        response {
+            type: object
+            additionalProperties: false
+        }
+    }
+}
+container_status_report {
+    "2.31" {
+        description: Container status report
+        request = ${_definitions.serving_model_report} {
+            properties {
+                uptime_sec {
+                    type: integer
+                    description: Uptime in seconds
+                }
+                requests_num {
+                    type: integer
+                    description: Number of requests
+                }
+                requests_min {
+                    type: number
+                    description: Average requests per minute
+                }
+                latency_ms {
+                    type: integer
+                    description: Average request latency in ms
+                }
+                machine_stats {
+                    description: "The machine statistics"
+                    "$ref": "#/definitions/machine_stats"
+                }
+            }
+        }
+        response {
+            type: object
+            additionalProperties: false
+        }
+    }
+}
+get_endpoints {
+    "2.31" {
+        description: Get all the registered endpoints
+        request {
+            type: object
+            additionalProperties: false
+        }
+        response {
+            type: object
+            properties {
+                endpoints {
+                    type: array
+                    items { "$ref": "#/definitions/endpoint_stats" }
+                }
+            }
+        }
+    }
+}
+get_loading_instances {
+    "2.31" {
+        description: "Get loading instances (endpoint_url not set yet)"
+        request {
+            type: object
+            additionalProperties: false
+        }
+        response {
+            type: object
+            properties {
+                instances {
+                    type: array
+                    items { "$ref": "#/definitions/container_info" }
+                }
+            }
+        }
+    }
+}
+get_endpoint_details {
+    "2.31" {
+        description: Get endpoint details
+        request {
+            type: object
+            required: [endpoint_url]
+            properties {
+                endpoint_url {
+                    type: string
+                    description: Endpoint URL
+                }
+            }
+        }
+        response: ${_definitions.serving_model_info} {
+            properties {
+                uptime_sec {
+                    type: integer
+                    description: Maximum model instance uptime in seconds
+                }
+                last_update {
+                    type: string
+                    format: "date-time"
+                    description: The latest time when one of the model instances was updated
+                }
+                instances {
+                    type: array
+                    items {"$ref": "#/definitions/container_instance_stats"}
+                }
+            }
+        }
+    }
+}
+get_endpoint_metrics_history {
+    "2.31" {
+        description: Get endpoint charts
+        request {
+            type: object
+            required: [endpoint_url, from_date, to_date, interval]
+            properties {
+                endpoint_url {
+                    description: Endpoint URL
+                    type: string
+                }
+                from_date {
+                    description: "Starting time (in seconds from epoch) for collecting statistics"
+                    type: number
+                }
+                to_date {
+                    description: "Ending time (in seconds from epoch) for collecting statistics"
+                    type: number
+                }
+                interval {
+                    description: "Time interval in seconds for a single statistics point. The minimal value is 1"
+                    type: integer
+                }
+                metric_type {
+                    description: The type of the metrics to return on the chart
+                    type: string
+                    default: requests
+                    enum: [
+                        requests
+                        requests_min
+                        latency_ms
+                        cpu_count
+                        gpu_count
+                        cpu_util
+                        gpu_util
+                        ram_total
+                        ram_free
+                        gpu_ram_total
+                        gpu_ram_free
+                        network_rx
+                        network_tx
+                    ]
+                }
+                instance_charts {
+                    type: boolean
+                    default: true
+                    description: If set then return instance charts and total. Otherwise total only
+                }
+            }
+        }
+        response {
+            type: object
+            properties {
+                computed_interval {
+                    description: The interval that was actually used for the histogram. May be larger than the requested one
+                    type: integer
+                }
+                total: ${_definitions.metrics_history_series} {
+                    description: The total histogram
+                }
+                instances {
+                    description: Instance charts
+                    type: object
+                    additionalProperties: ${_definitions.metrics_history_series}
+                }
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/apiserver/schema/services/workers.conf b/apiserver/schema/services/workers.conf
index 42f6fd8..1e6d310 100644
--- a/apiserver/schema/services/workers.conf
+++ b/apiserver/schema/services/workers.conf
@@ -1,5 +1,6 @@
 _description: "Provides an API for worker machines, allowing workers to report status and get tasks for execution"
 _definitions {
+    include "_workers_common.conf"
     metrics_category {
         type: object
         properties {
@@ -203,74 +204,6 @@
             }
         }
     }
-
-    machine_stats {
-        type: object
-        properties {
-            cpu_usage {
-                description: "Average CPU usage per core"
-                type: array
-                items { type: number }
-            }
-            gpu_usage {
-                description: "Average GPU usage per GPU card"
-                type: array
-                items { type: number }
-            }
-            memory_used {
-                description: "Used memory MBs"
-                type: integer
-            }
-            memory_free {
-                description: "Free memory MBs"
-                type: integer
-            }
-            gpu_memory_free {
-                description: "GPU free memory MBs"
-                type: array
-                items { type: integer }
-            }
-            gpu_memory_used {
-                description: "GPU used memory MBs"
-                type: array
-                items { type: integer }
-            }
-            network_tx {
-                description: "Mbytes per second"
-                type: integer
-            }
-            network_rx {
-                description: "Mbytes per second"
-                type: integer
-            }
-            disk_free_home {
-                description: "Mbytes free space of /home drive"
-                type: integer
-            }
-            disk_free_temp {
-                description: "Mbytes free space of /tmp drive"
-                type: integer
-            }
-            disk_read {
-                description: "Mbytes read per second"
-                type: integer
-            }
-            disk_write {
-                description: "Mbytes write per second"
-                type: integer
-            }
-            cpu_temperature {
-                description: "CPU temperature"
-                type: array
-                items { type: number }
-            }
-            gpu_temperature {
-                description: "GPU temperature"
-                type: array
-                items { type: number }
-            }
-        }
-    }
 }
 get_all {
     "2.4" {
diff --git a/apiserver/services/serving.py b/apiserver/services/serving.py
new file mode 100644
index 0000000..56455e8
--- /dev/null
+++ b/apiserver/services/serving.py
@@ -0,0 +1,69 @@
+from apiserver.apimodels.serving import (
+    RegisterRequest,
+    UnregisterRequest,
+    StatusReportRequest,
+    GetEndpointDetailsRequest,
+    GetEndpointMetricsHistoryRequest,
+)
+from apiserver.apierrors import errors
+from apiserver.service_repo import endpoint, APICall
+from apiserver.bll.serving import ServingBLL, ServingStats
+
+
+serving_bll = ServingBLL()
+
+
+@endpoint("serving.register_container")
+def register_container(call: APICall, company: str, request: RegisterRequest):
+    serving_bll.register_serving_container(
+        company_id=company, ip=call.real_ip, request=request
+    )
+
+
+@endpoint("serving.unregister_container")
+def unregister_container(_: APICall, company: str, request: UnregisterRequest):
+    serving_bll.unregister_serving_container(
+        company_id=company, container_id=request.container_id
+    )
+
+
+@endpoint("serving.container_status_report")
+def container_status_report(call: APICall, company: str, request: StatusReportRequest):
+    if not request.endpoint_url:
+        raise errors.bad_request.ValidationError(
+            "Missing required field 'endpoint_url'"
+        )
+    serving_bll.container_status_report(
+        company_id=company,
+        ip=call.real_ip,
+        report=request,
+    )
+
+
+@endpoint("serving.get_endpoints")
+def get_endpoints(call: APICall, company: str, _):
+ call.result.data = {"endpoints": serving_bll.get_endpoints(company)} + + +@endpoint("serving.get_loading_instances") +def get_loading_instances(call: APICall, company: str, _): + call.result.data = {"instances": serving_bll.get_loading_instances(company)} + + +@endpoint("serving.get_endpoint_details") +def get_endpoint_details( + call: APICall, company: str, request: GetEndpointDetailsRequest +): + call.result.data = serving_bll.get_endpoint_details( + company_id=company, endpoint_url=request.endpoint_url + ) + + +@endpoint("serving.get_endpoint_metrics_history") +def get_endpoint_metrics_history( + call: APICall, company: str, request: GetEndpointMetricsHistoryRequest +): + call.result.data = ServingStats.get_endpoint_metrics( + company_id=company, + metrics_request=request, + ) diff --git a/apiserver/tests/automated/test_serving.py b/apiserver/tests/automated/test_serving.py new file mode 100644 index 0000000..76f9dd4 --- /dev/null +++ b/apiserver/tests/automated/test_serving.py @@ -0,0 +1,101 @@ +from time import time, sleep + +from apiserver.apierrors import errors +from apiserver.tests.automated import TestService + + +class TestServing(TestService): + def test_status_report(self): + container_id1 = "container_1" + container_id2 = "container_2" + url = "http://test_url" + container_infos = [ + { + "container_id": container_id, # required + "endpoint_name": "my endpoint", # required + "endpoint_url": url, # can be omitted for register but required for status report + "model_name": "my model", # required + "model_source": "s3//my_bucket", # optional right now + "model_version": "3.1.0", # optional right now + "preprocess_artifact": "some string here", # optional right now + "input_type": "another string here", # optional right now + "input_size": 9_000_000, # optional right now, bytes + "tags": ["tag1", "tag2"], # optional + "system_tags": None, # optional + } + for container_id in (container_id1, container_id2) + ] + + for container_info in container_infos: + self.api.serving.register_container( + **container_info, + timeout=100, # expiration timeout in seconds. 
Optional, the default value is 600
+            )
+        for idx, container_info in enumerate(container_infos):
+            mul = idx + 1
+            self.api.serving.container_status_report(
+                **container_info,
+                uptime_sec=1000 * mul,
+                requests_num=1000 * mul,
+                requests_min=5 * mul,  # requests per minute
+                latency_ms=100 * mul,  # average latency
+                machine_stats={  # the same structure here as used by worker status_reports
+                    "cpu_usage": [10, 20],
+                    "memory_used": 50,
+                }
+            )
+        endpoints = self.api.serving.get_endpoints().endpoints
+        details = self.api.serving.get_endpoint_details(endpoint_url=url)
+
+        sleep(5)  # give ES some time to index the data
+        to_date = int(time()) + 40
+        from_date = to_date - 100
+        # smoke-test each chart metric type
+        res1 = self.api.serving.get_endpoint_metrics_history(
+            endpoint_url=url,
+            from_date=from_date,
+            to_date=to_date,
+            interval=1,
+        )
+        res2 = self.api.serving.get_endpoint_metrics_history(
+            endpoint_url=url,
+            from_date=from_date,
+            to_date=to_date,
+            interval=1,
+            metric_type="requests_min",
+        )
+        res3 = self.api.serving.get_endpoint_metrics_history(
+            endpoint_url=url,
+            from_date=from_date,
+            to_date=to_date,
+            interval=1,
+            metric_type="latency_ms",
+        )
+        res4 = self.api.serving.get_endpoint_metrics_history(
+            endpoint_url=url,
+            from_date=from_date,
+            to_date=to_date,
+            interval=1,
+            metric_type="cpu_count",
+        )
+        res5 = self.api.serving.get_endpoint_metrics_history(
+            endpoint_url=url,
+            from_date=from_date,
+            to_date=to_date,
+            interval=1,
+            metric_type="cpu_util",
+        )
+        res6 = self.api.serving.get_endpoint_metrics_history(
+            endpoint_url=url,
+            from_date=from_date,
+            to_date=to_date,
+            interval=1,
+            metric_type="ram_total",
+        )
+
+        for container_id in (container_id1, container_id2):
+            self.api.serving.unregister_container(container_id=container_id)
+        endpoints = self.api.serving.get_endpoints().endpoints
+        with self.api.raises(errors.bad_request.NoContainersForUrl):
+            self.api.serving.get_endpoint_details(endpoint_url=url)
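
--
Usage sketch (illustrative): a serving container could drive the new endpoints
roughly as below. This assumes the stock `clearml` Python APIClient
(clearml.backend_api.session.client.APIClient) talking to a server that
includes this patch; the container id, endpoint URL and model names are
placeholders.

    from time import sleep, time

    from clearml.backend_api.session.client import APIClient

    client = APIClient()

    # one-time registration; the entry expires after `timeout` seconds
    # unless it is refreshed by status reports
    client.serving.register_container(
        container_id="container_1",      # placeholder id
        endpoint_name="my endpoint",
        endpoint_url="http://test_url",  # may be omitted while the model is still loading
        model_name="my model",
        timeout=120,
    )

    # periodic status reports: each report refreshes the registration and
    # feeds the serving_stats_* Elasticsearch index used by the history charts
    for _ in range(3):
        client.serving.container_status_report(
            container_id="container_1",
            endpoint_name="my endpoint",
            endpoint_url="http://test_url",
            model_name="my model",
            uptime_sec=1000,
            requests_num=1000,
            requests_min=5.0,
            latency_ms=100,
            machine_stats={"cpu_usage": [10, 20], "memory_used": 50},
        )
        sleep(60)

    # monitoring side: request-rate history for the endpoint; the server clamps
    # the interval from below to min_chart_interval_sec (40s by default)
    history = client.serving.get_endpoint_metrics_history(
        endpoint_url="http://test_url",
        from_date=time() - 3600,
        to_date=time(),
        interval=60,
        metric_type="requests_min",
    )
    print(history.computed_interval, history.total)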