Add support for offline datasets and JSON previews

Alex Burlacu 2023-05-25 18:15:33 +03:00
parent 60c3a5ef98
commit 5772a1551e
4 changed files with 249 additions and 56 deletions
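This commit lets datasets be created and populated while ClearML runs in offline mode: instead of talking to the server, the dataset task serializes its state into the local offline session folder, to be imported later with import_offline_session(). It also adds JSON files to the media types previewed on upload. A minimal usage sketch, assuming the public API introduced here (project and file names are illustrative):

from clearml import Dataset

# from this point on, no communication with the backend
Dataset.set_offline(True)

ds = Dataset.create(dataset_project="demo", dataset_name="offline-ds")
ds.add_files("data/")   # files are tracked locally
ds.upload()             # in offline mode this serializes state to the offline folder and returns

# later, on a machine with server access:
# new_id = Dataset.import_offline_session("/path/to/session.zip", upload=True, finalize=True)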

View File

@@ -655,7 +655,7 @@ class Session(TokenManager):
if session:
active_sessions.append(session)
new_sessions_weakrefs.append(session_weakref)
cls._sessions_weakrefs = session_weakref
cls._sessions_weakrefs = new_sessions_weakrefs
return active_sessions
@classmethod
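The fix above stores the rebuilt list of live session weak references; previously the attribute was overwritten with the last single weakref from the loop. A minimal illustration of the pattern (stand-in names, not the actual Session class):

import weakref

class Obj:
    pass

objs = [Obj(), Obj()]
refs = [weakref.ref(o) for o in objs]

# prune dead references, keeping the list itself
live_refs = [r for r in refs if r() is not None]
# assigning the loop variable r here instead of live_refs would keep only
# the last weakref, which is the bug this hunk fixes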

View File

@@ -118,6 +118,13 @@ class HyperParams(object):
item = make_item(i)
props.update({item.name: item})
if self.task.is_offline():
hyperparams = self.task.data.hyperparams or {}
hyperparams.setdefault("properties", tasks.SectionParams())
hyperparams["properties"].update(props)
self.task._save_data_to_offline_dir(hyperparams=hyperparams)
return True
res = self.task.session.send(
tasks.EditHyperParamsRequest(
task=self.task.task_id,

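With this change, hyperparameter edits made in offline mode are merged into the task's stored hyperparams under the "properties" section and written to the offline directory, instead of being sent as an EditHyperParamsRequest. A rough sketch of the user-facing effect, assuming the standard Task API:

from clearml import Task

Task.set_offline(True)
task = Task.init(project_name="demo", task_name="offline-props")
# in offline mode these land in the serialized task data on disk
# rather than in a server request
task.set_user_properties(lr="0.01", seed="42")
task.close()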
View File

@@ -369,7 +369,13 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
)
res = self.send(req)
return res.response.id if res else 'offline-{}'.format(str(uuid4()).replace("-", ""))
if res:
return res.response.id
id = "offline-{}".format(str(uuid4()).replace("-", ""))
self._edit(type=tasks.TaskTypeEnum(task_type))
return id
def _set_storage_uri(self, value):
value = value.rstrip('/') if value else None
@@ -1962,6 +1968,8 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
'iter': iteration (default), 'timestamp': timestamp as milliseconds since epoch, 'iso_time': absolute time
:return: dict: Nested scalar graphs: dict[title(str), dict[series(str), dict[axis(str), list(float)]]]
"""
scalar_metrics_iter_histogram_request_max_size = 4800
if x_axis not in ('iter', 'timestamp', 'iso_time'):
raise ValueError("Scalar x-axis supported values are: 'iter', 'timestamp', 'iso_time'")
@@ -1978,8 +1986,51 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
if not response.ok() or not response.response_data:
return {}
metrics_returned = 0
for metric in response.response_data.values():
for series in metric.values():
metrics_returned += len(series.get("x", []))
if metrics_returned >= scalar_metrics_iter_histogram_request_max_size:
return self._get_all_reported_scalars(x_axis)
return response.response_data
def _get_all_reported_scalars(self, x_axis):
reported_scalars = {}
batch_size = 1000
scroll_id = None
while True:
response = self.send(
events.GetTaskEventsRequest(
task=self.id, event_type="training_stats_scalar", scroll_id=scroll_id, batch_size=batch_size
)
)
if not response:
return reported_scalars
response = response.wait()
if not response.ok() or not response.response_data:
return reported_scalars
response = response.response_data
for event in response.get("events", []):
metric = event["metric"]
variant = event["variant"]
if x_axis in ["timestamp", "iter"]:
x_val = event[x_axis]
else:
x_val = datetime.utcfromtimestamp(event["timestamp"] / 1000).isoformat(timespec="milliseconds") + "Z"
y_val = event["value"]
reported_scalars.setdefault(metric, {})
reported_scalars[metric].setdefault(variant, {"name": variant, "x": [], "y": []})
if len(reported_scalars[metric][variant]["x"]) == 0 or reported_scalars[metric][variant]["x"][-1] != x_val:
reported_scalars[metric][variant]["x"].append(x_val)
reported_scalars[metric][variant]["y"].append(y_val)
else:
reported_scalars[metric][variant]["y"][-1] = y_val
if response.get("returned", 0) < batch_size or not response.get("scroll_id"):
break
scroll_id = response["scroll_id"]
return reported_scalars
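# Pagination note: the loop above pages through task events with the server's
# scroll API, batch_size (1000) events per call, passing back the scroll_id
# from the previous response; it stops when a batch returns fewer events than
# batch_size or no scroll_id. Consecutive events sharing an x value overwrite
# the previous y value instead of appending a duplicate point.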
def get_reported_plots(
self,
max_iterations=None
@@ -2459,19 +2510,26 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
"""
return running_remotely() and get_remote_task_id() == self.id
def _save_data_to_offline_dir(self, **kwargs):
# type: (**Any) -> ()
for k, v in kwargs.items():
setattr(self.data, k, v)
offline_mode_folder = self.get_offline_mode_folder()
if not offline_mode_folder:
return
Path(offline_mode_folder).mkdir(parents=True, exist_ok=True)
with open((offline_mode_folder / self._offline_filename).as_posix(), "wt") as f:
export_data = self.data.to_dict()
export_data["project_name"] = self.get_project_name()
export_data["offline_folder"] = self.get_offline_mode_folder().as_posix()
export_data["offline_output_models"] = self._offline_output_models
json.dump(export_data, f, ensure_ascii=True, sort_keys=True)
def _edit(self, **kwargs):
# type: (**Any) -> Any
with self._edit_lock:
if self._offline_mode:
for k, v in kwargs.items():
setattr(self.data, k, v)
Path(self.get_offline_mode_folder()).mkdir(parents=True, exist_ok=True)
with open((self.get_offline_mode_folder() / self._offline_filename).as_posix(), "wt") as f:
export_data = self.data.to_dict()
export_data["project_name"] = self.get_project_name()
export_data["offline_folder"] = self.get_offline_mode_folder().as_posix()
export_data["offline_output_models"] = self._offline_output_models
json.dump(export_data, f, ensure_ascii=True, sort_keys=True)
self._save_data_to_offline_dir(**kwargs)
return None
# Since we are using forced update, make sure the task status is valid
@@ -2593,6 +2651,8 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
Return the folder where all the task outputs and logs are stored in the offline session.
:return: Path object, local folder, later to be used with `report_offline_session()`
"""
if not self.task_id:
return None
if self._offline_dir:
return self._offline_dir
if not self._offline_mode:

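_save_data_to_offline_dir above writes the task state as JSON into the offline session folder. A hedged sketch of reading it back; the task.json filename matches _offline_filename in current ClearML sources, and the folder path is illustrative:

import json
from pathlib import Path

# hypothetical offline session folder created by an offline run
offline_dir = Path.home() / ".clearml" / "cache" / "offline" / "offline-abc123"

with open(offline_dir / "task.json") as f:
    exported = json.load(f)

# fields added by _save_data_to_offline_dir, per the hunk above
print(exported["project_name"])
print(exported["offline_folder"])
print(exported["offline_output_models"])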
View File

@@ -122,12 +122,14 @@ class Dataset(object):
__hyperparams_section = "Datasets"
__datasets_runtime_prop = "datasets"
__orig_datasets_runtime_prop_prefix = "orig_datasets"
__preview_media_max_file_size = deferred_config("dataset.preview.media.max_file_size", 5 * 1024 * 1024, transform=int)
__preview_tabular_table_count = deferred_config("dataset.preview.tabular.table_count", 10, transform=int)
__preview_tabular_row_count = deferred_config("dataset.preview.tabular.row_count", 10, transform=int)
__preview_media_image_count = deferred_config("dataset.preview.media.image_count", 10, transform=int)
__preview_media_video_count = deferred_config("dataset.preview.media.video_count", 10, transform=int)
__preview_media_audio_count = deferred_config("dataset.preview.media.audio_count", 10, transform=int)
__preview_media_html_count = deferred_config("dataset.preview.media.html_count", 10, transform=int)
__preview_media_json_count = deferred_config("dataset.preview.media.json_count", 10, transform=int)
_dataset_chunk_size_mb = deferred_config("storage.dataset_chunk_size_mb", 512, transform=int)
def __init__(
@@ -191,7 +193,7 @@ class Dataset(object):
if "/.datasets/" not in task.get_project_name() or "":
dataset_project, parent_project = self._build_hidden_project_name(task.get_project_name(), task.name)
task.move_to_project(new_project_name=dataset_project)
if bool(Session.check_min_api_server_version(Dataset.__min_api_version)):
if Dataset.is_offline() or bool(Session.check_min_api_server_version(Dataset.__min_api_version)):
get_or_create_project(task.session, project_name=parent_project, system_tags=[self.__hidden_tag])
get_or_create_project(
task.session,
@@ -202,9 +204,21 @@ class Dataset(object):
else:
self._created_task = True
dataset_project, parent_project = self._build_hidden_project_name(dataset_project, dataset_name)
task = Task.create(
project_name=dataset_project, task_name=dataset_name, task_type=Task.TaskTypes.data_processing)
if bool(Session.check_min_api_server_version(Dataset.__min_api_version)):
if not Dataset.is_offline():
task = Task.create(
project_name=dataset_project, task_name=dataset_name, task_type=Task.TaskTypes.data_processing)
else:
task = Task.init(
project_name=dataset_project,
task_name=dataset_name,
task_type=Task.TaskTypes.data_processing,
reuse_last_task_id=False,
auto_connect_frameworks=False,
auto_connect_arg_parser=False,
auto_resource_monitoring=False,
auto_connect_streams=False
)
if Dataset.is_offline() or bool(Session.check_min_api_server_version(Dataset.__min_api_version)):
get_or_create_project(task.session, project_name=parent_project, system_tags=[self.__hidden_tag])
get_or_create_project(
task.session,
@@ -218,25 +232,25 @@ class Dataset(object):
if dataset_tags:
task.set_tags((task.get_tags() or []) + list(dataset_tags))
task.mark_started()
# generate the script section
script = (
"from clearml import Dataset\n\n"
"ds = Dataset.create(dataset_project='{dataset_project}', dataset_name='{dataset_name}', "
"dataset_version='{dataset_version}')\n".format(
dataset_project=dataset_project, dataset_name=dataset_name, dataset_version=dataset_version
if not Dataset.is_offline():
# generate the script section
script = (
"from clearml import Dataset\n\n"
"ds = Dataset.create(dataset_project='{dataset_project}', dataset_name='{dataset_name}', "
"dataset_version='{dataset_version}')\n".format(
dataset_project=dataset_project, dataset_name=dataset_name, dataset_version=dataset_version
)
)
)
task.data.script.diff = script
task.data.script.working_dir = '.'
task.data.script.entry_point = 'register_dataset.py'
from clearml import __version__
task.data.script.requirements = {'pip': 'clearml == {}\n'.format(__version__)}
# noinspection PyProtectedMember
task._edit(script=task.data.script)
# if the task is running make sure we ping to the server so it will not be aborted by a watchdog
self._task_pinger = DevWorker()
self._task_pinger.register(task, stop_signal_support=False)
task.data.script.diff = script
task.data.script.working_dir = '.'
task.data.script.entry_point = 'register_dataset.py'
from clearml import __version__
task.data.script.requirements = {'pip': 'clearml == {}\n'.format(__version__)}
# noinspection PyProtectedMember
task._edit(script=task.data.script)
# if the task is running make sure we ping to the server so it will not be aborted by a watchdog
self._task_pinger = DevWorker()
self._task_pinger.register(task, stop_signal_support=False)
# set the newly created Dataset parent to the current Task, so we know who created it.
if Task.current_task() and Task.current_task().id != task.id:
task.set_parent(Task.current_task())
@@ -279,6 +293,7 @@ class Dataset(object):
self.__preview_video_count = 0
self.__preview_audio_count = 0
self.__preview_html_count = 0
self.__preview_json_count = 0
@property
def id(self):
@@ -321,7 +336,7 @@ class Dataset(object):
@property
def name(self):
# type: () -> str
if bool(Session.check_min_api_server_version(Dataset.__min_api_version)):
if Dataset.is_offline() or bool(Session.check_min_api_server_version(Dataset.__min_api_version)):
return self._task.get_project_name().partition("/.datasets/")[-1]
return self._task.name
@@ -464,8 +479,8 @@ class Dataset(object):
else:
if len(dataset_path) != len(source_url):
raise ValueError(
f"dataset_path must be a string or a list of strings with the same length as source_url"
f" (received {len(dataset_path)} paths for {len(source_url)} source urls))"
"dataset_path must be a string or a list of strings with the same length as source_url"
" (received {} paths for {} source urls))".format(len(dataset_path), len(source_url))
)
dataset_paths = dataset_path
with ThreadPoolExecutor(max_workers=max_workers) as tp:
@@ -635,6 +650,9 @@ class Dataset(object):
:raise: If the upload failed (i.e. at least one zip failed to upload), raise a `ValueError`
"""
self._report_dataset_preview()
if Dataset.is_offline():
self._serialize()
return
# set output_url
if output_url:
@@ -642,7 +660,11 @@ class Dataset(object):
self._task.get_logger().set_default_upload_destination(output_url)
if not max_workers:
max_workers = 1 if self._task.output_uri.startswith(tuple(cloud_driver_schemes)) else psutil.cpu_count()
max_workers = (
1
if self._task.output_uri and self._task.output_uri.startswith(tuple(cloud_driver_schemes))
else psutil.cpu_count()
)
self._task.get_logger().report_text(
"Uploading dataset files: {}".format(
@@ -774,6 +796,9 @@ class Dataset(object):
:param raise_on_error: If True, raise exception if dataset finalizing failed
:param auto_upload: Automatically upload dataset if not called yet, will upload to default location.
"""
if Dataset.is_offline():
LoggerRoot.get_base_logger().warning("Cannot finalize dataset in offline mode.")
return
# check we do not have files waiting for upload.
if self._dirty:
if auto_upload:
@@ -905,6 +930,8 @@ class Dataset(object):
:return: A base folder for the entire dataset
"""
assert self._id
if Dataset.is_offline():
raise ValueError("Cannot get dataset local copy in offline mode.")
if not self._task:
self._task = Task.get_task(task_id=self._id)
if not self.is_final():
@@ -950,6 +977,8 @@ class Dataset(object):
:return: The target folder containing the entire dataset
"""
assert self._id
if Dataset.is_offline():
raise ValueError("Cannot get dataset local copy in offline mode.")
max_workers = max_workers or psutil.cpu_count()
target_folder = Path(target_folder).absolute()
target_folder.mkdir(parents=True, exist_ok=True)
@@ -1204,7 +1233,7 @@ class Dataset(object):
:return: Newly created Dataset object
"""
if not Session.check_min_api_server_version("2.13"):
if not Dataset.is_offline() and not Session.check_min_api_server_version("2.13"):
raise NotImplementedError("Datasets are not supported with your current ClearML server version. Please update your server.")
parent_datasets = [cls.get(dataset_id=p) if not isinstance(p, Dataset) else p for p in (parent_datasets or [])]
@@ -1264,7 +1293,7 @@ class Dataset(object):
if description:
instance.set_description(description)
# noinspection PyProtectedMember
if output_uri and not Task._offline_mode:
if output_uri and not Dataset.is_offline():
# noinspection PyProtectedMember
instance._task.output_uri = output_uri
# noinspection PyProtectedMember
@@ -1283,20 +1312,13 @@ class Dataset(object):
instance._serialize()
# noinspection PyProtectedMember
instance._report_dataset_struct()
# noinspection PyProtectedMember
instance._task.get_logger().report_text(
"ClearML results page: {}".format(instance._task.get_output_log_web_page())
)
if bool(Session.check_min_api_server_version(cls.__min_api_version)):
instance._task.get_logger().report_text( # noqa
"ClearML dataset page: {}".format(
"{}/datasets/simple/{}/experiments/{}".format(
instance._task._get_app_server(), # noqa
instance._task.project if instance._task.project is not None else "*", # noqa
instance._task.id, # noqa
)
)
if not Dataset.is_offline():
# noinspection PyProtectedMember
instance._task.get_logger().report_text(
"ClearML results page: {}".format(instance._task.get_output_log_web_page())
)
# noinspection PyProtectedMember
instance._log_dataset_page()
# noinspection PyProtectedMember
instance._task.flush(wait_for_uploads=True)
# noinspection PyProtectedMember
@@ -1499,6 +1521,8 @@ class Dataset(object):
:param dataset_project: The project the datasets to be renamed belongs to
:param dataset_name: The name of the datasets (before renaming)
"""
if Dataset.is_offline():
raise ValueError("Cannot rename dataset in offline mode")
if not bool(Session.check_min_api_server_version(cls.__min_api_version)):
LoggerRoot.get_base_logger().warning(
"Could not rename dataset because API version < {}".format(cls.__min_api_version)
@@ -1544,6 +1568,8 @@ class Dataset(object):
:param dataset_project: Project of the dataset(s) to move to new project
:param dataset_name: Name of the dataset(s) to move to new project
"""
if cls.is_offline():
raise ValueError("Cannot move dataset project in offlime mode")
if not bool(Session.check_min_api_server_version(cls.__min_api_version)):
LoggerRoot.get_base_logger().warning(
"Could not move dataset to another project because API version < {}".format(cls.__min_api_version)
@@ -1618,6 +1644,9 @@ class Dataset(object):
:return: Dataset object
"""
if Dataset.is_offline():
raise ValueError("Cannot get dataset in offline mode.")
system_tags = ["__$all", cls.__tag]
if not include_archived:
system_tags = ["__$all", cls.__tag, "__$not", "archived"]
@@ -1801,6 +1830,9 @@ class Dataset(object):
Examples: `s3://bucket/data`, `gs://bucket/data` , `azure://bucket/data` , `/mnt/share/data`
:return: Newly created dataset object.
"""
if Dataset.is_offline():
raise ValueError("Cannot squash datasets in offline mode")
mutually_exclusive(dataset_ids=dataset_ids, dataset_project_name_pairs=dataset_project_name_pairs)
datasets = [cls.get(dataset_id=d) for d in dataset_ids] if dataset_ids else \
[cls.get(dataset_project=pair[0], dataset_name=pair[1]) for pair in dataset_project_name_pairs]
@@ -1877,7 +1909,7 @@ class Dataset(object):
type=[str(Task.TaskTypes.data_processing)],
tags=tags or None,
status=["stopped", "published", "completed", "closed"] if only_completed else None,
only_fields=["created", "id", "name", "project", "tags"],
only_fields=["created", "id", "name", "project", "tags", "runtime"],
search_hidden=True,
exact_match_regex_flag=False,
_allow_extra_fields_=True,
@@ -1892,6 +1924,7 @@ class Dataset(object):
"project": cls._remove_hidden_part_from_dataset_project(project_id_lookup[d.project]),
"id": d.id,
"tags": d.tags,
"version": d.runtime.get("version")
}
for d in datasets
]
@@ -2028,6 +2061,10 @@ class Dataset(object):
for k, parents in self._dependency_graph.items() if k in used_dataset_versions}
# make sure we do not remove our parents, for genealogy's sake
self._dependency_graph[self._id] = current_parents
if not Dataset.is_offline():
to_delete = [k for k in self._dependency_graph.keys() if k.startswith("offline-")]
for k in to_delete:
del self._dependency_graph[k]
def _serialize(self, update_dependency_chunk_lookup=False):
# type: (bool) -> ()
@@ -2609,6 +2646,89 @@ class Dataset(object):
"""
return 'dsh{}'.format(md5text(dataset_id))
@classmethod
def is_offline(cls):
# type: () -> bool
"""
Return the offline-mode state. If in offline mode, no communication to the backend is enabled.
:return: boolean offline-mode state
"""
return Task.is_offline()
@classmethod
def set_offline(cls, offline_mode=False):
# type: (bool) -> None
"""
Set offline mode, where all data and logs are stored in a local folder for later transmission.
:param offline_mode: If True, offline-mode is turned on, and no communication to the backend is enabled.
"""
Task.set_offline(offline_mode=offline_mode)
def get_offline_mode_folder(self):
# type: () -> Optional[Path]
"""
Return the folder where all the dataset data is stored in the offline session.
:return: Path object, local folder
"""
return self._task.get_offline_mode_folder()
@classmethod
def import_offline_session(cls, session_folder_zip, upload=True, finalize=False):
# type: (str, bool, bool) -> str
"""
Import an offline session of a dataset.
Includes repository details, installed packages, artifacts, logs, metrics, and debug samples.
:param session_folder_zip: Path to a folder containing the session, or zip-file of the session folder.
:param upload: If True, upload the dataset's data
:param finalize: If True, finalize the dataset
:return: The ID of the imported dataset
"""
id = Task.import_offline_session(session_folder_zip)
dataset = Dataset.get(dataset_id=id)
# note that there can only be one offline session in the dependency graph: our session
# noinspection PyProtectedMember
dataset._dependency_graph = {
(id if k.startswith("offline-") else k): [(id if sub_v.startswith("offline-") else sub_v) for sub_v in v]
for k, v in dataset._dependency_graph.items() # noqa
}
# noinspection PyProtectedMember
dataset._update_dependency_graph()
# noinspection PyProtectedMember
dataset._log_dataset_page()
started = False
if upload or finalize:
started = True
# noinspection PyProtectedMember
dataset._task.mark_started(force=True)
if upload:
dataset.upload()
if finalize:
dataset.finalize()
if started:
# noinspection PyProtectedMember
dataset._task.mark_completed()
return id
def _log_dataset_page(self):
if bool(Session.check_min_api_server_version(self.__min_api_version)):
self._task.get_logger().report_text(
"ClearML dataset page: {}".format(
"{}/datasets/simple/{}/experiments/{}".format(
self._task._get_app_server(),
self._task.project if self._task.project is not None else "*",
self._task.id,
)
)
)
def _build_dependency_chunk_lookup(self):
# type: () -> Dict[str, int]
"""
@@ -2850,7 +2970,10 @@ class Dataset(object):
dependency_graph_ex[id_] = parents
task = Task.get_task(task_id=id_)
dataset_struct_entry = {"job_id": id_, "status": task.status}
dataset_struct_entry = {
"job_id": id_[len("offline-"):] if id_.startswith("offline-") else id_, # .removeprefix not supported < Python 3.9
"status": task.status
}
# noinspection PyProtectedMember
last_update = task._get_last_update()
if last_update:
@@ -2964,7 +3087,7 @@ class Dataset(object):
except Exception:
pass
continue
if compression:
if compression or os.path.getsize(file_path) > self.__preview_media_max_file_size:
continue
guessed_type = mimetypes.guess_type(file_path)
if not guessed_type or not guessed_type[0]:
@@ -2982,6 +3105,9 @@ class Dataset(object):
elif guessed_type == "text/html" and self.__preview_html_count < self.__preview_media_html_count:
self._task.get_logger().report_media("HTML", file_name, local_path=file_path)
self.__preview_html_count += 1
elif guessed_type == "application/json" and self.__preview_json_count < self.__preview_media_json_count:
self._task.get_logger().report_media("JSON", file_name, local_path=file_path, file_extension=".txt")
self.__preview_json_count += 1
@classmethod
def _set_project_system_tags(cls, task):
@@ -3366,7 +3492,7 @@ class Dataset(object):
if not dataset_project:
return None, None
project_name = cls._remove_hidden_part_from_dataset_project(dataset_project)
if bool(Session.check_min_api_server_version(cls.__min_api_version)):
if Dataset.is_offline() or bool(Session.check_min_api_server_version(cls.__min_api_version)):
parent_project = "{}.datasets".format(dataset_project + "/" if dataset_project else "")
if dataset_name:
project_name = "{}/{}".format(parent_project, dataset_name)