Fix unique_selector not being applied properly to batches after the first one. Remove the default selector value, since it does not work for all event types (and we always specify it anyway).

This commit is contained in:
allegroai 2022-10-30 19:24:19 +02:00
parent ff7b174bf1
commit 9c171d8a5d

View File

@ -2458,7 +2458,7 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
return {p.id: p.name for p in all_responses} return {p.id: p.name for p in all_responses}
def _get_all_events( def _get_all_events(
self, max_events=100, batch_size=500, order='asc', event_type=None, unique_selector=itemgetter("url") self, max_events=100, batch_size=500, order='asc', event_type=None, unique_selector=None
): ):
# type: (int, int, str, str, Callable[[dict], Any]) -> Union[List[Any], Set[Any]] # type: (int, int, str, str, Callable[[dict], Any]) -> Union[List[Any], Set[Any]]
""" """
@ -2479,6 +2479,16 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
""" """
batch_size = max_events or batch_size batch_size = max_events or batch_size
def apply_unique_selector(events_set, evs):
    # type: (Set[Any], List[dict]) -> ()
    """
    Run unique_selector over a batch of events and merge the results into events_set.

    The set is updated in place and nothing is returned. Both unique_selector and
    self are not parameters of this helper; they are resolved from the surrounding scope.

    :param events_set: Set that accumulates the selector results
    :param evs: Batch of events to pass through unique_selector
    :raise TypeError: Re-raised, after logging an error, if one occurs while
        selecting the values or adding them to the set
    """
    try:
        selected_values = map(unique_selector, evs)
        events_set.update(selected_values)
    except TypeError:
        # Report the probable cause before propagating the original exception unchanged
        failure_message = (
            "Failed applying unique_selector on events (note the selector's result must be hashable)"
        )
        self.log.error(failure_message)
        raise
log_events = self.send(events.GetTaskEventsRequest( log_events = self.send(events.GetTaskEventsRequest(
task=self.id, task=self.id,
order=order, order=order,
@ -2490,7 +2500,8 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
total_events = log_events.response.total total_events = log_events.response.total
scroll = log_events.response.scroll_id scroll = log_events.response.scroll_id
if unique_selector: if unique_selector:
events_list = set(map(unique_selector, log_events.response.events)) events_list = set([])
apply_unique_selector(events_list, log_events.response.events)
else: else:
events_list = log_events.response.events events_list = log_events.response.events
@ -2505,7 +2516,7 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
scroll = log_events.response.scroll_id scroll = log_events.response.scroll_id
returned_count += log_events.response.returned returned_count += log_events.response.returned
if unique_selector: if unique_selector:
events_list.update(log_events.response.events) apply_unique_selector(events_list, log_events.response.events)
else: else:
events_list.extend(log_events.response.events) events_list.extend(log_events.response.events)