Add Task.running_locally() and Task.execute_remotely()

This commit is contained in:
allegroai 2020-05-31 12:13:54 +03:00
parent dcd16abd3b
commit 2089dde55a
2 changed files with 83 additions and 6 deletions

View File

@ -1037,6 +1037,20 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
self._data.status = status
return str(status)
def get_output_log_web_page(self):
    # type: () -> str
    """
    Build the address of this Task's results & outputs (log) web page.

    For example: https://demoapp.trains.allegro.ai/projects/216431/experiments/60763e04/output/log

    :return: http/s url link
    """
    url_template = '{}/projects/{}/experiments/{}/output/log'
    app_server = self._get_app_server()
    # fall back to '*' in the project segment when the Task has no project set
    project_segment = '*' if self.project is None else self.project
    return url_template.format(app_server, project_segment, self.id)
def get_reported_scalars(
self,
max_samples=0, # type: int
@ -1098,6 +1112,16 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
lines = [r.get('msg', '') for r in response.response_data['events']]
return lines
@staticmethod
def running_locally():
    # type: () -> bool
    """
    Tell whether the task is running locally, i.e. it is not executed by trains-agent.

    :return: True if not executed by trains-agent
    """
    executed_remotely = running_remotely()
    return not executed_remotely
@classmethod
def add_requirements(cls, package_name, package_version=None):
# type: (str, Optional[str]) -> ()

View File

@ -473,11 +473,7 @@ class Task(_Task):
# show the debug metrics page in the log, it is very convenient
if not is_sub_process_task_id:
logger.report_text(
'TRAINS results page: {}/projects/{}/experiments/{}/output/log'.format(
task._get_app_server(),
task.project if task.project is not None else '*',
task.id,
),
'TRAINS results page: {}'.format(task.get_output_log_web_page()),
)
# Make sure we start the dev worker if required, otherwise it will only be started when we write
# something to the log.
@ -678,7 +674,7 @@ class Task(_Task):
@classmethod
def enqueue(cls, task, queue_name=None, queue_id=None):
# type: (Task, Optional[str], Optional[str]) -> Any
# type: (Union[Task, str], Optional[str], Optional[str]) -> Any
"""
Enqueue a Task for execution, by adding it to an execution queue.
@ -2409,3 +2405,60 @@ class Task(_Task):
return True
return False
def execute_remotely(self, queue_name=None, clone=False, exit_process=True):
    # type: (Optional[str], bool, bool) -> ()
    """
    If task is running locally (i.e. not by 'trains-agent'),
    this call will clone the Task and enqueue it for remote execution.
    Or it will stop the execution of the current task, reset its state, and enqueue it.
    Finally if exit_process==True it will *exit* this process!

    If task is executed by trains-agent (i.e. running remotely),
    this call is a no-op (i.e. does nothing).

    :param queue_name: Queue name used for enqueueing the task.
        If None, this call will exit the process without enqueuing the task.
    :param clone: If True a cloned copy of the Task will be created (and enqueued) instead of this Task.
    :param exit_process: If True, the function call will leave the calling process at the end,
        i.e. SystemExit(0) is raised. If clone==False exit_process must be True.
    :raises ValueError: If both clone and exit_process are False (only checked when running locally).
    """
    # do nothing, we are running remotely
    if running_remotely():
        return

    if not clone and not exit_process:
        raise ValueError(
            "clone==False and exit_process==False is not supported. "
            "Task enqueuing itself must exit the process afterwards.")

    # make sure we analyze the process
    if self.status in (Task.TaskStatusEnum.in_progress, ):
        if clone:
            # wait for repository detection (5 minutes should be reasonable time to detect all packages)
            self.flush(wait_for_uploads=True)
            if self._logger and not self.__is_subprocess():
                self._wait_for_repo_detection(timeout=300.)
        else:
            # close ourselves (it will make sure the repo is updated)
            self.close()

    # clone / reset Task
    if clone:
        task = Task.clone(self)
    else:
        task = self
        self.reset()

    # enqueue the selected Task (the clone, or ourselves after the reset)
    if queue_name:
        Task.enqueue(task, queue_name=queue_name)
        LoggerRoot.get_base_logger().warning(
            'Switching to remote execution, output log page {}'.format(task.get_output_log_web_page()))

    # leave this process.
    if exit_process:
        LoggerRoot.get_base_logger().warning('Terminating local execution process')
        # Raise SystemExit directly instead of calling the `exit` helper: that helper is
        # installed by the `site` module for interactive use and is not guaranteed to exist
        # (e.g. `python -S`, frozen/embedded interpreters). The resulting exception and
        # exit status are the same.
        raise SystemExit(0)