diff --git a/clearml/automation/job.py b/clearml/automation/job.py index d24d0ec2..a0547db1 100644 --- a/clearml/automation/job.py +++ b/clearml/automation/job.py @@ -194,16 +194,17 @@ class BaseJob(object): have_job_with_no_status = True if not id_map or (time() - cls._last_batch_status_update_ts < 1 and not have_job_with_no_status): return - batch_status = Task._get_status(list(id_map.keys())) + # noinspection PyProtectedMember + batch_status = Task._get_tasks_status(list(id_map.keys())) last_batch_update_ts = time() cls._last_batch_status_update_ts = last_batch_update_ts - for status in batch_status: - if not status.status or not status.id: + for status, message, task_id in batch_status: + if not status or not task_id: continue # noinspection PyProtectedMember - id_map[status.id]._last_status = status.status + id_map[task_id]._last_status = status # noinspection PyProtectedMember - id_map[status.id]._last_status_ts = last_batch_update_ts + id_map[task_id]._last_status_ts = last_batch_update_ts def wait(self, timeout=None, pool_period=30., aborted_nonresponsive_as_running=False): # type: (Optional[float], float, bool) -> bool diff --git a/clearml/backend_interface/task/development/stop_signal.py b/clearml/backend_interface/task/development/stop_signal.py index ba3cb049..9a91e0e1 100644 --- a/clearml/backend_interface/task/development/stop_signal.py +++ b/clearml/backend_interface/task/development/stop_signal.py @@ -30,7 +30,9 @@ class TaskStopSignal(object): try: # we use internal status read, so we do not need to constantly pull the entire task object, # it might be large, and we want to also avoid the edit lock on it. - status, message, _ = self.task._get_status([self.task.id])[0] + # do not update the task object so that other threads can count on consistency in data structure + # because we are running this function in background thread + status, message = self.task._get_status() # if we did not get a proper status, return and recheck later if status is None and message is None: return None diff --git a/clearml/backend_interface/task/task.py b/clearml/backend_interface/task/task.py index 67dd0263..9909edc2 100644 --- a/clearml/backend_interface/task/task.py +++ b/clearml/backend_interface/task/task.py @@ -14,7 +14,6 @@ from operator import itemgetter from tempfile import gettempdir from threading import Thread from typing import Optional, Any, Sequence, Callable, Mapping, Union, List, Set, Dict -from collections import namedtuple from uuid import uuid4 from pathlib2 import Path @@ -1882,10 +1881,7 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin): :return: str: Task status as string (TaskStatusEnum) """ - status, status_message, _ = self._get_status([self.id])[0] - if self._data: - self._data.status = status - self._data.status_message = str(status_message) + status, status_message = self.get_status_message() return str(status) @@ -2277,21 +2273,55 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin): self._files_server = Session.get_files_server_host() return self._files_server + def get_status_message(self): + # type: () -> (Optional[str], Optional[str]) + """ + Return The task status without refreshing the entire Task object (only the status property) + Return also the last message coupled with the status change + + Task Status options: ["created", "in_progress", "stopped", "closed", "failed", "completed", + "queued", "published", "publishing", "unknown"] + Message: is a string + + :return: (Task status as string, last message) + """ + status, status_message, _ = self._get_tasks_status([self.id])[0] + if self._data and status: + self._data.status = status + self._data.status_message = status_message + + return status, status_message + + def _get_status(self): + # type: () -> (Optional[str], Optional[str]) + """ + retrieve Task status & message, But do not update the Task local status + this is important if we want to query in the background without breaking Tasks consistency + + backwards compatibility, + :return: (status enum as string or None, str or None) + """ + status, status_message, _ = self._get_tasks_status([self.id])[0] + return status, status_message + @classmethod - def _get_status(cls, ids): - # type: (List[str]) -> List[namedtuple[Optional[str], Optional[str], Optional[str]]] + def _get_tasks_status(cls, ids): + # type: (List[str]) -> List[(Optional[str], Optional[str], Optional[str])] + """ + :param ids: task IDs (str) to query + :return: list of tuples (status, status_message, task_id) + """ if cls._offline_mode: - return [(tasks.TaskStatusEnum.created, "offline") for _ in ids] - Status = namedtuple("Status", ["status", "status_message", "id"]) + return [(tasks.TaskStatusEnum.created, "offline", i) for i in ids] # noinspection PyBroadException try: all_tasks = cls._get_default_session().send( tasks.GetAllRequest(id=ids, only_fields=["status", "status_message", "id"]), ).response.tasks - return [Status(task.status, task.status_message, task.id) for task in all_tasks] + return [(task.status, task.status_message, task.id) for task in all_tasks] except Exception: - return [Status(None, None, None) for _ in ids] + return [(None, None, None) for _ in ids] def _get_last_update(self): # type: () -> (Optional[datetime])