From d397d2ae204f56f087c28eab218527d24689c3e6 Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Tue, 29 Nov 2022 17:38:41 +0200 Subject: [PATCH] Support optional async events deletion when deleting tasks --- apiserver/bll/event/event_bll.py | 26 +++++++++++++++----- apiserver/bll/project/project_cleanup.py | 9 +++++-- apiserver/bll/task/task_cleanup.py | 15 +++++++---- apiserver/config/default/services/tasks.conf | 3 +++ 4 files changed, 40 insertions(+), 13 deletions(-) diff --git a/apiserver/bll/event/event_bll.py b/apiserver/bll/event/event_bll.py index 9c50c3c..7646194 100644 --- a/apiserver/bll/event/event_bll.py +++ b/apiserver/bll/event/event_bll.py @@ -1084,7 +1084,9 @@ class EventBLL(object): extra_msg, company=company_id, id=task_id ) - def delete_task_events(self, company_id, task_id, allow_locked=False, model=False): + def delete_task_events( + self, company_id, task_id, allow_locked=False, model=False, async_delete=False, + ): if model: self._validate_model_state( company_id=company_id, model_id=task_id, allow_locked=allow_locked, @@ -1101,10 +1103,15 @@ class EventBLL(object): company_id=company_id, event_type=EventType.all, body=es_req, - refresh=True, + **( + {"wait_for_completion": False} + if async_delete + else {"refresh": True} + ), ) - return es_res.get("deleted", 0) + if not async_delete: + return es_res.get("deleted", 0) def clear_task_log( self, @@ -1149,7 +1156,9 @@ class EventBLL(object): ) return es_res.get("deleted", 0) - def delete_multi_task_events(self, company_id: str, task_ids: Sequence[str]): + def delete_multi_task_events( + self, company_id: str, task_ids: Sequence[str], async_delete=False + ): """ Delete mutliple task events. No check is done for tasks write access so it should be checked by the calling code @@ -1161,10 +1170,15 @@ class EventBLL(object): company_id=company_id, event_type=EventType.all, body=es_req, - refresh=True, + **( + {"wait_for_completion": False} + if async_delete + else {"refresh": True} + ), ) - return es_res.get("deleted", 0) + if not async_delete: + return es_res.get("deleted", 0) def clear_scroll(self, scroll_id: str): if scroll_id == self.empty_scroll: diff --git a/apiserver/bll/project/project_cleanup.py b/apiserver/bll/project/project_cleanup.py index 9906785..2ba33f6 100644 --- a/apiserver/bll/project/project_cleanup.py +++ b/apiserver/bll/project/project_cleanup.py @@ -19,6 +19,7 @@ from .sub_projects import _ids_with_children log = config.logger(__file__) event_bll = EventBLL() +async_events_delete = config.get("services.tasks.async_events_delete", False) @attr.s(auto_attribs=True) @@ -162,7 +163,9 @@ def _delete_tasks(company: str, projects: Sequence[str]) -> Tuple[int, Set, Set] } ) - event_bll.delete_multi_task_events(company, list(task_ids)) + event_bll.delete_multi_task_events( + company, list(task_ids), async_delete=async_events_delete + ) deleted = tasks.delete() return deleted, event_urls, artifact_urls @@ -210,6 +213,8 @@ def _delete_models( if m.uri: model_urls.add(m.uri) - event_bll.delete_multi_task_events(company, model_ids) + event_bll.delete_multi_task_events( + company, model_ids, async_delete=async_events_delete + ) deleted = models.delete() return deleted, event_urls, model_urls diff --git a/apiserver/bll/task/task_cleanup.py b/apiserver/bll/task/task_cleanup.py index 0e106e9..53eabc1 100644 --- a/apiserver/bll/task/task_cleanup.py +++ b/apiserver/bll/task/task_cleanup.py @@ -25,6 +25,7 @@ from apiserver.database.utils import id as db_id log = config.logger(__file__) event_bll = EventBLL() +async_events_delete = config.get("services.tasks.async_events_delete", False) @attr.s(auto_attribs=True) @@ -218,12 +219,14 @@ def cleanup_task( event_urls.update(collect_plot_image_urls(task.company, m_id)) try: event_bll.delete_task_events( - task.company, m_id, allow_locked=True, model=True + task.company, + m_id, + allow_locked=True, + model=True, + async_delete=async_events_delete, ) except errors.bad_request.InvalidModelId as ex: - log.info( - f"Error deleting events for the model {m_id}: {str(ex)}" - ) + log.info(f"Error deleting events for the model {m_id}: {str(ex)}") deleted_models += Model.objects(id__in=list(model_ids)).delete() if in_use_model_ids: @@ -237,7 +240,9 @@ def cleanup_task( else: Model.objects(id__in=[m.id for m in models]).update(unset__task=1) - event_bll.delete_task_events(task.company, task.id, allow_locked=force) + event_bll.delete_task_events( + task.company, task.id, allow_locked=force, async_delete=async_events_delete + ) if delete_external_artifacts: scheduled = _schedule_for_delete( diff --git a/apiserver/config/default/services/tasks.conf b/apiserver/config/default/services/tasks.conf index fc2ea35..c51cccb 100644 --- a/apiserver/config/default/services/tasks.conf +++ b/apiserver/config/default/services/tasks.conf @@ -24,3 +24,6 @@ hyperparam_values { # the maximum amount of unique last metrics/variants combinations # for which the last values are stored in a task max_last_metrics: 2000 + +# if set then call to tasks.delete/cleanup does not wait for ES events deletion +async_events_delete: false