Support caching of extracted zip artifacts

allegroai 2020-04-26 23:19:42 +03:00
parent 6ff3cc0ee4
commit 0a7a32f2eb
4 changed files with 126 additions and 32 deletions
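In short: downloading and unzipping of 'archive' artifacts moves out of Artifact.get_local_copy() and into StorageManager.get_local_copy(), which now keeps the extracted folder in the cache next to the downloaded zip, so repeated calls reuse the extracted copy instead of unzipping into a fresh temp folder each time. A minimal usage sketch of the new API (the URL and artifact name are placeholders, not from this commit):

    from trains.storage import StorageManager

    # First call downloads the zip and extracts it into a cached folder;
    # later calls with the same URL return the same extracted folder.
    local_folder = StorageManager.get_local_copy(
        remote_url="s3://my-bucket/artifacts/dataset.zip",  # placeholder URL
        extract_archive=True,
        name="dataset",  # placeholder artifact name
    )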


@@ -167,6 +167,10 @@ sdk {
             # Log all stdout & stderr
             log_stdout: true
+
+            # compatibility feature, report memory usage for the entire machine
+            # default (false), report only on the running process and its sub-processes
+            report_global_mem_used: false
         }
     }
 }
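(This config hunk is a compatibility knob bundled into the same commit, unrelated to artifact caching: by default the resource monitor reports memory only for the running process and its sub-processes, and setting report_global_mem_used: true in the worker section shown above restores the old machine-wide reporting. Assuming the flag sits under sdk.development.worker as the surrounding context suggests, the HOCON dotted form would be:

    sdk.development.worker.report_global_mem_used: true
)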


@@ -157,22 +157,11 @@ class Artifact(object):
         :return: a local path to a downloaded copy of the artifact
         """
         from trains.storage import StorageManager
-        local_path = StorageManager.get_local_copy(self.url)
-        if local_path and extract_archive and self.type == 'archive':
-            temp_folder = None
-            try:
-                temp_folder = mkdtemp(prefix='artifact_', suffix='.archive_'+self.name)
-                ZipFile(local_path).extractall(path=temp_folder)
-            except Exception:
-                try:
-                    if temp_folder:
-                        Path(temp_folder).rmdir()
-                except Exception:
-                    pass
-                return local_path
-            return temp_folder
-        return local_path
+        return StorageManager.get_local_copy(
+            remote_url=self.url,
+            extract_archive=extract_archive and self.type == 'archive',
+            name=self.name
+        )

     def __repr__(self):
         return str({'name': self.name, 'size': self.size, 'type': self.type, 'mode': self.mode, 'url': self.url,
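Since Artifact.get_local_copy() now just forwards to the StorageManager, archive artifacts fetched through a task transparently pick up the cached-extraction behavior. A hedged usage sketch (the task id and artifact name are placeholders):

    from trains import Task

    task = Task.get_task(task_id="aabbccdd")  # placeholder task id
    # for an artifact of type 'archive', this returns the cached extracted folder
    folder = task.artifacts["dataset"].get_local_copy(extract_archive=True)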


@@ -1,17 +1,19 @@
 import hashlib
+import shutil

 from pathlib2 import Path

 from .helper import StorageHelper
 from .util import quote_url
 from ..config import get_cache_dir
+from ..debugging.log import LoggerRoot


 class CacheManager(object):
     __cache_managers = {}
     _default_cache_file_limit = 100
-    _storage_manager_folder = 'storage_manager'
-    _default_context = 'global'
+    _storage_manager_folder = "storage_manager"
+    _default_context = "global"

     class CacheContext(object):
         def __init__(self, cache_context, default_cache_file_limit=10):
@@ -44,20 +46,24 @@ class CacheManager(object):
         @staticmethod
         def upload_file(local_file, remote_url, wait_for_upload=True):
             helper = StorageHelper.get(remote_url)
-            return helper.upload(local_file, remote_url, async_enable=not wait_for_upload)
+            return helper.upload(
+                local_file, remote_url, async_enable=not wait_for_upload
+            )

         @classmethod
         def _get_hashed_url_file(cls, url):
             str_hash = hashlib.md5(url.encode()).hexdigest()
-            filename = url.split('/')[-1]
-            return '{}.{}'.format(str_hash, quote_url(filename))
+            filename = url.split("/")[-1]
+            return "{}.{}".format(str_hash, quote_url(filename))

         def _get_cache_file(self, remote_url):
             """
             :param remote_url: check if we have the remote url in our cache
             :return: full path to file name, current file size or None
             """
-            folder = Path(get_cache_dir() / CacheManager._storage_manager_folder / self._context)
+            folder = Path(
+                get_cache_dir() / CacheManager._storage_manager_folder / self._context
+            )
             folder.mkdir(parents=True, exist_ok=True)
             local_filename = self._get_hashed_url_file(remote_url)
             new_file = folder / local_filename
@@ -65,20 +71,48 @@ class CacheManager(object):
             new_file.touch(exist_ok=True)
             # delete old files
-            files = sorted(folder.iterdir(), reverse=True, key=lambda x: x.stat().st_atime)
+            def sort_max_access_time(x):
+                atime = x.stat().st_atime
+                # noinspection PyBroadException
+                try:
+                    if x.is_dir():
+                        dir_files = list(x.iterdir())
+                        atime = (
+                            max(atime, max(s.stat().st_atime for s in dir_files))
+                            if dir_files
+                            else atime
+                        )
+                except Exception:
+                    pass
+                return atime
+
+            files = sorted(folder.iterdir(), reverse=True, key=sort_max_access_time)
             files = files[self._file_limit:]
             for f in files:
-                f.unlink()
+                if not f.is_dir():
+                    f.unlink()
+                else:
+                    try:
+                        shutil.rmtree(f)
+                    except Exception as e:
+                        # failed deleting folder
+                        LoggerRoot.get_base_logger().warning(
+                            "Exception {}\nFailed deleting folder {}".format(e, f)
+                        )
             # if file doesn't exist, return file size None
-            return new_file.as_posix(), new_file.stat().st_size if new_file.exists() else None
+            return (
+                new_file.as_posix(),
+                new_file.stat().st_size if new_file.exists() else None,
+            )

     @classmethod
     def get_cache_manager(cls, cache_context=None, cache_file_limit=None):
         cache_context = cache_context or cls._default_context
         if cache_context not in cls.__cache_managers:
             cls.__cache_managers[cache_context] = cls.CacheContext(
-                cache_context, cache_file_limit or cls._default_cache_file_limit)
+                cache_context, cache_file_limit or cls._default_cache_file_limit
+            )
         if cache_file_limit:
             cls.__cache_managers[cache_context].set_cache_limit(cache_file_limit)
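The eviction-key change above matters because extracted archives are now folders inside the cache: reading files inside a folder does not update the folder's own atime, so the sort key takes the most recent atime of the folder's direct children, and the file limit now evicts whole folders as well as files. A standalone sketch of the same idea, not the trains API (the cache location and 100-entry limit are placeholders):

    from pathlib import Path

    def last_access_time(entry):
        # a folder counts as "used" as recently as its most recently
        # accessed direct child; fall back to the folder's own atime
        atime = entry.stat().st_atime
        if entry.is_dir():
            children = list(entry.iterdir())
            if children:
                atime = max(atime, max(c.stat().st_atime for c in children))
        return atime

    cache = Path.home() / ".trains" / "cache"  # placeholder cache location
    entries = sorted(cache.iterdir(), key=last_access_time, reverse=True)
    stale = entries[100:]  # everything beyond the file limit gets deleted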


@@ -1,5 +1,12 @@
+import os
+import shutil
+from time import time
 from typing import Optional
+from zipfile import ZipFile

+from pathlib2 import Path
+
+from ..debugging.log import LoggerRoot
 from .cache import CacheManager
@@ -11,7 +18,10 @@ class StorageManager(object):
     """

     @classmethod
-    def get_local_copy(cls, remote_url, cache_context=None):  # type: (str, Optional[str]) -> str
+    def get_local_copy(
+        cls, remote_url, cache_context=None, extract_archive=True, name=None
+    ):
+        # type: (str, Optional[str], Optional[bool], Optional[str]) -> str
         """
         Get a local copy of the remote file. If the remote URL is a direct file access,
         the returned link is the same; otherwise a link to a local copy of the url file is returned.
@@ -20,12 +30,63 @@ class StorageManager(object):
         :param str remote_url: remote url link (string)
         :param str cache_context: Optional caching context identifier (string), default context 'global'
+        :param bool extract_archive: if True, the returned path will be a cached folder containing the
+            archive's content; currently only zip files are supported
+        :param name: name of the artifact
         :return str: full path to local copy of the requested url. Return None on Error.
         """
-        return CacheManager.get_cache_manager(cache_context=cache_context).get_local_copy(remote_url=remote_url)
+        cached_file = CacheManager.get_cache_manager(
+            cache_context=cache_context
+        ).get_local_copy(remote_url=remote_url)
+        if not extract_archive or not cached_file:
+            return cached_file
+        archive_suffix = cached_file.rpartition(".")[0]
+        target_folder = Path("{0}_artifact_archive_{1}".format(archive_suffix, name))
+        base_logger = LoggerRoot.get_base_logger()
+        try:
+            temp_target_folder = "{0}_{1}".format(target_folder.name, time() * 1000)
+            os.mkdir(path=temp_target_folder)
+            ZipFile(cached_file).extractall(path=temp_target_folder)
+            # we assume such a folder already exists if the zip file was previously extracted
+            # noinspection PyBroadException
+            try:
+                # if the rename fails, someone else already managed to extract the zip; delete the
+                # current folder and return the already existing cached zip folder
+                os.rename(temp_target_folder, str(target_folder))
+            except Exception:
+                if target_folder.exists():
+                    target_folder.touch(exist_ok=True)
+                else:
+                    base_logger.warning(
+                        "Failed renaming {0} to {1}".format(
+                            temp_target_folder, target_folder
+                        )
+                    )
+                try:
+                    shutil.rmtree(temp_target_folder)
+                except Exception as ex:
+                    base_logger.warning(
+                        "Exception {}\nFailed deleting folder {}".format(
+                            ex, temp_target_folder
+                        )
+                    )
+        except Exception as ex:
+            # failed extracting the zip file
+            base_logger.warning(
+                "Exception {}\nFailed extracting zip file {}".format(ex, cached_file)
+            )
+            # noinspection PyBroadException
+            try:
+                target_folder.rmdir()
+            except Exception:
+                pass
+            return cached_file
+        return target_folder

     @classmethod
-    def upload_file(cls, local_file, remote_url, wait_for_upload=True):  # type: (str, str, bool) -> str
+    def upload_file(
+        cls, local_file, remote_url, wait_for_upload=True
+    ):  # type: (str, str, bool) -> str
         """
         Upload a local file to a remote location.
         The remote url is the final destination of the uploaded file.
@@ -40,10 +101,15 @@ class StorageManager(object):
         :return str: Newly uploaded remote url
         """
         return CacheManager.get_cache_manager().upload_file(
-            local_file=local_file, remote_url=remote_url, wait_for_upload=wait_for_upload)
+            local_file=local_file,
+            remote_url=remote_url,
+            wait_for_upload=wait_for_upload,
+        )

     @classmethod
-    def set_cache_file_limit(cls, cache_file_limit, cache_context=None):  # type: (int, Optional[str]) -> int
+    def set_cache_file_limit(
+        cls, cache_file_limit, cache_context=None
+    ):  # type: (int, Optional[str]) -> int
         """
         Set the cache context file limit. The file limit is the maximum number of files a specific cache context holds.
         Note that there is no limit on the size of these files, only on the total number of cached files.
@@ -53,4 +119,5 @@ class StorageManager(object):
         :return int: Return new cache context file limit
         """
         return CacheManager.get_cache_manager(
-            cache_context=cache_context, cache_file_limit=cache_file_limit).set_cache_limit(cache_file_limit)
+            cache_context=cache_context, cache_file_limit=cache_file_limit
+        ).set_cache_limit(cache_file_limit)
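One design note on the extraction path in get_local_copy() above: it extracts into a uniquely named temp folder and then renames it to the shared cache name, so concurrent processes never read a half-extracted folder. On POSIX the rename is atomic, exactly one writer wins, and the losers discard their temp copy and use the winner's folder. A minimal standalone sketch of that pattern (illustrative names, not the trains code itself):

    import os
    import shutil
    import tempfile
    from zipfile import ZipFile

    def extract_cached(zip_path, target_folder):
        # extract into a unique temp folder so concurrent callers never
        # write into the same half-extracted directory
        tmp = tempfile.mkdtemp(dir=os.path.dirname(target_folder) or ".")
        ZipFile(zip_path).extractall(path=tmp)
        try:
            # atomic on POSIX: exactly one concurrent caller wins the rename
            os.rename(tmp, target_folder)
        except OSError:
            # another process extracted first; drop our copy, reuse theirs
            shutil.rmtree(tmp, ignore_errors=True)
        return target_folder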