Support Azure and GCP storage in Task.setup_upload()

This commit is contained in:
allegroai 2022-09-02 23:36:32 +03:00
parent 495741df0a
commit a663805eb7
3 changed files with 217 additions and 39 deletions

View File

@ -261,6 +261,9 @@ class GSBucketConfig(object):
else:
setattr(self, item, kwargs[item])
def is_valid(self):
    """A GS bucket config is usable only when a bucket name is set.

    Returns the bucket value itself, so the result is truthy/falsy
    rather than a strict bool.
    """
    bucket = self.bucket
    return bucket
class GSBucketConfigurations(BaseBucketConfigurations):
def __init__(
@ -315,10 +318,12 @@ class GSBucketConfigurations(BaseBucketConfigurations):
pool_maxsize=bucket_config.pool_maxsize or self._default_pool_maxsize
)
def get_config_by_uri(self, uri):
def get_config_by_uri(self, uri, create_if_not_found=True):
"""
Get the credentials for a Google Storage bucket from the config
:param uri: URI of bucket, directory or file
:param create_if_not_found: If True and the config is not found in the current configurations, create a new one.
Else, don't create a new one and return None
:return: GSBucketConfig: bucket config
"""
@ -331,7 +336,8 @@ class GSBucketConfigurations(BaseBucketConfigurations):
try:
return next(res)
except StopIteration:
pass
if not create_if_not_found:
return None
parsed = furl.furl(uri)
@ -355,11 +361,23 @@ class AzureContainerConfig(object):
account_key = attrib(type=str)
container_name = attrib(type=str, default=None)
def update(self, **kwargs):
    """Overwrite existing attributes from keyword arguments.

    Unknown attribute names are never set on the instance; a warning is
    emitted instead so callers notice misspelled or unsupported options.
    """
    for name, value in kwargs.items():
        if hasattr(self, name):
            setattr(self, name, value)
        else:
            warnings.warn("Unexpected argument {} for update. Ignored".format(name))
def is_valid(self):
    """Valid only when both the account name and container name are set.

    Mirrors ``a and b`` semantics: returns the first falsy value,
    otherwise the container name.
    """
    if not self.account_name:
        return self.account_name
    return self.container_name
class AzureContainerConfigurations(object):
def __init__(self, container_configs=None):
def __init__(self, container_configs=None, default_account=None, default_key=None):
    """Collection of Azure container configs plus account-level default credentials.

    :param container_configs: Initial list of AzureContainerConfig entries
    :param default_account: Fallback account name applied when a config lacks one
    :param default_key: Fallback account key applied when a config lacks one
    """
    super(AzureContainerConfigurations, self).__init__()
    self._default_account = default_account
    self._default_key = default_key
    # A falsy argument (None or empty) yields a fresh list
    self._container_configs = container_configs or []
@classmethod
def from_config(cls, configuration):
@ -373,12 +391,12 @@ class AzureContainerConfigurations(object):
))
if configuration is None:
return cls(default_container_configs)
return cls(default_container_configs, default_account=default_account, default_key=default_key)
containers = configuration.get("containers", list())
container_configs = [AzureContainerConfig(**entry) for entry in containers] + default_container_configs
return cls(container_configs)
return cls(container_configs, default_account=default_account, default_key=default_key)
def get_config_by_uri(self, uri):
"""
@ -418,3 +436,12 @@ class AzureContainerConfigurations(object):
),
None
)
def update_config_with_defaults(self, bucket_config):
    """Fill any missing credential fields on *bucket_config* from the defaults.

    Fields already set on the config are preserved; only falsy ones are
    replaced by the instance-level default account/key.
    """
    account = bucket_config.account_name or self._default_account
    key = bucket_config.account_key or self._default_key
    bucket_config.update(account_name=account, account_key=key)
def add_config(self, bucket_config):
    """Register an additional container configuration."""
    # In-place extend of the existing list (same list object)
    self._container_configs += [bucket_config]

View File

@ -1,6 +1,7 @@
from abc import abstractproperty
from typing import Optional
from ..backend_config.bucket_config import S3BucketConfig
from ..backend_config.bucket_config import S3BucketConfig, AzureContainerConfig, GSBucketConfig
from ..storage.helper import StorageHelper
@ -9,39 +10,113 @@ class SetupUploadMixin(object):
storage_uri = abstractproperty()
def setup_upload(
self, bucket_name, host=None, access_key=None, secret_key=None, region=None, multipart=True, https=True, verify=True):
self,
bucket_name, # type: str
host=None, # type: Optional[str]
access_key=None, # type: Optional[str]
secret_key=None, # type: Optional[str]
multipart=True, # type: bool
https=True, # type: bool
region=None, # type: Optional[str]
verify=True, # type: bool
):
"""
Setup upload options (currently only S3 is supported)
(Deprecated) Setup upload options. Only S3 is supported.
Please note that this function is deprecated. Use `setup_aws_upload`, `setup_gcp_upload` or
`setup_azure_upload` to setup the upload options for the corresponding cloud.
:param bucket_name: AWS bucket name
:type bucket_name: str
:param host: Hostname. Only required in case a Non-AWS S3 solution (such as a local Minio server) is used
:type host: str
:param access_key: AWS access key. If not provided, we'll attempt to obtain the key from the
configuration file (bucket-specific, then global)
:type access_key: str
:param secret_key: AWS secret key. If not provided, we'll attempt to obtain the secret from the
configuration file (bucket-specific, then global)
:type secret_key: str
:param multipart: Server supports multipart. Only required when using a Non-AWS S3 solution that doesn't support
multipart.
:type multipart: bool
:param https: Server supports HTTPS. Only required when using a Non-AWS S3 solution that only supports HTTPS.
:type https: bool
:param region: Bucket region. Required if the bucket doesn't reside in the default region (us-east-1)
:type region: str
:param verify: Whether or not to verify SSL certificates. Only required when using a Non-AWS S3 solution that only supports HTTPS with self-signed certificate.
:type verify: bool
:param verify: Whether or not to verify SSL certificates.
Only required when using a Non-AWS S3 solution that only supports HTTPS with self-signed certificate.
"""
self._bucket_config = S3BucketConfig(
bucket=bucket_name,
self.setup_aws_upload(
bucket_name,
host=host,
key=access_key,
secret=secret_key,
region=region,
multipart=multipart,
secure=https,
region=region,
verify=verify
verify=verify,
)
self.storage_uri = ('s3://%(host)s/%(bucket_name)s' if host else 's3://%(bucket_name)s') % locals()
StorageHelper.add_configuration(self._bucket_config, log=self.log)
def setup_aws_upload(
    self, bucket, host=None, key=None, secret=None, region=None, multipart=True, secure=True, verify=True
):
    # type: (str, Optional[str], Optional[str], Optional[str], Optional[str], bool, bool, bool) -> None
    """
    Setup S3 upload options.

    :param bucket: AWS bucket name
    :param host: Hostname. Only required in case a Non-AWS S3 solution (such as a local Minio server) is used
    :param key: AWS access key. If not provided, we'll attempt to obtain the key from the
        configuration file (bucket-specific, then global)
    :param secret: AWS secret key. If not provided, we'll attempt to obtain the secret from the
        configuration file (bucket-specific, then global)
    :param region: Bucket region. Required if the bucket doesn't reside in the default region (us-east-1)
    :param multipart: Server supports multipart. Only required when using a Non-AWS S3 solution that doesn't
        support multipart.
    :param secure: Server supports HTTPS. Only required when using a Non-AWS S3 solution that only supports HTTPS.
    :param verify: Whether or not to verify SSL certificates.
        Only required when using a Non-AWS S3 solution that only supports HTTPS with self-signed certificate.
    """
    config = S3BucketConfig(  # noqa
        bucket=bucket,
        host=host,
        key=key,
        secret=secret,
        region=region,
        multipart=multipart,
        secure=secure,
        verify=verify,
    )
    self._bucket_config = config
    # Register the credentials with the storage layer, then point the
    # default output URI at the configured bucket
    StorageHelper.add_aws_configuration(config, log=self.log)
    self.storage_uri = StorageHelper.get_aws_storage_uri_from_config(config)
def setup_gcp_upload(
    self, bucket, subdir="", project=None, credentials_json=None, pool_connections=None, pool_maxsize=None
):
    # type: (str, str, Optional[str], Optional[str], Optional[int], Optional[int]) -> None
    """
    Setup GCP upload options.

    :param bucket: Bucket to upload to
    :param subdir: Subdir in bucket to upload to
    :param project: Project the bucket belongs to
    :param credentials_json: Path to the JSON file that contains the credentials
    :param pool_connections: The number of urllib3 connection pools to cache
    :param pool_maxsize: The maximum number of connections to save in the pool
    """
    config = GSBucketConfig(  # noqa
        bucket,
        subdir=subdir,
        project=project,
        credentials_json=credentials_json,
        pool_connections=pool_connections,
        pool_maxsize=pool_maxsize,
    )
    self._bucket_config = config
    # Register the credentials, then point the default output URI at the bucket
    StorageHelper.add_gcp_configuration(config, log=self.log)
    self.storage_uri = StorageHelper.get_gcp_storage_uri_from_config(config)
def setup_azure_upload(self, account_name, account_key, container_name=None):
    # type: (str, str, Optional[str]) -> None
    """
    Setup Azure upload options.

    :param account_name: Name of the account
    :param account_key: Secret key used to authenticate the account
    :param container_name: The name of the blob container to upload to
    """
    config = AzureContainerConfig(  # noqa
        account_name=account_name, account_key=account_key, container_name=container_name
    )
    self._bucket_config = config
    # Register the credentials, then point the default output URI at the container
    StorageHelper.add_azure_configuration(config, log=self.log)
    self.storage_uri = StorageHelper.get_azure_storage_uri_from_config(config)

View File

@ -399,53 +399,129 @@ class StorageHelper(object):
pass
remaining_timeout -= (time() - t)
@classmethod
def get_aws_storage_uri_from_config(cls, bucket_config):
    """Build the ``s3://`` URI for *bucket_config*.

    A configured host (non-AWS endpoint such as Minio) is prefixed
    before the bucket name.
    """
    if bucket_config.host:
        return "s3://{}/{}".format(bucket_config.host, bucket_config.bucket)
    return "s3://{}".format(bucket_config.bucket)
@classmethod
def get_gcp_storage_uri_from_config(cls, bucket_config):
    """Build the ``gs://`` URI for *bucket_config*, appending the subdir when set."""
    base = "gs://{}".format(bucket_config.bucket)
    if bucket_config.subdir:
        return "{}/{}".format(base, bucket_config.subdir)
    return base
@classmethod
def get_azure_storage_uri_from_config(cls, bucket_config):
    """Build the ``azure://`` blob-storage URI for *bucket_config*."""
    account = bucket_config.account_name
    container = bucket_config.container_name
    return "azure://{}.blob.core.windows.net/{}".format(account, container)
@classmethod
def get_configuration(cls, bucket_config):
    """Backward-compatible alias; delegates to :meth:`get_aws_configuration`."""
    return cls.get_aws_configuration(bucket_config)
@classmethod
def get_aws_configuration(cls, bucket_config):
    """Return the stored S3 config matching *bucket_config*'s bucket/host pair."""
    configs = cls._s3_configurations
    return configs.get_config_by_bucket(bucket_config.bucket, bucket_config.host)
@classmethod
def get_gcp_configuration(cls, bucket_config):
    """Return the stored GS config whose URI matches *bucket_config*.

    Passes ``create_if_not_found=False`` so a miss yields None instead
    of fabricating a new entry.
    """
    uri = cls.get_gcp_storage_uri_from_config(bucket_config)
    return cls._gs_configurations.get_config_by_uri(uri, create_if_not_found=False)
@classmethod
def get_azure_configuration(cls, bucket_config):
    """Return the stored Azure config for *bucket_config*'s account/container pair."""
    return cls._azure_configurations.get_config(
        bucket_config.account_name, bucket_config.container_name
    )
@classmethod
def add_configuration(cls, bucket_config, log=None, _test_config=True):
    """Backward-compatible alias; delegates to :meth:`add_aws_configuration`."""
    return cls.add_aws_configuration(bucket_config, log=log, _test_config=_test_config)
@classmethod
def add_aws_configuration(cls, bucket_config, log=None, _test_config=True):
# Try to use existing configuration if we have no key and secret
use_existing = not bucket_config.is_valid()
# Get existing config anyway (we'll either try to use it or alert we're replacing it
existing = cls.get_configuration(bucket_config)
existing = cls.get_aws_configuration(bucket_config)
configs = cls._s3_configurations
uri = cls.get_aws_storage_uri_from_config(bucket_config)
if not use_existing:
# Test bucket config, fails if unsuccessful
if _test_config:
_Boto3Driver._test_bucket_config(bucket_config, log)
_Boto3Driver._test_bucket_config(bucket_config, log) # noqa
if existing:
if log:
log.warning('Overriding existing configuration for %s/%s'
% (existing.host or 'AWS', existing.bucket))
log.warning("Overriding existing configuration for '{}'".format(uri))
configs.remove_config(existing)
else:
# Try to use existing configuration
good_config = False
if existing:
if log:
log.info('Using existing credentials for bucket %s/%s'
% (bucket_config.host or 'AWS', bucket_config.bucket))
good_config = _Boto3Driver._test_bucket_config(existing, log, raise_on_error=False)
log.info("Using existing credentials for '{}'".format(uri))
good_config = _Boto3Driver._test_bucket_config(existing, log, raise_on_error=False) # noqa
if not good_config:
# Try to use global key/secret
configs.update_config_with_defaults(bucket_config)
if log:
log.info('Using global credentials for bucket %s/%s'
% (bucket_config.host or 'AWS', bucket_config.bucket))
log.info("Using global credentials for '{}'".format(uri))
if _test_config:
_Boto3Driver._test_bucket_config(bucket_config, log)
else:
# do not add anything, existing config is OK
return
_Boto3Driver._test_bucket_config(bucket_config, log) # noqa
configs.add_config(bucket_config)
configs.add_config(bucket_config)
@classmethod
def add_gcp_configuration(cls, bucket_config, log=None):
    """Register a GS bucket configuration with the storage layer.

    A valid *bucket_config* replaces any existing entry for the same URI.
    An incomplete one first probes the stored entry; if that probe fails
    (or there is no stored entry) the global default credentials are
    merged in before the config is added.
    """
    configs = cls._gs_configurations
    existing = cls.get_gcp_configuration(bucket_config)
    uri = cls.get_gcp_storage_uri_from_config(bucket_config)
    if bucket_config.is_valid() and existing:
        if log:
            log.warning("Overriding existing configuration for '{}'".format(uri))
        configs.remove_config(existing)
    else:
        credentials_ok = False
        if existing:
            if log:
                log.info("Using existing config for '{}'".format(uri))
            # NOTE(review): the probe uses bucket_config rather than the
            # stored `existing` entry (the AWS path tests `existing`) —
            # confirm this asymmetry is intentional.
            credentials_ok = _GoogleCloudStorageDriver.test_upload(None, bucket_config)
        if not credentials_ok:
            configs.update_config_with_defaults(bucket_config)
            if log:
                log.info("Using global credentials for '{}'".format(uri))
    configs.add_config(bucket_config)
@classmethod
def add_azure_configuration(cls, bucket_config, log=None):
    """Register an Azure container configuration with the storage layer.

    A valid *bucket_config* replaces any existing entry for the same URI.
    An incomplete one first probes the stored entry; if that probe fails
    (or there is no stored entry) the global default credentials are
    merged in before the config is added.
    """
    configs = cls._azure_configurations
    existing = cls.get_azure_configuration(bucket_config)
    uri = cls.get_azure_storage_uri_from_config(bucket_config)
    if bucket_config.is_valid() and existing:
        if log:
            log.warning("Overriding existing configuration for '{}'".format(uri))
        configs.remove_config(existing)
    else:
        credentials_ok = False
        if existing:
            if log:
                log.info("Using existing config for '{}'".format(uri))
            # NOTE(review): the probe uses bucket_config rather than the
            # stored `existing` entry (the AWS path tests `existing`) —
            # confirm this asymmetry is intentional.
            credentials_ok = _AzureBlobServiceStorageDriver.test_upload(None, bucket_config)
        if not credentials_ok:
            configs.update_config_with_defaults(bucket_config)
            if log:
                log.info("Using global credentials for '{}'".format(uri))
    configs.add_config(bucket_config)
@classmethod
def add_path_substitution(