From 18392ad2fd454901078a9570320a1d8428e607d7 Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Mon, 8 Jul 2019 23:59:54 +0300 Subject: [PATCH] Add missing completed status --- server/bll/auth/__init__.py | 19 +++++------- server/bll/task/task_bll.py | 59 ++++++++++++++++++++++++++++++++++++- server/bll/task/utils.py | 13 +++++++- 3 files changed, 77 insertions(+), 14 deletions(-) diff --git a/server/bll/auth/__init__.py b/server/bll/auth/__init__.py index 1b18f2d..f83cb8d 100644 --- a/server/bll/auth/__init__.py +++ b/server/bll/auth/__init__.py @@ -2,24 +2,16 @@ from datetime import datetime import database from apierrors import errors -from apimodels.auth import ( - GetTokenResponse, - CreateUserRequest, - Credentials as CredModel, -) +from apimodels.auth import GetTokenResponse, CreateUserRequest, Credentials as CredModel from apimodels.users import CreateRequest as Users_CreateRequest from bll.user import UserBLL from config import config +from config.info import get_version, get_build_number from database.errors import translate_errors_context from database.model.auth import User, Role, Credentials from database.model.company import Company -from service_repo import APICall -from service_repo.auth import ( - Identity, - Token, - get_client_id, - get_secret_key, -) +from service_repo import APICall, ServiceRepo +from service_repo.auth import Identity, Token, get_client_id, get_secret_key log = config.logger("AuthBLL") @@ -62,6 +54,9 @@ class AuthBLL: identity=identity, entities=entities, expiration_sec=expiration_sec, + api_version=str(ServiceRepo.max_endpoint_version()), + server_version=str(get_version()), + server_build=str(get_build_number()), ) return GetTokenResponse(token=token.decode("ascii")) diff --git a/server/bll/task/task_bll.py b/server/bll/task/task_bll.py index c0bba5d..40f21f8 100644 --- a/server/bll/task/task_bll.py +++ b/server/bll/task/task_bll.py @@ -1,6 +1,7 @@ import re from collections import OrderedDict -from datetime import datetime +from datetime import datetime, timedelta +from time import sleep from typing import Mapping, Collection from urllib.parse import urlparse @@ -10,6 +11,7 @@ from six import string_types import es_factory from apierrors import errors +from config import config from database.errors import translate_errors_context from database.fields import OutputDestinationField from database.model.model import Model @@ -20,10 +22,13 @@ from database.model.task.task import Task, TaskStatus, TaskStatusMessage, TaskTa from database.utils import get_company_or_none_constraint, id as create_id from service_repo import APICall from timing_context import TimingContext +from utilities.threads_manager import ThreadsManager from .utils import ChangeStatusRequest, validate_status_change class TaskBLL(object): + threads = ThreadsManager() + def __init__(self, events_es=None): self.events_es = ( events_es if events_es is not None else es_factory.connect("events") @@ -391,3 +396,55 @@ class TaskBLL(object): status_message=status_message, force=force, ).execute() + + @classmethod + @threads.register("non_responsive_tasks_watchdog", daemon=True) + def start_non_responsive_tasks_watchdog(cls): + log = config.logger("non_responsive_tasks_watchdog") + relevant_status = (TaskStatus.in_progress,) + threshold = timedelta( + seconds=config.get( + "services.tasks.non_responsive_tasks_watchdog.threshold_sec", 7200 + ) + ) + while True: + sleep( + config.get( + "services.tasks.non_responsive_tasks_watchdog.watch_interval_sec", + 900, + ) + ) + try: + + ref_time = datetime.utcnow() - threshold + + log.info( + f"Starting cleanup cycle for running tasks last updated before {ref_time}" + ) + + tasks = list( + Task.objects( + status__in=relevant_status, last_update__lt=ref_time + ).only("id", "name", "status", "project", "last_update") + ) + + if tasks: + + log.info(f"Stopping {len(tasks)} non-responsive tasks") + + for task in tasks: + log.info( + f"Stopping {task.id} ({task.name}), last updated at {task.last_update}" + ) + ChangeStatusRequest( + task=task, + new_status=TaskStatus.stopped, + status_reason="Forced stop (non-responsive)", + status_message="Forced stop (non-responsive)", + force=True, + ).execute() + + log.info(f"Done") + + except Exception as ex: + log.exception(f"Failed stopping tasks: {str(ex)}") diff --git a/server/bll/task/utils.py b/server/bll/task/utils.py index 5fca3a0..56f3091 100644 --- a/server/bll/task/utils.py +++ b/server/bll/task/utils.py @@ -96,7 +96,12 @@ def validate_status_change(current_status, new_status): state_machine = { TaskStatus.created: {TaskStatus.in_progress}, - TaskStatus.in_progress: {TaskStatus.stopped, TaskStatus.failed, TaskStatus.created}, + TaskStatus.in_progress: { + TaskStatus.stopped, + TaskStatus.failed, + TaskStatus.created, + TaskStatus.completed, + }, TaskStatus.stopped: { TaskStatus.closed, TaskStatus.created, @@ -104,6 +109,7 @@ state_machine = { TaskStatus.in_progress, TaskStatus.published, TaskStatus.publishing, + TaskStatus.completed, }, TaskStatus.closed: { TaskStatus.created, @@ -115,6 +121,11 @@ state_machine = { TaskStatus.failed: {TaskStatus.created, TaskStatus.stopped, TaskStatus.published}, TaskStatus.publishing: {TaskStatus.published}, TaskStatus.published: set(), + TaskStatus.completed: { + TaskStatus.published, + TaskStatus.in_progress, + TaskStatus.created, + } }