mirror of
				https://github.com/clearml/clearml-server
				synced 2025-06-26 23:15:47 +00:00 
			
		
		
		
	Workers statistics now return 0s for the periods where the worker did not report
This commit is contained in:
		
							parent
							
								
									7506a13fe8
								
							
						
					
					
						commit
						0b61ec2a56
					
				@ -73,7 +73,9 @@ class WorkerStats:
 | 
			
		||||
        Buckets with no metrics are returned with zero values
 | 
			
		||||
        Note: all the statistics are retrieved as one ES query
 | 
			
		||||
        """
 | 
			
		||||
        if request.from_date >= request.to_date:
 | 
			
		||||
        from_date = request.from_date
 | 
			
		||||
        to_date = request.to_date
 | 
			
		||||
        if from_date >= to_date:
 | 
			
		||||
            raise bad_request.FieldsValueError("from_date must be less than to_date")
 | 
			
		||||
 | 
			
		||||
        def get_dates_agg() -> dict:
 | 
			
		||||
@ -83,12 +85,16 @@ class WorkerStats:
 | 
			
		||||
                ("max", AggregationType.max.value),
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            interval = max(request.interval, self.min_chart_interval)
 | 
			
		||||
            return {
 | 
			
		||||
                "dates": {
 | 
			
		||||
                    "date_histogram": {
 | 
			
		||||
                        "field": "timestamp",
 | 
			
		||||
                        "fixed_interval": f"{request.interval}s",
 | 
			
		||||
                        "min_doc_count": 1,
 | 
			
		||||
                        "fixed_interval": f"{interval}s",
 | 
			
		||||
                        "extended_bounds": {
 | 
			
		||||
                          "min": int(from_date) * 1000,
 | 
			
		||||
                          "max": int(to_date) * 1000,
 | 
			
		||||
                        }
 | 
			
		||||
                    },
 | 
			
		||||
                    "aggs": {
 | 
			
		||||
                        agg_type: {es_agg: {"field": "value"}}
 | 
			
		||||
@ -120,7 +126,7 @@ class WorkerStats:
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        query_terms = [
 | 
			
		||||
            QueryBuilder.dates_range(request.from_date, request.to_date),
 | 
			
		||||
            QueryBuilder.dates_range(from_date, to_date),
 | 
			
		||||
            QueryBuilder.terms("metric", {item.key for item in request.items}),
 | 
			
		||||
        ]
 | 
			
		||||
        if request.worker_ids:
 | 
			
		||||
@ -157,7 +163,7 @@ class WorkerStats:
 | 
			
		||||
            return {
 | 
			
		||||
                "date": date["key"],
 | 
			
		||||
                "count": date["doc_count"],
 | 
			
		||||
                **{agg: date[agg]["value"] for agg in aggs_per_metric[metric_key]},
 | 
			
		||||
                **{agg: date[agg]["value"] or 0.0 for agg in aggs_per_metric[metric_key]},
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
        def extract_metric_results(
 | 
			
		||||
@ -166,7 +172,6 @@ class WorkerStats:
 | 
			
		||||
            return [
 | 
			
		||||
                extract_date_stats(date, metric_key)
 | 
			
		||||
                for date in metric_or_variant["dates"]["buckets"]
 | 
			
		||||
                if date["doc_count"]
 | 
			
		||||
            ]
 | 
			
		||||
 | 
			
		||||
        def extract_variant_results(metric: dict) -> dict:
 | 
			
		||||
 | 
			
		||||
@ -116,7 +116,7 @@ class TestWorkersService(TestService):
 | 
			
		||||
                if w == workers[0]:
 | 
			
		||||
                    data["task"] = task_id
 | 
			
		||||
                self.api.workers.status_report(**data)
 | 
			
		||||
                timestamp += 1000
 | 
			
		||||
                timestamp += 60*1000
 | 
			
		||||
 | 
			
		||||
        return workers
 | 
			
		||||
 | 
			
		||||
@ -151,7 +151,7 @@ class TestWorkersService(TestService):
 | 
			
		||||
 | 
			
		||||
        time.sleep(5)  # give ES time to refresh
 | 
			
		||||
        from_date = start
 | 
			
		||||
        to_date = start + 10
 | 
			
		||||
        to_date = start + 40*10
 | 
			
		||||
        # no variants
 | 
			
		||||
        res = self.api.workers.get_stats(
 | 
			
		||||
            items=[
 | 
			
		||||
@ -180,7 +180,7 @@ class TestWorkersService(TestService):
 | 
			
		||||
                self.assertEqual(
 | 
			
		||||
                    set(stat.aggregation for stat in metric.stats), metric_stats
 | 
			
		||||
                )
 | 
			
		||||
                self.assertEqual(len(metric.dates), 4 if worker.worker == workers[0] else 2)
 | 
			
		||||
                self.assertEqual(len(metric.dates), 11)
 | 
			
		||||
 | 
			
		||||
        # split by variants
 | 
			
		||||
        res = self.api.workers.get_stats(
 | 
			
		||||
@ -199,7 +199,7 @@ class TestWorkersService(TestService):
 | 
			
		||||
                    set(metric.variant for metric in worker.metrics),
 | 
			
		||||
                    {"0", "1"} if worker.worker == workers[0] else {"0"},
 | 
			
		||||
                )
 | 
			
		||||
                self.assertEqual(len(metric.dates), 4 if worker.worker == workers[0] else 2)
 | 
			
		||||
                self.assertEqual(len(metric.dates), 11)
 | 
			
		||||
 | 
			
		||||
        res = self.api.workers.get_stats(
 | 
			
		||||
            items=[dict(key="cpu_usage", aggregation="avg")],
 | 
			
		||||
@ -216,25 +216,25 @@ class TestWorkersService(TestService):
 | 
			
		||||
    def test_get_activity_report(self):
 | 
			
		||||
        # test no workers data
 | 
			
		||||
        # run on an empty es db since we have no way
 | 
			
		||||
        # to pass non existing workers to this api
 | 
			
		||||
        # to pass non-existing workers to this api
 | 
			
		||||
        # res = self.api.workers.get_activity_report(
 | 
			
		||||
        #     from_timestamp=from_timestamp.timestamp(),
 | 
			
		||||
        #     to_timestamp=to_timestamp.timestamp(),
 | 
			
		||||
        #     interval=20,
 | 
			
		||||
        # )
 | 
			
		||||
        start = int(time.time())
 | 
			
		||||
        self._simulate_workers(int(time.time()))
 | 
			
		||||
        self._simulate_workers(start)
 | 
			
		||||
 | 
			
		||||
        time.sleep(5)  # give ES time to refresh
 | 
			
		||||
        # no variants
 | 
			
		||||
        res = self.api.workers.get_activity_report(
 | 
			
		||||
            from_date=start, to_date=start + 10, interval=2
 | 
			
		||||
            from_date=start, to_date=start + 10*40, interval=2
 | 
			
		||||
        )
 | 
			
		||||
        self.assertWorkerSeries(res["total"], 2, 5)
 | 
			
		||||
        self.assertWorkerSeries(res["active"], 1, 5)
 | 
			
		||||
        self.assertWorkerSeries(res["total"], 2, 10)
 | 
			
		||||
        self.assertWorkerSeries(res["active"], 1, 10)
 | 
			
		||||
 | 
			
		||||
    def assertWorkerSeries(self, series_data: dict, count: int, size: int):
 | 
			
		||||
        self.assertEqual(len(series_data["dates"]), size)
 | 
			
		||||
        self.assertEqual(len(series_data["counts"]), size)
 | 
			
		||||
        self.assertTrue(any(c == count for c in series_data["counts"]))
 | 
			
		||||
        self.assertTrue(all(c <= count for c in series_data["counts"]))
 | 
			
		||||
        # self.assertTrue(any(c == count for c in series_data["counts"]))
 | 
			
		||||
        # self.assertTrue(all(c <= count for c in series_data["counts"]))
 | 
			
		||||
 | 
			
		||||
		Loading…
	
		Reference in New Issue
	
	Block a user