Removed apache-libcloud

allegroai 2019-09-13 17:09:58 +03:00
parent d7bdc746b8
commit 76ea73a780
3 changed files with 584 additions and 115 deletions

View File

@@ -1,4 +1,3 @@
-apache-libcloud>=2.2.1
attrs>=18.0
backports.functools-lru-cache>=1.0.2 ; python_version < '3'
enum34>=0.9 ; python_version < '3.6'

View File

@@ -108,6 +108,26 @@ class ScriptRequirements(object):
modules |= fmodules
try_imports |= try_ipts
# hack: forcefully insert storage modules if we have them
# noinspection PyBroadException
try:
import boto3
modules.add('boto3', 'trains.storage', 0)
except Exception:
pass
# noinspection PyBroadException
try:
from google.cloud import storage
modules.add('google_cloud_storage', 'trains.storage', 0)
except Exception:
pass
# noinspection PyBroadException
try:
from azure.storage.blob import ContentSettings
modules.add('azure_storage_blob', 'trains.storage', 0)
except Exception:
pass
return modules, try_imports, local_mods
@staticmethod
@@ -247,7 +267,7 @@ class _JupyterObserver(object):
requirements_txt = ''
# parse jupyter python script and prepare pip requirements (pigar)
# if backend supports requirements
-if file_import_modules and Session.api_version > '2.1':
+if file_import_modules and Session.check_min_api_version('2.2'):
fmodules, _ = file_import_modules(notebook.parts[-1], script_code)
installed_pkgs = get_installed_pkgs_detail()
reqs = ReqsModules()
@@ -435,7 +455,7 @@ class ScriptInfo(object):
# if this is not jupyter, get the requirements.txt
requirements = ''
# create requirements if backend supports requirements
-if create_requirements and not jupyter_filepath and Session.api_version > '2.1':
+if create_requirements and not jupyter_filepath and Session.check_min_api_version('2.2'):
script_requirements = ScriptRequirements(Path(repo_root).as_posix())
requirements = script_requirements.get_requirements()
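Note on the two condition changes above: replacing the lexicographic test Session.api_version > '2.1' with Session.check_min_api_version('2.2') sidesteps plain string comparison, which mis-orders multi-digit version components. A minimal sketch of the pitfall (version_tuple is illustrative, not the library's implementation):

def version_tuple(v):
    # split '2.10' into (2, 10) so components compare numerically
    return tuple(int(p) for p in v.split('.'))

print('2.10' > '2.2')                                 # False - string comparison is wrong here
print(version_tuple('2.10') > version_tuple('2.2'))   # True  - numeric comparison is right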

View File

@@ -1,7 +1,13 @@
from __future__ import with_statement
import errno
import getpass
import json
import os
import shutil
import sys
import threading
from collections import namedtuple
from concurrent.futures import ThreadPoolExecutor
from copy import copy
from datetime import datetime
@@ -15,9 +21,6 @@ import six
from _socket import gethostname
from attr import attrs, attrib, asdict
from furl import furl
-from libcloud.common.types import ProviderError, LibcloudError
-from libcloud.storage.providers import get_driver
-from libcloud.storage.types import Provider
from pathlib2 import Path
from requests.exceptions import ConnectionError
from six import binary_type
@@ -203,7 +206,12 @@ class StorageHelper(object):
return cls._helpers[instance_key]
# Don't canonize URL since we already did it
-instance = cls(base_url=base_url, url=url, logger=logger, canonize_url=False, **kwargs)
+try:
+    instance = cls(base_url=base_url, url=url, logger=logger, canonize_url=False, **kwargs)
+except Exception:
+    log.error("Failed credentials for {}".format(base_url or url))
+    return None
cls._helpers[instance_key] = instance
return instance
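With this change StorageHelper.get() returns None instead of raising when a helper cannot be constructed from the configured credentials, so callers are expected to check the result. A minimal usage sketch (the URL is illustrative):

helper = StorageHelper.get('s3://my-bucket/some/path')
if helper is None:
    print('no storage helper available - check the configured credentials')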
@@ -224,105 +232,7 @@ class StorageHelper(object):
parsed = urlparse(url)
self._scheme = parsed.scheme
-# if self._scheme == 'libcloud-s3':
-# self._conf = copy(self._s3_configurations.get_config_by_uri(url))
-# self._secure = self._conf.secure
-#
-# final_region = region if region else self._conf.region
-# if not final_region:
-# final_region = None
-#
-# self._conf.update(
-# key=key or self._conf.key,
-# secret=secret or self._conf.secret,
-# multipart=self._conf.multipart,
-# region=final_region,
-# )
-#
-# if not self._conf.key or not self._conf.secret:
-# raise ValueError('Missing key and secret for S3 storage access (%s)' % base_url)
-#
-# def init_driver_and_container(host, port, bucket):
-# s3_region_to_libcloud_driver = {
-# None: Provider.S3,
-# "": Provider.S3,
-# 'us-east-1': Provider.S3,
-# 'ap-northeast': Provider.S3_AP_NORTHEAST,
-# 'ap-northeast-1': Provider.S3_AP_NORTHEAST1,
-# 'ap-northeast-2': Provider.S3_AP_NORTHEAST2,
-# 'ap-south': Provider.S3_AP_SOUTH,
-# 'ap-south-1': Provider.S3_AP_SOUTH,
-# 'ap-southeast': Provider.S3_AP_SOUTHEAST,
-# 'ap-southeast-1': Provider.S3_AP_SOUTHEAST,
-# 'ap-southeast-2': Provider.S3_AP_SOUTHEAST2,
-# 'ca-central': Provider.S3_CA_CENTRAL,
-# 'cn-north': Provider.S3_CN_NORTH,
-# 'eu-west': Provider.S3_EU_WEST,
-# 'eu-west-1': Provider.S3_EU_WEST,
-# 'eu-west-2': Provider.S3_EU_WEST2,
-# 'eu-central': Provider.S3_EU_CENTRAL,
-# 'eu-central-1': Provider.S3_EU_CENTRAL,
-# 'sa-east': Provider.S3_SA_EAST,
-# 'sa-east-1': Provider.S3_SA_EAST,
-# 'us-east-2': Provider.S3_US_EAST2,
-# 'us-west': Provider.S3_US_WEST,
-# 'us-west-1': Provider.S3_US_WEST,
-# 'us-west-2': Provider.S3_US_WEST_OREGON,
-# 'us-west-oregon': Provider.S3_US_WEST_OREGON,
-# 'us-gov-west': Provider.S3_US_GOV_WEST,
-# 'rgw': Provider.S3_RGW,
-# 'rgw_outscale': Provider.S3_RGW_OUTSCALE,
-# }
-#
-# driver_name = s3_region_to_libcloud_driver.get(
-# self._conf.region or _Boto3Driver._get_bucket_region(self._conf, self._log)
-# )
-#
-# if not driver_name:
-# self._log.error("Invalid S3 region `%s`: no driver found" % self._conf.region)
-# raise ValueError("Invalid s3 region")
-#
-# host = host or None
-# port = port or None
-#
-# try:
-# driver = get_driver(driver_name)(
-# self._conf.key,
-# self._conf.secret,
-# host=host,
-# port=port,
-# secure=self._secure,
-# region=self._conf.region,
-# )
-#
-# driver.supports_s3_multipart_upload = self._conf.multipart
-# container = driver.get_container(container_name=bucket)
-#
-# except LibcloudError:
-# attempted_uri = str(furl(host=host, port=port, path=bucket)).strip('/')
-# self._log.error(
-# 'Could not create S3 driver for {} in region {}'.format(attempted_uri, self._conf.region))
-# raise
-#
-# return driver, container
-#
-# parts = Path(parsed.path.strip('/')).parts
-# first_part = parts[0] if parts else ""
-# if not self._conf.host and not self._conf.bucket:
-# # configuration has no indication of host or bucket, we'll just go with what we have
-# try:
-# self._driver, self._container = init_driver_and_container(parsed.netloc, None, first_part)
-# except Exception as e:
-# self._driver, self._container = init_driver_and_container(None, None, parsed.netloc)
-# else:
-# # configuration provides at least one of host/bucket
-# host, _, port = (self._conf.host or '').partition(':')
-# port = int(port) if port else None
-# bucket = self._conf.bucket or first_part
-# self._driver, self._container = init_driver_and_container(host, port, bucket)
-#
-# if self._conf.acl:
-# self._extra['acl'] = self._conf.acl
if self._scheme == _AzureBlobServiceStorageDriver.scheme:
self._conf = copy(self._azure_configurations.get_config_by_uri(url))
if self._conf is None:
@@ -387,7 +297,7 @@ class StorageHelper(object):
# assume a folder and create
Path(driver_uri).mkdir(parents=True, exist_ok=True)
-self._driver = get_driver(Provider.LOCAL)(driver_uri)
+self._driver = _FileStorageDriver(driver_uri)
self._container = self._driver.get_container(container_name='.')
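For local paths, the libcloud get_driver(Provider.LOCAL) factory is replaced by the in-tree _FileStorageDriver defined further down in this file. A minimal sketch of the equivalent direct construction (the base path is illustrative and must be an existing directory):

driver = _FileStorageDriver('/tmp/storage')
container = driver.get_container(container_name='.')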
@classmethod
@@ -692,15 +602,17 @@ class StorageHelper(object):
(dl_total_mb, total_size_mb, dl_rate, remote_path))
data, stream, tic = next_chunk(stream)
-# remove target local_path if already exists
-try:
-    os.remove(local_path)
-except:
-    pass
if Path(temp_local_path).stat().st_size <= 0:
raise Exception('downloaded a 0-sized file')
+# on Windows we need to remove the target file before renaming,
+# since unlike POSIX rename, Windows rename fails if the target already exists
+if os.name != 'posix':
+    try:
+        os.remove(local_path)
+    except Exception:
+        pass
# rename the temp file to local_path
os.rename(temp_local_path, local_path)
# report download if we are on the second chunk
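Background for the platform check above: POSIX rename() atomically replaces an existing target, while os.rename() on Windows raises if the target already exists, hence the explicit removal. On Python 3.3+ the same effect is available portably via os.replace (a sketch of the alternative, not what this code uses):

import os
os.replace('download.tmp', 'download.dat')  # overwrites the target on every platform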
@@ -763,6 +675,21 @@ class StorageHelper(object):
raise ValueError('Insufficient permissions for {}'.format(base_url))
return True
@classmethod
def download_from_url(cls, remote_url, local_path, overwrite_existing=False):
"""
Download a file from a remote URL to local storage
:param remote_url: Remote URL. Example: https://example.com/image.jpg or s3://bucket/folder/file.mp4 etc.
:param local_path: Target location for the downloaded file. Example: /tmp/image.jpg
:param overwrite_existing: If True and local_path exists, overwrite it; otherwise print a warning
:return: local_path if download was successful
"""
helper = cls.get(remote_url)
if not helper:
return None
return helper.download_to_file(remote_url, local_path, overwrite_existing=overwrite_existing)
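A minimal usage sketch for the new classmethod (URL and paths are illustrative):

local_copy = StorageHelper.download_from_url(
    remote_url='s3://my-bucket/models/model.bin',
    local_path='/tmp/model.bin',
    overwrite_existing=True,
)
if local_copy is None:
    print('download failed - no storage helper could be created')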
@classmethod
def _canonize_url(cls, url):
return cls._apply_url_substitutions(url)
@@ -912,8 +839,6 @@ class StorageHelper(object):
object_name = self._normalize_object_name(path)
try:
return self._driver.get_object(container_name=self._container.name, object_name=object_name)
-except ProviderError:
-    raise
except ConnectionError as ex:
raise DownloadError
except Exception as e:
@@ -1647,3 +1572,528 @@ class _AzureBlobServiceStorageDriver(object):
return f.path.segments[0], os.path.join(*f.path.segments[1:])
return name
class _FileStorageDriver(object):
"""
A local file-system storage driver, serving as an in-tree replacement for the libcloud local storage driver.
"""
scheme = "file"
CHUNK_SIZE = 8096
IGNORE_FOLDERS = ['.lock', '.hash']
Object = namedtuple("Object", ['name', 'size', 'extra', 'driver', 'container', 'hash', 'meta_data'])
class _Container(object):
def __init__(self, name, extra, driver):
self.name = name
self.extra = extra
self.driver = driver
def __init__(self, key, secret=None, secure=True, host=None, port=None,
**kwargs):
# Use the key as the path to the storage
self.base_path = key
if not os.path.isdir(self.base_path):
raise ValueError('The base path is not a directory')
def _make_path(self, path, ignore_existing=True):
"""
Create a directory path; an already-existing path raises only when ignore_existing is False
"""
try:
os.makedirs(path)
except OSError:
exp = sys.exc_info()[1]
if exp.errno == errno.EEXIST and not ignore_existing:
raise exp
def _check_container_name(self, container_name):
"""
Check if the container name is valid
:param container_name: Container name
:type container_name: ``str``
"""
if '/' in container_name or '\\' in container_name:
raise ValueError('Invalid container name: %s' % container_name)
def _make_container(self, container_name):
"""
Create a container instance
:param container_name: Container name.
:type container_name: ``str``
:return: Container instance.
:rtype: :class:`Container`
"""
self._check_container_name(container_name)
full_path = os.path.realpath(os.path.join(self.base_path, container_name))
try:
stat = os.stat(full_path)
if not os.path.isdir(full_path):
raise OSError('Target path is not a directory')
except OSError:
raise ValueError('Container %s does not exist' % container_name)
extra = {}
extra['creation_time'] = stat.st_ctime
extra['access_time'] = stat.st_atime
extra['modify_time'] = stat.st_mtime
return self._Container(name=container_name, extra=extra, driver=self)
def _make_object(self, container, object_name):
"""
Create an object instance
:param container: Container.
:type container: :class:`Container`
:param object_name: Object name.
:type object_name: ``str``
:return: Object instance.
:rtype: :class:`Object`
"""
full_path = os.path.realpath(os.path.join(self.base_path, container.name, object_name))
if os.path.isdir(full_path):
raise ValueError('Invalid object name %s (path is a directory)' % object_name)
try:
stat = os.stat(full_path)
except Exception:
raise ValueError('Object %s does not exist' % object_name)
extra = {}
extra['creation_time'] = stat.st_ctime
extra['access_time'] = stat.st_atime
extra['modify_time'] = stat.st_mtime
return self.Object(name=object_name, size=stat.st_size, extra=extra,
driver=self, container=container, hash=None, meta_data=None)
def iterate_containers(self):
"""
Return a generator of containers.
:return: A generator of Container instances.
:rtype: ``generator`` of :class:`Container`
"""
for container_name in os.listdir(self.base_path):
full_path = os.path.join(self.base_path, container_name)
if not os.path.isdir(full_path):
continue
yield self._make_container(container_name)
def _get_objects(self, container):
"""
Recursively iterate through the file-system and yield an Object for each file found
"""
cpath = self.get_container_cdn_url(container, check=True)
for folder, subfolders, files in os.walk(cpath, topdown=True):
# Remove unwanted subfolders
for subf in self.IGNORE_FOLDERS:
if subf in subfolders:
subfolders.remove(subf)
for name in files:
full_path = os.path.join(folder, name)
object_name = os.path.relpath(full_path, start=cpath)
yield self._make_object(container, object_name)
def iterate_container_objects(self, container):
"""
Returns a generator of objects for the given container.
:param container: Container instance
:type container: :class:`Container`
:return: A generator of Object instances.
:rtype: ``generator`` of :class:`Object`
"""
return self._get_objects(container)
def get_container(self, container_name):
"""
Return a container instance.
:param container_name: Container name.
:type container_name: ``str``
:return: :class:`Container` instance.
:rtype: :class:`Container`
"""
return self._make_container(container_name)
def get_container_cdn_url(self, container, check=False):
"""
Return a container CDN URL.
:param container: Container instance
:type container: :class:`Container`
:param check: Indicates if the path's existence must be checked
:type check: ``bool``
:return: A CDN URL for this container.
:rtype: ``str``
"""
path = os.path.realpath(os.path.join(self.base_path, container.name))
if check and not os.path.isdir(path):
raise ValueError('Container %s does not exist' % container.name)
return path
def get_object(self, container_name, object_name):
"""
Return an object instance.
:param container_name: Container name.
:type container_name: ``str``
:param object_name: Object name.
:type object_name: ``str``
:return: :class:`Object` instance.
:rtype: :class:`Object`
"""
container = self._make_container(container_name)
return self._make_object(container, object_name)
def get_object_cdn_url(self, obj):
"""
Return an object CDN URL.
:param obj: Object instance
:type obj: :class:`Object`
:return: A CDN URL for this object.
:rtype: ``str``
"""
return os.path.realpath(os.path.join(self.base_path, obj.container.name, obj.name))
def download_object(self, obj, destination_path, overwrite_existing=False,
delete_on_failure=True):
"""
Download an object to the specified destination path.
:param obj: Object instance.
:type obj: :class:`Object`
:param destination_path: Full path to a file or a directory where the
incoming file will be saved.
:type destination_path: ``str``
:param overwrite_existing: True to overwrite an existing file,
defaults to False.
:type overwrite_existing: ``bool``
:param delete_on_failure: True to delete a partially downloaded file if
the download was not successful (hash mismatch / file size).
:type delete_on_failure: ``bool``
:return: True if an object has been successfully downloaded, False
otherwise.
:rtype: ``bool``
"""
obj_path = self.get_object_cdn_url(obj)
base_name = os.path.basename(destination_path)
if not base_name and not os.path.exists(destination_path):
raise ValueError('Path %s does not exist' % destination_path)
if not base_name:
file_path = os.path.join(destination_path, obj.name)
else:
file_path = destination_path
if os.path.exists(file_path) and not overwrite_existing:
raise ValueError('File %s already exists and overwrite_existing=False' % file_path)
try:
shutil.copy(obj_path, file_path)
except IOError:
if delete_on_failure:
try:
os.unlink(file_path)
except Exception:
pass
return False
return True
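A minimal usage sketch, assuming data/sample.txt exists under the driver's base path (all paths are illustrative):

driver = _FileStorageDriver('/tmp/storage')
obj = driver.get_object(container_name='data', object_name='sample.txt')
ok = driver.download_object(obj, '/tmp/sample_copy.txt', overwrite_existing=True)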
def download_object_as_stream(self, obj, chunk_size=None):
"""
Return a generator which yields object data.
:param obj: Object instance
:type obj: :class:`Object`
:param chunk_size: Optional chunk size (in bytes).
:type chunk_size: ``int``
:return: A stream of binary chunks of data.
:rtype: ``object``
"""
path = self.get_object_cdn_url(obj)
with open(path, 'rb') as obj_file:
for data in self._read_in_chunks(obj_file, chunk_size=chunk_size):
yield data
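A streaming counterpart to the previous sketch, reusing the same driver and object (illustrative):

import io
buf = io.BytesIO()
for chunk in driver.download_object_as_stream(obj, chunk_size=4096):
    buf.write(chunk)  # collect the file content chunk by chunk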
def upload_object(self, file_path, container, object_name, extra=None,
verify_hash=True):
"""
Upload an object currently located on a disk.
:param file_path: Path to the object on disk.
:type file_path: ``str``
:param container: Destination container.
:type container: :class:`Container`
:param object_name: Object name.
:type object_name: ``str``
:param verify_hash: Verify hash (accepted for API compatibility; not used by this driver)
:type verify_hash: ``bool``
:param extra: (optional) Extra attributes (driver specific).
:type extra: ``dict``
:rtype: ``object``
"""
path = self.get_container_cdn_url(container, check=True)
obj_path = os.path.join(path, object_name)
base_path = os.path.dirname(obj_path)
self._make_path(base_path)
shutil.copy(file_path, obj_path)
os.chmod(obj_path, int('664', 8))
return self._make_object(container, object_name)
def upload_object_via_stream(self, iterator, container,
object_name,
extra=None):
"""
Upload an object using an iterator.
If a provider supports it, chunked transfer encoding is used and you
don't need to know the total size of the data in advance.
Otherwise, the iterator will be exhausted so that the total size of the
data to be uploaded can be determined.
Note: exhausting the iterator means the whole payload must be buffered
in memory, which may exhaust memory when uploading a very large object.
If the data is in a file on disk, prefer upload_object, which uses
os.stat to determine the file size and does not need to buffer the
whole object in memory.
:type iterator: ``object``
:param iterator: An object which implements the iterator
interface and yields binary chunks of data.
:type container: :class:`Container`
:param container: Destination container.
:type object_name: ``str``
:param object_name: Object name.
:type extra: ``dict``
:param extra: (optional) Extra attributes (driver specific). Note:
This dictionary must contain a 'content_type' key which represents
a content type of the stored object.
:rtype: ``object``
"""
path = self.get_container_cdn_url(container, check=True)
obj_path = os.path.join(path, object_name)
base_path = os.path.dirname(obj_path)
self._make_path(base_path)
obj_path = os.path.realpath(obj_path)
with open(obj_path, 'wb') as obj_file:
obj_file.write(iterator.read() if hasattr(iterator, 'read') else bytes(iterator))
os.chmod(obj_path, int('664', 8))
return self._make_object(container, object_name)
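A minimal upload sketch covering both methods (paths and names are illustrative):

driver = _FileStorageDriver('/tmp/storage')
container = driver.create_container('uploads')  # raises if the directory already exists
driver.upload_object('/tmp/report.csv', container, object_name='report.csv')
driver.upload_object_via_stream(b'hello world', container, object_name='hello.txt')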
def delete_object(self, obj):
"""
Delete an object.
:type obj: :class:`Object`
:param obj: Object instance.
:return: ``bool`` True on success.
:rtype: ``bool``
"""
path = self.get_object_cdn_url(obj)
try:
os.unlink(path)
except Exception:
return False
# # Check and delete all the empty parent folders
# path = os.path.dirname(path)
# container_url = obj.container.get_cdn_url()
#
# # Delete the empty parent folders till the container's level
# while path != container_url:
# try:
# os.rmdir(path)
# except OSError:
# exp = sys.exc_info()[1]
# if exp.errno == errno.ENOTEMPTY:
# break
# raise exp
#
# path = os.path.dirname(path)
return True
def create_container(self, container_name):
"""
Create a new container.
:type container_name: ``str``
:param container_name: Container name.
:return: :class:`Container` instance on success.
:rtype: :class:`Container`
"""
self._check_container_name(container_name)
path = os.path.join(self.base_path, container_name)
try:
self._make_path(path, ignore_existing=False)
except OSError:
exp = sys.exc_info()[1]
if exp.errno == errno.EEXIST:
raise ValueError('Container %s already exists. The name must be '
'unique among all the containers in the '
'system' % container_name)
else:
raise ValueError('Error creating container %s' % container_name)
except Exception:
raise ValueError('Error creating container %s' % container_name)
return self._make_container(container_name)
def delete_container(self, container):
"""
Delete a container.
:type container: :class:`Container`
:param container: Container instance
:return: True on success, False otherwise.
:rtype: ``bool``
"""
# Check if there are any objects inside this container
for _ in self._get_objects(container):
raise ValueError('Container %s is not empty' % container.name)
path = self.get_container_cdn_url(container, check=True)
try:
shutil.rmtree(path)
except Exception:
return False
return True
def list_container_objects(self, container, **kwargs):
return list(self.iterate_container_objects(container))
@staticmethod
def _read_in_chunks(iterator, chunk_size=None, fill_size=False, yield_empty=False):
"""
Return a generator which yields data in chunks.
:param iterator: An object which implements an iterator interface
or a File like object with read method.
:type iterator: :class:`object` which implements iterator interface.
:param chunk_size: Optional chunk size (defaults to CHUNK_SIZE)
:type chunk_size: ``int``
:param fill_size: If True, make sure chunks are exactly chunk_size in
length (except for last chunk).
:type fill_size: ``bool``
:param yield_empty: If True and the iterator yields no data, yield a single
empty bytes object
:type yield_empty: ``bool``
TODO: At some point we could use bytearray here on Python 3, which
should speed things up a bit and reduce memory usage.
"""
chunk_size = chunk_size or _FileStorageDriver.CHUNK_SIZE
if six.PY3:
from io import FileIO as file
if isinstance(iterator, file):
get_data = iterator.read
args = (chunk_size,)
else:
get_data = next
args = (iterator,)
data = b''
empty = False
while not empty or len(data) > 0:
if not empty:
try:
chunk = bytes(get_data(*args))
if len(chunk) > 0:
data += chunk
else:
empty = True
except StopIteration:
empty = True
if len(data) == 0:
if empty and yield_empty:
yield b''
return
if fill_size:
if empty or len(data) >= chunk_size:
yield data[:chunk_size]
data = data[chunk_size:]
else:
yield data
data = b''
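A usage sketch for the chunk reader; io.FileIO is used because the isinstance check above special-cases it, and the file name is illustrative:

from io import FileIO

with FileIO('data.bin', 'r') as f:
    for chunk in _FileStorageDriver._read_in_chunks(f, chunk_size=1024, fill_size=True):
        print(len(chunk))  # every chunk except possibly the last is exactly 1024 bytes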