Improve artifacts serialization: should now be consistent when serializing pandas objects into a gzip stream (csv.gz)

This commit is contained in:
allegroai 2023-08-04 19:08:51 +03:00
parent 197894735b
commit d26ce48dbe

View File

@ -1,3 +1,4 @@
import gzip
import json import json
import yaml import yaml
import mimetypes import mimetypes
@ -38,7 +39,7 @@ try:
except ImportError: except ImportError:
np = None np = None
try: try:
from pathlib import Path as pathlib_Path from pathlib import Path as pathlib_Path # noqa
except ImportError: except ImportError:
pathlib_Path = None pathlib_Path = None
@ -321,6 +322,7 @@ class Artifacts(object):
self._storage_prefix = None self._storage_prefix = None
self._task_name = None self._task_name = None
self._project_name = None self._project_name = None
self._temp_files_lookup = {}
def register_artifact(self, name, artifact, metadata=None, uniqueness_columns=True): def register_artifact(self, name, artifact, metadata=None, uniqueness_columns=True):
# type: (str, DataFrame, Optional[dict], Union[bool, Sequence[str]]) -> () # type: (str, DataFrame, Optional[dict], Union[bool, Sequence[str]]) -> ()
@ -428,15 +430,15 @@ class Artifacts(object):
artifact_type_data.preview = "" artifact_type_data.preview = ""
override_filename_ext_in_uri = extension_name or "" override_filename_ext_in_uri = extension_name or ""
override_filename_in_uri = name + override_filename_ext_in_uri override_filename_in_uri = name + override_filename_ext_in_uri
fd, local_filename = mkstemp(prefix=quote(name, safe="") + ".", suffix=override_filename_ext_in_uri) local_filename = self._push_temp_file(
os.close(fd) prefix=quote(name, safe="") + ".", suffix=override_filename_ext_in_uri)
# noinspection PyBroadException # noinspection PyBroadException
try: try:
with open(local_filename, "wb") as f: with open(local_filename, "wb") as f:
f.write(serialization_function(artifact_object)) f.write(serialization_function(artifact_object))
except Exception: except Exception:
# cleanup and raise exception # cleanup and raise exception
os.unlink(local_filename) self._delete_temp_file(local_filename)
raise raise
artifact_type_data.content_type = mimetypes.guess_type(local_filename)[0] artifact_type_data.content_type = mimetypes.guess_type(local_filename)[0]
elif extension_name == ".pkl": elif extension_name == ".pkl":
@ -448,8 +450,8 @@ class Artifacts(object):
extension_name, [".npz", ".csv.gz"], ".npz", artifact_type extension_name, [".npz", ".csv.gz"], ".npz", artifact_type
) )
override_filename_in_uri = name + override_filename_ext_in_uri override_filename_in_uri = name + override_filename_ext_in_uri
fd, local_filename = mkstemp(prefix=quote(name, safe="") + '.', suffix=override_filename_ext_in_uri) local_filename = self._push_temp_file(
os.close(fd) prefix=quote(name, safe="") + '.', suffix=override_filename_ext_in_uri)
if override_filename_ext_in_uri == ".npz": if override_filename_ext_in_uri == ".npz":
artifact_type_data.content_type = "application/numpy" artifact_type_data.content_type = "application/numpy"
np.savez_compressed(local_filename, **{name: artifact_object}) np.savez_compressed(local_filename, **{name: artifact_object})
@ -464,11 +466,10 @@ class Artifacts(object):
extension_name, [".csv.gz", ".parquet", ".feather", ".pickle"], ".csv.gz", artifact_type extension_name, [".csv.gz", ".parquet", ".feather", ".pickle"], ".csv.gz", artifact_type
) )
override_filename_in_uri = name override_filename_in_uri = name
fd, local_filename = mkstemp(prefix=quote(name, safe="") + '.', suffix=override_filename_ext_in_uri) local_filename = self._push_temp_file(prefix=quote(name, safe="") + '.', suffix=override_filename_ext_in_uri)
os.close(fd)
if override_filename_ext_in_uri == ".csv.gz": if override_filename_ext_in_uri == ".csv.gz":
artifact_type_data.content_type = "text/csv" artifact_type_data.content_type = "text/csv"
artifact_object.to_csv(local_filename, compression=self._compression) self._store_compressed_pd_csv(artifact_object, local_filename)
elif override_filename_ext_in_uri == ".parquet": elif override_filename_ext_in_uri == ".parquet":
try: try:
artifact_type_data.content_type = "application/parquet" artifact_type_data.content_type = "application/parquet"
@ -480,7 +481,7 @@ class Artifacts(object):
) )
) )
artifact_type_data.content_type = "text/csv" artifact_type_data.content_type = "text/csv"
artifact_object.to_csv(local_filename, compression=self._compression) self._store_compressed_pd_csv(artifact_object, local_filename)
elif override_filename_ext_in_uri == ".feather": elif override_filename_ext_in_uri == ".feather":
try: try:
artifact_type_data.content_type = "application/feather" artifact_type_data.content_type = "application/feather"
@ -492,7 +493,7 @@ class Artifacts(object):
) )
) )
artifact_type_data.content_type = "text/csv" artifact_type_data.content_type = "text/csv"
artifact_object.to_csv(local_filename, compression=self._compression) self._store_compressed_pd_csv(artifact_object, local_filename)
elif override_filename_ext_in_uri == ".pickle": elif override_filename_ext_in_uri == ".pickle":
artifact_type_data.content_type = "application/pickle" artifact_type_data.content_type = "application/pickle"
artifact_object.to_pickle(local_filename) artifact_object.to_pickle(local_filename)
@ -527,8 +528,8 @@ class Artifacts(object):
if guessed_type: if guessed_type:
artifact_type_data.content_type = guessed_type artifact_type_data.content_type = guessed_type
fd, local_filename = mkstemp(prefix=quote(name, safe="") + '.', suffix=override_filename_ext_in_uri) local_filename = self._push_temp_file(
os.close(fd) prefix=quote(name, safe="") + '.', suffix=override_filename_ext_in_uri)
artifact_object.save(local_filename) artifact_object.save(local_filename)
delete_after_upload = True delete_after_upload = True
elif isinstance(artifact_object, dict): elif isinstance(artifact_object, dict):
@ -561,8 +562,9 @@ class Artifacts(object):
if serialized_text is not None: if serialized_text is not None:
override_filename_in_uri = name + override_filename_ext_in_uri override_filename_in_uri = name + override_filename_ext_in_uri
fd, local_filename = mkstemp(prefix=quote(name, safe="") + ".", suffix=override_filename_ext_in_uri) local_filename = self._push_temp_file(
with open(fd, "w") as f: prefix=quote(name, safe="") + ".", suffix=override_filename_ext_in_uri)
with open(local_filename, "w") as f:
f.write(serialized_text) f.write(serialized_text)
preview = preview or serialized_text preview = preview or serialized_text
if len(preview) < self.max_preview_size_bytes: if len(preview) < self.max_preview_size_bytes:
@ -599,7 +601,7 @@ class Artifacts(object):
files = list(Path(folder).rglob(wildcard)) files = list(Path(folder).rglob(wildcard))
override_filename_ext_in_uri = '.zip' override_filename_ext_in_uri = '.zip'
override_filename_in_uri = folder.parts[-1] + override_filename_ext_in_uri override_filename_in_uri = folder.parts[-1] + override_filename_ext_in_uri
fd, zip_file = mkstemp( zip_file = self._push_temp_file(
prefix=quote(folder.parts[-1], safe="") + '.', suffix=override_filename_ext_in_uri prefix=quote(folder.parts[-1], safe="") + '.', suffix=override_filename_ext_in_uri
) )
try: try:
@ -618,8 +620,7 @@ class Artifacts(object):
LoggerRoot.get_base_logger().warning('Exception {}\nFailed zipping artifact folder {}'.format( LoggerRoot.get_base_logger().warning('Exception {}\nFailed zipping artifact folder {}'.format(
folder, e)) folder, e))
return False return False
finally:
os.close(fd)
artifact_type_data.preview = preview or archive_preview artifact_type_data.preview = preview or archive_preview
artifact_object = zip_file artifact_object = zip_file
artifact_type = 'archive' artifact_type = 'archive'
@ -647,7 +648,7 @@ class Artifacts(object):
override_filename_ext_in_uri = '.zip' override_filename_ext_in_uri = '.zip'
override_filename_in_uri = quote(name, safe="") + override_filename_ext_in_uri override_filename_in_uri = quote(name, safe="") + override_filename_ext_in_uri
common_path = get_common_path(list_files) common_path = get_common_path(list_files)
fd, zip_file = mkstemp( zip_file = self._push_temp_file(
prefix='artifact_folder.', suffix=override_filename_ext_in_uri prefix='artifact_folder.', suffix=override_filename_ext_in_uri
) )
try: try:
@ -670,8 +671,7 @@ class Artifacts(object):
LoggerRoot.get_base_logger().warning('Exception {}\nFailed zipping artifact files {}'.format( LoggerRoot.get_base_logger().warning('Exception {}\nFailed zipping artifact files {}'.format(
artifact_object, e)) artifact_object, e))
return False return False
finally:
os.close(fd)
artifact_type_data.preview = preview or archive_preview artifact_type_data.preview = preview or archive_preview
artifact_object = zip_file artifact_object = zip_file
artifact_type = 'archive' artifact_type = 'archive'
@ -704,15 +704,15 @@ class Artifacts(object):
delete_after_upload = True delete_after_upload = True
override_filename_ext_in_uri = ".txt" override_filename_ext_in_uri = ".txt"
override_filename_in_uri = name + override_filename_ext_in_uri override_filename_in_uri = name + override_filename_ext_in_uri
fd, local_filename = mkstemp(prefix=quote(name, safe="") + ".", suffix=override_filename_ext_in_uri) local_filename = self._push_temp_file(
os.close(fd) prefix=quote(name, safe="") + ".", suffix=override_filename_ext_in_uri)
# noinspection PyBroadException # noinspection PyBroadException
try: try:
with open(local_filename, "wt") as f: with open(local_filename, "wt") as f:
f.write(artifact_object) f.write(artifact_object)
except Exception: except Exception:
# cleanup and raise exception # cleanup and raise exception
os.unlink(local_filename) self._delete_temp_file(local_filename)
raise raise
elif artifact_object is None or (isinstance(artifact_object, str) and artifact_object == ""): elif artifact_object is None or (isinstance(artifact_object, str) and artifact_object == ""):
artifact_type = '' artifact_type = ''
@ -736,15 +736,15 @@ class Artifacts(object):
delete_after_upload = True delete_after_upload = True
override_filename_ext_in_uri = '.pkl' override_filename_ext_in_uri = '.pkl'
override_filename_in_uri = name + override_filename_ext_in_uri override_filename_in_uri = name + override_filename_ext_in_uri
fd, local_filename = mkstemp(prefix=quote(name, safe="") + '.', suffix=override_filename_ext_in_uri) local_filename = self._push_temp_file(
os.close(fd) prefix=quote(name, safe="") + '.', suffix=override_filename_ext_in_uri)
# noinspection PyBroadException # noinspection PyBroadException
try: try:
with open(local_filename, 'wb') as f: with open(local_filename, 'wb') as f:
pickle.dump(artifact_object, f) pickle.dump(artifact_object, f)
except Exception: except Exception:
# cleanup and raise exception # cleanup and raise exception
os.unlink(local_filename) self._delete_temp_file(local_filename)
raise raise
# verify preview not out of scope: # verify preview not out of scope:
@ -875,10 +875,10 @@ class Artifacts(object):
override_filename_ext_in_uri = self._save_format override_filename_ext_in_uri = self._save_format
override_filename_in_uri = name override_filename_in_uri = name
fd, local_csv = mkstemp(prefix=quote(name, safe="") + '.', suffix=override_filename_ext_in_uri) local_csv = self._push_temp_file(
os.close(fd) prefix=quote(name, safe="") + '.', suffix=override_filename_ext_in_uri)
local_csv = Path(local_csv) local_csv = Path(local_csv)
pd_artifact.to_csv(local_csv.as_posix(), index=False, compression=self._compression) self._store_compressed_pd_csv(pd_artifact, local_csv.as_posix(), index=False)
current_sha2, file_sha2 = sha256sum( current_sha2, file_sha2 = sha256sum(
local_csv.as_posix(), skip_header=32, block_size=Artifacts._hash_block_size) local_csv.as_posix(), skip_header=32, block_size=Artifacts._hash_block_size)
if name in self._last_artifacts_upload: if name in self._last_artifacts_upload:
@ -887,7 +887,7 @@ class Artifacts(object):
# nothing to do, we can skip the upload # nothing to do, we can skip the upload
# noinspection PyBroadException # noinspection PyBroadException
try: try:
local_csv.unlink() self._delete_temp_file(local_csv)
except Exception: except Exception:
pass pass
return return
@ -944,6 +944,8 @@ class Artifacts(object):
""" """
from clearml.storage import StorageManager from clearml.storage import StorageManager
local_file = self._pop_temp_file(local_file)
upload_uri = self._task.output_uri or self._task.get_logger().get_default_upload_destination() upload_uri = self._task.output_uri or self._task.get_logger().get_default_upload_destination()
if not isinstance(local_file, Path): if not isinstance(local_file, Path):
local_file = Path(local_file) local_file = Path(local_file)
@ -962,7 +964,7 @@ class Artifacts(object):
StorageManager.upload_file(local_file.as_posix(), uri, wait_for_upload=True, retries=ev.retries) StorageManager.upload_file(local_file.as_posix(), uri, wait_for_upload=True, retries=ev.retries)
if delete_after_upload: if delete_after_upload:
try: try:
os.unlink(local_file.as_posix()) self._delete_temp_file(local_file)
except OSError: except OSError:
LoggerRoot.get_base_logger().warning('Failed removing temporary {}'.format(local_file)) LoggerRoot.get_base_logger().warning('Failed removing temporary {}'.format(local_file))
else: else:
@ -1047,9 +1049,75 @@ class Artifacts(object):
def _get_storage_uri_prefix(self): def _get_storage_uri_prefix(self):
# type: () -> str # type: () -> str
if not self._storage_prefix or self._task_name != self._task.name or self._project_name != self._task.get_project_name(): if not self._storage_prefix or self._task_name != self._task.name or \
self._project_name != self._task.get_project_name():
# noinspection PyProtectedMember # noinspection PyProtectedMember
self._storage_prefix = self._task._get_output_destination_suffix() self._storage_prefix = self._task._get_output_destination_suffix()
self._task_name = self._task.name self._task_name = self._task.name
self._project_name = self._task.get_project_name() self._project_name = self._task.get_project_name()
return self._storage_prefix return self._storage_prefix
def _store_compressed_pd_csv(self, artifact_object, local_filename, **kwargs):
# bugfix: to make pandas csv.gz consistent file hash we must pass mtime=0
# (otherwise it is encoded and creates new hash every time)
if self._compression == "gzip":
with gzip.GzipFile(local_filename, 'wb', mtime=0) as gzip_file:
artifact_object.to_csv(gzip_file, **kwargs)
else:
artifact_object.to_csv(local_filename, compression=self._compression)
def _push_temp_file(self, prefix=None, suffix=None):
"""
Same prefix/suffix as mkstemp uses
:param prefix: Same prefix/suffix as mkstemp uses
:param suffix: Same prefix/suffix as mkstemp uses
:return: consistent temp file inside a single folder that later we rename to a temp file
"""
# we want to make sure our temp naming convention is consistent
# this is important for hashing zip files and gz files, because the name of the internal
# file becomes part of the content and then hash changes
# temp filename is based on the assumption
# put a consistent the file into a temp folder because the filename is part of
# the compressed artifact and we want consistency. After that we rename compressed file to temp file and
# delete temp folder
temp_folder = mkdtemp(prefix='artifacts_')
local_filename = Path(temp_folder) / (str(prefix).rstrip(".") + "." + str(suffix).lstrip("."))
local_filename = local_filename.as_posix()
self._temp_files_lookup[local_filename] = (temp_folder, deepcopy(prefix), deepcopy(suffix))
return local_filename
def _pop_temp_file(self, local_filename=None):
"""
Now we need to move the consistent file from the temp folder to the main temp folder,
give it a new temp name, and remove the temp folder
:param local_filename: local file name inside a temp folder, assumed to be a single file in the temp folder
:return: new temp file inside the main temp folder
"""
# convert to posix if Path
if isinstance(local_filename, Path):
local_filename = local_filename.as_posix()
# if this is not our temp file, just do nothing
if local_filename not in self._temp_files_lookup:
return local_filename
# move file out of temp folder
try:
temp_folder, prefix, suffix = self._temp_files_lookup.pop(local_filename)
fd, temp_filename = mkstemp(prefix=prefix, suffix=suffix)
os.close(fd)
os.replace(local_filename, temp_filename)
local_filename = temp_filename
os.rmdir(temp_folder)
except Exception as ex:
raise ValueError("Failed storing temp artifact into {}: error: {}".format(local_filename, ex))
return temp_filename
def _delete_temp_file(self, local_filename):
    # type: (Union[str, Path]) -> None
    """
    Delete a temp file created by ``_push_temp_file`` (or any plain temp file).

    First pops the file out of its consistent-name temp folder (which also
    removes the folder), then unlinks the file itself.

    :param local_filename: path returned by ``_push_temp_file``, or any file path
    :raises OSError: if the final unlink fails (callers may catch this)
    """
    local_filename = self._pop_temp_file(local_filename)
    os.unlink(local_filename)