diff --git a/requirements.txt b/requirements.txt index af4ca203..43f0ab10 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,3 @@ -apache-libcloud>=2.2.1 attrs>=18.0 backports.functools-lru-cache>=1.0.2 ; python_version < '3' enum34>=0.9 ; python_version < '3.6' diff --git a/trains/backend_interface/task/repo/scriptinfo.py b/trains/backend_interface/task/repo/scriptinfo.py index 6a0da3a9..18ad9d8d 100644 --- a/trains/backend_interface/task/repo/scriptinfo.py +++ b/trains/backend_interface/task/repo/scriptinfo.py @@ -108,6 +108,26 @@ class ScriptRequirements(object): modules |= fmodules try_imports |= try_ipts + # hack: forcefully insert storage modules if we have them + # noinspection PyBroadException + try: + import boto3 + modules.add('boto3', 'trains.storage', 0) + except Exception: + pass + # noinspection PyBroadException + try: + from google.cloud import storage + modules.add('google_cloud_storage', 'trains.storage', 0) + except Exception: + pass + # noinspection PyBroadException + try: + from azure.storage.blob import ContentSettings + modules.add('azure_storage_blob', 'trains.storage', 0) + except Exception: + pass + return modules, try_imports, local_mods @staticmethod @@ -247,7 +267,7 @@ class _JupyterObserver(object): requirements_txt = '' # parse jupyter python script and prepare pip requirements (pigar) # if backend supports requirements - if file_import_modules and Session.api_version > '2.1': + if file_import_modules and Session.check_min_api_version('2.2'): fmodules, _ = file_import_modules(notebook.parts[-1], script_code) installed_pkgs = get_installed_pkgs_detail() reqs = ReqsModules() @@ -435,7 +455,7 @@ class ScriptInfo(object): # if this is not jupyter, get the requirements.txt requirements = '' # create requirements if backend supports requirements - if create_requirements and not jupyter_filepath and Session.api_version > '2.1': + if create_requirements and not jupyter_filepath and Session.check_min_api_version('2.2'): script_requirements = ScriptRequirements(Path(repo_root).as_posix()) requirements = script_requirements.get_requirements() diff --git a/trains/storage/helper.py b/trains/storage/helper.py index ff13912a..85a8c6ae 100644 --- a/trains/storage/helper.py +++ b/trains/storage/helper.py @@ -1,7 +1,13 @@ +from __future__ import with_statement + +import errno import getpass import json import os +import shutil +import sys import threading +from collections import namedtuple from concurrent.futures import ThreadPoolExecutor from copy import copy from datetime import datetime @@ -15,9 +21,6 @@ import six from _socket import gethostname from attr import attrs, attrib, asdict from furl import furl -from libcloud.common.types import ProviderError, LibcloudError -from libcloud.storage.providers import get_driver -from libcloud.storage.types import Provider from pathlib2 import Path from requests.exceptions import ConnectionError from six import binary_type @@ -203,7 +206,12 @@ class StorageHelper(object): return cls._helpers[instance_key] # Don't canonize URL since we already did it - instance = cls(base_url=base_url, url=url, logger=logger, canonize_url=False, **kwargs) + try: + instance = cls(base_url=base_url, url=url, logger=logger, canonize_url=False, **kwargs) + except Exception: + log.error("Failed credentials for {}".format(base_url or url)) + return None + cls._helpers[instance_key] = instance return instance @@ -224,105 +232,7 @@ class StorageHelper(object): parsed = urlparse(url) self._scheme = parsed.scheme - # if self._scheme == 
'libcloud-s3': - # self._conf = copy(self._s3_configurations.get_config_by_uri(url)) - # self._secure = self._conf.secure - # - # final_region = region if region else self._conf.region - # if not final_region: - # final_region = None - # - # self._conf.update( - # key=key or self._conf.key, - # secret=secret or self._conf.secret, - # multipart=self._conf.multipart, - # region=final_region, - # ) - # - # if not self._conf.key or not self._conf.secret: - # raise ValueError('Missing key and secret for S3 storage access (%s)' % base_url) - # - # def init_driver_and_container(host, port, bucket): - # s3_region_to_libcloud_driver = { - # None: Provider.S3, - # "": Provider.S3, - # 'us-east-1': Provider.S3, - # 'ap-northeast': Provider.S3_AP_NORTHEAST, - # 'ap-northeast-1': Provider.S3_AP_NORTHEAST1, - # 'ap-northeast-2': Provider.S3_AP_NORTHEAST2, - # 'ap-south': Provider.S3_AP_SOUTH, - # 'ap-south-1': Provider.S3_AP_SOUTH, - # 'ap-southeast': Provider.S3_AP_SOUTHEAST, - # 'ap-southeast-1': Provider.S3_AP_SOUTHEAST, - # 'ap-southeast-2': Provider.S3_AP_SOUTHEAST2, - # 'ca-central': Provider.S3_CA_CENTRAL, - # 'cn-north': Provider.S3_CN_NORTH, - # 'eu-west': Provider.S3_EU_WEST, - # 'eu-west-1': Provider.S3_EU_WEST, - # 'eu-west-2': Provider.S3_EU_WEST2, - # 'eu-central': Provider.S3_EU_CENTRAL, - # 'eu-central-1': Provider.S3_EU_CENTRAL, - # 'sa-east': Provider.S3_SA_EAST, - # 'sa-east-1': Provider.S3_SA_EAST, - # 'us-east-2': Provider.S3_US_EAST2, - # 'us-west': Provider.S3_US_WEST, - # 'us-west-1': Provider.S3_US_WEST, - # 'us-west-2': Provider.S3_US_WEST_OREGON, - # 'us-west-oregon': Provider.S3_US_WEST_OREGON, - # 'us-gov-west': Provider.S3_US_GOV_WEST, - # 'rgw': Provider.S3_RGW, - # 'rgw_outscale': Provider.S3_RGW_OUTSCALE, - # } - # - # driver_name = s3_region_to_libcloud_driver.get( - # self._conf.region or _Boto3Driver._get_bucket_region(self._conf, self._log) - # ) - # - # if not driver_name: - # self._log.error("Invalid S3 region `%s`: no driver found" % self._conf.region) - # raise ValueError("Invalid s3 region") - # - # host = host or None - # port = port or None - # - # try: - # driver = get_driver(driver_name)( - # self._conf.key, - # self._conf.secret, - # host=host, - # port=port, - # secure=self._secure, - # region=self._conf.region, - # ) - # - # driver.supports_s3_multipart_upload = self._conf.multipart - # container = driver.get_container(container_name=bucket) - # - # except LibcloudError: - # attempted_uri = str(furl(host=host, port=port, path=bucket)).strip('/') - # self._log.error( - # 'Could not create S3 driver for {} in region {}'.format(attempted_uri, self._conf.region)) - # raise - # - # return driver, container - # - # parts = Path(parsed.path.strip('/')).parts - # first_part = parts[0] if parts else "" - # if not self._conf.host and not self._conf.bucket: - # # configuration has no indication of host or bucket, we'll just go with what we have - # try: - # self._driver, self._container = init_driver_and_container(parsed.netloc, None, first_part) - # except Exception as e: - # self._driver, self._container = init_driver_and_container(None, None, parsed.netloc) - # else: - # # configuration provides at least one of host/bucket - # host, _, port = (self._conf.host or '').partition(':') - # port = int(port) if port else None - # bucket = self._conf.bucket or first_part - # self._driver, self._container = init_driver_and_container(host, port, bucket) - # - # if self._conf.acl: - # self._extra['acl'] = self._conf.acl + if self._scheme == 
_AzureBlobServiceStorageDriver.scheme:
             self._conf = copy(self._azure_configurations.get_config_by_uri(url))
             if self._conf is None:
@@ -387,7 +297,7 @@ class StorageHelper(object):
                 # assume a folder and create
                 Path(driver_uri).mkdir(parents=True, exist_ok=True)
 
-            self._driver = get_driver(Provider.LOCAL)(driver_uri)
+            self._driver = _FileStorageDriver(driver_uri)
             self._container = self._driver.get_container(container_name='.')
 
     @classmethod
@@ -692,15 +602,17 @@ class StorageHelper(object):
                                    (dl_total_mb, total_size_mb, dl_rate, remote_path))
                 data, stream, tic = next_chunk(stream)
 
-            # remove target local_path if already exists
-            try:
-                os.remove(local_path)
-            except:
-                pass
-
             if Path(temp_local_path).stat().st_size <= 0:
                 raise Exception('downloaded a 0-sized file')
 
+            # on Windows we must remove the target file before renaming:
+            # unlike POSIX, os.rename does not overwrite an existing target
+            if os.name != 'posix':
+                try:
+                    os.remove(local_path)
+                except Exception:
+                    pass
+
             # rename temp file to local_file
             os.rename(temp_local_path, local_path)
 
             # report download if we are on the second chunk
@@ -763,6 +675,21 @@ class StorageHelper(object):
                 raise ValueError('Insufficient permissions for {}'.format(base_url))
         return True
 
+    @classmethod
+    def download_from_url(cls, remote_url, local_path, overwrite_existing=False):
+        """
+        Download a file from a remote URL to local storage
+
+        :param remote_url: Remote URL. Example: https://example.com/image.jpg or s3://bucket/folder/file.mp4 etc.
+        :param local_path: Target location for the downloaded file. Example: /tmp/image.jpg
+        :param overwrite_existing: If True and local_path exists, overwrite it; otherwise issue a warning
+        :return: local_path if the download was successful, None if a storage helper could not be created
+        """
+        helper = cls.get(remote_url)
+        if not helper:
+            return None
+        return helper.download_to_file(remote_url, local_path, overwrite_existing=overwrite_existing)
+
     @classmethod
     def _canonize_url(cls, url):
         return cls._apply_url_substitutions(url)
@@ -912,8 +839,6 @@ class StorageHelper(object):
         object_name = self._normalize_object_name(path)
         try:
             return self._driver.get_object(container_name=self._container.name, object_name=object_name)
-        except ProviderError:
-            raise
         except ConnectionError as ex:
             raise DownloadError
         except Exception as e:
@@ -1647,3 +1572,528 @@ class _AzureBlobServiceStorageDriver(object):
                 return f.path.segments[0], os.path.join(*f.path.segments[1:])
             return name
+
+
+class _FileStorageDriver(object):
+    """
+    A file-system based storage driver, used in place of the removed libcloud local driver.
+ """ + + scheme = "file" + CHUNK_SIZE = 8096 + IGNORE_FOLDERS = ['.lock', '.hash'] + Object = namedtuple("Object", ['name', 'size', 'extra', 'driver', 'container', 'hash', 'meta_data']) + + class _Container(object): + def __init__(self, name, extra, driver): + self.name = name + self.extra = extra + self.driver = driver + + def __init__(self, key, secret=None, secure=True, host=None, port=None, + **kwargs): + + # Use the key as the path to the storage + self.base_path = key + + if not os.path.isdir(self.base_path): + raise ValueError('The base path is not a directory') + + def _make_path(self, path, ignore_existing=True): + """ + Create a path by checking if it already exists + """ + + try: + os.makedirs(path) + except OSError: + exp = sys.exc_info()[1] + if exp.errno == errno.EEXIST and not ignore_existing: + raise exp + + def _check_container_name(self, container_name): + """ + Check if the container name is valid + + :param container_name: Container name + :type container_name: ``str`` + """ + + if '/' in container_name or '\\' in container_name: + raise ValueError(value=None, driver=self, container_name=container_name) + + def _make_container(self, container_name): + """ + Create a container instance + + :param container_name: Container name. + :type container_name: ``str`` + + :return: Container instance. + :rtype: :class:`Container` + """ + + self._check_container_name(container_name) + + full_path = os.path.realpath(os.path.join(self.base_path, container_name)) + + try: + stat = os.stat(full_path) + if not os.path.isdir(full_path): + raise OSError('Target path is not a directory') + except OSError: + raise ValueError(value=None, driver=self, container_name=container_name) + + extra = {} + extra['creation_time'] = stat.st_ctime + extra['access_time'] = stat.st_atime + extra['modify_time'] = stat.st_mtime + + return self._Container(name=container_name, extra=extra, driver=self) + + def _make_object(self, container, object_name): + """ + Create an object instance + + :param container: Container. + :type container: :class:`Container` + + :param object_name: Object name. + :type object_name: ``str`` + + :return: Object instance. + :rtype: :class:`Object` + """ + + full_path = os.path.realpath(os.path.join(self.base_path, container.name, object_name)) + + if os.path.isdir(full_path): + raise ValueError(value=None, driver=self, object_name=object_name) + + try: + stat = os.stat(full_path) + except Exception: + raise ValueError(value=None, driver=self, object_name=object_name) + + extra = {} + extra['creation_time'] = stat.st_ctime + extra['access_time'] = stat.st_atime + extra['modify_time'] = stat.st_mtime + + return self.Object(name=object_name, size=stat.st_size, extra=extra, + driver=self, container=container, hash=None, meta_data=None) + + def iterate_containers(self): + """ + Return a generator of containers. + + :return: A generator of Container instances. 
+        :rtype: ``generator`` of :class:`Container`
+        """
+
+        for container_name in os.listdir(self.base_path):
+            full_path = os.path.join(self.base_path, container_name)
+            if not os.path.isdir(full_path):
+                continue
+            yield self._make_container(container_name)
+
+    def _get_objects(self, container):
+        """
+        Recursively iterate through the file-system and yield the container's objects
+        """
+
+        cpath = self.get_container_cdn_url(container, check=True)
+
+        for folder, subfolders, files in os.walk(cpath, topdown=True):
+            # Remove unwanted subfolders
+            for subf in self.IGNORE_FOLDERS:
+                if subf in subfolders:
+                    subfolders.remove(subf)
+
+            for name in files:
+                full_path = os.path.join(folder, name)
+                object_name = os.path.relpath(full_path, start=cpath)
+                yield self._make_object(container, object_name)
+
+    def iterate_container_objects(self, container):
+        """
+        Return a generator of objects for the given container.
+
+        :param container: Container instance
+        :type container: :class:`Container`
+
+        :return: A generator of Object instances.
+        :rtype: ``generator`` of :class:`Object`
+        """
+
+        return self._get_objects(container)
+
+    def get_container(self, container_name):
+        """
+        Return a container instance.
+
+        :param container_name: Container name.
+        :type container_name: ``str``
+
+        :return: :class:`Container` instance.
+        :rtype: :class:`Container`
+        """
+        return self._make_container(container_name)
+
+    def get_container_cdn_url(self, container, check=False):
+        """
+        Return a container CDN URL.
+
+        :param container: Container instance
+        :type container: :class:`Container`
+
+        :param check: Indicates if the path's existence must be checked
+        :type check: ``bool``
+
+        :return: A CDN URL for this container.
+        :rtype: ``str``
+        """
+        path = os.path.realpath(os.path.join(self.base_path, container.name))
+
+        if check and not os.path.isdir(path):
+            raise ValueError('Container %s does not exist' % container.name)
+
+        return path
+
+    def get_object(self, container_name, object_name):
+        """
+        Return an object instance.
+
+        :param container_name: Container name.
+        :type container_name: ``str``
+
+        :param object_name: Object name.
+        :type object_name: ``str``
+
+        :return: :class:`Object` instance.
+        :rtype: :class:`Object`
+        """
+        container = self._make_container(container_name)
+        return self._make_object(container, object_name)
+
+    def get_object_cdn_url(self, obj):
+        """
+        Return an object CDN URL.
+
+        :param obj: Object instance
+        :type obj: :class:`Object`
+
+        :return: A CDN URL for this object.
+        :rtype: ``str``
+        """
+        return os.path.realpath(os.path.join(self.base_path, obj.container.name, obj.name))
+
+    def download_object(self, obj, destination_path, overwrite_existing=False,
+                        delete_on_failure=True):
+        """
+        Download an object to the specified destination path.
+
+        :param obj: Object instance.
+        :type obj: :class:`Object`
+
+        :param destination_path: Full path to a file or a directory where the
+                                 incoming file will be saved.
+        :type destination_path: ``str``
+
+        :param overwrite_existing: True to overwrite an existing file,
+                                   defaults to False.
+        :type overwrite_existing: ``bool``
+
+        :param delete_on_failure: True to delete a partially downloaded file if
+                                  the download was not successful (hash mismatch / file size).
+        :type delete_on_failure: ``bool``
+
+        :return: True if an object has been successfully downloaded, False
+                 otherwise.
+        :rtype: ``bool``
+        """
+
+        obj_path = self.get_object_cdn_url(obj)
+        base_name = os.path.basename(destination_path)
+
+        if not base_name and not os.path.exists(destination_path):
+            raise ValueError('Path %s does not exist' % destination_path)
+
+        if not base_name:
+            file_path = os.path.join(destination_path, obj.name)
+        else:
+            file_path = destination_path
+
+        if os.path.exists(file_path) and not overwrite_existing:
+            raise ValueError('File %s already exists and overwrite_existing=False' % file_path)
+
+        try:
+            shutil.copy(obj_path, file_path)
+        except IOError:
+            if delete_on_failure:
+                try:
+                    os.unlink(file_path)
+                except Exception:
+                    pass
+            return False
+
+        return True
+
+    def download_object_as_stream(self, obj, chunk_size=None):
+        """
+        Return a generator which yields object data.
+
+        :param obj: Object instance
+        :type obj: :class:`Object`
+
+        :param chunk_size: Optional chunk size (in bytes).
+        :type chunk_size: ``int``
+
+        :return: A stream of binary chunks of data.
+        :rtype: ``generator`` of ``bytes``
+        """
+        path = self.get_object_cdn_url(obj)
+        with open(path, 'rb') as obj_file:
+            for data in self._read_in_chunks(obj_file, chunk_size=chunk_size):
+                yield data
+
+    def upload_object(self, file_path, container, object_name, extra=None,
+                      verify_hash=True):
+        """
+        Upload an object currently located on a disk.
+
+        :param file_path: Path to the object on disk.
+        :type file_path: ``str``
+
+        :param container: Destination container.
+        :type container: :class:`Container`
+
+        :param object_name: Object name.
+        :type object_name: ``str``
+
+        :param verify_hash: Verify the hash of the uploaded object (accepted for
+                            interface compatibility; not used by this driver).
+        :type verify_hash: ``bool``
+
+        :param extra: (optional) Extra attributes (driver specific).
+        :type extra: ``dict``
+
+        :rtype: :class:`Object`
+        """
+
+        path = self.get_container_cdn_url(container, check=True)
+        obj_path = os.path.join(path, object_name)
+        base_path = os.path.dirname(obj_path)
+
+        self._make_path(base_path)
+
+        shutil.copy(file_path, obj_path)
+
+        os.chmod(obj_path, 0o664)
+
+        return self._make_object(container, object_name)
+
+    def upload_object_via_stream(self, iterator, container,
+                                 object_name,
+                                 extra=None):
+        """
+        Upload an object using an iterator.
+
+        If a provider supports it, chunked transfer encoding is used and you
+        don't need to know in advance the amount of data to be uploaded.
+
+        Otherwise, if a provider doesn't support it, the iterator is exhausted
+        up front so the total size of the data to be uploaded can be determined.
+
+        Note: Exhausting the iterator means that the whole payload must be
+        buffered in memory, which may exhaust memory when uploading a very
+        large object.
+
+        If the data is a file located on disk, you are advised to use the
+        upload_object function instead, which uses os.stat to determine the
+        file size and does not need to buffer the whole object in memory.
+
+        :type iterator: ``object``
+        :param iterator: An object which implements the iterator
+                         interface and yields binary chunks of data.
+
+        :type container: :class:`Container`
+        :param container: Destination container.
+
+        :type object_name: ``str``
+        :param object_name: Object name.
+
+        :type extra: ``dict``
+        :param extra: (optional) Extra attributes (driver specific). Note:
+            This dictionary must contain a 'content_type' key which represents
+            a content type of the stored object.
+
+        :rtype: :class:`Object`
+        """
+        path = self.get_container_cdn_url(container, check=True)
+        obj_path = os.path.join(path, object_name)
+        base_path = os.path.dirname(obj_path)
+        self._make_path(base_path)
+
+        obj_path = os.path.realpath(obj_path)
+        with open(obj_path, 'wb') as obj_file:
+            # accept a file-like object, a bytes/bytearray buffer,
+            # or an iterator of binary chunks
+            if hasattr(iterator, 'read'):
+                data = iterator.read()
+            elif isinstance(iterator, (bytes, bytearray)):
+                data = iterator
+            else:
+                data = b''.join(iterator)
+            obj_file.write(data)
+
+        os.chmod(obj_path, 0o664)
+        return self._make_object(container, object_name)
+
+    def delete_object(self, obj):
+        """
+        Delete an object.
+
+        :type obj: :class:`Object`
+        :param obj: Object instance.
+
+        :return: ``bool`` True on success.
+        :rtype: ``bool``
+        """
+
+        path = self.get_object_cdn_url(obj)
+
+        try:
+            os.unlink(path)
+        except Exception:
+            return False
+
+        # # Check and delete all the empty parent folders
+        # path = os.path.dirname(path)
+        # container_url = obj.container.get_cdn_url()
+        #
+        # # Delete the empty parent folders till the container's level
+        # while path != container_url:
+        #     try:
+        #         os.rmdir(path)
+        #     except OSError:
+        #         exp = sys.exc_info()[1]
+        #         if exp.errno == errno.ENOTEMPTY:
+        #             break
+        #         raise exp
+        #
+        #     path = os.path.dirname(path)
+
+        return True
+
+    def create_container(self, container_name):
+        """
+        Create a new container.
+
+        :type container_name: ``str``
+        :param container_name: Container name.
+
+        :return: :class:`Container` instance on success.
+        :rtype: :class:`Container`
+        """
+
+        self._check_container_name(container_name)
+
+        path = os.path.join(self.base_path, container_name)
+
+        try:
+            self._make_path(path, ignore_existing=False)
+        except OSError:
+            exp = sys.exc_info()[1]
+            if exp.errno == errno.EEXIST:
+                raise ValueError('Container %s already exists. The name must be '
+                                 'unique among all the containers in the system' % container_name)
+            else:
+                raise ValueError('Error creating container %s' % container_name)
+        except Exception:
+            raise ValueError('Error creating container %s' % container_name)
+
+        return self._make_container(container_name)
+
+    def delete_container(self, container):
+        """
+        Delete a container.
+
+        :type container: :class:`Container`
+        :param container: Container instance
+
+        :return: True on success, False otherwise.
+        :rtype: ``bool``
+        """
+
+        # Fail if there are any objects inside this container
+        for _ in self._get_objects(container):
+            raise ValueError('Container %s is not empty' % container.name)
+
+        path = self.get_container_cdn_url(container, check=True)
+
+        try:
+            shutil.rmtree(path)
+        except Exception:
+            return False
+
+        return True
+
+    def list_container_objects(self, container, **kwargs):
+        return list(self.iterate_container_objects(container))
+
+    @staticmethod
+    def _read_in_chunks(iterator, chunk_size=None, fill_size=False, yield_empty=False):
+        """
+        Return a generator which yields data in chunks.
+
+        :param iterator: An object which implements an iterator interface
+                         or a File like object with read method.
+        :type iterator: :class:`object` which implements iterator interface.
+
+        :param chunk_size: Optional chunk size (defaults to CHUNK_SIZE)
+        :type chunk_size: ``int``
+
+        :param fill_size: If True, make sure chunks are exactly chunk_size in
+                          length (except for last chunk).
+        :type fill_size: ``bool``
+
+        :param yield_empty: If True and the iterator yields no data, yield a single
+                            empty bytes object.
+        :type yield_empty: ``bool``
+
+        TODO: At some point in the future we could use byte arrays here if version
+        >= Python 3. This should speed things up a bit and reduce memory usage.
+ """ + chunk_size = chunk_size or _FileStorageDriver.CHUNK_SIZE + if six.PY3: + from io import FileIO as file + + if isinstance(iterator, (file)): + get_data = iterator.read + args = (chunk_size,) + else: + get_data = next + args = (iterator,) + + data = bytes('') + empty = False + + while not empty or len(data) > 0: + if not empty: + try: + chunk = bytes(get_data(*args)) + if len(chunk) > 0: + data += chunk + else: + empty = True + except StopIteration: + empty = True + + if len(data) == 0: + if empty and yield_empty: + yield bytes('') + + return + + if fill_size: + if empty or len(data) >= chunk_size: + yield data[:chunk_size] + data = data[chunk_size:] + else: + yield data + data = bytes('')