Set configurable and consistent limits on variants and metrics across different iterators

This commit is contained in:
allegroai 2021-01-05 18:02:01 +02:00
parent 0303c3525f
commit 8b0afd47a6
5 changed files with 57 additions and 24 deletions

View File

@ -46,6 +46,7 @@ class MetricScrollState(Base):
class DebugImageEventsScrollState(Base, JsonSerializableMixin): class DebugImageEventsScrollState(Base, JsonSerializableMixin):
id: str = StringField(required=True) id: str = StringField(required=True)
metrics: Sequence[MetricScrollState] = ListField([MetricScrollState]) metrics: Sequence[MetricScrollState] = ListField([MetricScrollState])
warning: str = StringField()
@attr.s(auto_attribs=True) @attr.s(auto_attribs=True)
@ -65,7 +66,7 @@ class DebugImagesIterator:
@property @property
def _max_workers(self): def _max_workers(self):
return config.get("services.events.max_metrics_concurrency", 4) return config.get("services.events.events_retrieval.max_metrics_concurrency", 4)
def __init__(self, redis: StrictRedis, es: Elasticsearch): def __init__(self, redis: StrictRedis, es: Elasticsearch):
self.es = es self.es = es
@ -219,14 +220,16 @@ class DebugImagesIterator:
"metrics": { "metrics": {
"terms": { "terms": {
"field": "metric", "field": "metric",
"size": EventMetrics.MAX_METRICS_COUNT, "size": EventMetrics.max_metrics_count,
"order": {"_key": "asc"},
}, },
"aggs": { "aggs": {
"last_event_timestamp": {"max": {"field": "timestamp"}}, "last_event_timestamp": {"max": {"field": "timestamp"}},
"variants": { "variants": {
"terms": { "terms": {
"field": "variant", "field": "variant",
"size": EventMetrics.MAX_VARIANTS_COUNT, "size": EventMetrics.max_variants_count,
"order": {"_key": "asc"},
}, },
"aggs": { "aggs": {
"urls": { "urls": {
@ -379,7 +382,8 @@ class DebugImagesIterator:
"variants": { "variants": {
"terms": { "terms": {
"field": "variant", "field": "variant",
"size": EventMetrics.MAX_VARIANTS_COUNT, "size": EventMetrics.max_variants_count,
"order": {"_key": "asc"},
}, },
"aggs": { "aggs": {
"events": { "events": {

View File

@ -33,6 +33,7 @@ class DebugSampleHistoryState(Base, JsonSerializableMixin):
reached_first: bool = BoolField() reached_first: bool = BoolField()
reached_last: bool = BoolField() reached_last: bool = BoolField()
variant_states: Sequence[VariantState] = ListField([VariantState]) variant_states: Sequence[VariantState] = ListField([VariantState])
warning: str = StringField()
@attr.s(auto_attribs=True) @attr.s(auto_attribs=True)
@ -317,7 +318,8 @@ class DebugSampleHistory:
# all variants that sent debug images # all variants that sent debug images
"terms": { "terms": {
"field": "variant", "field": "variant",
"size": EventMetrics.MAX_VARIANTS_COUNT, "size": EventMetrics.max_variants_count,
"order": {"_key": "asc"},
}, },
"aggs": { "aggs": {
"last_iter": {"max": {"field": "iter"}}, "last_iter": {"max": {"field": "iter"}},

View File

@ -435,13 +435,15 @@ class EventBLL(object):
"metrics": { "metrics": {
"terms": { "terms": {
"field": "metric", "field": "metric",
"size": EventMetrics.MAX_METRICS_COUNT, "size": EventMetrics.max_metrics_count,
"order": {"_key": "asc"},
}, },
"aggs": { "aggs": {
"variants": { "variants": {
"terms": { "terms": {
"field": "variant", "field": "variant",
"size": EventMetrics.MAX_VARIANTS_COUNT, "size": EventMetrics.max_variants_count,
"order": {"_key": "asc"},
}, },
"aggs": { "aggs": {
"iters": { "iters": {
@ -661,13 +663,15 @@ class EventBLL(object):
"metrics": { "metrics": {
"terms": { "terms": {
"field": "metric", "field": "metric",
"size": EventMetrics.MAX_METRICS_COUNT, "size": EventMetrics.max_metrics_count,
"order": {"_key": "asc"},
}, },
"aggs": { "aggs": {
"variants": { "variants": {
"terms": { "terms": {
"field": "variant", "field": "variant",
"size": EventMetrics.MAX_VARIANTS_COUNT, "size": EventMetrics.max_variants_count,
"order": {"_key": "asc"},
} }
} }
}, },
@ -710,14 +714,14 @@ class EventBLL(object):
"metrics": { "metrics": {
"terms": { "terms": {
"field": "metric", "field": "metric",
"size": EventMetrics.MAX_METRICS_COUNT, "size": EventMetrics.max_metrics_count,
"order": {"_key": "asc"}, "order": {"_key": "asc"},
}, },
"aggs": { "aggs": {
"variants": { "variants": {
"terms": { "terms": {
"field": "variant", "field": "variant",
"size": EventMetrics.MAX_VARIANTS_COUNT, "size": EventMetrics.max_variants_count,
"order": {"_key": "asc"}, "order": {"_key": "asc"},
}, },
"aggs": { "aggs": {

View File

@ -7,6 +7,7 @@ from functools import partial
from operator import itemgetter from operator import itemgetter
from typing import Sequence, Tuple from typing import Sequence, Tuple
from boltons.typeutils import classproperty
from elasticsearch import Elasticsearch from elasticsearch import Elasticsearch
from mongoengine import Q from mongoengine import Q
@ -30,17 +31,23 @@ class EventType(Enum):
class EventMetrics: class EventMetrics:
MAX_METRICS_COUNT = 100
MAX_VARIANTS_COUNT = 100
MAX_AGGS_ELEMENTS_COUNT = 50 MAX_AGGS_ELEMENTS_COUNT = 50
MAX_SAMPLE_BUCKETS = 6000 MAX_SAMPLE_BUCKETS = 6000
def __init__(self, es: Elasticsearch): def __init__(self, es: Elasticsearch):
self.es = es self.es = es
@classproperty
def max_metrics_count(self):
return config.get("services.events.events_retrieval.max_metrics_count", 100)
@classproperty
def max_variants_count(self):
return config.get("services.events.events_retrieval.max_variants_count", 100)
@property @property
def _max_concurrency(self): def _max_concurrency(self):
return config.get("services.events.max_metrics_concurrency", 4) return config.get("services.events.events_retrieval.max_metrics_concurrency", 4)
@staticmethod @staticmethod
def get_index_name(company_id, event_type): def get_index_name(company_id, event_type):
@ -207,12 +214,17 @@ class EventMetrics:
"query": {"term": {"task": task_id}}, "query": {"term": {"task": task_id}},
"aggs": { "aggs": {
"metrics": { "metrics": {
"terms": {"field": "metric", "size": self.MAX_METRICS_COUNT}, "terms": {
"field": "metric",
"size": self.max_metrics_count,
"order": {"_key": "asc"},
},
"aggs": { "aggs": {
"variants": { "variants": {
"terms": { "terms": {
"field": "variant", "field": "variant",
"size": self.MAX_VARIANTS_COUNT, "size": self.max_variants_count,
"order": {"_key": "asc"},
}, },
"aggs": { "aggs": {
"count": {"value_count": {"field": field}}, "count": {"value_count": {"field": field}},
@ -281,15 +293,15 @@ class EventMetrics:
"metrics": { "metrics": {
"terms": { "terms": {
"field": "metric", "field": "metric",
"size": self.MAX_METRICS_COUNT, "size": self.max_metrics_count,
"order": {"_key": "desc"}, "order": {"_key": "asc"},
}, },
"aggs": { "aggs": {
"variants": { "variants": {
"terms": { "terms": {
"field": "variant", "field": "variant",
"size": self.MAX_VARIANTS_COUNT, "size": self.max_variants_count,
"order": {"_key": "desc"}, "order": {"_key": "asc"},
}, },
"aggs": aggregation, "aggs": aggregation,
} }
@ -396,7 +408,11 @@ class EventMetrics:
}, },
"aggs": { "aggs": {
"metrics": { "metrics": {
"terms": {"field": "metric", "size": self.MAX_METRICS_COUNT} "terms": {
"field": "metric",
"size": self.max_metrics_count,
"order": {"_key": "asc"},
}
} }
}, },
} }

View File

@ -4,12 +4,19 @@ ignore_iteration {
metrics: [":monitor:machine", ":monitor:gpu"] metrics: [":monitor:machine", ":monitor:gpu"]
} }
# max number of concurrent queries to ES when calculating events metrics
# should not exceed the amount of concurrent connections set in the ES driver
max_metrics_concurrency: 4
events_retrieval { events_retrieval {
state_expiration_sec: 3600 state_expiration_sec: 3600
# max number of concurrent queries to ES when calculating events metrics
# should not exceed the amount of concurrent connections set in the ES driver
max_metrics_concurrency: 4
# the max amount of metrics to aggregate on
max_metrics_count: 100
# the max amount of variants to aggregate on
max_variants_count: 100
} }
# if set then plot str will be checked for the valid json on plot add # if set then plot str will be checked for the valid json on plot add