Show k8s pod number in task's User Properties configuration section

This commit is contained in:
allegroai 2020-10-20 23:27:04 +03:00
parent 92fc8e838f
commit 90fe4570b9

View File

@ -11,6 +11,7 @@ from pyhocon import HOCONConverter
from trains_agent.commands.events import Events from trains_agent.commands.events import Events
from trains_agent.commands.worker import Worker from trains_agent.commands.worker import Worker
from trains_agent.errors import APIError
from trains_agent.helper.process import get_bash_output from trains_agent.helper.process import get_bash_output
from trains_agent.helper.resource_monitor import ResourceMonitor from trains_agent.helper.resource_monitor import ResourceMonitor
from trains_agent.interface.base import ObjectID from trains_agent.interface.base import ObjectID
@ -43,6 +44,8 @@ class K8sIntegration(Worker):
AGENT_LABEL = "TRAINS=agent" AGENT_LABEL = "TRAINS=agent"
LIMIT_POD_LABEL = "ai.allegro.agent.serial=pod-{pod_number}" LIMIT_POD_LABEL = "ai.allegro.agent.serial=pod-{pod_number}"
_edit_hyperparams_version = "2.9"
def __init__( def __init__(
self, self,
k8s_pending_queue_name=None, k8s_pending_queue_name=None,
@ -78,6 +81,37 @@ class K8sIntegration(Worker):
self.log.logger.setLevel(logging.INFO) self.log.logger.setLevel(logging.INFO)
self.ports_mode = ports_mode self.ports_mode = ports_mode
self.num_of_services = num_of_services self.num_of_services = num_of_services
self._edit_hyperparams_support = None
def _set_task_user_properties(self, task_id: str, **properties: str):
    """Write the given key/value pairs into a task's "properties" section.

    The values are sent through the ``tasks.edit_hyper_params`` endpoint,
    one hyper-parameter entry per keyword argument, with every value
    converted to ``str``.

    Whether the server supports this endpoint is cached in
    ``self._edit_hyperparams_support``:

    * ``True`` - a previous call succeeded, no further checks are made.
    * equal to ``self._session.api_version`` - already found unsupported
      for the current API version, the call is skipped.
    * anything else - not tested yet (or tested against a different API
      version), so the minimum API version is checked first.

    :param task_id: ID of the task to update.
    :param properties: property names mapped to their values.
    """
    if self._edit_hyperparams_support is not True:
        # either not supported or never tested
        if self._edit_hyperparams_support == self._session.api_version:
            # tested against latest api_version, not supported
            return
        if not self._session.check_min_api_version(self._edit_hyperparams_version):
            # not supported due to insufficient api_version
            self._edit_hyperparams_support = self._session.api_version
            return
    try:
        self._session.get(
            service="tasks",
            action="edit_hyper_params",
            task=task_id,
            hyperparams=[
                {
                    "section": "properties",
                    "name": k,
                    "value": str(v),
                }
                for k, v in properties.items()
            ],
        )
    except APIError as error:
        # A 404 means the endpoint does not exist on this server: remember
        # that for the current api_version so later calls return early.
        # Other API errors are deliberately ignored (best-effort update).
        if error.code == 404:
            self._edit_hyperparams_support = self._session.api_version
    else:
        # definitely supported - record it on the flag that the guard at
        # the top of this method actually reads, so the version check is
        # not repeated on subsequent calls
        self._edit_hyperparams_support = True
def run_one_task(self, queue: Text, task_id: Text, worker_args=None, **_): def run_one_task(self, queue: Text, task_id: Text, worker_args=None, **_):
task_data = self._session.api_client.tasks.get_all(id=[task_id])[0] task_data = self._session.api_client.tasks.get_all(id=[task_id])[0]
@ -148,10 +182,8 @@ class K8sIntegration(Worker):
kubectl_cmd = kubectl_cmd.split() kubectl_cmd = kubectl_cmd.split()
labels = [self.AGENT_LABEL] labels = [self.AGENT_LABEL]
message = "K8s scheduling experiment task id={}".format(task_id)
if self.ports_mode: if self.ports_mode:
labels.insert(0, self.LIMIT_POD_LABEL.format(pod_number=pod_number)) labels.insert(0, self.LIMIT_POD_LABEL.format(pod_number=pod_number))
message += " pod #{}".format(pod_number)
kubectl_cmd += [ kubectl_cmd += [
"--labels=" + ",".join(labels), "--labels=" + ",".join(labels),
@ -163,10 +195,15 @@ class K8sIntegration(Worker):
] ]
process = subprocess.Popen(kubectl_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) process = subprocess.Popen(kubectl_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, error = process.communicate() output, error = process.communicate()
self.log.info(message) self.log.info("K8s scheduling experiment task id={}".format(task_id))
if error: if error:
self.log.error("Running kubectl encountered an error: {}".format( self.log.error("Running kubectl encountered an error: {}".format(
error if isinstance(error, str) else error.decode())) error if isinstance(error, str) else error.decode()))
elif self.ports_mode:
self._set_task_user_properties(
task_id=task_id,
**{"k8s-pod-number": pod_number, "k8s-pod-label": labels[0]}
)
def run_tasks_loop(self, queues: List[Text], worker_params, **kwargs): def run_tasks_loop(self, queues: List[Text], worker_params, **kwargs):
""" """