Fix multi-processing issue with Task type inference

allegroai 2019-10-04 01:32:59 +03:00
parent 6aa22e449e
commit e5c0a738c3
2 changed files with 10 additions and 9 deletions

@@ -63,7 +63,7 @@ class PatchOsFork(object):
             from ..task import Task
             if Task.current_task() is not None:
                 # bind sub-process logger
-                task = Task.init()
+                task = Task.init(project_name=None, task_name=None, task_type=None)
                 task.get_logger().flush()

                 # Hack: now make sure we setup the reporter thread
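The Task.init() change above passes explicit None arguments so the forked child re-attaches to the already-initialized task instead of re-running project/task/type inference. A minimal sketch of the scenario this targets (assuming the trains SDK; the project and task names are hypothetical):

import os
from trains import Task

if __name__ == '__main__':
    Task.init(project_name='examples', task_name='fork demo')

    pid = os.fork()  # POSIX only; multiprocessing workers hit the same code path
    if pid == 0:
        # child process: the patched fork hook re-binds the logger, roughly the
        # equivalent of Task.init(project_name=None, task_name=None, task_type=None)
        task = Task.current_task()
        if task is not None:
            task.get_logger().report_text('hello from child pid %d' % os.getpid())
        os._exit(0)
    os.waitpid(pid, 0)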

@@ -534,7 +534,7 @@ class StorageHelper(object):
         else:
             return [obj.name for obj in self._driver.list_container_objects(self._container)]

-    def download_to_file(self, remote_path, local_path, overwrite_existing=False, delete_on_failure=True):
+    def download_to_file(self, remote_path, local_path, overwrite_existing=False, delete_on_failure=True, verbose=None):
         def next_chunk(astream):
             _tic = time()
             if isinstance(astream, binary_type):
@@ -551,10 +551,11 @@ class StorageHelper(object):
             return chunk, astream, _tic

         remote_path = self._canonize_url(remote_path)
+        verbose = self._verbose if verbose is None else verbose
         temp_local_path = None
         try:
-            if self._verbose:
+            if verbose:
                 self._log.info('Start downloading from %s' % remote_path)
             if not overwrite_existing and Path(local_path).is_file():
                 self._log.warning(
@@ -599,7 +600,7 @@ class StorageHelper(object):
             # if driver supports download with call back, use it (it might be faster)
             if hasattr(self._driver, 'download_object'):
                 # callback
-                cb = _DownloadProgressReport(total_size_mb, self._verbose,
+                cb = _DownloadProgressReport(total_size_mb, verbose,
                                              remote_path, chunk_size_mb, self._log)
                 self._driver.download_object(obj, temp_local_path, callback=cb)
                 download_reported = bool(cb.last_reported)
@@ -615,7 +616,7 @@ class StorageHelper(object):
                     dl_rate = len(data) / float(1024 * 1024 * tic + 0.000001)
                     dl_total_mb += len(data) / float(1024 * 1024)
                     # report download if we are on the second chunk
-                    if self._verbose or (dl_total_mb * 0.9 > chunk_size_mb):
+                    if verbose or (dl_total_mb * 0.9 > chunk_size_mb):
                         download_reported = True
                         self._log.info('Downloading: %.0fMB / %.2fMb @ %.2fMbs from %s' %
                                        (dl_total_mb, total_size_mb, dl_rate, remote_path))
@@ -635,7 +636,7 @@ class StorageHelper(object):
             # rename temp file to local_file
             os.rename(temp_local_path, local_path)
             # report download if we are on the second chunk
-            if self._verbose or download_reported:
+            if verbose or download_reported:
                 self._log.info(
                     'Downloaded %.2f MB successfully from %s , saved to %s' % (dl_total_mb, remote_path, local_path))
             return local_path
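The new verbose argument lets a caller override progress logging for a single download; when it is left as None the helper falls back to its configured self._verbose, so existing callers are unaffected. A usage sketch (assuming the usual StorageHelper.get() entry point; the URL and paths are hypothetical):

from trains.storage.helper import StorageHelper

helper = StorageHelper.get('s3://my-bucket/models/model.bin')
# force progress logging for this one download, regardless of the
# helper-wide verbose setting
local_copy = helper.download_to_file(
    remote_path='s3://my-bucket/models/model.bin',
    local_path='/tmp/model.bin',
    overwrite_existing=True,
    verbose=True,
)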
@@ -875,7 +876,7 @@ class _HttpDriver(object):
     def __init__(self, name, retries=5, **kwargs):
         self.name = name
-        self.session = get_http_session_with_retry(total=retries)
+        self.session = get_http_session_with_retry(total=retries, connect=retries, read=retries, redirect=retries)

     def get_headers(self, url):
         if not self._default_backend_session:
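The _HttpDriver change spells out connect/read/redirect retry budgets in addition to the total. Roughly, such a session amounts to the following in plain requests/urllib3 (a sketch under assumptions, not the project's get_http_session_with_retry; the status_forcelist and backoff_factor values are illustrative only):

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def make_retry_session(retries=5):
    # cap every failure category at the same count as the overall budget
    retry = Retry(total=retries, connect=retries, read=retries, redirect=retries,
                  status_forcelist=(500, 502, 503, 504), backoff_factor=0.3)
    session = requests.Session()
    adapter = HTTPAdapter(max_retries=retry)
    session.mount('http://', adapter)
    session.mount('https://', adapter)
    return session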
@@ -1007,7 +1008,7 @@ class _Stream(object):
         self._leftover = None
         try:
-            while size is None or len(data) < size:
+            while size is None or not data or len(data) < size:
                 chunk = self.next()
                 if chunk is not None:
                     if data is not None:
@@ -1017,7 +1018,7 @@ class _Stream(object):
         except StopIteration:
             pass
-        if size is not None and len(data) > size:
+        if size is not None and data and len(data) > size:
             self._leftover = data[size:]
         return data[:size]
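Both _Stream.read() guards cover the first pass, where data starts out as the previous leftover and can be None or empty: the old len(data) checks would raise TypeError on None before a single chunk was read. A tiny standalone illustration of the added check (not the class itself):

def needs_more(data, size):
    # old condition: size is None or len(data) < size
    # -> TypeError when data is still None (no leftover from a previous read)
    return size is None or not data or len(data) < size

assert needs_more(None, 4)      # previously raised TypeError
assert needs_more(b'', 4)
assert needs_more(b'ab', 4)
assert not needs_more(b'abcd', 4)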