diff --git a/server/bll/task/non_responsive_tasks_watchdog.py b/server/bll/task/non_responsive_tasks_watchdog.py new file mode 100644 index 0000000..b0c2b4a --- /dev/null +++ b/server/bll/task/non_responsive_tasks_watchdog.py @@ -0,0 +1,89 @@ +from datetime import timedelta, datetime +from time import sleep + +from apierrors import errors +from bll.task import ChangeStatusRequest +from config import config +from database.model.task.task import TaskStatus, Task +from utilities.threads_manager import ThreadsManager + +log = config.logger(__file__) + + +class NonResponsiveTasksWatchdog: + threads = ThreadsManager() + + class _Settings: + """ + Retrieves watchdog settings from the config file + The properties are not cached so that the updates in + the config file are reflected + """ + + _prefix = "services.tasks.non_responsive_tasks_watchdog" + + @property + def enabled(self): + return config.get(f"{self._prefix}.enabled", True) + + @property + def watch_interval_sec(self): + return config.get(f"{self._prefix}.watch_interval_sec", 900) + + @property + def threshold_sec(self): + return config.get(f"{self._prefix}.threshold_sec", 7200) + + settings = _Settings() + + @classmethod + @threads.register("non_responsive_tasks_watchdog", daemon=True) + def start(cls): + sleep(cls.settings.watch_interval_sec) + while not ThreadsManager.terminating: + watch_interval = cls.settings.watch_interval_sec + if cls.settings.enabled: + try: + stopped = cls.cleanup_tasks( + threshold_sec=cls.settings.threshold_sec + ) + log.info(f"{stopped} non-responsive tasks stopped") + except Exception as ex: + log.exception(f"Failed stopping tasks: {str(ex)}") + sleep(watch_interval) + + @classmethod + def cleanup_tasks(cls, threshold_sec): + relevant_status = (TaskStatus.in_progress,) + threshold = timedelta(seconds=threshold_sec) + ref_time = datetime.utcnow() - threshold + log.info( + f"Starting cleanup cycle for running tasks last updated before {ref_time}" + ) + + tasks = list( + Task.objects(status__in=relevant_status, last_update__lt=ref_time).only( + "id", "name", "status", "project", "last_update" + ) + ) + log.info(f"{len(tasks)} non-responsive tasks found") + if not tasks: + return 0 + + err_count = 0 + for task in tasks: + log.info( + f"Stopping {task.id} ({task.name}), last updated at {task.last_update}" + ) + try: + ChangeStatusRequest( + task=task, + new_status=TaskStatus.stopped, + status_reason="Forced stop (non-responsive)", + status_message="Forced stop (non-responsive)", + force=True, + ).execute() + except errors.bad_request.FailedChangingTaskStatus: + err_count += 1 + + return len(tasks) - err_count diff --git a/server/bll/task/task_bll.py b/server/bll/task/task_bll.py index 8d83e8d..f7896d4 100644 --- a/server/bll/task/task_bll.py +++ b/server/bll/task/task_bll.py @@ -1,5 +1,5 @@ from collections import OrderedDict -from datetime import datetime, timedelta +from datetime import datetime from operator import attrgetter from random import random from time import sleep @@ -32,15 +32,12 @@ from database.utils import get_company_or_none_constraint, id as create_id from service_repo import APICall from timing_context import TimingContext from utilities.dicts import deep_merge -from utilities.threads_manager import ThreadsManager from .utils import ChangeStatusRequest, validate_status_change, ParameterKeyEscaper log = config.logger(__file__) class TaskBLL(object): - threads = ThreadsManager("TaskBLL") - def __init__(self, events_es=None): self.events_es = ( events_es if events_es is not None else es_factory.connect("events") @@ -575,58 +572,6 @@ class TaskBLL(object): return [a.key for a in added], [a.key for a in updated] - @classmethod - @threads.register("non_responsive_tasks_watchdog", daemon=True) - def start_non_responsive_tasks_watchdog(cls): - log = config.logger("non_responsive_tasks_watchdog") - relevant_status = (TaskStatus.in_progress,) - threshold = timedelta( - seconds=config.get( - "services.tasks.non_responsive_tasks_watchdog.threshold_sec", 7200 - ) - ) - watch_interval = config.get( - "services.tasks.non_responsive_tasks_watchdog.watch_interval_sec", 900 - ) - sleep(watch_interval) - while not ThreadsManager.terminating: - try: - - ref_time = datetime.utcnow() - threshold - - log.info( - f"Starting cleanup cycle for running tasks last updated before {ref_time}" - ) - - tasks = list( - Task.objects( - status__in=relevant_status, last_update__lt=ref_time - ).only("id", "name", "status", "project", "last_update") - ) - - if tasks: - - log.info(f"Stopping {len(tasks)} non-responsive tasks") - - for task in tasks: - log.info( - f"Stopping {task.id} ({task.name}), last updated at {task.last_update}" - ) - ChangeStatusRequest( - task=task, - new_status=TaskStatus.stopped, - status_reason="Forced stop (non-responsive)", - status_message="Forced stop (non-responsive)", - force=True, - ).execute() - - log.info(f"Done") - - except Exception as ex: - log.exception(f"Failed stopping tasks: {str(ex)}") - - sleep(watch_interval) - @staticmethod def get_aggregated_project_execution_parameters( company_id, diff --git a/server/config/default/services/tasks.conf b/server/config/default/services/tasks.conf index 2e25c83..63936f9 100644 --- a/server/config/default/services/tasks.conf +++ b/server/config/default/services/tasks.conf @@ -1,4 +1,6 @@ non_responsive_tasks_watchdog { + enabled: true + # In-progress tasks older than this value in seconds will be stopped by the watchdog threshold_sec: 7200 diff --git a/server/services/tasks.py b/server/services/tasks.py index a2bef35..1ac0475 100644 --- a/server/services/tasks.py +++ b/server/services/tasks.py @@ -39,6 +39,7 @@ from bll.task import ( split_by, ParameterKeyEscaper, ) +from bll.task.non_responsive_tasks_watchdog import NonResponsiveTasksWatchdog from bll.util import SetFieldsResolver from database.errors import translate_errors_context from database.model.model import Model @@ -70,7 +71,7 @@ event_bll = EventBLL() queue_bll = QueueBLL() -TaskBLL.start_non_responsive_tasks_watchdog() +NonResponsiveTasksWatchdog.start() def set_task_status_from_call( diff --git a/server/utilities/threads_manager.py b/server/utilities/threads_manager.py index 094bdb6..1c5f604 100644 --- a/server/utilities/threads_manager.py +++ b/server/utilities/threads_manager.py @@ -10,7 +10,7 @@ class ThreadsManager: def __init__(self, name=None, **threads): super(ThreadsManager, self).__init__() - self.name = name or self.__class__.name + self.name = name or self.__class__.__name__ self.objects = {} self.lock = Lock()