diff --git a/apiserver/bll/workers/stats.py b/apiserver/bll/workers/stats.py index eab81a2..c6bd8d9 100644 --- a/apiserver/bll/workers/stats.py +++ b/apiserver/bll/workers/stats.py @@ -215,6 +215,10 @@ class WorkerStats: "date_histogram": { "field": "timestamp", "fixed_interval": f"{interval}s", + "extended_bounds": { + "min": int(from_date) * 1000, + "max": int(to_date) * 1000, + } }, "aggs": {"workers_count": {"cardinality": {"field": "worker"}}}, } diff --git a/apiserver/tests/automated/test_workers.py b/apiserver/tests/automated/test_workers.py index 9f35857..93f41e8 100644 --- a/apiserver/tests/automated/test_workers.py +++ b/apiserver/tests/automated/test_workers.py @@ -1,10 +1,9 @@ import time from uuid import uuid4 -from datetime import timedelta from typing import Sequence from apiserver.apierrors.errors import bad_request -from apiserver.tests.automated import TestService, utc_now_tz_aware +from apiserver.tests.automated import TestService from apiserver.config_repo import config log = config.logger(__file__) @@ -84,7 +83,7 @@ class TestWorkersService(TestService): self._check_exists(test_worker, False, tags=["test"]) self._check_exists(test_worker, False, tags=["-application"]) - def _simulate_workers(self) -> Sequence[str]: + def _simulate_workers(self, start: int) -> Sequence[str]: """ Two workers writing the same metrics. One for 4 seconds. Another one for 2 The first worker reports a task @@ -106,7 +105,7 @@ class TestWorkersService(TestService): (workers[0],), (workers[0],), ] - timestamp = int(utc_now_tz_aware().timestamp() * 1000) + timestamp = start * 1000 for ws, stats in zip(workers_activity, workers_stats): for w, s in zip(ws, stats): data = dict( @@ -130,7 +129,7 @@ class TestWorkersService(TestService): return task_id def test_get_keys(self): - workers = self._simulate_workers() + workers = self._simulate_workers(int(time.time())) time.sleep(5) # give to es time to refresh res = self.api.workers.get_metric_keys(worker_ids=workers) assert {"cpu", "memory"} == set(c.name for c in res["categories"]) @@ -147,12 +146,12 @@ class TestWorkersService(TestService): self.api.workers.get_metric_keys(worker_ids=["Non existing worker id"]) def test_get_stats(self): - workers = self._simulate_workers() - - to_date = utc_now_tz_aware() + timedelta(seconds=10) - from_date = to_date - timedelta(days=1) + start = int(time.time()) + workers = self._simulate_workers(start) time.sleep(5) # give to ES time to refresh + from_date = start + to_date = start + 10 # no variants res = self.api.workers.get_stats( items=[ @@ -161,8 +160,8 @@ class TestWorkersService(TestService): dict(key="memory_used", aggregation="max"), dict(key="memory_used", aggregation="min"), ], - from_date=from_date.timestamp(), - to_date=to_date.timestamp(), + from_date=from_date, + to_date=to_date, # split_by_variant=True, interval=1, worker_ids=workers, @@ -186,8 +185,8 @@ class TestWorkersService(TestService): # split by variants res = self.api.workers.get_stats( items=[dict(key="cpu_usage", aggregation="avg")], - from_date=from_date.timestamp(), - to_date=to_date.timestamp(), + from_date=from_date, + to_date=to_date, split_by_variant=True, interval=1, worker_ids=workers, @@ -204,8 +203,8 @@ class TestWorkersService(TestService): res = self.api.workers.get_stats( items=[dict(key="cpu_usage", aggregation="avg")], - from_date=from_date.timestamp(), - to_date=to_date.timestamp(), + from_date=from_date, + to_date=to_date, interval=1, worker_ids=["Non existing worker id"], ) @@ -223,29 +222,19 @@ class TestWorkersService(TestService): # to_timestamp=to_timestamp.timestamp(), # interval=20, # ) - - self._simulate_workers() - - to_date = utc_now_tz_aware() + timedelta(seconds=10) - from_date = to_date - timedelta(minutes=1) + start = int(time.time()) + self._simulate_workers(int(time.time())) time.sleep(5) # give to es time to refresh # no variants res = self.api.workers.get_activity_report( - from_date=from_date.timestamp(), to_date=to_date.timestamp(), interval=20 + from_date=start, to_date=start + 10, interval=2 ) - self.assertWorkerSeries(res["total"], 2) - self.assertWorkerSeries(res["active"], 1) - self.assertTotalSeriesGreaterThenActive(res["total"], res["active"]) + self.assertWorkerSeries(res["total"], 2, 5) + self.assertWorkerSeries(res["active"], 1, 5) - @staticmethod - def assertTotalSeriesGreaterThenActive(total_data: dict, active_data: dict): - assert total_data["dates"][-1] == active_data["dates"][-1] - assert total_data["counts"][-1] > active_data["counts"][-1] - - @staticmethod - def assertWorkerSeries(series_data: dict, min_count: int): - assert len(series_data["dates"]) == len(series_data["counts"]) - # check the last 20s aggregation - # there may be more workers that we created since we are not filtering by test workers here - assert series_data["counts"][-1] >= min_count + def assertWorkerSeries(self, series_data: dict, count: int, size: int): + self.assertEqual(len(series_data["dates"]), size) + self.assertEqual(len(series_data["counts"]), size) + self.assertTrue(any(c == count for c in series_data["counts"])) + self.assertTrue(all(c <= count for c in series_data["counts"]))