Change Task.wait_for_status() defaults & Add TrainsJob.is_failed()

This commit is contained in:
allegroai 2020-06-18 01:08:54 +03:00
parent 29b4b7338b
commit 2004b64a5b
2 changed files with 104 additions and 90 deletions

View File

@ -202,12 +202,21 @@ class TrainsJob(object):
"""
Return True, if job is has executed and is not any more
:return: True the task is currently one of these states, stopped / completed / failed.
:return: True the task is currently one of these states, stopped / completed / failed / published.
"""
return self.task.status in (
Task.TaskStatusEnum.stopped, Task.TaskStatusEnum.completed,
Task.TaskStatusEnum.failed, Task.TaskStatusEnum.published)
def is_failed(self):
    # type: () -> bool
    """
    Check whether the job's task is currently in the failed state.

    :return: True if the task status is ``failed``, False otherwise.
    """
    failed_states = (Task.TaskStatusEnum.failed, )
    return self.task.status in failed_states
def is_pending(self):
# type: () -> bool
"""

View File

@ -13,7 +13,7 @@ try:
except ImportError:
from collections import Callable, Sequence as CollectionsSequence
from typing import Optional, Union, Mapping, Sequence, Any, Dict, TYPE_CHECKING, Iterable
from typing import Optional, Union, Mapping, Sequence, Any, Dict, Iterable, TYPE_CHECKING
import psutil
import six
@ -1349,6 +1349,99 @@ class Task(_Task):
"""
self._arguments.copy_from_dict(flatten_dictionary(dictionary))
def execute_remotely(self, queue_name=None, clone=False, exit_process=True):
    # type: (Optional[str], bool, bool) -> ()
    """
    If task is running locally (i.e., not by ``trains-agent``), then clone the Task and enqueue it for remote
    execution; or, stop the execution of the current Task, reset its state, and enqueue it.
    If ``exit_process==True``, *exit* this process.

    .. note::
        If the task is running remotely (i.e., ``trains-agent`` is executing it), this call is a no-op
        (i.e., does nothing).

    :param queue_name: The queue name used for enqueueing the task. If ``None``, the task is not enqueued
        (the process is still terminated when ``exit_process`` is ``True``).
    :param clone: Clone the Task and execute the newly cloned Task?

        The values are:

        - ``True`` - A cloned copy of the Task will be created, and enqueued, instead of this Task.
        - ``False`` - The Task will be enqueued.

    :param exit_process: The function call will leave the calling process at the end?

        - ``True`` - Exit the process (raises ``SystemExit`` with code 0).
        - ``False`` - Do not exit the process.

        .. warning::
            If ``clone==False``, then ``exit_process`` must be ``True``.

    :raise: ValueError if ``clone==False`` and ``exit_process==False``.
    """
    # Imported locally so the block does not depend on a module-level ``import sys``.
    import sys

    # do nothing, we are running remotely
    if running_remotely():
        return

    if not clone and not exit_process:
        raise ValueError(
            "clone==False and exit_process==False is not supported. "
            "Task enqueuing itself must exit the process afterwards.")

    # make sure we analyze the process
    if self.status in (Task.TaskStatusEnum.in_progress, ):
        if clone:
            # flush pending data, then wait for repository detection
            # (5 minutes should be reasonable time to detect all packages)
            self.flush(wait_for_uploads=True)
            if self._logger and not self.__is_subprocess():
                self._wait_for_repo_detection(timeout=300.)
        else:
            # close ourselves (it will make sure the repo is updated)
            self.close()

    # clone / reset Task
    if clone:
        task = Task.clone(self)
    else:
        task = self
        self.reset()

    # enqueue the selected task (the clone, or ourselves)
    if queue_name:
        Task.enqueue(task, queue_name=queue_name)
        LoggerRoot.get_base_logger().warning(
            'Switching to remote execution, output log page {}'.format(task.get_output_log_web_page()))

    # leave this process.
    if exit_process:
        LoggerRoot.get_base_logger().warning('Terminating local execution process')
        # sys.exit() rather than the builtin exit(): the latter is injected by the ``site`` module
        # for interactive use and is not guaranteed to exist (e.g. ``python -S`` or frozen apps).
        sys.exit(0)
def wait_for_status(
    self,
    status=(_Task.TaskStatusEnum.completed, _Task.TaskStatusEnum.stopped, _Task.TaskStatusEnum.closed),
    raise_on_status=(tasks.TaskStatusEnum.failed,),
    check_interval_sec=60.,
):
    # type: (Iterable[Task.TaskStatusEnum], Optional[Iterable[Task.TaskStatusEnum]], float) -> ()
    """
    Wait for a task to reach a defined status.

    The call blocks, sleeping ``check_interval_sec`` seconds between checks, until the task status
    is one of ``status`` or one of ``raise_on_status``.

    :param status: Status to wait for. Defaults to ('completed', 'stopped', 'closed', )
    :param raise_on_status: Raise RuntimeError if the status of the tasks matches one of these values.
        Defaults to ('failed'). Pass ``None`` (or an empty sequence) to never raise.
    :param check_interval_sec: Interval in seconds between two checks. Defaults to 60 seconds.

    :raise: RuntimeError if the status is one of {raise_on_status}.
    """
    # Materialize both arguments exactly once: they are typed as Iterable, so a one-shot
    # iterator must not be consumed here and then tested again after the loop.
    failure_statuses = list(raise_on_status) if raise_on_status else []
    awaited_statuses = list(status) + failure_statuses

    # Read the status once per check, so the value that ended the wait is the same value
    # that is tested for failure and reported in the error message.
    current_status = self.status
    while current_status not in awaited_statuses:
        time.sleep(check_interval_sec)
        current_status = self.status

    if current_status in failure_statuses:
        raise RuntimeError("Task {} has status: {}.".format(self.task_id, current_status))
@classmethod
def set_credentials(cls, api_host=None, web_host=None, files_host=None, key=None, secret=None, host=None):
# type: (Optional[str], Optional[str], Optional[str], Optional[str], Optional[str], Optional[str]) -> ()
@ -2415,91 +2508,3 @@ class Task(_Task):
return True
return False
def execute_remotely(self, queue_name=None, clone=False, exit_process=True):
    # type: (Optional[str], bool, bool) -> ()
    """
    If task is running locally (i.e., not by ``trains-agent``), then clone the Task and enqueue it for remote
    execution; or, stop the execution of the current Task, reset its state, and enqueue it.
    If ``exit_process==True``, *exit* this process.

    .. note::
        If the task is running remotely (i.e., ``trains-agent`` is executing it), this call is a no-op
        (i.e., does nothing).

    :param queue_name: The queue name used for enqueueing the task. If ``None``, the task is not enqueued
        (the process is still terminated when ``exit_process`` is ``True``).
    :param clone: Clone the Task and execute the newly cloned Task?

        The values are:

        - ``True`` - A cloned copy of the Task will be created, and enqueued, instead of this Task.
        - ``False`` - The Task will be enqueued.

    :param exit_process: The function call will leave the calling process at the end?

        - ``True`` - Exit the process (raises ``SystemExit`` with code 0).
        - ``False`` - Do not exit the process.

        .. warning::
            If ``clone==False``, then ``exit_process`` must be ``True``.

    :raise: ValueError if ``clone==False`` and ``exit_process==False``.
    """
    # Imported locally so the block does not depend on a module-level ``import sys``.
    import sys

    # do nothing, we are running remotely
    if running_remotely():
        return

    if not clone and not exit_process:
        raise ValueError(
            "clone==False and exit_process==False is not supported. "
            "Task enqueuing itself must exit the process afterwards.")

    # make sure we analyze the process
    if self.status in (Task.TaskStatusEnum.in_progress, ):
        if clone:
            # flush pending data, then wait for repository detection
            # (5 minutes should be reasonable time to detect all packages)
            self.flush(wait_for_uploads=True)
            if self._logger and not self.__is_subprocess():
                self._wait_for_repo_detection(timeout=300.)
        else:
            # close ourselves (it will make sure the repo is updated)
            self.close()

    # clone / reset Task
    if clone:
        task = Task.clone(self)
    else:
        task = self
        self.reset()

    # enqueue the selected task (the clone, or ourselves)
    if queue_name:
        Task.enqueue(task, queue_name=queue_name)
        LoggerRoot.get_base_logger().warning(
            'Switching to remote execution, output log page {}'.format(task.get_output_log_web_page()))

    # leave this process.
    if exit_process:
        LoggerRoot.get_base_logger().warning('Terminating local execution process')
        # sys.exit() rather than the builtin exit(): the latter is injected by the ``site`` module
        # for interactive use and is not guaranteed to exist (e.g. ``python -S`` or frozen apps).
        sys.exit(0)
def wait_for_status(self, status=(tasks.TaskStatusEnum.completed,),
                    raise_on_status=(tasks.TaskStatusEnum.failed,),
                    check_interval_sec=60):
    # type: (Iterable[tasks.TaskStatusEnum], Optional[Iterable[tasks.TaskStatusEnum]], int) -> ()
    """
    Wait for a task to reach a defined status.

    The call blocks, sleeping ``check_interval_sec`` seconds between checks, until the task status
    is one of ``status`` or one of ``raise_on_status``.

    :param status: Status to wait for. Defaults to ('completed')
    :param raise_on_status: Raise RuntimeError if the status of the tasks matches one of these values.
        Defaults to ('failed'). Pass ``None`` (or an empty sequence) to never raise.
    :param check_interval_sec: Interval in seconds between two checks. Defaults to 60 seconds.

    :raise: RuntimeError if the status is one of {raise_on_status}.
    """
    # The defaults are 1-tuples (note the trailing commas): ``(value)`` is just ``value``,
    # which would make the ``in`` tests below operate on a single status, not a collection.
    # Materialize both arguments once so one-shot iterables can be tested repeatedly.
    failure_statuses = list(raise_on_status) if raise_on_status else []
    awaited_statuses = list(status) + failure_statuses

    # Stop as soon as the status is in EITHER collection. Looping while it is missing from
    # one OR the other would only stop for a status present in both, i.e. practically never.
    current_status = self.status
    while current_status not in awaited_statuses:
        time.sleep(check_interval_sec)
        current_status = self.status

    if current_status in failure_statuses:
        raise RuntimeError("Task {} has status: {}.".format(self.task_id, current_status))