Fix an issue where resolving the k8s pending queue could cause a queue with a UUID name to be created

This commit is contained in:
allegroai 2022-09-02 23:45:24 +03:00
parent 918dd39b87
commit 97c2e21dcc

View File

@ -130,6 +130,7 @@ class K8sIntegration(Worker):
""" """
super(K8sIntegration, self).__init__() super(K8sIntegration, self).__init__()
self.k8s_pending_queue_name = k8s_pending_queue_name or self.K8S_PENDING_QUEUE self.k8s_pending_queue_name = k8s_pending_queue_name or self.K8S_PENDING_QUEUE
self.k8s_pending_queue_id = None
self.kubectl_cmd = kubectl_cmd or self.KUBECTL_RUN_CMD self.kubectl_cmd = kubectl_cmd or self.KUBECTL_RUN_CMD
self.container_bash_script = container_bash_script or self.CONTAINER_BASH_SCRIPT self.container_bash_script = container_bash_script or self.CONTAINER_BASH_SCRIPT
# Always do system packages, because by we will be running inside a docker # Always do system packages, because by we will be running inside a docker
@ -394,17 +395,17 @@ class K8sIntegration(Worker):
# push task into the k8s queue, so we have visibility on pending tasks in the k8s scheduler # push task into the k8s queue, so we have visibility on pending tasks in the k8s scheduler
try: try:
print('Pushing task {} into temporary pending queue'.format(task_id)) print('Pushing task {} into temporary pending queue'.format(task_id))
res = self._session.api_client.tasks.stop(task_id, force=True) _ = self._session.api_client.tasks.stop(task_id, force=True)
res = self._session.api_client.tasks.enqueue( res = self._session.api_client.tasks.enqueue(
task_id, task_id,
queue=self.k8s_pending_queue_name, queue=self.k8s_pending_queue_id,
status_reason='k8s pending scheduler', status_reason='k8s pending scheduler',
) )
if res.meta.result_code != 200: if res.meta.result_code != 200:
raise Exception(res.meta.result_msg) raise Exception(res.meta.result_msg)
except Exception as e: except Exception as e:
self.log.error("ERROR: Could not push back task [{}] to k8s pending queue [{}], error: {}".format( self.log.error("ERROR: Could not push back task [{}] to k8s pending queue {} [{}], error: {}".format(
task_id, self.k8s_pending_queue_name, e)) task_id, self.k8s_pending_queue_name, self.k8s_pending_queue_id, e))
return return
container = get_task_container(self._session, task_id) container = get_task_container(self._session, task_id)
@ -765,13 +766,13 @@ class K8sIntegration(Worker):
events_service = self.get_service(Events) events_service = self.get_service(Events)
# make sure we have a k8s pending queue # make sure we have a k8s pending queue
# noinspection PyBroadException if not self.k8s_pending_queue_id:
try: resolved_ids = self._resolve_queue_names([self.k8s_pending_queue_name], create_if_missing=True)
self._session.api_client.queues.create(self.k8s_pending_queue_name) if not resolved_ids:
except Exception: raise ValueError(
pass "Failed resolving or creating k8s pending queue {}".format(self.k8s_pending_queue_name)
# get queue id )
self.k8s_pending_queue_name = self._resolve_name(self.k8s_pending_queue_name, "queues") self.k8s_pending_queue_id = resolved_ids[0]
_last_machine_update_ts = 0 _last_machine_update_ts = 0
while True: while True: