diff --git a/apiserver/apimodels/serving.py b/apiserver/apimodels/serving.py
index 227c2b3..95c2061 100644
--- a/apiserver/apimodels/serving.py
+++ b/apiserver/apimodels/serving.py
@@ -86,8 +86,10 @@ class MetricType(Enum):
     cpu_util = "cpu_util"
     gpu_util = "gpu_util"
     ram_total = "ram_total"
+    ram_used = "ram_used"
     ram_free = "ram_free"
     gpu_ram_total = "gpu_ram_total"
+    gpu_ram_used = "gpu_ram_used"
     gpu_ram_free = "gpu_ram_free"
     network_rx = "network_rx"
     network_tx = "network_tx"
diff --git a/apiserver/bll/serving/__init__.py b/apiserver/bll/serving/__init__.py
index a8e80b2..24f3cb5 100644
--- a/apiserver/bll/serving/__init__.py
+++ b/apiserver/bll/serving/__init__.py
@@ -13,11 +13,13 @@ from apiserver.apimodels.serving import (
     RegisterRequest,
     StatusReportRequest,
 )
+from apiserver.apimodels.workers import MachineStats
 from apiserver.apierrors import errors
 from apiserver.config_repo import config
 from apiserver.redis_manager import redman
 from .stats import ServingStats
+
 
 log = config.logger(__file__)
@@ -329,6 +331,21 @@ class ServingBLL:
             }
         )
 
+        def get_machine_stats_data(machine_stats: MachineStats) -> dict:
+            ret = {"cpu_count": 0, "gpu_count": 0}
+            if not machine_stats:
+                return ret
+
+            for value, field in (
+                (machine_stats.cpu_usage, "cpu_count"),
+                (machine_stats.gpu_usage, "gpu_count"),
+            ):
+                if value is None:
+                    continue
+                ret[field] = len(value) if isinstance(value, (list, tuple)) else 1
+
+            return ret
+
         first_entry = entries[0]
         return {
             "endpoint": first_entry.endpoint_name,
@@ -352,6 +369,7 @@ class ServingBLL:
                     "reference": [ref.to_struct() for ref in entry.reference]
                     if isinstance(entry.reference, list)
                     else entry.reference,
+                    **get_machine_stats_data(entry.machine_stats),
                 }
                 for entry in entries
             ],
diff --git a/apiserver/bll/serving/stats.py b/apiserver/bll/serving/stats.py
index 6238151..8c5f992 100644
--- a/apiserver/bll/serving/stats.py
+++ b/apiserver/bll/serving/stats.py
@@ -94,7 +94,7 @@ class ServingStats:
             {
                 f"{category}_free": free,
                 f"{category}_used": used,
-                f"{category}_total": (free or 0) + (used or 0),
+                f"{category}_total": round((free or 0) + (used or 0), 3),
             }
         )
 
@@ -110,58 +110,90 @@ class ServingStats:
         return 1
 
     @staticmethod
-    def round_series(values: Sequence, koeff=1.0) -> list:
+    def round_series(values: Sequence, koeff) -> list:
         return [round(v * koeff, 2) if v else 0 for v in values]
 
+    _mb_to_gb = 1 / 1024
     agg_fields = {
         MetricType.requests: (
             "requests_num",
             "Number of Requests",
             _AggregationType.sum,
+            None,
         ),
         MetricType.requests_min: (
             "requests_min",
             "Requests per Minute",
             _AggregationType.sum,
+            None,
         ),
         MetricType.latency_ms: (
             "latency_ms",
             "Average Latency (ms)",
             _AggregationType.avg,
+            None,
         ),
-        MetricType.cpu_count: ("cpu_num", "CPU Count", _AggregationType.sum),
-        MetricType.gpu_count: ("gpu_num", "GPU Count", _AggregationType.sum),
+        MetricType.cpu_count: ("cpu_num", "CPU Count", _AggregationType.sum, None),
+        MetricType.gpu_count: ("gpu_num", "GPU Count", _AggregationType.sum, None),
         MetricType.cpu_util: (
             "cpu_usage",
             "Average CPU Load (%)",
             _AggregationType.avg,
+            None,
         ),
         MetricType.gpu_util: (
             "gpu_usage",
             "Average GPU Utilization (%)",
             _AggregationType.avg,
+            None,
+        ),
+        MetricType.ram_total: (
+            "memory_total",
+            "RAM Total (GB)",
+            _AggregationType.sum,
+            _mb_to_gb,
+        ),
+        MetricType.ram_used: (
+            "memory_used",
+            "RAM Used (GB)",
+            _AggregationType.sum,
+            _mb_to_gb,
+        ),
+        MetricType.ram_free: (
+            "memory_free",
+            "RAM Free (GB)",
+            _AggregationType.sum,
+            _mb_to_gb,
         ),
-        MetricType.ram_total: ("memory_total", "RAM Total (GB)", _AggregationType.sum),
-        MetricType.ram_free: ("memory_free", "RAM Free (GB)", _AggregationType.sum),
         MetricType.gpu_ram_total: (
             "gpu_memory_total",
             "GPU RAM Total (GB)",
             _AggregationType.sum,
+            _mb_to_gb,
+        ),
+        MetricType.gpu_ram_used: (
+            "gpu_memory_used",
+            "GPU RAM Used (GB)",
+            _AggregationType.sum,
+            _mb_to_gb,
         ),
         MetricType.gpu_ram_free: (
             "gpu_memory_free",
             "GPU RAM Free (GB)",
             _AggregationType.sum,
+            _mb_to_gb,
         ),
         MetricType.network_rx: (
             "network_rx",
             "Network Throughput RX (MBps)",
             _AggregationType.sum,
+            None,
         ),
         MetricType.network_tx: (
             "network_tx",
             "Network Throughput TX (MBps)",
             _AggregationType.sum,
+            None,
         ),
     }
 
@@ -183,7 +215,7 @@ class ServingStats:
         if not agg_data:
             raise NotImplemented(f"Charts for {metric_type} not implemented")
 
-        agg_field, title, agg_type = agg_data
+        agg_field, title, agg_type, multiplier = agg_data
         if agg_type == _AggregationType.sum:
             instance_sum_type = "sum_bucket"
         else:
@@ -220,7 +252,7 @@ class ServingStats:
         instance_keys = {ib["key"] for ib in instance_buckets}
         must_conditions.append(QueryBuilder.terms("container_id", instance_keys))
         query = {"bool": {"must": must_conditions}}
-
+        sample_func = "avg" if metric_type != MetricType.requests else "max"
         aggs = {
             "instances": {
                 "terms": {
@@ -228,13 +260,13 @@
                     "size": max(len(instance_keys), 10),
                 },
                 "aggs": {
-                    "average": {"avg": {"field": agg_field}},
+                    "sample": {sample_func: {"field": agg_field}},
                 },
             },
             "total_instances": {
                 instance_sum_type: {
                     "gap_policy": "insert_zeros",
-                    "buckets_path": "instances>average",
+                    "buckets_path": "instances>sample",
                 }
             },
         }
@@ -282,16 +314,21 @@
             found_keys = set()
             for instance in nested_get(point, ("instances", "buckets"), []):
                 instances[instance["key"]].append(
-                    nested_get(instance, ("average", "value"), 0)
+                    nested_get(instance, ("sample", "value"), 0)
                 )
                 found_keys.add(instance["key"])
             for missing_key in instance_keys - found_keys:
                 instances[missing_key].append(0)
 
+        koeff = multiplier if multiplier else 1.0
         hist_ret["total"]["dates"] = dates_
-        hist_ret["total"]["values"] = cls.round_series(total)
+        hist_ret["total"]["values"] = cls.round_series(total, koeff)
         hist_ret["instances"] = {
-            key: {"title": key, "dates": dates_, "values": cls.round_series(values)}
+            key: {
+                "title": key,
+                "dates": dates_,
+                "values": cls.round_series(values, koeff),
+            }
             for key, values in sorted(instances.items(), key=lambda p: p[0])
         }
 
diff --git a/apiserver/schema/services/_workers_common.conf b/apiserver/schema/services/_workers_common.conf
index a4e9bc4..034abd3 100644
--- a/apiserver/schema/services/_workers_common.conf
+++ b/apiserver/schema/services/_workers_common.conf
@@ -13,45 +13,45 @@ machine_stats {
     }
     memory_used {
         description: "Used memory MBs"
-        type: integer
+        type: number
     }
     memory_free {
         description: "Free memory MBs"
-        type: integer
+        type: number
     }
     gpu_memory_free {
         description: "GPU free memory MBs"
         type: array
-        items { type: integer }
+        items { type: number }
     }
     gpu_memory_used {
         description: "GPU used memory MBs"
         type: array
-        items { type: integer }
+        items { type: number }
     }
     network_tx {
         description: "Mbytes per second"
-        type: integer
+        type: number
     }
     network_rx {
         description: "Mbytes per second"
-        type: integer
+        type: number
     }
     disk_free_home {
         description: "Free space in % of /home drive"
-        type: integer
+        type: number
    }
     disk_free_temp {
         description: "Free space in % of /tmp drive"
-        type: integer
+        type: number
     }
     disk_read {
         description: "Mbytes read per second"
-        type: integer
+        type: number
     }
     disk_write {
description: "Mbytes write per second" - type: integer + type: number } cpu_temperature { description: "CPU temperature" diff --git a/apiserver/schema/services/serving.conf b/apiserver/schema/services/serving.conf index 86fb68a..7a5106a 100644 --- a/apiserver/schema/services/serving.conf +++ b/apiserver/schema/services/serving.conf @@ -134,6 +134,14 @@ _definitions { format: "date-time" description: The latest time when the container instance sent update } + cpu_count { + type: integer + description: CPU Count + } + gpu_count { + type: integer + description: GPU Count + } reference: ${_definitions.reference} } @@ -390,8 +398,10 @@ get_endpoint_metrics_history { cpu_util gpu_util ram_total + ram_used ram_free gpu_ram_total + gpu_ram_used gpu_ram_free network_rx network_tx diff --git a/apiserver/tests/automated/test_serving.py b/apiserver/tests/automated/test_serving.py index f581efa..b24f5b7 100644 --- a/apiserver/tests/automated/test_serving.py +++ b/apiserver/tests/automated/test_serving.py @@ -49,7 +49,7 @@ class TestServing(TestService): latency_ms=100 * mul, # average latency machine_stats={ # the same structure here as used by worker status_reports "cpu_usage": [10, 20], - "memory_used": 50, + "memory_used": 50 * 1024, }, ) @@ -68,14 +68,16 @@ class TestServing(TestService): "requests", "requests_min", "latency_ms", + "cpu_count", + "gpu_count", "reference", ) ] for inst in details.instances }, { - "container_1": [1000, 1000, 5, 100, reference], - "container_2": [2000, 2000, 10, 200, []], + "container_1": [1000, 1000, 5, 100, 2, 0, reference], + "container_2": [2000, 2000, 10, 200, 2, 0, []], }, ) # make sure that the first call did not invalidate anything @@ -92,7 +94,7 @@ class TestServing(TestService): ("latency_ms", "Average Latency (ms)", 150), ("cpu_count", "CPU Count", 4), ("cpu_util", "Average CPU Load (%)", 15), - ("ram_total", "RAM Total (GB)", 100), + ("ram_used", "RAM Used (GB)", 100.0), ): res = self.api.serving.get_endpoint_metrics_history( endpoint_url=url,