diff --git a/clearml/storage/helper.py b/clearml/storage/helper.py index ff785925..3c48275e 100644 --- a/clearml/storage/helper.py +++ b/clearml/storage/helper.py @@ -116,1184 +116,6 @@ class _Driver(object): cls._file_server_hosts = hosts return cls._file_server_hosts - -class StorageHelper(object): - """ Storage helper. - Used by the entire system to download/upload files. - Supports both local and remote files (currently local files, network-mapped files, HTTP/S and Amazon S3) - """ - _temp_download_suffix = '.partially' - - @classmethod - def _get_logger(cls): - return get_logger('storage') - - @attrs - class _PathSubstitutionRule(object): - registered_prefix = attrib(type=str) - local_prefix = attrib(type=str) - replace_windows_sep = attrib(type=bool) - replace_linux_sep = attrib(type=bool) - - path_substitution_config = 'storage.path_substitution' - - @classmethod - def load_list_from_config(cls): - rules_list = [] - for index, sub_config in enumerate(config.get(cls.path_substitution_config, list())): - rule = cls( - registered_prefix=sub_config.get('registered_prefix', None), - local_prefix=sub_config.get('local_prefix', None), - replace_windows_sep=sub_config.get('replace_windows_sep', False), - replace_linux_sep=sub_config.get('replace_linux_sep', False), - ) - - if any(prefix is None for prefix in (rule.registered_prefix, rule.local_prefix)): - StorageHelper._get_logger().warning( - "Illegal substitution rule configuration '{}[{}]': {}".format( - cls.path_substitution_config, - index, - asdict(rule), - )) - - continue - - if all((rule.replace_windows_sep, rule.replace_linux_sep)): - StorageHelper._get_logger().warning( - "Only one of replace_windows_sep and replace_linux_sep flags may be set." - "'{}[{}]': {}".format( - cls.path_substitution_config, - index, - asdict(rule), - )) - continue - - rules_list.append(rule) - - return rules_list - - class _UploadData(object): - @property - def src_path(self): - return self._src_path - - @property - def dest_path(self): - return self._dest_path - - @property - def canonized_dest_path(self): - return self._canonized_dest_path - - @property - def extra(self): - return self._extra - - @property - def callback(self): - return self._callback - - @property - def retries(self): - return self._retries - - @property - def return_canonized(self): - return self._return_canonized - - def __init__(self, src_path, dest_path, canonized_dest_path, extra, callback, retries, return_canonized): - self._src_path = src_path - self._dest_path = dest_path - self._canonized_dest_path = canonized_dest_path - self._extra = extra - self._callback = callback - self._retries = retries - self._return_canonized = return_canonized - - def __str__(self): - return "src=%s" % self.src_path - - _helpers = {} # cache of helper instances - - # global terminate event for async upload threads - # _terminate = threading.Event() - _async_upload_threads = set() - _upload_pool = None - _upload_pool_pid = None - - # collect all bucket credentials that aren't empty (ignore entries with an empty key or secret) - _s3_configurations = deferred_config('aws.s3', {}, transform=S3BucketConfigurations.from_config) - _gs_configurations = deferred_config('google.storage', {}, transform=GSBucketConfigurations.from_config) - _azure_configurations = deferred_config('azure.storage', {}, transform=AzureContainerConfigurations.from_config) - _path_substitutions = deferred_config(transform=_PathSubstitutionRule.load_list_from_config) - - @property - def log(self): - return self._log - - @property - 
def scheme(self): - return self._scheme - - @property - def secure(self): - return self._secure - - @property - def base_url(self): - return self._base_url - - @classmethod - def get(cls, url, logger=None, **kwargs): - """ - Get a storage helper instance for the given URL - - :return: A StorageHelper instance. - """ - # Handle URL substitution etc before locating the correct storage driver - url = cls._canonize_url(url) - - # Get the credentials we should use for this url - base_url = cls._resolve_base_url(url) - - instance_key = '%s_%s' % (base_url, threading.current_thread().ident or 0) - # noinspection PyBroadException - try: - configs = kwargs.get("configs") - if configs: - instance_key += "_{}".format(configs.cache_name) - except Exception: - pass - - force_create = kwargs.pop('__force_create', False) - if (instance_key in cls._helpers) and (not force_create) and base_url != "file://": - return cls._helpers[instance_key] - - # Don't canonize URL since we already did it - try: - instance = cls(base_url=base_url, url=url, logger=logger, canonize_url=False, **kwargs) - except (StorageError, UsageError) as ex: - cls._get_logger().error(str(ex)) - return None - except Exception as ex: - cls._get_logger().error("Failed creating storage object {} Reason: {}".format( - base_url or url, ex)) - return None - - cls._helpers[instance_key] = instance - return instance - - @classmethod - def get_local_copy(cls, remote_url, skip_zero_size_check=False): - """ - Download a file from remote URL to a local storage, and return path to local copy, - - :param remote_url: Remote URL. Example: https://example.com/file.jpg s3://bucket/folder/file.mp4 etc. - :param skip_zero_size_check: If True, no error will be raised for files with zero bytes size. - :return: Path to local copy of the downloaded file. None if error occurred. - """ - helper = cls.get(remote_url) - if not helper: - return None - # create temp file with the requested file name - file_name = '.' 
+ remote_url.split('/')[-1].split(os.path.sep)[-1] - local_path = mktemp(suffix=file_name) - return helper.download_to_file(remote_url, local_path, skip_zero_size_check=skip_zero_size_check) - - def __init__( - self, - base_url, - url, - key=None, - secret=None, - region=None, - verbose=False, - logger=None, - retries=5, - token=None, - **kwargs - ): - level = config.get("storage.log.level", None) - - if level: - try: - self._get_logger().setLevel(level) - except (TypeError, ValueError): - self._get_logger().error('invalid storage log level in configuration: %s' % level) - - self._log = logger or self._get_logger() - self._verbose = verbose - self._retries = retries - self._extra = {} - self._base_url = base_url - self._secure = True - self._driver = None - self._container = None - self._conf = None - - if kwargs.get('canonize_url', True): - url = self._canonize_url(url) - - parsed = urlparse(url) - self._scheme = parsed.scheme - - if self._scheme == _AzureBlobServiceStorageDriver.scheme: - self._conf = copy(self._azure_configurations.get_config_by_uri(url)) - if self._conf is None: - raise StorageError("Missing Azure Blob Storage configuration for {}".format(url)) - - if not self._conf.account_name or not self._conf.account_key: - raise StorageError( - "Missing account name or key for Azure Blob Storage access for {}".format(base_url) - ) - - self._driver = _AzureBlobServiceStorageDriver() - self._container = self._driver.get_container(config=self._conf, account_url=parsed.netloc) - - elif self._scheme == _Boto3Driver.scheme: - self._conf = copy(self._s3_configurations.get_config_by_uri(url)) - self._secure = self._conf.secure - - final_region = region if region else self._conf.region - if not final_region: - final_region = None - - self._conf.update( - key=key or self._conf.key, - secret=secret or self._conf.secret, - multipart=self._conf.multipart, - region=final_region, - use_credentials_chain=self._conf.use_credentials_chain, - token=token or self._conf.token, - extra_args=self._conf.extra_args, - ) - - if not self._conf.use_credentials_chain: - if not self._conf.key or not self._conf.secret: - raise ValueError( - "Missing key and secret for S3 storage access (%s)" % base_url - ) - - self._driver = _Boto3Driver() - self._container = self._driver.get_container( - container_name=self._base_url, retries=retries, config=self._conf) - - elif self._scheme == _GoogleCloudStorageDriver.scheme: - self._conf = copy(self._gs_configurations.get_config_by_uri(url)) - self._driver = _GoogleCloudStorageDriver() - self._container = self._driver.get_container( - container_name=self._base_url, - config=self._conf - ) - - elif self._scheme in _HttpDriver.schemes: - self._driver = _HttpDriver(retries=retries) - self._container = self._driver.get_container(container_name=self._base_url) - else: # elif self._scheme == 'file': - # if this is not a known scheme assume local file - # url2pathname is specifically intended to operate on (urlparse result).path - # and returns a cross-platform compatible result - new_url = normalize_local_path(url[len("file://"):] if url.startswith("file://") else url) - self._driver = _FileStorageDriver(new_url) - # noinspection PyBroadException - try: - self._container = self._driver.get_container("") - except Exception: - self._container = None - - @classmethod - def terminate_uploads(cls, force=True, timeout=2.0): - if force: - # since async uploaders are daemon threads, we can just return and let them close by themselves - return - # signal all threads to terminate and give 
them a chance for 'timeout' seconds (total, not per-thread) - # cls._terminate.set() - remaining_timeout = timeout - for thread in cls._async_upload_threads: - t = time() - # noinspection PyBroadException - try: - thread.join(timeout=remaining_timeout) - except Exception: - pass - remaining_timeout -= (time() - t) - - @classmethod - def get_aws_storage_uri_from_config(cls, bucket_config): - uri = ( - "s3://{}/{}".format(bucket_config.host, bucket_config.bucket) - if bucket_config.host - else "s3://{}".format(bucket_config.bucket) - ) - if bucket_config.subdir: - uri += "/" + bucket_config.subdir - return uri - - @classmethod - def get_gcp_storage_uri_from_config(cls, bucket_config): - return ( - "gs://{}/{}".format(bucket_config.bucket, bucket_config.subdir) - if bucket_config.subdir - else "gs://{}".format(bucket_config.bucket) - ) - - @classmethod - def get_azure_storage_uri_from_config(cls, bucket_config): - return "azure://{}.blob.core.windows.net/{}".format(bucket_config.account_name, bucket_config.container_name) - - @classmethod - def get_configuration(cls, bucket_config): - return cls.get_aws_configuration(bucket_config) - - @classmethod - def get_aws_configuration(cls, bucket_config): - return cls._s3_configurations.get_config_by_bucket(bucket_config.bucket, bucket_config.host) - - @classmethod - def get_gcp_configuration(cls, bucket_config): - return cls._gs_configurations.get_config_by_uri( - cls.get_gcp_storage_uri_from_config(bucket_config), - create_if_not_found=False - ) - - @classmethod - def get_azure_configuration(cls, bucket_config): - return cls._azure_configurations.get_config(bucket_config.account_name, bucket_config.container_name) - - @classmethod - def add_configuration(cls, bucket_config, log=None, _test_config=True): - return cls.add_aws_configuration(bucket_config, log=log, _test_config=_test_config) - - @classmethod - def add_aws_configuration(cls, bucket_config, log=None, _test_config=True): - # Try to use existing configuration if we have no key and secret - use_existing = not bucket_config.is_valid() - # Get existing config anyway (we'll either try to use it or alert we're replacing it - existing = cls.get_aws_configuration(bucket_config) - configs = cls._s3_configurations - uri = cls.get_aws_storage_uri_from_config(bucket_config) - - if not use_existing: - # Test bucket config, fails if unsuccessful - if _test_config: - _Boto3Driver._test_bucket_config(bucket_config, log) # noqa - if existing: - if log: - log.warning("Overriding existing configuration for '{}'".format(uri)) - configs.remove_config(existing) - configs.add_config(bucket_config) - else: - # Try to use existing configuration - good_config = False - if existing: - if log: - log.info("Using existing credentials for '{}'".format(uri)) - good_config = _Boto3Driver._test_bucket_config(existing, log, raise_on_error=False) # noqa - - if not good_config: - # Try to use global key/secret - configs.update_config_with_defaults(bucket_config) - - if log: - log.info("Using global credentials for '{}'".format(uri)) - if _test_config: - _Boto3Driver._test_bucket_config(bucket_config, log) # noqa - configs.add_config(bucket_config) - - @classmethod - def add_gcp_configuration(cls, bucket_config, log=None): - use_existing = not bucket_config.is_valid() - existing = cls.get_gcp_configuration(bucket_config) - configs = cls._gs_configurations - uri = cls.get_gcp_storage_uri_from_config(bucket_config) - - if not use_existing: - if existing: - if log: - log.warning("Overriding existing configuration for 
'{}'".format(uri)) - configs.remove_config(existing) - configs.add_config(bucket_config) - else: - good_config = False - if existing: - if log: - log.info("Using existing config for '{}'".format(uri)) - good_config = _GoogleCloudStorageDriver.test_upload(None, bucket_config) - if not good_config: - configs.update_config_with_defaults(bucket_config) - if log: - log.info("Using global credentials for '{}'".format(uri)) - configs.add_config(bucket_config) - - @classmethod - def add_azure_configuration(cls, bucket_config, log=None): - use_existing = not bucket_config.is_valid() - existing = cls.get_azure_configuration(bucket_config) - configs = cls._azure_configurations - uri = cls.get_azure_storage_uri_from_config(bucket_config) - - if not use_existing: - if existing: - if log: - log.warning("Overriding existing configuration for '{}'".format(uri)) - configs.remove_config(existing) - configs.add_config(bucket_config) - else: - good_config = False - if existing: - if log: - log.info("Using existing config for '{}'".format(uri)) - good_config = _AzureBlobServiceStorageDriver.test_upload(None, bucket_config) - if not good_config: - configs.update_config_with_defaults(bucket_config) - if log: - log.info("Using global credentials for '{}'".format(uri)) - configs.add_config(bucket_config) - - @classmethod - def add_path_substitution( - cls, - registered_prefix, - local_prefix, - replace_windows_sep=False, - replace_linux_sep=False, - ): - """ - Add a path substitution rule for storage paths. - - Useful for case where the data was registered under some path, and that - path was later renamed. This may happen with local storage paths where - each machine is has different mounts or network drives configurations - - :param registered_prefix: The prefix to search for and replace. This is - the prefix of the path the data is registered under. This should be the - exact url prefix, case sensitive, as the data is registered. - :param local_prefix: The prefix to replace 'registered_prefix' with. This - is the prefix of the path the data is actually saved under. This should be the - exact url prefix, case sensitive, as the data is saved under. - :param replace_windows_sep: If set to True, and the prefix matches, the rest - of the url has all of the windows path separators (backslash '\') replaced with - the native os path separator. - :param replace_linux_sep: If set to True, and the prefix matches, the rest - of the url has all of the linux/unix path separators (slash '/') replaced with - the native os path separator. - """ - - if not registered_prefix or not local_prefix: - raise UsageError("Path substitution prefixes must be non empty strings") - - if replace_windows_sep and replace_linux_sep: - raise UsageError("Only one of replace_windows_sep and replace_linux_sep may be set.") - - rule = cls._PathSubstitutionRule( - registered_prefix=registered_prefix, - local_prefix=local_prefix, - replace_windows_sep=replace_windows_sep, - replace_linux_sep=replace_linux_sep, - ) - - cls._path_substitutions.append(rule) - - @classmethod - def clear_path_substitutions(cls): - """ - Removes all path substitution rules, including ones from the configuration file. - """ - cls._path_substitutions = list() - - def get_object_size_bytes(self, remote_url, silence_errors=False): - # type: (str, bool) -> [int, None] - """ - Get size of the remote file in bytes. - - :param str remote_url: The url where the file is stored. - E.g. 
's3://bucket/some_file.txt', 'file://local/file' - :param bool silence_errors: Silence errors that might occur - when fetching the size of the file. Default: False - - :return: The size of the file in bytes. - None if the file could not be found or an error occurred. - """ - obj = self.get_object(remote_url, silence_errors=silence_errors) - return self._get_object_size_bytes(obj, silence_errors) - - def _get_object_size_bytes(self, obj, silence_errors=False): - # type: (object) -> [int, None] - """ - Auxiliary function for `get_object_size_bytes`. - Get size of the remote object in bytes. - - :param object obj: The remote object - :param bool silence_errors: Silence errors that might occur - when fetching the size of the file. Default: False - - :return: The size of the object in bytes. - None if an error occurred. - """ - if not obj: - return None - size = None - try: - if isinstance(self._driver, _HttpDriver) and obj: - obj = self._driver._get_download_object(obj) # noqa - size = int(obj.headers.get("Content-Length", 0)) - elif hasattr(obj, "size"): - size = obj.size - # Google storage has the option to reload the object to get the size - if size is None and hasattr(obj, "reload"): - # noinspection PyBroadException - try: - # To catch google.api_core exceptions - obj.reload() - size = obj.size - except Exception as e: - if not silence_errors: - self.log.warning("Failed obtaining object size on reload: {}('{}')".format( - e.__class__.__name__, str(e))) - elif hasattr(obj, "content_length"): - # noinspection PyBroadException - try: - # To catch botocore exceptions - size = obj.content_length # noqa - except Exception as e: - if not silence_errors: - self.log.warning("Failed obtaining content_length while getting object size: {}('{}')".format( - e.__class__.__name__, str(e))) - except Exception as e: - if not silence_errors: - self.log.warning("Failed getting object size: {}('{}')".format(e.__class__.__name__, str(e))) - return size - - def get_object_metadata(self, obj): - # type: (object) -> dict - """ - Get the metadata of the remote object. - The metadata is a dict containing the following keys: `name`, `size`. - - :param object obj: The remote object - - :return: A dict containing the metadata of the remote object - """ - name_fields = ("name", "url", "key", "blob_name") - metadata = { - "size": self._get_object_size_bytes(obj), - "name": next(filter(None, (getattr(obj, f, None) for f in name_fields)), None), - } - return metadata - - def verify_upload(self, folder_uri='', raise_on_error=True, log_on_error=True): - """ - Verify that this helper can upload files to a folder. - - An upload is possible iff: - 1. the destination folder is under the base uri of the url used to create the helper - 2. the helper has credentials to write to the destination folder - - :param folder_uri: The destination folder to test. Must be an absolute - url that begins with the base uri of the url used to create the helper. - :param raise_on_error: Raise an exception if an upload is not possible - :param log_on_error: Log an error if an upload is not possible - :return: True, if, and only if, an upload to folder_uri is possible. 
- """ - - folder_uri = self._canonize_url(folder_uri) - - folder_uri = self.conform_url(folder_uri, self._base_url) - - test_path = self._normalize_object_name(folder_uri) - - if self._scheme == _Boto3Driver.scheme: - _Boto3Driver._test_bucket_config( - self._conf, - self._log, - test_path=test_path, - raise_on_error=raise_on_error, - log_on_error=log_on_error, - ) - elif self._scheme == _GoogleCloudStorageDriver.scheme: - self._driver.test_upload(test_path, self._conf) - - elif self._scheme == 'file': - # Check path exists - Path(test_path).mkdir(parents=True, exist_ok=True) - # check path permissions - Path(test_path).touch(exist_ok=True) - - return folder_uri - - def upload_from_stream(self, stream, dest_path, extra=None, retries=1, return_canonized=True): - canonized_dest_path = self._canonize_url(dest_path) - object_name = self._normalize_object_name(canonized_dest_path) - extra = extra.copy() if extra else {} - extra.update(self._extra) - last_ex = None - cb = UploadProgressReport.from_stream(stream, object_name, self._verbose, self._log) - for i in range(max(1, int(retries))): - try: - self._driver.upload_object_via_stream( - iterator=stream, - container=self._container, - object_name=object_name, - callback=cb, - extra=extra) - last_ex = None - break - except Exception as ex: - last_ex = ex - # seek to beginning if possible - # noinspection PyBroadException - try: - stream.seek(0) - except Exception: - pass - if last_ex: - raise last_ex - - result_dest_path = canonized_dest_path if return_canonized else dest_path - - if self.scheme in _HttpDriver.schemes: - # quote link - result_dest_path = quote_url(result_dest_path) - - return result_dest_path - - def upload( - self, src_path, dest_path=None, extra=None, async_enable=False, cb=None, retries=3, return_canonized=True - ): - if not dest_path: - dest_path = os.path.basename(src_path) - - canonized_dest_path = self._canonize_url(dest_path) - dest_path = dest_path.replace('\\', '/') - canonized_dest_path = canonized_dest_path.replace('\\', '/') - - result_path = canonized_dest_path if return_canonized else dest_path - - if cb and self.scheme in _HttpDriver.schemes: - # store original callback - a_cb = cb - - # quote link - def callback(result): - return a_cb(quote_url(result_path) if result else result) - # replace callback with wrapper - cb = callback - - if async_enable: - data = self._UploadData( - src_path=src_path, - dest_path=dest_path, - canonized_dest_path=canonized_dest_path, - extra=extra, - callback=cb, - retries=retries, - return_canonized=return_canonized - ) - StorageHelper._initialize_upload_pool() - return StorageHelper._upload_pool.apply_async(self._do_async_upload, args=(data,)) - else: - res = self._do_upload( - src_path=src_path, - dest_path=dest_path, - canonized_dest_path=canonized_dest_path, - extra=extra, - cb=cb, - verbose=False, - retries=retries, - return_canonized=return_canonized) - if res: - result_path = quote_url(result_path) - return result_path - - def list(self, prefix=None, with_metadata=False): - """ - List entries in the helper base path. - - Return a list of names inside this helper base path or a list of dictionaries containing - the objects' metadata. The base path is determined at creation time and is specific - for each storage medium. - For Google Storage and S3 it is the bucket of the path. - For local files it is the root directory. - - This operation is not supported for http and https protocols. - - :param prefix: If None, return the list as described above. 
If not, it - must be a string - the path of a sub directory under the base path. - the returned list will include only objects under that subdir. - - :param with_metadata: Instead of returning just the names of the objects, return a list of dictionaries - containing the name and metadata of the remote file. Thus, each dictionary will contain the following - keys: `name`, `size`. - - :return: The paths of all the objects in the storage base path under prefix or - a list of dictionaries containing the objects' metadata. - Listed relative to the base path. - """ - if prefix: - prefix = self._canonize_url(prefix) - if prefix.startswith(self._base_url): - prefix = prefix[len(self._base_url):] - if self._base_url != "file://": - prefix = prefix.lstrip("/") - if self._base_url == "file://": - prefix = prefix.rstrip("/") - if prefix.startswith(str(self._driver.base_path)): - prefix = prefix[len(str(self._driver.base_path)):] - res = self._driver.list_container_objects(self._container, ex_prefix=prefix) - result = [ - obj.name if not with_metadata else self.get_object_metadata(obj) - for obj in res - ] - - if self._base_url == "file://": - if not with_metadata: - result = [Path(f).as_posix() for f in result] - else: - for metadata_entry in result: - metadata_entry["name"] = Path(metadata_entry["name"]).as_posix() - return result - else: - return [ - obj.name if not with_metadata else self.get_object_metadata(obj) - for obj in self._driver.list_container_objects(self._container) - ] - - def download_to_file( - self, - remote_path, - local_path, - overwrite_existing=False, - delete_on_failure=True, - verbose=None, - skip_zero_size_check=False, - silence_errors=False, - direct_access=True - ): - def next_chunk(astream): - if isinstance(astream, binary_type): - chunk = astream - astream = None - elif astream: - try: - chunk = next(astream) - except StopIteration: - chunk = None - else: - chunk = None - return chunk, astream - - remote_path = self._canonize_url(remote_path) - verbose = self._verbose if verbose is None else verbose - - tmp_remote_path = remote_path - # noinspection PyBroadException - try: - tmp_remote_path = normalize_local_path(tmp_remote_path) - if tmp_remote_path.exists(): - remote_path = "file://{}".format(str(tmp_remote_path)) - except Exception: - pass - # Check if driver type supports direct access: - direct_access_path = self.get_driver_direct_access(remote_path) - if direct_access_path and direct_access: - return direct_access_path - - temp_local_path = None - try: - if verbose: - self._log.info('Start downloading from %s' % remote_path) - if not overwrite_existing and Path(local_path).is_file(): - self._log.debug( - 'File {} already exists, no need to download, thread id = {}'.format( - local_path, - threading.current_thread().ident, - ), - ) - - return local_path - if remote_path.startswith("file://"): - Path(local_path).parent.mkdir(parents=True, exist_ok=True) - # use remote_path, because direct_access_path might be None, because of access_rules - # len("file://") == 7 - shutil.copyfile(remote_path[7:], local_path) - return local_path - # we download into temp_local_path so that if we accidentally stop in the middle, - # we won't think we have the entire file - temp_local_path = '{}_{}{}'.format(local_path, time(), self._temp_download_suffix) - obj = self.get_object(remote_path, silence_errors=silence_errors) - if not obj: - return None - - # object size in bytes - total_size_mb = -1 - dl_total_mb = 0. 
- download_reported = False - # chunks size is ignored and always 5Mb - chunk_size_mb = 5 - - # make sure we have the destination folder - # noinspection PyBroadException - Path(temp_local_path).parent.mkdir(parents=True, exist_ok=True) - - total_size_bytes = self.get_object_size_bytes(remote_path, silence_errors=silence_errors) - if total_size_bytes is not None: - total_size_mb = float(total_size_bytes) / (1024 * 1024) - - # if driver supports download with callback, use it (it might be faster) - if hasattr(self._driver, 'download_object'): - # callback - cb = DownloadProgressReport(total_size_mb, verbose, remote_path, self._log) - self._driver.download_object(obj, temp_local_path, callback=cb) - download_reported = bool(cb.last_reported) - dl_total_mb = cb.current_status_mb - else: - stream = self._driver.download_object_as_stream(obj, chunk_size_mb * 1024 * 1024) - if stream is None: - raise ValueError('Could not download %s' % remote_path) - with open(temp_local_path, 'wb') as fd: - data, stream = next_chunk(stream) - while data: - fd.write(data) - data, stream = next_chunk(stream) - - if not skip_zero_size_check and Path(temp_local_path).stat().st_size <= 0: - raise Exception('downloaded a 0-sized file') - - # if we are on Windows, we need to remove the target file before renaming - # otherwise posix rename will overwrite the target - if os.name != 'posix': - # noinspection PyBroadException - try: - os.remove(local_path) - except Exception: - pass - - # rename temp file to local_file - # noinspection PyBroadException - try: - os.rename(temp_local_path, local_path) - except Exception: - # noinspection PyBroadException - try: - os.unlink(temp_local_path) - except Exception: - pass - # file was downloaded by a parallel process, check we have the final output and delete the partial copy - path_local_path = Path(local_path) - if not path_local_path.is_file() or (not skip_zero_size_check and path_local_path.stat().st_size <= 0): - raise Exception('Failed renaming partial file, downloaded file exists and a 0-sized file') - - # report download if we are on the second chunk - if verbose or download_reported: - self._log.info( - 'Downloaded %.2f MB successfully from %s , saved to %s' % (dl_total_mb, remote_path, local_path)) - return local_path - except DownloadError: - raise - except Exception as e: - self._log.error("Could not download {} , err: {} ".format(remote_path, e)) - if delete_on_failure: - # noinspection PyBroadException - try: - os.remove(temp_local_path) - except Exception: - pass - return None - - def download_as_stream(self, remote_path, chunk_size=None): - remote_path = self._canonize_url(remote_path) - try: - obj = self.get_object(remote_path) - return self._driver.download_object_as_stream( - obj, chunk_size=chunk_size, verbose=self._verbose, log=self.log - ) - except DownloadError: - raise - except Exception as e: - self._log.error("Could not download file : %s, err:%s " % (remote_path, str(e))) - return None - - def download_as_nparray(self, remote_path, chunk_size=None): - try: - stream = self.download_as_stream(remote_path, chunk_size) - if stream is None: - return - - # TODO: ugly py3 hack, please remove ASAP - if six.PY3 and not isinstance(stream, GeneratorType): - import numpy as np - return np.frombuffer(stream, dtype=np.uint8) - else: - import numpy as np - return np.asarray(bytearray(b''.join(stream)), dtype=np.uint8) - - except Exception as e: - self._log.error("Could not download file : %s, err:%s " % (remote_path, str(e))) - - def delete(self, path): - path = 
self._canonize_url(path) - return self._driver.delete_object(self.get_object(path)) - - def check_write_permissions(self, dest_path=None): - # create a temporary file, then delete it - base_url = dest_path or self._base_url - dest_path = base_url + "/.clearml.{}.test".format(str(uuid.uuid4())) - # do not check http/s connection permissions - if dest_path.startswith("http"): - return True - - try: - self.upload_from_stream(stream=six.BytesIO(b"clearml"), dest_path=dest_path) - except Exception: - raise ValueError("Insufficient permissions (write failed) for {}".format(base_url)) - try: - self.delete(path=dest_path) - except Exception: - raise ValueError("Insufficient permissions (delete failed) for {}".format(base_url)) - return True - - @classmethod - def download_from_url(cls, remote_url, local_path, overwrite_existing=False, skip_zero_size_check=False): - """ - Download a file from remote URL to a local storage - - :param remote_url: Remote URL. Example: https://example.com/image.jpg or s3://bucket/folder/file.mp4 etc. - :param local_path: target location for downloaded file. Example: /tmp/image.jpg - :param overwrite_existing: If True, and local_path exists, it will overwrite it, otherwise print warning - :param skip_zero_size_check: If True, no error will be raised for files with zero bytes size. - :return: local_path if download was successful. - """ - helper = cls.get(remote_url) - if not helper: - return None - return helper.download_to_file( - remote_url, local_path, overwrite_existing=overwrite_existing, skip_zero_size_check=skip_zero_size_check - ) - - def get_driver_direct_access(self, path): - """ - Check if the helper's driver has a direct access to the file - - :param str path: file path to check access to - :return: Return the string representation of the file as path if have access to it, else None - """ - - return self._driver.get_direct_access(path) - - @classmethod - def _canonize_url(cls, url): - return cls._apply_url_substitutions(url) - - @classmethod - def _apply_url_substitutions(cls, url): - def replace_separator(_url, where, sep): - return _url[:where] + _url[where:].replace(sep, os.sep) - - for index, rule in enumerate(cls._path_substitutions): - if url.startswith(rule.registered_prefix): - url = url.replace( - rule.registered_prefix, - rule.local_prefix, - 1, # count. 
str.replace() does not support keyword arguments - ) - - if rule.replace_windows_sep: - url = replace_separator(url, len(rule.local_prefix), '\\') - - if rule.replace_linux_sep: - url = replace_separator(url, len(rule.local_prefix), '/') - - break - - return url - - @classmethod - def _resolve_base_url(cls, base_url): - parsed = urlparse(base_url) - if parsed.scheme == _Boto3Driver.scheme: - conf = cls._s3_configurations.get_config_by_uri(base_url) - bucket = conf.bucket - if not bucket: - parts = Path(parsed.path.strip('/')).parts - if parts: - bucket = parts[0] - return '/'.join(x for x in ('s3:/', conf.host, bucket) if x) - elif parsed.scheme == _AzureBlobServiceStorageDriver.scheme: - conf = cls._azure_configurations.get_config_by_uri(base_url) - if not conf: - raise StorageError("Can't find azure configuration for {}".format(base_url)) - return str(furl(base_url).set(path=conf.container_name)) - elif parsed.scheme == _GoogleCloudStorageDriver.scheme: - conf = cls._gs_configurations.get_config_by_uri(base_url) - return str(furl(scheme=parsed.scheme, netloc=conf.bucket)) - elif parsed.scheme in _HttpDriver.schemes: - for files_server in _Driver.get_file_server_hosts(): - if base_url.startswith(files_server): - return files_server - return parsed.scheme + "://" - else: # if parsed.scheme == 'file': - # if we do not know what it is, we assume file - return 'file://' - - @classmethod - def conform_url(cls, folder_uri, base_url=None): - if not folder_uri: - return folder_uri - _base_url = cls._resolve_base_url(folder_uri) if not base_url else base_url - - if not folder_uri.startswith(_base_url): - prev_folder_uri = folder_uri - if _base_url == 'file://': - folder_uri = str(Path(folder_uri).absolute()) - if folder_uri.startswith('/'): - folder_uri = _base_url + folder_uri - elif platform.system() == "Windows": - folder_uri = ''.join((_base_url, folder_uri)) - else: - folder_uri = '/'.join((_base_url, folder_uri)) - - cls._get_logger().debug('Upload destination {} amended to {} for registration purposes'.format( - prev_folder_uri, folder_uri)) - else: - raise ValueError('folder_uri: {} does not start with base url: {}'.format(folder_uri, _base_url)) - - return folder_uri - - def _absolute_object_name(self, path): - """ Returns absolute remote path, including any prefix that is handled by the container """ - if not path.startswith(self.base_url): - return self.base_url.rstrip('/') + '///' + path.lstrip('/') - return path - - def _normalize_object_name(self, path): - """ Normalize remote path. 
Remove any prefix that is already handled by the container """ - if path.startswith(self.base_url): - path = path[len(self.base_url):] - if path.startswith('/') and os.name == 'nt': - path = path[1:] - if self.scheme in (_Boto3Driver.scheme, _GoogleCloudStorageDriver.scheme, - _AzureBlobServiceStorageDriver.scheme): - path = path.lstrip('/') - return path - - def _do_async_upload(self, data): - assert isinstance(data, self._UploadData) - return self._do_upload(data.src_path, data.dest_path, data.canonized_dest_path, extra=data.extra, cb=data.callback, verbose=True, retries=data.retries, return_canonized=data.return_canonized) - - def _upload_from_file(self, local_path, dest_path, extra=None): - if not hasattr(self._driver, 'upload_object'): - with open(local_path, 'rb') as stream: - res = self.upload_from_stream(stream=stream, dest_path=dest_path, extra=extra) - else: - object_name = self._normalize_object_name(dest_path) - extra = extra.copy() if extra else {} - extra.update(self._extra) - cb = UploadProgressReport.from_file(local_path, self._verbose, self._log) - res = self._driver.upload_object( - file_path=local_path, - container=self._container, - object_name=object_name, - callback=cb, - extra=extra) - return res - - def _do_upload(self, src_path, dest_path, canonized_dest_path, extra=None, cb=None, verbose=False, retries=1, return_canonized=False): - object_name = self._normalize_object_name(canonized_dest_path) - if cb: - try: - cb(None) - except Exception as e: - self._log.error("Calling upload callback when starting upload: %s" % str(e)) - if verbose: - msg = 'Starting upload: {} => {}{}'.format( - src_path, - (self._container.name if self._container.name.endswith('/') else self._container.name + '/') - if self._container and self._container.name else '', object_name) - if object_name.startswith('file://') or object_name.startswith('/'): - self._log.debug(msg) - else: - self._log.info(msg) - last_ex = None - for i in range(max(1, int(retries))): - try: - if not self._upload_from_file(local_path=src_path, dest_path=canonized_dest_path, extra=extra): - # retry if failed - last_ex = ValueError("Upload failed") - continue - last_ex = None - break - except Exception as e: - last_ex = e - - if last_ex: - self._log.error("Exception encountered while uploading %s" % str(last_ex)) - if cb: - try: - cb(False) - except Exception as e: - self._log.warning("Exception on upload callback: %s" % str(e)) - raise last_ex - - if verbose: - self._log.debug("Finished upload: %s => %s" % (src_path, object_name)) - if cb: - try: - cb(canonized_dest_path if return_canonized else dest_path) - except Exception as e: - self._log.warning("Exception on upload callback: %s" % str(e)) - - return canonized_dest_path if return_canonized else dest_path - - def get_object(self, path, silence_errors=False): - # type: (str, bool) -> object - """ - Gets the remote object stored at path. The data held by the object - differs depending on where it is stored. 
- - :param str path: the path where the remote object is stored - :param bool silence_errors: Silence errors that might occur - when fetching the remote object - - :return: The remote object - """ - path = self._canonize_url(path) - object_name = self._normalize_object_name(path) - try: - return self._driver.get_object( - container_name=self._container.name if self._container else '', object_name=object_name) - except ConnectionError: - raise DownloadError - except Exception as e: - if not silence_errors: - self.log.warning("Storage helper problem for {}: {}".format(str(object_name), str(e))) - return None - - @staticmethod - def _initialize_upload_pool(): - if not StorageHelper._upload_pool or StorageHelper._upload_pool_pid != os.getpid(): - StorageHelper._upload_pool_pid = os.getpid() - StorageHelper._upload_pool = ThreadPool(processes=1) - - @staticmethod - def close_async_threads(): - if StorageHelper._upload_pool: - pool = StorageHelper._upload_pool - StorageHelper._upload_pool = None - # noinspection PyBroadException - try: - pool.terminate() - pool.join() - except Exception: - pass - - def exists_file(self, remote_url): - remote_url = self._canonize_url(remote_url) - object_name = self._normalize_object_name(remote_url) - return self._driver.exists_file( - container_name=self._container.name if self._container else "", object_name=object_name - ) - - class _HttpDriver(_Driver): """ LibCloud http/https adapter (simple, enough for now) """ @@ -2954,6 +1776,1186 @@ class _FileStorageDriver(_Driver): return os.path.isfile(object_name) + +class StorageHelper(object): + """ Storage helper. + Used by the entire system to download/upload files. + Supports both local and remote files (currently local files, network-mapped files, HTTP/S and Amazon S3) + """ + _temp_download_suffix = '.partially' + _quotable_uri_schemes = set(_HttpDriver.schemes) | set([_GoogleCloudStorageDriver.scheme]) + + @classmethod + def _get_logger(cls): + return get_logger('storage') + + @attrs + class _PathSubstitutionRule(object): + registered_prefix = attrib(type=str) + local_prefix = attrib(type=str) + replace_windows_sep = attrib(type=bool) + replace_linux_sep = attrib(type=bool) + + path_substitution_config = 'storage.path_substitution' + + @classmethod + def load_list_from_config(cls): + rules_list = [] + for index, sub_config in enumerate(config.get(cls.path_substitution_config, list())): + rule = cls( + registered_prefix=sub_config.get('registered_prefix', None), + local_prefix=sub_config.get('local_prefix', None), + replace_windows_sep=sub_config.get('replace_windows_sep', False), + replace_linux_sep=sub_config.get('replace_linux_sep', False), + ) + + if any(prefix is None for prefix in (rule.registered_prefix, rule.local_prefix)): + StorageHelper._get_logger().warning( + "Illegal substitution rule configuration '{}[{}]': {}".format( + cls.path_substitution_config, + index, + asdict(rule), + )) + + continue + + if all((rule.replace_windows_sep, rule.replace_linux_sep)): + StorageHelper._get_logger().warning( + "Only one of replace_windows_sep and replace_linux_sep flags may be set." 
+ "'{}[{}]': {}".format( + cls.path_substitution_config, + index, + asdict(rule), + )) + continue + + rules_list.append(rule) + + return rules_list + + class _UploadData(object): + @property + def src_path(self): + return self._src_path + + @property + def dest_path(self): + return self._dest_path + + @property + def canonized_dest_path(self): + return self._canonized_dest_path + + @property + def extra(self): + return self._extra + + @property + def callback(self): + return self._callback + + @property + def retries(self): + return self._retries + + @property + def return_canonized(self): + return self._return_canonized + + def __init__(self, src_path, dest_path, canonized_dest_path, extra, callback, retries, return_canonized): + self._src_path = src_path + self._dest_path = dest_path + self._canonized_dest_path = canonized_dest_path + self._extra = extra + self._callback = callback + self._retries = retries + self._return_canonized = return_canonized + + def __str__(self): + return "src=%s" % self.src_path + + _helpers = {} # cache of helper instances + + # global terminate event for async upload threads + # _terminate = threading.Event() + _async_upload_threads = set() + _upload_pool = None + _upload_pool_pid = None + + # collect all bucket credentials that aren't empty (ignore entries with an empty key or secret) + _s3_configurations = deferred_config('aws.s3', {}, transform=S3BucketConfigurations.from_config) + _gs_configurations = deferred_config('google.storage', {}, transform=GSBucketConfigurations.from_config) + _azure_configurations = deferred_config('azure.storage', {}, transform=AzureContainerConfigurations.from_config) + _path_substitutions = deferred_config(transform=_PathSubstitutionRule.load_list_from_config) + + @property + def log(self): + return self._log + + @property + def scheme(self): + return self._scheme + + @property + def secure(self): + return self._secure + + @property + def base_url(self): + return self._base_url + + @classmethod + def get(cls, url, logger=None, **kwargs): + """ + Get a storage helper instance for the given URL + + :return: A StorageHelper instance. + """ + # Handle URL substitution etc before locating the correct storage driver + url = cls._canonize_url(url) + + # Get the credentials we should use for this url + base_url = cls._resolve_base_url(url) + + instance_key = '%s_%s' % (base_url, threading.current_thread().ident or 0) + # noinspection PyBroadException + try: + configs = kwargs.get("configs") + if configs: + instance_key += "_{}".format(configs.cache_name) + except Exception: + pass + + force_create = kwargs.pop('__force_create', False) + if (instance_key in cls._helpers) and (not force_create) and base_url != "file://": + return cls._helpers[instance_key] + + # Don't canonize URL since we already did it + try: + instance = cls(base_url=base_url, url=url, logger=logger, canonize_url=False, **kwargs) + except (StorageError, UsageError) as ex: + cls._get_logger().error(str(ex)) + return None + except Exception as ex: + cls._get_logger().error("Failed creating storage object {} Reason: {}".format( + base_url or url, ex)) + return None + + cls._helpers[instance_key] = instance + return instance + + @classmethod + def get_local_copy(cls, remote_url, skip_zero_size_check=False): + """ + Download a file from remote URL to a local storage, and return path to local copy, + + :param remote_url: Remote URL. Example: https://example.com/file.jpg s3://bucket/folder/file.mp4 etc. 
+ :param skip_zero_size_check: If True, no error will be raised for files with zero bytes size. + :return: Path to local copy of the downloaded file. None if error occurred. + """ + helper = cls.get(remote_url) + if not helper: + return None + # create temp file with the requested file name + file_name = '.' + remote_url.split('/')[-1].split(os.path.sep)[-1] + local_path = mktemp(suffix=file_name) + return helper.download_to_file(remote_url, local_path, skip_zero_size_check=skip_zero_size_check) + + def __init__( + self, + base_url, + url, + key=None, + secret=None, + region=None, + verbose=False, + logger=None, + retries=5, + token=None, + **kwargs + ): + level = config.get("storage.log.level", None) + + if level: + try: + self._get_logger().setLevel(level) + except (TypeError, ValueError): + self._get_logger().error('invalid storage log level in configuration: %s' % level) + + self._log = logger or self._get_logger() + self._verbose = verbose + self._retries = retries + self._extra = {} + self._base_url = base_url + self._secure = True + self._driver = None + self._container = None + self._conf = None + + if kwargs.get('canonize_url', True): + url = self._canonize_url(url) + + parsed = urlparse(url) + self._scheme = parsed.scheme + + if self._scheme == _AzureBlobServiceStorageDriver.scheme: + self._conf = copy(self._azure_configurations.get_config_by_uri(url)) + if self._conf is None: + raise StorageError("Missing Azure Blob Storage configuration for {}".format(url)) + + if not self._conf.account_name or not self._conf.account_key: + raise StorageError( + "Missing account name or key for Azure Blob Storage access for {}".format(base_url) + ) + + self._driver = _AzureBlobServiceStorageDriver() + self._container = self._driver.get_container(config=self._conf, account_url=parsed.netloc) + + elif self._scheme == _Boto3Driver.scheme: + self._conf = copy(self._s3_configurations.get_config_by_uri(url)) + self._secure = self._conf.secure + + final_region = region if region else self._conf.region + if not final_region: + final_region = None + + self._conf.update( + key=key or self._conf.key, + secret=secret or self._conf.secret, + multipart=self._conf.multipart, + region=final_region, + use_credentials_chain=self._conf.use_credentials_chain, + token=token or self._conf.token, + extra_args=self._conf.extra_args, + ) + + if not self._conf.use_credentials_chain: + if not self._conf.key or not self._conf.secret: + raise ValueError( + "Missing key and secret for S3 storage access (%s)" % base_url + ) + + self._driver = _Boto3Driver() + self._container = self._driver.get_container( + container_name=self._base_url, retries=retries, config=self._conf) + + elif self._scheme == _GoogleCloudStorageDriver.scheme: + self._conf = copy(self._gs_configurations.get_config_by_uri(url)) + self._driver = _GoogleCloudStorageDriver() + self._container = self._driver.get_container( + container_name=self._base_url, + config=self._conf + ) + + elif self._scheme in _HttpDriver.schemes: + self._driver = _HttpDriver(retries=retries) + self._container = self._driver.get_container(container_name=self._base_url) + else: # elif self._scheme == 'file': + # if this is not a known scheme assume local file + # url2pathname is specifically intended to operate on (urlparse result).path + # and returns a cross-platform compatible result + new_url = normalize_local_path(url[len("file://"):] if url.startswith("file://") else url) + self._driver = _FileStorageDriver(new_url) + # noinspection PyBroadException + try: + self._container = 
self._driver.get_container("") + except Exception: + self._container = None + + @classmethod + def terminate_uploads(cls, force=True, timeout=2.0): + if force: + # since async uploaders are daemon threads, we can just return and let them close by themselves + return + # signal all threads to terminate and give them a chance for 'timeout' seconds (total, not per-thread) + # cls._terminate.set() + remaining_timeout = timeout + for thread in cls._async_upload_threads: + t = time() + # noinspection PyBroadException + try: + thread.join(timeout=remaining_timeout) + except Exception: + pass + remaining_timeout -= (time() - t) + + @classmethod + def get_aws_storage_uri_from_config(cls, bucket_config): + uri = ( + "s3://{}/{}".format(bucket_config.host, bucket_config.bucket) + if bucket_config.host + else "s3://{}".format(bucket_config.bucket) + ) + if bucket_config.subdir: + uri += "/" + bucket_config.subdir + return uri + + @classmethod + def get_gcp_storage_uri_from_config(cls, bucket_config): + return ( + "gs://{}/{}".format(bucket_config.bucket, bucket_config.subdir) + if bucket_config.subdir + else "gs://{}".format(bucket_config.bucket) + ) + + @classmethod + def get_azure_storage_uri_from_config(cls, bucket_config): + return "azure://{}.blob.core.windows.net/{}".format(bucket_config.account_name, bucket_config.container_name) + + @classmethod + def get_configuration(cls, bucket_config): + return cls.get_aws_configuration(bucket_config) + + @classmethod + def get_aws_configuration(cls, bucket_config): + return cls._s3_configurations.get_config_by_bucket(bucket_config.bucket, bucket_config.host) + + @classmethod + def get_gcp_configuration(cls, bucket_config): + return cls._gs_configurations.get_config_by_uri( + cls.get_gcp_storage_uri_from_config(bucket_config), + create_if_not_found=False + ) + + @classmethod + def get_azure_configuration(cls, bucket_config): + return cls._azure_configurations.get_config(bucket_config.account_name, bucket_config.container_name) + + @classmethod + def add_configuration(cls, bucket_config, log=None, _test_config=True): + return cls.add_aws_configuration(bucket_config, log=log, _test_config=_test_config) + + @classmethod + def add_aws_configuration(cls, bucket_config, log=None, _test_config=True): + # Try to use existing configuration if we have no key and secret + use_existing = not bucket_config.is_valid() + # Get existing config anyway (we'll either try to use it or alert we're replacing it + existing = cls.get_aws_configuration(bucket_config) + configs = cls._s3_configurations + uri = cls.get_aws_storage_uri_from_config(bucket_config) + + if not use_existing: + # Test bucket config, fails if unsuccessful + if _test_config: + _Boto3Driver._test_bucket_config(bucket_config, log) # noqa + if existing: + if log: + log.warning("Overriding existing configuration for '{}'".format(uri)) + configs.remove_config(existing) + configs.add_config(bucket_config) + else: + # Try to use existing configuration + good_config = False + if existing: + if log: + log.info("Using existing credentials for '{}'".format(uri)) + good_config = _Boto3Driver._test_bucket_config(existing, log, raise_on_error=False) # noqa + + if not good_config: + # Try to use global key/secret + configs.update_config_with_defaults(bucket_config) + + if log: + log.info("Using global credentials for '{}'".format(uri)) + if _test_config: + _Boto3Driver._test_bucket_config(bucket_config, log) # noqa + configs.add_config(bucket_config) + + @classmethod + def add_gcp_configuration(cls, bucket_config, 
log=None): + use_existing = not bucket_config.is_valid() + existing = cls.get_gcp_configuration(bucket_config) + configs = cls._gs_configurations + uri = cls.get_gcp_storage_uri_from_config(bucket_config) + + if not use_existing: + if existing: + if log: + log.warning("Overriding existing configuration for '{}'".format(uri)) + configs.remove_config(existing) + configs.add_config(bucket_config) + else: + good_config = False + if existing: + if log: + log.info("Using existing config for '{}'".format(uri)) + good_config = _GoogleCloudStorageDriver.test_upload(None, bucket_config) + if not good_config: + configs.update_config_with_defaults(bucket_config) + if log: + log.info("Using global credentials for '{}'".format(uri)) + configs.add_config(bucket_config) + + @classmethod + def add_azure_configuration(cls, bucket_config, log=None): + use_existing = not bucket_config.is_valid() + existing = cls.get_azure_configuration(bucket_config) + configs = cls._azure_configurations + uri = cls.get_azure_storage_uri_from_config(bucket_config) + + if not use_existing: + if existing: + if log: + log.warning("Overriding existing configuration for '{}'".format(uri)) + configs.remove_config(existing) + configs.add_config(bucket_config) + else: + good_config = False + if existing: + if log: + log.info("Using existing config for '{}'".format(uri)) + good_config = _AzureBlobServiceStorageDriver.test_upload(None, bucket_config) + if not good_config: + configs.update_config_with_defaults(bucket_config) + if log: + log.info("Using global credentials for '{}'".format(uri)) + configs.add_config(bucket_config) + + @classmethod + def add_path_substitution( + cls, + registered_prefix, + local_prefix, + replace_windows_sep=False, + replace_linux_sep=False, + ): + """ + Add a path substitution rule for storage paths. + + Useful for case where the data was registered under some path, and that + path was later renamed. This may happen with local storage paths where + each machine is has different mounts or network drives configurations + + :param registered_prefix: The prefix to search for and replace. This is + the prefix of the path the data is registered under. This should be the + exact url prefix, case sensitive, as the data is registered. + :param local_prefix: The prefix to replace 'registered_prefix' with. This + is the prefix of the path the data is actually saved under. This should be the + exact url prefix, case sensitive, as the data is saved under. + :param replace_windows_sep: If set to True, and the prefix matches, the rest + of the url has all of the windows path separators (backslash '\') replaced with + the native os path separator. + :param replace_linux_sep: If set to True, and the prefix matches, the rest + of the url has all of the linux/unix path separators (slash '/') replaced with + the native os path separator. + """ + + if not registered_prefix or not local_prefix: + raise UsageError("Path substitution prefixes must be non empty strings") + + if replace_windows_sep and replace_linux_sep: + raise UsageError("Only one of replace_windows_sep and replace_linux_sep may be set.") + + rule = cls._PathSubstitutionRule( + registered_prefix=registered_prefix, + local_prefix=local_prefix, + replace_windows_sep=replace_windows_sep, + replace_linux_sep=replace_linux_sep, + ) + + cls._path_substitutions.append(rule) + + @classmethod + def clear_path_substitutions(cls): + """ + Removes all path substitution rules, including ones from the configuration file. 
+ """ + cls._path_substitutions = list() + + def get_object_size_bytes(self, remote_url, silence_errors=False): + # type: (str, bool) -> [int, None] + """ + Get size of the remote file in bytes. + + :param str remote_url: The url where the file is stored. + E.g. 's3://bucket/some_file.txt', 'file://local/file' + :param bool silence_errors: Silence errors that might occur + when fetching the size of the file. Default: False + + :return: The size of the file in bytes. + None if the file could not be found or an error occurred. + """ + obj = self.get_object(remote_url, silence_errors=silence_errors) + return self._get_object_size_bytes(obj, silence_errors) + + def _get_object_size_bytes(self, obj, silence_errors=False): + # type: (object) -> [int, None] + """ + Auxiliary function for `get_object_size_bytes`. + Get size of the remote object in bytes. + + :param object obj: The remote object + :param bool silence_errors: Silence errors that might occur + when fetching the size of the file. Default: False + + :return: The size of the object in bytes. + None if an error occurred. + """ + if not obj: + return None + size = None + try: + if isinstance(self._driver, _HttpDriver) and obj: + obj = self._driver._get_download_object(obj) # noqa + size = int(obj.headers.get("Content-Length", 0)) + elif hasattr(obj, "size"): + size = obj.size + # Google storage has the option to reload the object to get the size + if size is None and hasattr(obj, "reload"): + # noinspection PyBroadException + try: + # To catch google.api_core exceptions + obj.reload() + size = obj.size + except Exception as e: + if not silence_errors: + self.log.warning("Failed obtaining object size on reload: {}('{}')".format( + e.__class__.__name__, str(e))) + elif hasattr(obj, "content_length"): + # noinspection PyBroadException + try: + # To catch botocore exceptions + size = obj.content_length # noqa + except Exception as e: + if not silence_errors: + self.log.warning("Failed obtaining content_length while getting object size: {}('{}')".format( + e.__class__.__name__, str(e))) + except Exception as e: + if not silence_errors: + self.log.warning("Failed getting object size: {}('{}')".format(e.__class__.__name__, str(e))) + return size + + def get_object_metadata(self, obj): + # type: (object) -> dict + """ + Get the metadata of the remote object. + The metadata is a dict containing the following keys: `name`, `size`. + + :param object obj: The remote object + + :return: A dict containing the metadata of the remote object + """ + name_fields = ("name", "url", "key", "blob_name") + metadata = { + "size": self._get_object_size_bytes(obj), + "name": next(filter(None, (getattr(obj, f, None) for f in name_fields)), None), + } + return metadata + + def verify_upload(self, folder_uri='', raise_on_error=True, log_on_error=True): + """ + Verify that this helper can upload files to a folder. + + An upload is possible iff: + 1. the destination folder is under the base uri of the url used to create the helper + 2. the helper has credentials to write to the destination folder + + :param folder_uri: The destination folder to test. Must be an absolute + url that begins with the base uri of the url used to create the helper. + :param raise_on_error: Raise an exception if an upload is not possible + :param log_on_error: Log an error if an upload is not possible + :return: True, if, and only if, an upload to folder_uri is possible. 
+ """ + + folder_uri = self._canonize_url(folder_uri) + + folder_uri = self.conform_url(folder_uri, self._base_url) + + test_path = self._normalize_object_name(folder_uri) + + if self._scheme == _Boto3Driver.scheme: + _Boto3Driver._test_bucket_config( + self._conf, + self._log, + test_path=test_path, + raise_on_error=raise_on_error, + log_on_error=log_on_error, + ) + elif self._scheme == _GoogleCloudStorageDriver.scheme: + self._driver.test_upload(test_path, self._conf) + + elif self._scheme == 'file': + # Check path exists + Path(test_path).mkdir(parents=True, exist_ok=True) + # check path permissions + Path(test_path).touch(exist_ok=True) + + return folder_uri + + def upload_from_stream(self, stream, dest_path, extra=None, retries=1, return_canonized=True): + canonized_dest_path = self._canonize_url(dest_path) + object_name = self._normalize_object_name(canonized_dest_path) + extra = extra.copy() if extra else {} + extra.update(self._extra) + last_ex = None + cb = UploadProgressReport.from_stream(stream, object_name, self._verbose, self._log) + for i in range(max(1, int(retries))): + try: + self._driver.upload_object_via_stream( + iterator=stream, + container=self._container, + object_name=object_name, + callback=cb, + extra=extra) + last_ex = None + break + except Exception as ex: + last_ex = ex + # seek to beginning if possible + # noinspection PyBroadException + try: + stream.seek(0) + except Exception: + pass + if last_ex: + raise last_ex + + result_dest_path = canonized_dest_path if return_canonized else dest_path + + if self.scheme in StorageHelper._quotable_uri_schemes: # TODO: fix-driver-schema + # quote link + result_dest_path = quote_url(result_dest_path) + + return result_dest_path + + def upload( + self, src_path, dest_path=None, extra=None, async_enable=False, cb=None, retries=3, return_canonized=True + ): + if not dest_path: + dest_path = os.path.basename(src_path) + + canonized_dest_path = self._canonize_url(dest_path) + dest_path = dest_path.replace('\\', '/') + canonized_dest_path = canonized_dest_path.replace('\\', '/') + + result_path = canonized_dest_path if return_canonized else dest_path + + if cb and self.scheme in StorageHelper._quotable_uri_schemes: # TODO: fix-driver-schema + # store original callback + a_cb = cb + + # quote link + def callback(result): + return a_cb(quote_url(result_path) if result else result) + # replace callback with wrapper + cb = callback + + if async_enable: + data = self._UploadData( + src_path=src_path, + dest_path=dest_path, + canonized_dest_path=canonized_dest_path, + extra=extra, + callback=cb, + retries=retries, + return_canonized=return_canonized + ) + StorageHelper._initialize_upload_pool() + return StorageHelper._upload_pool.apply_async(self._do_async_upload, args=(data,)) + else: + res = self._do_upload( + src_path=src_path, + dest_path=dest_path, + canonized_dest_path=canonized_dest_path, + extra=extra, + cb=cb, + verbose=False, + retries=retries, + return_canonized=return_canonized) + if res: + result_path = quote_url(result_path) + return result_path + + def list(self, prefix=None, with_metadata=False): + """ + List entries in the helper base path. + + Return a list of names inside this helper base path or a list of dictionaries containing + the objects' metadata. The base path is determined at creation time and is specific + for each storage medium. + For Google Storage and S3 it is the bucket of the path. + For local files it is the root directory. + + This operation is not supported for http and https protocols. 
+ + :param prefix: If None, return the list as described above. Otherwise, it + must be a string - the path of a subdirectory under the base path. + The returned list will include only objects under that subdirectory. + + :param with_metadata: Instead of returning just the names of the objects, return a list of dictionaries + containing the name and metadata of the remote file. Thus, each dictionary will contain the following + keys: `name`, `size`. + + :return: The paths of all the objects in the storage base path under prefix or + a list of dictionaries containing the objects' metadata. + Listed relative to the base path. + """ + if prefix: + prefix = self._canonize_url(prefix) + if prefix.startswith(self._base_url): + prefix = prefix[len(self._base_url):] + if self._base_url != "file://": + prefix = prefix.lstrip("/") + if self._base_url == "file://": + prefix = prefix.rstrip("/") + if prefix.startswith(str(self._driver.base_path)): + prefix = prefix[len(str(self._driver.base_path)):] + res = self._driver.list_container_objects(self._container, ex_prefix=prefix) + result = [ + obj.name if not with_metadata else self.get_object_metadata(obj) + for obj in res + ] + + if self._base_url == "file://": + if not with_metadata: + result = [Path(f).as_posix() for f in result] + else: + for metadata_entry in result: + metadata_entry["name"] = Path(metadata_entry["name"]).as_posix() + return result + else: + return [ + obj.name if not with_metadata else self.get_object_metadata(obj) + for obj in self._driver.list_container_objects(self._container) + ] +
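A hedged sketch of how list() might be called with the prefix and with_metadata options described above (not part of the patch; the bucket name is hypothetical and S3 credentials are assumed to be configured already):

```python
from clearml.storage.helper import StorageHelper

helper = StorageHelper.get("s3://my-bucket/experiments/")

# Names only, listed relative to the helper's base path
names = helper.list(prefix="s3://my-bucket/experiments/run_42/")

# Names plus metadata: each entry is a dict with `name` and `size` keys
entries = helper.list(prefix="s3://my-bucket/experiments/run_42/", with_metadata=True)
for entry in entries:
    print(entry["name"], entry["size"])
```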
+ def download_to_file( + self, + remote_path, + local_path, + overwrite_existing=False, + delete_on_failure=True, + verbose=None, + skip_zero_size_check=False, + silence_errors=False, + direct_access=True + ): + def next_chunk(astream): + if isinstance(astream, binary_type): + chunk = astream + astream = None + elif astream: + try: + chunk = next(astream) + except StopIteration: + chunk = None + else: + chunk = None + return chunk, astream + + remote_path = self._canonize_url(remote_path) + verbose = self._verbose if verbose is None else verbose + + tmp_remote_path = remote_path + # noinspection PyBroadException + try: + tmp_remote_path = normalize_local_path(tmp_remote_path) + if tmp_remote_path.exists(): + remote_path = "file://{}".format(str(tmp_remote_path)) + except Exception: + pass + # Check if driver type supports direct access: + direct_access_path = self.get_driver_direct_access(remote_path) + if direct_access_path and direct_access: + return direct_access_path + + temp_local_path = None + try: + if verbose: + self._log.info('Start downloading from %s' % remote_path) + if not overwrite_existing and Path(local_path).is_file(): + self._log.debug( + 'File {} already exists, no need to download, thread id = {}'.format( + local_path, + threading.current_thread().ident, + ), + ) + + return local_path + if remote_path.startswith("file://"): + Path(local_path).parent.mkdir(parents=True, exist_ok=True) + # use remote_path, because direct_access_path might be None due to access_rules + # len("file://") == 7 + shutil.copyfile(remote_path[7:], local_path) + return local_path + # we download into temp_local_path so that if we accidentally stop in the middle, + # we won't think we have the entire file + temp_local_path = '{}_{}{}'.format(local_path, time(), self._temp_download_suffix) + obj = self.get_object(remote_path, silence_errors=silence_errors) + if not obj: + return None + + # object size in bytes + total_size_mb = -1 + dl_total_mb = 0. + download_reported = False + # chunk size is ignored and is always 5 MB + chunk_size_mb = 5 + + # make sure we have the destination folder + # noinspection PyBroadException + Path(temp_local_path).parent.mkdir(parents=True, exist_ok=True) + + total_size_bytes = self.get_object_size_bytes(remote_path, silence_errors=silence_errors) + if total_size_bytes is not None: + total_size_mb = float(total_size_bytes) / (1024 * 1024) + + # if driver supports download with callback, use it (it might be faster) + if hasattr(self._driver, 'download_object'): + # callback + cb = DownloadProgressReport(total_size_mb, verbose, remote_path, self._log) + self._driver.download_object(obj, temp_local_path, callback=cb) + download_reported = bool(cb.last_reported) + dl_total_mb = cb.current_status_mb + else: + stream = self._driver.download_object_as_stream(obj, chunk_size_mb * 1024 * 1024) + if stream is None: + raise ValueError('Could not download %s' % remote_path) + with open(temp_local_path, 'wb') as fd: + data, stream = next_chunk(stream) + while data: + fd.write(data) + data, stream = next_chunk(stream) + + if not skip_zero_size_check and Path(temp_local_path).stat().st_size <= 0: + raise Exception('downloaded a 0-sized file') + + # on Windows we need to remove the target file before renaming, since os.rename + # cannot overwrite an existing target (unlike POSIX rename) + if os.name != 'posix': + # noinspection PyBroadException + try: + os.remove(local_path) + except Exception: + pass + + # rename temp file to local_file + # noinspection PyBroadException + try: + os.rename(temp_local_path, local_path) + except Exception: + # noinspection PyBroadException + try: + os.unlink(temp_local_path) + except Exception: + pass + # file was downloaded by a parallel process, check we have the final output and delete the partial copy + path_local_path = Path(local_path) + if not path_local_path.is_file() or (not skip_zero_size_check and path_local_path.stat().st_size <= 0): + raise Exception('Failed renaming partial file; downloaded file is missing or is a 0-sized file') + + # report the download if verbose or if progress was already reported for a previous chunk + if verbose or download_reported: + self._log.info( + 'Downloaded %.2f MB successfully from %s , saved to %s' % (dl_total_mb, remote_path, local_path)) + return local_path + except DownloadError: + raise + except Exception as e: + self._log.error("Could not download {} , err: {} ".format(remote_path, e)) + if delete_on_failure: + # noinspection PyBroadException + try: + os.remove(temp_local_path) + except Exception: + pass + return None + + def download_as_stream(self, remote_path, chunk_size=None): + remote_path = self._canonize_url(remote_path) + try: + obj = self.get_object(remote_path) + return self._driver.download_object_as_stream( + obj, chunk_size=chunk_size, verbose=self._verbose, log=self.log + ) + except DownloadError: + raise + except Exception as e: + self._log.error("Could not download file : %s, err:%s " % (remote_path, str(e))) + return None + + def download_as_nparray(self, remote_path, chunk_size=None): + try: + stream = self.download_as_stream(remote_path, chunk_size) + if stream is None: + return + + # TODO: ugly py3 hack, please remove ASAP + if six.PY3 and not isinstance(stream, GeneratorType): + import numpy as np + return np.frombuffer(stream, dtype=np.uint8) + else: + import numpy as np + return np.asarray(bytearray(b''.join(stream)), dtype=np.uint8) + + except Exception as e: + self._log.error("Could not download file : %s, err:%s " % (remote_path, str(e))) +
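A hedged usage sketch of download_to_file as implemented above (not part of the patch; the URLs are hypothetical). The download goes into a temporary file first and is renamed to the target path once the transfer completes; for URLs the driver can access directly, the original path is returned without copying:

```python
from clearml.storage.helper import StorageHelper

helper = StorageHelper.get("s3://my-bucket/experiments/")

# Remote object: fetched into a temporary file, then renamed to /tmp/model.pt
local_path = helper.download_to_file(
    "s3://my-bucket/experiments/run_42/model.pt",
    local_path="/tmp/model.pt",
    overwrite_existing=True,
)

# Local file:// URL: the driver typically has direct access, so the original
# filesystem path is returned and no copy is made.
local_helper = StorageHelper.get("file:///data/shared/")
direct_path = local_helper.download_to_file(
    "file:///data/shared/model.pt", local_path="/tmp/model.pt"
)
```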
+ def delete(self, path): + path = self._canonize_url(path) + return self._driver.delete_object(self.get_object(path)) + + def check_write_permissions(self, dest_path=None): + # create a temporary file, then delete it + base_url = dest_path or self._base_url + dest_path = base_url + "/.clearml.{}.test".format(str(uuid.uuid4())) + # do not check http/s connection permissions + if dest_path.startswith("http"): + return True + + try: + self.upload_from_stream(stream=six.BytesIO(b"clearml"), dest_path=dest_path) + except Exception: + raise ValueError("Insufficient permissions (write failed) for {}".format(base_url)) + try: + self.delete(path=dest_path) + except Exception: + raise ValueError("Insufficient permissions (delete failed) for {}".format(base_url)) + return True + + @classmethod + def download_from_url(cls, remote_url, local_path, overwrite_existing=False, skip_zero_size_check=False): + """ + Download a file from a remote URL to local storage + + :param remote_url: Remote URL. Example: https://example.com/image.jpg or s3://bucket/folder/file.mp4 etc. + :param local_path: Target location for the downloaded file. Example: /tmp/image.jpg + :param overwrite_existing: If True and local_path exists, overwrite it; otherwise print a warning + :param skip_zero_size_check: If True, no error will be raised for files with zero-byte size. + :return: local_path if the download was successful, otherwise None. + """ + helper = cls.get(remote_url) + if not helper: + return None + return helper.download_to_file( + remote_url, local_path, overwrite_existing=overwrite_existing, skip_zero_size_check=skip_zero_size_check + ) + + def get_driver_direct_access(self, path): + """ + Check if the helper's driver has direct access to the file + + :param str path: file path to check access to + :return: The file path as a string if the driver has direct access to it, else None + """ + + return self._driver.get_direct_access(path) + + @classmethod + def _canonize_url(cls, url): + return cls._apply_url_substitutions(url) + + @classmethod + def _apply_url_substitutions(cls, url): + def replace_separator(_url, where, sep): + return _url[:where] + _url[where:].replace(sep, os.sep) + + for index, rule in enumerate(cls._path_substitutions): + if url.startswith(rule.registered_prefix): + url = url.replace( + rule.registered_prefix, + rule.local_prefix, + 1, # count.
str.replace() does not support keyword arguments + ) + + if rule.replace_windows_sep: + url = replace_separator(url, len(rule.local_prefix), '\\') + + if rule.replace_linux_sep: + url = replace_separator(url, len(rule.local_prefix), '/') + + break + + return url + + @classmethod + def _resolve_base_url(cls, base_url): + parsed = urlparse(base_url) + if parsed.scheme == _Boto3Driver.scheme: + conf = cls._s3_configurations.get_config_by_uri(base_url) + bucket = conf.bucket + if not bucket: + parts = Path(parsed.path.strip('/')).parts + if parts: + bucket = parts[0] + return '/'.join(x for x in ('s3:/', conf.host, bucket) if x) + elif parsed.scheme == _AzureBlobServiceStorageDriver.scheme: + conf = cls._azure_configurations.get_config_by_uri(base_url) + if not conf: + raise StorageError("Can't find azure configuration for {}".format(base_url)) + return str(furl(base_url).set(path=conf.container_name)) + elif parsed.scheme == _GoogleCloudStorageDriver.scheme: + conf = cls._gs_configurations.get_config_by_uri(base_url) + return str(furl(scheme=parsed.scheme, netloc=conf.bucket)) + elif parsed.scheme in _HttpDriver.schemes: + for files_server in _Driver.get_file_server_hosts(): + if base_url.startswith(files_server): + return files_server + return parsed.scheme + "://" + else: # if parsed.scheme == 'file': + # if we do not know what it is, we assume file + return 'file://' + + @classmethod + def conform_url(cls, folder_uri, base_url=None): + if not folder_uri: + return folder_uri + _base_url = cls._resolve_base_url(folder_uri) if not base_url else base_url + + if not folder_uri.startswith(_base_url): + prev_folder_uri = folder_uri + if _base_url == 'file://': + folder_uri = str(Path(folder_uri).absolute()) + if folder_uri.startswith('/'): + folder_uri = _base_url + folder_uri + elif platform.system() == "Windows": + folder_uri = ''.join((_base_url, folder_uri)) + else: + folder_uri = '/'.join((_base_url, folder_uri)) + + cls._get_logger().debug('Upload destination {} amended to {} for registration purposes'.format( + prev_folder_uri, folder_uri)) + else: + raise ValueError('folder_uri: {} does not start with base url: {}'.format(folder_uri, _base_url)) + + return folder_uri + + def _absolute_object_name(self, path): + """ Returns absolute remote path, including any prefix that is handled by the container """ + if not path.startswith(self.base_url): + return self.base_url.rstrip('/') + '///' + path.lstrip('/') + return path + + def _normalize_object_name(self, path): + """ Normalize remote path. 
Remove any prefix that is already handled by the container """ + if path.startswith(self.base_url): + path = path[len(self.base_url):] + if path.startswith('/') and os.name == 'nt': + path = path[1:] + if self.scheme in (_Boto3Driver.scheme, _GoogleCloudStorageDriver.scheme, + _AzureBlobServiceStorageDriver.scheme): + path = path.lstrip('/') + return path + + def _do_async_upload(self, data): + assert isinstance(data, self._UploadData) + return self._do_upload(data.src_path, data.dest_path, data.canonized_dest_path, extra=data.extra, cb=data.callback, verbose=True, retries=data.retries, return_canonized=data.return_canonized) + + def _upload_from_file(self, local_path, dest_path, extra=None): + if not hasattr(self._driver, 'upload_object'): + with open(local_path, 'rb') as stream: + res = self.upload_from_stream(stream=stream, dest_path=dest_path, extra=extra) + else: + object_name = self._normalize_object_name(dest_path) + extra = extra.copy() if extra else {} + extra.update(self._extra) + cb = UploadProgressReport.from_file(local_path, self._verbose, self._log) + res = self._driver.upload_object( + file_path=local_path, + container=self._container, + object_name=object_name, + callback=cb, + extra=extra) + return res + + def _do_upload(self, src_path, dest_path, canonized_dest_path, extra=None, cb=None, verbose=False, retries=1, return_canonized=False): + object_name = self._normalize_object_name(canonized_dest_path) + if cb: + try: + cb(None) + except Exception as e: + self._log.error("Calling upload callback when starting upload: %s" % str(e)) + if verbose: + msg = 'Starting upload: {} => {}{}'.format( + src_path, + (self._container.name if self._container.name.endswith('/') else self._container.name + '/') + if self._container and self._container.name else '', object_name) + if object_name.startswith('file://') or object_name.startswith('/'): + self._log.debug(msg) + else: + self._log.info(msg) + last_ex = None + for i in range(max(1, int(retries))): + try: + if not self._upload_from_file(local_path=src_path, dest_path=canonized_dest_path, extra=extra): + # retry if failed + last_ex = ValueError("Upload failed") + continue + last_ex = None + break + except Exception as e: + last_ex = e + + if last_ex: + self._log.error("Exception encountered while uploading %s" % str(last_ex)) + if cb: + try: + cb(False) + except Exception as e: + self._log.warning("Exception on upload callback: %s" % str(e)) + raise last_ex + + if verbose: + self._log.debug("Finished upload: %s => %s" % (src_path, object_name)) + if cb: + try: + cb(canonized_dest_path if return_canonized else dest_path) + except Exception as e: + self._log.warning("Exception on upload callback: %s" % str(e)) + + return canonized_dest_path if return_canonized else dest_path + + def get_object(self, path, silence_errors=False): + # type: (str, bool) -> object + """ + Gets the remote object stored at path. The data held by the object + differs depending on where it is stored. 
+ + :param str path: the path where the remote object is stored + :param bool silence_errors: Silence errors that might occur + when fetching the remote object + + :return: The remote object + """ + path = self._canonize_url(path) + object_name = self._normalize_object_name(path) + try: + return self._driver.get_object( + container_name=self._container.name if self._container else '', object_name=object_name) + except ConnectionError: + raise DownloadError + except Exception as e: + if not silence_errors: + self.log.warning("Storage helper problem for {}: {}".format(str(object_name), str(e))) + return None + + @staticmethod + def _initialize_upload_pool(): + if not StorageHelper._upload_pool or StorageHelper._upload_pool_pid != os.getpid(): + StorageHelper._upload_pool_pid = os.getpid() + StorageHelper._upload_pool = ThreadPool(processes=1) + + @staticmethod + def close_async_threads(): + if StorageHelper._upload_pool: + pool = StorageHelper._upload_pool + StorageHelper._upload_pool = None + # noinspection PyBroadException + try: + pool.terminate() + pool.join() + except Exception: + pass + + def exists_file(self, remote_url): + remote_url = self._canonize_url(remote_url) + object_name = self._normalize_object_name(remote_url) + return self._driver.exists_file( + container_name=self._container.name if self._container else "", object_name=object_name + ) + + + def normalize_local_path(local_path): """ Get a normalized local path diff --git a/clearml/storage/util.py b/clearml/storage/util.py index c1523fbd..0d3b17fa 100644 --- a/clearml/storage/util.py +++ b/clearml/storage/util.py @@ -44,7 +44,7 @@ def get_config_object_matcher(**patterns): def quote_url(url): parsed = urlparse(url) - if parsed.scheme not in ("http", "https"): + if parsed.scheme not in ("http", "https", "gs"): return url parsed = parsed._replace(path=quote(parsed.path)) return urlunparse(parsed)
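For reference, a small hedged sketch of the object size / metadata helpers moved in the hunk above (not part of the patch; the bucket and object names are hypothetical):

```python
from clearml.storage.helper import StorageHelper

helper = StorageHelper.get("gs://my-bucket/datasets/")

size_bytes = helper.get_object_size_bytes("gs://my-bucket/datasets/train.tfrecord")

obj = helper.get_object("gs://my-bucket/datasets/train.tfrecord")
meta = helper.get_object_metadata(obj)  # dict with `name` and `size` keys
```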
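Similarly, a hedged sketch of the upload-side API (permission check plus synchronous and asynchronous upload); the destination URIs are hypothetical:

```python
from clearml.storage.helper import StorageHelper

helper = StorageHelper.get("s3://my-bucket/artifacts/")

# Probe credentials by uploading and then deleting a tiny temporary object
helper.check_write_permissions("s3://my-bucket/artifacts/")

# Synchronous upload; returns the destination URI (URL-quoted for quotable schemes)
uri = helper.upload("/tmp/model.pt", dest_path="s3://my-bucket/artifacts/model.pt")

# Asynchronous upload; returns an AsyncResult whose get() yields the destination URI
async_result = helper.upload(
    "/tmp/model.pt", dest_path="s3://my-bucket/artifacts/model.pt", async_enable=True
)
uri = async_result.get()
```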
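Finally, a short illustration of the quote_url change in clearml/storage/util.py: gs:// URLs are now percent-quoted the same way http/https URLs are, while other schemes are still returned untouched:

```python
from clearml.storage.util import quote_url

print(quote_url("gs://my-bucket/some folder/file name.txt"))
# gs://my-bucket/some%20folder/file%20name.txt

print(quote_url("s3://my-bucket/some folder/file name.txt"))
# unchanged - the s3 scheme is not in the quotable list
```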