From 4223fe73d1b8870759562007a0667598057b4bc8 Mon Sep 17 00:00:00 2001 From: clearml <> Date: Thu, 5 Dec 2024 19:13:06 +0200 Subject: [PATCH] Single task/model delete waits for events deletion in order to mitigate too many ES open scrolls due to repeated calls --- apiserver/bll/event/event_bll.py | 14 ++++++++--- apiserver/bll/project/project_cleanup.py | 4 ++-- apiserver/bll/task/task_cleanup.py | 30 ++++++++++++++---------- apiserver/services/events.py | 4 ++-- apiserver/services/models.py | 11 ++++++--- apiserver/services/pipelines.py | 3 ++- apiserver/services/tasks.py | 19 ++++++++++++--- 7 files changed, 58 insertions(+), 27 deletions(-) diff --git a/apiserver/bll/event/event_bll.py b/apiserver/bll/event/event_bll.py index fa2e880..034572f 100644 --- a/apiserver/bll/event/event_bll.py +++ b/apiserver/bll/event/event_bll.py @@ -660,7 +660,9 @@ class EventBLL(object): Release the scroll once it is exhausted """ total_events = nested_get(es_res, ("hits", "total", "value"), default=0) - events = [doc["_source"] for doc in nested_get(es_res, ("hits", "hits"), default=[])] + events = [ + doc["_source"] for doc in nested_get(es_res, ("hits", "hits"), default=[]) + ] next_scroll_id = es_res.get("_scroll_id") if next_scroll_id and not events: self.clear_scroll(next_scroll_id) @@ -1161,7 +1163,13 @@ class EventBLL(object): return {"refresh": True} - def delete_task_events(self, company_id, task_ids: Union[str, Sequence[str]], model=False): + def delete_task_events( + self, + company_id, + task_ids: Union[str, Sequence[str]], + wait_for_delete: bool, + model=False, + ): """ Delete task events. No check is done for tasks write access so it should be checked by the calling code @@ -1170,7 +1178,7 @@ class EventBLL(object): task_ids = [task_ids] deleted = 0 with translate_errors_context(): - async_delete = async_task_events_delete + async_delete = async_task_events_delete and not wait_for_delete if async_delete and len(task_ids) < 100: total = self.events_iterator.count_task_events( event_type=EventType.all, diff --git a/apiserver/bll/project/project_cleanup.py b/apiserver/bll/project/project_cleanup.py index 8386cc0..d137b7c 100644 --- a/apiserver/bll/project/project_cleanup.py +++ b/apiserver/bll/project/project_cleanup.py @@ -253,7 +253,7 @@ def _delete_tasks( ) event_urls = delete_task_events_and_collect_urls( - company=company, task_ids=task_ids + company=company, task_ids=task_ids, wait_for_delete=False ) deleted = tasks.delete() @@ -317,7 +317,7 @@ def _delete_models( model_urls = {m.uri for m in models if m.uri} event_urls = delete_task_events_and_collect_urls( - company=company, task_ids=model_ids, model=True + company=company, task_ids=model_ids, model=True, wait_for_delete=False ) deleted = models.delete() diff --git a/apiserver/bll/task/task_cleanup.py b/apiserver/bll/task/task_cleanup.py index 57f78c5..c9b91a8 100644 --- a/apiserver/bll/task/task_cleanup.py +++ b/apiserver/bll/task/task_cleanup.py @@ -79,7 +79,7 @@ class CleanupResult: updated_models=self.updated_models + other.updated_models, deleted_models=self.deleted_models + other.deleted_models, urls=self.urls + other.urls if self.urls else other.urls, - deleted_model_ids=self.deleted_model_ids | other.deleted_model_ids + deleted_model_ids=self.deleted_model_ids | other.deleted_model_ids, ) @staticmethod @@ -222,13 +222,15 @@ def schedule_for_delete( def delete_task_events_and_collect_urls( - company: str, task_ids: Sequence[str], model=False + company: str, task_ids: Sequence[str], wait_for_delete: bool, model=False ) -> Set[str]: - event_urls = collect_debug_image_urls( + event_urls = collect_debug_image_urls(company, task_ids) | collect_plot_image_urls( company, task_ids - ) | collect_plot_image_urls(company, task_ids) + ) - event_bll.delete_task_events(company, task_ids, model=model) + event_bll.delete_task_events( + company, task_ids, model=model, wait_for_delete=wait_for_delete + ) return event_urls @@ -250,14 +252,16 @@ def cleanup_task( published_models, draft_models, in_use_model_ids = verify_task_children_and_ouptuts( task, force ) - artifact_urls = { - a.uri - for a in task.execution.artifacts.values() - if a.mode == ArtifactModes.output and a.uri - } if task.execution and task.execution.artifacts else {} - model_urls = { - m.uri for m in draft_models if m.uri and m.id not in in_use_model_ids - } + artifact_urls = ( + { + a.uri + for a in task.execution.artifacts.values() + if a.mode == ArtifactModes.output and a.uri + } + if task.execution and task.execution.artifacts + else {} + ) + model_urls = {m.uri for m in draft_models if m.uri and m.id not in in_use_model_ids} deleted_task_id = f"{deleted_prefix}{task.id}" updated_children = 0 diff --git a/apiserver/services/events.py b/apiserver/services/events.py index b52b89a..fadc579 100644 --- a/apiserver/services/events.py +++ b/apiserver/services/events.py @@ -1033,7 +1033,7 @@ def delete_for_task(call, company_id, request: TaskRequest): ) call.result.data = dict( - deleted=event_bll.delete_task_events(company_id, task_id) + deleted=event_bll.delete_task_events(company_id, task_id, wait_for_delete=True) ) @@ -1060,7 +1060,7 @@ def delete_for_model(call: APICall, company_id: str, request: ModelRequest): call.result.data = dict( deleted=event_bll.delete_task_events( - company_id, model_id, model=True + company_id, model_id, model=True, wait_for_delete=True ) ) diff --git a/apiserver/services/models.py b/apiserver/services/models.py index 77fa50d..af7b63a 100644 --- a/apiserver/services/models.py +++ b/apiserver/services/models.py @@ -569,7 +569,10 @@ def _delete_model_events( user_id: str, models: Sequence[Model], delete_external_artifacts: bool, + sync_delete: bool, ): + if not models: + return model_ids = [m.id for m in models] delete_external_artifacts = delete_external_artifacts and config.get( "services.async_urls_delete.enabled", True @@ -587,7 +590,7 @@ def _delete_model_events( ) event_urls = delete_task_events_and_collect_urls( - company=company_id, task_ids=model_ids, model=True + company=company_id, task_ids=model_ids, model=True, wait_for_delete=sync_delete ) if event_urls: schedule_for_delete( @@ -598,7 +601,7 @@ def _delete_model_events( can_delete_folders=False, ) - event_bll.delete_task_events(company_id, model_ids, model=True) + event_bll.delete_task_events(company_id, model_ids, model=True, wait_for_delete=sync_delete) @endpoint("models.delete", request_data_model=DeleteModelRequest) @@ -616,6 +619,7 @@ def delete(call: APICall, company_id, request: DeleteModelRequest): user_id=user_id, models=[model], delete_external_artifacts=request.delete_external_artifacts, + sync_delete=True, ) _reset_cached_tags( company_id, projects=[model.project] if model.project else [] @@ -629,7 +633,7 @@ def delete(call: APICall, company_id, request: DeleteModelRequest): request_data_model=ModelsDeleteManyRequest, response_data_model=BatchResponse, ) -def delete(call: APICall, company_id, request: ModelsDeleteManyRequest): +def delete_many(call: APICall, company_id, request: ModelsDeleteManyRequest): user_id = call.identity.user results, failures = run_batch_operation( @@ -654,6 +658,7 @@ def delete(call: APICall, company_id, request: ModelsDeleteManyRequest): user_id=user_id, models=deleted_models, delete_external_artifacts=request.delete_external_artifacts, + sync_delete=False, ) projects = set(model.project for model in deleted_models) _reset_cached_tags(company_id, projects=list(projects)) diff --git a/apiserver/services/pipelines.py b/apiserver/services/pipelines.py index 1927559..6efd10b 100644 --- a/apiserver/services/pipelines.py +++ b/apiserver/services/pipelines.py @@ -84,7 +84,8 @@ def delete_runs(call: APICall, company_id: str, request: DeleteRunsRequest): company_id=company_id, user_id=call.identity.user, tasks=tasks, - delete_external_artifacts=True + delete_external_artifacts=True, + sync_delete=True, ) call.result.data = dict(succeeded=succeeded, failed=failures) diff --git a/apiserver/services/tasks.py b/apiserver/services/tasks.py index 4b36ebb..90a07ad 100644 --- a/apiserver/services/tasks.py +++ b/apiserver/services/tasks.py @@ -1031,7 +1031,10 @@ def _delete_task_events( user_id: str, tasks: Mapping[str, CleanupResult], delete_external_artifacts: bool, + sync_delete: bool, ): + if not tasks: + return task_ids = list(tasks) deleted_model_ids = set( itertools.chain.from_iterable( @@ -1057,7 +1060,9 @@ def _delete_task_events( ) event_urls = delete_task_events_and_collect_urls( - company=company_id, task_ids=task_ids + company=company_id, + task_ids=task_ids, + wait_for_delete=sync_delete, ) if deleted_model_ids: event_urls.update( @@ -1065,6 +1070,7 @@ def _delete_task_events( company=company_id, task_ids=list(deleted_model_ids), model=True, + wait_for_delete=sync_delete, ) ) @@ -1077,10 +1083,13 @@ def _delete_task_events( can_delete_folders=False, ) else: - event_bll.delete_task_events(company_id, task_ids) + event_bll.delete_task_events(company_id, task_ids, wait_for_delete=sync_delete) if deleted_model_ids: event_bll.delete_task_events( - company_id, list(deleted_model_ids), model=True + company_id, + list(deleted_model_ids), + model=True, + wait_for_delete=sync_delete, ) @@ -1102,6 +1111,7 @@ def reset(call: APICall, company_id, request: ResetRequest): user_id=call.identity.user, tasks={task_id: cleanup_res}, delete_external_artifacts=request.delete_external_artifacts, + sync_delete=True, ) res = ResetResponse( **updates, @@ -1147,6 +1157,7 @@ def reset_many(call: APICall, company_id, request: ResetManyRequest): user_id=call.identity.user, tasks=tasks, delete_external_artifacts=request.delete_external_artifacts, + sync_delete=False, ) call.result.data_model = ResetManyResponse( @@ -1255,6 +1266,7 @@ def delete(call: APICall, company_id, request: DeleteRequest): user_id=call.identity.user, tasks={request.task: cleanup_res}, delete_external_artifacts=request.delete_external_artifacts, + sync_delete=True, ) _reset_cached_tags(company_id, projects=[task.project] if task.project else []) call.result.data = dict( @@ -1301,6 +1313,7 @@ def delete_many(call: APICall, company_id, request: DeleteManyRequest): user_id=call.identity.user, tasks=tasks, delete_external_artifacts=request.delete_external_artifacts, + sync_delete=False, ) _reset_cached_tags(company_id, projects=list(projects))