From 0f6c4e75b7405cba48c6031d52664b4f90963a5d Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Fri, 8 Jul 2022 17:50:26 +0300 Subject: [PATCH] Fix debug images URL handling and task routing --- apiserver/bll/event/event_bll.py | 44 +++++++++++++++++-- apiserver/bll/event/event_metrics.py | 5 +-- .../bll/event/history_sample_iterator.py | 5 +-- apiserver/bll/event/metric_events_iterator.py | 3 +- 4 files changed, 44 insertions(+), 13 deletions(-) diff --git a/apiserver/bll/event/event_bll.py b/apiserver/bll/event/event_bll.py index 72acde5..3562a35 100644 --- a/apiserver/bll/event/event_bll.py +++ b/apiserver/bll/event/event_bll.py @@ -41,7 +41,7 @@ from apiserver.database.model.task.task import Task, TaskStatus from apiserver.redis_manager import redman from apiserver.timing_context import TimingContext from apiserver.tools import safe_get -from apiserver.utilities.dicts import flatten_nested_items +from apiserver.utilities.dicts import flatten_nested_items, nested_get from apiserver.utilities.json import loads # noinspection PyTypeChecker @@ -485,7 +485,7 @@ class EventBLL(object): must.append(get_metric_variants_condition(metric_variants)) query = {"bool": {"must": must}} search_args = dict( - es=self.es, company_id=company_id, event_type=event_type, routing=task_id, + es=self.es, company_id=company_id, event_type=event_type ) max_metrics, max_variants = get_max_metric_and_variant_counts( query=query, **search_args, @@ -648,6 +648,42 @@ class EventBLL(object): return events, total_events, next_scroll_id + def get_debug_image_urls( + self, company_id: str, task_id: str, after_key: dict = None + ) -> Tuple[Sequence[str], Optional[dict]]: + if check_empty_data(self.es, company_id, EventType.metrics_image): + return [], None + + es_req = { + "size": 0, + "aggs": { + "debug_images": { + "composite": { + "size": 1000, + **({"after": after_key} if after_key else {}), + "sources": [{"url": {"terms": {"field": "url"}}}], + } + } + }, + "query": { + "bool": { + "must": [{"term": {"task": task_id}}, {"exists": {"field": "url"}}] + } + }, + } + + es_response = search_company_events( + self.es, + company_id=company_id, + event_type=EventType.metrics_image, + body=es_req, + ) + res = nested_get(es_response, ("aggregations", "debug_images")) + if not res: + return [], None + + return [bucket["key"]["url"] for bucket in res["buckets"]], res.get("after_key") + def get_plot_image_urls( self, company_id: str, task_id: str, scroll_id: Optional[str] ) -> Tuple[Sequence[dict], Optional[str]]: @@ -774,7 +810,7 @@ class EventBLL(object): query = {"bool": {"must": [{"term": {"task": task_id}}]}} search_args = dict( - es=self.es, company_id=company_id, event_type=event_type, routing=task_id, + es=self.es, company_id=company_id, event_type=event_type ) max_metrics, max_variants = get_max_metric_and_variant_counts( query=query, **search_args, @@ -832,7 +868,7 @@ class EventBLL(object): } } search_args = dict( - es=self.es, company_id=company_id, event_type=event_type, routing=task_id, + es=self.es, company_id=company_id, event_type=event_type ) max_metrics, max_variants = get_max_metric_and_variant_counts( query=query, **search_args, diff --git a/apiserver/bll/event/event_metrics.py b/apiserver/bll/event/event_metrics.py index bdc2179..f8b4f26 100644 --- a/apiserver/bll/event/event_metrics.py +++ b/apiserver/bll/event/event_metrics.py @@ -214,7 +214,6 @@ class EventMetrics: es=self.es, company_id=company_id, event_type=EventType.metrics_scalar, - routing=",".join(task_ids), ) if not es_res["hits"]["total"]["value"]: return [] @@ -279,7 +278,7 @@ class EventMetrics: must.append(get_metric_variants_condition(metric_variants)) query = {"bool": {"must": must}} search_args = dict( - es=self.es, company_id=company_id, event_type=event_type, routing=task_id, + es=self.es, company_id=company_id, event_type=event_type ) max_metrics, max_variants = get_max_metric_and_variant_counts( query=query, **search_args, @@ -368,7 +367,7 @@ class EventMetrics: aggregation = self._add_aggregation_average(key.get_aggregation(interval)) query = self._get_task_metrics_query(task_id=task_id, metrics=metrics) search_args = dict( - es=self.es, company_id=company_id, event_type=event_type, routing=task_id, + es=self.es, company_id=company_id, event_type=event_type ) max_metrics, max_variants = get_max_metric_and_variant_counts( query=query, **search_args, diff --git a/apiserver/bll/event/history_sample_iterator.py b/apiserver/bll/event/history_sample_iterator.py index 3c0c535..cf90b37 100644 --- a/apiserver/bll/event/history_sample_iterator.py +++ b/apiserver/bll/event/history_sample_iterator.py @@ -182,7 +182,6 @@ class HistorySampleIterator(abc.ABC): company_id=company_id, event_type=self.event_type, body=es_req, - routing=state.task, ) hits = nested_get(es_res, ("hits", "hits")) @@ -244,7 +243,6 @@ class HistorySampleIterator(abc.ABC): company_id=company_id, event_type=self.event_type, body=es_req, - routing=state.task, ) hits = nested_get(es_res, ("hits", "hits")) @@ -345,7 +343,6 @@ class HistorySampleIterator(abc.ABC): company_id=company_id, event_type=self.event_type, body=es_req, - routing=task, ) hits = nested_get(es_res, ("hits", "hits")) @@ -393,7 +390,7 @@ class HistorySampleIterator(abc.ABC): query = {"bool": {"must": must}} search_args = dict( - es=self.es, company_id=company_id, event_type=self.event_type, routing=task, + es=self.es, company_id=company_id, event_type=self.event_type ) max_metrics, max_variants = get_max_metric_and_variant_counts( query=query, **search_args diff --git a/apiserver/bll/event/metric_events_iterator.py b/apiserver/bll/event/metric_events_iterator.py index 37c2251..6b2e739 100644 --- a/apiserver/bll/event/metric_events_iterator.py +++ b/apiserver/bll/event/metric_events_iterator.py @@ -243,7 +243,7 @@ class MetricEventsIterator: query = {"bool": {"must": must}} search_args = dict( - es=self.es, company_id=company_id, event_type=self.event_type, routing=task, + es=self.es, company_id=company_id, event_type=self.event_type ) max_metrics, max_variants = get_max_metric_and_variant_counts( query=query, **search_args @@ -389,7 +389,6 @@ class MetricEventsIterator: company_id=company_id, event_type=self.event_type, body=es_req, - routing=task_state.task, ) if "aggregations" not in es_res: return task_state.task, []