From 90fe4570b94f5d49bd4e3bc75cd7c77edca7f9b2 Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Tue, 20 Oct 2020 23:27:04 +0300 Subject: [PATCH] Show k8s pod number in task's User Properties configuration section --- trains_agent/glue/k8s.py | 43 +++++++++++++++++++++++++++++++++++++--- 1 file changed, 40 insertions(+), 3 deletions(-) diff --git a/trains_agent/glue/k8s.py b/trains_agent/glue/k8s.py index 9fea7a9..4feda2a 100644 --- a/trains_agent/glue/k8s.py +++ b/trains_agent/glue/k8s.py @@ -11,6 +11,7 @@ from pyhocon import HOCONConverter from trains_agent.commands.events import Events from trains_agent.commands.worker import Worker +from trains_agent.errors import APIError from trains_agent.helper.process import get_bash_output from trains_agent.helper.resource_monitor import ResourceMonitor from trains_agent.interface.base import ObjectID @@ -43,6 +44,8 @@ class K8sIntegration(Worker): AGENT_LABEL = "TRAINS=agent" LIMIT_POD_LABEL = "ai.allegro.agent.serial=pod-{pod_number}" + _edit_hyperparams_version = "2.9" + def __init__( self, k8s_pending_queue_name=None, @@ -78,6 +81,37 @@ class K8sIntegration(Worker): self.log.logger.setLevel(logging.INFO) self.ports_mode = ports_mode self.num_of_services = num_of_services + self._edit_hyperparams_support = None + + def _set_task_user_properties(self, task_id: str, **properties: str): + if self._edit_hyperparams_support is not True: + # either not supported or never tested + if self._edit_hyperparams_support == self._session.api_version: + # tested against latest api_version, not supported + return + if not self._session.check_min_api_version(self._edit_hyperparams_version): + # not supported due to insufficient api_version + self._edit_hyperparams_support = self._session.api_version + return + try: + self._session.get( + service="tasks", + action="edit_hyper_params", + task=task_id, + hyperparams=[ + { + "section": "properties", + "name": k, + "value": str(v), + } + for k, v in properties.items() + ], + ) + # definitely supported + self._runtime_props_support = True + except APIError as error: + if error.code == 404: + self._edit_hyperparams_support = self._session.api_version def run_one_task(self, queue: Text, task_id: Text, worker_args=None, **_): task_data = self._session.api_client.tasks.get_all(id=[task_id])[0] @@ -148,10 +182,8 @@ class K8sIntegration(Worker): kubectl_cmd = kubectl_cmd.split() labels = [self.AGENT_LABEL] - message = "K8s scheduling experiment task id={}".format(task_id) if self.ports_mode: labels.insert(0, self.LIMIT_POD_LABEL.format(pod_number=pod_number)) - message += " pod #{}".format(pod_number) kubectl_cmd += [ "--labels=" + ",".join(labels), @@ -163,10 +195,15 @@ class K8sIntegration(Worker): ] process = subprocess.Popen(kubectl_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) output, error = process.communicate() - self.log.info(message) + self.log.info("K8s scheduling experiment task id={}".format(task_id)) if error: self.log.error("Running kubectl encountered an error: {}".format( error if isinstance(error, str) else error.decode())) + elif self.ports_mode: + self._set_task_user_properties( + task_id=task_id, + **{"k8s-pod-number": pod_number, "k8s-pod-label": labels[0]} + ) def run_tasks_loop(self, queues: List[Text], worker_params, **kwargs): """