diff --git a/clearml/storage/cache.py b/clearml/storage/cache.py index 1c91b1ef..72762e11 100644 --- a/clearml/storage/cache.py +++ b/clearml/storage/cache.py @@ -19,15 +19,17 @@ from ..utilities.locks.exceptions import LockException class CacheManager(object): __cache_managers = {} - _default_cache_file_limit = deferred_config("storage.cache.default_cache_manager_size", 100) + _default_cache_file_limit = deferred_config( + "storage.cache.default_cache_manager_size", 100 + ) _storage_manager_folder = "storage_manager" _default_context = "global" _local_to_remote_url_lookup = OrderedDict() __local_to_remote_url_lookup_max_size = 1024 _context_to_folder_lookup = dict() _default_context_folder_template = "{0}_artifacts_archive_{1}" - _lockfile_prefix = '.lock.' - _lockfile_suffix = '.clearml' + _lockfile_prefix = ".lock." + _lockfile_suffix = ".clearml" class CacheContext(object): _folder_locks = dict() # type: Dict[str, FileLock] @@ -44,8 +46,10 @@ class CacheManager(object): self._file_limit = max(self._file_limit, int(cache_file_limit)) return self._file_limit - def get_local_copy(self, remote_url, force_download): - # type: (str, bool) -> Optional[str] + def get_local_copy( + self, remote_url, force_download, skip_zero_size_check=False + ): + # type: (str, bool, bool) -> Optional[str] helper = StorageHelper.get(remote_url) if not helper: raise ValueError("Storage access failed: {}".format(remote_url)) @@ -54,7 +58,9 @@ class CacheManager(object): # noinspection PyProtectedMember direct_access = helper.get_driver_direct_access(remote_url) except (OSError, ValueError): - LoggerRoot.get_base_logger().debug("Failed accessing local file: {}".format(remote_url)) + LoggerRoot.get_base_logger().debug( + "Failed accessing local file: {}".format(remote_url) + ) return None if direct_access: @@ -66,7 +72,12 @@ class CacheManager(object): CacheManager._add_remote_url(remote_url, cached_file) return cached_file # we need to download the file: - downloaded_file = helper.download_to_file(remote_url, cached_file, overwrite_existing=force_download) + downloaded_file = helper.download_to_file( + remote_url, + cached_file, + overwrite_existing=force_download, + skip_zero_size_check=skip_zero_size_check, + ) if downloaded_file != cached_file: # something happened return None @@ -78,7 +89,10 @@ class CacheManager(object): # type: (str, str, bool, int) -> Optional[str] helper = StorageHelper.get(remote_url) result = helper.upload( - local_file, remote_url, async_enable=not wait_for_upload, retries=retries, + local_file, + remote_url, + async_enable=not wait_for_upload, + retries=retries, ) CacheManager._add_remote_url(remote_url, local_file) return result @@ -107,6 +121,7 @@ class CacheManager(object): :param local_filename: if local_file is given, search for the local file/directory in the cache folder :return: full path to file name, current file size or None """ + def safe_time(x): # noinspection PyBroadException try: @@ -147,20 +162,26 @@ class CacheManager(object): lock_files = dict() files = [] for f in sorted(folder.iterdir(), reverse=True, key=sort_max_access_time): - if f.name.startswith(CacheManager._lockfile_prefix) and f.name.endswith(CacheManager._lockfile_suffix): + if f.name.startswith(CacheManager._lockfile_prefix) and f.name.endswith( + CacheManager._lockfile_suffix + ): # parse the lock filename - name = f.name[len(CacheManager._lockfile_prefix):-len(CacheManager._lockfile_suffix)] - num, _, name = name.partition('.') + name = f.name[ + len(CacheManager._lockfile_prefix) : -len( + CacheManager._lockfile_suffix + ) + ] + num, _, name = name.partition(".") lock_files[name] = lock_files.get(name, []) + [f.as_posix()] else: files.append(f) # remove new lock files from the list (we will delete them when time comes) - for f in files[:self._file_limit]: + for f in files[: self._file_limit]: lock_files.pop(f.name, None) # delete old files - files = files[self._file_limit:] + files = files[self._file_limit :] for i, f in enumerate(files): if i < self._file_limit: continue @@ -239,8 +260,12 @@ class CacheManager(object): i = 0 # try to create a lock if we do not already have one (if we do, we assume it is locked) while not lock: - lock_path = local_path.parent / '{}{:03d}.{}{}'.format( - CacheManager._lockfile_prefix, i, local_path.name, CacheManager._lockfile_suffix) + lock_path = local_path.parent / "{}{:03d}.{}{}".format( + CacheManager._lockfile_prefix, + i, + local_path.name, + CacheManager._lockfile_suffix, + ) lock = FileLock(filename=lock_path) # try to lock folder (if we failed to create lock, try nex number) @@ -287,8 +312,7 @@ class CacheManager(object): cache_context = cache_context or cls._default_context if cache_context not in cls.__cache_managers: cls.__cache_managers[cache_context] = cls.CacheContext( - cache_context, - cache_file_limit or cls._default_cache_file_limit + cache_context, cache_file_limit or cls._default_cache_file_limit ) if cache_file_limit: cls.__cache_managers[cache_context].set_cache_limit(cache_file_limit) @@ -307,7 +331,9 @@ class CacheManager(object): except Exception: return local_copy_path - return CacheManager._local_to_remote_url_lookup.get(hash(conform_local_copy_path), local_copy_path) + return CacheManager._local_to_remote_url_lookup.get( + hash(conform_local_copy_path), local_copy_path + ) @staticmethod def _add_remote_url(remote_url, local_copy_path): @@ -322,7 +348,7 @@ class CacheManager(object): except Exception: return - if remote_url.startswith('file://'): + if remote_url.startswith("file://"): return local_copy_path = str(local_copy_path) @@ -334,7 +360,10 @@ class CacheManager(object): pass CacheManager._local_to_remote_url_lookup[hash(local_copy_path)] = remote_url # protect against overuse, so we do not blowup the memory - if len(CacheManager._local_to_remote_url_lookup) > CacheManager.__local_to_remote_url_lookup_max_size: + if ( + len(CacheManager._local_to_remote_url_lookup) + > CacheManager.__local_to_remote_url_lookup_max_size + ): # pop the first item (FIFO) CacheManager._local_to_remote_url_lookup.popitem(last=False) @@ -349,4 +378,6 @@ class CacheManager(object): # type: (Optional[str]) -> str if not context: return cls._default_context_folder_template - return cls._context_to_folder_lookup.get(str(context), cls._default_context_folder_template) + return cls._context_to_folder_lookup.get( + str(context), cls._default_context_folder_template + ) diff --git a/clearml/storage/helper.py b/clearml/storage/helper.py index 1c8f23c8..8e0ecb6e 100644 --- a/clearml/storage/helper.py +++ b/clearml/storage/helper.py @@ -248,11 +248,12 @@ class StorageHelper(object): return instance @classmethod - def get_local_copy(cls, remote_url): + def get_local_copy(cls, remote_url, skip_zero_size_check=False): """ Download a file from remote URL to a local storage, and return path to local copy, :param remote_url: Remote URL. Example: https://example.com/file.jpg s3://bucket/folder/file.mp4 etc. + :param skip_zero_size_check: If True no error will be raised for files with zero bytes size. :return: Path to local copy of the downloaded file. None if error occurred. """ helper = cls.get(remote_url) @@ -261,7 +262,7 @@ class StorageHelper(object): # create temp file with the requested file name file_name = '.' + remote_url.split('/')[-1].split(os.path.sep)[-1] local_path = mktemp(suffix=file_name) - return helper.download_to_file(remote_url, local_path) + return helper.download_to_file(remote_url, local_path, skip_zero_size_check=skip_zero_size_check) def __init__(self, base_url, url, key=None, secret=None, region=None, verbose=False, logger=None, retries=5, **kwargs): @@ -802,19 +803,22 @@ class StorageHelper(object): return True @classmethod - def download_from_url(cls, remote_url, local_path, overwrite_existing=False): + def download_from_url(cls, remote_url, local_path, overwrite_existing=False, skip_zero_size_check=False): """ Download a file from remote URL to a local storage :param remote_url: Remote URL. Example: https://example.com/image.jpg or s3://bucket/folder/file.mp4 etc. :param local_path: target location for downloaded file. Example: /tmp/image.jpg :param overwrite_existing: If True and local_path exists, it will overwrite it, otherwise print warning + :param skip_zero_size_check: If True no error will be raised for files with zero bytes size. :return: local_path if download was successful. """ helper = cls.get(remote_url) if not helper: return None - return helper.download_to_file(remote_url, local_path, overwrite_existing=overwrite_existing) + return helper.download_to_file( + remote_url, local_path, overwrite_existing=overwrite_existing, skip_zero_size_check=skip_zero_size_check + ) def get_driver_direct_access(self, path): """ diff --git a/clearml/storage/manager.py b/clearml/storage/manager.py index 71a7d183..f53d9127 100644 --- a/clearml/storage/manager.py +++ b/clearml/storage/manager.py @@ -305,7 +305,7 @@ class StorageManager(object): pool.apply_async( helper.download_to_file, args=(remote_path, local_url), - kwds={"overwrite_existing": overwrite}, + kwds={"overwrite_existing": overwrite, "skip_zero_size_check": skip_zero_size_check}, ) ) for res in results: