diff --git a/apiserver/bll/queue/queue_bll.py b/apiserver/bll/queue/queue_bll.py index da88084..9e425b2 100644 --- a/apiserver/bll/queue/queue_bll.py +++ b/apiserver/bll/queue/queue_bll.py @@ -7,7 +7,7 @@ from elasticsearch import Elasticsearch from apiserver import database from apiserver.es_factory import es_factory from apiserver.apierrors import errors -from apiserver.bll.queue.queue_metrics import QueueMetrics, MetricsRefresher +from apiserver.bll.queue.queue_metrics import QueueMetrics from apiserver.bll.workers import WorkerBLL from apiserver.config_repo import config from apiserver.database.errors import translate_errors_context @@ -334,6 +334,3 @@ class QueueBLL(object): if res is None: raise errors.bad_request.InvalidQueueId(queue_id=queue_id) return int(res.get("count")) - - -MetricsRefresher.start(queue_metrics=QueueBLL().metrics) diff --git a/apiserver/bll/queue/queue_metrics.py b/apiserver/bll/queue/queue_metrics.py index 40d4d8f..27a1e35 100644 --- a/apiserver/bll/queue/queue_metrics.py +++ b/apiserver/bll/queue/queue_metrics.py @@ -279,10 +279,14 @@ class MetricsRefresher: @classmethod @threads.register("queue_metrics_refresh_watchdog", daemon=True) - def start(cls, queue_metrics: QueueMetrics): + def start(cls, queue_metrics: QueueMetrics = None): if not cls.watch_interval_sec: return + if not queue_metrics: + from .queue_bll import QueueBLL + queue_metrics = QueueBLL().metrics + sleep(10) while not ThreadsManager.terminating: try: diff --git a/apiserver/bll/statistics/resource_monitor.py b/apiserver/bll/statistics/resource_monitor.py index d7e3634..9f62986 100644 --- a/apiserver/bll/statistics/resource_monitor.py +++ b/apiserver/bll/statistics/resource_monitor.py @@ -1,6 +1,6 @@ from datetime import datetime import operator -from threading import Thread, Lock +from threading import Lock from time import sleep import attr @@ -9,76 +9,83 @@ import psutil from apiserver.utilities.threads_manager import ThreadsManager -class ResourceMonitor(Thread): - @attr.s(auto_attribs=True) - class Sample: - cpu_usage: float = 0.0 - mem_used_gb: float = 0 - mem_free_gb: float = 0 +stat_threads = ThreadsManager("Statistics") - @classmethod - def _apply(cls, op, *samples): - return cls( - **{ - field: op(*(getattr(sample, field) for sample in samples)) - for field in attr.fields_dict(cls) - } - ) - def min(self, sample): - return self._apply(min, self, sample) - - def max(self, sample): - return self._apply(max, self, sample) - - def avg(self, sample, count): - res = self._apply(lambda x: x * count, self) - res = self._apply(operator.add, res, sample) - res = self._apply(lambda x: x / (count + 1), res) - return res - - def __init__(self, sample_interval_sec=5): - super(ResourceMonitor, self).__init__(daemon=True) - self.sample_interval_sec = sample_interval_sec - self._lock = Lock() - self._clear() - - def _clear(self): - sample = self._get_sample() - self._avg = sample - self._min = sample - self._max = sample - self._clear_time = datetime.utcnow() - self._count = 1 +@attr.s(auto_attribs=True) +class Sample: + cpu_usage: float = 0.0 + mem_used_gb: float = 0 + mem_free_gb: float = 0 @classmethod - def _get_sample(cls) -> Sample: - return cls.Sample( + def _apply(cls, op, *samples): + return cls( + **{ + field: op(*(getattr(sample, field) for sample in samples)) + for field in attr.fields_dict(cls) + } + ) + + def min(self, sample): + return self._apply(min, self, sample) + + def max(self, sample): + return self._apply(max, self, sample) + + def avg(self, sample, count): + res = self._apply(lambda x: x * count, self) + res = self._apply(operator.add, res, sample) + res = self._apply(lambda x: x / (count + 1), res) + return res + + @classmethod + def get_current_sample(cls) -> "Sample": + return cls( cpu_usage=psutil.cpu_percent(), mem_used_gb=psutil.virtual_memory().used / (1024 ** 3), mem_free_gb=psutil.virtual_memory().free / (1024 ** 3), ) - def run(self): + +class ResourceMonitor: + class Accumulator: + def __init__(self): + sample = Sample.get_current_sample() + self.avg = sample + self.min = sample + self.max = sample + self.time = datetime.utcnow() + self.count = 1 + + def add_sample(self, sample: Sample): + self.min = self.min.min(sample) + self.max = self.max.max(sample) + self.avg = self.avg.avg(sample, self.count) + self.count += 1 + + sample_interval_sec = 5 + _lock = Lock() + accumulator = Accumulator() + + @classmethod + @stat_threads.register("resource_monitor", daemon=True) + def start(cls): while not ThreadsManager.terminating: - sleep(self.sample_interval_sec) + sleep(cls.sample_interval_sec) + sample = Sample.get_current_sample() + with cls._lock: + cls.accumulator.add_sample(sample) - sample = self._get_sample() - - with self._lock: - self._min = self._min.min(sample) - self._max = self._max.max(sample) - self._avg = self._avg.avg(sample, self._count) - self._count += 1 - - def get_stats(self) -> dict: + @classmethod + def get_stats(cls) -> dict: """ Returns current resource statistics and clears internal resource statistics """ - with self._lock: - min_ = attr.asdict(self._min) - max_ = attr.asdict(self._max) - avg = attr.asdict(self._avg) - interval = datetime.utcnow() - self._clear_time - self._clear() + with cls._lock: + min_ = attr.asdict(cls.accumulator.min) + max_ = attr.asdict(cls.accumulator.max) + avg = attr.asdict(cls.accumulator.avg) + interval = datetime.utcnow() - cls.accumulator.time + cls.accumulator = cls.Accumulator() return { "interval_sec": interval.total_seconds(), diff --git a/apiserver/bll/statistics/stats_reporter.py b/apiserver/bll/statistics/stats_reporter.py index e61e469..c0850e7 100644 --- a/apiserver/bll/statistics/stats_reporter.py +++ b/apiserver/bll/statistics/stats_reporter.py @@ -23,7 +23,7 @@ from apiserver.tools import safe_get from apiserver.utilities.json import dumps from apiserver.utilities.threads_manager import ThreadsManager from apiserver.version import __version__ as current_version -from .resource_monitor import ResourceMonitor +from .resource_monitor import ResourceMonitor, stat_threads log = config.logger(__file__) @@ -31,17 +31,19 @@ worker_bll = WorkerBLL() class StatisticsReporter: - threads = ThreadsManager("Statistics", resource_monitor=ResourceMonitor) send_queue = queue.Queue() supported = config.get("apiserver.statistics.supported", True) @classmethod def start(cls): + if not cls.supported: + return + ResourceMonitor.start() cls.start_sender() cls.start_reporter() @classmethod - @threads.register("reporter", daemon=True) + @stat_threads.register("reporter", daemon=True) def start_reporter(cls): """ Periodically send statistics reports for companies who have opted in. @@ -68,7 +70,7 @@ class StatisticsReporter: sleep(report_interval.total_seconds()) @classmethod - @threads.register("sender", daemon=True) + @stat_threads.register("sender", daemon=True) def start_sender(cls): if not cls.supported: return @@ -111,7 +113,7 @@ class StatisticsReporter: "uuid": get_server_uuid(), "queues": {"count": Queue.objects(company=company_id).count()}, "users": {"count": User.objects(company=company_id).count()}, - "resources": cls.threads.resource_monitor.get_stats(), + "resources": ResourceMonitor.get_stats(), "experiments": next( iter(cls._get_experiments_stats(company_id).values()), {} ), diff --git a/apiserver/server_init/app_sequence.py b/apiserver/server_init/app_sequence.py index cf14bb9..c4c7ea8 100644 --- a/apiserver/server_init/app_sequence.py +++ b/apiserver/server_init/app_sequence.py @@ -6,6 +6,8 @@ from flask_compress import Compress from flask_cors import CORS from packaging.version import Version +from apiserver.bll.queue.queue_metrics import MetricsRefresher +from apiserver.bll.task.non_responsive_tasks_watchdog import NonResponsiveTasksWatchdog from apiserver.database import db from apiserver.bll.statistics.stats_reporter import StatisticsReporter from apiserver.config import info @@ -119,6 +121,8 @@ class AppSequence: def _start_worker(self): check_updates_thread.start() StatisticsReporter.start() + MetricsRefresher.start() + NonResponsiveTasksWatchdog.start() def _on_worker_stop(self): ThreadsManager.terminating = True diff --git a/apiserver/services/tasks.py b/apiserver/services/tasks.py index 5fea6fe..ff6a068 100644 --- a/apiserver/services/tasks.py +++ b/apiserver/services/tasks.py @@ -135,8 +135,6 @@ queue_bll = QueueBLL() org_bll = OrgBLL() project_bll = ProjectBLL() -NonResponsiveTasksWatchdog.start() - def set_task_status_from_call( request: UpdateRequest, company_id, new_status=None, **set_fields diff --git a/apiserver/utilities/threads_manager.py b/apiserver/utilities/threads_manager.py index a9b052b..68d0798 100644 --- a/apiserver/utilities/threads_manager.py +++ b/apiserver/utilities/threads_manager.py @@ -9,22 +9,8 @@ class ThreadsManager: request_context_creator: ClassVar[Callable] = None terminating: ClassVar[bool] = False - def __init__(self, name=None, **threads): + def __init__(self, name=None): self.name = name or self.__class__.__name__ - self.objects = {} - self.lock = Lock() - - for thread_name, thread in threads.items(): - if issubclass(thread, Thread): - thread = thread() - thread.start() - elif isinstance(thread, Thread): - if not thread.is_alive(): - thread.start() - else: - raise Exception(f"Expected thread or thread class ({thread_name}): {thread}") - - self.objects[thread_name] = thread def register(self, thread_name, daemon=True): def decorator(f):