From e3cc689528c2471484f556354520aaee899e9b67 Mon Sep 17 00:00:00 2001
From: allegroai <>
Date: Tue, 29 Nov 2022 17:41:49 +0200
Subject: [PATCH] Enhance async_urls_delete feature with
 max_async_deleted_events_per_sec setting and fileserver timeout and prefixes

---
 apiserver/bll/event/event_bll.py              | 24 ++++----
 .../default/services/async_urls_delete.conf   |  7 +++
 apiserver/config/default/services/events.conf |  5 +-
 apiserver/jobs/async_urls_delete.py           | 60 +++++++++++++++----
 apiserver/schema/services/projects.conf       | 15 ++---
 5 files changed, 78 insertions(+), 33 deletions(-)

diff --git a/apiserver/bll/event/event_bll.py b/apiserver/bll/event/event_bll.py
index 7646194..a307f96 100644
--- a/apiserver/bll/event/event_bll.py
+++ b/apiserver/bll/event/event_bll.py
@@ -1084,6 +1084,18 @@ class EventBLL(object):
             extra_msg, company=company_id, id=task_id
         )
 
+    @staticmethod
+    def _get_events_deletion_params(async_delete: bool) -> dict:
+        if async_delete:
+            return {
+                "wait_for_completion": False,
+                "requests_per_second": config.get(
+                    "services.events.max_async_deleted_events_per_sec", 1000
+                ),
+            }
+
+        return {"refresh": True}
+
     def delete_task_events(
         self, company_id, task_id, allow_locked=False, model=False, async_delete=False,
     ):
@@ -1103,11 +1115,7 @@
             company_id=company_id,
             event_type=EventType.all,
             body=es_req,
-            **(
-                {"wait_for_completion": False}
-                if async_delete
-                else {"refresh": True}
-            ),
+            **self._get_events_deletion_params(async_delete),
         )
 
         if not async_delete:
@@ -1170,11 +1178,7 @@
             company_id=company_id,
             event_type=EventType.all,
             body=es_req,
-            **(
-                {"wait_for_completion": False}
-                if async_delete
-                else {"refresh": True}
-            ),
+            **self._get_events_deletion_params(async_delete),
         )
 
         if not async_delete:
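
Note: the following sketch is illustrative and not part of the patch. It shows
how the kwargs produced by _get_events_deletion_params() map onto the
Elasticsearch delete_by_query API: the async path starts a throttled background
task, while the sync path blocks and refreshes the index. The elasticsearch-py
client, host, and index name are assumptions made for the example.

    from elasticsearch import Elasticsearch

    es = Elasticsearch(hosts=["http://localhost:9200"])  # assumed ES endpoint

    def delete_events(index: str, body: dict, async_delete: bool) -> dict:
        if async_delete:
            # Fire-and-forget: ES creates a background task and throttles its
            # sub-requests to the configured rate instead of blocking the caller
            return es.delete_by_query(
                index=index,
                body=body,
                wait_for_completion=False,
                requests_per_second=1000,  # services.events.max_async_deleted_events_per_sec
            )
        # Synchronous path: wait for completion and refresh the index so the
        # deletions are immediately visible to subsequent searches
        return es.delete_by_query(index=index, body=body, refresh=True)

The async call returns a task id that can later be polled through the ES tasks
API if progress tracking is needed.
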
diff --git a/apiserver/config/default/services/async_urls_delete.conf b/apiserver/config/default/services/async_urls_delete.conf
index e3bda36..b997e33 100644
--- a/apiserver/config/default/services/async_urls_delete.conf
+++ b/apiserver/config/default/services/async_urls_delete.conf
@@ -3,3 +3,10 @@
 enabled: false
 max_retries: 3
 retry_timeout_sec: 60
+
+fileserver {
+    # fileserver url prefixes. Evaluated in order of priority
+    # Can be in the form <scheme>://host:port/path or /path
+    url_prefixes: ["https://files.community-master.hosted.allegro.ai/"]
+    timeout_sec: 300
+}
\ No newline at end of file
diff --git a/apiserver/config/default/services/events.conf b/apiserver/config/default/services/events.conf
index a144abf..71b9e8a 100644
--- a/apiserver/config/default/services/events.conf
+++ b/apiserver/config/default/services/events.conf
@@ -39,4 +39,7 @@ events_retrieval {
 validate_plot_str: false
 
 # If not 0 then the plots equal or greater to the size will be stored compressed in the DB
-plot_compression_threshold: 100000
\ No newline at end of file
+plot_compression_threshold: 100000
+
+# maximal rate (events per second) for async events deletion
+max_async_deleted_events_per_sec: 1000
\ No newline at end of file
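
Note: an illustrative sketch (not part of the patch) of the prefix-matching
logic that _parse_url_prefix() and resolve_path() implement in the job below.
It uses urllib.parse instead of furl so it runs without extra dependencies;
the example prefix and URL are made up.

    from typing import Optional, Sequence, Tuple
    from urllib.parse import urlparse

    # (scheme://host[:port], or None for host-less prefixes; path prefix)
    UrlPrefix = Tuple[Optional[str], str]

    def parse_url_prefix(prefix: str) -> UrlPrefix:
        parsed = urlparse(prefix)
        host = f"{parsed.scheme}://{parsed.netloc}" if parsed.scheme else None
        return host, parsed.path.rstrip("/")

    def resolve_path(url: str, url_prefixes: Sequence[UrlPrefix]) -> str:
        parsed = urlparse(url)
        url_host = f"{parsed.scheme}://{parsed.netloc}" if parsed.scheme else None
        for host, path_prefix in url_prefixes:
            if host and url_host != host:
                continue  # prefix is bound to a different host
            if path_prefix and not parsed.path.startswith(path_prefix + "/"):
                continue  # url does not live under this path prefix
            return parsed.path[len(path_prefix or ""):]  # strip the prefix
        raise ValueError("could not map path")

    prefixes = [parse_url_prefix("https://files.example.com/storage")]
    assert resolve_path("https://files.example.com/storage/a/b.bin", prefixes) == "/a/b.bin"

Prefixes are evaluated in order, so more specific prefixes should be listed
first; as the job code shows, the fileserver host itself is appended as a
catch-all fallback when it is not already covered by a configured prefix.
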
"Not succeeded") +def _get_fileserver_url_prefixes(fileserver_host: str) -> Sequence[UrlPrefix]: + def _parse_url_prefix(prefix) -> UrlPrefix: + url = furl(prefix) + host = f"{url.scheme}://{url.netloc}" if url.scheme else None + return host, str(url.path).rstrip("/") + + url_prefixes = [ + _parse_url_prefix(p) for p in conf.get("fileserver.url_prefixes", []) + ] + if not any(fileserver_host == host for host, _ in url_prefixes): + url_prefixes.append((fileserver_host, "")) + + return url_prefixes + + def run_delete_loop(fileserver_host: str): fileserver_host = validate_fileserver_access(fileserver_host) + storage_delete_funcs = { StorageType.fileserver: partial( - delete_fileserver_urls, fileserver_host=fileserver_host + delete_fileserver_urls, + fileserver_host=fileserver_host, + url_prefixes=_get_fileserver_url_prefixes(fileserver_host), ), } while True: @@ -154,9 +192,7 @@ def run_delete_loop(fileserver_host: str): company_storage_urls_query = urls_query & Q( company=company, storage_type=storage_type, ) - storage_delete_funcs[storage_type]( - urls_query=company_storage_urls_query - ) + storage_delete_funcs[storage_type](urls_query=company_storage_urls_query) def main(): diff --git a/apiserver/schema/services/projects.conf b/apiserver/schema/services/projects.conf index 66cc220..6b87936 100644 --- a/apiserver/schema/services/projects.conf +++ b/apiserver/schema/services/projects.conf @@ -46,11 +46,6 @@ _definitions { type: string format: "date-time" } - last_update { - description: "Last update time" - type: string - format: "date-time" - } tags { description: "User-defined tags" type: array @@ -181,11 +176,6 @@ _definitions { type: string format: "date-time" } - last_update { - description: "Last update time" - type: string - format: "date-time" - } tags { description: "User-defined tags" type: array @@ -200,6 +190,11 @@ _definitions { description: "The default output destination URL for new tasks under this project" type: string } + last_update { + description: """Last project update time. Reflects the last time the project metadata was changed or a task in this project has changed status""" + type: string + format: "date-time" + } // extra properties stats { description: "Additional project stats"