From e5c0a738c3f28d446ab48ad86bd8a84ae0f47a1d Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Fri, 4 Oct 2019 01:32:59 +0300 Subject: [PATCH] Fix multi-processing issue with Task of type inference --- trains/binding/environ_bind.py | 2 +- trains/storage/helper.py | 17 +++++++++-------- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/trains/binding/environ_bind.py b/trains/binding/environ_bind.py index b035d609..35678771 100644 --- a/trains/binding/environ_bind.py +++ b/trains/binding/environ_bind.py @@ -63,7 +63,7 @@ class PatchOsFork(object): from ..task import Task if Task.current_task() is not None: # bind sub-process logger - task = Task.init() + task = Task.init(project_name=None, task_name=None, task_type=None) task.get_logger().flush() # Hack: now make sure we setup the reporter thread diff --git a/trains/storage/helper.py b/trains/storage/helper.py index e540b0ec..bd2c5566 100644 --- a/trains/storage/helper.py +++ b/trains/storage/helper.py @@ -534,7 +534,7 @@ class StorageHelper(object): else: return [obj.name for obj in self._driver.list_container_objects(self._container)] - def download_to_file(self, remote_path, local_path, overwrite_existing=False, delete_on_failure=True): + def download_to_file(self, remote_path, local_path, overwrite_existing=False, delete_on_failure=True, verbose=None): def next_chunk(astream): _tic = time() if isinstance(astream, binary_type): @@ -551,10 +551,11 @@ class StorageHelper(object): return chunk, astream, _tic remote_path = self._canonize_url(remote_path) + verbose = self._verbose if verbose is None else verbose temp_local_path = None try: - if self._verbose: + if verbose: self._log.info('Start downloading from %s' % remote_path) if not overwrite_existing and Path(local_path).is_file(): self._log.warning( @@ -599,7 +600,7 @@ class StorageHelper(object): # if driver supports download with call back, use it (it might be faster) if hasattr(self._driver, 'download_object'): # callback - cb = _DownloadProgressReport(total_size_mb, self._verbose, + cb = _DownloadProgressReport(total_size_mb, verbose, remote_path, chunk_size_mb, self._log) self._driver.download_object(obj, temp_local_path, callback=cb) download_reported = bool(cb.last_reported) @@ -615,7 +616,7 @@ class StorageHelper(object): dl_rate = len(data) / float(1024 * 1024 * tic + 0.000001) dl_total_mb += len(data) / float(1024 * 1024) # report download if we are on the second chunk - if self._verbose or (dl_total_mb * 0.9 > chunk_size_mb): + if verbose or (dl_total_mb * 0.9 > chunk_size_mb): download_reported = True self._log.info('Downloading: %.0fMB / %.2fMb @ %.2fMbs from %s' % (dl_total_mb, total_size_mb, dl_rate, remote_path)) @@ -635,7 +636,7 @@ class StorageHelper(object): # rename temp file to local_file os.rename(temp_local_path, local_path) # report download if we are on the second chunk - if self._verbose or download_reported: + if verbose or download_reported: self._log.info( 'Downloaded %.2f MB successfully from %s , saved to %s' % (dl_total_mb, remote_path, local_path)) return local_path @@ -875,7 +876,7 @@ class _HttpDriver(object): def __init__(self, name, retries=5, **kwargs): self.name = name - self.session = get_http_session_with_retry(total=retries) + self.session = get_http_session_with_retry(total=retries, connect=retries, read=retries, redirect=retries) def get_headers(self, url): if not self._default_backend_session: @@ -1007,7 +1008,7 @@ class _Stream(object): self._leftover = None try: - while size is None or len(data) < size: + while size is None or not data or len(data) < size: chunk = self.next() if chunk is not None: if data is not None: @@ -1017,7 +1018,7 @@ class _Stream(object): except StopIteration: pass - if size is not None and len(data) > size: + if size is not None and data and len(data) > size: self._leftover = data[size:] return data[:size]