Better support for queue metrics and queue metrics refresh on sample

allegroai 2022-07-08 17:36:46 +03:00
parent 62d5779bd5
commit b41ab8c550
6 changed files with 109 additions and 55 deletions
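In short: queues.get_queue_metrics gains an optional refresh flag that re-samples queue statistics before querying Elasticsearch, each metrics write is now mirrored to Redis, and a new MetricsRefresher watchdog thread re-logs stale queues in the background. A minimal sketch of exercising the new flag against the HTTP API (the base URL, queue id, and credentials are placeholders, and the exact auth scheme depends on the deployment):

import requests

# Placeholder server address and credentials -- adjust for your deployment.
BASE_URL = "http://localhost:8008"

resp = requests.post(
    f"{BASE_URL}/queues.get_queue_metrics",
    json={
        "queue_ids": ["<queue-id>"],  # optional; omit to average over all queues
        "from_date": 1657200000.0,    # epoch seconds; must be less than to_date
        "to_date": 1657286400.0,
        "interval": 3600,             # histogram bucket size in seconds
        "refresh": True,              # new flag: re-sample metrics before querying
    },
    auth=("<access_key>", "<secret_key>"),  # placeholder credentials
)
resp.raise_for_status()
print(resp.json())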

View File

@@ -59,6 +59,7 @@ class GetMetricsRequest(Base):
     from_date = FloatField(required=True, validators=validators.Min(0))
     to_date = FloatField(required=True, validators=validators.Min(0))
     interval = IntField(required=True, validators=validators.Min(1))
+    refresh = BoolField(default=False)


 class QueueMetrics(Base):

View File

@@ -7,7 +7,7 @@ from elasticsearch import Elasticsearch
 from apiserver import database
 from apiserver.es_factory import es_factory
 from apiserver.apierrors import errors
-from apiserver.bll.queue.queue_metrics import QueueMetrics
+from apiserver.bll.queue.queue_metrics import QueueMetrics, MetricsRefresher
 from apiserver.bll.workers import WorkerBLL
 from apiserver.config_repo import config
 from apiserver.database.errors import translate_errors_context
@@ -51,10 +51,7 @@ class QueueBLL(object):
         return queue

     def get_by_name(
-        self,
-        company_id: str,
-        queue_name: str,
-        only: Optional[Sequence[str]] = None,
+        self, company_id: str, queue_name: str, only: Optional[Sequence[str]] = None,
     ) -> Queue:
         qs = Queue.objects(name=queue_name, company=company_id)
         if only:
@@ -139,10 +136,7 @@ class QueueBLL(object):
         queue.delete()

     def get_all(
-        self,
-        company_id: str,
-        query_dict: dict,
-        ret_params: dict = None,
+        self, company_id: str, query_dict: dict, ret_params: dict = None,
     ) -> Sequence[dict]:
         """Get all the queues according to the query"""
         with translate_errors_context():
@@ -154,10 +148,7 @@ class QueueBLL(object):
         )

     def get_queue_infos(
-        self,
-        company_id: str,
-        query_dict: dict,
-        ret_params: dict = None,
+        self, company_id: str, query_dict: dict, ret_params: dict = None,
     ) -> Sequence[dict]:
         """
         Get infos on all the company queues, including queue tasks and workers
@@ -300,3 +291,6 @@ class QueueBLL(object):
         )

         return new_position
+
+
+MetricsRefresher.start(queue_metrics=QueueBLL().metrics)

View File

@@ -1,8 +1,10 @@
+import json
 from collections import defaultdict
 from datetime import datetime
+from time import sleep
 from typing import Sequence

-import elasticsearch.helpers
+from boltons.typeutils import classproperty
 from elasticsearch import Elasticsearch

 from apiserver.es_factory import es_factory
@@ -11,18 +13,24 @@ from apiserver.bll.query import Builder as QueryBuilder
 from apiserver.config_repo import config
 from apiserver.database.errors import translate_errors_context
 from apiserver.database.model.queue import Queue, Entry
+from apiserver.redis_manager import redman
 from apiserver.timing_context import TimingContext
+from apiserver.utilities.threads_manager import ThreadsManager

 log = config.logger(__file__)
+_conf = config.get("services.queues")
+_queue_metrics_key_pattern = "queue_metrics_{queue}"
+redis = redman.connection("apiserver")


-class QueueMetrics:
-    class EsKeys:
-        WAITING_TIME_FIELD = "average_waiting_time"
-        QUEUE_LENGTH_FIELD = "queue_length"
-        TIMESTAMP_FIELD = "timestamp"
-        QUEUE_FIELD = "queue"
+class EsKeys:
+    WAITING_TIME_FIELD = "average_waiting_time"
+    QUEUE_LENGTH_FIELD = "queue_length"
+    TIMESTAMP_FIELD = "timestamp"
+    QUEUE_FIELD = "queue"
+
+
+class QueueMetrics:

     def __init__(self, es: Elasticsearch):
         self.es = es
@@ -49,7 +57,7 @@ class QueueMetrics:
         total_waiting_in_secs = sum((now - e.added).total_seconds() for e in entries)
         return total_waiting_in_secs / len(entries)

-    def log_queue_metrics_to_es(self, company_id: str, queues: Sequence[Queue]) -> bool:
+    def log_queue_metrics_to_es(self, company_id: str, queues: Sequence[Queue]) -> int:
         """
         Calculate and write queue statistics (avg waiting time and queue length) to Elastic
-        :return: True if the write to es was successful, false otherwise
+        :return: The number of queues whose metrics were written
@@ -63,23 +71,22 @@ class QueueMetrics:
         def make_doc(queue: Queue) -> dict:
             entries = [e for e in queue.entries if e.added]
-            return dict(
-                _index=es_index,
-                _source={
-                    self.EsKeys.TIMESTAMP_FIELD: timestamp,
-                    self.EsKeys.QUEUE_FIELD: queue.id,
-                    self.EsKeys.WAITING_TIME_FIELD: self._calc_avg_waiting_time(
-                        entries
-                    ),
-                    self.EsKeys.QUEUE_LENGTH_FIELD: len(entries),
-                },
-            )
+            return {
+                EsKeys.TIMESTAMP_FIELD: timestamp,
+                EsKeys.QUEUE_FIELD: queue.id,
+                EsKeys.WAITING_TIME_FIELD: self._calc_avg_waiting_time(entries),
+                EsKeys.QUEUE_LENGTH_FIELD: len(entries),
+            }

-        actions = list(map(make_doc, queues))
-
-        es_res = elasticsearch.helpers.bulk(self.es, actions)
-        added, errors = es_res[:2]
-        return (added == len(actions)) and not errors
+        logged = 0
+        for q in queues:
+            queue_doc = make_doc(q)
+            self.es.index(index=es_index, body=queue_doc)
+            redis_key = _queue_metrics_key_pattern.format(queue=q.id)
+            redis.set(redis_key, json.dumps(queue_doc))
+            logged += 1
+
+        return logged

     def _log_current_metrics(self, company_id: str, queue_ids: Sequence[str]):
         query = dict(company=company_id)
@@ -90,8 +97,7 @@ class QueueMetrics:
     def _search_company_metrics(self, company_id: str, es_req: dict) -> dict:
         return self.es.search(
-            index=f"{self._queue_metrics_prefix_for_company(company_id)}*",
-            body=es_req,
+            index=f"{self._queue_metrics_prefix_for_company(company_id)}*", body=es_req,
         )

     @classmethod
@@ -105,13 +111,13 @@ class QueueMetrics:
         return {
             "dates": {
                 "date_histogram": {
-                    "field": cls.EsKeys.TIMESTAMP_FIELD,
+                    "field": EsKeys.TIMESTAMP_FIELD,
                     "fixed_interval": f"{interval}s",
                     "min_doc_count": 1,
                 },
                 "aggs": {
                     "queues": {
-                        "terms": {"field": cls.EsKeys.QUEUE_FIELD},
+                        "terms": {"field": EsKeys.QUEUE_FIELD},
                         "aggs": cls._get_top_waiting_agg(),
                     }
                 },
@@ -128,13 +134,13 @@ class QueueMetrics:
             "top_avg_waiting": {
                 "top_hits": {
                     "sort": [
-                        {cls.EsKeys.WAITING_TIME_FIELD: {"order": "desc"}},
-                        {cls.EsKeys.QUEUE_LENGTH_FIELD: {"order": "desc"}},
+                        {EsKeys.WAITING_TIME_FIELD: {"order": "desc"}},
+                        {EsKeys.QUEUE_LENGTH_FIELD: {"order": "desc"}},
                     ],
                     "_source": {
                         "includes": [
-                            cls.EsKeys.WAITING_TIME_FIELD,
-                            cls.EsKeys.QUEUE_LENGTH_FIELD,
+                            EsKeys.WAITING_TIME_FIELD,
+                            EsKeys.QUEUE_LENGTH_FIELD,
                         ]
                     },
                     "size": 1,
@@ -149,6 +155,7 @@ class QueueMetrics:
         to_date: float,
         interval: int,
         queue_ids: Sequence[str],
+        refresh: bool = False,
     ) -> dict:
         """
         Get the company queue metrics in the specified time range.
@@ -158,7 +165,8 @@ class QueueMetrics:
         In case no queue ids are specified the avg across all the
         company queues is calculated for each metric
         """
-        # self._log_current_metrics(company, queue_ids=queue_ids)
+        if refresh:
+            self._log_current_metrics(company_id, queue_ids=queue_ids)

         if from_date >= to_date:
             raise bad_request.FieldsValueError("from_date must be less than to_date")
@@ -256,7 +264,47 @@ class QueueMetrics:
                 continue
             res = queue_data["top_avg_waiting"]["hits"]["hits"][0]["_source"]
             queue_metrics[queue_data["key"]] = {
-                "queue_length": res[cls.EsKeys.QUEUE_LENGTH_FIELD],
-                "avg_waiting_time": res[cls.EsKeys.WAITING_TIME_FIELD],
+                "queue_length": res[EsKeys.QUEUE_LENGTH_FIELD],
+                "avg_waiting_time": res[EsKeys.WAITING_TIME_FIELD],
             }
         return queue_metrics
+
+
+class MetricsRefresher:
+    threads = ThreadsManager()
+
+    @classproperty
+    def watch_interval_sec(self):
+        return _conf.get("metrics_refresh_interval_sec", 300)
+
+    @classmethod
+    @threads.register("queue_metrics_refresh_watchdog", daemon=True)
+    def start(cls, queue_metrics: QueueMetrics):
+        if not cls.watch_interval_sec:
+            return
+
+        sleep(10)
+        while not ThreadsManager.terminating:
+            try:
+                for queue in Queue.objects():
+                    timestamp = es_factory.get_timestamp_millis()
+                    doc_time = 0
+                    try:
+                        redis_key = _queue_metrics_key_pattern.format(queue=queue.id)
+                        data = redis.get(redis_key)
+                        if data:
+                            queue_doc = json.loads(data)
+                            doc_time = int(queue_doc.get(EsKeys.TIMESTAMP_FIELD))
+                    except Exception as ex:
+                        log.exception(
+                            f"Error reading queue metrics data for queue {queue.id}: {str(ex)}"
+                        )
+
+                    if (
+                        not doc_time
+                        or (timestamp - doc_time) > cls.watch_interval_sec * 1000
+                    ):
+                        queue_metrics.log_queue_metrics_to_es(queue.company, [queue])
+            except Exception as ex:
+                log.exception(f"Failed collecting queue metrics: {str(ex)}")
+
+            sleep(60)
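Design note, as read from the code above: log_queue_metrics_to_es now indexes each queue's document individually rather than in one bulk request, so the same document can be cached in Redis under queue_metrics_{queue id}. The watchdog then checks only the cached timestamp, re-logging a queue when its cached document is missing or older than metrics_refresh_interval_sec; setting that value to 0 disables the background thread, while the on-demand refresh flag on get_queue_metrics still forces a fresh sample.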

View File

@@ -0,0 +1,5 @@
+{
+    metrics_before_from_date: 3600
+    # interval in seconds at which queue metrics are refreshed. Set to 0 to disable
+    metrics_refresh_interval_sec: 300
+}

View File

@@ -634,6 +634,13 @@ get_queue_metrics : {
             }
         }
     }
+    "999.0": ${get_queue_metrics."2.4"} {
+        request.properties.refresh {
+            type: boolean
+            default: false
+            description: If set then the queue metrics are refreshed before the results are returned
+        }
+    }
 }
 add_or_update_metadata {
     "2.13" {

View File

@@ -127,9 +127,7 @@ def add_task(call: APICall, company_id, req_model: TaskRequest):
 @endpoint("queues.get_next_task", request_data_model=GetNextTaskRequest)
 def get_next_task(call: APICall, company_id, req_model: GetNextTaskRequest):
-    entry = queue_bll.get_next_task(
-        company_id=company_id, queue_id=req_model.queue
-    )
+    entry = queue_bll.get_next_task(company_id=company_id, queue_id=req_model.queue)
     if entry:
         data = {"entry": entry.to_proper_dict()}
         if req_model.get_task_info:
@@ -224,14 +222,15 @@ def move_task_to_back(call: APICall, company_id, req_model: TaskRequest):
     response_data_model=GetMetricsResponse,
 )
 def get_queue_metrics(
-    call: APICall, company_id, req_model: GetMetricsRequest
+    call: APICall, company_id, request: GetMetricsRequest
 ) -> GetMetricsResponse:
     ret = queue_bll.metrics.get_queue_metrics(
         company_id=company_id,
-        from_date=req_model.from_date,
-        to_date=req_model.to_date,
-        interval=req_model.interval,
-        queue_ids=req_model.queue_ids,
+        from_date=request.from_date,
+        to_date=request.to_date,
+        interval=request.interval,
+        queue_ids=request.queue_ids,
+        refresh=request.refresh,
     )

     queue_dicts = {