diff --git a/clearml/backend_config/bucket_config.py b/clearml/backend_config/bucket_config.py
index 62fc5b74..1d8d2938 100644
--- a/clearml/backend_config/bucket_config.py
+++ b/clearml/backend_config/bucket_config.py
@@ -261,6 +261,9 @@ class GSBucketConfig(object):
         else:
             setattr(self, item, kwargs[item])
 
+    def is_valid(self):
+        return self.bucket
+
 
 class GSBucketConfigurations(BaseBucketConfigurations):
     def __init__(
@@ -315,10 +318,12 @@ class GSBucketConfigurations(BaseBucketConfigurations):
             pool_maxsize=bucket_config.pool_maxsize or self._default_pool_maxsize
         )
 
-    def get_config_by_uri(self, uri):
+    def get_config_by_uri(self, uri, create_if_not_found=True):
         """
         Get the credentials for a Google Storage bucket from the config
 
         :param uri: URI of bucket, directory or file
+        :param create_if_not_found: If True and no matching config is found, create a new one;
+            otherwise, return None
         :return: GSBucketConfig: bucket config
         """
 
@@ -331,7 +336,8 @@ class GSBucketConfigurations(BaseBucketConfigurations):
         try:
             return next(res)
         except StopIteration:
-            pass
+            if not create_if_not_found:
+                return None
 
         parsed = furl.furl(uri)
 
@@ -355,11 +361,23 @@ class AzureContainerConfig(object):
     account_key = attrib(type=str)
     container_name = attrib(type=str, default=None)
 
+    def update(self, **kwargs):
+        for item in kwargs:
+            if not hasattr(self, item):
+                warnings.warn("Unexpected argument {} for update. Ignored".format(item))
+            else:
+                setattr(self, item, kwargs[item])
+
+    def is_valid(self):
+        return self.account_name and self.container_name
+
 
 class AzureContainerConfigurations(object):
-    def __init__(self, container_configs=None):
+    def __init__(self, container_configs=None, default_account=None, default_key=None):
         super(AzureContainerConfigurations, self).__init__()
         self._container_configs = container_configs or []
+        self._default_account = default_account
+        self._default_key = default_key
 
     @classmethod
     def from_config(cls, configuration):
@@ -373,12 +391,12 @@ class AzureContainerConfigurations(object):
             ))
 
         if configuration is None:
-            return cls(default_container_configs)
+            return cls(default_container_configs, default_account=default_account, default_key=default_key)
 
         containers = configuration.get("containers", list())
         container_configs = [AzureContainerConfig(**entry) for entry in containers] + default_container_configs
 
-        return cls(container_configs)
+        return cls(container_configs, default_account=default_account, default_key=default_key)
 
     def get_config_by_uri(self, uri):
         """
@@ -418,3 +436,12 @@ class AzureContainerConfigurations(object):
             ),
             None
         )
+
+    def update_config_with_defaults(self, bucket_config):
+        bucket_config.update(
+            account_name=bucket_config.account_name or self._default_account,
+            account_key=bucket_config.account_key or self._default_key
+        )
+
+    def add_config(self, bucket_config):
+        self._container_configs.append(bucket_config)
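
A minimal usage sketch of the new `create_if_not_found` flag. This is illustrative only: `gs_configs` stands for an already-built `GSBucketConfigurations` instance, and the URI is a placeholder.

```python
# `gs_configs` stands for an existing GSBucketConfigurations instance
# (e.g. the registry StorageHelper builds from the ClearML configuration).
uri = "gs://my-bucket/artifacts"  # placeholder

# Default behavior (unchanged): an unknown URI yields a freshly created
# GSBucketConfig derived from the URI.
config = gs_configs.get_config_by_uri(uri)

# New opt-out: return None instead of creating a config. This is what
# StorageHelper.get_gcp_configuration (further below) relies on.
config = gs_configs.get_config_by_uri(uri, create_if_not_found=False)
if config is None:
    print("no GS credentials configured for {}".format(uri))
```
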
diff --git a/clearml/backend_interface/setupuploadmixin.py b/clearml/backend_interface/setupuploadmixin.py
index 82b9e4c2..aa2cd92b 100644
--- a/clearml/backend_interface/setupuploadmixin.py
+++ b/clearml/backend_interface/setupuploadmixin.py
@@ -1,6 +1,7 @@
 from abc import abstractproperty
+from typing import Optional
 
-from ..backend_config.bucket_config import S3BucketConfig
+from ..backend_config.bucket_config import S3BucketConfig, AzureContainerConfig, GSBucketConfig
 from ..storage.helper import StorageHelper
 
 
@@ -9,39 +10,113 @@ class SetupUploadMixin(object):
     storage_uri = abstractproperty()
 
     def setup_upload(
-            self, bucket_name, host=None, access_key=None, secret_key=None, region=None, multipart=True, https=True, verify=True):
+        self,
+        bucket_name,  # type: str
+        host=None,  # type: Optional[str]
+        access_key=None,  # type: Optional[str]
+        secret_key=None,  # type: Optional[str]
+        multipart=True,  # type: bool
+        https=True,  # type: bool
+        region=None,  # type: Optional[str]
+        verify=True,  # type: bool
+    ):
         """
-        Setup upload options (currently only S3 is supported)
+        (Deprecated) Set up upload options. Only S3 is supported.
+        This method is deprecated; use `setup_aws_upload`, `setup_gcp_upload` or
+        `setup_azure_upload` to set up upload options for the corresponding cloud.
 
         :param bucket_name: AWS bucket name
-        :type bucket_name: str
         :param host: Hostname. Only required when a non-AWS S3 solution such as a local Minio server is used
-        :type host: str
         :param access_key: AWS access key. If not provided, we'll attempt to obtain the key from the
             configuration file (bucket-specific, then global)
-        :type access_key: str
         :param secret_key: AWS secret key. If not provided, we'll attempt to obtain the secret from the
             configuration file (bucket-specific, then global)
-        :type secret_key: str
         :param multipart: Server supports multipart. Only required when using a non-AWS S3 solution that doesn't
             support multipart.
-        :type multipart: bool
         :param https: Server supports HTTPS. Only required when using a non-AWS S3 solution that only supports HTTPS.
-        :type https: bool
         :param region: Bucket region. Required if the bucket doesn't reside in the default region (us-east-1)
-        :type region: str
-        :param verify: Whether or not to verify SSL certificates. Only required when using a Non-AWS S3 solution that only supports HTTPS with self-signed certificate.
-        :type verify: bool
+        :param verify: Whether or not to verify SSL certificates.
+            Only required when using a non-AWS S3 solution that only supports HTTPS with a self-signed certificate.
         """
-        self._bucket_config = S3BucketConfig(
-            bucket=bucket_name,
+        self.setup_aws_upload(
+            bucket_name,
             host=host,
             key=access_key,
             secret=secret_key,
+            region=region,
             multipart=multipart,
             secure=https,
-            region=region,
-            verify=verify
+            verify=verify,
         )
-        self.storage_uri = ('s3://%(host)s/%(bucket_name)s' if host else 's3://%(bucket_name)s') % locals()
-        StorageHelper.add_configuration(self._bucket_config, log=self.log)
+
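
Since the legacy method now just forwards to `setup_aws_upload`, the two calls below are equivalent. Illustrative only: `task` stands for any object using `SetupUploadMixin` (e.g. a ClearML `Task`), and the credentials are dummies.

```python
# Legacy call: access_key/secret_key/https map onto key/secret/secure.
task.setup_upload(bucket_name="my-bucket", access_key="KEY", secret_key="SECRET", https=True)

# Equivalent new-style call:
task.setup_aws_upload(bucket="my-bucket", key="KEY", secret="SECRET", secure=True)
```
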
+ """ + self._bucket_config = S3BucketConfig( # noqa + bucket=bucket, + host=host, + key=key, + secret=secret, + region=region, + multipart=multipart, + secure=secure, + verify=verify, + ) + StorageHelper.add_aws_configuration(self._bucket_config, log=self.log) + self.storage_uri = StorageHelper.get_aws_storage_uri_from_config(self._bucket_config) + + def setup_gcp_upload( + self, bucket, subdir="", project=None, credentials_json=None, pool_connections=None, pool_maxsize=None + ): + # type: (str, str, Optional[str], Optional[str], Optional[int], Optional[int]) -> None + """ + Setup GCP upload options. + + :param bucket: Bucket to upload to + :param subdir: Subdir in bucket to upload to + :param project: Project the bucket belongs to + :param credentials_json: Path to the JSON file that contains the credentials + :param pool_connections: The number of urllib3 connection pools to cache + :param pool_maxsize: The maximum number of connections to save in the pool + """ + self._bucket_config = GSBucketConfig( # noqa + bucket, + subdir=subdir, + project=project, + credentials_json=credentials_json, + pool_connections=pool_connections, + pool_maxsize=pool_maxsize, + ) + StorageHelper.add_gcp_configuration(self._bucket_config, log=self.log) + self.storage_uri = StorageHelper.get_gcp_storage_uri_from_config(self._bucket_config) + + def setup_azure_upload(self, account_name, account_key, container_name=None): + # type: (str, str, Optional[str]) -> None + """ + Setup Azure upload options. + + :param account_name: Name of the account + :param account_key: Secret key used to authenticate the account + :param container_name: The name of the blob container to upload to + """ + self._bucket_config = AzureContainerConfig( # noqa + account_name=account_name, account_key=account_key, container_name=container_name + ) + StorageHelper.add_azure_configuration(self._bucket_config, log=self.log) + self.storage_uri = StorageHelper.get_azure_storage_uri_from_config(self._bucket_config) diff --git a/clearml/storage/helper.py b/clearml/storage/helper.py index d84c501c..ae64e68a 100644 --- a/clearml/storage/helper.py +++ b/clearml/storage/helper.py @@ -399,53 +399,129 @@ class StorageHelper(object): pass remaining_timeout -= (time() - t) + @classmethod + def get_aws_storage_uri_from_config(cls, bucket_config): + return ( + "s3://{}/{}".format(bucket_config.host, bucket_config.bucket) + if bucket_config.host + else "s3://{}".format(bucket_config.bucket) + ) + + @classmethod + def get_gcp_storage_uri_from_config(cls, bucket_config): + return ( + "gs://{}/{}".format(bucket_config.bucket, bucket_config.subdir) + if bucket_config.subdir + else "gs://{}".format(bucket_config.bucket) + ) + + @classmethod + def get_azure_storage_uri_from_config(cls, bucket_config): + return "azure://{}.blob.core.windows.net/{}".format(bucket_config.account_name, bucket_config.container_name) + @classmethod def get_configuration(cls, bucket_config): + return cls.get_aws_configuration(bucket_config) + + @classmethod + def get_aws_configuration(cls, bucket_config): return cls._s3_configurations.get_config_by_bucket(bucket_config.bucket, bucket_config.host) + @classmethod + def get_gcp_configuration(cls, bucket_config): + return cls._gs_configurations.get_config_by_uri( + cls.get_gcp_storage_uri_from_config(bucket_config), + create_if_not_found=False + ) + + @classmethod + def get_azure_configuration(cls, bucket_config): + return cls._azure_configurations.get_config(bucket_config.account_name, bucket_config.container_name) + @classmethod def 
diff --git a/clearml/storage/helper.py b/clearml/storage/helper.py
index d84c501c..ae64e68a 100644
--- a/clearml/storage/helper.py
+++ b/clearml/storage/helper.py
@@ -399,53 +399,129 @@ class StorageHelper(object):
                 pass
             remaining_timeout -= (time() - t)
 
+    @classmethod
+    def get_aws_storage_uri_from_config(cls, bucket_config):
+        return (
+            "s3://{}/{}".format(bucket_config.host, bucket_config.bucket)
+            if bucket_config.host
+            else "s3://{}".format(bucket_config.bucket)
+        )
+
+    @classmethod
+    def get_gcp_storage_uri_from_config(cls, bucket_config):
+        return (
+            "gs://{}/{}".format(bucket_config.bucket, bucket_config.subdir)
+            if bucket_config.subdir
+            else "gs://{}".format(bucket_config.bucket)
+        )
+
+    @classmethod
+    def get_azure_storage_uri_from_config(cls, bucket_config):
+        return "azure://{}.blob.core.windows.net/{}".format(bucket_config.account_name, bucket_config.container_name)
+
     @classmethod
     def get_configuration(cls, bucket_config):
+        return cls.get_aws_configuration(bucket_config)
+
+    @classmethod
+    def get_aws_configuration(cls, bucket_config):
         return cls._s3_configurations.get_config_by_bucket(bucket_config.bucket, bucket_config.host)
 
+    @classmethod
+    def get_gcp_configuration(cls, bucket_config):
+        return cls._gs_configurations.get_config_by_uri(
+            cls.get_gcp_storage_uri_from_config(bucket_config),
+            create_if_not_found=False
+        )
+
+    @classmethod
+    def get_azure_configuration(cls, bucket_config):
+        return cls._azure_configurations.get_config(bucket_config.account_name, bucket_config.container_name)
+
     @classmethod
     def add_configuration(cls, bucket_config, log=None, _test_config=True):
+        return cls.add_aws_configuration(bucket_config, log=log, _test_config=_test_config)
+
+    @classmethod
+    def add_aws_configuration(cls, bucket_config, log=None, _test_config=True):
         # Try to use existing configuration if we have no key and secret
         use_existing = not bucket_config.is_valid()
-
         # Get existing config anyway (we'll either try to use it or alert we're replacing it)
-        existing = cls.get_configuration(bucket_config)
-
+        existing = cls.get_aws_configuration(bucket_config)
         configs = cls._s3_configurations
+        uri = cls.get_aws_storage_uri_from_config(bucket_config)
 
         if not use_existing:
             # Test bucket config, fails if unsuccessful
             if _test_config:
-                _Boto3Driver._test_bucket_config(bucket_config, log)
+                _Boto3Driver._test_bucket_config(bucket_config, log)  # noqa
 
             if existing:
                 if log:
-                    log.warning('Overriding existing configuration for %s/%s'
-                                % (existing.host or 'AWS', existing.bucket))
+                    log.warning("Overriding existing configuration for '{}'".format(uri))
                 configs.remove_config(existing)
         else:
             # Try to use existing configuration
             good_config = False
             if existing:
                 if log:
-                    log.info('Using existing credentials for bucket %s/%s'
-                             % (bucket_config.host or 'AWS', bucket_config.bucket))
-                good_config = _Boto3Driver._test_bucket_config(existing, log, raise_on_error=False)
+                    log.info("Using existing credentials for '{}'".format(uri))
+                good_config = _Boto3Driver._test_bucket_config(existing, log, raise_on_error=False)  # noqa
 
             if not good_config:
                 # Try to use global key/secret
                 configs.update_config_with_defaults(bucket_config)
                 if log:
-                    log.info('Using global credentials for bucket %s/%s'
-                             % (bucket_config.host or 'AWS', bucket_config.bucket))
+                    log.info("Using global credentials for '{}'".format(uri))
                 if _test_config:
-                    _Boto3Driver._test_bucket_config(bucket_config, log)
-            else:
-                # do not add anything, existing config is OK
-                return
-
-        configs.add_config(bucket_config)
+                    _Boto3Driver._test_bucket_config(bucket_config, log)  # noqa
+        configs.add_config(bucket_config)
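
The `get_*_storage_uri_from_config` helpers are pure string formatting, so they are easy to sanity-check in isolation. A small sketch, assuming the attrs-based `S3BucketConfig` accepts these fields as keywords with the rest defaulted (values are placeholders):

```python
from clearml.backend_config.bucket_config import S3BucketConfig
from clearml.storage.helper import StorageHelper

# Non-AWS endpoint: the host is kept in the URI.
cfg = S3BucketConfig(bucket="my-bucket", host="minio.local:9000")
print(StorageHelper.get_aws_storage_uri_from_config(cfg))  # s3://minio.local:9000/my-bucket

# Plain AWS bucket: no host component.
cfg = S3BucketConfig(bucket="my-bucket")
print(StorageHelper.get_aws_storage_uri_from_config(cfg))  # s3://my-bucket
```
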
+
+    @classmethod
+    def add_gcp_configuration(cls, bucket_config, log=None):
+        use_existing = not bucket_config.is_valid()
+        existing = cls.get_gcp_configuration(bucket_config)
+        configs = cls._gs_configurations
+        uri = cls.get_gcp_storage_uri_from_config(bucket_config)
+
+        if not use_existing and existing:
+            if log:
+                log.warning("Overriding existing configuration for '{}'".format(uri))
+            configs.remove_config(existing)
+        else:
+            good_config = False
+            if existing:
+                if log:
+                    log.info("Using existing config for '{}'".format(uri))
+                good_config = _GoogleCloudStorageDriver.test_upload(None, bucket_config)
+            if not good_config:
+                configs.update_config_with_defaults(bucket_config)
+                if log:
+                    log.info("Using global credentials for '{}'".format(uri))
+        configs.add_config(bucket_config)
+
+    @classmethod
+    def add_azure_configuration(cls, bucket_config, log=None):
+        use_existing = not bucket_config.is_valid()
+        existing = cls.get_azure_configuration(bucket_config)
+        configs = cls._azure_configurations
+        uri = cls.get_azure_storage_uri_from_config(bucket_config)
+        if not use_existing and existing:
+            if log:
+                log.warning("Overriding existing configuration for '{}'".format(uri))
+            configs.remove_config(existing)
+        else:
+            good_config = False
+            if existing:
+                if log:
+                    log.info("Using existing config for '{}'".format(uri))
+                good_config = _AzureBlobServiceStorageDriver.test_upload(None, bucket_config)
+            if not good_config:
+                configs.update_config_with_defaults(bucket_config)
+                if log:
+                    log.info("Using global credentials for '{}'".format(uri))
+        configs.add_config(bucket_config)
 
     @classmethod
     def add_path_substitution(
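
All three `add_*_configuration` variants share the same control flow: explicit credentials (`is_valid()` truthy) replace any previously registered entry for the same target; otherwise the existing entry is kept if it passes the driver's upload test, with the global defaults as a fallback. A condensed, illustrative sketch of that shared flow (not the literal implementation; `driver_test` is a placeholder for the per-cloud test call):

```python
def add_configuration_flow(configs, bucket_config, existing, driver_test):
    # Illustrative pseudocode of the shared add_*_configuration shape.
    use_existing = not bucket_config.is_valid()  # no explicit credentials given

    if not use_existing and existing:
        # Explicit credentials win: drop the previously registered entry.
        configs.remove_config(existing)
    else:
        # Keep the existing entry if it tests OK; otherwise fill missing
        # fields from the global defaults in the configuration file.
        good_config = existing is not None and driver_test(existing)
        if not good_config:
            configs.update_config_with_defaults(bucket_config)

    # The (possibly defaults-updated) config is always registered.
    configs.add_config(bucket_config)
```
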