Bugfix: Broken Task._get_status()

Avoid creating an ephemeral object on every status call (we call them a lot)
This commit is contained in:
Alex Burlacu 2023-03-08 21:39:16 +02:00 committed by Alex Burlacu
parent fda5095ab2
commit 4a928addb7
3 changed files with 50 additions and 17 deletions

View File

@ -194,16 +194,17 @@ class BaseJob(object):
have_job_with_no_status = True have_job_with_no_status = True
if not id_map or (time() - cls._last_batch_status_update_ts < 1 and not have_job_with_no_status): if not id_map or (time() - cls._last_batch_status_update_ts < 1 and not have_job_with_no_status):
return return
batch_status = Task._get_status(list(id_map.keys())) # noinspection PyProtectedMember
batch_status = Task._get_tasks_status(list(id_map.keys()))
last_batch_update_ts = time() last_batch_update_ts = time()
cls._last_batch_status_update_ts = last_batch_update_ts cls._last_batch_status_update_ts = last_batch_update_ts
for status in batch_status: for status, message, task_id in batch_status:
if not status.status or not status.id: if not status or not task_id:
continue continue
# noinspection PyProtectedMember # noinspection PyProtectedMember
id_map[status.id]._last_status = status.status id_map[task_id]._last_status = status
# noinspection PyProtectedMember # noinspection PyProtectedMember
id_map[status.id]._last_status_ts = last_batch_update_ts id_map[task_id]._last_status_ts = last_batch_update_ts
def wait(self, timeout=None, pool_period=30., aborted_nonresponsive_as_running=False): def wait(self, timeout=None, pool_period=30., aborted_nonresponsive_as_running=False):
# type: (Optional[float], float, bool) -> bool # type: (Optional[float], float, bool) -> bool

View File

@ -30,7 +30,9 @@ class TaskStopSignal(object):
try: try:
# we use internal status read, so we do not need to constantly pull the entire task object, # we use internal status read, so we do not need to constantly pull the entire task object,
# it might be large, and we want to also avoid the edit lock on it. # it might be large, and we want to also avoid the edit lock on it.
status, message, _ = self.task._get_status([self.task.id])[0] # do not update the task object so that other threads can count on consistency in data structure
# because we are running this function in background thread
status, message = self.task._get_status()
# if we did not get a proper status, return and recheck later # if we did not get a proper status, return and recheck later
if status is None and message is None: if status is None and message is None:
return None return None

View File

@ -14,7 +14,6 @@ from operator import itemgetter
from tempfile import gettempdir from tempfile import gettempdir
from threading import Thread from threading import Thread
from typing import Optional, Any, Sequence, Callable, Mapping, Union, List, Set, Dict from typing import Optional, Any, Sequence, Callable, Mapping, Union, List, Set, Dict
from collections import namedtuple
from uuid import uuid4 from uuid import uuid4
from pathlib2 import Path from pathlib2 import Path
@ -1882,10 +1881,7 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
:return: str: Task status as string (TaskStatusEnum) :return: str: Task status as string (TaskStatusEnum)
""" """
status, status_message, _ = self._get_status([self.id])[0] status, status_message = self.get_status_message()
if self._data:
self._data.status = status
self._data.status_message = str(status_message)
return str(status) return str(status)
@ -2277,21 +2273,55 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
self._files_server = Session.get_files_server_host() self._files_server = Session.get_files_server_host()
return self._files_server return self._files_server
def get_status_message(self):
# type: () -> (Optional[str], Optional[str])
"""
Return The task status without refreshing the entire Task object (only the status property)
Return also the last message coupled with the status change
Task Status options: ["created", "in_progress", "stopped", "closed", "failed", "completed",
"queued", "published", "publishing", "unknown"]
Message: is a string
:return: (Task status as string, last message)
"""
status, status_message, _ = self._get_tasks_status([self.id])[0]
if self._data and status:
self._data.status = status
self._data.status_message = status_message
return status, status_message
def _get_status(self):
# type: () -> (Optional[str], Optional[str])
"""
retrieve Task status & message, But do not update the Task local status
this is important if we want to query in the background without breaking Tasks consistency
backwards compatibility,
:return: (status enum as string or None, str or None)
"""
status, status_message, _ = self._get_tasks_status([self.id])[0]
return status, status_message
@classmethod @classmethod
def _get_status(cls, ids): def _get_tasks_status(cls, ids):
# type: (List[str]) -> List[namedtuple[Optional[str], Optional[str], Optional[str]]] # type: (List[str]) -> List[(Optional[str], Optional[str], Optional[str])]
"""
:param ids: task IDs (str) to query
:return: list of tuples (status, status_message, task_id)
"""
if cls._offline_mode: if cls._offline_mode:
return [(tasks.TaskStatusEnum.created, "offline") for _ in ids] return [(tasks.TaskStatusEnum.created, "offline", i) for i in ids]
Status = namedtuple("Status", ["status", "status_message", "id"])
# noinspection PyBroadException # noinspection PyBroadException
try: try:
all_tasks = cls._get_default_session().send( all_tasks = cls._get_default_session().send(
tasks.GetAllRequest(id=ids, only_fields=["status", "status_message", "id"]), tasks.GetAllRequest(id=ids, only_fields=["status", "status_message", "id"]),
).response.tasks ).response.tasks
return [Status(task.status, task.status_message, task.id) for task in all_tasks] return [(task.status, task.status_message, task.id) for task in all_tasks]
except Exception: except Exception:
return [Status(None, None, None) for _ in ids] return [(None, None, None) for _ in ids]
def _get_last_update(self): def _get_last_update(self):
# type: () -> (Optional[datetime]) # type: () -> (Optional[datetime])