Support optional async events deletion when deleting tasks

This commit is contained in:
allegroai 2022-11-29 17:38:41 +02:00
parent 2d711e1500
commit d397d2ae20
4 changed files with 40 additions and 13 deletions

View File

@ -1084,7 +1084,9 @@ class EventBLL(object):
extra_msg, company=company_id, id=task_id extra_msg, company=company_id, id=task_id
) )
def delete_task_events(self, company_id, task_id, allow_locked=False, model=False): def delete_task_events(
self, company_id, task_id, allow_locked=False, model=False, async_delete=False,
):
if model: if model:
self._validate_model_state( self._validate_model_state(
company_id=company_id, model_id=task_id, allow_locked=allow_locked, company_id=company_id, model_id=task_id, allow_locked=allow_locked,
@ -1101,10 +1103,15 @@ class EventBLL(object):
company_id=company_id, company_id=company_id,
event_type=EventType.all, event_type=EventType.all,
body=es_req, body=es_req,
refresh=True, **(
{"wait_for_completion": False}
if async_delete
else {"refresh": True}
),
) )
return es_res.get("deleted", 0) if not async_delete:
return es_res.get("deleted", 0)
def clear_task_log( def clear_task_log(
self, self,
@ -1149,7 +1156,9 @@ class EventBLL(object):
) )
return es_res.get("deleted", 0) return es_res.get("deleted", 0)
def delete_multi_task_events(self, company_id: str, task_ids: Sequence[str]): def delete_multi_task_events(
self, company_id: str, task_ids: Sequence[str], async_delete=False
):
""" """
Delete mutliple task events. No check is done for tasks write access Delete mutliple task events. No check is done for tasks write access
so it should be checked by the calling code so it should be checked by the calling code
@ -1161,10 +1170,15 @@ class EventBLL(object):
company_id=company_id, company_id=company_id,
event_type=EventType.all, event_type=EventType.all,
body=es_req, body=es_req,
refresh=True, **(
{"wait_for_completion": False}
if async_delete
else {"refresh": True}
),
) )
return es_res.get("deleted", 0) if not async_delete:
return es_res.get("deleted", 0)
def clear_scroll(self, scroll_id: str): def clear_scroll(self, scroll_id: str):
if scroll_id == self.empty_scroll: if scroll_id == self.empty_scroll:

View File

@ -19,6 +19,7 @@ from .sub_projects import _ids_with_children
log = config.logger(__file__) log = config.logger(__file__)
event_bll = EventBLL() event_bll = EventBLL()
async_events_delete = config.get("services.tasks.async_events_delete", False)
@attr.s(auto_attribs=True) @attr.s(auto_attribs=True)
@ -162,7 +163,9 @@ def _delete_tasks(company: str, projects: Sequence[str]) -> Tuple[int, Set, Set]
} }
) )
event_bll.delete_multi_task_events(company, list(task_ids)) event_bll.delete_multi_task_events(
company, list(task_ids), async_delete=async_events_delete
)
deleted = tasks.delete() deleted = tasks.delete()
return deleted, event_urls, artifact_urls return deleted, event_urls, artifact_urls
@ -210,6 +213,8 @@ def _delete_models(
if m.uri: if m.uri:
model_urls.add(m.uri) model_urls.add(m.uri)
event_bll.delete_multi_task_events(company, model_ids) event_bll.delete_multi_task_events(
company, model_ids, async_delete=async_events_delete
)
deleted = models.delete() deleted = models.delete()
return deleted, event_urls, model_urls return deleted, event_urls, model_urls

View File

@ -25,6 +25,7 @@ from apiserver.database.utils import id as db_id
log = config.logger(__file__) log = config.logger(__file__)
event_bll = EventBLL() event_bll = EventBLL()
async_events_delete = config.get("services.tasks.async_events_delete", False)
@attr.s(auto_attribs=True) @attr.s(auto_attribs=True)
@ -218,12 +219,14 @@ def cleanup_task(
event_urls.update(collect_plot_image_urls(task.company, m_id)) event_urls.update(collect_plot_image_urls(task.company, m_id))
try: try:
event_bll.delete_task_events( event_bll.delete_task_events(
task.company, m_id, allow_locked=True, model=True task.company,
m_id,
allow_locked=True,
model=True,
async_delete=async_events_delete,
) )
except errors.bad_request.InvalidModelId as ex: except errors.bad_request.InvalidModelId as ex:
log.info( log.info(f"Error deleting events for the model {m_id}: {str(ex)}")
f"Error deleting events for the model {m_id}: {str(ex)}"
)
deleted_models += Model.objects(id__in=list(model_ids)).delete() deleted_models += Model.objects(id__in=list(model_ids)).delete()
if in_use_model_ids: if in_use_model_ids:
@ -237,7 +240,9 @@ def cleanup_task(
else: else:
Model.objects(id__in=[m.id for m in models]).update(unset__task=1) Model.objects(id__in=[m.id for m in models]).update(unset__task=1)
event_bll.delete_task_events(task.company, task.id, allow_locked=force) event_bll.delete_task_events(
task.company, task.id, allow_locked=force, async_delete=async_events_delete
)
if delete_external_artifacts: if delete_external_artifacts:
scheduled = _schedule_for_delete( scheduled = _schedule_for_delete(

View File

@ -24,3 +24,6 @@ hyperparam_values {
# the maximum amount of unique last metrics/variants combinations # the maximum amount of unique last metrics/variants combinations
# for which the last values are stored in a task # for which the last values are stored in a task
max_last_metrics: 2000 max_last_metrics: 2000
# if set then call to tasks.delete/cleanup does not wait for ES events deletion
async_events_delete: false