Add support for server-side delete for AWS S3, Google Storage and Azure Blob Storage

This commit is contained in:
allegroai 2022-12-21 18:41:16 +02:00
parent 0ffde24dc2
commit e66257761a
10 changed files with 648 additions and 124 deletions

View File

@@ -9,6 +9,7 @@ from operator import attrgetter
from typing import Sequence, Set, Tuple, Optional, List, Mapping, Union
import elasticsearch
from boltons.iterutils import chunked_iter
from elasticsearch.helpers import BulkIndexError
from mongoengine import Q
from nested_dict import nested_dict
@@ -1188,15 +1189,19 @@ class EventBLL(object):
Delete multiple task events. No check is done for tasks' write access
so it should be checked by the calling code
"""
es_req = {"query": {"terms": {"task": task_ids}}}
deleted = 0
with translate_errors_context():
es_res = delete_company_events(
es=self.es,
company_id=company_id,
event_type=EventType.all,
body=es_req,
**self._get_events_deletion_params(async_delete),
)
for tasks in chunked_iter(task_ids, 100):
es_req = {"query": {"terms": {"task": tasks}}}
es_res = delete_company_events(
es=self.es,
company_id=company_id,
event_type=EventType.all,
body=es_req,
**self._get_events_deletion_params(async_delete),
)
if not async_delete:
deleted += es_res.get("deleted", 0)
if not async_delete:
return es_res.get("deleted", 0)
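
The events deletion now runs per chunk of 100 task ids instead of a single terms query over all ids. A minimal, self-contained sketch of the chunked_iter batching used above (the ids below are made up):

from boltons.iterutils import chunked_iter

task_ids = [f"task_{i}" for i in range(250)]
for tasks in chunked_iter(task_ids, 100):
    # yields lists of at most 100 ids: 100, 100, 50
    es_req = {"query": {"terms": {"task": tasks}}}
    print(len(tasks), es_req["query"]["terms"]["task"][:2])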

View File

@@ -0,0 +1,48 @@
from copy import copy
from boltons.cacheutils import cachedproperty
from clearml.backend_config.bucket_config import (
S3BucketConfigurations,
AzureContainerConfigurations,
GSBucketConfigurations,
)
from apiserver.config_repo import config
log = config.logger(__file__)
class StorageBLL:
default_aws_configs: S3BucketConfigurations = None
conf = config.get("services.storage_credentials")
@cachedproperty
def _default_aws_configs(self) -> S3BucketConfigurations:
return S3BucketConfigurations.from_config(self.conf.get("aws.s3"))
@cachedproperty
def _default_azure_configs(self) -> AzureContainerConfigurations:
return AzureContainerConfigurations.from_config(self.conf.get("azure.storage"))
@cachedproperty
def _default_gs_configs(self) -> GSBucketConfigurations:
return GSBucketConfigurations.from_config(self.conf.get("google.storage"))
def get_azure_settings_for_company(
self,
company_id: str,
) -> AzureContainerConfigurations:
return copy(self._default_azure_configs)
def get_gs_settings_for_company(
self,
company_id: str,
) -> GSBucketConfigurations:
return copy(self._default_gs_configs)
def get_aws_settings_for_company(
self,
company_id: str,
) -> S3BucketConfigurations:
return copy(self._default_aws_configs)
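
A hedged usage sketch of the new StorageBLL (the company id and url are made up; get_config_by_uri is the clearml bucket-config lookup that the async delete job below relies on):

storage_bll = StorageBLL()

aws_configs = storage_bll.get_aws_settings_for_company("some-company-id")
cfg = aws_configs.get_config_by_uri("s3://my-bucket/experiments/1234/model.bin")
if cfg is not None:
    # bucket/host/key/secret come from services.storage_credentials
    print(cfg.bucket, cfg.host, bool(cfg.key and cfg.secret))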

View File

@@ -5,6 +5,7 @@ from typing import Sequence, Set, Tuple
import attr
from boltons.iterutils import partition, bucketize, first
from furl import furl
from mongoengine import NotUniqueError
from pymongo.errors import DuplicateKeyError
@@ -106,6 +107,9 @@ def collect_debug_image_urls(company: str, task_or_model: str) -> Set[str]:
supported_storage_types = {
"https://": StorageType.fileserver,
"http://": StorageType.fileserver,
"s3://": StorageType.s3,
"azure://": StorageType.azure,
"gs://": StorageType.gs,
}
@@ -129,7 +133,14 @@ def _schedule_for_delete(
for url in storage_urls:
folder = None
if delete_folders:
folder, _, _ = url.rpartition("/")
try:
parsed = furl(url)
if parsed.path and len(parsed.path.segments) > 1:
folder = parsed.remove(
args=True, fragment=True, path=parsed.path.segments[-1]
).url.rstrip("/")
except Exception as ex:
pass
to_delete = folder or url
if to_delete in scheduled_to_delete:
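
A small worked example of the folder derivation above (the url is made up); the query string and fragment are stripped and the last path segment is dropped:

from furl import furl

url = "s3://my-bucket/experiments/1234/artifacts/model.bin?X-Amz-Expires=3600"
parsed = furl(url)
if parsed.path and len(parsed.path.segments) > 1:
    folder = parsed.remove(
        args=True, fragment=True, path=parsed.path.segments[-1]
    ).url.rstrip("/")
    print(folder)  # s3://my-bucket/experiments/1234/artifacts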

View File

@@ -1,6 +1,6 @@
# if set to True then on task delete/reset, external file urls for known storage types are scheduled for async delete
# otherwise they are returned to the client for client-side delete
enabled: false
enabled: true
max_retries: 3
retry_timeout_sec: 60
@@ -9,4 +9,4 @@ fileserver {
# Can be in the form <schema>://host:port/path or /path
url_prefixes: ["https://files.community-master.hosted.allegro.ai/"]
timeout_sec: 300
}
}
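
A hedged sketch of how this flag is typically consulted on task delete/reset (the branch bodies are elided; the config path matches the services.async_urls_delete section read by the delete job):

from apiserver.config_repo import config

if config.get("services.async_urls_delete.enabled", False):
    ...  # schedule UrlToDelete documents for the async delete job
else:
    ...  # return the collected urls to the client for client-side delete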

View File

@@ -0,0 +1,53 @@
aws {
s3 {
# S3 credentials, used for read/write access by various SDK elements
# default, used for any bucket not specified below
key: ""
secret: ""
region: ""
use_credentials_chain: false
# Additional ExtraArgs passed to boto3 when uploading files. Can also be set per-bucket under "credentials".
extra_args: {}
credentials: [
# specifies key/secret credentials to use when handling s3 urls (read or write)
# {
# bucket: "my-bucket-name"
# key: "my-access-key"
# secret: "my-secret-key"
# },
{
# This will apply to all buckets in this host (unless key/value is specifically provided for a given bucket)
host: "localhost:9000"
key: "evg_user"
secret: "evg_pass"
multipart: false
secure: false
}
]
}
}
google.storage {
# Default project and credentials file
# Will be used when no bucket configuration is found
// project: "clearml"
// credentials_json: "/path/to/credentials.json"
//
// # Specific credentials per bucket and sub directory
// credentials = [
// {
// bucket: "my-bucket"
// subdir: "path/in/bucket" # Not required
// project: "clearml"
// credentials_json: "/path/to/credentials.json"
// },
// ]
}
azure.storage {
# containers: [
# {
# account_name: "clearml"
# account_key: "secret"
# # container_name:
# }
# ]
}
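
A hedged sketch of how this block is consumed (it mirrors StorageBLL above; the bucket url assumes the commented-out google.storage credentials are filled in):

from clearml.backend_config.bucket_config import GSBucketConfigurations
from apiserver.config_repo import config

gs_conf = config.get("services.storage_credentials.google.storage")
gs_configs = GSBucketConfigurations.from_config(gs_conf)
cfg = gs_configs.get_config_by_uri("gs://my-bucket/path/in/bucket/artifact.bin")
# cfg.project and cfg.credentials_json select the service account to use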

View File

@@ -8,6 +8,9 @@ from apiserver.database.model import AttributedDocument
class StorageType(str, Enum):
fileserver = "fileserver"
s3 = "s3"
azure = "azure"
gs = "gs"
unknown = "unknown"
@@ -32,10 +35,8 @@ class UrlToDelete(AttributedDocument):
"strict": strict,
"indexes": [
("company", "user", "task"),
"storage_type",
"created",
"retry_count",
"type",
("company", "storage_type", "url"),
("status", "retry_count", "storage_type"),
],
}
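
A hedged sketch of the kind of lookup the new ("company", "storage_type", "url") compound index could serve (the ids and url are made up; the actual queries live in the cleanup and async-delete code):

from apiserver.database.model.url_to_delete import UrlToDelete, StorageType

existing = UrlToDelete.objects(
    company="some-company-id",
    storage_type=StorageType.s3,
    url="s3://my-bucket/experiments/1234",
).only("id").first()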

View File

@@ -1,3 +1,5 @@
import os
from abc import ABC, ABCMeta, abstractmethod
from argparse import ArgumentParser
from collections import defaultdict
from datetime import datetime, timedelta
@@ -5,38 +7,28 @@ from functools import partial
from itertools import chain
from pathlib import Path
from time import sleep
from typing import Sequence, Tuple
from typing import Sequence, Optional, Tuple, Mapping, TypeVar, Hashable, Generic
from urllib.parse import urlparse
import boto3
import requests
from azure.storage.blob import ContainerClient, PartialBatchErrorException
from boltons.iterutils import bucketize, chunked_iter
from furl import furl
from google.cloud import storage as google_storage
from mongoengine import Q
from mypy_boto3_s3.service_resource import Bucket as AWSBucket
from apiserver.bll.storage import StorageBLL
from apiserver.config_repo import config
from apiserver.database import db
from apiserver.database.model.url_to_delete import (
UrlToDelete,
DeletionStatus,
StorageType,
)
from apiserver.database.model.url_to_delete import UrlToDelete, StorageType, DeletionStatus
log = config.logger(f"JOB-{Path(__file__).name}")
conf = config.get("services.async_urls_delete")
max_retries = conf.get("max_retries", 3)
retry_timeout = timedelta(seconds=conf.get("retry_timeout_sec", 60))
fileserver_timeout = conf.get("fileserver.timeout_sec", 300)
UrlPrefix = Tuple[str, str]
def validate_fileserver_access(fileserver_host: str) -> str:
fileserver_host = fileserver_host or config.get("hosts.fileserver", None)
if not fileserver_host:
log.error(f"Fileserver host not configured")
exit(1)
res = requests.get(url=fileserver_host)
res.raise_for_status()
return fileserver_host
storage_bll = StorageBLL()
def mark_retry_failed(ids: Sequence[str], reason: str):
@@ -58,112 +50,517 @@ def mark_failed(query: Q, reason: str):
)
def delete_fileserver_urls(
urls_query: Q, fileserver_host: str, url_prefixes: Sequence[UrlPrefix]
):
to_delete = list(UrlToDelete.objects(urls_query).limit(10000))
def scheme_prefix(scheme: str) -> str:
return str(furl(scheme=scheme, netloc=""))
T = TypeVar("T", bound=Hashable)
class Storage(Generic[T], metaclass=ABCMeta):
class Client(ABC):
@property
@abstractmethod
def chunk_size(self) -> int:
pass
def get_path(self, url: UrlToDelete) -> str:
pass
def delete_many(
self, paths: Sequence[str]
) -> Tuple[Sequence[str], Mapping[str, Sequence[str]]]:
pass
@property
@abstractmethod
def name(self) -> str:
pass
def group_urls(
self, urls: Sequence[UrlToDelete]
) -> Mapping[T, Sequence[UrlToDelete]]:
pass
def get_client(self, base: T, urls: Sequence[UrlToDelete]) -> Client:
pass
def delete_urls(urls_query: Q, storage: Storage):
to_delete = list(UrlToDelete.objects(urls_query).order_by("url").limit(10000))
if not to_delete:
return
def resolve_path(url_: UrlToDelete) -> str:
parsed = furl(url_.url)
url_host = f"{parsed.scheme}://{parsed.netloc}" if parsed.scheme else None
url_path = str(parsed.path)
grouped_urls = storage.group_urls(to_delete)
for base, urls in grouped_urls.items():
if not base:
msg = f"Invalid {storage.name} url or missing {storage.name} configuration for account"
mark_failed(
Q(id__in=[url.id for url in urls]), msg,
)
log.warning(
f"Failed to delete {len(urls)} files from {storage.name} due to: {msg}"
)
continue
for host, path_prefix in url_prefixes:
try:
client = storage.get_client(base, urls)
except Exception as ex:
failed = [url.id for url in urls]
mark_retry_failed(failed, reason=str(ex))
log.warning(
f"Failed to delete {len(failed)} files from {storage.name} due to: {str(ex)}"
)
continue
for chunk in chunked_iter(urls, client.chunk_size):
paths = []
path_to_id_mapping = defaultdict(list)
ids_to_delete = set()
for url in chunk:
try:
path = client.get_path(url)
except Exception as ex:
err = str(ex)
mark_failed(Q(id=url.id), err)
log.warning(f"Error getting path for {url.url}: {err}")
continue
paths.append(path)
path_to_id_mapping[path].append(url.id)
ids_to_delete.add(url.id)
if not paths:
continue
try:
deleted_paths, errors = client.delete_many(paths)
except Exception as ex:
mark_retry_failed([url.id for url in urls], str(ex))
log.warning(
f"Error deleting {len(paths)} files from {storage.name}: {str(ex)}"
)
continue
failed_ids = set()
for reason, err_paths in errors.items():
error_ids = set(
chain.from_iterable(
path_to_id_mapping.get(p, []) for p in err_paths
)
)
mark_retry_failed(list(error_ids), reason)
log.warning(
f"Failed to delete {len(error_ids)} files from {storage.name} storage due to: {reason}"
)
failed_ids.update(error_ids)
deleted_ids = set(
chain.from_iterable(
path_to_id_mapping.get(p, []) for p in deleted_paths
)
)
if deleted_ids:
UrlToDelete.objects(id__in=list(deleted_ids)).delete()
log.info(
f"{len(deleted_ids)} files deleted from {storage.name} storage"
)
missing_ids = ids_to_delete - deleted_ids - failed_ids
if missing_ids:
mark_retry_failed(list(missing_ids), "Not succeeded")
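
To illustrate the contract that delete_urls drives (group_urls buckets urls by a base key, get_client returns a Client whose get_path/delete_many do the actual deletion), here is a purely hypothetical local-disk backend; it is not part of this commit and reuses this module's imports and base classes:

class LocalDiskStorage(Storage):
    """Hypothetical example backend, for illustration only"""

    class Client(Storage.Client):
        def __init__(self, root: str):
            self.root = root

        @property
        def chunk_size(self) -> int:
            return 1000

        def get_path(self, url: UrlToDelete) -> str:
            path = url.url[len("file://"):].lstrip("/")
            if not path:
                raise ValueError("Empty path")
            return path

        def delete_many(
            self, paths: Sequence[str]
        ) -> Tuple[Sequence[str], Mapping[str, Sequence[str]]]:
            deleted, errors = [], defaultdict(list)
            for p in paths:
                try:
                    os.remove(os.path.join(self.root, p))
                    deleted.append(p)
                except OSError as ex:
                    errors[str(ex)].append(p)
            return deleted, errors

    @property
    def name(self) -> str:
        return "Local disk"

    def group_urls(
        self, urls: Sequence[UrlToDelete]
    ) -> Mapping[str, Sequence[UrlToDelete]]:
        return bucketize(
            urls, key=lambda u: "/" if u.url.startswith("file://") else None
        )

    def get_client(self, base: str, urls: Sequence[UrlToDelete]) -> Client:
        return self.Client(root=base)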
class FileserverStorage(Storage):
class Client(Storage.Client):
timeout = conf.get("fileserver.timeout_sec", 300)
def __init__(self, session: requests.Session, host: str):
self.session = session
self.delete_url = furl(host).add(path="delete_many").url
@property
def chunk_size(self) -> int:
return 10000
def get_path(self, url: UrlToDelete) -> str:
path = url.url.strip("/")
if not path:
raise ValueError("Empty path")
return path
def delete_many(
self, paths: Sequence[str]
) -> Tuple[Sequence[str], Mapping[str, Sequence[str]]]:
res = self.session.post(
url=self.delete_url, json={"files": list(paths)}, timeout=self.timeout
)
res.raise_for_status()
res_data = res.json()
return list(res_data.get("deleted", {})), res_data.get("errors", {})
def __init__(self, company: str, fileserver_host: str = None):
fileserver_host = fileserver_host or config.get("hosts.fileserver", None)
self.host = fileserver_host.rstrip("/")
if not self.host:
log.warning(f"Fileserver host not configured")
def _parse_url_prefix(prefix) -> Tuple[str, str]:
url = furl(prefix)
host = f"{url.scheme}://{url.netloc}" if url.scheme else None
return host, str(url.path).rstrip("/")
url_prefixes = [
_parse_url_prefix(p) for p in conf.get("fileserver.url_prefixes", [])
]
if not any(self.host == host for host, _ in url_prefixes):
url_prefixes.append((self.host, ""))
self.url_prefixes = url_prefixes
self.company = company
# @classmethod
# def validate_fileserver_access(cls, fileserver_host: str):
# res = requests.get(
# url=fileserver_host
# )
# res.raise_for_status()
@property
def name(self) -> str:
return "Fileserver"
def _resolve_base_url(self, url: UrlToDelete) -> Optional[str]:
"""
For the url, return the base_url containing the scheme, optional host and bucket name
"""
if not url.url:
return None
try:
parsed = furl(url.url)
url_host = f"{parsed.scheme}://{parsed.netloc}" if parsed.scheme else None
url_path = str(parsed.path)
except Exception:
return None
for host, path_prefix in self.url_prefixes:
if host and url_host != host:
continue
if path_prefix and not url_path.startswith(path_prefix + "/"):
continue
return url_path[len(path_prefix or ""):]
url.url = url_path[len(path_prefix or "") :]
return self.host
raise ValueError("could not map path")
def group_urls(
self, urls: Sequence[UrlToDelete]
) -> Mapping[str, Sequence[UrlToDelete]]:
return bucketize(urls, key=self._resolve_base_url)
paths = set()
path_to_id_mapping = defaultdict(list)
for url in to_delete:
try:
path = resolve_path(url)
path = path.strip("/")
if not path:
raise ValueError("Empty path")
except Exception as ex:
err = str(ex)
log.warn(f"Error getting path for {url.url}: {err}")
mark_failed(Q(id=url.id), err)
continue
paths.add(path)
path_to_id_mapping[path].append(url.id)
if not paths:
return
ids_to_delete = set(chain.from_iterable(path_to_id_mapping.values()))
try:
res = requests.post(
url=furl(fileserver_host).add(path="delete_many").url,
json={"files": list(paths)},
timeout=fileserver_timeout,
)
def get_client(self, base: str, urls: Sequence[UrlToDelete]) -> Client:
host = base
session = requests.session()
res = session.get(url=host, timeout=self.Client.timeout)
res.raise_for_status()
except Exception as ex:
err = str(ex)
log.warn(f"Error deleting {len(paths)} files from fileserver: {err}")
mark_retry_failed(list(ids_to_delete), err)
return
res_data = res.json()
deleted_ids = set(
chain.from_iterable(
path_to_id_mapping.get(path, [])
for path in list(res_data.get("deleted", {}))
return self.Client(session, host)
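
A hedged sketch of the fileserver batch-delete call wrapped by the Client above (the host and path are made up; the request and response shapes follow delete_many):

import requests

host = "https://files.example.com"
res = requests.post(
    url=f"{host.rstrip('/')}/delete_many",
    json={"files": ["company/project/task/artifact.bin"]},
    timeout=300,
)
res.raise_for_status()
data = res.json()
deleted_paths = list(data.get("deleted", {}))  # keys are the deleted paths
errors = data.get("errors", {})                # error reason -> list of paths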
class AzureStorage(Storage):
class Client(Storage.Client):
def __init__(self, container: ContainerClient):
self.container = container
@property
def chunk_size(self) -> int:
return 256
def get_path(self, url: UrlToDelete) -> str:
parsed = furl(url.url)
if (
not parsed.path
or not parsed.path.segments
or len(parsed.path.segments) <= 1
):
raise ValueError("No path found following container name")
return os.path.join(*parsed.path.segments[1:])
@staticmethod
def _path_from_request_url(request_url: str) -> str:
try:
return furl(request_url).path.segments[-1]
except Exception:
return request_url
def delete_many(
self, paths: Sequence[str]
) -> Tuple[Sequence[str], Mapping[str, Sequence[str]]]:
try:
res = self.container.delete_blobs(*paths)
except PartialBatchErrorException as pex:
deleted = []
errors = defaultdict(list)
for part in pex.parts:
if 300 >= part.status_code >= 200:
deleted.append(self._path_from_request_url(part.request.url))
else:
errors[part.reason].append(
self._path_from_request_url(part.request.url)
)
return deleted, errors
return [self._path_from_request_url(part.request.url) for part in res], {}
def __init__(self, company: str):
self.configs = storage_bll.get_azure_settings_for_company(company)
self.scheme = "azure"
@property
def name(self) -> str:
return "Azure"
def _resolve_base_url(self, url: UrlToDelete) -> Optional[Tuple]:
"""
For the url, return the base_url containing the scheme, optional host and bucket name
"""
try:
parsed = urlparse(url.url)
if parsed.scheme != self.scheme:
return None
azure_conf = self.configs.get_config_by_uri(url.url)
if azure_conf is None:
return None
account_url = parsed.netloc
return account_url, azure_conf.container_name
except Exception as ex:
log.warning(f"Error resolving base url for {url.url}: " + str(ex))
return None
def group_urls(
self, urls: Sequence[UrlToDelete]
) -> Mapping[Tuple, Sequence[UrlToDelete]]:
return bucketize(urls, key=self._resolve_base_url)
def get_client(self, base: Tuple, urls: Sequence[UrlToDelete]) -> Client:
account_url, container_name = base
sample_url = urls[0].url
cfg = self.configs.get_config_by_uri(sample_url)
if not cfg or not cfg.account_name or not cfg.account_key:
raise ValueError(
f"Missing account name or key for Azure Blob Storage "
f"account: {account_url}, container: {container_name}"
)
return self.Client(
ContainerClient(
account_url=account_url,
container_name=cfg.container_name,
credential={
"account_name": cfg.account_name,
"account_key": cfg.account_key,
},
)
)
)
if deleted_ids:
UrlToDelete.objects(id__in=list(deleted_ids)).delete()
log.info(f"{len(deleted_ids)} files deleted from the fileserver")
failed_ids = set()
for err, error_ids in res_data.get("errors", {}).items():
error_ids = list(
chain.from_iterable(path_to_id_mapping.get(path, []) for path in error_ids)
class AWSStorage(Storage):
class Client(Storage.Client):
def __init__(self, base_url: str, container: AWSBucket):
self.container = container
self.base_url = base_url
@property
def chunk_size(self) -> int:
return 1000
def get_path(self, url: UrlToDelete) -> str:
""" Normalize remote path. Remove any prefix that is already handled by the container """
path = url.url
if path.startswith(self.base_url):
path = path[len(self.base_url) :]
path = path.lstrip("/")
return path
@staticmethod
def _path_from_request_url(request_url: str) -> str:
try:
return furl(request_url).path.segments[-1]
except Exception:
return request_url
def delete_many(
self, paths: Sequence[str]
) -> Tuple[Sequence[str], Mapping[str, Sequence[str]]]:
res = self.container.delete_objects(
Delete={"Objects": [{"Key": p} for p in paths]}
)
errors = defaultdict(list)
for err in res.get("Errors", []):
msg = err.get("Message", "")
errors[msg].append(err.get("Key"))
return [d.get("Key") for d in res.get("Deleted", [])], errors
def __init__(self, company: str):
self.configs = storage_bll.get_aws_settings_for_company(company)
self.scheme = "s3"
@property
def name(self) -> str:
return "AWS"
def _resolve_base_url(self, url: UrlToDelete) -> Optional[str]:
"""
For the url, return the base_url containing the scheme, optional host and bucket name
"""
try:
parsed = urlparse(url.url)
if parsed.scheme != self.scheme:
return None
s3_conf = self.configs.get_config_by_uri(url.url)
if s3_conf is None:
return None
s3_bucket = s3_conf.bucket
if not s3_bucket:
parts = Path(parsed.path.strip("/")).parts
if parts:
s3_bucket = parts[0]
return "/".join(filter(None, ("s3:/", s3_conf.host, s3_bucket)))
except Exception as ex:
log.warning(f"Error resolving base url for {url.url}: " + str(ex))
return None
def group_urls(
self, urls: Sequence[UrlToDelete]
) -> Mapping[str, Sequence[UrlToDelete]]:
return bucketize(urls, key=self._resolve_base_url)
def get_client(self, base: str, urls: Sequence[UrlToDelete]) -> Client:
sample_url = urls[0].url
cfg = self.configs.get_config_by_uri(sample_url)
boto_kwargs = {
"endpoint_url": (("https://" if cfg.secure else "http://") + cfg.host)
if cfg.host
else None,
"use_ssl": cfg.secure,
"verify": cfg.verify,
}
name = base[len(scheme_prefix(self.scheme)) :]
bucket_name = name[len(cfg.host) + 1 :] if cfg.host else name
if not cfg.use_credentials_chain:
if not cfg.key or not cfg.secret:
raise ValueError(
f"Missing key or secret for AWS S3 host: {cfg.host}, bucket: {str(bucket_name)}"
)
boto_kwargs["aws_access_key_id"] = cfg.key
boto_kwargs["aws_secret_access_key"] = cfg.secret
if cfg.token:
boto_kwargs["aws_session_token"] = cfg.token
return self.Client(
base, boto3.resource("s3", **boto_kwargs).Bucket(bucket_name)
)
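
A worked example of the base-url parsing in get_client above, using the MinIO-style host credentials from the new storage_credentials config (cfg_host and cfg_secure stand in for cfg.host and cfg.secure):

base = "s3://localhost:9000/my-bucket"
cfg_host, cfg_secure = "localhost:9000", False

name = base[len("s3://"):]                       # "localhost:9000/my-bucket"
bucket_name = name[len(cfg_host) + 1:]           # "my-bucket"
endpoint_url = ("https://" if cfg_secure else "http://") + cfg_host
# boto3.resource("s3", endpoint_url=endpoint_url, ...).Bucket(bucket_name)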
mark_retry_failed(error_ids, err)
log.warning(
f"Failed to delete {len(error_ids)} files from the fileserver due to: {err}"
class GoogleCloudStorage(Storage):
class Client(Storage.Client):
def __init__(self, base_url: str, container: google_storage.Bucket):
self.container = container
self.base_url = base_url
@property
def chunk_size(self) -> int:
return 100
def get_path(self, url: UrlToDelete) -> str:
""" Normalize remote path. Remove any prefix that is already handled by the container """
path = url.url
if path.startswith(self.base_url):
path = path[len(self.base_url) :]
path = path.lstrip("/")
return path
def delete_many(
self, paths: Sequence[str]
) -> Tuple[Sequence[str], Mapping[str, Sequence[str]]]:
not_found = set()
def error_callback(blob: google_storage.Blob):
not_found.add(blob.name)
self.container.delete_blobs(
[self.container.blob(p) for p in paths], on_error=error_callback,
)
errors = {"Not found": list(not_found)} if not_found else {}
return list(set(paths) - not_found), errors
def __init__(self, company: str):
self.configs = storage_bll.get_gs_settings_for_company(company)
self.scheme = "gs"
@property
def name(self) -> str:
return "Google Storage"
def _resolve_base_url(self, url: UrlToDelete) -> Optional[str]:
"""
For the url, return the base_url containing the scheme, optional host and bucket name
"""
try:
parsed = urlparse(url.url)
if parsed.scheme != self.scheme:
return None
gs_conf = self.configs.get_config_by_uri(url.url)
if gs_conf is None:
return None
return str(furl(scheme=parsed.scheme, netloc=gs_conf.bucket))
except Exception as ex:
log.warning(f"Error resolving base url for {url.url}: " + str(ex))
return None
def group_urls(
self, urls: Sequence[UrlToDelete]
) -> Mapping[str, Sequence[UrlToDelete]]:
return bucketize(urls, key=self._resolve_base_url)
def get_client(self, base: str, urls: Sequence[UrlToDelete]) -> Client:
sample_url = urls[0].url
cfg = self.configs.get_config_by_uri(sample_url)
if cfg.credentials_json:
from google.oauth2 import service_account
credentials = service_account.Credentials.from_service_account_file(
cfg.credentials_json
)
else:
credentials = None
bucket_name = base[len(scheme_prefix(self.scheme)) :]
return self.Client(
base,
google_storage.Client(project=cfg.project, credentials=credentials).bucket(
bucket_name
),
)
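
A hedged sketch of the GCS batch delete wrapped by Client.delete_many above, assuming application-default credentials (the project, bucket, and path are made up):

from google.cloud import storage as google_storage

client = google_storage.Client(project="my-project", credentials=None)
bucket = client.bucket("my-bucket")

missing = set()
paths = ["experiments/1234/artifacts/model.bin"]
bucket.delete_blobs(
    [bucket.blob(p) for p in paths],
    on_error=lambda blob: missing.add(blob.name),  # called for blobs not found
)
deleted = set(paths) - missing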
failed_ids.update(error_ids)
missing_ids = ids_to_delete - deleted_ids - failed_ids
if missing_ids:
mark_retry_failed(list(missing_ids), "Not succeeded")
def _get_fileserver_url_prefixes(fileserver_host: str) -> Sequence[UrlPrefix]:
def _parse_url_prefix(prefix) -> UrlPrefix:
url = furl(prefix)
host = f"{url.scheme}://{url.netloc}" if url.scheme else None
return host, str(url.path).rstrip("/")
url_prefixes = [
_parse_url_prefix(p) for p in conf.get("fileserver.url_prefixes", [])
]
if not any(fileserver_host == host for host, _ in url_prefixes):
url_prefixes.append((fileserver_host, ""))
return url_prefixes
def run_delete_loop(fileserver_host: str):
fileserver_host = validate_fileserver_access(fileserver_host)
storage_delete_funcs = {
storage_helpers = {
StorageType.fileserver: partial(
delete_fileserver_urls,
fileserver_host=fileserver_host,
url_prefixes=_get_fileserver_url_prefixes(fileserver_host),
FileserverStorage, fileserver_host=fileserver_host
),
StorageType.s3: AWSStorage,
StorageType.azure: AzureStorage,
StorageType.gs: GoogleCloudStorage,
}
while True:
now = datetime.utcnow()
@@ -177,7 +574,7 @@ def run_delete_loop(fileserver_host: str):
)
url_to_delete: UrlToDelete = UrlToDelete.objects(
urls_query & Q(storage_type__in=list(storage_delete_funcs))
urls_query & Q(storage_type__in=list(storage_helpers))
).order_by("retry_count").limit(1).first()
if not url_to_delete:
sleep(10)
@@ -192,7 +589,10 @@
company_storage_urls_query = urls_query & Q(
company=company, storage_type=storage_type,
)
storage_delete_funcs[storage_type](urls_query=company_storage_urls_query)
delete_urls(
urls_query=company_storage_urls_query,
storage=storage_helpers[storage_type](company=company),
)
def main():

View File

@@ -1,7 +1,10 @@
attrs>=22.1.0
azure-storage-blob>=12.13.1
bcrypt>=3.1.4
boltons>=19.1.0
boto3==1.14.13
boto3-stubs[s3]>=1.24.35
clearml>=1.6.0,<1.7.0
dpath>=1.4.2,<2.0
elasticsearch==7.13.3
fastjsonschema>=2.8
@@ -10,6 +13,8 @@ flask-cors>=3.0.5
flask>=0.12.2
funcsigs==1.0.2
furl>=2.0.0
google-cloud-storage==2.0.0
protobuf==3.19.5
gunicorn>=19.7.1
humanfriendly==4.18
jinja2==2.11.3

View File

@@ -178,9 +178,10 @@ class TestTasksResetDelete(TestService):
return {url1, url2}
def send_debug_image_events(self, task) -> Set[str]:
url_pattern = "url_{num}.txt"
events = [
self.create_event(
task, "training_debug_image", iteration, url=f"url_{iteration}"
task, "training_debug_image", iteration, url=url_pattern.format(num=iteration)
)
for iteration in range(5)
]

View File

@@ -7,7 +7,7 @@ download {
}
delete {
allow_batch: false
allow_batch: true
}
cors {