Fix threaded jobs management (invoke only from AppSequence)

allegroai 2022-09-29 19:13:22 +03:00
parent 4dff163af4
commit efd56e085e
7 changed files with 84 additions and 86 deletions
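The common thread across all seven files: background jobs no longer start themselves as import-time side effects; each one is registered on a ThreadsManager and invoked from a single owner, AppSequence._start_worker. A rough, self-contained sketch of that pattern follows; the class and thread names mirror the diff, but the register() internals are simplified assumptions rather than the real apiserver implementation.

import time
from threading import Lock, Thread


class ThreadsManager:
    terminating = False

    def __init__(self, name=None):
        self.name = name or self.__class__.__name__
        self.objects = {}
        self.lock = Lock()

    def register(self, thread_name, daemon=True):
        # Assumed behaviour of the real decorator: calling the wrapped
        # classmethod spawns one named daemon thread that runs the job.
        def decorator(f):
            def wrapper(*args, **kwargs):
                with self.lock:
                    if thread_name in self.objects:
                        return self.objects[thread_name]
                    thread = Thread(
                        target=f,
                        name=f"{self.name}_{thread_name}",
                        args=args,
                        kwargs=kwargs,
                        daemon=daemon,
                    )
                    self.objects[thread_name] = thread
                    thread.start()
                    return thread

            return wrapper

        return decorator


threads = ThreadsManager("BackgroundJobs")


class MetricsRefresher:
    @classmethod
    @threads.register("queue_metrics_refresh_watchdog", daemon=True)
    def start(cls):
        while not ThreadsManager.terminating:
            time.sleep(0.1)  # refresh queue metrics here


class AppSequence:
    def _start_worker(self):
        MetricsRefresher.start()  # the one and only call site

    def _on_worker_stop(self):
        ThreadsManager.terminating = True


if __name__ == "__main__":
    app = AppSequence()
    app._start_worker()
    time.sleep(0.3)
    app._on_worker_stop()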

View File

@@ -7,7 +7,7 @@ from elasticsearch import Elasticsearch
from apiserver import database
from apiserver.es_factory import es_factory
from apiserver.apierrors import errors
from apiserver.bll.queue.queue_metrics import QueueMetrics, MetricsRefresher
from apiserver.bll.queue.queue_metrics import QueueMetrics
from apiserver.bll.workers import WorkerBLL
from apiserver.config_repo import config
from apiserver.database.errors import translate_errors_context
@@ -334,6 +334,3 @@ class QueueBLL(object):
if res is None:
raise errors.bad_request.InvalidQueueId(queue_id=queue_id)
return int(res.get("count"))
MetricsRefresher.start(queue_metrics=QueueBLL().metrics)

View File

@@ -279,10 +279,14 @@ class MetricsRefresher:
@classmethod
@threads.register("queue_metrics_refresh_watchdog", daemon=True)
def start(cls, queue_metrics: QueueMetrics):
def start(cls, queue_metrics: QueueMetrics = None):
if not cls.watch_interval_sec:
return
if not queue_metrics:
from .queue_bll import QueueBLL
queue_metrics = QueueBLL().metrics
sleep(10)
while not ThreadsManager.terminating:
try:
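The new queue_metrics=None default, together with the function-local import of QueueBLL, is what lets AppSequence call MetricsRefresher.start() with no arguments while avoiding a circular import (queue_bll already imports queue_metrics at module level). A minimal single-file sketch of the deferred-default idea, using stand-in classes rather than the real modules:

class QueueMetrics:
    """Stand-in for the real queue metrics helper."""


class QueueBLL:
    # In the apiserver this class lives in queue_bll.py, which imports
    # queue_metrics at module level; hence queue_metrics may only import
    # QueueBLL lazily, inside the function body.
    def __init__(self):
        self.metrics = QueueMetrics()


class MetricsRefresher:
    watch_interval_sec = 60

    @classmethod
    def start(cls, queue_metrics: QueueMetrics = None):
        if not cls.watch_interval_sec:
            return
        if not queue_metrics:
            # Real code: `from .queue_bll import QueueBLL` goes right here.
            queue_metrics = QueueBLL().metrics
        print("watching", queue_metrics)


if __name__ == "__main__":
    MetricsRefresher.start()  # how AppSequence now invokes it: no argument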

View File

@@ -1,6 +1,6 @@
from datetime import datetime
import operator
from threading import Thread, Lock
from threading import Lock
from time import sleep
import attr
@@ -9,76 +9,83 @@ import psutil
from apiserver.utilities.threads_manager import ThreadsManager
class ResourceMonitor(Thread):
@attr.s(auto_attribs=True)
class Sample:
cpu_usage: float = 0.0
mem_used_gb: float = 0
mem_free_gb: float = 0
stat_threads = ThreadsManager("Statistics")
@classmethod
def _apply(cls, op, *samples):
return cls(
**{
field: op(*(getattr(sample, field) for sample in samples))
for field in attr.fields_dict(cls)
}
)
def min(self, sample):
return self._apply(min, self, sample)
def max(self, sample):
return self._apply(max, self, sample)
def avg(self, sample, count):
res = self._apply(lambda x: x * count, self)
res = self._apply(operator.add, res, sample)
res = self._apply(lambda x: x / (count + 1), res)
return res
def __init__(self, sample_interval_sec=5):
super(ResourceMonitor, self).__init__(daemon=True)
self.sample_interval_sec = sample_interval_sec
self._lock = Lock()
self._clear()
def _clear(self):
sample = self._get_sample()
self._avg = sample
self._min = sample
self._max = sample
self._clear_time = datetime.utcnow()
self._count = 1
@attr.s(auto_attribs=True)
class Sample:
cpu_usage: float = 0.0
mem_used_gb: float = 0
mem_free_gb: float = 0
@classmethod
def _get_sample(cls) -> Sample:
return cls.Sample(
def _apply(cls, op, *samples):
return cls(
**{
field: op(*(getattr(sample, field) for sample in samples))
for field in attr.fields_dict(cls)
}
)
def min(self, sample):
return self._apply(min, self, sample)
def max(self, sample):
return self._apply(max, self, sample)
def avg(self, sample, count):
res = self._apply(lambda x: x * count, self)
res = self._apply(operator.add, res, sample)
res = self._apply(lambda x: x / (count + 1), res)
return res
@classmethod
def get_current_sample(cls) -> "Sample":
return cls(
cpu_usage=psutil.cpu_percent(),
mem_used_gb=psutil.virtual_memory().used / (1024 ** 3),
mem_free_gb=psutil.virtual_memory().free / (1024 ** 3),
)
def run(self):
class ResourceMonitor:
class Accumulator:
def __init__(self):
sample = Sample.get_current_sample()
self.avg = sample
self.min = sample
self.max = sample
self.time = datetime.utcnow()
self.count = 1
def add_sample(self, sample: Sample):
self.min = self.min.min(sample)
self.max = self.max.max(sample)
self.avg = self.avg.avg(sample, self.count)
self.count += 1
sample_interval_sec = 5
_lock = Lock()
accumulator = Accumulator()
@classmethod
@stat_threads.register("resource_monitor", daemon=True)
def start(cls):
while not ThreadsManager.terminating:
sleep(self.sample_interval_sec)
sleep(cls.sample_interval_sec)
sample = Sample.get_current_sample()
with cls._lock:
cls.accumulator.add_sample(sample)
sample = self._get_sample()
with self._lock:
self._min = self._min.min(sample)
self._max = self._max.max(sample)
self._avg = self._avg.avg(sample, self._count)
self._count += 1
def get_stats(self) -> dict:
@classmethod
def get_stats(cls) -> dict:
""" Returns current resource statistics and clears internal resource statistics """
with self._lock:
min_ = attr.asdict(self._min)
max_ = attr.asdict(self._max)
avg = attr.asdict(self._avg)
interval = datetime.utcnow() - self._clear_time
self._clear()
with cls._lock:
min_ = attr.asdict(cls.accumulator.min)
max_ = attr.asdict(cls.accumulator.max)
avg = attr.asdict(cls.accumulator.avg)
interval = datetime.utcnow() - cls.accumulator.time
cls.accumulator = cls.Accumulator()
return {
"interval_sec": interval.total_seconds(),

View File

@@ -23,7 +23,7 @@ from apiserver.tools import safe_get
from apiserver.utilities.json import dumps
from apiserver.utilities.threads_manager import ThreadsManager
from apiserver.version import __version__ as current_version
from .resource_monitor import ResourceMonitor
from .resource_monitor import ResourceMonitor, stat_threads
log = config.logger(__file__)
@@ -31,17 +31,19 @@ worker_bll = WorkerBLL()
class StatisticsReporter:
threads = ThreadsManager("Statistics", resource_monitor=ResourceMonitor)
send_queue = queue.Queue()
supported = config.get("apiserver.statistics.supported", True)
@classmethod
def start(cls):
if not cls.supported:
return
ResourceMonitor.start()
cls.start_sender()
cls.start_reporter()
@classmethod
@threads.register("reporter", daemon=True)
@stat_threads.register("reporter", daemon=True)
def start_reporter(cls):
"""
Periodically send statistics reports for companies who have opted in.
@@ -68,7 +70,7 @@ class StatisticsReporter:
sleep(report_interval.total_seconds())
@classmethod
@threads.register("sender", daemon=True)
@stat_threads.register("sender", daemon=True)
def start_sender(cls):
if not cls.supported:
return
@@ -111,7 +113,7 @@ class StatisticsReporter:
"uuid": get_server_uuid(),
"queues": {"count": Queue.objects(company=company_id).count()},
"users": {"count": User.objects(company=company_id).count()},
"resources": cls.threads.resource_monitor.get_stats(),
"resources": ResourceMonitor.get_stats(),
"experiments": next(
iter(cls._get_experiments_stats(company_id).values()), {}
),

View File

@@ -6,6 +6,8 @@ from flask_compress import Compress
from flask_cors import CORS
from packaging.version import Version
from apiserver.bll.queue.queue_metrics import MetricsRefresher
from apiserver.bll.task.non_responsive_tasks_watchdog import NonResponsiveTasksWatchdog
from apiserver.database import db
from apiserver.bll.statistics.stats_reporter import StatisticsReporter
from apiserver.config import info
@@ -119,6 +121,8 @@ class AppSequence:
def _start_worker(self):
check_updates_thread.start()
StatisticsReporter.start()
MetricsRefresher.start()
NonResponsiveTasksWatchdog.start()
def _on_worker_stop(self):
ThreadsManager.terminating = True
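The counterpart to starting everything from _start_worker is that shutdown stays a single flag flip: every registered loop polls ThreadsManager.terminating and winds down on its own. A tiny runnable sketch of that contract, with a hypothetical job rather than the actual apiserver classes:

import time
from threading import Thread


class ThreadsManager:
    terminating = False


def job_loop(interval_sec: float = 0.1):
    # Every background loop in this commit follows the same shape:
    # check the shared flag, do one unit of work, sleep, repeat.
    while not ThreadsManager.terminating:
        time.sleep(interval_sec)


if __name__ == "__main__":
    worker = Thread(target=job_loop, daemon=True)
    worker.start()                        # _start_worker: start the job once
    time.sleep(0.35)
    ThreadsManager.terminating = True     # _on_worker_stop: request shutdown
    worker.join(timeout=1)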

View File

@@ -135,8 +135,6 @@ queue_bll = QueueBLL()
org_bll = OrgBLL()
project_bll = ProjectBLL()
NonResponsiveTasksWatchdog.start()
def set_task_status_from_call(
request: UpdateRequest, company_id, new_status=None, **set_fields

View File

@@ -9,22 +9,8 @@ class ThreadsManager:
request_context_creator: ClassVar[Callable] = None
terminating: ClassVar[bool] = False
def __init__(self, name=None, **threads):
def __init__(self, name=None):
self.name = name or self.__class__.__name__
self.objects = {}
self.lock = Lock()
for thread_name, thread in threads.items():
if issubclass(thread, Thread):
thread = thread()
thread.start()
elif isinstance(thread, Thread):
if not thread.is_alive():
thread.start()
else:
raise Exception(f"Expected thread or thread class ({thread_name}): {thread}")
self.objects[thread_name] = thread
def register(self, thread_name, daemon=True):
def decorator(f):