Enhance the async_urls_delete feature: add a max_async_deleted_events_per_sec setting, a fileserver timeout, and fileserver URL prefixes

allegroai 2022-11-29 17:41:49 +02:00
parent 9e0adc77dd
commit e3cc689528
5 changed files with 78 additions and 33 deletions

View File

@@ -1084,6 +1084,18 @@ class EventBLL(object):
extra_msg, company=company_id, id=task_id
)
@staticmethod
def _get_events_deletion_params(async_delete: bool) -> dict:
if async_delete:
return {
"wait_for_completion": False,
"requests_per_second": config.get(
"services.events.max_async_deleted_events_per_sec", 1000
),
}
return {"refresh": True}
def delete_task_events(
self, company_id, task_id, allow_locked=False, model=False, async_delete=False,
):
@@ -1103,11 +1115,7 @@ class EventBLL(object):
company_id=company_id,
event_type=EventType.all,
body=es_req,
**(
{"wait_for_completion": False}
if async_delete
else {"refresh": True}
),
**self._get_events_deletion_params(async_delete),
)
if not async_delete:
@@ -1170,11 +1178,7 @@ class EventBLL(object):
company_id=company_id,
event_type=EventType.all,
body=es_req,
**(
{"wait_for_completion": False}
if async_delete
else {"refresh": True}
),
**self._get_events_deletion_params(async_delete),
)
if not async_delete:
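For context, a minimal sketch (not the apiserver's actual wiring; the client setup, index pattern, and query are illustrative) of how the two parameter sets returned by `_get_events_deletion_params` drive elasticsearch-py's `delete_by_query`:

```python
from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")  # illustrative endpoint
body = {"query": {"term": {"task": "task-id"}}}  # hypothetical query

# Async path: ES returns a task handle immediately and throttles its
# internal batched deletes to at most requests_per_second docs/sec.
res = es.delete_by_query(
    index="events-*",  # illustrative index pattern
    body=body,
    wait_for_completion=False,
    requests_per_second=1000,
)
print(res["task"])  # can be polled via the ES tasks API

# Sync path: blocks until done and refreshes the index so deletions
# are immediately visible to subsequent searches.
es.delete_by_query(index="events-*", body=body, refresh=True)
```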

View File

@@ -3,3 +3,10 @@
enabled: false
max_retries: 3
retry_timeout_sec: 60
fileserver {
# fileserver url prefixes, evaluated in order of priority
# Each prefix can be in the form <scheme>://host:port/path or /path
url_prefixes: ["https://files.community-master.hosted.allegro.ai/"]
timeout_sec: 300
}
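As a rough sketch (endpoint and payload are hypothetical), `timeout_sec` bounds each HTTP call the deletion job makes to the fileserver, so an unresponsive fileserver raises an error instead of stalling the delete loop:

```python
import requests

fileserver_timeout = 300  # fileserver.timeout_sec

try:
    res = requests.post(
        "https://files.community-master.hosted.allegro.ai/delete_many",
        json={"files": ["company/task/artifact.bin"]},  # hypothetical payload
        timeout=fileserver_timeout,
    )
    res.raise_for_status()
except requests.exceptions.RequestException as ex:
    print(f"fileserver call failed: {ex}")
```

Note that `requests` applies the timeout separately to the connect phase and to each read, not to the transfer as a whole.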

View File

@@ -39,4 +39,7 @@ events_retrieval {
validate_plot_str: false
# If not 0 then the plots equal or greater to the size will be stored compressed in the DB
plot_compression_threshold: 100000
# max rate (events per second) for deleting events asynchronously
max_async_deleted_events_per_sec: 1000
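This value is fed to Elasticsearch as `requests_per_second` (see the EventBLL change above), which makes the duration of an async delete easy to estimate. A back-of-envelope sketch with hypothetical numbers:

```python
events_to_delete = 5_000_000  # hypothetical backlog
rate = 1000                   # max_async_deleted_events_per_sec
print(f"~{events_to_delete / rate / 60:.0f} min")  # -> ~83 min
```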

View File

@@ -5,7 +5,7 @@ from functools import partial
from itertools import chain
from pathlib import Path
from time import sleep
from typing import Sequence
from typing import Sequence, Tuple
import requests
from furl import furl
@@ -13,13 +13,18 @@ from mongoengine import Q
from apiserver.config_repo import config
from apiserver.database import db
from apiserver.database.model.url_to_delete import UrlToDelete, DeletionStatus, StorageType
from apiserver.database.model.url_to_delete import (
UrlToDelete,
DeletionStatus,
StorageType,
)
log = config.logger(f"JOB-{Path(__file__).name}")
conf = config.get("services.async_urls_delete")
max_retries = conf.get("max_retries", 3)
retry_timeout = timedelta(seconds=conf.get("retry_timeout_sec", 60))
token_expiration_sec = 600
fileserver_timeout = conf.get("fileserver.timeout_sec", 300)
UrlPrefix = Tuple[str, str]
def validate_fileserver_access(fileserver_host: str) -> str:
@@ -28,9 +33,7 @@ def validate_fileserver_access(fileserver_host: str) -> str:
log.error(f"Fileserver host not configured")
exit(1)
res = requests.get(
url=fileserver_host
)
res = requests.get(url=fileserver_host)
res.raise_for_status()
return fileserver_host
@@ -55,16 +58,32 @@ def mark_failed(query: Q, reason: str):
)
def delete_fileserver_urls(urls_query: Q, fileserver_host: str):
def delete_fileserver_urls(
urls_query: Q, fileserver_host: str, url_prefixes: Sequence[UrlPrefix]
):
to_delete = list(UrlToDelete.objects(urls_query).limit(10000))
if not to_delete:
return
def resolve_path(url_: UrlToDelete) -> str:
parsed = furl(url_.url)
url_host = f"{parsed.scheme}://{parsed.netloc}" if parsed.scheme else None
url_path = str(parsed.path)
for host, path_prefix in url_prefixes:
if host and url_host != host:
continue
if path_prefix and not url_path.startswith(path_prefix + "/"):
continue
return url_path[len(path_prefix or ""):]
raise ValueError("could not map path")
paths = set()
path_to_id_mapping = defaultdict(list)
for url in to_delete:
try:
path = str(furl(url.url).path)
path = resolve_path(url)
path = path.strip("/")
if not path:
raise ValueError("Empty path")
@@ -85,6 +104,7 @@ def delete_fileserver_urls(urls_query: Q, fileserver_host: str):
res = requests.post(
url=furl(fileserver_host).add(path="delete_many").url,
json={"files": list(paths)},
timeout=fileserver_timeout,
)
res.raise_for_status()
except Exception as ex:
@@ -120,11 +140,29 @@ def delete_fileserver_urls(urls_query: Q, fileserver_host: str):
mark_retry_failed(list(missing_ids), "Not succeeded")
def _get_fileserver_url_prefixes(fileserver_host: str) -> Sequence[UrlPrefix]:
def _parse_url_prefix(prefix) -> UrlPrefix:
url = furl(prefix)
host = f"{url.scheme}://{url.netloc}" if url.scheme else None
return host, str(url.path).rstrip("/")
url_prefixes = [
_parse_url_prefix(p) for p in conf.get("fileserver.url_prefixes", [])
]
if not any(fileserver_host == host for host, _ in url_prefixes):
url_prefixes.append((fileserver_host, ""))
return url_prefixes
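Worked examples (hypothetical values) of the `(host, path)` pairs `_parse_url_prefix` produces; note that `fileserver_host` itself is always appended as a catch-all `(fileserver_host, "")` pair when it is not already listed:

```python
# _parse_url_prefix("https://files.example.com/clearml")
#   -> ("https://files.example.com", "/clearml")
# _parse_url_prefix("/files")  # path-only form: matches any host
#   -> (None, "/files")
```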
def run_delete_loop(fileserver_host: str):
fileserver_host = validate_fileserver_access(fileserver_host)
storage_delete_funcs = {
StorageType.fileserver: partial(
delete_fileserver_urls, fileserver_host=fileserver_host
delete_fileserver_urls,
fileserver_host=fileserver_host,
url_prefixes=_get_fileserver_url_prefixes(fileserver_host),
),
}
while True:
@@ -154,9 +192,7 @@ def run_delete_loop(fileserver_host: str):
company_storage_urls_query = urls_query & Q(
company=company, storage_type=storage_type,
)
storage_delete_funcs[storage_type](
urls_query=company_storage_urls_query
)
storage_delete_funcs[storage_type](urls_query=company_storage_urls_query)
def main():

View File

@@ -46,11 +46,6 @@ _definitions {
type: string
format: "date-time"
}
last_update {
description: "Last update time"
type: string
format: "date-time"
}
tags {
description: "User-defined tags"
type: array
@@ -181,11 +176,6 @@ _definitions {
type: string
format: "date-time"
}
last_update {
description: "Last update time"
type: string
format: "date-time"
}
tags {
description: "User-defined tags"
type: array
@@ -200,6 +190,11 @@ _definitions {
description: "The default output destination URL for new tasks under this project"
type: string
}
last_update {
description: """Last project update time. Reflects the last time the project metadata was changed or a task in this project has changed status"""
type: string
format: "date-time"
}
// extra properties
stats {
description: "Additional project stats"