Fix debug images URL handling and task routing

This commit is contained in:
allegroai 2022-07-08 17:50:26 +03:00
parent a41ae112a1
commit 0f6c4e75b7
4 changed files with 44 additions and 13 deletions

View File

@ -41,7 +41,7 @@ from apiserver.database.model.task.task import Task, TaskStatus
from apiserver.redis_manager import redman
from apiserver.timing_context import TimingContext
from apiserver.tools import safe_get
from apiserver.utilities.dicts import flatten_nested_items
from apiserver.utilities.dicts import flatten_nested_items, nested_get
from apiserver.utilities.json import loads
# noinspection PyTypeChecker
@ -485,7 +485,7 @@ class EventBLL(object):
must.append(get_metric_variants_condition(metric_variants))
query = {"bool": {"must": must}}
search_args = dict(
es=self.es, company_id=company_id, event_type=event_type, routing=task_id,
es=self.es, company_id=company_id, event_type=event_type
)
max_metrics, max_variants = get_max_metric_and_variant_counts(
query=query, **search_args,
@ -648,6 +648,42 @@ class EventBLL(object):
return events, total_events, next_scroll_id
def get_debug_image_urls(
    self, company_id: str, task_id: str, after_key: dict = None
) -> Tuple[Sequence[str], Optional[dict]]:
    """
    Fetch one page of debug image urls for a task.

    Issues a composite aggregation (page size 1000) over the "url" field of
    the EventType.metrics_image events that match the given task and have a
    "url" field.

    :param company_id: company passed through to the events search
    :param task_id: value matched against the "task" field of the events
    :param after_key: when truthy, sent as the composite "after" value so the
        aggregation continues from a previous page
    :return: a tuple of (urls taken from the aggregation buckets,
        the "after_key" of the aggregation response or None when absent).
        ([], None) is returned when check_empty_data() reports truthy or
        when the response carries no "debug_images" aggregation.
    """
    if check_empty_data(self.es, company_id, EventType.metrics_image):
        return [], None

    # Keys are inserted in the same order as the request was originally built:
    # size, optional after, sources
    composite_agg = {"size": 1000}
    if after_key:
        composite_agg["after"] = after_key
    composite_agg["sources"] = [{"url": {"terms": {"field": "url"}}}]

    task_condition = {"term": {"task": task_id}}
    has_url_condition = {"exists": {"field": "url"}}
    es_req = {
        "size": 0,  # only the aggregation is of interest, no hits requested
        "aggs": {"debug_images": {"composite": composite_agg}},
        "query": {"bool": {"must": [task_condition, has_url_condition]}},
    }

    es_response = search_company_events(
        self.es,
        company_id=company_id,
        event_type=EventType.metrics_image,
        body=es_req,
    )

    debug_images = nested_get(es_response, ("aggregations", "debug_images"))
    if not debug_images:
        return [], None

    urls = []
    for bucket in debug_images["buckets"]:
        urls.append(bucket["key"]["url"])
    next_after_key = debug_images.get("after_key")
    return urls, next_after_key
def get_plot_image_urls(
self, company_id: str, task_id: str, scroll_id: Optional[str]
) -> Tuple[Sequence[dict], Optional[str]]:
@ -774,7 +810,7 @@ class EventBLL(object):
query = {"bool": {"must": [{"term": {"task": task_id}}]}}
search_args = dict(
es=self.es, company_id=company_id, event_type=event_type, routing=task_id,
es=self.es, company_id=company_id, event_type=event_type
)
max_metrics, max_variants = get_max_metric_and_variant_counts(
query=query, **search_args,
@ -832,7 +868,7 @@ class EventBLL(object):
}
}
search_args = dict(
es=self.es, company_id=company_id, event_type=event_type, routing=task_id,
es=self.es, company_id=company_id, event_type=event_type
)
max_metrics, max_variants = get_max_metric_and_variant_counts(
query=query, **search_args,

View File

@ -214,7 +214,6 @@ class EventMetrics:
es=self.es,
company_id=company_id,
event_type=EventType.metrics_scalar,
routing=",".join(task_ids),
)
if not es_res["hits"]["total"]["value"]:
return []
@ -279,7 +278,7 @@ class EventMetrics:
must.append(get_metric_variants_condition(metric_variants))
query = {"bool": {"must": must}}
search_args = dict(
es=self.es, company_id=company_id, event_type=event_type, routing=task_id,
es=self.es, company_id=company_id, event_type=event_type
)
max_metrics, max_variants = get_max_metric_and_variant_counts(
query=query, **search_args,
@ -368,7 +367,7 @@ class EventMetrics:
aggregation = self._add_aggregation_average(key.get_aggregation(interval))
query = self._get_task_metrics_query(task_id=task_id, metrics=metrics)
search_args = dict(
es=self.es, company_id=company_id, event_type=event_type, routing=task_id,
es=self.es, company_id=company_id, event_type=event_type
)
max_metrics, max_variants = get_max_metric_and_variant_counts(
query=query, **search_args,

View File

@ -182,7 +182,6 @@ class HistorySampleIterator(abc.ABC):
company_id=company_id,
event_type=self.event_type,
body=es_req,
routing=state.task,
)
hits = nested_get(es_res, ("hits", "hits"))
@ -244,7 +243,6 @@ class HistorySampleIterator(abc.ABC):
company_id=company_id,
event_type=self.event_type,
body=es_req,
routing=state.task,
)
hits = nested_get(es_res, ("hits", "hits"))
@ -345,7 +343,6 @@ class HistorySampleIterator(abc.ABC):
company_id=company_id,
event_type=self.event_type,
body=es_req,
routing=task,
)
hits = nested_get(es_res, ("hits", "hits"))
@ -393,7 +390,7 @@ class HistorySampleIterator(abc.ABC):
query = {"bool": {"must": must}}
search_args = dict(
es=self.es, company_id=company_id, event_type=self.event_type, routing=task,
es=self.es, company_id=company_id, event_type=self.event_type
)
max_metrics, max_variants = get_max_metric_and_variant_counts(
query=query, **search_args

View File

@ -243,7 +243,7 @@ class MetricEventsIterator:
query = {"bool": {"must": must}}
search_args = dict(
es=self.es, company_id=company_id, event_type=self.event_type, routing=task,
es=self.es, company_id=company_id, event_type=self.event_type
)
max_metrics, max_variants = get_max_metric_and_variant_counts(
query=query, **search_args
@ -389,7 +389,6 @@ class MetricEventsIterator:
company_id=company_id,
event_type=self.event_type,
body=es_req,
routing=task_state.task,
)
if "aggregations" not in es_res:
return task_state.task, []