Protect against enqueue failing due to insufficient permissions

This commit is contained in:
allegroai 2021-07-31 23:43:45 +03:00
parent 5f7d0348e2
commit e78c1e806a
6 changed files with 14 additions and 7 deletions

View File

@ -673,7 +673,7 @@ class PipelineController(object):
elif node.job.is_cached_task():
node.executed = node.job.task_id()
else:
node.job.launch(queue_name=node.queue or self._default_execution_queue)
return node.job.launch(queue_name=node.queue or self._default_execution_queue)
return True

View File

@ -65,7 +65,8 @@ class _TrainsBandsterWorker(Worker):
self._current_job = self.optimizer.helper_create_job(self.base_task_id, parameter_override=config)
# noinspection PyProtectedMember
self.optimizer._current_jobs.append(self._current_job)
self._current_job.launch(self.queue_name)
if not self._current_job.launch(self.queue_name):
return dict()
iteration_value = None
is_pending = True

View File

@ -159,7 +159,7 @@ class ClearmlJob(object):
:param str queue_name:
:return False if Task is not in "created" status (i.e. cannot be enqueued)
:return False if Task is not in "created" status, or if enqueuing it fails for any other reason
"""
if self._is_cached_task:
return False
@ -167,7 +167,7 @@ class ClearmlJob(object):
Task.enqueue(task=self.task, queue_name=queue_name)
return True
except Exception as ex:
logger.warning(ex)
logger.warning('Error enqueuing Task {} to {}: {}'.format(self.task, queue_name, ex))
return False
def abort(self):

View File

@ -407,8 +407,10 @@ class SearchStrategy(object):
new_job = self.create_job()
if not new_job:
break
if not new_job.launch(self._execution_queue):
# error enqueuing Job, something wrong here
continue
self._num_jobs += 1
new_job.launch(self._execution_queue)
self._current_jobs.append(new_job)
self._pending_jobs.append(new_job)

View File

@ -27,7 +27,7 @@ class OptunaObjective(object):
self._config_space = config_space
def objective(self, trial):
# type: (optuna.Trial) -> float
# type: (optuna.Trial) -> Optional[float]
"""
return metric value for a specified set of parameters, pulled from the trial object
@ -42,7 +42,9 @@ class OptunaObjective(object):
current_job = self.optimizer.helper_create_job(self.base_task_id, parameter_override=parameter_override)
# noinspection PyProtectedMember
self.optimizer._current_jobs.append(current_job)
current_job.launch(self.queue_name)
if not current_job.launch(self.queue_name):
# failed launching the job
return None
iteration_value = None
is_pending = True
while True:

View File

@ -998,6 +998,8 @@ class Task(_Task):
req = tasks.EnqueueRequest(task=task_id, queue=queue_id)
res = cls._send(session=session, req=req)
if not res.ok():
raise ValueError(res.response)
resp = res.response
return resp