Fix timeout for uploading images

This commit is contained in:
allegroai 2019-06-19 19:47:12 +03:00
parent cc536a8f3f
commit 77e2c32523
4 changed files with 6 additions and 5 deletions

View File

@ -74,6 +74,7 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
:type force_create: bool :type force_create: bool
""" """
task_id = self._resolve_task_id(task_id, log=log) if not force_create else None task_id = self._resolve_task_id(task_id, log=log) if not force_create else None
self._edit_lock = RLock()
super(Task, self).__init__(id=task_id, session=session, log=log) super(Task, self).__init__(id=task_id, session=session, log=log)
self._storage_uri = None self._storage_uri = None
self._input_model = None self._input_model = None
@ -85,7 +86,6 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
self._parameters_allowed_types = ( self._parameters_allowed_types = (
six.string_types + six.integer_types + (six.text_type, float, list, dict, type(None)) six.string_types + six.integer_types + (six.text_type, float, list, dict, type(None))
) )
self._edit_lock = RLock()
if not task_id: if not task_id:
# generate a new task # generate a new task

View File

@ -965,6 +965,7 @@ class StorageHelper(object):
class _HttpDriver(object): class _HttpDriver(object):
""" LibCloud http/https adapter (simple, enough for now) """ """ LibCloud http/https adapter (simple, enough for now) """
timeout = (5.0, None)
class _Container(object): class _Container(object):
def __init__(self, name, retries=5, **kwargs): def __init__(self, name, retries=5, **kwargs):
@ -983,7 +984,7 @@ class _HttpDriver(object):
def upload_object_via_stream(self, iterator, container, object_name, extra=None, **kwargs): def upload_object_via_stream(self, iterator, container, object_name, extra=None, **kwargs):
url = object_name[:object_name.index('/')] url = object_name[:object_name.index('/')]
url_path = object_name[len(url)+1:] url_path = object_name[len(url)+1:]
res = container.session.post(container.name+url, files={url_path: iterator}) res = container.session.post(container.name+url, files={url_path: iterator}, timeout=self.timeout)
if res.status_code != requests.codes.ok: if res.status_code != requests.codes.ok:
raise ValueError('Failed uploading object %s (%d): %s' % (object_name, res.status_code, res.text)) raise ValueError('Failed uploading object %s (%d): %s' % (object_name, res.status_code, res.text))
return res return res
@ -998,7 +999,7 @@ class _HttpDriver(object):
container = self._containers[container_name] container = self._containers[container_name]
# set stream flag before get request # set stream flag before get request
container.session.stream = kwargs.get('stream', True) container.session.stream = kwargs.get('stream', True)
res = container.session.get(''.join((container_name, object_name.lstrip('/')))) res = container.session.get(''.join((container_name, object_name.lstrip('/'))), timeout=self.timeout)
if res.status_code != requests.codes.ok: if res.status_code != requests.codes.ok:
raise ValueError('Failed getting object %s (%d): %s' % (object_name, res.status_code, res.text)) raise ValueError('Failed getting object %s (%d): %s' % (object_name, res.status_code, res.text))
return res return res

View File

@ -89,6 +89,7 @@ class Task(_Task):
if private is not Task.__create_protection: if private is not Task.__create_protection:
raise UsageError( raise UsageError(
'Task object cannot be instantiated externally, use Task.current_task() or Task.get_task(...)') 'Task object cannot be instantiated externally, use Task.current_task() or Task.get_task(...)')
self._lock = threading.RLock()
super(Task, self).__init__(**kwargs) super(Task, self).__init__(**kwargs)
self._arguments = _Arguments(self) self._arguments = _Arguments(self)
@ -100,7 +101,6 @@ class Task(_Task):
self._dev_mode_periodic_flag = False self._dev_mode_periodic_flag = False
self._connected_parameter_type = None self._connected_parameter_type = None
self._detect_repo_async_thread = None self._detect_repo_async_thread = None
self._lock = threading.RLock()
# register atexit, so that we mark the task as stopped # register atexit, so that we mark the task as stopped
self._at_exit_called = False self._at_exit_called = False
self.__register_at_exit(self._at_exit) self.__register_at_exit(self._at_exit)

View File

@ -36,7 +36,7 @@ class AsyncManagerMixin(object):
if r.ready(): if r.ready():
continue continue
t = time.time() t = time.time()
r.wait(timeout=remaining or 2.0) r.wait(timeout=remaining)
count += 1 count += 1
if max_num_uploads is not None and max_num_uploads - count <= 0: if max_num_uploads is not None and max_num_uploads - count <= 0:
break break