diff --git a/clearml/backend_api/session/session.py b/clearml/backend_api/session/session.py
index 898dc82b..54f2b88a 100644
--- a/clearml/backend_api/session/session.py
+++ b/clearml/backend_api/session/session.py
@@ -655,7 +655,7 @@ class Session(TokenManager):
             if session:
                 active_sessions.append(session)
                 new_sessions_weakrefs.append(session_weakref)
-        cls._sessions_weakrefs = session_weakref
+        cls._sessions_weakrefs = new_sessions_weakrefs
         return active_sessions
 
     @classmethod
diff --git a/clearml/backend_interface/task/hyperparams.py b/clearml/backend_interface/task/hyperparams.py
index 9d924a47..c34367a0 100644
--- a/clearml/backend_interface/task/hyperparams.py
+++ b/clearml/backend_interface/task/hyperparams.py
@@ -118,6 +118,13 @@ class HyperParams(object):
             item = make_item(i)
             props.update({item.name: item})
 
+        if self.task.is_offline():
+            hyperparams = self.task.data.hyperparams or {}
+            hyperparams.setdefault("properties", tasks.SectionParams())
+            hyperparams["properties"].update(props)
+            self.task._save_data_to_offline_dir(hyperparams=hyperparams)
+            return True
+
         res = self.task.session.send(
             tasks.EditHyperParamsRequest(
                 task=self.task.task_id,
diff --git a/clearml/backend_interface/task/task.py b/clearml/backend_interface/task/task.py
index 942bea9c..56c61e38 100644
--- a/clearml/backend_interface/task/task.py
+++ b/clearml/backend_interface/task/task.py
@@ -369,7 +369,13 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
         )
 
         res = self.send(req)
-        return res.response.id if res else 'offline-{}'.format(str(uuid4()).replace("-", ""))
+        if res:
+            return res.response.id
+
+        id = "offline-{}".format(str(uuid4()).replace("-", ""))
+        self._edit(type=tasks.TaskTypeEnum(task_type))
+        return id
+
 
     def _set_storage_uri(self, value):
         value = value.rstrip('/') if value else None
@@ -1962,6 +1968,8 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
             'iter': iteration (default), 'timestamp': timestamp as milliseconds since epoch, 'iso_time': absolute time
         :return: dict: Nested scalar graphs: dict[title(str), dict[series(str), dict[axis(str), list(float)]]]
         """
+        scalar_metrics_iter_histogram_request_max_size = 4800
+
         if x_axis not in ('iter', 'timestamp', 'iso_time'):
             raise ValueError("Scalar x-axis supported values are: 'iter', 'timestamp', 'iso_time'")
 
@@ -1978,8 +1986,51 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
         if not response.ok() or not response.response_data:
             return {}
 
+        metrics_returned = 0
+        for metric in response.response_data.values():
+            for series in metric.values():
+                metrics_returned += len(series.get("x", []))
+        if metrics_returned >= scalar_metrics_iter_histogram_request_max_size:
+            return self._get_all_reported_scalars(x_axis)
+
         return response.response_data
 
+    def _get_all_reported_scalars(self, x_axis):
+        reported_scalars = {}
+        batch_size = 1000
+        scroll_id = None
+        while True:
+            response = self.send(
+                events.GetTaskEventsRequest(
+                    task=self.id, event_type="training_stats_scalar", scroll_id=scroll_id, batch_size=batch_size
+                )
+            )
+            if not response:
+                return reported_scalars
+            response = response.wait()
+            if not response.ok() or not response.response_data:
+                return reported_scalars
+            response = response.response_data
+            for event in response.get("events", []):
+                metric = event["metric"]
+                variant = event["variant"]
+                if x_axis in ["timestamp", "iter"]:
+                    x_val = event[x_axis]
+                else:
+                    x_val = datetime.utcfromtimestamp(event["timestamp"] / 1000).isoformat(timespec="milliseconds") + "Z"
+                y_val = event["value"]
+                reported_scalars.setdefault(metric, {})
+                reported_scalars[metric].setdefault(variant, {"name": variant, "x": [], "y": []})
+                if len(reported_scalars[metric][variant]["x"]) == 0 or reported_scalars[metric][variant]["x"][-1] != x_val:
+                    reported_scalars[metric][variant]["x"].append(x_val)
+                    reported_scalars[metric][variant]["y"].append(y_val)
+                else:
+                    reported_scalars[metric][variant]["y"][-1] = y_val
+            if response.get("returned", 0) < batch_size or not response.get("scroll_id"):
+                break
+            scroll_id = response["scroll_id"]
+        return reported_scalars
+
     def get_reported_plots(
         self,
         max_iterations=None
@@ -2459,19 +2510,26 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
         """
         return running_remotely() and get_remote_task_id() == self.id
 
+    def _save_data_to_offline_dir(self, **kwargs):
+        # type: (**Any) -> ()
+        for k, v in kwargs.items():
+            setattr(self.data, k, v)
+        offline_mode_folder = self.get_offline_mode_folder()
+        if not offline_mode_folder:
+            return
+        Path(offline_mode_folder).mkdir(parents=True, exist_ok=True)
+        with open((offline_mode_folder / self._offline_filename).as_posix(), "wt") as f:
+            export_data = self.data.to_dict()
+            export_data["project_name"] = self.get_project_name()
+            export_data["offline_folder"] = self.get_offline_mode_folder().as_posix()
+            export_data["offline_output_models"] = self._offline_output_models
+            json.dump(export_data, f, ensure_ascii=True, sort_keys=True)
+
     def _edit(self, **kwargs):
         # type: (**Any) -> Any
         with self._edit_lock:
             if self._offline_mode:
-                for k, v in kwargs.items():
-                    setattr(self.data, k, v)
-                Path(self.get_offline_mode_folder()).mkdir(parents=True, exist_ok=True)
-                with open((self.get_offline_mode_folder() / self._offline_filename).as_posix(), "wt") as f:
-                    export_data = self.data.to_dict()
-                    export_data["project_name"] = self.get_project_name()
-                    export_data["offline_folder"] = self.get_offline_mode_folder().as_posix()
-                    export_data["offline_output_models"] = self._offline_output_models
-                    json.dump(export_data, f, ensure_ascii=True, sort_keys=True)
+                self._save_data_to_offline_dir(**kwargs)
                 return None
 
             # Since we are using forced update, make sure the task status is valid
@@ -2593,6 +2651,8 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
         Return the folder where all the task outputs and logs are stored in the offline session.
         :return: Path object, local folder, later to be used with `report_offline_session()`
         """
+        if not self.task_id:
+            return None
         if self._offline_dir:
             return self._offline_dir
         if not self._offline_mode:
diff --git a/clearml/datasets/dataset.py b/clearml/datasets/dataset.py
index 93f29e16..fa010d29 100644
--- a/clearml/datasets/dataset.py
+++ b/clearml/datasets/dataset.py
@@ -122,12 +122,14 @@ class Dataset(object):
     __hyperparams_section = "Datasets"
     __datasets_runtime_prop = "datasets"
     __orig_datasets_runtime_prop_prefix = "orig_datasets"
+    __preview_media_max_file_size = deferred_config("dataset.preview.media.max_file_size", 5 * 1024 * 1024, transform=int)
     __preview_tabular_table_count = deferred_config("dataset.preview.tabular.table_count", 10, transform=int)
     __preview_tabular_row_count = deferred_config("dataset.preview.tabular.row_count", 10, transform=int)
     __preview_media_image_count = deferred_config("dataset.preview.media.image_count", 10, transform=int)
     __preview_media_video_count = deferred_config("dataset.preview.media.video_count", 10, transform=int)
     __preview_media_audio_count = deferred_config("dataset.preview.media.audio_count", 10, transform=int)
     __preview_media_html_count = deferred_config("dataset.preview.media.html_count", 10, transform=int)
+    __preview_media_json_count = deferred_config("dataset.preview.media.json_count", 10, transform=int)
     _dataset_chunk_size_mb = deferred_config("storage.dataset_chunk_size_mb", 512, transform=int)
 
     def __init__(
@@ -191,7 +193,7 @@ class Dataset(object):
             if "/.datasets/" not in task.get_project_name() or "":
                 dataset_project, parent_project = self._build_hidden_project_name(task.get_project_name(), task.name)
                 task.move_to_project(new_project_name=dataset_project)
-                if bool(Session.check_min_api_server_version(Dataset.__min_api_version)):
+                if Dataset.is_offline() or bool(Session.check_min_api_server_version(Dataset.__min_api_version)):
                     get_or_create_project(task.session, project_name=parent_project, system_tags=[self.__hidden_tag])
                     get_or_create_project(
                         task.session,
@@ -202,9 +204,21 @@ class Dataset(object):
         else:
             self._created_task = True
             dataset_project, parent_project = self._build_hidden_project_name(dataset_project, dataset_name)
-            task = Task.create(
-                project_name=dataset_project, task_name=dataset_name, task_type=Task.TaskTypes.data_processing)
-            if bool(Session.check_min_api_server_version(Dataset.__min_api_version)):
+            if not Dataset.is_offline():
+                task = Task.create(
+                    project_name=dataset_project, task_name=dataset_name, task_type=Task.TaskTypes.data_processing)
+            else:
+                task = Task.init(
+                    project_name=dataset_project,
+                    task_name=dataset_name,
+                    task_type=Task.TaskTypes.data_processing,
+                    reuse_last_task_id=False,
+                    auto_connect_frameworks=False,
+                    auto_connect_arg_parser=False,
+                    auto_resource_monitoring=False,
+                    auto_connect_streams=False
+                )
+            if Dataset.is_offline() or bool(Session.check_min_api_server_version(Dataset.__min_api_version)):
                 get_or_create_project(task.session, project_name=parent_project, system_tags=[self.__hidden_tag])
                 get_or_create_project(
                     task.session,
@@ -218,25 +232,25 @@ class Dataset(object):
             if dataset_tags:
                 task.set_tags((task.get_tags() or []) + list(dataset_tags))
             task.mark_started()
-            # generate the script section
-            script = (
-                "from clearml import Dataset\n\n"
-                "ds = Dataset.create(dataset_project='{dataset_project}', dataset_name='{dataset_name}', "
-                "dataset_version='{dataset_version}')\n".format(
-                    dataset_project=dataset_project, dataset_name=dataset_name, dataset_version=dataset_version
+            if not Dataset.is_offline():
+                # generate the script section
+                script = (
+                    "from clearml import Dataset\n\n"
+                    "ds = Dataset.create(dataset_project='{dataset_project}', dataset_name='{dataset_name}', "
+                    "dataset_version='{dataset_version}')\n".format(
+                        dataset_project=dataset_project, dataset_name=dataset_name, dataset_version=dataset_version
+                    )
                 )
-            )
-            task.data.script.diff = script
-            task.data.script.working_dir = '.'
-            task.data.script.entry_point = 'register_dataset.py'
-            from clearml import __version__
-            task.data.script.requirements = {'pip': 'clearml == {}\n'.format(__version__)}
-            # noinspection PyProtectedMember
-            task._edit(script=task.data.script)
-
-            # if the task is running make sure we ping to the server so it will not be aborted by a watchdog
-            self._task_pinger = DevWorker()
-            self._task_pinger.register(task, stop_signal_support=False)
+                task.data.script.diff = script
+                task.data.script.working_dir = '.'
+                task.data.script.entry_point = 'register_dataset.py'
+                from clearml import __version__
+                task.data.script.requirements = {'pip': 'clearml == {}\n'.format(__version__)}
+                # noinspection PyProtectedMember
+                task._edit(script=task.data.script)
+                # if the task is running make sure we ping to the server so it will not be aborted by a watchdog
+                self._task_pinger = DevWorker()
+                self._task_pinger.register(task, stop_signal_support=False)
         # set the newly created Dataset parent to the current Task, so we know who created it.
         if Task.current_task() and Task.current_task().id != task.id:
             task.set_parent(Task.current_task())
@@ -279,6 +293,7 @@ class Dataset(object):
         self.__preview_video_count = 0
         self.__preview_audio_count = 0
         self.__preview_html_count = 0
+        self.__preview_json_count = 0
 
     @property
     def id(self):
@@ -321,7 +336,7 @@ class Dataset(object):
     @property
     def name(self):
         # type: () -> str
-        if bool(Session.check_min_api_server_version(Dataset.__min_api_version)):
+        if Dataset.is_offline() or bool(Session.check_min_api_server_version(Dataset.__min_api_version)):
             return self._task.get_project_name().partition("/.datasets/")[-1]
         return self._task.name
 
@@ -464,8 +479,8 @@ class Dataset(object):
         else:
             if len(dataset_path) != len(source_url):
                 raise ValueError(
-                    f"dataset_path must be a string or a list of strings with the same length as source_url"
-                    f" (received {len(dataset_path)} paths for {len(source_url)} source urls))"
+                    "dataset_path must be a string or a list of strings with the same length as source_url"
+                    " (received {} paths for {} source urls))".format(len(dataset_path), len(source_url))
                 )
             dataset_paths = dataset_path
         with ThreadPoolExecutor(max_workers=max_workers) as tp:
@@ -635,6 +650,9 @@ class Dataset(object):
         :raise: If the upload failed (i.e. at least one zip failed to upload), raise a `ValueError`
         """
         self._report_dataset_preview()
+        if Dataset.is_offline():
+            self._serialize()
+            return
 
         # set output_url
         if output_url:
@@ -642,7 +660,11 @@ class Dataset(object):
             self._task.get_logger().set_default_upload_destination(output_url)
 
         if not max_workers:
-            max_workers = 1 if self._task.output_uri.startswith(tuple(cloud_driver_schemes)) else psutil.cpu_count()
+            max_workers = (
+                1
+                if self._task.output_uri and self._task.output_uri.startswith(tuple(cloud_driver_schemes))
+                else psutil.cpu_count()
+            )
 
         self._task.get_logger().report_text(
             "Uploading dataset files: {}".format(
@@ -774,6 +796,9 @@ class Dataset(object):
         :param raise_on_error: If True, raise exception if dataset finalizing failed
         :param auto_upload: Automatically upload dataset if not called yet, will upload to default location.
         """
+        if Dataset.is_offline():
+            LoggerRoot.get_base_logger().warning("Cannot finalize dataset in offline mode.")
+            return
         # check we do not have files waiting for upload.
         if self._dirty:
             if auto_upload:
@@ -905,6 +930,8 @@ class Dataset(object):
         :return: A base folder for the entire dataset
         """
         assert self._id
+        if Dataset.is_offline():
+            raise ValueError("Cannot get dataset local copy in offline mode.")
         if not self._task:
             self._task = Task.get_task(task_id=self._id)
         if not self.is_final():
@@ -950,6 +977,8 @@ class Dataset(object):
         :return: The target folder containing the entire dataset
         """
         assert self._id
+        if Dataset.is_offline():
+            raise ValueError("Cannot get dataset local copy in offline mode.")
         max_workers = max_workers or psutil.cpu_count()
         target_folder = Path(target_folder).absolute()
         target_folder.mkdir(parents=True, exist_ok=True)
@@ -1204,7 +1233,7 @@ class Dataset(object):
         :return: Newly created Dataset object
         """
-        if not Session.check_min_api_server_version("2.13"):
+        if not Dataset.is_offline() and not Session.check_min_api_server_version("2.13"):
             raise NotImplementedError("Datasets are not supported with your current ClearML server version. Please update your server.")
 
         parent_datasets = [cls.get(dataset_id=p) if not isinstance(p, Dataset) else p for p in (parent_datasets or [])]
@@ -1264,7 +1293,7 @@ class Dataset(object):
         if description:
             instance.set_description(description)
         # noinspection PyProtectedMember
-        if output_uri and not Task._offline_mode:
+        if output_uri and not Dataset.is_offline():
            # noinspection PyProtectedMember
            instance._task.output_uri = output_uri
        # noinspection PyProtectedMember
@@ -1283,20 +1312,13 @@ class Dataset(object):
         instance._serialize()
         # noinspection PyProtectedMember
         instance._report_dataset_struct()
-        # noinspection PyProtectedMember
-        instance._task.get_logger().report_text(
-            "ClearML results page: {}".format(instance._task.get_output_log_web_page())
-        )
-        if bool(Session.check_min_api_server_version(cls.__min_api_version)):
-            instance._task.get_logger().report_text(  # noqa
-                "ClearML dataset page: {}".format(
-                    "{}/datasets/simple/{}/experiments/{}".format(
-                        instance._task._get_app_server(),  # noqa
-                        instance._task.project if instance._task.project is not None else "*",  # noqa
-                        instance._task.id,  # noqa
-                    )
-                )
+        if not Dataset.is_offline():
+            # noinspection PyProtectedMember
+            instance._task.get_logger().report_text(
+                "ClearML results page: {}".format(instance._task.get_output_log_web_page())
             )
+            # noinspection PyProtectedMember
+            instance._log_dataset_page()
         # noinspection PyProtectedMember
         instance._task.flush(wait_for_uploads=True)
         # noinspection PyProtectedMember
@@ -1499,6 +1521,8 @@ class Dataset(object):
         :param dataset_project: The project the datasets to be renamed belongs to
         :param dataset_name: The name of the datasets (before renaming)
         """
+        if Dataset.is_offline():
+            raise ValueError("Cannot rename dataset in offline mode")
         if not bool(Session.check_min_api_server_version(cls.__min_api_version)):
             LoggerRoot.get_base_logger().warning(
                 "Could not rename dataset because API version < {}".format(cls.__min_api_version)
             )
@@ -1544,6 +1568,8 @@ class Dataset(object):
         :param dataset_project: Project of the dataset(s) to move to new project
         :param dataset_name: Name of the dataset(s) to move to new project
         """
+        if cls.is_offline():
+            raise ValueError("Cannot move dataset project in offline mode")
         if not bool(Session.check_min_api_server_version(cls.__min_api_version)):
             LoggerRoot.get_base_logger().warning(
                 "Could not move dataset to another project because API version < {}".format(cls.__min_api_version)
             )
@@ -1618,6 +1644,9 @@ class Dataset(object):
 
         :return: Dataset object
         """
+        if Dataset.is_offline():
+            raise ValueError("Cannot get dataset in offline mode.")
+
         system_tags = ["__$all", cls.__tag]
         if not include_archived:
             system_tags = ["__$all", cls.__tag, "__$not", "archived"]
@@ -1801,6 +1830,9 @@ class Dataset(object):
             Examples: `s3://bucket/data`, `gs://bucket/data` , `azure://bucket/data` , `/mnt/share/data`
         :return: Newly created dataset object.
""" + if Dataset.is_offline(): + raise ValueError("Cannot squash datasets in offline mode") + mutually_exclusive(dataset_ids=dataset_ids, dataset_project_name_pairs=dataset_project_name_pairs) datasets = [cls.get(dataset_id=d) for d in dataset_ids] if dataset_ids else \ [cls.get(dataset_project=pair[0], dataset_name=pair[1]) for pair in dataset_project_name_pairs] @@ -1877,7 +1909,7 @@ class Dataset(object): type=[str(Task.TaskTypes.data_processing)], tags=tags or None, status=["stopped", "published", "completed", "closed"] if only_completed else None, - only_fields=["created", "id", "name", "project", "tags"], + only_fields=["created", "id", "name", "project", "tags", "runtime"], search_hidden=True, exact_match_regex_flag=False, _allow_extra_fields_=True, @@ -1892,6 +1924,7 @@ class Dataset(object): "project": cls._remove_hidden_part_from_dataset_project(project_id_lookup[d.project]), "id": d.id, "tags": d.tags, + "version": d.runtime.get("version") } for d in datasets ] @@ -2028,6 +2061,10 @@ class Dataset(object): for k, parents in self._dependency_graph.items() if k in used_dataset_versions} # make sure we do not remove our parents, for geology sake self._dependency_graph[self._id] = current_parents + if not Dataset.is_offline(): + to_delete = [k for k in self._dependency_graph.keys() if k.startswith("offline-")] + for k in to_delete: + del self._dependency_graph[k] def _serialize(self, update_dependency_chunk_lookup=False): # type: (bool) -> () @@ -2609,6 +2646,89 @@ class Dataset(object): """ return 'dsh{}'.format(md5text(dataset_id)) + @classmethod + def is_offline(cls): + # type: () -> bool + """ + Return offline-mode state, If in offline-mode, no communication to the backend is enabled. + + :return: boolean offline-mode state + """ + return Task.is_offline() + + @classmethod + def set_offline(cls, offline_mode=False): + # type: (bool) -> None + """ + Set offline mode, where all data and logs are stored into local folder, for later transmission + + :param offline_mode: If True, offline-mode is turned on, and no communication to the backend is enabled. + """ + Task.set_offline(offline_mode=offline_mode) + + def get_offline_mode_folder(self): + # type: () -> Optional[Path] + """ + Return the folder where all the dataset data is stored in the offline session. + + :return: Path object, local folder + """ + return self._task.get_offline_mode_folder() + + @classmethod + def import_offline_session(cls, session_folder_zip, upload=True, finalize=False): + # type: (str, bool, bool) -> str + """ + Import an offline session of a dataset. + Includes repository details, installed packages, artifacts, logs, metric and debug samples. + + :param session_folder_zip: Path to a folder containing the session, or zip-file of the session folder. 
+        :param upload: If True, upload the dataset's data
+        :param finalize: If True, finalize the dataset
+
+        :return: The ID of the imported dataset
+        """
+        id = Task.import_offline_session(session_folder_zip)
+        dataset = Dataset.get(dataset_id=id)
+        # note that there can only be one offline session in the dependency graph: our session
+        # noinspection PyProtectedMember
+        dataset._dependency_graph = {
+            (id if k.startswith("offline-") else k): [(id if sub_v.startswith("offline-") else sub_v) for sub_v in v]
+            for k, v in dataset._dependency_graph.items()  # noqa
+        }
+        # noinspection PyProtectedMember
+        dataset._update_dependency_graph()
+        # noinspection PyProtectedMember
+        dataset._log_dataset_page()
+
+        started = False
+        if upload or finalize:
+            started = True
+            # noinspection PyProtectedMember
+            dataset._task.mark_started(force=True)
+
+        if upload:
+            dataset.upload()
+        if finalize:
+            dataset.finalize()
+
+        if started:
+            # noinspection PyProtectedMember
+            dataset._task.mark_completed()
+
+        return id
+
+    def _log_dataset_page(self):
+        if bool(Session.check_min_api_server_version(self.__min_api_version)):
+            self._task.get_logger().report_text(
+                "ClearML dataset page: {}".format(
+                    "{}/datasets/simple/{}/experiments/{}".format(
+                        self._task._get_app_server(),
+                        self._task.project if self._task.project is not None else "*",
+                        self._task.id,
+                    )
+                )
+            )
 
     def _build_dependency_chunk_lookup(self):
         # type: () -> Dict[str, int]
         """
@@ -2850,7 +2970,10 @@ class Dataset(object):
         dependency_graph_ex[id_] = parents
 
         task = Task.get_task(task_id=id_)
-        dataset_struct_entry = {"job_id": id_, "status": task.status}
+        dataset_struct_entry = {
+            "job_id": id_[len("offline-"):] if id_.startswith("offline-") else id_,  # .removeprefix not supported < Python 3.9
+            "status": task.status
+        }
         # noinspection PyProtectedMember
         last_update = task._get_last_update()
         if last_update:
@@ -2964,7 +3087,7 @@ class Dataset(object):
                 except Exception:
                     pass
                 continue
-            if compression:
+            if compression or os.path.getsize(file_path) > self.__preview_media_max_file_size:
                 continue
             guessed_type = mimetypes.guess_type(file_path)
             if not guessed_type or not guessed_type[0]:
@@ -2982,6 +3105,9 @@ class Dataset(object):
             elif guessed_type == "text/html" and self.__preview_html_count < self.__preview_media_html_count:
                 self._task.get_logger().report_media("HTML", file_name, local_path=file_path)
                 self.__preview_html_count += 1
+            elif guessed_type == "application/json" and self.__preview_json_count < self.__preview_media_json_count:
+                self._task.get_logger().report_media("JSON", file_name, local_path=file_path, file_extension=".txt")
+                self.__preview_json_count += 1
 
     @classmethod
     def _set_project_system_tags(cls, task):
@@ -3366,7 +3492,7 @@ class Dataset(object):
         if not dataset_project:
             return None, None
         project_name = cls._remove_hidden_part_from_dataset_project(dataset_project)
-        if bool(Session.check_min_api_server_version(cls.__min_api_version)):
+        if Dataset.is_offline() or bool(Session.check_min_api_server_version(cls.__min_api_version)):
            parent_project = "{}.datasets".format(dataset_project + "/" if dataset_project else "")
            if dataset_name:
                project_name = "{}/{}".format(parent_project, dataset_name)
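
--
Usage note (not part of the patch): taken together, these changes enable an end-to-end offline workflow for datasets. The sketch below is a minimal illustration of how the new API appears intended to be used, based only on the methods added or changed here (Dataset.set_offline, the offline short-circuit in Dataset.upload, Dataset.get_offline_mode_folder, Dataset.import_offline_session); the data path is a hypothetical placeholder.

    from clearml import Dataset

    # Phase 1: no server available; all state is serialized to a local session folder.
    Dataset.set_offline(True)
    ds = Dataset.create(dataset_project="examples", dataset_name="offline-ds")
    ds.add_files("/path/to/local/data")  # hypothetical data folder
    ds.upload()  # in offline mode this only serializes state; finalize() would warn
    session_folder = ds.get_offline_mode_folder()

    # Phase 2: on a machine with server access, replay the offline session.
    # Uploading and finalizing are deferred to import time via the new flags.
    Dataset.set_offline(False)
    dataset_id = Dataset.import_offline_session(str(session_folder), upload=True, finalize=True)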