Single task/model delete waits for events deletion in order to mitigate too many ES open scrolls due to repeated calls

This commit is contained in:
clearml 2024-12-05 19:13:06 +02:00
parent f9577f9faa
commit 4223fe73d1
7 changed files with 58 additions and 27 deletions

View File

@ -660,7 +660,9 @@ class EventBLL(object):
Release the scroll once it is exhausted Release the scroll once it is exhausted
""" """
total_events = nested_get(es_res, ("hits", "total", "value"), default=0) total_events = nested_get(es_res, ("hits", "total", "value"), default=0)
events = [doc["_source"] for doc in nested_get(es_res, ("hits", "hits"), default=[])] events = [
doc["_source"] for doc in nested_get(es_res, ("hits", "hits"), default=[])
]
next_scroll_id = es_res.get("_scroll_id") next_scroll_id = es_res.get("_scroll_id")
if next_scroll_id and not events: if next_scroll_id and not events:
self.clear_scroll(next_scroll_id) self.clear_scroll(next_scroll_id)
@ -1161,7 +1163,13 @@ class EventBLL(object):
return {"refresh": True} return {"refresh": True}
def delete_task_events(self, company_id, task_ids: Union[str, Sequence[str]], model=False): def delete_task_events(
self,
company_id,
task_ids: Union[str, Sequence[str]],
wait_for_delete: bool,
model=False,
):
""" """
Delete task events. No check is done for tasks write access Delete task events. No check is done for tasks write access
so it should be checked by the calling code so it should be checked by the calling code
@ -1170,7 +1178,7 @@ class EventBLL(object):
task_ids = [task_ids] task_ids = [task_ids]
deleted = 0 deleted = 0
with translate_errors_context(): with translate_errors_context():
async_delete = async_task_events_delete async_delete = async_task_events_delete and not wait_for_delete
if async_delete and len(task_ids) < 100: if async_delete and len(task_ids) < 100:
total = self.events_iterator.count_task_events( total = self.events_iterator.count_task_events(
event_type=EventType.all, event_type=EventType.all,

View File

@ -253,7 +253,7 @@ def _delete_tasks(
) )
event_urls = delete_task_events_and_collect_urls( event_urls = delete_task_events_and_collect_urls(
company=company, task_ids=task_ids company=company, task_ids=task_ids, wait_for_delete=False
) )
deleted = tasks.delete() deleted = tasks.delete()
@ -317,7 +317,7 @@ def _delete_models(
model_urls = {m.uri for m in models if m.uri} model_urls = {m.uri for m in models if m.uri}
event_urls = delete_task_events_and_collect_urls( event_urls = delete_task_events_and_collect_urls(
company=company, task_ids=model_ids, model=True company=company, task_ids=model_ids, model=True, wait_for_delete=False
) )
deleted = models.delete() deleted = models.delete()

View File

@ -79,7 +79,7 @@ class CleanupResult:
updated_models=self.updated_models + other.updated_models, updated_models=self.updated_models + other.updated_models,
deleted_models=self.deleted_models + other.deleted_models, deleted_models=self.deleted_models + other.deleted_models,
urls=self.urls + other.urls if self.urls else other.urls, urls=self.urls + other.urls if self.urls else other.urls,
deleted_model_ids=self.deleted_model_ids | other.deleted_model_ids deleted_model_ids=self.deleted_model_ids | other.deleted_model_ids,
) )
@staticmethod @staticmethod
@ -222,13 +222,15 @@ def schedule_for_delete(
def delete_task_events_and_collect_urls( def delete_task_events_and_collect_urls(
company: str, task_ids: Sequence[str], model=False company: str, task_ids: Sequence[str], wait_for_delete: bool, model=False
) -> Set[str]: ) -> Set[str]:
event_urls = collect_debug_image_urls( event_urls = collect_debug_image_urls(company, task_ids) | collect_plot_image_urls(
company, task_ids company, task_ids
) | collect_plot_image_urls(company, task_ids) )
event_bll.delete_task_events(company, task_ids, model=model) event_bll.delete_task_events(
company, task_ids, model=model, wait_for_delete=wait_for_delete
)
return event_urls return event_urls
@ -250,14 +252,16 @@ def cleanup_task(
published_models, draft_models, in_use_model_ids = verify_task_children_and_ouptuts( published_models, draft_models, in_use_model_ids = verify_task_children_and_ouptuts(
task, force task, force
) )
artifact_urls = { artifact_urls = (
{
a.uri a.uri
for a in task.execution.artifacts.values() for a in task.execution.artifacts.values()
if a.mode == ArtifactModes.output and a.uri if a.mode == ArtifactModes.output and a.uri
} if task.execution and task.execution.artifacts else {}
model_urls = {
m.uri for m in draft_models if m.uri and m.id not in in_use_model_ids
} }
if task.execution and task.execution.artifacts
else {}
)
model_urls = {m.uri for m in draft_models if m.uri and m.id not in in_use_model_ids}
deleted_task_id = f"{deleted_prefix}{task.id}" deleted_task_id = f"{deleted_prefix}{task.id}"
updated_children = 0 updated_children = 0

View File

@ -1033,7 +1033,7 @@ def delete_for_task(call, company_id, request: TaskRequest):
) )
call.result.data = dict( call.result.data = dict(
deleted=event_bll.delete_task_events(company_id, task_id) deleted=event_bll.delete_task_events(company_id, task_id, wait_for_delete=True)
) )
@ -1060,7 +1060,7 @@ def delete_for_model(call: APICall, company_id: str, request: ModelRequest):
call.result.data = dict( call.result.data = dict(
deleted=event_bll.delete_task_events( deleted=event_bll.delete_task_events(
company_id, model_id, model=True company_id, model_id, model=True, wait_for_delete=True
) )
) )

View File

@ -569,7 +569,10 @@ def _delete_model_events(
user_id: str, user_id: str,
models: Sequence[Model], models: Sequence[Model],
delete_external_artifacts: bool, delete_external_artifacts: bool,
sync_delete: bool,
): ):
if not models:
return
model_ids = [m.id for m in models] model_ids = [m.id for m in models]
delete_external_artifacts = delete_external_artifacts and config.get( delete_external_artifacts = delete_external_artifacts and config.get(
"services.async_urls_delete.enabled", True "services.async_urls_delete.enabled", True
@ -587,7 +590,7 @@ def _delete_model_events(
) )
event_urls = delete_task_events_and_collect_urls( event_urls = delete_task_events_and_collect_urls(
company=company_id, task_ids=model_ids, model=True company=company_id, task_ids=model_ids, model=True, wait_for_delete=sync_delete
) )
if event_urls: if event_urls:
schedule_for_delete( schedule_for_delete(
@ -598,7 +601,7 @@ def _delete_model_events(
can_delete_folders=False, can_delete_folders=False,
) )
event_bll.delete_task_events(company_id, model_ids, model=True) event_bll.delete_task_events(company_id, model_ids, model=True, wait_for_delete=sync_delete)
@endpoint("models.delete", request_data_model=DeleteModelRequest) @endpoint("models.delete", request_data_model=DeleteModelRequest)
@ -616,6 +619,7 @@ def delete(call: APICall, company_id, request: DeleteModelRequest):
user_id=user_id, user_id=user_id,
models=[model], models=[model],
delete_external_artifacts=request.delete_external_artifacts, delete_external_artifacts=request.delete_external_artifacts,
sync_delete=True,
) )
_reset_cached_tags( _reset_cached_tags(
company_id, projects=[model.project] if model.project else [] company_id, projects=[model.project] if model.project else []
@ -629,7 +633,7 @@ def delete(call: APICall, company_id, request: DeleteModelRequest):
request_data_model=ModelsDeleteManyRequest, request_data_model=ModelsDeleteManyRequest,
response_data_model=BatchResponse, response_data_model=BatchResponse,
) )
def delete(call: APICall, company_id, request: ModelsDeleteManyRequest): def delete_many(call: APICall, company_id, request: ModelsDeleteManyRequest):
user_id = call.identity.user user_id = call.identity.user
results, failures = run_batch_operation( results, failures = run_batch_operation(
@ -654,6 +658,7 @@ def delete(call: APICall, company_id, request: ModelsDeleteManyRequest):
user_id=user_id, user_id=user_id,
models=deleted_models, models=deleted_models,
delete_external_artifacts=request.delete_external_artifacts, delete_external_artifacts=request.delete_external_artifacts,
sync_delete=False,
) )
projects = set(model.project for model in deleted_models) projects = set(model.project for model in deleted_models)
_reset_cached_tags(company_id, projects=list(projects)) _reset_cached_tags(company_id, projects=list(projects))

View File

@ -84,7 +84,8 @@ def delete_runs(call: APICall, company_id: str, request: DeleteRunsRequest):
company_id=company_id, company_id=company_id,
user_id=call.identity.user, user_id=call.identity.user,
tasks=tasks, tasks=tasks,
delete_external_artifacts=True delete_external_artifacts=True,
sync_delete=True,
) )
call.result.data = dict(succeeded=succeeded, failed=failures) call.result.data = dict(succeeded=succeeded, failed=failures)

View File

@ -1031,7 +1031,10 @@ def _delete_task_events(
user_id: str, user_id: str,
tasks: Mapping[str, CleanupResult], tasks: Mapping[str, CleanupResult],
delete_external_artifacts: bool, delete_external_artifacts: bool,
sync_delete: bool,
): ):
if not tasks:
return
task_ids = list(tasks) task_ids = list(tasks)
deleted_model_ids = set( deleted_model_ids = set(
itertools.chain.from_iterable( itertools.chain.from_iterable(
@ -1057,7 +1060,9 @@ def _delete_task_events(
) )
event_urls = delete_task_events_and_collect_urls( event_urls = delete_task_events_and_collect_urls(
company=company_id, task_ids=task_ids company=company_id,
task_ids=task_ids,
wait_for_delete=sync_delete,
) )
if deleted_model_ids: if deleted_model_ids:
event_urls.update( event_urls.update(
@ -1065,6 +1070,7 @@ def _delete_task_events(
company=company_id, company=company_id,
task_ids=list(deleted_model_ids), task_ids=list(deleted_model_ids),
model=True, model=True,
wait_for_delete=sync_delete,
) )
) )
@ -1077,10 +1083,13 @@ def _delete_task_events(
can_delete_folders=False, can_delete_folders=False,
) )
else: else:
event_bll.delete_task_events(company_id, task_ids) event_bll.delete_task_events(company_id, task_ids, wait_for_delete=sync_delete)
if deleted_model_ids: if deleted_model_ids:
event_bll.delete_task_events( event_bll.delete_task_events(
company_id, list(deleted_model_ids), model=True company_id,
list(deleted_model_ids),
model=True,
wait_for_delete=sync_delete,
) )
@ -1102,6 +1111,7 @@ def reset(call: APICall, company_id, request: ResetRequest):
user_id=call.identity.user, user_id=call.identity.user,
tasks={task_id: cleanup_res}, tasks={task_id: cleanup_res},
delete_external_artifacts=request.delete_external_artifacts, delete_external_artifacts=request.delete_external_artifacts,
sync_delete=True,
) )
res = ResetResponse( res = ResetResponse(
**updates, **updates,
@ -1147,6 +1157,7 @@ def reset_many(call: APICall, company_id, request: ResetManyRequest):
user_id=call.identity.user, user_id=call.identity.user,
tasks=tasks, tasks=tasks,
delete_external_artifacts=request.delete_external_artifacts, delete_external_artifacts=request.delete_external_artifacts,
sync_delete=False,
) )
call.result.data_model = ResetManyResponse( call.result.data_model = ResetManyResponse(
@ -1255,6 +1266,7 @@ def delete(call: APICall, company_id, request: DeleteRequest):
user_id=call.identity.user, user_id=call.identity.user,
tasks={request.task: cleanup_res}, tasks={request.task: cleanup_res},
delete_external_artifacts=request.delete_external_artifacts, delete_external_artifacts=request.delete_external_artifacts,
sync_delete=True,
) )
_reset_cached_tags(company_id, projects=[task.project] if task.project else []) _reset_cached_tags(company_id, projects=[task.project] if task.project else [])
call.result.data = dict( call.result.data = dict(
@ -1301,6 +1313,7 @@ def delete_many(call: APICall, company_id, request: DeleteManyRequest):
user_id=call.identity.user, user_id=call.identity.user,
tasks=tasks, tasks=tasks,
delete_external_artifacts=request.delete_external_artifacts, delete_external_artifacts=request.delete_external_artifacts,
sync_delete=False,
) )
_reset_cached_tags(company_id, projects=list(projects)) _reset_cached_tags(company_id, projects=list(projects))