Fix uploading same metric, variant from multiple process in threading mode from multiple process should create a unique file per process (since global counter is Not passed between the subprocesses)

This commit is contained in:
allegroai 2022-01-08 22:56:45 +02:00
parent ccc8e83c58
commit 0191493864

View File

@ -1,5 +1,6 @@
import abc
import hashlib
import os
import time
from functools import reduce
from logging import getLevelName
@ -227,26 +228,41 @@ class UploadEvent(MetricsEventAdapter):
self._local_image_path = local_image_path
self._url = None
self._key = None
self._count = self._get_metric_count(metric, variant)
if not file_history_size:
file_history_size = int(self._file_history_size)
self._filename = kwargs.pop('override_filename', None)
self._count = None
self._filename = None
self.file_history_size = file_history_size or int(self._file_history_size)
self._override_filename = kwargs.pop('override_filename', None)
self._upload_uri = upload_uri
self._delete_after_upload = delete_after_upload
# get upload uri upfront, either predefined image format or local file extension
# e.g.: image.png -> .png or image.raw.gz -> .raw.gz
self._override_filename_ext = kwargs.pop('override_filename_ext', None)
self._upload_filename = None
self._override_storage_key_prefix = kwargs.pop('override_storage_key_prefix', None)
self.retries = self._upload_retries
super(UploadEvent, self).__init__(metric, variant, iter=iter, **kwargs)
def _generate_file_name(self, force_pid_suffix=None):
if force_pid_suffix is None and self._filename is not None:
return
self._count = self._get_metric_count(self._metric, self._variant)
self._filename = self._override_filename
if not self._filename:
if file_history_size < 1:
self._filename = '%s_%s_%08d' % (metric, variant, self._count)
else:
self._filename = '%s_%s_%08d' % (metric, variant, self._count % file_history_size)
self._filename = '{}_{}'.format(self._metric, self._variant)
cnt = self._count if self.file_history_size < 1 else (self._count % self.file_history_size)
self._filename += '_{:05x}{:03d}'.format(force_pid_suffix, cnt) \
if force_pid_suffix else '_{:08d}'.format(cnt)
# make sure we have to '/' in the filename because it might access other folders,
# and we don't want that to occur
self._filename = self._replace_slash(self._filename)
self._upload_uri = upload_uri
self._delete_after_upload = delete_after_upload
# get upload uri upfront, either predefined image format or local file extension
# e.g.: image.png -> .png or image.raw.gz -> .raw.gz
filename_ext = kwargs.pop('override_filename_ext', None)
filename_ext = self._override_filename_ext
if filename_ext is None:
filename_ext = str(self._format).lower() if self._image_data is not None else \
'.' + '.'.join(pathlib2.Path(self._local_image_path).parts[-1].split('.')[1:])
@ -257,10 +273,6 @@ class UploadEvent(MetricsEventAdapter):
if self._filename.rpartition(".")[2] != filename_ext.rpartition(".")[2]:
self._upload_filename += filename_ext
self._override_storage_key_prefix = kwargs.pop('override_storage_key_prefix', None)
self.retries = self._upload_retries
super(UploadEvent, self).__init__(metric, variant, iter=iter, **kwargs)
@classmethod
def _get_metric_count(cls, metric, variant, next=True):
""" Returns the next count number for the given metric/variant (rotates every few calls) """
@ -287,6 +299,8 @@ class UploadEvent(MetricsEventAdapter):
self._key = key
def get_file_entry(self):
self._generate_file_name()
local_file = None
# Notice that in case we are running with reporter in subprocess,
@ -359,6 +373,7 @@ class UploadEvent(MetricsEventAdapter):
return new_path
return hashlib.md5(str(folder_path).encode('utf-8')).hexdigest()
self._generate_file_name()
e_storage_uri = self._upload_uri or storage_uri
# if we have an entry (with or without a stream), we'll generate the URL and store it in the event
filename = self._upload_filename