From 0ade8b07175288b52a58061bbe78a5fbed500b71 Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Sat, 16 Oct 2021 23:13:49 +0300 Subject: [PATCH] Fix LocalClearmlJob setting failed status Cache ClreamlJob state (refresh every one second) --- clearml/automation/job.py | 67 ++++++++++++++++++++++++++++++--------- 1 file changed, 52 insertions(+), 15 deletions(-) diff --git a/clearml/automation/job.py b/clearml/automation/job.py index e8ff416f..ac83a4c0 100644 --- a/clearml/automation/job.py +++ b/clearml/automation/job.py @@ -38,6 +38,8 @@ class BaseJob(object): self.task_parameter_override = None self.task = None self.task_started = False + self._last_status_ts = 0 + self._last_status = None def get_metric(self, title, series): # type: (str, str) -> (float, float, float) @@ -96,10 +98,15 @@ class BaseJob(object): """ if not self.task or self._is_cached_task: return - try: - self.task.stopped() - except Exception as ex: - logger.warning(ex) + + if self.task.status == Task.TaskStatusEnum.queued: + Task.dequeue(self.task) + + elif self.task.status == Task.TaskStatusEnum.in_progress: + try: + self.task.stopped() + except Exception as ex: + logger.warning(ex) def elapsed(self): # type: () -> float @@ -138,14 +145,22 @@ class BaseJob(object): """ return self.task.id - def status(self): - # type: () -> str + def status(self, force=False): + # type: (bool) -> str """ Return the Job Task current status, see Task.TaskStatusEnum + :param force: Force status update, otherwise, only refresh state every 1 sec + :return: Task status Task.TaskStatusEnum in string. """ - return self.task.status + if self._last_status and not force and time() - self._last_status_ts < 1.: + return self._last_status + + self._last_status = self.task.status + # update timestamp after api call status() + self._last_status_ts = time() + return self._last_status def wait(self, timeout=None, pool_period=30.): # type: (Optional[float], float) -> bool @@ -201,7 +216,7 @@ class BaseJob(object): :return: True, if the task is currently in progress. """ - return self.task.status == Task.TaskStatusEnum.in_progress + return self.status() == Task.TaskStatusEnum.in_progress def is_stopped(self): # type: () -> bool @@ -210,7 +225,7 @@ class BaseJob(object): :return: True the task is currently one of these states, stopped / completed / failed / published. """ - return self.task.status in ( + return self.status() in ( Task.TaskStatusEnum.stopped, Task.TaskStatusEnum.completed, Task.TaskStatusEnum.failed, Task.TaskStatusEnum.published) @@ -221,7 +236,7 @@ class BaseJob(object): :return: True the task is currently in failed state """ - return self.task.status in (Task.TaskStatusEnum.failed, ) + return self.status() in (Task.TaskStatusEnum.failed, ) def is_completed(self): # type: () -> bool @@ -230,7 +245,7 @@ class BaseJob(object): :return: True the task is currently in completed or published state """ - return self.task.status in (Task.TaskStatusEnum.completed, Task.TaskStatusEnum.published) + return self.status() in (Task.TaskStatusEnum.completed, Task.TaskStatusEnum.published) def is_aborted(self): # type: () -> bool @@ -239,7 +254,7 @@ class BaseJob(object): :return: True the task is currently in aborted state """ - return self.task.status in (Task.TaskStatusEnum.stopped, ) + return self.status() in (Task.TaskStatusEnum.stopped, ) def is_pending(self): # type: () -> bool @@ -248,7 +263,7 @@ class BaseJob(object): :return: True the task is currently is currently queued. """ - return self.task.status in (Task.TaskStatusEnum.queued, Task.TaskStatusEnum.created) + return self.status() in (Task.TaskStatusEnum.queued, Task.TaskStatusEnum.created) def started(self): # type: () -> bool @@ -615,7 +630,7 @@ class LocalClearmlJob(ClearmlJob): """ Wait until Job subprocess completed/exited - :param timeout: Timeout in seconds to wait for the subprocess to finish. Default None==infinite + :param timeout: Timeout in seconds to wait for the subprocess to finish. Default: None => infinite :return Sub-process exit code. 0 is success, None if subprocess is not running or timeout """ if not self._job_process: @@ -637,10 +652,32 @@ class LocalClearmlJob(ClearmlJob): if exit_code == 0: self.task.mark_completed() else: - self.task.mark_failed() + user_aborted = False + if self.task.status == Task.TaskStatusEnum.stopped: + self.task.reload() + if str(self.task.data.status_reason).lower().startswith('user aborted'): + user_aborted = True + + if not user_aborted: + self.task.mark_failed(force=True) return exit_code + def status(self, force=False): + # type: (bool) -> str + """ + Return the Job Task current status, see Task.TaskStatusEnum + + :param force: Force status update, otherwise, only refresh state every 1 sec + + :return: Task status Task.TaskStatusEnum in string. + """ + if self._job_process: + # refresh the task state, we need to do it manually + self.wait_for_process(timeout=0) + + return super(LocalClearmlJob, self).status(force=force) + class RunningJob(BaseJob): """