From 6c6c1c3f418445c42ca7948d02c4197a497cbf2e Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Sat, 14 Dec 2019 23:33:36 +0200 Subject: [PATCH] Add server resource monitoring --- server/bll/statistics/resource_monitor.py | 87 +++++++++++++++++++++++ server/bll/task/task_bll.py | 2 +- server/bll/util.py | 7 ++ 3 files changed, 95 insertions(+), 1 deletion(-) create mode 100644 server/bll/statistics/resource_monitor.py diff --git a/server/bll/statistics/resource_monitor.py b/server/bll/statistics/resource_monitor.py new file mode 100644 index 0000000..509546c --- /dev/null +++ b/server/bll/statistics/resource_monitor.py @@ -0,0 +1,87 @@ +from datetime import datetime +import operator +from threading import Thread, Lock +from time import sleep + +import attr +import psutil + + +class ResourceMonitor(Thread): + @attr.s(auto_attribs=True) + class Sample: + cpu_usage: float = 0.0 + mem_used_gb: float = 0 + mem_free_gb: float = 0 + + @classmethod + def _apply(cls, op, *samples): + return cls( + **{ + field: op(*(getattr(sample, field) for sample in samples)) + for field in attr.fields_dict(cls) + } + ) + + def min(self, sample): + return self._apply(min, self, sample) + + def max(self, sample): + return self._apply(max, self, sample) + + def avg(self, sample, count): + res = self._apply(lambda x: x * count, self) + res = self._apply(operator.add, res, sample) + res = self._apply(lambda x: x / (count + 1), res) + return res + + def __init__(self, sample_interval_sec=5): + super(ResourceMonitor, self).__init__(daemon=True) + self.sample_interval_sec = sample_interval_sec + self._lock = Lock() + self._clear() + + def _clear(self): + sample = self._get_sample() + self._avg = sample + self._min = sample + self._max = sample + self._clear_time = datetime.utcnow() + self._count = 1 + + @classmethod + def _get_sample(cls) -> Sample: + return cls.Sample( + cpu_usage=psutil.cpu_percent(), + mem_used_gb=psutil.virtual_memory().used / (1024 ** 3), + mem_free_gb=psutil.virtual_memory().free / (1024 ** 3), + ) + + def run(self): + while True: + sample = self._get_sample() + + with self._lock: + self._min = self._min.min(sample) + self._max = self._max.max(sample) + self._avg = self._avg.avg(sample, self._count) + self._count += 1 + + sleep(self.sample_interval_sec) + + def get_stats(self) -> dict: + """ Returns current resource statistics and clears internal resource statistics """ + with self._lock: + min_ = attr.asdict(self._min) + max_ = attr.asdict(self._max) + avg = attr.asdict(self._avg) + res = { + "interval_sec": (datetime.utcnow() - self._clear_time).total_seconds(), + "num_cores": psutil.cpu_count(), + **{ + k: {"min": v, "max": max_[k], "avg": avg[k]} + for k, v in min_.items() + } + } + self._clear() + return res diff --git a/server/bll/task/task_bll.py b/server/bll/task/task_bll.py index 74ad88f..cc4790e 100644 --- a/server/bll/task/task_bll.py +++ b/server/bll/task/task_bll.py @@ -29,7 +29,7 @@ from .utils import ChangeStatusRequest, validate_status_change class TaskBLL(object): - threads = ThreadsManager() + threads = ThreadsManager("TaskBLL") def __init__(self, events_es=None): self.events_es = ( diff --git a/server/bll/util.py b/server/bll/util.py index 71b4557..77c1b4d 100644 --- a/server/bll/util.py +++ b/server/bll/util.py @@ -1,7 +1,9 @@ +import functools from operator import itemgetter from typing import Sequence, Optional, Callable, Tuple, Dict, Any, Set from database.model import AttributedDocument +from database.model.settings import Settings def extract_properties_to_lists( @@ -64,3 +66,8 @@ class SetFieldsResolver: in the format suitable for projection (dot separated) """ return set(name.replace("__", ".") for name in self.fields.values()) + + +@functools.lru_cache() +def get_server_uuid() -> Optional[str]: + return Settings.get_by_key("server.uuid")