Add skip_zero_size_check to StorageManager.download_folder()

This commit is contained in:
allegroai 2022-01-31 11:07:15 +02:00
parent 6229d5a67a
commit 5f1345b148
3 changed files with 61 additions and 26 deletions

View File

@ -19,15 +19,17 @@ from ..utilities.locks.exceptions import LockException
class CacheManager(object): class CacheManager(object):
__cache_managers = {} __cache_managers = {}
_default_cache_file_limit = deferred_config("storage.cache.default_cache_manager_size", 100) _default_cache_file_limit = deferred_config(
"storage.cache.default_cache_manager_size", 100
)
_storage_manager_folder = "storage_manager" _storage_manager_folder = "storage_manager"
_default_context = "global" _default_context = "global"
_local_to_remote_url_lookup = OrderedDict() _local_to_remote_url_lookup = OrderedDict()
__local_to_remote_url_lookup_max_size = 1024 __local_to_remote_url_lookup_max_size = 1024
_context_to_folder_lookup = dict() _context_to_folder_lookup = dict()
_default_context_folder_template = "{0}_artifacts_archive_{1}" _default_context_folder_template = "{0}_artifacts_archive_{1}"
_lockfile_prefix = '.lock.' _lockfile_prefix = ".lock."
_lockfile_suffix = '.clearml' _lockfile_suffix = ".clearml"
class CacheContext(object): class CacheContext(object):
_folder_locks = dict() # type: Dict[str, FileLock] _folder_locks = dict() # type: Dict[str, FileLock]
@ -44,8 +46,10 @@ class CacheManager(object):
self._file_limit = max(self._file_limit, int(cache_file_limit)) self._file_limit = max(self._file_limit, int(cache_file_limit))
return self._file_limit return self._file_limit
def get_local_copy(self, remote_url, force_download): def get_local_copy(
# type: (str, bool) -> Optional[str] self, remote_url, force_download, skip_zero_size_check=False
):
# type: (str, bool, bool) -> Optional[str]
helper = StorageHelper.get(remote_url) helper = StorageHelper.get(remote_url)
if not helper: if not helper:
raise ValueError("Storage access failed: {}".format(remote_url)) raise ValueError("Storage access failed: {}".format(remote_url))
@ -54,7 +58,9 @@ class CacheManager(object):
# noinspection PyProtectedMember # noinspection PyProtectedMember
direct_access = helper.get_driver_direct_access(remote_url) direct_access = helper.get_driver_direct_access(remote_url)
except (OSError, ValueError): except (OSError, ValueError):
LoggerRoot.get_base_logger().debug("Failed accessing local file: {}".format(remote_url)) LoggerRoot.get_base_logger().debug(
"Failed accessing local file: {}".format(remote_url)
)
return None return None
if direct_access: if direct_access:
@ -66,7 +72,12 @@ class CacheManager(object):
CacheManager._add_remote_url(remote_url, cached_file) CacheManager._add_remote_url(remote_url, cached_file)
return cached_file return cached_file
# we need to download the file: # we need to download the file:
downloaded_file = helper.download_to_file(remote_url, cached_file, overwrite_existing=force_download) downloaded_file = helper.download_to_file(
remote_url,
cached_file,
overwrite_existing=force_download,
skip_zero_size_check=skip_zero_size_check,
)
if downloaded_file != cached_file: if downloaded_file != cached_file:
# something happened # something happened
return None return None
@ -78,7 +89,10 @@ class CacheManager(object):
# type: (str, str, bool, int) -> Optional[str] # type: (str, str, bool, int) -> Optional[str]
helper = StorageHelper.get(remote_url) helper = StorageHelper.get(remote_url)
result = helper.upload( result = helper.upload(
local_file, remote_url, async_enable=not wait_for_upload, retries=retries, local_file,
remote_url,
async_enable=not wait_for_upload,
retries=retries,
) )
CacheManager._add_remote_url(remote_url, local_file) CacheManager._add_remote_url(remote_url, local_file)
return result return result
@ -107,6 +121,7 @@ class CacheManager(object):
:param local_filename: if local_file is given, search for the local file/directory in the cache folder :param local_filename: if local_file is given, search for the local file/directory in the cache folder
:return: full path to file name, current file size or None :return: full path to file name, current file size or None
""" """
def safe_time(x): def safe_time(x):
# noinspection PyBroadException # noinspection PyBroadException
try: try:
@ -147,10 +162,16 @@ class CacheManager(object):
lock_files = dict() lock_files = dict()
files = [] files = []
for f in sorted(folder.iterdir(), reverse=True, key=sort_max_access_time): for f in sorted(folder.iterdir(), reverse=True, key=sort_max_access_time):
if f.name.startswith(CacheManager._lockfile_prefix) and f.name.endswith(CacheManager._lockfile_suffix): if f.name.startswith(CacheManager._lockfile_prefix) and f.name.endswith(
CacheManager._lockfile_suffix
):
# parse the lock filename # parse the lock filename
name = f.name[len(CacheManager._lockfile_prefix):-len(CacheManager._lockfile_suffix)] name = f.name[
num, _, name = name.partition('.') len(CacheManager._lockfile_prefix) : -len(
CacheManager._lockfile_suffix
)
]
num, _, name = name.partition(".")
lock_files[name] = lock_files.get(name, []) + [f.as_posix()] lock_files[name] = lock_files.get(name, []) + [f.as_posix()]
else: else:
files.append(f) files.append(f)
@ -239,8 +260,12 @@ class CacheManager(object):
i = 0 i = 0
# try to create a lock if we do not already have one (if we do, we assume it is locked) # try to create a lock if we do not already have one (if we do, we assume it is locked)
while not lock: while not lock:
lock_path = local_path.parent / '{}{:03d}.{}{}'.format( lock_path = local_path.parent / "{}{:03d}.{}{}".format(
CacheManager._lockfile_prefix, i, local_path.name, CacheManager._lockfile_suffix) CacheManager._lockfile_prefix,
i,
local_path.name,
CacheManager._lockfile_suffix,
)
lock = FileLock(filename=lock_path) lock = FileLock(filename=lock_path)
# try to lock folder (if we failed to create lock, try next number) # try to lock folder (if we failed to create lock, try next number)
@ -287,8 +312,7 @@ class CacheManager(object):
cache_context = cache_context or cls._default_context cache_context = cache_context or cls._default_context
if cache_context not in cls.__cache_managers: if cache_context not in cls.__cache_managers:
cls.__cache_managers[cache_context] = cls.CacheContext( cls.__cache_managers[cache_context] = cls.CacheContext(
cache_context, cache_context, cache_file_limit or cls._default_cache_file_limit
cache_file_limit or cls._default_cache_file_limit
) )
if cache_file_limit: if cache_file_limit:
cls.__cache_managers[cache_context].set_cache_limit(cache_file_limit) cls.__cache_managers[cache_context].set_cache_limit(cache_file_limit)
@ -307,7 +331,9 @@ class CacheManager(object):
except Exception: except Exception:
return local_copy_path return local_copy_path
return CacheManager._local_to_remote_url_lookup.get(hash(conform_local_copy_path), local_copy_path) return CacheManager._local_to_remote_url_lookup.get(
hash(conform_local_copy_path), local_copy_path
)
@staticmethod @staticmethod
def _add_remote_url(remote_url, local_copy_path): def _add_remote_url(remote_url, local_copy_path):
@ -322,7 +348,7 @@ class CacheManager(object):
except Exception: except Exception:
return return
if remote_url.startswith('file://'): if remote_url.startswith("file://"):
return return
local_copy_path = str(local_copy_path) local_copy_path = str(local_copy_path)
@ -334,7 +360,10 @@ class CacheManager(object):
pass pass
CacheManager._local_to_remote_url_lookup[hash(local_copy_path)] = remote_url CacheManager._local_to_remote_url_lookup[hash(local_copy_path)] = remote_url
# protect against overuse, so we do not blow up the memory # protect against overuse, so we do not blow up the memory
if len(CacheManager._local_to_remote_url_lookup) > CacheManager.__local_to_remote_url_lookup_max_size: if (
len(CacheManager._local_to_remote_url_lookup)
> CacheManager.__local_to_remote_url_lookup_max_size
):
# pop the first item (FIFO) # pop the first item (FIFO)
CacheManager._local_to_remote_url_lookup.popitem(last=False) CacheManager._local_to_remote_url_lookup.popitem(last=False)
@ -349,4 +378,6 @@ class CacheManager(object):
# type: (Optional[str]) -> str # type: (Optional[str]) -> str
if not context: if not context:
return cls._default_context_folder_template return cls._default_context_folder_template
return cls._context_to_folder_lookup.get(str(context), cls._default_context_folder_template) return cls._context_to_folder_lookup.get(
str(context), cls._default_context_folder_template
)

View File

@ -248,11 +248,12 @@ class StorageHelper(object):
return instance return instance
@classmethod @classmethod
def get_local_copy(cls, remote_url): def get_local_copy(cls, remote_url, skip_zero_size_check=False):
""" """
Download a file from remote URL to a local storage, and return path to local copy, Download a file from remote URL to a local storage, and return path to local copy,
:param remote_url: Remote URL. Example: https://example.com/file.jpg s3://bucket/folder/file.mp4 etc. :param remote_url: Remote URL. Example: https://example.com/file.jpg s3://bucket/folder/file.mp4 etc.
:param skip_zero_size_check: If True, no error will be raised for files with zero-byte size.
:return: Path to local copy of the downloaded file. None if error occurred. :return: Path to local copy of the downloaded file. None if error occurred.
""" """
helper = cls.get(remote_url) helper = cls.get(remote_url)
@ -261,7 +262,7 @@ class StorageHelper(object):
# create temp file with the requested file name # create temp file with the requested file name
file_name = '.' + remote_url.split('/')[-1].split(os.path.sep)[-1] file_name = '.' + remote_url.split('/')[-1].split(os.path.sep)[-1]
local_path = mktemp(suffix=file_name) local_path = mktemp(suffix=file_name)
return helper.download_to_file(remote_url, local_path) return helper.download_to_file(remote_url, local_path, skip_zero_size_check=skip_zero_size_check)
def __init__(self, base_url, url, key=None, secret=None, region=None, verbose=False, logger=None, retries=5, def __init__(self, base_url, url, key=None, secret=None, region=None, verbose=False, logger=None, retries=5,
**kwargs): **kwargs):
@ -802,19 +803,22 @@ class StorageHelper(object):
return True return True
@classmethod @classmethod
def download_from_url(cls, remote_url, local_path, overwrite_existing=False): def download_from_url(cls, remote_url, local_path, overwrite_existing=False, skip_zero_size_check=False):
""" """
Download a file from remote URL to a local storage Download a file from remote URL to a local storage
:param remote_url: Remote URL. Example: https://example.com/image.jpg or s3://bucket/folder/file.mp4 etc. :param remote_url: Remote URL. Example: https://example.com/image.jpg or s3://bucket/folder/file.mp4 etc.
:param local_path: target location for downloaded file. Example: /tmp/image.jpg :param local_path: target location for downloaded file. Example: /tmp/image.jpg
:param overwrite_existing: If True and local_path exists, it will overwrite it, otherwise print warning :param overwrite_existing: If True and local_path exists, it will overwrite it, otherwise print warning
:param skip_zero_size_check: If True, no error will be raised for files with zero-byte size.
:return: local_path if download was successful. :return: local_path if download was successful.
""" """
helper = cls.get(remote_url) helper = cls.get(remote_url)
if not helper: if not helper:
return None return None
return helper.download_to_file(remote_url, local_path, overwrite_existing=overwrite_existing) return helper.download_to_file(
remote_url, local_path, overwrite_existing=overwrite_existing, skip_zero_size_check=skip_zero_size_check
)
def get_driver_direct_access(self, path): def get_driver_direct_access(self, path):
""" """

View File

@ -305,7 +305,7 @@ class StorageManager(object):
pool.apply_async( pool.apply_async(
helper.download_to_file, helper.download_to_file,
args=(remote_path, local_url), args=(remote_path, local_url),
kwds={"overwrite_existing": overwrite}, kwds={"overwrite_existing": overwrite, "skip_zero_size_check": skip_zero_size_check},
) )
) )
for res in results: for res in results: