Improve server threads shutdown on SIGTERM

This commit is contained in:
allegroai 2019-12-29 09:04:07 +02:00
parent 87d2b6fa15
commit ed910d5f6a
6 changed files with 38 additions and 26 deletions

View File

@ -6,6 +6,8 @@ from time import sleep
import attr import attr
import psutil import psutil
from utilities.threads_manager import ThreadsManager
class ResourceMonitor(Thread): class ResourceMonitor(Thread):
@attr.s(auto_attribs=True) @attr.s(auto_attribs=True)
@ -58,7 +60,7 @@ class ResourceMonitor(Thread):
) )
def run(self): def run(self):
while True: while not ThreadsManager.terminating:
sleep(self.sample_interval_sec) sleep(self.sample_interval_sec)
sample = self._get_sample() sample = self._get_sample()

View File

@ -53,11 +53,8 @@ class StatisticsReporter:
report_interval = timedelta( report_interval = timedelta(
hours=config.get("apiserver.statistics.report_interval_hours", 24) hours=config.get("apiserver.statistics.report_interval_hours", 24)
) )
sleep(report_interval.total_seconds())
while True: while not ThreadsManager.terminating:
sleep(report_interval.total_seconds())
try: try:
for company in Company.objects( for company in Company.objects(
defaults__stats_option__enabled=True defaults__stats_option__enabled=True
@ -68,6 +65,8 @@ class StatisticsReporter:
except Exception as ex: except Exception as ex:
log.exception(f"Failed collecting stats: {str(ex)}") log.exception(f"Failed collecting stats: {str(ex)}")
sleep(report_interval.total_seconds())
@classmethod @classmethod
@threads.register("sender", daemon=True) @threads.register("sender", daemon=True)
def start_sender(cls): def start_sender(cls):
@ -86,7 +85,7 @@ class StatisticsReporter:
WarningFilter.attach() WarningFilter.attach()
while True: while not ThreadsManager.terminating:
try: try:
report = cls.send_queue.get() report = cls.send_queue.get()

View File

@ -569,13 +569,11 @@ class TaskBLL(object):
"services.tasks.non_responsive_tasks_watchdog.threshold_sec", 7200 "services.tasks.non_responsive_tasks_watchdog.threshold_sec", 7200
) )
) )
while True: watch_interval = config.get(
sleep( "services.tasks.non_responsive_tasks_watchdog.watch_interval_sec", 900
config.get( )
"services.tasks.non_responsive_tasks_watchdog.watch_interval_sec", sleep(watch_interval)
900, while not ThreadsManager.terminating:
)
)
try: try:
ref_time = datetime.utcnow() - threshold ref_time = datetime.utcnow() - threshold
@ -611,6 +609,8 @@ class TaskBLL(object):
except Exception as ex: except Exception as ex:
log.exception(f"Failed stopping tasks: {str(ex)}") log.exception(f"Failed stopping tasks: {str(ex)}")
sleep(watch_interval)
@staticmethod @staticmethod
def get_aggregated_project_execution_parameters( def get_aggregated_project_execution_parameters(
company_id, company_id,

View File

@ -1,3 +1,4 @@
import atexit
from argparse import ArgumentParser from argparse import ArgumentParser
from flask import Flask, request, Response from flask import Flask, request, Response
@ -16,6 +17,7 @@ from service_repo.errors import PathParsingError
from timing_context import TimingContext from timing_context import TimingContext
from updates import check_updates_thread from updates import check_updates_thread
from utilities import json from utilities import json
from utilities.threads_manager import ThreadsManager
app = Flask(__name__, static_url_path="/static") app = Flask(__name__, static_url_path="/static")
CORS(app, **config.get("apiserver.cors")) CORS(app, **config.get("apiserver.cors"))
@ -41,6 +43,13 @@ check_updates_thread.start()
StatisticsReporter.start() StatisticsReporter.start()
def graceful_shutdown():
ThreadsManager.terminating = True
atexit.register(graceful_shutdown)
@app.before_first_request @app.before_first_request
def before_app_first_request(): def before_app_first_request():
pass pass

View File

@ -10,6 +10,7 @@ from semantic_version import Version
from config import config from config import config
from config.info import get_version from config.info import get_version
from database.model.settings import Settings from database.model.settings import Settings
from utilities.threads_manager import ThreadsManager
log = config.logger(__name__) log = config.logger(__name__)
@ -80,7 +81,16 @@ class CheckUpdatesThread(Thread):
) )
def _check_updates(self): def _check_updates(self):
while True: update_interval_sec = max(
float(
config.get(
"apiserver.check_for_updates.check_interval_sec",
60 * 60 * 24,
)
),
60 * 5,
)
while not ThreadsManager.terminating:
# noinspection PyBroadException # noinspection PyBroadException
try: try:
response = self._check_new_version_available() response = self._check_new_version_available()
@ -98,17 +108,7 @@ class CheckUpdatesThread(Thread):
except Exception: except Exception:
log.exception("Failed obtaining updates") log.exception("Failed obtaining updates")
sleep( sleep(update_interval_sec)
max(
float(
config.get(
"apiserver.check_for_updates.check_interval_sec",
60 * 60 * 24,
)
),
60 * 5,
)
)
check_updates_thread = CheckUpdatesThread() check_updates_thread = CheckUpdatesThread()

View File

@ -1,10 +1,12 @@
from functools import wraps from functools import wraps
from threading import Lock, Thread from threading import Lock, Thread
from typing import ClassVar
class ThreadsManager: class ThreadsManager:
objects = {} objects = {}
lock = Lock() lock = Lock()
terminating: ClassVar[bool] = False
def __init__(self, name=None, **threads): def __init__(self, name=None, **threads):
super(ThreadsManager, self).__init__() super(ThreadsManager, self).__init__()