Better handling of invalid iterations on add_batch

commit 3d73d60826
parent 9f0c9c3690
Author: allegroai
Date:   2021-05-03 18:05:24 +03:00


@@ -10,6 +10,7 @@ from typing import Sequence, Set, Tuple, Optional, Dict
 import six
 from elasticsearch import helpers
+from elasticsearch.helpers import BulkIndexError
 from mongoengine import Q
 from nested_dict import nested_dict
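
For reference, `BulkIndexError` is the exception elasticsearch-py's bulk helpers raise when some documents in a batch fail to index (with `raise_on_error` left at its default). A minimal sketch of that library behavior; the index name and document contents are illustrative assumptions:

```python
from elasticsearch import Elasticsearch, helpers
from elasticsearch.helpers import BulkIndexError

es = Elasticsearch()  # assumes a reachable local cluster

# Illustrative action: an `iter` value no `long` mapping can hold
docs = [{"_index": "events", "iter": 2 ** 64}]

try:
    helpers.bulk(es, docs)  # raise_on_error defaults to True
except BulkIndexError as err:
    # The message summarizes how many documents failed; the
    # per-document details are carried in err.args[1]
    print(err)
```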
@@ -42,6 +43,8 @@ from apiserver.utilities.json import loads
 # noinspection PyTypeChecker
 EVENT_TYPES: Set[str] = set(map(attrgetter("value"), EventType))
 LOCKED_TASK_STATUSES = (TaskStatus.publishing, TaskStatus.published)
+MAX_LONG = 2**63 - 1
+MIN_LONG = -2**63
 
 
 class PlotFields:
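
These bounds are exactly the signed 64-bit integer range, which is what Elasticsearch's `long` field type can store (the `iter` field is presumably mapped as `long`); an iteration outside this range cannot be indexed. A quick sanity check of the constants:

```python
# Signed 64-bit bounds, i.e. the range of Elasticsearch's `long` type
MAX_LONG = 2**63 - 1
MIN_LONG = -2**63

assert MAX_LONG == 9_223_372_036_854_775_807
assert MIN_LONG == -9_223_372_036_854_775_808
```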
@@ -101,6 +104,7 @@ class EventBLL(object):
             3, dict
         )  # task_id -> metric_hash -> event_type -> MetricEvent
         errors_per_type = defaultdict(int)
+        invalid_iteration_error = f"Iteration number should not exceed {MAX_LONG}"
         valid_tasks = self._get_valid_tasks(
             company_id,
             task_ids={
@@ -150,6 +154,9 @@
             iter = event.get("iter")
             if iter is not None:
                 iter = int(iter)
+                if iter > MAX_LONG or iter < MIN_LONG:
+                    errors_per_type[invalid_iteration_error] += 1
+                    continue
                 event["iter"] = iter
 
         # used to have "values" to indicate array. no need anymore
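
In isolation, the new guard behaves roughly as sketched below (event contents are illustrative): an out-of-range `iter` is tallied under the error message and the event is skipped, instead of reaching Elasticsearch and failing the whole bulk chunk.

```python
from collections import defaultdict

MAX_LONG = 2**63 - 1
MIN_LONG = -2**63

errors_per_type = defaultdict(int)
invalid_iteration_error = f"Iteration number should not exceed {MAX_LONG}"

events = [{"iter": 7}, {"iter": 10**20}]  # second one is out of range
valid = []
for event in events:
    it = int(event["iter"])
    if it > MAX_LONG or it < MIN_LONG:
        errors_per_type[invalid_iteration_error] += 1
        continue  # drop the event instead of failing the whole batch
    event["iter"] = it
    valid.append(event)

print(len(valid), dict(errors_per_type))
# 1 {'Iteration number should not exceed 9223372036854775807': 1}
```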
@@ -206,47 +213,55 @@
         )
 
         added = 0
-        if actions:
-            chunk_size = 500
-            with translate_errors_context(), TimingContext("es", "events_add_batch"):
-                # TODO: replace it with helpers.parallel_bulk in the future once the parallel pool leak is fixed
-                with closing(
-                    helpers.streaming_bulk(
-                        self.es,
-                        actions,
-                        chunk_size=chunk_size,
-                        # thread_count=8,
-                        refresh=True,
-                    )
-                ) as it:
-                    for success, info in it:
-                        if success:
-                            added += 1
-                        else:
-                            errors_per_type["Error when indexing events batch"] += 1
+        with translate_errors_context():
+            if actions:
+                chunk_size = 500
+                with TimingContext("es", "events_add_batch"):
+                    # TODO: replace it with helpers.parallel_bulk in the future once the parallel pool leak is fixed
+                    with closing(
+                        helpers.streaming_bulk(
+                            self.es,
+                            actions,
+                            chunk_size=chunk_size,
+                            # thread_count=8,
+                            refresh=True,
+                        )
+                    ) as it:
+                        for success, info in it:
+                            if success:
+                                added += 1
+                            else:
+                                errors_per_type["Error when indexing events batch"] += 1
 
-            remaining_tasks = set()
-            now = datetime.utcnow()
-            for task_id in task_ids:
-                # Update related tasks. For reasons of performance, we prefer to update
-                # all of them and not only those whose events were successful
-                updated = self._update_task(
-                    company_id=company_id,
-                    task_id=task_id,
-                    now=now,
-                    iter_max=task_iteration.get(task_id),
-                    last_scalar_events=task_last_scalar_events.get(task_id),
-                    last_events=task_last_events.get(task_id),
-                )
-                if not updated:
-                    remaining_tasks.add(task_id)
-                    continue
+                remaining_tasks = set()
+                now = datetime.utcnow()
+                for task_id in task_ids:
+                    # Update related tasks. For reasons of performance, we prefer to update
+                    # all of them and not only those whose events were successful
+                    updated = self._update_task(
+                        company_id=company_id,
+                        task_id=task_id,
+                        now=now,
+                        iter_max=task_iteration.get(task_id),
+                        last_scalar_events=task_last_scalar_events.get(task_id),
+                        last_events=task_last_events.get(task_id),
+                    )
+                    if not updated:
+                        remaining_tasks.add(task_id)
+                        continue
 
-            if remaining_tasks:
-                TaskBLL.set_last_update(
-                    remaining_tasks, company_id, last_update=now
-                )
+                if remaining_tasks:
+                    TaskBLL.set_last_update(
+                        remaining_tasks, company_id, last_update=now
+                    )
+
+            # this is for backwards compatibility with streaming bulk throwing an exception on those
+            invalid_iterations_count = errors_per_type.get(invalid_iteration_error)
+            if invalid_iterations_count:
+                raise BulkIndexError(
+                    f"{invalid_iterations_count} document(s) failed to index.", [invalid_iteration_error]
+                )
 
         if not added:
             raise errors.bad_request.EventsNotAdded(**errors_per_type)
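
Because `streaming_bulk` previously surfaced such documents as indexing failures, existing callers may already handle `BulkIndexError`; raising it explicitly preserves that contract. A sketch of the caller side, where the `add_events` name, arguments, and return value are assumptions for illustration:

```python
from elasticsearch.helpers import BulkIndexError

def add_batch_safely(event_bll, company_id, events, worker):
    """Hypothetical caller; names and signature are illustrative."""
    try:
        return event_bll.add_events(company_id, events, worker)
    except BulkIndexError as err:
        # Same exception type streaming_bulk used to raise for these
        # documents, so pre-existing handlers continue to work.
        print(f"some events failed to index: {err}")
        raise
```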