Add missing completed status

This commit is contained in:
allegroai
2019-07-08 23:59:54 +03:00
parent 30c8be79b5
commit 18392ad2fd
3 changed files with 77 additions and 14 deletions

View File

@@ -1,6 +1,7 @@
import re
from collections import OrderedDict
from datetime import datetime
from datetime import datetime, timedelta
from time import sleep
from typing import Mapping, Collection
from urllib.parse import urlparse
@@ -10,6 +11,7 @@ from six import string_types
import es_factory
from apierrors import errors
from config import config
from database.errors import translate_errors_context
from database.fields import OutputDestinationField
from database.model.model import Model
@@ -20,10 +22,13 @@ from database.model.task.task import Task, TaskStatus, TaskStatusMessage, TaskTa
from database.utils import get_company_or_none_constraint, id as create_id
from service_repo import APICall
from timing_context import TimingContext
from utilities.threads_manager import ThreadsManager
from .utils import ChangeStatusRequest, validate_status_change
class TaskBLL(object):
threads = ThreadsManager()
def __init__(self, events_es=None):
self.events_es = (
events_es if events_es is not None else es_factory.connect("events")
@@ -391,3 +396,55 @@ class TaskBLL(object):
status_message=status_message,
force=force,
).execute()
@classmethod
@threads.register("non_responsive_tasks_watchdog", daemon=True)
def start_non_responsive_tasks_watchdog(cls):
log = config.logger("non_responsive_tasks_watchdog")
relevant_status = (TaskStatus.in_progress,)
threshold = timedelta(
seconds=config.get(
"services.tasks.non_responsive_tasks_watchdog.threshold_sec", 7200
)
)
while True:
sleep(
config.get(
"services.tasks.non_responsive_tasks_watchdog.watch_interval_sec",
900,
)
)
try:
ref_time = datetime.utcnow() - threshold
log.info(
f"Starting cleanup cycle for running tasks last updated before {ref_time}"
)
tasks = list(
Task.objects(
status__in=relevant_status, last_update__lt=ref_time
).only("id", "name", "status", "project", "last_update")
)
if tasks:
log.info(f"Stopping {len(tasks)} non-responsive tasks")
for task in tasks:
log.info(
f"Stopping {task.id} ({task.name}), last updated at {task.last_update}"
)
ChangeStatusRequest(
task=task,
new_status=TaskStatus.stopped,
status_reason="Forced stop (non-responsive)",
status_message="Forced stop (non-responsive)",
force=True,
).execute()
log.info(f"Done")
except Exception as ex:
log.exception(f"Failed stopping tasks: {str(ex)}")

View File

@@ -96,7 +96,12 @@ def validate_status_change(current_status, new_status):
state_machine = {
TaskStatus.created: {TaskStatus.in_progress},
TaskStatus.in_progress: {TaskStatus.stopped, TaskStatus.failed, TaskStatus.created},
TaskStatus.in_progress: {
TaskStatus.stopped,
TaskStatus.failed,
TaskStatus.created,
TaskStatus.completed,
},
TaskStatus.stopped: {
TaskStatus.closed,
TaskStatus.created,
@@ -104,6 +109,7 @@ state_machine = {
TaskStatus.in_progress,
TaskStatus.published,
TaskStatus.publishing,
TaskStatus.completed,
},
TaskStatus.closed: {
TaskStatus.created,
@@ -115,6 +121,11 @@ state_machine = {
TaskStatus.failed: {TaskStatus.created, TaskStatus.stopped, TaskStatus.published},
TaskStatus.publishing: {TaskStatus.published},
TaskStatus.published: set(),
TaskStatus.completed: {
TaskStatus.published,
TaskStatus.in_progress,
TaskStatus.created,
}
}