From 2089dde55a7d504050da5dd9252987207ec6ce18 Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Sun, 31 May 2020 12:13:54 +0300 Subject: [PATCH] Add Task.running_locally() and Task.execute_remotely() --- trains/backend_interface/task/task.py | 24 ++++++++++ trains/task.py | 65 ++++++++++++++++++++++++--- 2 files changed, 83 insertions(+), 6 deletions(-) diff --git a/trains/backend_interface/task/task.py b/trains/backend_interface/task/task.py index f2aa2921..7021fa12 100644 --- a/trains/backend_interface/task/task.py +++ b/trains/backend_interface/task/task.py @@ -1037,6 +1037,20 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin): self._data.status = status return str(status) + def get_output_log_web_page(self): + # type: () -> str + """ + Return the Task results & outputs web page address. + For example: https://demoapp.trains.allegro.ai/projects/216431/experiments/60763e04/output/log + + :return: http/s url link + """ + return '{}/projects/{}/experiments/{}/output/log'.format( + self._get_app_server(), + self.project if self.project is not None else '*', + self.id, + ) + def get_reported_scalars( self, max_samples=0, # type: int @@ -1098,6 +1112,16 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin): lines = [r.get('msg', '') for r in response.response_data['events']] return lines + @staticmethod + def running_locally(): + # type: () -> bool + """ + If the task is not executed by trains-agent, return True (i.e. 
running locally) + + :return: True if not executed by trains-agent + """ + return not running_remotely() + @classmethod def add_requirements(cls, package_name, package_version=None): # type: (str, Optional[str]) -> () diff --git a/trains/task.py b/trains/task.py index 796b7729..a079640a 100644 --- a/trains/task.py +++ b/trains/task.py @@ -473,11 +473,7 @@ class Task(_Task): # show the debug metrics page in the log, it is very convenient if not is_sub_process_task_id: logger.report_text( - 'TRAINS results page: {}/projects/{}/experiments/{}/output/log'.format( - task._get_app_server(), - task.project if task.project is not None else '*', - task.id, - ), + 'TRAINS results page: {}'.format(task.get_output_log_web_page()), ) # Make sure we start the dev worker if required, otherwise it will only be started when we write # something to the log. @@ -678,7 +674,7 @@ class Task(_Task): @classmethod def enqueue(cls, task, queue_name=None, queue_id=None): - # type: (Task, Optional[str], Optional[str]) -> Any + # type: (Union[Task, str], Optional[str], Optional[str]) -> Any """ Enqueue a Task for execution, by adding it to an execution queue. @@ -2409,3 +2405,60 @@ class Task(_Task): return True return False + + def execute_remotely(self, queue_name=None, clone=False, exit_process=True): + # type: (Optional[str], bool, bool) -> () + """ + If task is running locally (i.e. not by 'trains-agent'), + this call will clone the Task and enqueue it for remote execution. + Or it will stop the execution of the current task, reset its state, and enqueue it. + Finally, if exit_process==True it will *exit* this process! + + If task is executed by trains-agent (i.e. running remotely), + this call is a no-op (i.e. does nothing). + + :param queue_name: Queue name used for enqueueing the task. + If None, this call will exit the process without enqueuing the task. + :param clone: If True a cloned copy of the Task will be created (and enqueued) instead of this Task. 
+ :param exit_process: If True, the function call will leave the calling process at the end + i.e. If True, exit(0) will be called. If clone==False exit_process must be True. + """ + # do nothing, we are running remotely + if running_remotely(): + return + + if not clone and not exit_process: + raise ValueError( + "clone==False and exit_process==False is not supported. " + "Task enqueuing itself must exit the process afterwards.") + + # make sure we analyze the process + if self.status in (Task.TaskStatusEnum.in_progress, ): + if clone: + # wait for repository detection (5 minutes should be reasonable time to detect all packages) + self.flush(wait_for_uploads=True) + if self._logger and not self.__is_subprocess(): + self._wait_for_repo_detection(timeout=300.) + else: + # close ourselves (it will make sure the repo is updated) + self.close() + + # clone / reset Task + if clone: + task = Task.clone(self) + else: + task = self + self.reset() + + # enqueue ourselves + if queue_name: + Task.enqueue(task, queue_name=queue_name) + LoggerRoot.get_base_logger().warning( + 'Switching to remote execution, output log page {}'.format(task.get_output_log_web_page())) + + # leave this process. + if exit_process: + LoggerRoot.get_base_logger().warning('Terminating local execution process') + exit(0) + + return