diff --git a/trains/automation/job.py b/trains/automation/job.py index 05bf2a04..a9d25b79 100644 --- a/trains/automation/job.py +++ b/trains/automation/job.py @@ -202,12 +202,21 @@ class TrainsJob(object): """ Return True, if job is has executed and is not any more - :return: True the task is currently one of these states, stopped / completed / failed. + :return: True the task is currently one of these states, stopped / completed / failed / published. """ return self.task.status in ( Task.TaskStatusEnum.stopped, Task.TaskStatusEnum.completed, Task.TaskStatusEnum.failed, Task.TaskStatusEnum.published) + def is_failed(self): + # type: () -> bool + """ + Return True, if job is has executed and failed + + :return: True the task is currently in failed state + """ + return self.task.status in (Task.TaskStatusEnum.failed, ) + def is_pending(self): # type: () -> bool """ diff --git a/trains/task.py b/trains/task.py index c02521c0..c486f341 100644 --- a/trains/task.py +++ b/trains/task.py @@ -13,7 +13,7 @@ try: except ImportError: from collections import Callable, Sequence as CollectionsSequence -from typing import Optional, Union, Mapping, Sequence, Any, Dict, TYPE_CHECKING, Iterable +from typing import Optional, Union, Mapping, Sequence, Any, Dict, Iterable, TYPE_CHECKING import psutil import six @@ -1349,6 +1349,99 @@ class Task(_Task): """ self._arguments.copy_from_dict(flatten_dictionary(dictionary)) + def execute_remotely(self, queue_name=None, clone=False, exit_process=True): + # type: (Optional[str], bool, bool) -> () + """ + If task is running locally (i.e., not by ``trains-agent``), then clone the Task and enqueue it for remote + execution; or, stop the execution of the current Task, reset its state, and enqueue it. If ``exit==True``, + *exit* this process. + + .. note:: + If the task is running remotely (i.e., ``trains-agent`` is executing it), this call is a no-op + (i.e., does nothing). + + :param queue_name: The queue name used for enqueueing the task. If ``None``, this call exits the process + without enqueuing the task. + :param clone: Clone the Task and execute the newly cloned Task? + + The values are: + + - ``True`` - A cloned copy of the Task will be created, and enqueued, instead of this Task. + - ``False`` - The Task will be enqueued. + + :param exit_process: The function call will leave the calling process at the end? + + - ``True`` - Exit the process (exit(0)). + - ``False`` - Do not exit the process. + + .. warning:: + + If ``clone==False``, then ``exit_process`` must be ``True``. + """ + # do nothing, we are running remotely + if running_remotely(): + return + + if not clone and not exit_process: + raise ValueError( + "clone==False and exit_process==False is not supported. " + "Task enqueuing itself must exit the process afterwards.") + + # make sure we analyze the process + if self.status in (Task.TaskStatusEnum.in_progress, ): + if clone: + # wait for repository detection (5 minutes should be reasonable time to detect all packages) + self.flush(wait_for_uploads=True) + if self._logger and not self.__is_subprocess(): + self._wait_for_repo_detection(timeout=300.) + else: + # close ourselves (it will make sure the repo is updated) + self.close() + + # clone / reset Task + if clone: + task = Task.clone(self) + else: + task = self + self.reset() + + # enqueue ourselves + if queue_name: + Task.enqueue(task, queue_name=queue_name) + LoggerRoot.get_base_logger().warning( + 'Switching to remote execution, output log page {}'.format(task.get_output_log_web_page())) + + # leave this process. + if exit_process: + LoggerRoot.get_base_logger().warning('Terminating local execution process') + exit(0) + + return + + def wait_for_status( + self, + status=(_Task.TaskStatusEnum.completed, _Task.TaskStatusEnum.stopped, _Task.TaskStatusEnum.closed), + raise_on_status=(tasks.TaskStatusEnum.failed,), + check_interval_sec=60., + ): + # type: (Iterable[Task.TaskStatusEnum], Optional[Iterable[Task.TaskStatusEnum]], float) -> () + """ + Wait for a task to reach a defined status. + + :param status: Status to wait for. Defaults to ('completed', 'stopped', 'closed', ) + :param raise_on_status: Raise RuntimeError if the status of the tasks matches one of these values. + Defaults to ('failed'). + :param check_interval_sec: Interval in seconds between two checks. Defaults to 60 seconds. + + :raise: RuntimeError if the status is one of {raise_on_status}. + """ + stopped_status = list(status) + (list(raise_on_status) if raise_on_status else []) + while self.status not in stopped_status: + time.sleep(check_interval_sec) + + if raise_on_status and self.status in raise_on_status: + raise RuntimeError("Task {} has status: {}.".format(self.task_id, self.status)) + @classmethod def set_credentials(cls, api_host=None, web_host=None, files_host=None, key=None, secret=None, host=None): # type: (Optional[str], Optional[str], Optional[str], Optional[str], Optional[str], Optional[str]) -> () @@ -2415,91 +2508,3 @@ class Task(_Task): return True return False - - def execute_remotely(self, queue_name=None, clone=False, exit_process=True): - # type: (Optional[str], bool, bool) -> () - """ - If task is running locally (i.e., not by ``trains-agent``), then clone the Task and enqueue it for remote - execution; or, stop the execution of the current Task, reset its state, and enqueue it. If ``exit==True``, - *exit* this process. - - .. note:: - If the task is running remotely (i.e., ``trains-agent`` is executing it), this call is a no-op - (i.e., does nothing). - - :param queue_name: The queue name used for enqueueing the task. If ``None``, this call exits the process - without enqueuing the task. - :param clone: Clone the Task and execute the newly cloned Task? - - The values are: - - - ``True`` - A cloned copy of the Task will be created, and enqueued, instead of this Task. - - ``False`` - The Task will be enqueued. - - :param exit_process: The function call will leave the calling process at the end? - - - ``True`` - Exit the process (exit(0)). - - ``False`` - Do not exit the process. - - .. warning:: - - If ``clone==False``, then ``exit_process`` must be ``True``. - """ - # do nothing, we are running remotely - if running_remotely(): - return - - if not clone and not exit_process: - raise ValueError( - "clone==False and exit_process==False is not supported. " - "Task enqueuing itself must exit the process afterwards.") - - # make sure we analyze the process - if self.status in (Task.TaskStatusEnum.in_progress, ): - if clone: - # wait for repository detection (5 minutes should be reasonable time to detect all packages) - self.flush(wait_for_uploads=True) - if self._logger and not self.__is_subprocess(): - self._wait_for_repo_detection(timeout=300.) - else: - # close ourselves (it will make sure the repo is updated) - self.close() - - # clone / reset Task - if clone: - task = Task.clone(self) - else: - task = self - self.reset() - - # enqueue ourselves - if queue_name: - Task.enqueue(task, queue_name=queue_name) - LoggerRoot.get_base_logger().warning( - 'Switching to remote execution, output log page {}'.format(task.get_output_log_web_page())) - - # leave this process. - if exit_process: - LoggerRoot.get_base_logger().warning('Terminating local execution process') - exit(0) - - return - - def wait_for_status(self, status=(tasks.TaskStatusEnum.completed), - raise_on_status=(tasks.TaskStatusEnum.failed), - check_interval_sec=60): - # type: (Iterable[tasks.TaskStatusEnum], Iterable[tasks.TaskStatusEnum], int) -> () - """ - Wait for a task to reach a defined status. - - :param status: Status to wait for. Defaults to ('completed') - :param raise_on_status: Raise RuntimeError if the status of the tasks matches one of these values. - Defaults to ('failed'). - :param check_interval_sec: Interval in seconds between two checks. Defaults to 60 seconds. - :raise: RuntimeError if the status is one of {raise_on_status}. - """ - while self.status not in status or self.status not in raise_on_status: - time.sleep(check_interval_sec) - - if self.status in raise_on_status: - raise RuntimeError("Task {} has status: {}.".format(self.task_id, self.status))