Fix: Task.import_offline_session() should support continuing a previous Task

This commit is contained in:
allegroai 2022-04-27 17:19:07 +03:00
parent b1b4bf664f
commit e65b800e93
3 changed files with 39 additions and 18 deletions

View File

@ -257,7 +257,7 @@ class Metrics(InterfaceBase):
pass pass
@classmethod @classmethod
def report_offline_session(cls, task, folder): def report_offline_session(cls, task, folder, iteration_offset=0):
from ... import StorageManager from ... import StorageManager
filename = Path(folder) / cls.__offline_filename filename = Path(folder) / cls.__offline_filename
if not filename.is_file(): if not filename.is_file():
@ -277,6 +277,11 @@ class Metrics(InterfaceBase):
break break
list_requests = json.loads(line) list_requests = json.loads(line)
for r in list_requests: for r in list_requests:
# noinspection PyBroadException
try:
r["iter"] += iteration_offset
except Exception:
pass
org_task_id = r['task'] org_task_id = r['task']
r['task'] = task_id r['task'] = task_id
if r.get('key') and r.get('url'): if r.get('key') and r.get('url'):

View File

@ -292,7 +292,7 @@ class TaskHandler(BufferingHandler):
super(TaskHandler, self).close() super(TaskHandler, self).close()
@classmethod @classmethod
def report_offline_session(cls, task, folder): def report_offline_session(cls, task, folder, iteration_offset=0):
filename = Path(folder) / cls.__offline_filename filename = Path(folder) / cls.__offline_filename
if not filename.is_file(): if not filename.is_file():
return False return False
@ -306,6 +306,11 @@ class TaskHandler(BufferingHandler):
list_requests = json.loads(line) list_requests = json.loads(line)
for r in list_requests: for r in list_requests:
r.pop('task', None) r.pop('task', None)
# noinspection PyBroadException
try:
r["iter"] += iteration_offset
except Exception:
pass
i += 1 i += 1
except StopIteration: except StopIteration:
break break

View File

@ -2488,14 +2488,19 @@ class Task(_Task):
return target_task return target_task
@classmethod @classmethod
def import_offline_session(cls, session_folder_zip): def import_offline_session(cls, session_folder_zip, previous_task_id=None, iteration_offset=0):
# type: (str) -> (Optional[str]) # type: (str, Optional[str], Optional[int]) -> (Optional[str])
""" """
Upload an off line session (execution) of a Task. Upload an off line session (execution) of a Task.
Full Task execution includes repository details, installed packages, artifacts, logs, metric and debug samples. Full Task execution includes repository details, installed packages, artifacts, logs, metric and debug samples.
This function may also be used to continue a previously executed task with a task executed offline.
:param session_folder_zip: Path to a folder containing the session, or zip-file of the session folder. :param session_folder_zip: Path to a folder containing the session, or zip-file of the session folder.
:return: Newly created task ID (str) :param previous_task_id: Task ID of the task you wish to continue with this offline session.
:param iteration_offset: Reporting of the offline session will be offset with the
number specified by this parameter. Useful for avoiding overwriting metrics.
:return: Newly created task ID or the ID of the continued task (previous_task_id)
""" """
print('ClearML: Importing offline session from {}'.format(session_folder_zip)) print('ClearML: Importing offline session from {}'.format(session_folder_zip))
@ -2516,38 +2521,44 @@ class Task(_Task):
except Exception as ex: except Exception as ex:
raise ValueError( raise ValueError(
"Could not read Task object {}: Exception {}".format(session_folder / cls._offline_filename, ex)) "Could not read Task object {}: Exception {}".format(session_folder / cls._offline_filename, ex))
task = cls.import_task(export_data) current_task = cls.import_task(export_data)
task.mark_started(force=True) if previous_task_id:
task_holding_reports = cls.get_task(task_id=previous_task_id)
task_holding_reports.mark_started(force=True)
task_holding_reports = cls.import_task(export_data, target_task=task_holding_reports, update=True)
else:
task_holding_reports = current_task
task_holding_reports.mark_started(force=True)
# fix artifacts # fix artifacts
if task.data.execution.artifacts: if current_task.data.execution.artifacts:
from . import StorageManager from . import StorageManager
# noinspection PyProtectedMember # noinspection PyProtectedMember
offline_folder = os.path.join(export_data.get('offline_folder', ''), 'data/') offline_folder = os.path.join(export_data.get('offline_folder', ''), 'data/')
# noinspection PyProtectedMember # noinspection PyProtectedMember
remote_url = task._get_default_report_storage_uri() remote_url = current_task._get_default_report_storage_uri()
if remote_url and remote_url.endswith('/'): if remote_url and remote_url.endswith('/'):
remote_url = remote_url[:-1] remote_url = remote_url[:-1]
for artifact in task.data.execution.artifacts: for artifact in current_task.data.execution.artifacts:
local_path = artifact.uri.replace(offline_folder, '', 1) local_path = artifact.uri.replace(offline_folder, '', 1)
local_file = session_folder / 'data' / local_path local_file = session_folder / 'data' / local_path
if local_file.is_file(): if local_file.is_file():
remote_path = local_path.replace( remote_path = local_path.replace(
'.{}{}'.format(export_data['id'], os.sep), '.{}{}'.format(task.id, os.sep), 1) '.{}{}'.format(export_data['id'], os.sep), '.{}{}'.format(current_task.id, os.sep), 1)
artifact.uri = '{}/{}'.format(remote_url, remote_path) artifact.uri = '{}/{}'.format(remote_url, remote_path)
StorageManager.upload_file(local_file=local_file.as_posix(), remote_url=artifact.uri) StorageManager.upload_file(local_file=local_file.as_posix(), remote_url=artifact.uri)
# noinspection PyProtectedMember # noinspection PyProtectedMember
task._edit(execution=task.data.execution) task_holding_reports._edit(execution=current_task.data.execution)
# logs # logs
TaskHandler.report_offline_session(task, session_folder) TaskHandler.report_offline_session(task_holding_reports, session_folder, iteration_offset=iteration_offset)
# metrics # metrics
Metrics.report_offline_session(task, session_folder) Metrics.report_offline_session(task_holding_reports, session_folder, iteration_offset=iteration_offset)
# print imported results page # print imported results page
print('ClearML results page: {}'.format(task.get_output_log_web_page())) print('ClearML results page: {}'.format(task_holding_reports.get_output_log_web_page()))
task.mark_completed() task_holding_reports.mark_completed()
# close task # close task
task.close() task_holding_reports.close()
# cleanup # cleanup
if temp_folder: if temp_folder:
@ -2557,7 +2568,7 @@ class Task(_Task):
except Exception: except Exception:
pass pass
return task.id return task_holding_reports.id
@classmethod @classmethod
def set_credentials( def set_credentials(