Turn on async task events deletion when a task has more than 100,000 events

allegroai 2023-11-17 09:45:55 +02:00
parent ec14f327c6
commit a7865ccbec
6 changed files with 44 additions and 46 deletions
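
In short: asynchronous deletion of a task's Elasticsearch events is now on by default, but EventBLL first counts the events and falls back to a synchronous (blocking) delete when the count is at or below a configurable threshold, so small deletions still finish before the API call returns; the multi-task path additionally skips the count when 100 or more tasks are involved. A self-contained sketch of the combined decision (count_events is a stand-in for EventsIterator.count_task_events and an assumption of this sketch):

    from typing import Callable, Sequence

    ASYNC_EVENTS_DELETE = True        # services.tasks.async_events_delete
    ASYNC_DELETE_THRESHOLD = 100_000  # services.tasks.async_events_delete_threshold

    def should_delete_async(
        count_events: Callable[[Sequence[str]], int], task_ids: Sequence[str]
    ) -> bool:
        if not ASYNC_EVENTS_DELETE:
            return False
        # with 100 tasks or more the multi-task path skips counting and stays async
        if len(task_ids) >= 100:
            return True
        # small deletions finish quickly, so blocking on them is preferable
        return count_events(task_ids) > ASYNC_DELETE_THRESHOLD

    print(should_delete_async(lambda ids: 5_000, ["task-1"]))    # False: synchronous delete
    print(should_delete_async(lambda ids: 500_000, ["task-1"]))  # True: fire and forget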

View File

@ -5,7 +5,6 @@ import zlib
 from collections import defaultdict
 from contextlib import closing
 from datetime import datetime
-from operator import attrgetter
 from typing import Sequence, Set, Tuple, Optional, List, Mapping, Union
 import elasticsearch
@ -24,6 +23,7 @@ from apiserver.bll.event.event_common import (
     get_metric_variants_condition,
     uncompress_plot,
     get_max_metric_and_variant_counts,
+    PlotFields,
 )
 from apiserver.bll.event.events_iterator import EventsIterator, TaskEventsResult
 from apiserver.bll.event.history_debug_image_iterator import HistoryDebugImageIterator
@ -47,21 +47,15 @@ from apiserver.utilities.dicts import nested_get
 from apiserver.utilities.json import loads

-# noinspection PyTypeChecker
-EVENT_TYPES: Set[str] = set(map(attrgetter("value"), EventType))
+EVENT_TYPES: Set[str] = set(et.value for et in EventType if et != EventType.all)
 LOCKED_TASK_STATUSES = (TaskStatus.publishing, TaskStatus.published)
 MAX_LONG = 2**63 - 1
 MIN_LONG = -(2**63)
 log = config.logger(__file__)

-class PlotFields:
-    valid_plot = "valid_plot"
-    plot_len = "plot_len"
-    plot_str = "plot_str"
-    plot_data = "plot_data"
-    source_urls = "source_urls"
+async_task_events_delete = config.get("services.tasks.async_events_delete", False)
+async_delete_threshold = config.get("services.tasks.async_events_delete_threshold", 100_000)

 class EventBLL(object):
@ -333,8 +327,8 @@ class EventBLL(object):
             # all of them and not only those whose events were successful
             updated = self._update_task(
                 company_id=company_id,
-                task_id=task_id,
                 user_id=user_id,
+                task_id=task_id,
                 now=now,
                 iter_max=task_iteration.get(task_id),
                 last_scalar_events=task_last_scalar_events.get(task_id),
@ -1173,14 +1167,7 @@ class EventBLL(object):
return {"refresh": True}
def delete_task_events(
self,
company_id,
task_id,
allow_locked=False,
model=False,
async_delete=False,
):
def delete_task_events(self, company_id, task_id, allow_locked=False, model=False):
if model:
self._validate_model_state(
company_id=company_id,
@ -1191,7 +1178,15 @@ class EventBLL(object):
             self._validate_task_state(
                 company_id=company_id, task_id=task_id, allow_locked=allow_locked
             )

+        async_delete = async_task_events_delete
+        if async_delete:
+            total = self.events_iterator.count_task_events(
+                event_type=EventType.all,
+                company_id=company_id,
+                task_ids=[task_id],
+            )
+            if total <= async_delete_threshold:
+                async_delete = False

         es_req = {"query": {"term": {"task": task_id}}}
         with translate_errors_context():
             es_res = delete_company_events(
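
For context, the async_delete flag matters at the Elasticsearch level: a synchronous delete_by_query blocks until every matching event document is removed, while an asynchronous one returns immediately with a background task id; the `return {"refresh": True}` fragment above suggests the flag ultimately selects between refresh-on-completion and fire-and-forget parameters for the ES call. A minimal sketch with the elasticsearch-py 7.x client; the URL and the events-* index pattern are assumptions:

    from elasticsearch import Elasticsearch

    es = Elasticsearch("http://localhost:9200")  # assumed local cluster
    es_req = {"query": {"term": {"task": "task-1"}}}

    # synchronous path: block until all matching docs are deleted, then refresh
    # so that subsequent searches no longer see the events
    res = es.delete_by_query(index="events-*", body=es_req, refresh=True)
    print(res["deleted"])

    # asynchronous path: ES answers at once with a task id; deletion keeps
    # running in the background and can be polled via the ES tasks API
    res = es.delete_by_query(index="events-*", body=es_req, wait_for_completion=False)
    print(res["task"])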
@ -1249,7 +1244,7 @@ class EventBLL(object):
         return es_res.get("deleted", 0)

     def delete_multi_task_events(
-        self, company_id: str, task_ids: Sequence[str], async_delete=False
+        self, company_id: str, task_ids: Sequence[str], model=False
     ):
         """
         Delete multiple task events. No check is done for tasks write access
@ -1257,6 +1252,15 @@ class EventBLL(object):
"""
deleted = 0
with translate_errors_context():
async_delete = async_task_events_delete
if async_delete and len(task_ids) < 100:
total = self.events_iterator.count_task_events(
event_type=EventType.all,
company_id=company_id,
task_ids=task_ids,
)
if total <= async_delete_threshold:
async_delete = False
for tasks in chunked_iter(task_ids, 100):
es_req = {"query": {"terms": {"task": tasks}}}
es_res = delete_company_events(
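
Note how the new gate interacts with the existing batching below it: the event count is only computed when fewer than 100 tasks are being deleted, i.e. when the whole job fits into a single chunk; larger jobs keep async deletion unconditionally, presumably because counting events across many tasks is itself expensive. A self-contained sketch of the batching pattern, using the same boltons helper the file already imports:

    from boltons.iterutils import chunked_iter

    task_ids = [f"task-{i}" for i in range(250)]

    for tasks in chunked_iter(task_ids, 100):  # lists of at most 100 ids
        es_req = {"query": {"terms": {"task": tasks}}}
        # each chunk becomes one delete_by_query round-trip
        # (delete_company_events in the real code)
        print(len(tasks), es_req["query"]["terms"]["task"][:2])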

View File

@ -64,13 +64,13 @@ class EventsIterator:
         self,
         event_type: EventType,
         company_id: str,
-        task_id: str,
+        task_ids: Sequence[str],
         metric_variants: MetricVariants = None,
     ) -> int:
         if check_empty_data(self.es, company_id, event_type):
             return 0

-        query, _ = self._get_initial_query_and_must(task_id, metric_variants)
+        query, _ = self._get_initial_query_and_must(task_ids, metric_variants)
         es_req = {
             "query": query,
         }
@ -100,7 +100,7 @@ class EventsIterator:
         For the last key-field value all the events are brought (even if the resulting size exceeds batch_size)
         so that events with this value will not be lost between the calls.
         """
-        query, must = self._get_initial_query_and_must(task_id, metric_variants)
+        query, must = self._get_initial_query_and_must([task_id], metric_variants)

         # retrieve the next batch of events
         es_req = {
@ -158,14 +158,14 @@ class EventsIterator:
     @staticmethod
     def _get_initial_query_and_must(
-        task_id: str, metric_variants: MetricVariants = None
+        task_ids: Sequence[str], metric_variants: MetricVariants = None
     ) -> Tuple[dict, list]:
         if not metric_variants:
-            must = [{"term": {"task": task_id}}]
-            query = {"term": {"task": task_id}}
+            query = {"terms": {"task": task_ids}}
+            must = [query]
         else:
             must = [
-                {"term": {"task": task_id}},
+                {"terms": {"task": task_ids}},
                 get_metric_variants_condition(metric_variants),
             ]
             query = {"bool": {"must": must}}

View File

@ -30,7 +30,6 @@ from .sub_projects import _ids_with_children
 log = config.logger(__file__)
 event_bll = EventBLL()
-async_events_delete = config.get("services.tasks.async_events_delete", False)

 @attr.s(auto_attribs=True)
@ -258,9 +257,7 @@ def _delete_tasks(
                 }
             )

-    event_bll.delete_multi_task_events(
-        company, task_ids, async_delete=async_events_delete
-    )
+    event_bll.delete_multi_task_events(company, task_ids)
     deleted = tasks.delete()
     return deleted, event_urls, artifact_urls
@ -325,8 +322,6 @@ def _delete_models(
     )
     model_urls = {m.uri for m in models if m.uri}

-    event_bll.delete_multi_task_events(
-        company, model_ids, async_delete=async_events_delete
-    )
+    event_bll.delete_multi_task_events(company, model_ids, model=True)
     deleted = models.delete()
     return deleted, event_urls, model_urls

View File

@ -26,7 +26,6 @@ from apiserver.database.utils import id as db_id
 log = config.logger(__file__)
 event_bll = EventBLL()
-async_events_delete = config.get("services.tasks.async_events_delete", False)

 @attr.s(auto_attribs=True)
@ -259,9 +258,9 @@ def cleanup_task(
         event_bll.delete_multi_task_events(
             task.company,
             model_ids,
-            async_delete=async_events_delete,
+            model=True,
         )
-        deleted_models += Model.objects(id__in=list(model_ids)).delete()
+        deleted_models += Model.objects(id__in=model_ids).delete()

     if in_use_model_ids:
         Model.objects(id__in=list(in_use_model_ids)).update(
@ -284,9 +283,7 @@ def cleanup_task(
             set__last_changed_by=user,
         )

-    event_bll.delete_task_events(
-        task.company, task.id, allow_locked=force, async_delete=async_events_delete
-    )
+    event_bll.delete_task_events(task.company, task.id, allow_locked=force)

     if delete_external_artifacts:
         scheduled = _schedule_for_delete(

View File

@ -23,4 +23,6 @@ hyperparam_values {
 max_last_metrics: 2000

 # if set then calls to tasks.delete/cleanup do not wait for ES events deletion
-async_events_delete: false
+async_events_delete: true
+# do not use async delete when the deleted task has fewer events than this threshold
+async_events_delete_threshold: 100000
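
The new defaults flip async deletion on and set the cut-off at 100,000 events (the file path is not shown in this view, but the keys are read as services.tasks.* above). A quick parse check, using pyhocon as a stand-in for the apiserver's HOCON config loader; the stand-in is an assumption of this sketch:

    from pyhocon import ConfigFactory

    conf = ConfigFactory.parse_string(
        """
        async_events_delete: true
        async_events_delete_threshold: 100000
        """
    )
    assert conf.get("async_events_delete") is True
    assert conf.get("async_events_delete_threshold") == 100000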

View File

@ -366,7 +366,7 @@ def get_task_events(_, company_id, request: TaskEventsRequest):
     total = event_bll.events_iterator.count_task_events(
         event_type=request.event_type,
         company_id=task_or_model.get_index_company(),
-        task_id=task_id,
+        task_ids=[task_id],
         metric_variants=metric_variants,
     )
@ -564,8 +564,8 @@ def get_multi_task_plots_v1_7(call, company_id, _):
     # Get the last 10K events by iteration and group them by unique metric+variant, returning top events per combination
     result = event_bll.get_task_events(
-        list(companies),
-        task_ids,
+        company_id=list(companies),
+        task_id=task_ids,
         event_type=EventType.metrics_plot,
         sort=[{"iter": {"order": "desc"}}],
         size=10000,
@ -1091,7 +1091,7 @@ def scalar_metrics_iter_raw(
     total = event_bll.events_iterator.count_task_events(
         event_type=EventType.metrics_scalar,
         company_id=task_or_model.get_index_company(),
-        task_id=task_id,
+        task_ids=[task_id],
         metric_variants=metric_variants,
     )
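
At the service layer the change is mechanical: every single-task counter now passes task_ids=[task_id]. Under the hood such a count is a terms query against the events index; a self-contained sketch of the equivalent raw call with the elasticsearch-py 7.x client (URL and index pattern are assumptions):

    from elasticsearch import Elasticsearch

    es = Elasticsearch("http://localhost:9200")  # assumed local cluster
    res = es.count(
        index="events-*",  # assumed index pattern
        body={"query": {"terms": {"task": ["task-1", "task-2"]}}},
    )
    print(res["count"])  # total events across both tasks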