1
0
mirror of https://github.com/clearml/clearml synced 2025-04-16 21:42:10 +00:00

Fix LocalClearmlJob setting failed status

Cache ClreamlJob state (refresh every one second)
This commit is contained in:
allegroai 2021-10-16 23:13:49 +03:00
parent 88e4f8db82
commit 0ade8b0717

View File

@ -38,6 +38,8 @@ class BaseJob(object):
self.task_parameter_override = None self.task_parameter_override = None
self.task = None self.task = None
self.task_started = False self.task_started = False
self._last_status_ts = 0
self._last_status = None
def get_metric(self, title, series): def get_metric(self, title, series):
# type: (str, str) -> (float, float, float) # type: (str, str) -> (float, float, float)
@ -96,10 +98,15 @@ class BaseJob(object):
""" """
if not self.task or self._is_cached_task: if not self.task or self._is_cached_task:
return return
try:
self.task.stopped() if self.task.status == Task.TaskStatusEnum.queued:
except Exception as ex: Task.dequeue(self.task)
logger.warning(ex)
elif self.task.status == Task.TaskStatusEnum.in_progress:
try:
self.task.stopped()
except Exception as ex:
logger.warning(ex)
def elapsed(self): def elapsed(self):
# type: () -> float # type: () -> float
@ -138,14 +145,22 @@ class BaseJob(object):
""" """
return self.task.id return self.task.id
def status(self): def status(self, force=False):
# type: () -> str # type: (bool) -> str
""" """
Return the Job Task current status, see Task.TaskStatusEnum Return the Job Task current status, see Task.TaskStatusEnum
:param force: Force status update, otherwise, only refresh state every 1 sec
:return: Task status Task.TaskStatusEnum in string. :return: Task status Task.TaskStatusEnum in string.
""" """
return self.task.status if self._last_status and not force and time() - self._last_status_ts < 1.:
return self._last_status
self._last_status = self.task.status
# update timestamp after api call status()
self._last_status_ts = time()
return self._last_status
def wait(self, timeout=None, pool_period=30.): def wait(self, timeout=None, pool_period=30.):
# type: (Optional[float], float) -> bool # type: (Optional[float], float) -> bool
@ -201,7 +216,7 @@ class BaseJob(object):
:return: True, if the task is currently in progress. :return: True, if the task is currently in progress.
""" """
return self.task.status == Task.TaskStatusEnum.in_progress return self.status() == Task.TaskStatusEnum.in_progress
def is_stopped(self): def is_stopped(self):
# type: () -> bool # type: () -> bool
@ -210,7 +225,7 @@ class BaseJob(object):
:return: True the task is currently one of these states, stopped / completed / failed / published. :return: True the task is currently one of these states, stopped / completed / failed / published.
""" """
return self.task.status in ( return self.status() in (
Task.TaskStatusEnum.stopped, Task.TaskStatusEnum.completed, Task.TaskStatusEnum.stopped, Task.TaskStatusEnum.completed,
Task.TaskStatusEnum.failed, Task.TaskStatusEnum.published) Task.TaskStatusEnum.failed, Task.TaskStatusEnum.published)
@ -221,7 +236,7 @@ class BaseJob(object):
:return: True the task is currently in failed state :return: True the task is currently in failed state
""" """
return self.task.status in (Task.TaskStatusEnum.failed, ) return self.status() in (Task.TaskStatusEnum.failed, )
def is_completed(self): def is_completed(self):
# type: () -> bool # type: () -> bool
@ -230,7 +245,7 @@ class BaseJob(object):
:return: True the task is currently in completed or published state :return: True the task is currently in completed or published state
""" """
return self.task.status in (Task.TaskStatusEnum.completed, Task.TaskStatusEnum.published) return self.status() in (Task.TaskStatusEnum.completed, Task.TaskStatusEnum.published)
def is_aborted(self): def is_aborted(self):
# type: () -> bool # type: () -> bool
@ -239,7 +254,7 @@ class BaseJob(object):
:return: True the task is currently in aborted state :return: True the task is currently in aborted state
""" """
return self.task.status in (Task.TaskStatusEnum.stopped, ) return self.status() in (Task.TaskStatusEnum.stopped, )
def is_pending(self): def is_pending(self):
# type: () -> bool # type: () -> bool
@ -248,7 +263,7 @@ class BaseJob(object):
:return: True the task is currently is currently queued. :return: True the task is currently is currently queued.
""" """
return self.task.status in (Task.TaskStatusEnum.queued, Task.TaskStatusEnum.created) return self.status() in (Task.TaskStatusEnum.queued, Task.TaskStatusEnum.created)
def started(self): def started(self):
# type: () -> bool # type: () -> bool
@ -615,7 +630,7 @@ class LocalClearmlJob(ClearmlJob):
""" """
Wait until Job subprocess completed/exited Wait until Job subprocess completed/exited
:param timeout: Timeout in seconds to wait for the subprocess to finish. Default None==infinite :param timeout: Timeout in seconds to wait for the subprocess to finish. Default: None => infinite
:return Sub-process exit code. 0 is success, None if subprocess is not running or timeout :return Sub-process exit code. 0 is success, None if subprocess is not running or timeout
""" """
if not self._job_process: if not self._job_process:
@ -637,10 +652,32 @@ class LocalClearmlJob(ClearmlJob):
if exit_code == 0: if exit_code == 0:
self.task.mark_completed() self.task.mark_completed()
else: else:
self.task.mark_failed() user_aborted = False
if self.task.status == Task.TaskStatusEnum.stopped:
self.task.reload()
if str(self.task.data.status_reason).lower().startswith('user aborted'):
user_aborted = True
if not user_aborted:
self.task.mark_failed(force=True)
return exit_code return exit_code
def status(self, force=False):
# type: (bool) -> str
"""
Return the Job Task current status, see Task.TaskStatusEnum
:param force: Force status update, otherwise, only refresh state every 1 sec
:return: Task status Task.TaskStatusEnum in string.
"""
if self._job_process:
# refresh the task state, we need to do it manually
self.wait_for_process(timeout=0)
return super(LocalClearmlJob, self).status(force=force)
class RunningJob(BaseJob): class RunningJob(BaseJob):
""" """