2021-01-05 16:20:38 +00:00
|
|
|
from enum import Enum
|
2021-07-25 11:29:41 +00:00
|
|
|
from typing import Union, Sequence, Mapping
|
2021-01-05 16:20:38 +00:00
|
|
|
|
|
|
|
from boltons.typeutils import classproperty
|
|
|
|
from elasticsearch import Elasticsearch
|
|
|
|
|
|
|
|
from apiserver.config_repo import config
|
|
|
|
|
|
|
|
|
|
|
|
class EventType(Enum):
|
|
|
|
metrics_scalar = "training_stats_scalar"
|
|
|
|
metrics_vector = "training_stats_vector"
|
|
|
|
metrics_image = "training_debug_image"
|
|
|
|
metrics_plot = "plot"
|
|
|
|
task_log = "log"
|
2021-01-05 16:21:11 +00:00
|
|
|
all = "*"
|
2021-01-05 16:20:38 +00:00
|
|
|
|
|
|
|
|
2021-07-25 11:29:41 +00:00
|
|
|
MetricVariants = Mapping[str, Sequence[str]]
|
|
|
|
|
|
|
|
|
2021-01-05 16:20:38 +00:00
|
|
|
class EventSettings:
|
|
|
|
@classproperty
|
|
|
|
def max_workers(self):
|
|
|
|
return config.get("services.events.events_retrieval.max_metrics_concurrency", 4)
|
|
|
|
|
|
|
|
@classproperty
|
|
|
|
def state_expiration_sec(self):
|
|
|
|
return config.get(
|
|
|
|
f"services.events.events_retrieval.state_expiration_sec", 3600
|
|
|
|
)
|
|
|
|
|
|
|
|
@classproperty
|
|
|
|
def max_metrics_count(self):
|
|
|
|
return config.get("services.events.events_retrieval.max_metrics_count", 100)
|
|
|
|
|
|
|
|
@classproperty
|
|
|
|
def max_variants_count(self):
|
|
|
|
return config.get("services.events.events_retrieval.max_variants_count", 100)
|
|
|
|
|
|
|
|
|
|
|
|
def get_index_name(company_id: str, event_type: str):
|
|
|
|
event_type = event_type.lower().replace(" ", "_")
|
|
|
|
return f"events-{event_type}-{company_id}"
|
|
|
|
|
|
|
|
|
2021-01-05 16:21:11 +00:00
|
|
|
def check_empty_data(es: Elasticsearch, company_id: str, event_type: EventType) -> bool:
|
|
|
|
es_index = get_index_name(company_id, event_type.value)
|
2021-01-05 16:20:38 +00:00
|
|
|
if not es.indices.exists(es_index):
|
|
|
|
return True
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
|
|
def search_company_events(
|
|
|
|
es: Elasticsearch,
|
|
|
|
company_id: Union[str, Sequence[str]],
|
2021-01-05 16:21:11 +00:00
|
|
|
event_type: EventType,
|
2021-01-05 16:20:38 +00:00
|
|
|
body: dict,
|
|
|
|
**kwargs,
|
|
|
|
) -> dict:
|
2021-01-05 16:21:11 +00:00
|
|
|
es_index = get_index_name(company_id, event_type.value)
|
2021-01-05 16:20:38 +00:00
|
|
|
return es.search(index=es_index, body=body, **kwargs)
|
|
|
|
|
|
|
|
|
|
|
|
def delete_company_events(
|
2021-01-05 16:21:11 +00:00
|
|
|
es: Elasticsearch, company_id: str, event_type: EventType, body: dict, **kwargs
|
2021-01-05 16:20:38 +00:00
|
|
|
) -> dict:
|
2021-01-05 16:21:11 +00:00
|
|
|
es_index = get_index_name(company_id, event_type.value)
|
2022-02-13 18:01:25 +00:00
|
|
|
return es.delete_by_query(
|
|
|
|
index=es_index, body=body, conflicts="proceed", **kwargs
|
|
|
|
)
|
2021-07-25 11:29:41 +00:00
|
|
|
|
|
|
|
|
2022-02-13 17:26:03 +00:00
|
|
|
def count_company_events(
|
|
|
|
es: Elasticsearch, company_id: str, event_type: EventType, body: dict, **kwargs
|
|
|
|
) -> dict:
|
|
|
|
es_index = get_index_name(company_id, event_type.value)
|
|
|
|
return es.count(index=es_index, body=body, **kwargs)
|
|
|
|
|
|
|
|
|
2022-02-13 18:01:25 +00:00
|
|
|
def get_metric_variants_condition(metric_variants: MetricVariants,) -> Sequence:
|
2021-07-25 11:29:41 +00:00
|
|
|
conditions = [
|
|
|
|
{
|
|
|
|
"bool": {
|
|
|
|
"must": [
|
|
|
|
{"term": {"metric": metric}},
|
|
|
|
{"terms": {"variant": variants}},
|
|
|
|
]
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if variants
|
|
|
|
else {"term": {"metric": metric}}
|
|
|
|
for metric, variants in metric_variants.items()
|
|
|
|
]
|
|
|
|
|
|
|
|
return {"bool": {"should": conditions}}
|