From e66257761ab81a78cb967e0d6e37f9be17d45174 Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Wed, 21 Dec 2022 18:41:16 +0200 Subject: [PATCH] Add support for server-side delete for AWS S3, Google Storage and Azure Blob Storage --- apiserver/bll/event/event_bll.py | 21 +- apiserver/bll/storage/__init__.py | 48 ++ apiserver/bll/task/task_cleanup.py | 13 +- .../default/services/async_urls_delete.conf | 4 +- .../default/services/storage_credentials.conf | 53 ++ apiserver/database/model/url_to_delete.py | 9 +- apiserver/jobs/async_urls_delete.py | 614 +++++++++++++++--- apiserver/requirements.txt | 5 + .../tests/automated/test_tasks_delete.py | 3 +- fileserver/config/default/fileserver.conf | 2 +- 10 files changed, 648 insertions(+), 124 deletions(-) create mode 100644 apiserver/bll/storage/__init__.py create mode 100644 apiserver/config/default/services/storage_credentials.conf diff --git a/apiserver/bll/event/event_bll.py b/apiserver/bll/event/event_bll.py index 73a67ad..6f6792a 100644 --- a/apiserver/bll/event/event_bll.py +++ b/apiserver/bll/event/event_bll.py @@ -9,6 +9,7 @@ from operator import attrgetter from typing import Sequence, Set, Tuple, Optional, List, Mapping, Union import elasticsearch +from boltons.iterutils import chunked_iter from elasticsearch.helpers import BulkIndexError from mongoengine import Q from nested_dict import nested_dict @@ -1188,15 +1189,19 @@ class EventBLL(object): Delete mutliple task events. No check is done for tasks write access so it should be checked by the calling code """ - es_req = {"query": {"terms": {"task": task_ids}}} + deleted = 0 with translate_errors_context(): - es_res = delete_company_events( - es=self.es, - company_id=company_id, - event_type=EventType.all, - body=es_req, - **self._get_events_deletion_params(async_delete), - ) + for tasks in chunked_iter(task_ids, 100): + es_req = {"query": {"terms": {"task": tasks}}} + es_res = delete_company_events( + es=self.es, + company_id=company_id, + event_type=EventType.all, + body=es_req, + **self._get_events_deletion_params(async_delete), + ) + if not async_delete: + deleted += es_res.get("deleted", 0) if not async_delete: return es_res.get("deleted", 0) diff --git a/apiserver/bll/storage/__init__.py b/apiserver/bll/storage/__init__.py new file mode 100644 index 0000000..7c88689 --- /dev/null +++ b/apiserver/bll/storage/__init__.py @@ -0,0 +1,48 @@ +from copy import copy + +from boltons.cacheutils import cachedproperty +from clearml.backend_config.bucket_config import ( + S3BucketConfigurations, + AzureContainerConfigurations, + GSBucketConfigurations, +) + +from apiserver.config_repo import config + + +log = config.logger(__file__) + + +class StorageBLL: + default_aws_configs: S3BucketConfigurations = None + conf = config.get("services.storage_credentials") + + @cachedproperty + def _default_aws_configs(self) -> S3BucketConfigurations: + return S3BucketConfigurations.from_config(self.conf.get("aws.s3")) + + @cachedproperty + def _default_azure_configs(self) -> AzureContainerConfigurations: + return AzureContainerConfigurations.from_config(self.conf.get("azure.storage")) + + @cachedproperty + def _default_gs_configs(self) -> GSBucketConfigurations: + return GSBucketConfigurations.from_config(self.conf.get("google.storage")) + + def get_azure_settings_for_company( + self, + company_id: str, + ) -> AzureContainerConfigurations: + return copy(self._default_azure_configs) + + def get_gs_settings_for_company( + self, + company_id: str, + ) -> GSBucketConfigurations: + return 
copy(self._default_gs_configs) + + def get_aws_settings_for_company( + self, + company_id: str, + ) -> S3BucketConfigurations: + return copy(self._default_aws_configs) diff --git a/apiserver/bll/task/task_cleanup.py b/apiserver/bll/task/task_cleanup.py index 53eabc1..e060fce 100644 --- a/apiserver/bll/task/task_cleanup.py +++ b/apiserver/bll/task/task_cleanup.py @@ -5,6 +5,7 @@ from typing import Sequence, Set, Tuple import attr from boltons.iterutils import partition, bucketize, first +from furl import furl from mongoengine import NotUniqueError from pymongo.errors import DuplicateKeyError @@ -106,6 +107,9 @@ def collect_debug_image_urls(company: str, task_or_model: str) -> Set[str]: supported_storage_types = { "https://": StorageType.fileserver, "http://": StorageType.fileserver, + "s3://": StorageType.s3, + "azure://": StorageType.azure, + "gs://": StorageType.gs, } @@ -129,7 +133,14 @@ def _schedule_for_delete( for url in storage_urls: folder = None if delete_folders: - folder, _, _ = url.rpartition("/") + try: + parsed = furl(url) + if parsed.path and len(parsed.path.segments) > 1: + folder = parsed.remove( + args=True, fragment=True, path=parsed.path.segments[-1] + ).url.rstrip("/") + except Exception as ex: + pass to_delete = folder or url if to_delete in scheduled_to_delete: diff --git a/apiserver/config/default/services/async_urls_delete.conf b/apiserver/config/default/services/async_urls_delete.conf index b997e33..26dc5e4 100644 --- a/apiserver/config/default/services/async_urls_delete.conf +++ b/apiserver/config/default/services/async_urls_delete.conf @@ -1,6 +1,6 @@ # if set to True then on task delete/reset external file urls for know storage types are scheduled for async delete # otherwise they are returned to a client for the client side delete -enabled: false +enabled: true max_retries: 3 retry_timeout_sec: 60 @@ -9,4 +9,4 @@ fileserver { # Can be in the form ://host:port/path or /path url_prefixes: ["https://files.community-master.hosted.allegro.ai/"] timeout_sec: 300 -} \ No newline at end of file +} diff --git a/apiserver/config/default/services/storage_credentials.conf b/apiserver/config/default/services/storage_credentials.conf new file mode 100644 index 0000000..cd80772 --- /dev/null +++ b/apiserver/config/default/services/storage_credentials.conf @@ -0,0 +1,53 @@ +aws { + s3 { + # S3 credentials, used for read/write access by various SDK elements + # default, used for any bucket not specified below + key: "" + secret: "" + region: "" + use_credentials_chain: false + # Additional ExtraArgs passed to boto3 when uploading files. Can also be set per-bucket under "credentials". 
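+        # For example (illustrative only): extra_args: { ServerSideEncryption: "AES256" }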
+ extra_args: {} + credentials: [ + # specifies key/secret credentials to use when handling s3 urls (read or write) + # { + # bucket: "my-bucket-name" + # key: "my-access-key" + # secret: "my-secret-key" + # }, + { + # This will apply to all buckets in this host (unless key/value is specifically provided for a given bucket) + host: "localhost:9000" + key: "evg_user" + secret: "evg_pass" + multipart: false + secure: false + } + ] + } +} +google.storage { + # Default project and credentials file + # Will be used when no bucket configuration is found +// project: "clearml" +// credentials_json: "/path/to/credentials.json" +// +// # Specific credentials per bucket and sub directory +// credentials = [ +// { +// bucket: "my-bucket" +// subdir: "path/in/bucket" # Not required +// project: "clearml" +// credentials_json: "/path/to/credentials.json" +// }, +// ] +} +azure.storage { + # containers: [ + # { + # account_name: "clearml" + # account_key: "secret" + # # container_name: + # } + # ] +} diff --git a/apiserver/database/model/url_to_delete.py b/apiserver/database/model/url_to_delete.py index 4845def..25b538d 100644 --- a/apiserver/database/model/url_to_delete.py +++ b/apiserver/database/model/url_to_delete.py @@ -8,6 +8,9 @@ from apiserver.database.model import AttributedDocument class StorageType(str, Enum): fileserver = "fileserver" + s3 = "s3" + azure = "azure" + gs = "gs" unknown = "unknown" @@ -32,10 +35,8 @@ class UrlToDelete(AttributedDocument): "strict": strict, "indexes": [ ("company", "user", "task"), - "storage_type", - "created", - "retry_count", - "type", + ("company", "storage_type", "url"), + ("status", "retry_count", "storage_type"), ], } diff --git a/apiserver/jobs/async_urls_delete.py b/apiserver/jobs/async_urls_delete.py index 1a13da6..9c7f4d4 100644 --- a/apiserver/jobs/async_urls_delete.py +++ b/apiserver/jobs/async_urls_delete.py @@ -1,3 +1,5 @@ +import os +from abc import ABC, ABCMeta, abstractmethod from argparse import ArgumentParser from collections import defaultdict from datetime import datetime, timedelta @@ -5,38 +7,28 @@ from functools import partial from itertools import chain from pathlib import Path from time import sleep -from typing import Sequence, Tuple +from typing import Sequence, Optional, Tuple, Mapping, TypeVar, Hashable, Generic +from urllib.parse import urlparse +import boto3 import requests +from azure.storage.blob import ContainerClient, PartialBatchErrorException +from boltons.iterutils import bucketize, chunked_iter from furl import furl +from google.cloud import storage as google_storage from mongoengine import Q +from mypy_boto3_s3.service_resource import Bucket as AWSBucket +from apiserver.bll.storage import StorageBLL from apiserver.config_repo import config from apiserver.database import db -from apiserver.database.model.url_to_delete import ( - UrlToDelete, - DeletionStatus, - StorageType, -) +from apiserver.database.model.url_to_delete import UrlToDelete, StorageType, DeletionStatus log = config.logger(f"JOB-{Path(__file__).name}") conf = config.get("services.async_urls_delete") max_retries = conf.get("max_retries", 3) retry_timeout = timedelta(seconds=conf.get("retry_timeout_sec", 60)) -fileserver_timeout = conf.get("fileserver.timeout_sec", 300) -UrlPrefix = Tuple[str, str] - - -def validate_fileserver_access(fileserver_host: str) -> str: - fileserver_host = fileserver_host or config.get("hosts.fileserver", None) - if not fileserver_host: - log.error(f"Fileserver host not configured") - exit(1) - - res = 
requests.get(url=fileserver_host) - res.raise_for_status() - - return fileserver_host +storage_bll = StorageBLL() def mark_retry_failed(ids: Sequence[str], reason: str): @@ -58,112 +50,517 @@ def mark_failed(query: Q, reason: str): ) -def delete_fileserver_urls( - urls_query: Q, fileserver_host: str, url_prefixes: Sequence[UrlPrefix] -): - to_delete = list(UrlToDelete.objects(urls_query).limit(10000)) +def scheme_prefix(scheme: str) -> str: + return str(furl(scheme=scheme, netloc="")) + + +T = TypeVar("T", bound=Hashable) + + +class Storage(Generic[T], metaclass=ABCMeta): + class Client(ABC): + @property + @abstractmethod + def chunk_size(self) -> int: + pass + + def get_path(self, url: UrlToDelete) -> str: + pass + + def delete_many( + self, paths: Sequence[str] + ) -> Tuple[Sequence[str], Mapping[str, Sequence[str]]]: + pass + + @property + @abstractmethod + def name(self) -> str: + pass + + def group_urls( + self, urls: Sequence[UrlToDelete] + ) -> Mapping[T, Sequence[UrlToDelete]]: + pass + + def get_client(self, base: T, urls: Sequence[UrlToDelete]) -> Client: + pass + + +def delete_urls(urls_query: Q, storage: Storage): + to_delete = list(UrlToDelete.objects(urls_query).order_by("url").limit(10000)) if not to_delete: return - def resolve_path(url_: UrlToDelete) -> str: - parsed = furl(url_.url) - url_host = f"{parsed.scheme}://{parsed.netloc}" if parsed.scheme else None - url_path = str(parsed.path) + grouped_urls = storage.group_urls(to_delete) + for base, urls in grouped_urls.items(): + if not base: + msg = f"Invalid {storage.name} url or missing {storage.name} configuration for account" + mark_failed( + Q(id__in=[url.id for url in urls]), msg, + ) + log.warning( + f"Failed to delete {len(urls)} files from {storage.name} due to: {msg}" + ) + continue - for host, path_prefix in url_prefixes: + try: + client = storage.get_client(base, urls) + except Exception as ex: + failed = [url.id for url in urls] + mark_retry_failed(failed, reason=str(ex)) + log.warning( + f"Failed to delete {len(failed)} files from {storage.name} due to: {str(ex)}" + ) + continue + + for chunk in chunked_iter(urls, client.chunk_size): + paths = [] + path_to_id_mapping = defaultdict(list) + ids_to_delete = set() + for url in chunk: + try: + path = client.get_path(url) + except Exception as ex: + err = str(ex) + mark_failed(Q(id=url.id), err) + log.warning(f"Error getting path for {url.url}: {err}") + continue + + paths.append(path) + path_to_id_mapping[path].append(url.id) + ids_to_delete.add(url.id) + + if not paths: + continue + + try: + deleted_paths, errors = client.delete_many(paths) + except Exception as ex: + mark_retry_failed([url.id for url in urls], str(ex)) + log.warning( + f"Error deleting {len(paths)} files from {storage.name}: {str(ex)}" + ) + continue + + failed_ids = set() + for reason, err_paths in errors.items(): + error_ids = set( + chain.from_iterable( + path_to_id_mapping.get(p, []) for p in err_paths + ) + ) + mark_retry_failed(list(error_ids), reason) + log.warning( + f"Failed to delete {len(error_ids)} files from {storage.name} storage due to: {reason}" + ) + failed_ids.update(error_ids) + + deleted_ids = set( + chain.from_iterable( + path_to_id_mapping.get(p, []) for p in deleted_paths + ) + ) + if deleted_ids: + UrlToDelete.objects(id__in=list(deleted_ids)).delete() + log.info( + f"{len(deleted_ids)} files deleted from {storage.name} storage" + ) + + missing_ids = ids_to_delete - deleted_ids - failed_ids + if missing_ids: + mark_retry_failed(list(missing_ids), "Not succeeded") + + 
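+# Concrete Storage backends follow. Each one resolves a per-URL "base"
+# (fileserver host, Azure account/container, S3 endpoint/bucket, GS bucket),
+# groups pending URLs by that base in group_urls(), and builds a Client whose
+# get_path() normalizes the URL and whose delete_many() removes a batch of
+# paths (batch sizes: fileserver 10000, Azure 256, S3 1000, GS 100).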
+class FileserverStorage(Storage): + class Client(Storage.Client): + timeout = conf.get("fileserver.timeout_sec", 300) + + def __init__(self, session: requests.Session, host: str): + self.session = session + self.delete_url = furl(host).add(path="delete_many").url + + @property + def chunk_size(self) -> int: + return 10000 + + def get_path(self, url: UrlToDelete) -> str: + path = url.url.strip("/") + if not path: + raise ValueError("Empty path") + + return path + + def delete_many( + self, paths: Sequence[str] + ) -> Tuple[Sequence[str], Mapping[str, Sequence[str]]]: + res = self.session.post( + url=self.delete_url, json={"files": list(paths)}, timeout=self.timeout + ) + res.raise_for_status() + res_data = res.json() + return list(res_data.get("deleted", {})), res_data.get("errors", {}) + + def __init__(self, company: str, fileserver_host: str = None): + fileserver_host = fileserver_host or config.get("hosts.fileserver", None) + self.host = fileserver_host.rstrip("/") + if not self.host: + log.warning(f"Fileserver host not configured") + + def _parse_url_prefix(prefix) -> Tuple[str, str]: + url = furl(prefix) + host = f"{url.scheme}://{url.netloc}" if url.scheme else None + return host, str(url.path).rstrip("/") + + url_prefixes = [ + _parse_url_prefix(p) for p in conf.get("fileserver.url_prefixes", []) + ] + if not any(self.host == host for host, _ in url_prefixes): + url_prefixes.append((self.host, "")) + self.url_prefixes = url_prefixes + + self.company = company + + # @classmethod + # def validate_fileserver_access(cls, fileserver_host: str): + # res = requests.get( + # url=fileserver_host + # ) + # res.raise_for_status() + + @property + def name(self) -> str: + return "Fileserver" + + def _resolve_base_url(self, url: UrlToDelete) -> Optional[str]: + """ + For the url return the base_url containing schema, optional host and bucket name + """ + if not url.url: + return None + + try: + parsed = furl(url.url) + url_host = f"{parsed.scheme}://{parsed.netloc}" if parsed.scheme else None + url_path = str(parsed.path) + except Exception: + return None + + for host, path_prefix in self.url_prefixes: if host and url_host != host: continue if path_prefix and not url_path.startswith(path_prefix + "/"): continue - return url_path[len(path_prefix or ""):] + url.url = url_path[len(path_prefix or "") :] + return self.host - raise ValueError("could not map path") + def group_urls( + self, urls: Sequence[UrlToDelete] + ) -> Mapping[str, Sequence[UrlToDelete]]: + return bucketize(urls, key=self._resolve_base_url) - paths = set() - path_to_id_mapping = defaultdict(list) - for url in to_delete: - try: - path = resolve_path(url) - path = path.strip("/") - if not path: - raise ValueError("Empty path") - except Exception as ex: - err = str(ex) - log.warn(f"Error getting path for {url.url}: {err}") - mark_failed(Q(id=url.id), err) - continue - - paths.add(path) - path_to_id_mapping[path].append(url.id) - - if not paths: - return - - ids_to_delete = set(chain.from_iterable(path_to_id_mapping.values())) - try: - res = requests.post( - url=furl(fileserver_host).add(path="delete_many").url, - json={"files": list(paths)}, - timeout=fileserver_timeout, - ) + def get_client(self, base: str, urls: Sequence[UrlToDelete]) -> Client: + host = base + session = requests.session() + res = session.get(url=host, timeout=self.Client.timeout) res.raise_for_status() - except Exception as ex: - err = str(ex) - log.warn(f"Error deleting {len(paths)} files from fileserver: {err}") - mark_retry_failed(list(ids_to_delete), err) - 
return - res_data = res.json() - deleted_ids = set( - chain.from_iterable( - path_to_id_mapping.get(path, []) - for path in list(res_data.get("deleted", {})) + return self.Client(session, host) + + +class AzureStorage(Storage): + class Client(Storage.Client): + def __init__(self, container: ContainerClient): + self.container = container + + @property + def chunk_size(self) -> int: + return 256 + + def get_path(self, url: UrlToDelete) -> str: + parsed = furl(url.url) + if ( + not parsed.path + or not parsed.path.segments + or len(parsed.path.segments) <= 1 + ): + raise ValueError("No path found following container name") + + return os.path.join(*parsed.path.segments[1:]) + + @staticmethod + def _path_from_request_url(request_url: str) -> str: + try: + return furl(request_url).path.segments[-1] + except Exception: + return request_url + + def delete_many( + self, paths: Sequence[str] + ) -> Tuple[Sequence[str], Mapping[str, Sequence[str]]]: + try: + res = self.container.delete_blobs(*paths) + except PartialBatchErrorException as pex: + deleted = [] + errors = defaultdict(list) + for part in pex.parts: + if 300 >= part.status_code >= 200: + deleted.append(self._path_from_request_url(part.request.url)) + else: + errors[part.reason].append( + self._path_from_request_url(part.request.url) + ) + return deleted, errors + + return [self._path_from_request_url(part.request.url) for part in res], {} + + def __init__(self, company: str): + self.configs = storage_bll.get_azure_settings_for_company(company) + self.scheme = "azure" + + @property + def name(self) -> str: + return "Azure" + + def _resolve_base_url(self, url: UrlToDelete) -> Optional[Tuple]: + """ + For the url return the base_url containing schema, optional host and bucket name + """ + try: + parsed = urlparse(url.url) + if parsed.scheme != self.scheme: + return None + + azure_conf = self.configs.get_config_by_uri(url.url) + if azure_conf is None: + return None + + account_url = parsed.netloc + return account_url, azure_conf.container_name + except Exception as ex: + log.warning(f"Error resolving base url for {url.url}: " + str(ex)) + return None + + def group_urls( + self, urls: Sequence[UrlToDelete] + ) -> Mapping[Tuple, Sequence[UrlToDelete]]: + return bucketize(urls, key=self._resolve_base_url) + + def get_client(self, base: Tuple, urls: Sequence[UrlToDelete]) -> Client: + account_url, container_name = base + sample_url = urls[0].url + cfg = self.configs.get_config_by_uri(sample_url) + if not cfg or not cfg.account_name or not cfg.account_key: + raise ValueError( + f"Missing account name or key for Azure Blob Storage " + f"account: {account_url}, container: {container_name}" + ) + + return self.Client( + ContainerClient( + account_url=account_url, + container_name=cfg.container_name, + credential={ + "account_name": cfg.account_name, + "account_key": cfg.account_key, + }, + ) ) - ) - if deleted_ids: - UrlToDelete.objects(id__in=list(deleted_ids)).delete() - log.info(f"{len(deleted_ids)} files deleted from the fileserver") - failed_ids = set() - for err, error_ids in res_data.get("errors", {}).items(): - error_ids = list( - chain.from_iterable(path_to_id_mapping.get(path, []) for path in error_ids) + +class AWSStorage(Storage): + class Client(Storage.Client): + def __init__(self, base_url: str, container: AWSBucket): + self.container = container + self.base_url = base_url + + @property + def chunk_size(self) -> int: + return 1000 + + def get_path(self, url: UrlToDelete) -> str: + """ Normalize remote path. 
Remove any prefix that is already handled by the container """ + path = url.url + if path.startswith(self.base_url): + path = path[len(self.base_url) :] + path = path.lstrip("/") + return path + + @staticmethod + def _path_from_request_url(request_url: str) -> str: + try: + return furl(request_url).path.segments[-1] + except Exception: + return request_url + + def delete_many( + self, paths: Sequence[str] + ) -> Tuple[Sequence[str], Mapping[str, Sequence[str]]]: + res = self.container.delete_objects( + Delete={"Objects": [{"Key": p} for p in paths]} + ) + errors = defaultdict(list) + for err in res.get("Errors", []): + msg = err.get("Message", "") + errors[msg].append(err.get("Key")) + + return [d.get("Key") for d in res.get("Deleted", [])], errors + + def __init__(self, company: str): + self.configs = storage_bll.get_aws_settings_for_company(company) + self.scheme = "s3" + + @property + def name(self) -> str: + return "AWS" + + def _resolve_base_url(self, url: UrlToDelete) -> Optional[str]: + """ + For the url return the base_url containing schema, optional host and bucket name + """ + try: + parsed = urlparse(url.url) + if parsed.scheme != self.scheme: + return None + + s3_conf = self.configs.get_config_by_uri(url.url) + if s3_conf is None: + return None + + s3_bucket = s3_conf.bucket + if not s3_bucket: + parts = Path(parsed.path.strip("/")).parts + if parts: + s3_bucket = parts[0] + return "/".join(filter(None, ("s3:/", s3_conf.host, s3_bucket))) + except Exception as ex: + log.warning(f"Error resolving base url for {url.url}: " + str(ex)) + return None + + def group_urls( + self, urls: Sequence[UrlToDelete] + ) -> Mapping[str, Sequence[UrlToDelete]]: + return bucketize(urls, key=self._resolve_base_url) + + def get_client(self, base: str, urls: Sequence[UrlToDelete]) -> Client: + sample_url = urls[0].url + cfg = self.configs.get_config_by_uri(sample_url) + boto_kwargs = { + "endpoint_url": (("https://" if cfg.secure else "http://") + cfg.host) + if cfg.host + else None, + "use_ssl": cfg.secure, + "verify": cfg.verify, + } + name = base[len(scheme_prefix(self.scheme)) :] + bucket_name = name[len(cfg.host) + 1 :] if cfg.host else name + if not cfg.use_credentials_chain: + if not cfg.key or not cfg.secret: + raise ValueError( + f"Missing key or secret for AWS S3 host: {cfg.host}, bucket: {str(bucket_name)}" + ) + + boto_kwargs["aws_access_key_id"] = cfg.key + boto_kwargs["aws_secret_access_key"] = cfg.secret + if cfg.token: + boto_kwargs["aws_session_token"] = cfg.token + + return self.Client( + base, boto3.resource("s3", **boto_kwargs).Bucket(bucket_name) ) - mark_retry_failed(error_ids, err) - log.warning( - f"Failed to delete {len(error_ids)} files from the fileserver due to: {err}" + + +class GoogleCloudStorage(Storage): + class Client(Storage.Client): + def __init__(self, base_url: str, container: google_storage.Bucket): + self.container = container + self.base_url = base_url + + @property + def chunk_size(self) -> int: + return 100 + + def get_path(self, url: UrlToDelete) -> str: + """ Normalize remote path. 
Remove any prefix that is already handled by the container """ + path = url.url + if path.startswith(self.base_url): + path = path[len(self.base_url) :] + path = path.lstrip("/") + return path + + def delete_many( + self, paths: Sequence[str] + ) -> Tuple[Sequence[str], Mapping[str, Sequence[str]]]: + not_found = set() + + def error_callback(blob: google_storage.Blob): + not_found.add(blob.name) + + self.container.delete_blobs( + [self.container.blob(p) for p in paths], on_error=error_callback, + ) + errors = {"Not found": list(not_found)} if not_found else {} + return list(set(paths) - not_found), errors + + def __init__(self, company: str): + self.configs = storage_bll.get_gs_settings_for_company(company) + self.scheme = "gs" + + @property + def name(self) -> str: + return "Google Storage" + + def _resolve_base_url(self, url: UrlToDelete) -> Optional[str]: + """ + For the url return the base_url containing schema, optional host and bucket name + """ + try: + parsed = urlparse(url.url) + if parsed.scheme != self.scheme: + return None + + gs_conf = self.configs.get_config_by_uri(url.url) + if gs_conf is None: + return None + + return str(furl(scheme=parsed.scheme, netloc=gs_conf.bucket)) + except Exception as ex: + log.warning(f"Error resolving base url for {url.url}: " + str(ex)) + return None + + def group_urls( + self, urls: Sequence[UrlToDelete] + ) -> Mapping[str, Sequence[UrlToDelete]]: + return bucketize(urls, key=self._resolve_base_url) + + def get_client(self, base: str, urls: Sequence[UrlToDelete]) -> Client: + sample_url = urls[0].url + cfg = self.configs.get_config_by_uri(sample_url) + if cfg.credentials_json: + from google.oauth2 import service_account + + credentials = service_account.Credentials.from_service_account_file( + cfg.credentials_json + ) + else: + credentials = None + + bucket_name = base[len(scheme_prefix(self.scheme)) :] + return self.Client( + base, + google_storage.Client(project=cfg.project, credentials=credentials).bucket( + bucket_name + ), ) - failed_ids.update(error_ids) - - missing_ids = ids_to_delete - deleted_ids - failed_ids - if missing_ids: - mark_retry_failed(list(missing_ids), "Not succeeded") - - -def _get_fileserver_url_prefixes(fileserver_host: str) -> Sequence[UrlPrefix]: - def _parse_url_prefix(prefix) -> UrlPrefix: - url = furl(prefix) - host = f"{url.scheme}://{url.netloc}" if url.scheme else None - return host, str(url.path).rstrip("/") - - url_prefixes = [ - _parse_url_prefix(p) for p in conf.get("fileserver.url_prefixes", []) - ] - if not any(fileserver_host == host for host, _ in url_prefixes): - url_prefixes.append((fileserver_host, "")) - - return url_prefixes def run_delete_loop(fileserver_host: str): - fileserver_host = validate_fileserver_access(fileserver_host) - - storage_delete_funcs = { + storage_helpers = { StorageType.fileserver: partial( - delete_fileserver_urls, - fileserver_host=fileserver_host, - url_prefixes=_get_fileserver_url_prefixes(fileserver_host), + FileserverStorage, fileserver_host=fileserver_host ), + StorageType.s3: AWSStorage, + StorageType.azure: AzureStorage, + StorageType.gs: GoogleCloudStorage, } while True: now = datetime.utcnow() @@ -177,7 +574,7 @@ def run_delete_loop(fileserver_host: str): ) url_to_delete: UrlToDelete = UrlToDelete.objects( - urls_query & Q(storage_type__in=list(storage_delete_funcs)) + urls_query & Q(storage_type__in=list(storage_helpers)) ).order_by("retry_count").limit(1).first() if not url_to_delete: sleep(10) @@ -192,7 +589,10 @@ def run_delete_loop(fileserver_host: str): 
company_storage_urls_query = urls_query & Q( company=company, storage_type=storage_type, ) - storage_delete_funcs[storage_type](urls_query=company_storage_urls_query) + delete_urls( + urls_query=company_storage_urls_query, + storage=storage_helpers[storage_type](company=company), + ) def main(): diff --git a/apiserver/requirements.txt b/apiserver/requirements.txt index 8cf0159..8f179a5 100644 --- a/apiserver/requirements.txt +++ b/apiserver/requirements.txt @@ -1,7 +1,10 @@ attrs>=22.1.0 +azure-storage-blob>=12.13.1 bcrypt>=3.1.4 boltons>=19.1.0 boto3==1.14.13 +boto3-stubs[s3]>=1.24.35 +clearml>=1.6.0,<1.7.0 dpath>=1.4.2,<2.0 elasticsearch==7.13.3 fastjsonschema>=2.8 @@ -10,6 +13,8 @@ flask-cors>=3.0.5 flask>=0.12.2 funcsigs==1.0.2 furl>=2.0.0 +google-cloud-storage==2.0.0 +protobuf==3.19.5 gunicorn>=19.7.1 humanfriendly==4.18 jinja2==2.11.3 diff --git a/apiserver/tests/automated/test_tasks_delete.py b/apiserver/tests/automated/test_tasks_delete.py index 650b150..1f4652e 100644 --- a/apiserver/tests/automated/test_tasks_delete.py +++ b/apiserver/tests/automated/test_tasks_delete.py @@ -178,9 +178,10 @@ class TestTasksResetDelete(TestService): return {url1, url2} def send_debug_image_events(self, task) -> Set[str]: + url_pattern = "url_{num}.txt" events = [ self.create_event( - task, "training_debug_image", iteration, url=f"url_{iteration}" + task, "training_debug_image", iteration, url=url_pattern.format(num=iteration) ) for iteration in range(5) ] diff --git a/fileserver/config/default/fileserver.conf b/fileserver/config/default/fileserver.conf index 13d58ef..ad1b7e2 100644 --- a/fileserver/config/default/fileserver.conf +++ b/fileserver/config/default/fileserver.conf @@ -7,7 +7,7 @@ download { } delete { - allow_batch: false + allow_batch: true } cors {