From 3d73d60826cf98d31f3af4ce0ec492a34af1493d Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Mon, 3 May 2021 18:05:24 +0300 Subject: [PATCH] Better handling of invalid iterations on add_batch --- apiserver/bll/event/event_bll.py | 91 +++++++++++++++++++------------- 1 file changed, 53 insertions(+), 38 deletions(-) diff --git a/apiserver/bll/event/event_bll.py b/apiserver/bll/event/event_bll.py index b8c5444..365c4de 100644 --- a/apiserver/bll/event/event_bll.py +++ b/apiserver/bll/event/event_bll.py @@ -10,6 +10,7 @@ from typing import Sequence, Set, Tuple, Optional, Dict import six from elasticsearch import helpers +from elasticsearch.helpers import BulkIndexError from mongoengine import Q from nested_dict import nested_dict @@ -42,6 +43,8 @@ from apiserver.utilities.json import loads # noinspection PyTypeChecker EVENT_TYPES: Set[str] = set(map(attrgetter("value"), EventType)) LOCKED_TASK_STATUSES = (TaskStatus.publishing, TaskStatus.published) +MAX_LONG = 2**63 - 1 +MIN_LONG = -2**63 class PlotFields: @@ -101,6 +104,7 @@ class EventBLL(object): 3, dict ) # task_id -> metric_hash -> event_type -> MetricEvent errors_per_type = defaultdict(int) + invalid_iteration_error = f"Iteration number should not exceed {MAX_LONG}" valid_tasks = self._get_valid_tasks( company_id, task_ids={ @@ -150,6 +154,9 @@ class EventBLL(object): iter = event.get("iter") if iter is not None: iter = int(iter) + if iter > MAX_LONG or iter < MIN_LONG: + errors_per_type[invalid_iteration_error] += 1 + continue event["iter"] = iter # used to have "values" to indicate array. no need anymore @@ -206,47 +213,55 @@ class EventBLL(object): ) added = 0 - if actions: - chunk_size = 500 - with translate_errors_context(), TimingContext("es", "events_add_batch"): - # TODO: replace it with helpers.parallel_bulk in the future once the parallel pool leak is fixed - with closing( - helpers.streaming_bulk( - self.es, - actions, - chunk_size=chunk_size, - # thread_count=8, - refresh=True, - ) - ) as it: - for success, info in it: - if success: - added += 1 - else: - errors_per_type["Error when indexing events batch"] += 1 + with translate_errors_context(): + if actions: + chunk_size = 500 + with TimingContext("es", "events_add_batch"): + # TODO: replace it with helpers.parallel_bulk in the future once the parallel pool leak is fixed + with closing( + helpers.streaming_bulk( + self.es, + actions, + chunk_size=chunk_size, + # thread_count=8, + refresh=True, + ) + ) as it: + for success, info in it: + if success: + added += 1 + else: + errors_per_type["Error when indexing events batch"] += 1 - remaining_tasks = set() - now = datetime.utcnow() - for task_id in task_ids: - # Update related tasks. For reasons of performance, we prefer to update - # all of them and not only those who's events were successful - updated = self._update_task( - company_id=company_id, - task_id=task_id, - now=now, - iter_max=task_iteration.get(task_id), - last_scalar_events=task_last_scalar_events.get(task_id), - last_events=task_last_events.get(task_id), - ) + remaining_tasks = set() + now = datetime.utcnow() + for task_id in task_ids: + # Update related tasks. For reasons of performance, we prefer to update + # all of them and not only those who's events were successful + updated = self._update_task( + company_id=company_id, + task_id=task_id, + now=now, + iter_max=task_iteration.get(task_id), + last_scalar_events=task_last_scalar_events.get(task_id), + last_events=task_last_events.get(task_id), + ) - if not updated: - remaining_tasks.add(task_id) - continue + if not updated: + remaining_tasks.add(task_id) + continue - if remaining_tasks: - TaskBLL.set_last_update( - remaining_tasks, company_id, last_update=now - ) + if remaining_tasks: + TaskBLL.set_last_update( + remaining_tasks, company_id, last_update=now + ) + + # this is for backwards compatibility with streaming bulk throwing exception on those + invalid_iterations_count = errors_per_type.get(invalid_iteration_error) + if invalid_iterations_count: + raise BulkIndexError( + f"{invalid_iterations_count} document(s) failed to index.", [invalid_iteration_error] + ) if not added: raise errors.bad_request.EventsNotAdded(**errors_per_type)