From 01914938647189fa23122b399b003d353e0933aa Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Sat, 8 Jan 2022 22:56:45 +0200 Subject: [PATCH] Fix uploading same metric, variant from multiple process in threading mode from multiple process should create a unique file per process (since global counter is Not passed between the subprocesses) --- clearml/backend_interface/metrics/events.py | 47 ++++++++++++++------- 1 file changed, 31 insertions(+), 16 deletions(-) diff --git a/clearml/backend_interface/metrics/events.py b/clearml/backend_interface/metrics/events.py index 4ac3ec96..c6b262a7 100644 --- a/clearml/backend_interface/metrics/events.py +++ b/clearml/backend_interface/metrics/events.py @@ -1,5 +1,6 @@ import abc import hashlib +import os import time from functools import reduce from logging import getLevelName @@ -227,26 +228,41 @@ class UploadEvent(MetricsEventAdapter): self._local_image_path = local_image_path self._url = None self._key = None - self._count = self._get_metric_count(metric, variant) - if not file_history_size: - file_history_size = int(self._file_history_size) - self._filename = kwargs.pop('override_filename', None) + self._count = None + self._filename = None + self.file_history_size = file_history_size or int(self._file_history_size) + self._override_filename = kwargs.pop('override_filename', None) + self._upload_uri = upload_uri + self._delete_after_upload = delete_after_upload + # get upload uri upfront, either predefined image format or local file extension + # e.g.: image.png -> .png or image.raw.gz -> .raw.gz + self._override_filename_ext = kwargs.pop('override_filename_ext', None) + self._upload_filename = None + + self._override_storage_key_prefix = kwargs.pop('override_storage_key_prefix', None) + self.retries = self._upload_retries + super(UploadEvent, self).__init__(metric, variant, iter=iter, **kwargs) + + def _generate_file_name(self, force_pid_suffix=None): + if force_pid_suffix is None and self._filename is not None: + return + + self._count = self._get_metric_count(self._metric, self._variant) + + self._filename = self._override_filename if not self._filename: - if file_history_size < 1: - self._filename = '%s_%s_%08d' % (metric, variant, self._count) - else: - self._filename = '%s_%s_%08d' % (metric, variant, self._count % file_history_size) + self._filename = '{}_{}'.format(self._metric, self._variant) + cnt = self._count if self.file_history_size < 1 else (self._count % self.file_history_size) + self._filename += '_{:05x}{:03d}'.format(force_pid_suffix, cnt) \ + if force_pid_suffix else '_{:08d}'.format(cnt) # make sure we have to '/' in the filename because it might access other folders, # and we don't want that to occur self._filename = self._replace_slash(self._filename) - self._upload_uri = upload_uri - self._delete_after_upload = delete_after_upload - # get upload uri upfront, either predefined image format or local file extension # e.g.: image.png -> .png or image.raw.gz -> .raw.gz - filename_ext = kwargs.pop('override_filename_ext', None) + filename_ext = self._override_filename_ext if filename_ext is None: filename_ext = str(self._format).lower() if self._image_data is not None else \ '.' + '.'.join(pathlib2.Path(self._local_image_path).parts[-1].split('.')[1:]) @@ -257,10 +273,6 @@ class UploadEvent(MetricsEventAdapter): if self._filename.rpartition(".")[2] != filename_ext.rpartition(".")[2]: self._upload_filename += filename_ext - self._override_storage_key_prefix = kwargs.pop('override_storage_key_prefix', None) - self.retries = self._upload_retries - super(UploadEvent, self).__init__(metric, variant, iter=iter, **kwargs) - @classmethod def _get_metric_count(cls, metric, variant, next=True): """ Returns the next count number for the given metric/variant (rotates every few calls) """ @@ -287,6 +299,8 @@ class UploadEvent(MetricsEventAdapter): self._key = key def get_file_entry(self): + self._generate_file_name() + local_file = None # Notice that in case we are running with reporter in subprocess, @@ -359,6 +373,7 @@ class UploadEvent(MetricsEventAdapter): return new_path return hashlib.md5(str(folder_path).encode('utf-8')).hexdigest() + self._generate_file_name() e_storage_uri = self._upload_uri or storage_uri # if we have an entry (with or without a stream), we'll generate the URL and store it in the event filename = self._upload_filename