Add model endpoints support

clearml 2024-12-05 22:20:11 +02:00
parent 9997dcc977
commit eb755be001
12 changed files with 1494 additions and 72 deletions

View File

@@ -106,6 +106,11 @@
1004: ["worker_not_registered", "worker is not registered"]
1005: ["worker_stats_not_found", "worker stats not found"]
# Serving
1050: ["invalid_container_id", "invalid container id"]
1051: ["container_not_registered", "container is not registered"]
1052: ["no_containers_for_url", "no container instances found for service url"]
1104: ["invalid_scroll_id", "Invalid scroll id"]
}
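These serving codes surface in the Python code below as generated error classes: 1051 maps to errors.bad_request.ContainerNotRegistered and 1052 to errors.bad_request.NoContainersForUrl. A minimal sketch of raising one (the helper name is hypothetical):

from apiserver.apierrors import errors

def require_registered(entry, container_id: str):
    # hypothetical helper: code 1051 surfaces as bad_request.ContainerNotRegistered
    if entry is None:
        raise errors.bad_request.ContainerNotRegistered(container=container_id)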

View File

@@ -0,0 +1,90 @@
from enum import Enum
from jsonmodels.models import Base
from jsonmodels.fields import (
StringField,
EmbeddedField,
DateTimeField,
IntField,
FloatField,
BoolField,
)
from jsonmodels.validators import Min
from apiserver.apimodels import ListField, JsonSerializableMixin
from apiserver.apimodels import ActualEnumField
from apiserver.config_repo import config
from .workers import MachineStats
class ServingModel(Base):
container_id = StringField(required=True)
endpoint_name = StringField(required=True)
endpoint_url = StringField() # can be not existing yet at registration time
model_name = StringField(required=True)
model_source = StringField()
model_version = StringField()
preprocess_artifact = StringField()
input_type = StringField()
input_size = IntField()
tags = ListField(str)
system_tags = ListField(str)
class RegisterRequest(ServingModel):
timeout = IntField(
default=int(config.get("services.serving.default_container_timeout_sec", 10 * 60)),
validators=[Min(1)]
)
""" registration timeout in seconds (default is 10min) """
class UnregisterRequest(Base):
container_id = StringField(required=True)
class StatusReportRequest(ServingModel):
uptime_sec = IntField()
requests_num = IntField()
requests_min = FloatField()
latency_ms = IntField()
machine_stats: MachineStats = EmbeddedField(MachineStats)
class ServingContainerEntry(StatusReportRequest, JsonSerializableMixin):
key = StringField(required=True)
company_id = StringField(required=True)
ip = StringField()
register_time = DateTimeField(required=True)
register_timeout = IntField(required=True)
last_activity_time = DateTimeField(required=True)
class GetEndpointDetailsRequest(Base):
endpoint_url = StringField(required=True)
class MetricType(Enum):
requests = "requests"
requests_min = "requests_min"
latency_ms = "latency_ms"
cpu_count = "cpu_count"
gpu_count = "gpu_count"
cpu_util = "cpu_util"
gpu_util = "gpu_util"
ram_total = "ram_total"
ram_free = "ram_free"
gpu_ram_total = "gpu_ram_total"
gpu_ram_free = "gpu_ram_free"
network_rx = "network_rx"
network_tx = "network_tx"
class GetEndpointMetricsHistoryRequest(Base):
from_date = FloatField(required=True, validators=Min(0))
to_date = FloatField(required=True, validators=Min(0))
interval = IntField(required=True, validators=Min(1))
endpoint_url = StringField(required=True)
metric_type = ActualEnumField(
MetricType, default=MetricType.requests
)
instance_charts = BoolField(default=True)
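For orientation, a hypothetical sketch of how these jsonmodels request classes behave (validate() and to_struct() are standard jsonmodels calls; the values are illustrative):

req = RegisterRequest(
    container_id="container_1",   # required
    endpoint_name="my endpoint",  # required
    model_name="my model",        # required
    timeout=100,                  # the Min(1) validator rejects values below 1
)
req.validate()             # raises a jsonmodels ValidationError on bad input
payload = req.to_struct()  # plain dict, as consumed by ServingBLL below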

View File

@@ -9,20 +9,35 @@ RANGE_IGNORE_VALUE = -1
class Builder:
@staticmethod
def dates_range(
from_date: Optional[Union[int, float]] = None,
to_date: Optional[Union[int, float]] = None,
) -> dict:
assert (
from_date or to_date
), "range condition requires that at least one of from_date or to_date is specified"
conditions = {}
if from_date:
conditions["gte"] = int(from_date)
if to_date:
conditions["lte"] = int(to_date)
return {
"range": {
"timestamp": {
"gte": int(from_date),
"lte": int(to_date),
**conditions,
"format": "epoch_second",
}
}
}
@staticmethod
def terms(field: str, values: Iterable) -> dict:
assert not isinstance(values, str), "apparently 'term' should be used here"
return {"terms": {field: list(values)}}
@staticmethod
def term(field: str, value) -> dict:
return {"term": {field: value}}
@staticmethod
def normalize_range(

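A quick sketch of the clauses the updated builders emit (illustrative values):

from apiserver.bll.query import Builder

Builder.dates_range(from_date=1733429000)
# -> {"range": {"timestamp": {"gte": 1733429000, "format": "epoch_second"}}}
Builder.terms("container_id", {"c1", "c2"})
# -> {"terms": {"container_id": ["c1", "c2"]}}  (set iteration order is arbitrary)
Builder.term("company_id", "acme")
# -> {"term": {"company_id": "acme"}}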
View File

@@ -0,0 +1,358 @@
from datetime import datetime, timedelta
from enum import Enum, auto
from operator import attrgetter
from time import time
from typing import Optional, Sequence, Union
import attr
from boltons.iterutils import chunked_iter, bucketize
from pyhocon import ConfigTree
from apiserver.apimodels.serving import (
ServingContainerEntry,
RegisterRequest,
StatusReportRequest,
)
from apiserver.apierrors import errors
from apiserver.config_repo import config
from apiserver.redis_manager import redman
from .stats import ServingStats
log = config.logger(__file__)
class ServingBLL:
def __init__(self, redis=None):
self.conf = config.get("services.serving", ConfigTree())
self.redis = redis or redman.connection("workers")
@staticmethod
def _get_url_key(company: str, url: str):
return f"serving_url_{company}_{url}"
@staticmethod
def _get_container_key(company: str, container_id: str) -> str:
"""Build redis key from company and container_id"""
return f"serving_container_{company}_{container_id}"
def _save_serving_container_entry(self, entry: ServingContainerEntry):
self.redis.setex(
entry.key, timedelta(seconds=entry.register_timeout), entry.to_json()
)
url_key = self._get_url_key(entry.company_id, entry.endpoint_url)
expiration = int(time()) + entry.register_timeout
container_item = {entry.key: expiration}
self.redis.zadd(url_key, container_item)
# make sure that url set will not get stuck in redis
# indefinitely in case no more containers report to it
self.redis.expire(url_key, max(3600, entry.register_timeout))
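# Illustrative Redis bookkeeping after one report (hypothetical company/ids):
#   serving_container_acme_container_1  -> JSON entry, TTL = register_timeout
#   serving_url_acme_http://test_url    -> zset {entry.key: expiration epoch}
# The per-container key expires on its own; stale members of the url zset are
# pruned by score in _get_endpoint_entries below.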
def _get_serving_container_entry(
self, company_id: str, container_id: str
) -> Optional[ServingContainerEntry]:
"""
Get a container entry for the provided container ID.
"""
key = self._get_container_key(company_id, container_id)
data = self.redis.get(key)
if not data:
return
try:
entry = ServingContainerEntry.from_json(data)
return entry
except Exception as e:
msg = "Failed parsing container entry"
log.exception(f"{msg}: {str(e)}")
def register_serving_container(
self,
company_id: str,
request: RegisterRequest,
ip: str = "",
) -> ServingContainerEntry:
"""
Register a serving container
"""
now = datetime.utcnow()
key = self._get_container_key(company_id, request.container_id)
entry = ServingContainerEntry(
**request.to_struct(),
key=key,
company_id=company_id,
ip=ip,
register_time=now,
register_timeout=request.timeout,
last_activity_time=now,
)
self._save_serving_container_entry(entry)
return entry
def unregister_serving_container(
self,
company_id: str,
container_id: str,
) -> None:
"""
Unregister a serving container
"""
entry = self._get_serving_container_entry(company_id, container_id)
if entry:
url_key = self._get_url_key(entry.company_id, entry.endpoint_url)
self.redis.zrem(url_key, entry.key)
key = self._get_container_key(company_id, container_id)
res = self.redis.delete(key)
if res:
return
if not self.conf.get("container_auto_unregister", True):
raise errors.bad_request.ContainerNotRegistered(container=container_id)
def container_status_report(
self,
company_id: str,
report: StatusReportRequest,
ip: str = "",
) -> None:
"""
Serving container status report
"""
container_id = report.container_id
now = datetime.utcnow()
entry = self._get_serving_container_entry(company_id, container_id)
if entry:
ip = ip or entry.ip
register_time = entry.register_time
register_timeout = entry.register_timeout
else:
if not self.conf.get("container_auto_register", True):
raise errors.bad_request.ContainerNotRegistered(container=container_id)
register_time = now
register_timeout = int(
self.conf.get("default_container_timeout_sec", 10 * 60)
)
key = self._get_container_key(company_id, container_id)
entry = ServingContainerEntry(
**report.to_struct(),
key=key,
company_id=company_id,
ip=ip,
register_time=register_time,
register_timeout=register_timeout,
last_activity_time=now,
)
self._save_serving_container_entry(entry)
ServingStats.log_stats_to_es(entry)
def _get_all(
self,
company_id: str,
) -> Sequence[ServingContainerEntry]:
keys = list(self.redis.scan_iter(self._get_container_key(company_id, "*")))
entries = []
for chunk in chunked_iter(keys, 1000):
data = self.redis.mget(chunk)
if not data:
continue
for d in data:
try:
entries.append(ServingContainerEntry.from_json(d))
except Exception as ex:
log.error(f"Failed parsing container entry {str(ex)}")
return entries
@attr.s(auto_attribs=True)
class Counter:
class AggType(Enum):
avg = auto()
max = auto()
total = auto()
count = auto()
name: str
field: str
agg_type: AggType
float_precision: Optional[int] = None
_max: Union[int, float, datetime] = attr.field(init=False, default=None)
_total: Union[int, float] = attr.field(init=False, default=0)
_count: int = attr.field(init=False, default=0)
def add(self, entry: ServingContainerEntry):
value = getattr(entry, self.field, None)
if value is None:
return
self._count += 1
if self.agg_type == self.AggType.max:
self._max = value if self._max is None else max(self._max, value)
else:
self._total += value
def __call__(self):
if self.agg_type == self.AggType.count:
return self._count
if self.agg_type == self.AggType.max:
return self._max
if self.agg_type == self.AggType.total:
return self._total
if not self._count:
return None
avg = self._total / self._count
return round(avg, self.float_precision) if self.float_precision else round(avg)
def _get_summary(self, entries: Sequence[ServingContainerEntry]) -> dict:
counters = [
self.Counter(
name="uptime_sec",
field="uptime_sec",
agg_type=self.Counter.AggType.max,
),
self.Counter(
name="requests",
field="requests_num",
agg_type=self.Counter.AggType.total,
),
self.Counter(
name="requests_min",
field="requests_min",
agg_type=self.Counter.AggType.avg,
float_precision=2,
),
self.Counter(
name="latency_ms",
field="latency_ms",
agg_type=self.Counter.AggType.avg,
),
self.Counter(
name="last_update",
field="last_activity_time",
agg_type=self.Counter.AggType.max,
),
]
for entry in entries:
for counter in counters:
counter.add(entry)
first_entry = entries[0]
ret = {
"endpoint": first_entry.endpoint_name,
"model": first_entry.model_name,
"url": first_entry.endpoint_url,
"instances": len(entries),
**{counter.name: counter() for counter in counters},
}
ret["last_update"] = self._naive_time(ret.get("last_update"))
return ret
def get_endpoints(self, company_id: str):
"""
Group instances by urls and return a summary for each url
Do not return data for "loading" instances that have no url
"""
entries = self._get_all(company_id)
by_url = bucketize(entries, key=attrgetter("endpoint_url"))
by_url.pop(None, None)
return [self._get_summary(url_entries) for url_entries in by_url.values()]
def _get_endpoint_entries(
self, company_id: str, endpoint_url: Optional[str]
) -> Sequence[ServingContainerEntry]:
url_key = self._get_url_key(company_id, endpoint_url)
timestamp = int(time())
self.redis.zremrangebyscore(url_key, min=0, max=timestamp)
container_keys = {key.decode() for key in self.redis.zrange(url_key, 0, -1)}
if not container_keys:
return []
entries = []
found_keys = set()
data = self.redis.mget(container_keys) or []
for d in data:
try:
entry = ServingContainerEntry.from_json(d)
if entry.endpoint_url == endpoint_url:
entries.append(entry)
found_keys.add(entry.key)
except Exception as ex:
log.error(f"Failed parsing container entry {str(ex)}")
missing_keys = container_keys - found_keys
if missing_keys:
self.redis.zrem(url_key, *missing_keys)
return entries
def get_loading_instances(self, company_id: str):
entries = self._get_endpoint_entries(company_id, None)
return [
{
"id": entry.container_id,
"endpoint": entry.endpoint_name,
"url": entry.endpoint_url,
"model": entry.model_name,
"model_source": entry.model_source,
"model_version": entry.model_version,
"preprocess_artifact": entry.preprocess_artifact,
"input_type": entry.input_type,
"input_size": entry.input_size,
"uptime_sec": entry.uptime_sec,
"last_update": self._naive_time(entry.last_activity_time),
}
for entry in entries
]
@staticmethod
def _naive_time(input_: datetime) -> datetime:
if not isinstance(input_, datetime):
return input_
return input_.replace(tzinfo=None)
def get_endpoint_details(self, company_id, endpoint_url: str) -> dict:
entries = self._get_endpoint_entries(company_id, endpoint_url)
if not entries:
raise errors.bad_request.NoContainersForUrl(url=endpoint_url)
first_entry = entries[0]
return {
"endpoint": first_entry.endpoint_name,
"model": first_entry.model_name,
"url": first_entry.endpoint_url,
"preprocess_artifact": first_entry.preprocess_artifact,
"input_type": first_entry.input_type,
"input_size": first_entry.input_size,
"model_source": first_entry.model_source,
"model_version": first_entry.model_version,
"uptime_sec": max(e.uptime_sec for e in entries),
"last_update": self._naive_time(max(e.last_activity_time for e in entries)),
"instances": [
{
"id": entry.container_id,
"uptime_sec": entry.uptime_sec,
"requests": entry.requests_num,
"requests_min": entry.requests_min,
"latency_ms": entry.latency_ms,
"last_update": self._naive_time(entry.last_activity_time),
}
for entry in entries
]
}
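Tying the above together, a hypothetical driver for the BLL; it assumes reachable Redis and Elasticsearch instances, since a status report is also logged to ES:

from apiserver.apimodels.serving import RegisterRequest, StatusReportRequest
from apiserver.bll.serving import ServingBLL

serving_bll = ServingBLL()
company = "company_1"  # illustrative

serving_bll.register_serving_container(
    company_id=company,
    request=RegisterRequest(
        container_id="container_1",
        endpoint_name="my endpoint",
        endpoint_url="http://test_url",
        model_name="my model",
    ),
)
serving_bll.container_status_report(
    company_id=company,
    report=StatusReportRequest(
        container_id="container_1",
        endpoint_name="my endpoint",
        endpoint_url="http://test_url",
        model_name="my model",
        uptime_sec=1000,
        requests_num=1000,
        requests_min=5.0,
        latency_ms=100,
    ),
)
print(serving_bll.get_endpoints(company))  # one summary dict per endpoint URL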

View File

@@ -0,0 +1,298 @@
from collections import defaultdict
from datetime import datetime
from enum import Enum
from typing import Tuple, Optional, Sequence
from elasticsearch import Elasticsearch
from apiserver.apimodels.serving import (
ServingContainerEntry,
GetEndpointMetricsHistoryRequest,
MetricType,
)
from apiserver.apierrors import errors
from apiserver.utilities.dicts import nested_get
from apiserver.bll.query import Builder as QueryBuilder
from apiserver.config_repo import config
from apiserver.es_factory import es_factory
class _AggregationType(Enum):
avg = "avg"
sum = "sum"
class ServingStats:
min_chart_interval = config.get("services.serving.min_chart_interval_sec", 40)
es: Elasticsearch = es_factory.connect("workers")
@classmethod
def _serving_stats_prefix(cls, company_id: str) -> str:
"""Returns the es index prefix for the company"""
return f"serving_stats_{company_id.lower()}_"
@staticmethod
def _get_es_index_suffix():
"""Get the index name suffix for storing current month data"""
return datetime.utcnow().strftime("%Y-%m")
@staticmethod
def _get_average_value(value) -> Tuple[Optional[float], Optional[int]]:
if value is None:
return None, None
if isinstance(value, (list, tuple)):
count = len(value)
if not count:
return None, None
return sum(value) / count, count
return value, 1
@classmethod
def log_stats_to_es(
cls,
entry: ServingContainerEntry,
) -> int:
"""
Actually writing the worker statistics to Elastic
:return: The amount of logged documents
"""
company_id = entry.company_id
es_index = (
f"{cls._serving_stats_prefix(company_id)}" f"{cls._get_es_index_suffix()}"
)
entry_data = entry.to_struct()
doc = {
"timestamp": es_factory.get_timestamp_millis(),
**{
field: entry_data.get(field)
for field in (
"container_id",
"company_id",
"endpoint_url",
"requests_num",
"requests_min",
"uptime_sec",
"latency_ms",
)
},
}
stats = entry_data.get("machine_stats")
if stats:
for category in ("cpu", "gpu"):
usage, num = cls._get_average_value(stats.get(f"{category}_usage"))
doc.update({f"{category}_usage": usage, f"{category}_num": num})
for category in ("memory", "gpu_memory"):
free, _ = cls._get_average_value(stats.get(f"{category}_free"))
used, _ = cls._get_average_value(stats.get(f"{category}_used"))
doc.update(
{
f"{category}_free": free,
f"{category}_used": used,
f"{category}_total": (free or 0) + (used or 0),
}
)
doc.update(
{
field: stats.get(field)
for field in ("disk_free_home", "network_rx", "network_tx")
}
)
cls.es.index(index=es_index, document=doc)
return 1
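# Roughly the document indexed for a report carrying
# machine_stats={"cpu_usage": [10, 20], "memory_used": 50} (illustrative):
#   {"timestamp": <epoch millis>, "container_id": "container_1",
#    "company_id": "...", "endpoint_url": "http://test_url",
#    "requests_num": 1000, "requests_min": 5.0, "uptime_sec": 1000, "latency_ms": 100,
#    "cpu_usage": 15.0, "cpu_num": 2,  (mean and count of the per-core values)
#    "gpu_usage": None, "gpu_num": None,
#    "memory_free": None, "memory_used": 50, "memory_total": 50,
#    "disk_free_home": None, "network_rx": None, "network_tx": None}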
@staticmethod
def round_series(values: Sequence, koeff=1.0) -> list:
return [round(v * koeff, 2) if v else 0 for v in values]
agg_fields = {
MetricType.requests: (
"requests_num",
"Number of Requests",
_AggregationType.sum,
),
MetricType.requests_min: (
"requests_min",
"Requests per Minute",
_AggregationType.sum,
),
MetricType.latency_ms: (
"latency_ms",
"Average Latency (ms)",
_AggregationType.avg,
),
MetricType.cpu_count: ("cpu_num", "CPU Count", _AggregationType.sum),
MetricType.gpu_count: ("gpu_num", "GPU Count", _AggregationType.sum),
MetricType.cpu_util: (
"cpu_usage",
"Average CPU Load (%)",
_AggregationType.avg,
),
MetricType.gpu_util: (
"gpu_usage",
"Average GPU Utilization (%)",
_AggregationType.avg,
),
MetricType.ram_total: ("memory_total", "RAM Total (GB)", _AggregationType.sum),
MetricType.ram_free: ("memory_free", "RAM Free (GB)", _AggregationType.sum),
MetricType.gpu_ram_total: (
"gpu_memory_total",
"GPU RAM Total (GB)",
_AggregationType.sum,
),
MetricType.gpu_ram_free: (
"gpu_memory_free",
"GPU RAM Free (GB)",
_AggregationType.sum,
),
MetricType.network_rx: (
"network_rx",
"Network Throughput RX (MBps)",
_AggregationType.sum,
),
MetricType.network_tx: (
"network_tx",
"Network Throughput TX (MBps)",
_AggregationType.sum,
),
}
@classmethod
def get_endpoint_metrics(
cls,
company_id: str,
metrics_request: GetEndpointMetricsHistoryRequest,
) -> dict:
from_date = metrics_request.from_date
to_date = metrics_request.to_date
if from_date >= to_date:
raise errors.bad_request.FieldsValueError(
"from_date must be less than to_date"
)
metric_type = metrics_request.metric_type
agg_data = cls.agg_fields.get(metric_type)
if not agg_data:
raise NotImplementedError(f"Charts for {metric_type} not implemented")
agg_field, title, agg_type = agg_data
if agg_type == _AggregationType.sum:
instance_sum_type = "sum_bucket"
else:
instance_sum_type = "avg_bucket"
interval = max(metrics_request.interval, cls.min_chart_interval)
endpoint_url = metrics_request.endpoint_url
hist_ret = {
"computed_interval": interval,
"total": {
"title": title,
"dates": [],
"values": [],
},
"instances": {},
}
must_conditions = [
QueryBuilder.term("company_id", company_id),
QueryBuilder.term("endpoint_url", endpoint_url),
QueryBuilder.dates_range(from_date, to_date),
]
query = {"bool": {"must": must_conditions}}
es_index = f"{cls._serving_stats_prefix(company_id)}*"
res = cls.es.search(
index=es_index,
size=0,
query=query,
aggs={"instances": {"terms": {"field": "container_id"}}},
)
instance_buckets = nested_get(res, ("aggregations", "instances", "buckets"))
if not instance_buckets:
return hist_ret
instance_keys = {ib["key"] for ib in instance_buckets}
must_conditions.append(QueryBuilder.terms("container_id", instance_keys))
query = {"bool": {"must": must_conditions}}
aggs = {
"instances": {
"terms": {
"field": "container_id",
"size": max(len(instance_keys), 10),
},
"aggs": {
"average": {"avg": {"field": agg_field}},
},
},
"total_instances": {
instance_sum_type: {
"gap_policy": "insert_zeros",
"buckets_path": "instances>average",
}
},
}
aggs = {
"dates": {
"date_histogram": {
"field": "timestamp",
"fixed_interval": f"{interval}s",
"extended_bounds": {
"min": int(from_date) * 1000,
"max": int(to_date) * 1000,
},
},
"aggs": aggs,
}
}
filter_path = None
if not metrics_request.instance_charts:
filter_path = "aggregations.dates.buckets.total_instances"
data = cls.es.search(
index=es_index,
size=0,
query=query,
aggs=aggs,
filter_path=filter_path,
)
agg_res = data.get("aggregations")
if not agg_res:
return hist_ret
dates_ = []
total = []
instances = defaultdict(list)
# remove last interval if it's incomplete. Allow 10% tolerance
last_valid_timestamp = (to_date - 0.9 * interval) * 1000
for point in agg_res["dates"]["buckets"]:
date_ = point["key"]
if date_ > last_valid_timestamp:
break
dates_.append(date_)
total.append(nested_get(point, ("total_instances", "value"), 0))
if metrics_request.instance_charts:
found_keys = set()
for instance in nested_get(point, ("instances", "buckets"), []):
instances[instance["key"]].append(
nested_get(instance, ("average", "value"), 0)
)
found_keys.add(instance["key"])
for missing_key in instance_keys - found_keys:
instances[missing_key].append(0)
hist_ret["total"]["dates"] = dates_
hist_ret["total"]["values"] = cls.round_series(total)
hist_ret["instances"] = {
key: {"title": key, "dates": dates_, "values": cls.round_series(values)}
for key, values in instances.items()
}
return hist_ret
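For orientation, a sketch of the payload shape get_endpoint_metrics returns (illustrative values; the dates are the epoch-millisecond bucket keys of the ES date histogram):

hist_ret = {
    "computed_interval": 60,  # never below services.serving.min_chart_interval_sec
    "total": {
        "title": "Number of Requests",
        "dates": [1733429400000, 1733429460000],
        "values": [1000.0, 3000.0],
    },
    "instances": {  # empty when instance_charts=False
        "container_1": {
            "title": "container_1",
            "dates": [1733429400000, 1733429460000],
            "values": [500.0, 1500.0],
        },
    },
}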

View File

@@ -0,0 +1,7 @@
default_container_timeout_sec: 600
# Auto-register unknown serving containers on status reports and other calls
container_auto_register: true
# Assume unknown serving containers were already unregistered (i.e. do not raise an unregistered error)
container_auto_unregister: true
# The minimal sampling interval for the serving model monitoring charts
min_chart_interval_sec: 40
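These defaults are read through the server config tree, as in the BLL and stats code above:

from apiserver.config_repo import config

timeout = config.get("services.serving.default_container_timeout_sec", 10 * 60)
min_interval = config.get("services.serving.min_chart_interval_sec", 40)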

View File

@@ -0,0 +1,79 @@
{
"index_patterns": "serving_stats_*",
"template": {
"settings": {
"number_of_replicas": 0,
"number_of_shards": 1
},
"mappings": {
"_source": {
"enabled": true
},
"properties": {
"timestamp": {
"type": "date"
},
"container_id": {
"type": "keyword"
},
"company_id": {
"type": "keyword"
},
"endpoint_url": {
"type": "keyword"
},
"requests_num": {
"type": "integer"
},
"requests_min": {
"type": "float"
},
"uptime_sec": {
"type": "integer"
},
"latency_ms": {
"type": "integer"
},
"cpu_usage": {
"type": "float"
},
"cpu_num": {
"type": "integer"
},
"gpu_usage": {
"type": "float"
},
"gpu_num": {
"type": "integer"
},
"memory_used": {
"type": "float"
},
"memory_free": {
"type": "float"
},
"memory_total": {
"type": "float"
},
"gpu_memory_used": {
"type": "float"
},
"gpu_memory_free": {
"type": "float"
},
"gpu_memory_total": {
"type": "float"
},
"disk_free_home": {
"type": "float"
},
"network_rx": {
"type": "float"
},
"network_tx": {
"type": "float"
}
}
}
}
}

View File

@@ -0,0 +1,67 @@
machine_stats {
type: object
properties {
cpu_usage {
description: "Average CPU usage per core"
type: array
items { type: number }
}
gpu_usage {
description: "Average GPU usage per GPU card"
type: array
items { type: number }
}
memory_used {
description: "Used memory MBs"
type: integer
}
memory_free {
description: "Free memory MBs"
type: integer
}
gpu_memory_free {
description: "GPU free memory MBs"
type: array
items { type: integer }
}
gpu_memory_used {
description: "GPU used memory MBs"
type: array
items { type: integer }
}
network_tx {
description: "Mbytes per second"
type: integer
}
network_rx {
description: "Mbytes per second"
type: integer
}
disk_free_home {
description: "Free space in % of /home drive"
type: integer
}
disk_free_temp {
description: "Free space in % of /tmp drive"
type: integer
}
disk_read {
description: "Mbytes read per second"
type: integer
}
disk_write {
description: "Mbytes write per second"
type: integer
}
cpu_temperature {
description: "CPU temperature"
type: array
items { type: number }
}
gpu_temperature {
description: "GPU temperature"
type: array
items { type: number }
}
}
}
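For reference, a machine_stats payload matching this schema, as sent by the test at the end of this diff:

machine_stats = {
    "cpu_usage": [10, 20],  # average usage per core
    "memory_used": 50,      # MBs
}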

View File

@@ -0,0 +1,400 @@
_description: "Serving apis"
_definitions {
include "_workers_common.conf"
serving_model_report {
type: object
required: [container_id, endpoint_name, model_name]
properties {
container_id {
type: string
description: Container ID
}
endpoint_name {
type: string
description: Endpoint name
}
endpoint_url {
type: string
description: Endpoint URL
}
model_name {
type: string
description: Model name
}
model_source {
type: string
description: Model source
}
model_version {
type: string
description: Model version
}
preprocess_artifact {
type: string
description: Preprocess Artifact
}
input_type {
type: string
description: Input type
}
input_size {
type: integer
description: Input size in bytes
}
}
}
endpoint_stats {
type: object
properties {
endpoint {
type: string
description: Endpoint name
}
model {
type: string
description: Model name
}
url {
type: string
description: Model url
}
instances {
type: integer
description: The number of model serving instances
}
uptime_sec {
type: integer
description: Max of model instance uptime in seconds
}
requests {
type: integer
description: Total requests processed by model instances
}
requests_min {
type: number
description: Average of request rate of model instances per minute
}
latency_ms {
type: integer
description: Average of latency of model instances in ms
}
last_update {
type: string
format: "date-time"
description: The latest time when one of the model instances was updated
}
}
}
container_instance_stats {
type: object
properties {
id {
type: string
description: Container ID
}
uptime_sec {
type: integer
description: Uptime in seconds
}
requests {
type: integer
description: Number of requests
}
requests_min {
type: number
description: Average requests per minute
}
latency_ms {
type: integer
description: Average request latency in ms
}
last_update {
type: string
format: "date-time"
description: The latest time when the container instance sent an update
}
}
}
serving_model_info {
type: object
properties {
endpoint {
type: string
description: Endpoint name
}
model {
type: string
description: Model name
}
url {
type: string
description: Model url
}
model_source {
type: string
description: Model source
}
model_version {
type: string
description: Model version
}
preprocess_artifact {
type: string
description: Preprocess Artifact
}
input_type {
type: string
description: Input type
}
input_size {
type: integer
description: Input size in bytes
}
}
}
container_info: ${_definitions.serving_model_info} {
properties {
id {
type: string
description: Container ID
}
uptime_sec {
type: integer
description: Model instance uptime in seconds
}
last_update {
type: string
format: "date-time"
description: The latest time when the container instance sent an update
}
}
}
metrics_history_series {
type: object
properties {
title {
type: string
description: "The title of the series"
}
dates {
type: array
description: "List of timestamps (in seconds from epoch) in the acceding order. The timestamps are separated by the requested interval."
items {type: integer}
}
values {
type: array
description: "List of values corresponding to the timestamps in the dates list."
items {type: number}
}
}
}
}
register_container {
"2.31" {
description: Register container
request = ${_definitions.serving_model_report} {
properties {
timeout {
description: "Registration timeout in seconds. If timeout seconds have passed since the service container last call to register or status_report, the container is automatically removed from the list of registered containers."
type: integer
default: 600
}
}
}
response {
type: object
additionalProperties: false
}
}
}
unregister_container {
"2.31" {
description: Unregister container
request {
type: object
required: [container_id]
properties {
container_id {
type: string
description: Container ID
}
}
}
response {
type: object
additionalProperties: false
}
}
}
container_status_report {
"2.31" {
description: Container status report
request = ${_definitions.serving_model_report} {
properties {
uptime_sec {
type: integer
description: Uptime in seconds
}
requests_num {
type: integer
description: Number of requests
}
requests_min {
type: number
description: Average requests per minute
}
latency_ms {
type: integer
description: Average request latency in ms
}
machine_stats {
description: "The machine statistics"
"$ref": "#/definitions/machine_stats"
}
}
}
response {
type: object
additionalProperties: false
}
}
}
get_endpoints {
"2.31" {
description: Get all the registered endpoints
request {
type: object
additionalProperties: false
}
response {
type: object
properties {
endpoints {
type: array
items { "$ref": "#/definitions/endpoint_stats" }
}
}
}
}
}
get_loading_instances {
"2.31" {
description: "Get loading instances (enpoint_url not set yet)"
request {
type: object
additionalProperties: false
}
response {
type: object
properties {
instances {
type: array
items { "$ref": "#/definitions/container_info" }
}
}
}
}
}
get_endpoint_details {
"2.31" {
description: Get endpoint details
request {
type: object
required: [endpoint_url]
properties {
endpoint_url {
type: string
description: Endpoint URL
}
}
}
response: ${_definitions.serving_model_info} {
properties {
uptime_sec {
type: integer
description: Max of model instance uptime in seconds
}
last_update {
type: string
format: "date-time"
description: The latest time when one of the model instances was updated
}
instances {
type: array
items {"$ref": "#/definitions/container_instance_stats"}
}
}
}
}
}
get_endpoint_metrics_history {
"2.31" {
description: Get endpoint charts
request {
type: object
required: [endpoint_url, from_date, to_date, interval]
properties {
endpoint_url {
description: Endpoint URL
type: string
}
from_date {
description: "Starting time (in seconds from epoch) for collecting statistics"
type: number
}
to_date {
description: "Ending time (in seconds from epoch) for collecting statistics"
type: number
}
interval {
description: "Time interval in seconds for a single statistics point. The minimal value is 1"
type: integer
}
metric_type {
description: The type of the metrics to return on the chart
type: string
default: requests
enum: [
requests
requests_min
latency_ms
cpu_count
gpu_count
cpu_util
gpu_util
ram_total
ram_free
gpu_ram_total
gpu_ram_free
network_rx
network_tx
]
}
instance_charts {
type: boolean
default: true
description: If set then return instance charts and total. Otherwise total only
}
}
}
response {
type: object
properties {
computed_interval {
description: The interval that was actually used for the histogram. May be larger than the requested one
type: integer
}
total: ${_definitions.metrics_history_series} {
description: The total histogram
}
instances {
description: Instance charts
type: object
additionalProperties: ${_definitions.metrics_history_series}
}
}
}
}
}
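Beyond the Python test client shown below, these endpoints can also be exercised over plain HTTP. A hypothetical sketch (host, port, and credentials are assumptions, following the apiserver's /v{version}/{service}.{action} routing and assuming the standard {"meta": ..., "data": ...} response envelope):

import requests

session = requests.Session()
session.auth = ("ACCESS_KEY", "SECRET_KEY")  # hypothetical credentials

resp = session.post(
    "http://localhost:8008/v2.31/serving.get_endpoint_metrics_history",
    json={
        "endpoint_url": "http://test_url",
        "from_date": 1733429000,
        "to_date": 1733429600,
        "interval": 60,
        "metric_type": "latency_ms",
    },
)
print(resp.json()["data"]["total"]["values"])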

View File

@@ -1,5 +1,6 @@
_description: "Provides an API for worker machines, allowing workers to report status and get tasks for execution"
_definitions {
include "_workers_common.conf"
metrics_category {
type: object
properties {
@@ -203,74 +204,6 @@ _definitions {
}
}
}
machine_stats {
type: object
properties {
cpu_usage {
description: "Average CPU usage per core"
type: array
items { type: number }
}
gpu_usage {
description: "Average GPU usage per GPU card"
type: array
items { type: number }
}
memory_used {
description: "Used memory MBs"
type: integer
}
memory_free {
description: "Free memory MBs"
type: integer
}
gpu_memory_free {
description: "GPU free memory MBs"
type: array
items { type: integer }
}
gpu_memory_used {
description: "GPU used memory MBs"
type: array
items { type: integer }
}
network_tx {
description: "Mbytes per second"
type: integer
}
network_rx {
description: "Mbytes per second"
type: integer
}
disk_free_home {
description: "Mbytes free space of /home drive"
type: integer
}
disk_free_temp {
description: "Mbytes free space of /tmp drive"
type: integer
}
disk_read {
description: "Mbytes read per second"
type: integer
}
disk_write {
description: "Mbytes write per second"
type: integer
}
cpu_temperature {
description: "CPU temperature"
type: array
items { type: number }
}
gpu_temperature {
description: "GPU temperature"
type: array
items { type: number }
}
}
}
}
get_all {
"2.4" {

View File

@@ -0,0 +1,69 @@
from apiserver.apimodels.serving import (
RegisterRequest,
UnregisterRequest,
StatusReportRequest,
GetEndpointDetailsRequest,
GetEndpointMetricsHistoryRequest,
)
from apiserver.apierrors import errors
from apiserver.service_repo import endpoint, APICall
from apiserver.bll.serving import ServingBLL, ServingStats
serving_bll = ServingBLL()
@endpoint("serving.register_container")
def register_container(call: APICall, company: str, request: RegisterRequest):
serving_bll.register_serving_container(
company_id=company, ip=call.real_ip, request=request
)
@endpoint("serving.unregister_container")
def unregister_container(_: APICall, company: str, request: UnregisterRequest):
serving_bll.unregister_serving_container(
company_id=company, container_id=request.container_id
)
@endpoint("serving.container_status_report")
def container_status_report(call: APICall, company: str, request: StatusReportRequest):
if not request.endpoint_url:
raise errors.bad_request.ValidationError(
"Missing required field 'endpoint_url'"
)
serving_bll.container_status_report(
company_id=company,
ip=call.real_ip,
report=request,
)
@endpoint("serving.get_endpoints")
def get_endpoints(call: APICall, company: str, _):
call.result.data = {"endpoints": serving_bll.get_endpoints(company)}
@endpoint("serving.get_loading_instances")
def get_loading_instances(call: APICall, company: str, _):
call.result.data = {"instances": serving_bll.get_loading_instances(company)}
@endpoint("serving.get_endpoint_details")
def get_endpoint_details(
call: APICall, company: str, request: GetEndpointDetailsRequest
):
call.result.data = serving_bll.get_endpoint_details(
company_id=company, endpoint_url=request.endpoint_url
)
@endpoint("serving.get_endpoint_metrics_history")
def get_endpoint_metrics_history(
call: APICall, company: str, request: GetEndpointMetricsHistoryRequest
):
call.result.data = ServingStats.get_endpoint_metrics(
company_id=company,
metrics_request=request,
)

View File

@@ -0,0 +1,101 @@
from time import time, sleep
from apiserver.apierrors import errors
from apiserver.tests.automated import TestService
class TestServing(TestService):
def test_status_report(self):
container_id1 = "container_1"
container_id2 = "container_2"
url = "http://test_url"
container_infos = [
{
"container_id": container_id, # required
"endpoint_name": "my endpoint", # required
"endpoint_url": url, # can be omitted for register but required for status report
"model_name": "my model", # required
"model_source": "s3//my_bucket", # optional right now
"model_version": "3.1.0", # optional right now
"preprocess_artifact": "some string here", # optional right now
"input_type": "another string here", # optional right now
"input_size": 9_000_000, # optional right now, bytes
"tags": ["tag1", "tag2"], # optional
"system_tags": None, # optional
}
for container_id in (container_id1, container_id2)
]
for container_info in container_infos:
self.api.serving.register_container(
**container_info,
timeout=100, # expiration timeout in seconds. Optional, the default value is 600
)
for idx, container_info in enumerate(container_infos):
mul = idx + 1
self.api.serving.container_status_report(
**container_info,
uptime_sec=1000 * mul,
requests_num=1000 * mul,
requests_min=5 * mul, # requests per minute
latency_ms=100 * mul, # average latency
machine_stats={ # the same structure here as used by worker status_reports
"cpu_usage": [10, 20],
"memory_used": 50,
}
)
endpoints = self.api.serving.get_endpoints().endpoints
details = self.api.serving.get_endpoint_details(endpoint_url=url)
sleep(5)  # give ES time to index the reported stats
to_date = int(time()) + 40
from_date = to_date - 100
res1 = self.api.serving.get_endpoint_metrics_history(
endpoint_url=url,
from_date=from_date,
to_date=to_date,
interval=1,
)
res2 = self.api.serving.get_endpoint_metrics_history(
endpoint_url=url,
from_date=from_date,
to_date=to_date,
interval=1,
metric_type="requests_min",
)
res3 = self.api.serving.get_endpoint_metrics_history(
endpoint_url=url,
from_date=from_date,
to_date=to_date,
interval=1,
metric_type="latency_ms",
)
res4 = self.api.serving.get_endpoint_metrics_history(
endpoint_url=url,
from_date=from_date,
to_date=to_date,
interval=1,
metric_type="cpu_count",
)
res5 = self.api.serving.get_endpoint_metrics_history(
endpoint_url=url,
from_date=from_date,
to_date=to_date,
interval=1,
metric_type="cpu_util",
)
res6 = self.api.serving.get_endpoint_metrics_history(
endpoint_url=url,
from_date=from_date,
to_date=to_date,
interval=1,
metric_type="ram_total",
)
for container_id in (container_id1, container_id2):
self.api.serving.unregister_container(container_id=container_id)
endpoints = self.api.serving.get_endpoints().endpoints
with self.api.raises(errors.bad_request.NoContainersForUrl):
details = self.api.serving.get_endpoint_details(endpoint_url=url)