From b41ab8c550684aca0888f01936b7e9fbd80345b8 Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Fri, 8 Jul 2022 17:36:46 +0300 Subject: [PATCH] Better support for queue metrics and queue metrics refresh on sample --- apiserver/apimodels/queues.py | 1 + apiserver/bll/queue/queue_bll.py | 20 ++- apiserver/bll/queue/queue_metrics.py | 116 +++++++++++++----- apiserver/config/default/services/queues.conf | 5 + apiserver/schema/services/queues.conf | 7 ++ apiserver/services/queues.py | 15 ++- 6 files changed, 109 insertions(+), 55 deletions(-) create mode 100644 apiserver/config/default/services/queues.conf diff --git a/apiserver/apimodels/queues.py b/apiserver/apimodels/queues.py index c749785..7a593c6 100644 --- a/apiserver/apimodels/queues.py +++ b/apiserver/apimodels/queues.py @@ -59,6 +59,7 @@ class GetMetricsRequest(Base): from_date = FloatField(required=True, validators=validators.Min(0)) to_date = FloatField(required=True, validators=validators.Min(0)) interval = IntField(required=True, validators=validators.Min(1)) + refresh = BoolField(default=False) class QueueMetrics(Base): diff --git a/apiserver/bll/queue/queue_bll.py b/apiserver/bll/queue/queue_bll.py index d8b698f..2fdf85e 100644 --- a/apiserver/bll/queue/queue_bll.py +++ b/apiserver/bll/queue/queue_bll.py @@ -7,7 +7,7 @@ from elasticsearch import Elasticsearch from apiserver import database from apiserver.es_factory import es_factory from apiserver.apierrors import errors -from apiserver.bll.queue.queue_metrics import QueueMetrics +from apiserver.bll.queue.queue_metrics import QueueMetrics, MetricsRefresher from apiserver.bll.workers import WorkerBLL from apiserver.config_repo import config from apiserver.database.errors import translate_errors_context @@ -51,10 +51,7 @@ class QueueBLL(object): return queue def get_by_name( - self, - company_id: str, - queue_name: str, - only: Optional[Sequence[str]] = None, + self, company_id: str, queue_name: str, only: Optional[Sequence[str]] = None, ) -> Queue: qs = 
Queue.objects(name=queue_name, company=company_id) if only: @@ -139,10 +136,7 @@ class QueueBLL(object): queue.delete() def get_all( - self, - company_id: str, - query_dict: dict, - ret_params: dict = None, + self, company_id: str, query_dict: dict, ret_params: dict = None, ) -> Sequence[dict]: """Get all the queues according to the query""" with translate_errors_context(): @@ -154,10 +148,7 @@ class QueueBLL(object): ) def get_queue_infos( - self, - company_id: str, - query_dict: dict, - ret_params: dict = None, + self, company_id: str, query_dict: dict, ret_params: dict = None, ) -> Sequence[dict]: """ Get infos on all the company queues, including queue tasks and workers @@ -300,3 +291,6 @@ class QueueBLL(object): ) return new_position + + +MetricsRefresher.start(queue_metrics=QueueBLL().metrics) diff --git a/apiserver/bll/queue/queue_metrics.py b/apiserver/bll/queue/queue_metrics.py index 03bc129..40d4d8f 100644 --- a/apiserver/bll/queue/queue_metrics.py +++ b/apiserver/bll/queue/queue_metrics.py @@ -1,8 +1,10 @@ +import json from collections import defaultdict from datetime import datetime +from time import sleep from typing import Sequence -import elasticsearch.helpers +from boltons.typeutils import classproperty from elasticsearch import Elasticsearch from apiserver.es_factory import es_factory @@ -11,18 +13,24 @@ from apiserver.bll.query import Builder as QueryBuilder from apiserver.config_repo import config from apiserver.database.errors import translate_errors_context from apiserver.database.model.queue import Queue, Entry +from apiserver.redis_manager import redman from apiserver.timing_context import TimingContext +from apiserver.utilities.threads_manager import ThreadsManager log = config.logger(__file__) +_conf = config.get("services.queues") +_queue_metrics_key_pattern = "queue_metrics_{queue}" +redis = redman.connection("apiserver") + + +class EsKeys: + WAITING_TIME_FIELD = "average_waiting_time" + QUEUE_LENGTH_FIELD = "queue_length" + 
TIMESTAMP_FIELD = "timestamp" + QUEUE_FIELD = "queue" class QueueMetrics: - class EsKeys: - WAITING_TIME_FIELD = "average_waiting_time" - QUEUE_LENGTH_FIELD = "queue_length" - TIMESTAMP_FIELD = "timestamp" - QUEUE_FIELD = "queue" - def __init__(self, es: Elasticsearch): self.es = es @@ -49,7 +57,7 @@ class QueueMetrics: total_waiting_in_secs = sum((now - e.added).total_seconds() for e in entries) return total_waiting_in_secs / len(entries) - def log_queue_metrics_to_es(self, company_id: str, queues: Sequence[Queue]) -> bool: + def log_queue_metrics_to_es(self, company_id: str, queues: Sequence[Queue]) -> int: """ Calculate and write queue statistics (avg waiting time and queue length) to Elastic :return: True if the write to es was successful, false otherwise @@ -63,23 +71,22 @@ class QueueMetrics: def make_doc(queue: Queue) -> dict: entries = [e for e in queue.entries if e.added] - return dict( - _index=es_index, - _source={ - self.EsKeys.TIMESTAMP_FIELD: timestamp, - self.EsKeys.QUEUE_FIELD: queue.id, - self.EsKeys.WAITING_TIME_FIELD: self._calc_avg_waiting_time( - entries - ), - self.EsKeys.QUEUE_LENGTH_FIELD: len(entries), - }, - ) + return { + EsKeys.TIMESTAMP_FIELD: timestamp, + EsKeys.QUEUE_FIELD: queue.id, + EsKeys.WAITING_TIME_FIELD: self._calc_avg_waiting_time(entries), + EsKeys.QUEUE_LENGTH_FIELD: len(entries), + } - actions = list(map(make_doc, queues)) + logged = 0 + for q in queues: + queue_doc = make_doc(q) + self.es.index(index=es_index, body=queue_doc) + redis_key = _queue_metrics_key_pattern.format(queue=q.id) + redis.set(redis_key, json.dumps(queue_doc)) + logged += 1 - es_res = elasticsearch.helpers.bulk(self.es, actions) - added, errors = es_res[:2] - return (added == len(actions)) and not errors + return logged def _log_current_metrics(self, company_id: str, queue_ids=Sequence[str]): query = dict(company=company_id) @@ -90,8 +97,7 @@ class QueueMetrics: def _search_company_metrics(self, company_id: str, es_req: dict) -> dict: return 
self.es.search( - index=f"{self._queue_metrics_prefix_for_company(company_id)}*", - body=es_req, + index=f"{self._queue_metrics_prefix_for_company(company_id)}*", body=es_req, ) @classmethod @@ -105,13 +111,13 @@ class QueueMetrics: return { "dates": { "date_histogram": { - "field": cls.EsKeys.TIMESTAMP_FIELD, + "field": EsKeys.TIMESTAMP_FIELD, "fixed_interval": f"{interval}s", "min_doc_count": 1, }, "aggs": { "queues": { - "terms": {"field": cls.EsKeys.QUEUE_FIELD}, + "terms": {"field": EsKeys.QUEUE_FIELD}, "aggs": cls._get_top_waiting_agg(), } }, @@ -128,13 +134,13 @@ class QueueMetrics: "top_avg_waiting": { "top_hits": { "sort": [ - {cls.EsKeys.WAITING_TIME_FIELD: {"order": "desc"}}, - {cls.EsKeys.QUEUE_LENGTH_FIELD: {"order": "desc"}}, + {EsKeys.WAITING_TIME_FIELD: {"order": "desc"}}, + {EsKeys.QUEUE_LENGTH_FIELD: {"order": "desc"}}, ], "_source": { "includes": [ - cls.EsKeys.WAITING_TIME_FIELD, - cls.EsKeys.QUEUE_LENGTH_FIELD, + EsKeys.WAITING_TIME_FIELD, + EsKeys.QUEUE_LENGTH_FIELD, ] }, "size": 1, @@ -149,6 +155,7 @@ class QueueMetrics: to_date: float, interval: int, queue_ids: Sequence[str], + refresh: bool = False, ) -> dict: """ Get the company queue metrics in the specified time range. 
@@ -158,7 +165,8 @@ class QueueMetrics: In case no queue ids are specified the avg across all the company queues is calculated for each metric """ - # self._log_current_metrics(company, queue_ids=queue_ids) + if refresh: + self._log_current_metrics(company_id, queue_ids=queue_ids) if from_date >= to_date: raise bad_request.FieldsValueError("from_date must be less than to_date") @@ -256,7 +264,47 @@ class QueueMetrics: continue res = queue_data["top_avg_waiting"]["hits"]["hits"][0]["_source"] queue_metrics[queue_data["key"]] = { - "queue_length": res[cls.EsKeys.QUEUE_LENGTH_FIELD], - "avg_waiting_time": res[cls.EsKeys.WAITING_TIME_FIELD], + "queue_length": res[EsKeys.QUEUE_LENGTH_FIELD], + "avg_waiting_time": res[EsKeys.WAITING_TIME_FIELD], } return queue_metrics + + +class MetricsRefresher: + threads = ThreadsManager() + + @classproperty + def watch_interval_sec(self): + return _conf.get("metrics_refresh_interval_sec", 300) + + @classmethod + @threads.register("queue_metrics_refresh_watchdog", daemon=True) + def start(cls, queue_metrics: QueueMetrics): + if not cls.watch_interval_sec: + return + + sleep(10) + while not ThreadsManager.terminating: + try: + for queue in Queue.objects(): + timestamp = es_factory.get_timestamp_millis() + doc_time = 0 + try: + redis_key = _queue_metrics_key_pattern.format(queue=queue.id) + data = redis.get(redis_key) + if data: + queue_doc = json.loads(data) + doc_time = int(queue_doc.get(EsKeys.TIMESTAMP_FIELD)) + except Exception as ex: + log.exception( + f"Error reading queue metrics data for queue {queue.id}: {str(ex)}" + ) + + if ( + not doc_time + or (timestamp - doc_time) > cls.watch_interval_sec * 1000 + ): + queue_metrics.log_queue_metrics_to_es(queue.company, [queue]) + except Exception as ex: + log.exception(f"Failed collecting queue metrics: {str(ex)}") + sleep(60) diff --git a/apiserver/config/default/services/queues.conf b/apiserver/config/default/services/queues.conf new file mode 100644 index 0000000..f2adab2 --- 
/dev/null +++ b/apiserver/config/default/services/queues.conf @@ -0,0 +1,5 @@ +{ + metrics_before_from_date: 3600 + # interval in seconds to update queue metrics. Put 0 to disable + metrics_refresh_interval_sec: 300 +} \ No newline at end of file diff --git a/apiserver/schema/services/queues.conf index f6d6e16..dc5f4d0 100644 --- a/apiserver/schema/services/queues.conf +++ b/apiserver/schema/services/queues.conf @@ -634,6 +634,13 @@ get_queue_metrics : { } } } + "999.0": ${get_queue_metrics."2.4"} { + request.properties.refresh { + type: boolean + default: false + description: If set then the queue metrics are refreshed before being returned + } + } } add_or_update_metadata { "2.13" { diff --git a/apiserver/services/queues.py b/apiserver/services/queues.py index 0e9dcec..a160f18 100644 --- a/apiserver/services/queues.py +++ b/apiserver/services/queues.py @@ -127,9 +127,7 @@ def add_task(call: APICall, company_id, req_model: TaskRequest): @endpoint("queues.get_next_task", request_data_model=GetNextTaskRequest) def get_next_task(call: APICall, company_id, req_model: GetNextTaskRequest): - entry = queue_bll.get_next_task( - company_id=company_id, queue_id=req_model.queue - ) + entry = queue_bll.get_next_task(company_id=company_id, queue_id=req_model.queue) if entry: data = {"entry": entry.to_proper_dict()} if req_model.get_task_info: @@ -224,14 +222,15 @@ def move_task_to_back(call: APICall, company_id, req_model: TaskRequest): response_data_model=GetMetricsResponse, ) def get_queue_metrics( - call: APICall, company_id, req_model: GetMetricsRequest + call: APICall, company_id, request: GetMetricsRequest ) -> GetMetricsResponse: ret = queue_bll.metrics.get_queue_metrics( company_id=company_id, - from_date=req_model.from_date, - to_date=req_model.to_date, - interval=req_model.interval, - queue_ids=req_model.queue_ids, + from_date=request.from_date, + to_date=request.to_date, + interval=request.interval, + queue_ids=request.queue_ids, + 
refresh=request.refresh, ) queue_dicts = {