From 0a8cf706bd6815668a91e877c4525e34824a2aa0 Mon Sep 17 00:00:00 2001
From: allegroai <>
Date: Mon, 19 Aug 2019 21:15:31 +0300
Subject: [PATCH] Add initial Azure storage support

---
 docs/trains.conf                       |  12 +
 trains/backend_config/bucket_config.py |  64 +++
 trains/config/__init__.py              |   4 +-
 trains/config/default/sdk.conf         |  12 +
 trains/storage/helper.py               | 669 +++++++++++++++++--------
 5 files changed, 552 insertions(+), 209 deletions(-)

diff --git a/docs/trains.conf b/docs/trains.conf
index 5ac7d4fb..f17965e6 100644
--- a/docs/trains.conf
+++ b/docs/trains.conf
@@ -105,6 +105,15 @@ sdk {
         #     },
         # ]
     }
+    azure.storage {
+        # containers: [
+        #     {
+        #         account_name: "trains"
+        #         account_key: "secret"
+        #         # container_name:
+        #     }
+        # ]
+    }

     log {
         # debugging feature: set this to true to make null log propagate messages to root logger (so they appear in stdout)
@@ -136,6 +145,9 @@ sdk {
         # Status report period in seconds
         report_period_sec: 2

+        # Ping the server to verify connectivity
+        ping_period_sec: 30
+
         # Log all stdout & stderr
         log_stdout: True
     }
diff --git a/trains/backend_config/bucket_config.py b/trains/backend_config/bucket_config.py
index 14d1a7d5..24380a78 100644
--- a/trains/backend_config/bucket_config.py
+++ b/trains/backend_config/bucket_config.py
@@ -1,5 +1,6 @@
 import abc
 import warnings
+from copy import copy
 from operator import itemgetter

 import furl
@@ -289,3 +290,66 @@ class GSBucketConfigurations(BaseBucketConfigurations):
     def _get_prefix_from_bucket_config(self, config):
         prefix = furl.furl(scheme="gs", netloc=config.bucket, path=config.subdir)
         return str(prefix)
+
+
+@attrs
+class AzureContainerConfig(object):
+    account_name = attrib(type=str)
+    account_key = attrib(type=str)
+    container_name = attrib(type=str, default=None)
+
+
+class AzureContainerConfigurations(object):
+    def __init__(self, container_configs=None):
+        super(AzureContainerConfigurations, self).__init__()
+        self._container_configs = container_configs or []
+
+    @classmethod
+    def from_config(cls, configuration):
+        if configuration is None:
+            return cls()
+
+        containers = configuration.get("containers", list())
+        container_configs = [AzureContainerConfig(**entry) for entry in containers]
+
+        return cls(container_configs)
+
+    def get_config_by_uri(self, uri):
+        """
+        Get the credentials for an Azure Blob Storage container from the config
+        :param uri: URI of container or blob
+        :return: container config
+        :rtype: AzureContainerConfig
+        """
+        f = furl.furl(uri)
+        account_name = f.host.partition(".")[0]
+
+        if not f.path.segments:
+            raise ValueError(
+                "URI {} is missing a container name (expected "
+                "[https/azure]://<account-name>.../<container-name>)".format(
+                    uri
+                )
+            )
+
+        container = f.path.segments[0]
+
+        config = copy(self.get_config(account_name, container))
+
+        if config and not config.container_name:
+            config.container_name = container
+
+        return config
+
+    def get_config(self, account_name, container):
+        return next(
+            (
+                config
+                for config in self._container_configs
+                if config.account_name == account_name and (
+                    not config.container_name
+                    or config.container_name == container
+                )
+            ),
+            None
+        )
diff --git a/trains/config/__init__.py b/trains/config/__init__.py
index edba8a74..f11e0eb2 100644
--- a/trains/config/__init__.py
+++ b/trains/config/__init__.py
@@ -35,11 +35,11 @@ def get_config_for_bucket(base_url, extra_configurations=None):


 def get_remote_task_id():
-    return None
+    return _running_remotely_task_id


 def running_remotely():
-    return False
+    return bool(_running_remotely_task_id)


 def get_log_to_backend(default=None):
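The new AzureContainerConfigurations class above resolves credentials by account name (taken from the URL host) and container name (the first path segment). A minimal standalone sketch of that lookup - the account/key values are placeholders, and a plain dict stands in for the parsed azure.storage config section:

    from trains.backend_config.bucket_config import AzureContainerConfigurations

    # hypothetical credentials, mirroring the azure.storage section above
    azure_configs = AzureContainerConfigurations.from_config({
        "containers": [
            {"account_name": "trains", "account_key": "secret"},
        ]
    })

    # host -> account name, first path segment -> container name
    cfg = azure_configs.get_config_by_uri(
        "azure://trains.blob.core.windows.net/artifacts/models/model.pkl"
    )
    assert cfg.account_name == "trains"
    assert cfg.container_name == "artifacts"  # filled in from the URI

Because an entry may omit container_name, one account credential can match any container in that account; get_config_by_uri returns a copy with the container name filled in, so the shared entry is never mutated.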
diff --git a/trains/config/default/sdk.conf b/trains/config/default/sdk.conf
index cedb023e..8fd198c6 100644
--- a/trains/config/default/sdk.conf
+++ b/trains/config/default/sdk.conf
@@ -88,6 +88,15 @@
         #     },
         # ]
     }
+    azure.storage {
+        # containers: [
+        #     {
+        #         account_name: "trains"
+        #         account_key: "secret"
+        #         # container_name:
+        #     }
+        # ]
+    }

     log {
         # debugging feature: set this to true to make null log propagate messages to root logger (so they appear in stdout)
@@ -119,6 +128,9 @@
         # Status report period in seconds
         report_period_sec: 2

+        # Ping the server to verify connectivity
+        ping_period_sec: 30
+
         # Log all stdout & stderr
         log_stdout: True
     }
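In the helper.py changes that follow, every cloud-SDK import (boto3, google-cloud-storage, azure.storage) moves from module level into the driver classes, so trains itself only requires the SDK for schemes that are actually used. A condensed sketch of the guard idiom used below - UsageError here stands in for trains.errors.UsageError:

    class UsageError(Exception):
        # stand-in for trains.errors.UsageError
        pass

    def _require_azure_sdk():
        # import lazily, so the dependency is only needed once an azure:// URL is used
        try:
            from azure.storage.blob import BlockBlobService  # noqa: F401
        except ImportError:
            raise UsageError(
                'Azure blob storage driver not found. '
                'Please install driver using "pip install \'azure.storage.blob>=2.0.1\'"'
            )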
diff --git a/trains/storage/helper.py b/trains/storage/helper.py
index 6c745ba2..47d431fb 100644
--- a/trains/storage/helper.py
+++ b/trains/storage/helper.py
@@ -1,8 +1,8 @@
 import getpass
+import io
 import json
 import os
 import threading
-from _socket import gethostname
 from concurrent.futures import ThreadPoolExecutor
 from copy import copy
 from datetime import datetime
@@ -10,15 +10,11 @@ from multiprocessing.pool import ThreadPool
 from time import time
 from types import GeneratorType

-import boto3
-import botocore.client
 import numpy as np
 import requests
 import six

-from ..backend_api.utils import get_http_session_with_retry
-from ..backend_config.bucket_config import S3BucketConfigurations, GSBucketConfigurations
+from _socket import gethostname
 from attr import attrs, attrib, asdict
-from botocore.exceptions import ClientError
 from furl import furl
 from libcloud.common.types import ProviderError, LibcloudError
 from libcloud.storage.providers import get_driver
@@ -29,6 +25,8 @@ from six.moves.queue import Queue, Empty
 from six.moves.urllib.parse import urlparse
 from six.moves.urllib.request import url2pathname

+from ..backend_api.utils import get_http_session_with_retry
+from ..backend_config.bucket_config import S3BucketConfigurations, GSBucketConfigurations, AzureContainerConfigurations
 from ..config import config
 from ..debugging import get_logger
 from ..errors import UsageError
@@ -160,11 +158,10 @@ class StorageHelper(object):
     # collect all bucket credentials that aren't empty (ignore entries with an empty key or secret)
     _s3_configurations = S3BucketConfigurations.from_config(config.get('aws.s3', {}))
     _gs_configurations = GSBucketConfigurations.from_config(config.get('google.storage', {}))
+    _azure_configurations = AzureContainerConfigurations.from_config(config.get('azure.storage', {}))

     _path_substitutions = _PathSubstitutionRule.load_list_from_config()

-    _bucket_location_failure_reported = set()
-
     @property
     def log(self):
         return self._log
@@ -219,107 +216,119 @@ class StorageHelper(object):
         parsed = urlparse(url)
         self._scheme = parsed.scheme

-        if self._scheme == 'libcloud-s3':
-            self._conf = copy(self._s3_configurations.get_config_by_uri(url))
-            self._secure = self._conf.secure
-
-            final_region = region if region else self._conf.region
-            if not final_region:
-                final_region = None
-
-            self._conf.update(
-                key=key or self._conf.key,
-                secret=secret or self._conf.secret,
-                multipart=self._conf.multipart,
-                region=final_region,
-            )
-
-            if not self._conf.key or not self._conf.secret:
-                raise ValueError('Missing key and secret for S3 storage access (%s)' % base_url)
-
-            def init_driver_and_container(host, port, bucket):
-                s3_region_to_libcloud_driver = {
-                    None: Provider.S3,
-                    "": Provider.S3,
-                    'us-east-1': Provider.S3,
-                    'ap-northeast': Provider.S3_AP_NORTHEAST,
-                    'ap-northeast-1': Provider.S3_AP_NORTHEAST1,
-                    'ap-northeast-2': Provider.S3_AP_NORTHEAST2,
-                    'ap-south': Provider.S3_AP_SOUTH,
-                    'ap-south-1': Provider.S3_AP_SOUTH,
-                    'ap-southeast': Provider.S3_AP_SOUTHEAST,
-                    'ap-southeast-1': Provider.S3_AP_SOUTHEAST,
-                    'ap-southeast-2': Provider.S3_AP_SOUTHEAST2,
-                    'ca-central': Provider.S3_CA_CENTRAL,
-                    'cn-north': Provider.S3_CN_NORTH,
-                    'eu-west': Provider.S3_EU_WEST,
-                    'eu-west-1': Provider.S3_EU_WEST,
-                    'eu-west-2': Provider.S3_EU_WEST2,
-                    'eu-central': Provider.S3_EU_CENTRAL,
-                    'eu-central-1': Provider.S3_EU_CENTRAL,
-                    'sa-east': Provider.S3_SA_EAST,
-                    'sa-east-1': Provider.S3_SA_EAST,
-                    'us-east-2': Provider.S3_US_EAST2,
-                    'us-west': Provider.S3_US_WEST,
-                    'us-west-1': Provider.S3_US_WEST,
-                    'us-west-2': Provider.S3_US_WEST_OREGON,
-                    'us-west-oregon': Provider.S3_US_WEST_OREGON,
-                    'us-gov-west': Provider.S3_US_GOV_WEST,
-                    'rgw': Provider.S3_RGW,
-                    'rgw_outscale': Provider.S3_RGW_OUTSCALE,
-                }
-
-                driver_name = s3_region_to_libcloud_driver.get(
-                    self._conf.region or self._get_bucket_region(self._conf, self._log)
-                )
-
-                if not driver_name:
-                    self._log.error("Invalid S3 region `%s`: no driver found" % self._conf.region)
-                    raise ValueError("Invalid s3 region")
-
-                host = host or None
-                port = port or None
-
-                try:
-                    driver = get_driver(driver_name)(
-                        self._conf.key,
-                        self._conf.secret,
-                        host=host,
-                        port=port,
-                        secure=self._secure,
-                        region=self._conf.region,
-                    )
-
-                    driver.supports_s3_multipart_upload = self._conf.multipart
-                    container = driver.get_container(container_name=bucket)
-
-                except LibcloudError:
-                    attempted_uri = str(furl(host=host, port=port, path=bucket)).strip('/')
-                    self._log.error(
-                        'Could not create S3 driver for {} in region {}'.format(attempted_uri, self._conf.region))
-                    raise
-
-                return driver, container
-
-            parts = Path(parsed.path.strip('/')).parts
-            first_part = parts[0] if parts else ""
-            if not self._conf.host and not self._conf.bucket:
-                # configuration has no indication of host or bucket, we'll just go with what we have
-                try:
-                    self._driver, self._container = init_driver_and_container(parsed.netloc, None, first_part)
-                except Exception as e:
-                    self._driver, self._container = init_driver_and_container(None, None, parsed.netloc)
-            else:
-                # configuration provides at least one of host/bucket
-                host, _, port = (self._conf.host or '').partition(':')
-                port = int(port) if port else None
-                bucket = self._conf.bucket or first_part
-                self._driver, self._container = init_driver_and_container(host, port, bucket)
-
-            if self._conf.acl:
-                self._extra['acl'] = self._conf.acl
-
-        elif self._scheme == 's3':
+        # 'libcloud-s3' scheme support dropped - the libcloud S3 driver was
+        # superseded by the boto3-based _Boto3Driver below
+        if self._scheme == _AzureBlobServiceStorageDriver.scheme:
+            self._conf = copy(self._azure_configurations.get_config_by_uri(url))
+            if self._conf is None:
+                raise StorageError("Missing Azure Blob Storage configuration for {}".format(url))
+
+            if not self._conf.account_name or not self._conf.account_key:
+                raise StorageError(
+                    "Missing account name or key for Azure Blob Storage access for {}".format(base_url)
+                )
+
+            self._driver = _AzureBlobServiceStorageDriver()
+            self._container = self._driver.get_container(config=self._conf)
+
+        elif self._scheme == _Boto3Driver.scheme:
             self._conf = copy(self._s3_configurations.get_config_by_uri(url))
             self._secure = self._conf.secure
@@ -406,7 +415,7 @@ class StorageHelper(object):
         if not use_existing:
             # Test bucket config, fails if unsuccessful
             if _test_config:
-                cls._test_bucket_config(bucket_config, log)
+                _Boto3Driver._test_bucket_config(bucket_config, log)

         if existing:
             if log:
@@ -420,7 +429,7 @@ class StorageHelper(object):
             if log:
                 log.info('Using existing credentials for bucket %s/%s' %
                          (bucket_config.host or 'AWS', bucket_config.bucket))
-            good_config = cls._test_bucket_config(existing, log, raise_on_error=False)
+            good_config = _Boto3Driver._test_bucket_config(existing, log, raise_on_error=False)

             if not good_config:
                 # Try to use global key/secret
@@ -430,7 +439,7 @@ class StorageHelper(object):
                     log.info('Using global credentials for bucket %s/%s' %
                              (bucket_config.host or 'AWS', bucket_config.bucket))
                 if _test_config:
-                    cls._test_bucket_config(bucket_config, log)
+                    _Boto3Driver._test_bucket_config(bucket_config, log)
             else:
                 # do not add anything, existing config is OK
                 return
@@ -509,8 +518,8 @@ class StorageHelper(object):

         test_path = self._normalize_object_name(folder_uri)

-        if self._scheme == 's3':
-            self._test_bucket_config(
+        if self._scheme == _Boto3Driver.scheme:
+            _Boto3Driver._test_bucket_config(
                 self._conf,
                 self._log,
                 test_path=test_path,
@@ -728,6 +737,20 @@ class StorageHelper(object):
     def delete(self, path):
         return self._driver.delete_object(self._get_object(path))

+    def check_write_permissions(self, dest_path=None):
+        # create a temporary file, then delete it
+        base_url = dest_path or self._base_url
+        dest_path = base_url + '/.trains.test'
+        # do not check http/s connection permissions
+        if dest_path.startswith('http'):
+            return True
+        try:
+            self.upload_from_stream(stream=iter(b'trains'), dest_path=dest_path)
+            self.delete(path=dest_path)
+        except Exception:
+            raise ValueError('Insufficient permissions for {}'.format(base_url))
+        return True
+
     @classmethod
     def _canonize_url(cls, url):
         return cls._apply_url_substitutions(url)
@@ -755,95 +778,10 @@ class StorageHelper(object):

         return url

-    @classmethod
-    def _get_bucket_region(cls, conf, log=None, report_info=False):
-        if not conf.bucket:
-            return None
-
-        def report(msg):
-            if log and conf.get_bucket_host() not in cls._bucket_location_failure_reported:
-                if report_info:
-                    log.debug(msg)
-                else:
-                    log.warning(msg)
-                cls._bucket_location_failure_reported.add(conf.get_bucket_host())
-
-        try:
-            boto_session = boto3.Session(conf.key, conf.secret)
-            boto_resource = boto_session.resource('s3')
-            return boto_resource.meta.client.get_bucket_location(Bucket=conf.bucket)["LocationConstraint"]
-
-        except ClientError as ex:
-            report("Failed getting bucket location (region) for bucket "
-                   "%s: %s (%s, access_key=%s). Default region will be used. "
-                   "This is normal if you do not have GET_BUCKET_LOCATION permission"
-                   % (conf.bucket, ex.response['Error']['Message'], ex.response['Error']['Code'], conf.key))
-        except Exception as ex:
-            report("Failed getting bucket location (region) for bucket %s: %s. Default region will be used."
-                   % (conf.bucket, str(ex)))
-
-        return None
-
-    @classmethod
-    def _test_bucket_config(cls, conf, log, test_path='', raise_on_error=True, log_on_error=True):
-        if not conf.bucket:
-            return False
-        try:
-            if not conf.is_valid():
-                raise Exception('Missing credentials')
-
-            fullname = furl(conf.bucket).add(path=test_path).add(path='%s-upload_test' % cls.__module__)
-            bucket_name = str(fullname.path.segments[0])
-            filename = str(furl(path=fullname.path.segments[1:]))
-
-            data = {
-                'user': getpass.getuser(),
-                'machine': gethostname(),
-                'time': datetime.utcnow().isoformat()
-            }
-
-            boto_session = boto3.Session(conf.key, conf.secret)
-            boto_resource = boto_session.resource('s3', conf.region)
-            bucket = boto_resource.Bucket(bucket_name)
-            bucket.put_object(Key=filename, Body=six.b(json.dumps(data)))
-
-            region = cls._get_bucket_region(conf=conf, log=log, report_info=True)
-
-            if region and ((conf.region and region != conf.region) or (not conf.region and region != 'us-east-1')):
-                msg = "incorrect region specified for bucket %s (detected region %s)" % (conf.bucket, region)
-            else:
-                return True
-
-        except ClientError as ex:
-            msg = ex.response['Error']['Message']
-            if log_on_error and log:
-                log.error(msg)
-
-            if raise_on_error:
-                raise
-
-        except Exception as ex:
-            msg = str(ex)
-            if log_on_error and log:
-                log.error(msg)
-
-            if raise_on_error:
-                raise
-
-        msg = ("Failed testing access to bucket %s: " % conf.bucket) + msg
-
-        if log_on_error and log:
-            log.error(msg)
-
-        if raise_on_error:
-            raise StorageError(msg)
-
-        return False
-
     @classmethod
     def _resolve_base_url(cls, base_url):
         parsed = urlparse(base_url)
-        if parsed.scheme == 's3':
+        if parsed.scheme == _Boto3Driver.scheme:
             conf = cls._s3_configurations.get_config_by_uri(base_url)
             bucket = conf.bucket
             if not bucket:
@@ -851,6 +789,11 @@ class StorageHelper(object):
                 if parts:
                     bucket = parts[0]
             return '/'.join(x for x in ('s3:/', conf.host, bucket) if x)
+        elif parsed.scheme == _AzureBlobServiceStorageDriver.scheme:
+            conf = cls._azure_configurations.get_config_by_uri(base_url)
+            if not conf:
+                raise StorageError("Can't find Azure configuration for {}".format(base_url))
+            return str(furl(base_url).set(path=conf.container_name))
         elif parsed.scheme == _GoogleCloudStorageDriver.scheme:
             conf = cls._gs_configurations.get_config_by_uri(base_url)
             return str(furl(scheme=parsed.scheme, netloc=conf.bucket))
@@ -896,7 +839,8 @@ class StorageHelper(object):
             path = path[len(self.base_url):]
             if path.startswith('/') and os.name == 'nt':
                 path = path[1:]
-        if self.scheme in ('s3', _GoogleCloudStorageDriver.scheme):
+        if self.scheme in (_Boto3Driver.scheme, _GoogleCloudStorageDriver.scheme,
+                           _AzureBlobServiceStorageDriver.scheme):
             path = path.lstrip('/')
         return path
@@ -940,7 +884,7 @@ class StorageHelper(object):
                 try:
                     cb(False)
                 except Exception as e:
-                    self._log.warn("Exception on upload callback: %s" % str(e))
+                    self._log.warning("Exception on upload callback: %s" % str(e))
                 raise
         if verbose:
             self._log.debug("Finished upload: %s => %s" % (src_path, object_name))
@@ -948,7 +892,7 @@ class StorageHelper(object):
             try:
                 cb(dest_path)
             except Exception as e:
-                self._log.warn("Exception on upload callback: %s" % str(e))
+                self._log.warning("Exception on upload callback: %s" % str(e))

         return dest_path
@@ -1013,12 +957,19 @@ class _HttpDriver(object):
         if not overwrite_existing and p.is_file():
             log.warn('failed saving after download: overwrite=False and file exists (%s)' % str(p))
             return
-        length = p.write_bytes(obj.content)
-        if callback:
-            try:
-                callback(length)
-            except Exception as e:
-                log.warn('Failed reporting downloaded file size for {}: {}'.format(p, e))
+        length = 0
+        with p.open(mode='wb') as f:
+            for chunk in obj.iter_content(chunk_size=5 * 1024 * 1024):
+                # filter out keep-alive new chunks
+                if not chunk:
+                    continue
+                chunk_size = len(chunk)
+                f.write(chunk)
+                length += chunk_size
+                if callback:
+                    callback(chunk_size)
+
+        return length


 class _Stream(object):
@@ -1126,10 +1077,25 @@ class _Boto3Driver(object):

     _containers = {}

+    scheme = 's3'
+    scheme_prefix = str(furl(scheme=scheme, netloc=''))
+
+    _bucket_location_failure_reported = set()
+
     class _Container(object):
         _creation_lock = threading.Lock()

         def __init__(self, name, cfg):
+            try:
+                import boto3
+                import botocore.client
+                from botocore.exceptions import ClientError
+            except ImportError:
+                raise UsageError(
+                    'AWS S3 storage driver (boto3) not found. '
+                    'Please install driver using "pip install \'boto3>=1.9\'"'
+                )
+
             # skip 's3://'
             self.name = name[5:]
             endpoint = (('https://' if cfg.secure else 'http://') + cfg.host) if cfg.host else None
@@ -1172,6 +1138,7 @@ class _Boto3Driver(object):
         return self._containers[container_name]

     def upload_object_via_stream(self, iterator, container, object_name, extra=None, **kwargs):
+        import boto3
         stream = _Stream(iterator)
         try:
             container.bucket.upload_fileobj(stream, object_name, Config=boto3.s3.transfer.TransferConfig(
@@ -1184,6 +1151,7 @@ class _Boto3Driver(object):
         return True

     def upload_object(self, file_path, container, object_name, extra=None, **kwargs):
+        import boto3
         try:
             container.bucket.upload_file(file_path, object_name, Config=boto3.s3.transfer.TransferConfig(
                 use_threads=container.config.multipart,
@@ -1220,6 +1188,7 @@ class _Boto3Driver(object):
                 log.error('Failed downloading: %s' % ex)
             a_stream.close()

+        import boto3
         # return iterable object
         stream = _Stream()
         container = self._containers[obj.container_name]
@@ -1232,6 +1201,7 @@ class _Boto3Driver(object):
         return stream

     def download_object(self, obj, local_path, overwrite_existing=True, delete_on_failure=True, callback=None):
+        import boto3
         p = Path(local_path)
         if not overwrite_existing and p.is_file():
             log.warn('failed saving after download: overwrite=False and file exists (%s)' % str(p))
@@ -1244,6 +1214,100 @@ class _Boto3Driver(object):
                 max_concurrency=self._max_multipart_concurrency if container.config.multipart else 1,
                 num_download_attempts=container.config.retries))

+    @classmethod
+    def _test_bucket_config(cls, conf, log, test_path='', raise_on_error=True, log_on_error=True):
+        try:
+            import boto3
+            from botocore.exceptions import ClientError
+        except ImportError:
+            return False
+
+        if not conf.bucket:
+            return False
+        try:
+            if not conf.is_valid():
+                raise Exception('Missing credentials')
+
+            fullname = furl(conf.bucket).add(path=test_path).add(path='%s-upload_test' % cls.__module__)
+            bucket_name = str(fullname.path.segments[0])
+            filename = str(furl(path=fullname.path.segments[1:]))
+
+            data = {
+                'user': getpass.getuser(),
+                'machine': gethostname(),
+                'time': datetime.utcnow().isoformat()
+            }
+
+            boto_session = boto3.Session(conf.key, conf.secret)
+            boto_resource = boto_session.resource('s3', conf.region)
+            bucket = boto_resource.Bucket(bucket_name)
+            bucket.put_object(Key=filename, Body=six.b(json.dumps(data)))
+
+            region = cls._get_bucket_region(conf=conf, log=log, report_info=True)
+
+            if region and ((conf.region and region != conf.region) or (not conf.region and region != 'us-east-1')):
+                msg = "incorrect region specified for bucket %s (detected region %s)" %
(conf.bucket, region)
+            else:
+                return True
+
+        except ClientError as ex:
+            msg = ex.response['Error']['Message']
+            if log_on_error and log:
+                log.error(msg)
+
+            if raise_on_error:
+                raise
+
+        except Exception as ex:
+            msg = str(ex)
+            if log_on_error and log:
+                log.error(msg)
+
+            if raise_on_error:
+                raise
+
+        msg = ("Failed testing access to bucket %s: " % conf.bucket) + msg
+
+        if log_on_error and log:
+            log.error(msg)
+
+        if raise_on_error:
+            raise StorageError(msg)
+
+        return False
+
+    @classmethod
+    def _get_bucket_region(cls, conf, log=None, report_info=False):
+        import boto3
+        from botocore.exceptions import ClientError
+
+        if not conf.bucket:
+            return None
+
+        def report(msg):
+            if log and conf.get_bucket_host() not in cls._bucket_location_failure_reported:
+                if report_info:
+                    log.debug(msg)
+                else:
+                    log.warning(msg)
+                cls._bucket_location_failure_reported.add(conf.get_bucket_host())
+
+        try:
+            boto_session = boto3.Session(conf.key, conf.secret)
+            boto_resource = boto_session.resource('s3')
+            return boto_resource.meta.client.get_bucket_location(Bucket=conf.bucket)["LocationConstraint"]
+
+        except ClientError as ex:
+            report("Failed getting bucket location (region) for bucket "
+                   "%s: %s (%s, access_key=%s). Default region will be used. "
+                   "This is normal if you do not have GET_BUCKET_LOCATION permission"
+                   % (conf.bucket, ex.response['Error']['Message'], ex.response['Error']['Code'], conf.key))
+        except Exception as ex:
+            report("Failed getting bucket location (region) for bucket %s: %s. Default region will be used."
+                   % (conf.bucket, str(ex)))
+
+        return None
+

 class _GoogleCloudStorageDriver(object):
     """Storage driver for google cloud storage"""
@@ -1263,8 +1327,8 @@ class _GoogleCloudStorageDriver(object):
             from google.oauth2 import service_account
         except ImportError:
             raise UsageError(
-                'Google cloud driver not found.'
-                'Please install driver using "pip install google-cloud-storage"'
+                'Google cloud driver not found. '
+                'Please install driver using "pip install \'google-cloud-storage>=1.13.2\'"'
             )

         self.name = name[len(_GoogleCloudStorageDriver.scheme_prefix):]
@@ -1361,3 +1425,194 @@ class _GoogleCloudStorageDriver(object):
         permissions_to_test = ('storage.objects.get', 'storage.objects.update')
         return set(test_obj.test_iam_permissions(permissions_to_test)) == set(permissions_to_test)
+
+
+class _AzureBlobServiceStorageDriver(object):
+    scheme = 'azure'
+
+    _containers = {}
+
+    class _Container(object):
+        def __init__(self, name, config):
+            try:
+                from azure.common import AzureHttpError
+                from azure.storage.blob import BlockBlobService
+            except ImportError:
+                raise UsageError(
+                    'Azure blob storage driver not found.
'
+                    'Please install driver using "pip install \'azure.storage.blob>=2.0.1\'"'
+                )
+
+            self.name = name
+            self.config = config
+            self.blob_service = BlockBlobService(
+                account_name=config.account_name,
+                account_key=config.account_key,
+            )
+
+    @attrs
+    class _Object(object):
+        container = attrib()
+        blob_name = attrib()
+        content_length = attrib()
+
+    def get_container(self, config, *_, **kwargs):
+        container_name = config.container_name
+        if container_name not in self._containers:
+            self._containers[container_name] = self._Container(name=container_name, config=config)
+        # self._containers[container_name].config.retries = kwargs.get('retries', 5)
+        return self._containers[container_name]
+
+    def upload_object_via_stream(self, iterator, container, object_name, extra=None, **kwargs):
+        from azure.common import AzureHttpError
+
+        blob_name = self._blob_name_from_object_path(object_name, container.name)
+        try:
+            container.blob_service.MAX_SINGLE_PUT_SIZE = 16 * 1024 * 1024
+            container.blob_service.socket_timeout = (300, 2000)
+            container.blob_service.create_blob_from_bytes(
+                container.name,
+                blob_name,
+                bytes(iterator),
+                # timeout=300,
+                max_connections=2,
+            )
+            return True
+        except AzureHttpError as ex:
+            log.error('Failed uploading (Azure error): %s' % ex)
+        except Exception as ex:
+            log.error('Failed uploading: %s' % ex)
+        return False
+
+    def upload_object(self, file_path, container, object_name, extra=None, **kwargs):
+        from azure.common import AzureHttpError
+
+        blob_name = self._blob_name_from_object_path(object_name, container.name)
+        stream = None
+        try:
+            from azure.storage.blob import ContentSettings
+            from mimetypes import guess_type
+            container.blob_service.MAX_SINGLE_PUT_SIZE = 16 * 1024 * 1024
+            container.blob_service.socket_timeout = (300, 2000)
+            container.blob_service.create_blob_from_path(
+                container.name,
+                blob_name,
+                file_path,
+                # timeout=300,
+                max_connections=2,
+                content_settings=ContentSettings(content_type=guess_type(file_path)[0])
+            )
+            return True
+        except AzureHttpError as ex:
+            log.error('Failed uploading (Azure error): %s' % ex)
+        except Exception as ex:
+            log.error('Failed uploading: %s' % ex)
+        finally:
+            if stream:
+                stream.close()
+
+    def list_container_objects(self, container, ex_prefix=None, **kwargs):
+        return list(container.blob_service.list_blobs(container_name=container.name, prefix=ex_prefix))
+
+    def delete_object(self, obj, **kwargs):
+        container = obj.container
+        container.blob_service.delete_blob(
+            container.name,
+            obj.blob_name,
+        )
+
+    def get_object(self, container_name, object_name, *args, **kwargs):
+        container = self._containers.get(container_name)
+        if not container:
+            raise StorageError("Container `{}` not found for object {}".format(container_name, object_name))
+
+        # blob_name = self._blob_name_from_object_path(object_name, container_name)
+        blob = container.blob_service.get_blob_properties(container.name, object_name)
+
+        return self._Object(container=container, blob_name=blob.name, content_length=blob.properties.content_length)
+
+    def download_object_as_stream(self, obj, *_, **__):
+        container = obj.container
+        blob = container.blob_service.get_blob_to_bytes(
+            container.name,
+            obj.blob_name,
+        )
+        return blob.content
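The download_object method below has to bridge two callback conventions: the Azure SDK reports cumulative progress as (current, total), while the helper's callbacks expect per-chunk byte counts. The same adaptation in isolation, as a rough standalone sketch:

    def make_delta_callback(per_chunk_cb):
        # Azure reports cumulative bytes; convert to per-chunk deltas
        state = {'seen': 0}

        def progress(current, total):
            delta = current - state['seen']
            state['seen'] = current
            if delta and per_chunk_cb:
                per_chunk_cb(delta)

        return progress

In the driver, this bookkeeping lives on the threading.Event itself (download_done.counter), which also lets the progress callback signal completion once current reaches total.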
+
+    def download_object(self, obj, local_path, overwrite_existing=True, delete_on_failure=True, callback=None):
+        p = Path(local_path)
+        if not overwrite_existing and p.is_file():
+            log.warn('failed saving after download: overwrite=False and file exists (%s)' % str(p))
+            return
+
+        download_done = threading.Event()
+        download_done.counter = 0
+
+        def callback_func(current, total):
+            if callback:
+                chunk = current - download_done.counter
+                download_done.counter += chunk
+                callback(chunk)
+            if current >= total:
+                download_done.set()
+
+        container = obj.container
+        container.blob_service.MAX_SINGLE_GET_SIZE = 5 * 1024 * 1024
+        _ = container.blob_service.get_blob_to_path(
+            container.name,
+            obj.blob_name,
+            local_path,
+            max_connections=10,
+            progress_callback=callback_func,
+        )
+        download_done.wait()
+
+    def test_upload(self, test_path, config):
+        container = self.get_container(config=config)
+        try:
+            container.blob_service.get_container_properties(container.name)
+        except Exception:
+            return False
+        else:
+            # using the account key, we can always upload...
+            return True
+
+    @classmethod
+    def _blob_name_from_object_path(cls, name, container_name):
+        scheme = urlparse(name).scheme
+        if scheme:
+            if scheme != cls.scheme:
+                raise StorageError(
+                    "When using a URL, only the `{}` scheme is supported for Azure storage: {}".format(
+                        cls.scheme, name
+                    )
+                )
+
+            f = furl(name)
+
+            if not f.path.segments:
+                raise StorageError("Missing container name in URL {}".format(name))
+
+            parsed_container_name = f.path.segments[0]
+
+            if parsed_container_name != container_name:
+                raise StorageError(
+                    "Container name mismatch (expected {}, found {}) in {}".format(
+                        container_name, parsed_container_name, name
+                    )
+                )
+
+            if len(f.path.segments) == 1:
+                raise StorageError(
+                    "No path found following container name {} in {}".format(container_name, name)
+                )
+
+            return "/".join(f.path.segments[1:])
+
+        return name
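Taken together, an azure:// URL should now flow through StorageHelper like s3:// or gs:// ones do. An illustrative, untested sketch - the account and container names are placeholders, and get/upload/download_to_file are the helper's pre-existing entry points, not part of this patch:

    from trains.storage.helper import StorageHelper

    # credentials for account "trains" come from sdk.azure.storage.containers
    helper = StorageHelper.get("azure://trains.blob.core.windows.net/artifacts")
    helper.check_write_permissions()  # uploads, then deletes, a .trains.test blob

    remote = helper.upload("./model.pkl")
    helper.download_to_file(remote, "./model_copy.pkl")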