From 0a7a32f2eb8dea0763fc251f00ba069002100238 Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Sun, 26 Apr 2020 23:19:42 +0300 Subject: [PATCH] Support caching of extracted zip artifacts --- docs/trains.conf | 4 ++ trains/binding/artifacts.py | 21 +++------- trains/storage/cache.py | 54 ++++++++++++++++++++----- trains/storage/manager.py | 79 ++++++++++++++++++++++++++++++++++--- 4 files changed, 126 insertions(+), 32 deletions(-) diff --git a/docs/trains.conf b/docs/trains.conf index da0601c4..7f18388b 100644 --- a/docs/trains.conf +++ b/docs/trains.conf @@ -167,6 +167,10 @@ sdk { # Log all stdout & stderr log_stdout: true + + # compatibility feature, report memory usage for the entire machine + # default (false), report only on the running process and its sub-processes + report_global_mem_used: false } } } diff --git a/trains/binding/artifacts.py b/trains/binding/artifacts.py index 4a79f388..d428170c 100644 --- a/trains/binding/artifacts.py +++ b/trains/binding/artifacts.py @@ -157,22 +157,11 @@ class Artifact(object): :return: a local path to a downloaded copy of the artifact """ from trains.storage import StorageManager - local_path = StorageManager.get_local_copy(self.url) - if local_path and extract_archive and self.type == 'archive': - temp_folder = None - try: - temp_folder = mkdtemp(prefix='artifact_', suffix='.archive_'+self.name) - ZipFile(local_path).extractall(path=temp_folder) - except Exception: - try: - if temp_folder: - Path(temp_folder).rmdir() - except Exception: - pass - return local_path - return temp_folder - - return local_path + return StorageManager.get_local_copy( + remote_url=self.url, + extract_archive=extract_archive and self.type == 'archive', + name=self.name + ) def __repr__(self): return str({'name': self.name, 'size': self.size, 'type': self.type, 'mode': self.mode, 'url': self.url, diff --git a/trains/storage/cache.py b/trains/storage/cache.py index e818b363..db9b519c 100644 --- a/trains/storage/cache.py +++ 
b/trains/storage/cache.py @@ -1,17 +1,19 @@ import hashlib +import shutil from pathlib2 import Path from .helper import StorageHelper from .util import quote_url from ..config import get_cache_dir +from ..debugging.log import LoggerRoot class CacheManager(object): __cache_managers = {} _default_cache_file_limit = 100 - _storage_manager_folder = 'storage_manager' - _default_context = 'global' + _storage_manager_folder = "storage_manager" + _default_context = "global" class CacheContext(object): def __init__(self, cache_context, default_cache_file_limit=10): @@ -44,20 +46,24 @@ class CacheManager(object): @staticmethod def upload_file(local_file, remote_url, wait_for_upload=True): helper = StorageHelper.get(remote_url) - return helper.upload(local_file, remote_url, async_enable=not wait_for_upload) + return helper.upload( + local_file, remote_url, async_enable=not wait_for_upload + ) @classmethod def _get_hashed_url_file(cls, url): str_hash = hashlib.md5(url.encode()).hexdigest() - filename = url.split('/')[-1] - return '{}.{}'.format(str_hash, quote_url(filename)) + filename = url.split("/")[-1] + return "{}.{}".format(str_hash, quote_url(filename)) def _get_cache_file(self, remote_url): """ :param remote_url: check if we have the remote url in our cache :return: full path to file name, current file size or None """ - folder = Path(get_cache_dir() / CacheManager._storage_manager_folder / self._context) + folder = Path( + get_cache_dir() / CacheManager._storage_manager_folder / self._context + ) folder.mkdir(parents=True, exist_ok=True) local_filename = self._get_hashed_url_file(remote_url) new_file = folder / local_filename @@ -65,20 +71,48 @@ class CacheManager(object): new_file.touch(exist_ok=True) # delete old files - files = sorted(folder.iterdir(), reverse=True, key=lambda x: x.stat().st_atime) + def sort_max_access_time(x): + atime = x.stat().st_atime + # noinspection PyBroadException + try: + if x.is_dir(): + dir_files = list(x.iterdir()) + atime = ( + 
max(atime, max(s.stat().st_atime for s in dir_files)) + if dir_files + else atime + ) + except Exception: + pass + return atime + + files = sorted(folder.iterdir(), reverse=True, key=sort_max_access_time) files = files[self._file_limit:] for f in files: - f.unlink() + if not f.is_dir(): + f.unlink() + else: + try: + shutil.rmtree(f) + except Exception as e: + # failed deleting folder + LoggerRoot.get_base_logger().warning( + "Exception {}\nFailed deleting folder {}".format(e, f) + ) # if file doesn't exist, return file size None - return new_file.as_posix(), new_file.stat().st_size if new_file.exists() else None + return ( + new_file.as_posix(), + new_file.stat().st_size if new_file.exists() else None, + ) @classmethod def get_cache_manager(cls, cache_context=None, cache_file_limit=None): cache_context = cache_context or cls._default_context if cache_context not in cls.__cache_managers: cls.__cache_managers[cache_context] = cls.CacheContext( - cache_context, cache_file_limit or cls._default_cache_file_limit) + cache_context, cache_file_limit or cls._default_cache_file_limit + ) if cache_file_limit: cls.__cache_managers[cache_context].set_cache_limit(cache_file_limit) diff --git a/trains/storage/manager.py b/trains/storage/manager.py index 2e8f8adb..e30dfaf1 100644 --- a/trains/storage/manager.py +++ b/trains/storage/manager.py @@ -1,5 +1,12 @@ +import os +import shutil +from time import time from typing import Optional +from zipfile import ZipFile +from pathlib2 import Path + +from ..debugging.log import LoggerRoot from .cache import CacheManager @@ -11,7 +18,10 @@ class StorageManager(object): """ @classmethod - def get_local_copy(cls, remote_url, cache_context=None): # type: (str, Optional[str]) -> str + def get_local_copy( + cls, remote_url, cache_context=None, extract_archive=True, name=None + ): + # type: (str, Optional[str], Optional[bool], Optional[str]) -> str """ Get a local copy of the remote file. 
If the remote URL is a direct file access, the returned link is the same, otherwise a link to a local copy of the url file is returned. @@ -20,12 +30,63 @@ :param str remote_url: remote url link (string) :param str cache_context: Optional caching context identifier (string), default context 'global' + :param bool extract_archive: if True returned path will be a cached folder containing the archive's content, + currently only zip files are supported. + :param name: name of artifact. :return str: full path to local copy of the requested url. Return None on Error. """ - return CacheManager.get_cache_manager(cache_context=cache_context).get_local_copy(remote_url=remote_url) + cached_file = CacheManager.get_cache_manager( + cache_context=cache_context + ).get_local_copy(remote_url=remote_url) + if not extract_archive or not cached_file: + return cached_file + archive_suffix = cached_file.rpartition(".")[0] + target_folder = Path("{0}_artifact_archive_{1}".format(archive_suffix, name)) + base_logger = LoggerRoot.get_base_logger() + try: + temp_target_folder = "{0}_{1}".format(target_folder.name, time() * 1000) + os.mkdir(path=temp_target_folder) + ZipFile(cached_file).extractall(path=temp_target_folder) + # we assume such a folder already exists if the zip file was previously extracted + # noinspection PyBroadException + try: + # if rename fails, it means that someone else already managed to extract the zip, delete the current + # folder and return the already existing cached zip folder + os.rename(temp_target_folder, str(target_folder)) + except Exception: + if target_folder.exists(): + target_folder.touch(exist_ok=True) + else: + base_logger.warning( + "Failed renaming {0} to {1}".format( + temp_target_folder, target_folder + ) + ) + try: + shutil.rmtree(temp_target_folder) + except Exception as ex: + base_logger.warning( + "Exception {}\nFailed deleting folder {}".format( + ex, temp_target_folder + ) + ) + except Exception as ex: + # failed
extracting zip file: + base_logger.warning( + "Exception {}\nFailed extracting zip file {}".format(ex, cached_file) + ) + # noinspection PyBroadException + try: + target_folder.rmdir() + except Exception: + pass + return cached_file + return target_folder @classmethod - def upload_file(cls, local_file, remote_url, wait_for_upload=True): # type: (str, str, bool) -> str + def upload_file( + cls, local_file, remote_url, wait_for_upload=True + ): # type: (str, str, bool) -> str """ Upload a local file to a remote location. remote url is the finale destination of the uploaded file. @@ -40,10 +101,15 @@ class StorageManager(object): :return str: Newly uploaded remote url """ return CacheManager.get_cache_manager().upload_file( - local_file=local_file, remote_url=remote_url, wait_for_upload=wait_for_upload) + local_file=local_file, + remote_url=remote_url, + wait_for_upload=wait_for_upload, + ) @classmethod - def set_cache_file_limit(cls, cache_file_limit, cache_context=None): # type: (int, Optional[str]) -> int + def set_cache_file_limit( + cls, cache_file_limit, cache_context=None + ): # type: (int, Optional[str]) -> int """ Set the cache context file limit. File limit is the maximum number of files the specific cache context holds. Notice, there is no limit on the size of these files, only the total number of cached files. @@ -53,4 +119,5 @@ class StorageManager(object): :return int: Return new cache context file limit """ return CacheManager.get_cache_manager( - cache_context=cache_context, cache_file_limit=cache_file_limit).set_cache_limit(cache_file_limit) + cache_context=cache_context, cache_file_limit=cache_file_limit + ).set_cache_limit(cache_file_limit)