From d26ce48dbe4dd349b5b3bca13beaaf407b182d07 Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Fri, 4 Aug 2023 19:08:51 +0300 Subject: [PATCH] Improve artifacts serialization: should now be consistent when serializing pandas object into gzip stream (csv.gz) --- clearml/binding/artifacts.py | 134 ++++++++++++++++++++++++++--------- 1 file changed, 101 insertions(+), 33 deletions(-) diff --git a/clearml/binding/artifacts.py b/clearml/binding/artifacts.py index d4794b54..93062ebf 100644 --- a/clearml/binding/artifacts.py +++ b/clearml/binding/artifacts.py @@ -1,3 +1,4 @@ +import gzip import json import yaml import mimetypes @@ -38,7 +39,7 @@ try: except ImportError: np = None try: - from pathlib import Path as pathlib_Path + from pathlib import Path as pathlib_Path # noqa except ImportError: pathlib_Path = None @@ -321,6 +322,7 @@ class Artifacts(object): self._storage_prefix = None self._task_name = None self._project_name = None + self._temp_files_lookup = {} def register_artifact(self, name, artifact, metadata=None, uniqueness_columns=True): # type: (str, DataFrame, Optional[dict], Union[bool, Sequence[str]]) -> () @@ -428,15 +430,15 @@ class Artifacts(object): artifact_type_data.preview = "" override_filename_ext_in_uri = extension_name or "" override_filename_in_uri = name + override_filename_ext_in_uri - fd, local_filename = mkstemp(prefix=quote(name, safe="") + ".", suffix=override_filename_ext_in_uri) - os.close(fd) + local_filename = self._push_temp_file( + prefix=quote(name, safe="") + ".", suffix=override_filename_ext_in_uri) # noinspection PyBroadException try: with open(local_filename, "wb") as f: f.write(serialization_function(artifact_object)) except Exception: # cleanup and raise exception - os.unlink(local_filename) + self._delete_temp_file(local_filename) raise artifact_type_data.content_type = mimetypes.guess_type(local_filename)[0] elif extension_name == ".pkl": @@ -448,8 +450,8 @@ class Artifacts(object): extension_name, [".npz", ".csv.gz"], ".npz", artifact_type ) override_filename_in_uri = name + override_filename_ext_in_uri - fd, local_filename = mkstemp(prefix=quote(name, safe="") + '.', suffix=override_filename_ext_in_uri) - os.close(fd) + local_filename = self._push_temp_file( + prefix=quote(name, safe="") + '.', suffix=override_filename_ext_in_uri) if override_filename_ext_in_uri == ".npz": artifact_type_data.content_type = "application/numpy" np.savez_compressed(local_filename, **{name: artifact_object}) @@ -464,11 +466,10 @@ class Artifacts(object): extension_name, [".csv.gz", ".parquet", ".feather", ".pickle"], ".csv.gz", artifact_type ) override_filename_in_uri = name - fd, local_filename = mkstemp(prefix=quote(name, safe="") + '.', suffix=override_filename_ext_in_uri) - os.close(fd) + local_filename = self._push_temp_file(prefix=quote(name, safe="") + '.', suffix=override_filename_ext_in_uri) if override_filename_ext_in_uri == ".csv.gz": artifact_type_data.content_type = "text/csv" - artifact_object.to_csv(local_filename, compression=self._compression) + self._store_compressed_pd_csv(artifact_object, local_filename) elif override_filename_ext_in_uri == ".parquet": try: artifact_type_data.content_type = "application/parquet" @@ -480,7 +481,7 @@ class Artifacts(object): ) ) artifact_type_data.content_type = "text/csv" - artifact_object.to_csv(local_filename, compression=self._compression) + self._store_compressed_pd_csv(artifact_object, local_filename) elif override_filename_ext_in_uri == ".feather": try: artifact_type_data.content_type = "application/feather" @@ -492,7 +493,7 @@ class Artifacts(object): ) ) artifact_type_data.content_type = "text/csv" - artifact_object.to_csv(local_filename, compression=self._compression) + self._store_compressed_pd_csv(artifact_object, local_filename) elif override_filename_ext_in_uri == ".pickle": artifact_type_data.content_type = "application/pickle" artifact_object.to_pickle(local_filename) @@ -527,8 +528,8 @@ class Artifacts(object): if guessed_type: artifact_type_data.content_type = guessed_type - fd, local_filename = mkstemp(prefix=quote(name, safe="") + '.', suffix=override_filename_ext_in_uri) - os.close(fd) + local_filename = self._push_temp_file( + prefix=quote(name, safe="") + '.', suffix=override_filename_ext_in_uri) artifact_object.save(local_filename) delete_after_upload = True elif isinstance(artifact_object, dict): @@ -561,8 +562,9 @@ class Artifacts(object): if serialized_text is not None: override_filename_in_uri = name + override_filename_ext_in_uri - fd, local_filename = mkstemp(prefix=quote(name, safe="") + ".", suffix=override_filename_ext_in_uri) - with open(fd, "w") as f: + local_filename = self._push_temp_file( + prefix=quote(name, safe="") + ".", suffix=override_filename_ext_in_uri) + with open(local_filename, "w") as f: f.write(serialized_text) preview = preview or serialized_text if len(preview) < self.max_preview_size_bytes: @@ -599,7 +601,7 @@ class Artifacts(object): files = list(Path(folder).rglob(wildcard)) override_filename_ext_in_uri = '.zip' override_filename_in_uri = folder.parts[-1] + override_filename_ext_in_uri - fd, zip_file = mkstemp( + zip_file = self._push_temp_file( prefix=quote(folder.parts[-1], safe="") + '.', suffix=override_filename_ext_in_uri ) try: @@ -618,8 +620,7 @@ class Artifacts(object): LoggerRoot.get_base_logger().warning('Exception {}\nFailed zipping artifact folder {}'.format( folder, e)) return False - finally: - os.close(fd) + artifact_type_data.preview = preview or archive_preview artifact_object = zip_file artifact_type = 'archive' @@ -647,7 +648,7 @@ class Artifacts(object): override_filename_ext_in_uri = '.zip' override_filename_in_uri = quote(name, safe="") + override_filename_ext_in_uri common_path = get_common_path(list_files) - fd, zip_file = mkstemp( + zip_file = self._push_temp_file( prefix='artifact_folder.', suffix=override_filename_ext_in_uri ) try: @@ -670,8 +671,7 @@ class Artifacts(object): LoggerRoot.get_base_logger().warning('Exception {}\nFailed zipping artifact files {}'.format( artifact_object, e)) return False - finally: - os.close(fd) + artifact_type_data.preview = preview or archive_preview artifact_object = zip_file artifact_type = 'archive' @@ -704,15 +704,15 @@ class Artifacts(object): delete_after_upload = True override_filename_ext_in_uri = ".txt" override_filename_in_uri = name + override_filename_ext_in_uri - fd, local_filename = mkstemp(prefix=quote(name, safe="") + ".", suffix=override_filename_ext_in_uri) - os.close(fd) + local_filename = self._push_temp_file( + prefix=quote(name, safe="") + ".", suffix=override_filename_ext_in_uri) # noinspection PyBroadException try: with open(local_filename, "wt") as f: f.write(artifact_object) except Exception: # cleanup and raise exception - os.unlink(local_filename) + self._delete_temp_file(local_filename) raise elif artifact_object is None or (isinstance(artifact_object, str) and artifact_object == ""): artifact_type = '' @@ -736,15 +736,15 @@ class Artifacts(object): delete_after_upload = True override_filename_ext_in_uri = '.pkl' override_filename_in_uri = name + override_filename_ext_in_uri - fd, local_filename = mkstemp(prefix=quote(name, safe="") + '.', suffix=override_filename_ext_in_uri) - os.close(fd) + local_filename = self._push_temp_file( + prefix=quote(name, safe="") + '.', suffix=override_filename_ext_in_uri) # noinspection PyBroadException try: with open(local_filename, 'wb') as f: pickle.dump(artifact_object, f) except Exception: # cleanup and raise exception - os.unlink(local_filename) + self._delete_temp_file(local_filename) raise # verify preview not out of scope: @@ -875,10 +875,10 @@ class Artifacts(object): override_filename_ext_in_uri = self._save_format override_filename_in_uri = name - fd, local_csv = mkstemp(prefix=quote(name, safe="") + '.', suffix=override_filename_ext_in_uri) - os.close(fd) + local_csv = self._push_temp_file( + prefix=quote(name, safe="") + '.', suffix=override_filename_ext_in_uri) local_csv = Path(local_csv) - pd_artifact.to_csv(local_csv.as_posix(), index=False, compression=self._compression) + self._store_compressed_pd_csv(pd_artifact, local_csv.as_posix(), index=False) current_sha2, file_sha2 = sha256sum( local_csv.as_posix(), skip_header=32, block_size=Artifacts._hash_block_size) if name in self._last_artifacts_upload: @@ -887,7 +887,7 @@ class Artifacts(object): # nothing to do, we can skip the upload # noinspection PyBroadException try: - local_csv.unlink() + self._delete_temp_file(local_csv) except Exception: pass return @@ -944,6 +944,8 @@ class Artifacts(object): """ from clearml.storage import StorageManager + local_file = self._pop_temp_file(local_file) + upload_uri = self._task.output_uri or self._task.get_logger().get_default_upload_destination() if not isinstance(local_file, Path): local_file = Path(local_file) @@ -962,7 +964,7 @@ class Artifacts(object): StorageManager.upload_file(local_file.as_posix(), uri, wait_for_upload=True, retries=ev.retries) if delete_after_upload: try: - os.unlink(local_file.as_posix()) + self._delete_temp_file(local_file) except OSError: LoggerRoot.get_base_logger().warning('Failed removing temporary {}'.format(local_file)) else: @@ -1047,9 +1049,75 @@ class Artifacts(object): def _get_storage_uri_prefix(self): # type: () -> str - if not self._storage_prefix or self._task_name != self._task.name or self._project_name != self._task.get_project_name(): + if not self._storage_prefix or self._task_name != self._task.name or \ + self._project_name != self._task.get_project_name(): # noinspection PyProtectedMember self._storage_prefix = self._task._get_output_destination_suffix() self._task_name = self._task.name self._project_name = self._task.get_project_name() return self._storage_prefix + + def _store_compressed_pd_csv(self, artifact_object, local_filename, **kwargs): + # bugfix: to make pandas csv.gz consistent file hash we must pass mtime=0 + # (otherwise it is encoded and creates new hash every time) + if self._compression == "gzip": + with gzip.GzipFile(local_filename, 'wb', mtime=0) as gzip_file: + artifact_object.to_csv(gzip_file, **kwargs) + else: + artifact_object.to_csv(local_filename, compression=self._compression) + + def _push_temp_file(self, prefix=None, suffix=None): + """ + Same prefix/suffix as mkstemp uses + :param prefix: Same prefix/suffix as mkstemp uses + :param suffix: Same prefix/suffix as mkstemp uses + :return: consistent temp file inside a single folder that later we rename to a temp file + """ + # we want to make sure our temp naming convention is consistent + # this is important for hashing zip files and gz files, because the name of the internal + # file becomes part of the content and then hash changes + + # temp filename is based on the assumption + + # put a consistent the file into a temp folder because the filename is part of + # the compressed artifact and we want consistency. After that we rename compressed file to temp file and + # delete temp folder + temp_folder = mkdtemp(prefix='artifacts_') + local_filename = Path(temp_folder) / (str(prefix).rstrip(".") + "." + str(suffix).lstrip(".")) + local_filename = local_filename.as_posix() + self._temp_files_lookup[local_filename] = (temp_folder, deepcopy(prefix), deepcopy(suffix)) + return local_filename + + def _pop_temp_file(self, local_filename=None): + """ + Now we need to move the consistent file from the temp folder to the main temp folder, + give it a new temp name, and remove the temp folder + + :param local_filename: local file name inside a temp folder, assumed to be a single file in the temp folder + :return: new temp file inside the main temp folder + """ + # convert to posix if Path + if isinstance(local_filename, Path): + local_filename = local_filename.as_posix() + + # if this is not our temp file, just do nothing + if local_filename not in self._temp_files_lookup: + return local_filename + + # move file out of temp folder + try: + temp_folder, prefix, suffix = self._temp_files_lookup.pop(local_filename) + fd, temp_filename = mkstemp(prefix=prefix, suffix=suffix) + os.close(fd) + os.replace(local_filename, temp_filename) + local_filename = temp_filename + os.rmdir(temp_folder) + except Exception as ex: + raise ValueError("Failed storing temp artifact into {}: error: {}".format(local_filename, ex)) + + return temp_filename + + def _delete_temp_file(self, local_filename): + # cleanup and raise exception + local_filename = self._pop_temp_file(local_filename) + os.unlink(local_filename)