Improve k8s glue layer

This commit is contained in:
allegroai 2020-10-22 18:09:56 +03:00
parent 24f57270ed
commit a09a638b9c
2 changed files with 46 additions and 14 deletions

View File

@ -30,6 +30,10 @@ def parse_args():
"--pod-trains-conf", type=str, "--pod-trains-conf", type=str,
help="Configuration file to be used by the pod itself (if not provided, current configuration is used)" help="Configuration file to be used by the pod itself (if not provided, current configuration is used)"
) )
parser.add_argument(
"--overrides-yaml", type=str,
help="YAML file containing pod overrides to be used when launching a new pod"
)
return parser.parse_args() return parser.parse_args()
@ -43,7 +47,7 @@ def main():
k8s = K8sIntegration( k8s = K8sIntegration(
ports_mode=args.ports_mode, num_of_services=args.num_of_services, user_props_cb=user_props_cb, ports_mode=args.ports_mode, num_of_services=args.num_of_services, user_props_cb=user_props_cb,
trains_conf_file=args.pod_trains_conf) overrides_yaml=args.overrides_yaml, trains_conf_file=args.pod_trains_conf)
k8s.k8s_daemon(args.queue) k8s.k8s_daemon(args.queue)

View File

@ -4,6 +4,8 @@ import base64
import logging import logging
import os import os
import subprocess import subprocess
import yaml
import json
from time import sleep from time import sleep
from typing import Text, List from typing import Text, List
@ -55,6 +57,7 @@ class K8sIntegration(Worker):
ports_mode=False, ports_mode=False,
num_of_services=20, num_of_services=20,
user_props_cb=None, user_props_cb=None,
overrides_yaml=None,
trains_conf_file=None, trains_conf_file=None,
): ):
""" """
@ -73,7 +76,8 @@ class K8sIntegration(Worker):
:param callable user_props_cb: An Optional callable allowing additional user properties to be specified :param callable user_props_cb: An Optional callable allowing additional user properties to be specified
when scheduling a task to run in a pod. Callable can receive an optional pod number and should return when scheduling a task to run in a pod. Callable can receive an optional pod number and should return
a dictionary of user properties (name and value). Signature is [[Optional[int]], Dict[str,str]] a dictionary of user properties (name and value). Signature is [[Optional[int]], Dict[str,str]]
:param str trains_conf_file: trains.conf file to be use by the pod itself :param str overrides_yaml: YAML file containing the overides for the pod (optional)
:param str trains_conf_file: trains.conf file to be use by the pod itself (optional)
""" """
super(K8sIntegration, self).__init__() super(K8sIntegration, self).__init__()
self.k8s_pending_queue_name = k8s_pending_queue_name or self.K8S_PENDING_QUEUE self.k8s_pending_queue_name = k8s_pending_queue_name or self.K8S_PENDING_QUEUE
@ -90,10 +94,16 @@ class K8sIntegration(Worker):
self._edit_hyperparams_support = None self._edit_hyperparams_support = None
self._user_props_cb = user_props_cb self._user_props_cb = user_props_cb
self.trains_conf_file = None self.trains_conf_file = None
self.overrides_json_string = None
if overrides_yaml:
with open(os.path.expandvars(os.path.expanduser(str(overrides_yaml))), 'rt') as f:
overrides = yaml.load(f)
if overrides:
self.overrides_json_string = json.dumps(overrides)
if trains_conf_file: if trains_conf_file:
with open(os.path.expandvars(os.path.expanduser(str(trains_conf_file))), 'rt') as f: with open(os.path.expandvars(os.path.expanduser(str(trains_conf_file))), 'rt') as f:
self.trains_conf_file = f.read() self.trains_conf_file = f.read()
print(self.trains_conf_file)
def _set_task_user_properties(self, task_id: str, **properties: str): def _set_task_user_properties(self, task_id: str, **properties: str):
if self._edit_hyperparams_support is not True: if self._edit_hyperparams_support is not True:
@ -126,10 +136,12 @@ class K8sIntegration(Worker):
self._edit_hyperparams_support = self._session.api_version self._edit_hyperparams_support = self._session.api_version
def run_one_task(self, queue: Text, task_id: Text, worker_args=None, **_): def run_one_task(self, queue: Text, task_id: Text, worker_args=None, **_):
print('Pulling task {} launching on kubernetes cluster'.format(task_id))
task_data = self._session.api_client.tasks.get_all(id=[task_id])[0] task_data = self._session.api_client.tasks.get_all(id=[task_id])[0]
# push task into the k8s queue, so we have visibility on pending tasks in the k8s scheduler # push task into the k8s queue, so we have visibility on pending tasks in the k8s scheduler
try: try:
print('Pushing task {} into temporary pending queue'.format(task_id))
self._session.api_client.tasks.reset(task_id) self._session.api_client.tasks.reset(task_id)
self._session.api_client.tasks.enqueue(task_id, queue=self.k8s_pending_queue_name, self._session.api_client.tasks.enqueue(task_id, queue=self.k8s_pending_queue_name,
status_reason='k8s pending scheduler') status_reason='k8s pending scheduler')
@ -164,6 +176,16 @@ class K8sIntegration(Worker):
queue_id=queue queue_id=queue
) )
# make sure we provide a list
if isinstance(kubectl_cmd, str):
kubectl_cmd = kubectl_cmd.split()
if self.overrides_json_string:
kubectl_cmd += ['--overrides=' + self.overrides_json_string]
if self.ports_mode:
print("Kubernetes looking for available pod to use")
# Search for a free pod number # Search for a free pod number
pod_number = 1 pod_number = 1
while self.ports_mode: while self.ports_mode:
@ -173,14 +195,18 @@ class K8sIntegration(Worker):
) )
process = subprocess.Popen(kubectl_cmd_new.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE) process = subprocess.Popen(kubectl_cmd_new.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, error = process.communicate() output, error = process.communicate()
output = '' if not output else output if isinstance(output, str) else output.decode('utf-8')
error = '' if not error else error if isinstance(error, str) else error.decode('utf-8')
if not output: if not output:
# No such pod exist so we can use the pod_number we found # No such pod exist so we can use the pod_number we found
break break
if pod_number >= self.num_of_services: if pod_number >= self.num_of_services:
# All pod numbers are taken, exit # All pod numbers are taken, exit
self.log.info( self.log.warning(
"All k8s services are in use, task '{}' will be enqueued back to queue '{}'".format( "kubectl last result: {}\n{}\nAll k8s services are in use, task '{}' "
task_id, queue "will be enqueued back to queue '{}'".format(
error,output, task_id, queue
) )
) )
self._session.api_client.tasks.reset(task_id) self._session.api_client.tasks.reset(task_id)
@ -188,13 +214,12 @@ class K8sIntegration(Worker):
return return
pod_number += 1 pod_number += 1
# make sure we provide a list
if isinstance(kubectl_cmd, str):
kubectl_cmd = kubectl_cmd.split()
labels = [self.AGENT_LABEL] labels = [self.AGENT_LABEL]
if self.ports_mode: if self.ports_mode:
labels.insert(0, self.LIMIT_POD_LABEL.format(pod_number=pod_number)) labels.insert(0, self.LIMIT_POD_LABEL.format(pod_number=pod_number))
print("Kubernetes scheduling task id={} on pod={}".format(task_id, pod_number))
else:
print("Kubernetes scheduling task id={}".format(task_id))
kubectl_cmd += [ kubectl_cmd += [
"--labels=" + ",".join(labels), "--labels=" + ",".join(labels),
@ -206,10 +231,13 @@ class K8sIntegration(Worker):
] ]
process = subprocess.Popen(kubectl_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) process = subprocess.Popen(kubectl_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, error = process.communicate() output, error = process.communicate()
self.log.info("K8s scheduling experiment task id={}".format(task_id))
error = '' if not error else error if isinstance(error, str) else error.decode('utf-8')
output = '' if not output else output if isinstance(output, str) else output.decode('utf-8')
print('kubectl output:\n{}\n{}'.format(error, output))
if error: if error:
self.log.error("Running kubectl encountered an error: {}".format( self.log.error("Running kubectl encountered an error: {}".format(error))
error if isinstance(error, str) else error.decode()))
elif self.ports_mode: elif self.ports_mode:
user_props = {"k8s-pod-number": pod_number, "k8s-pod-label": labels[0]} user_props = {"k8s-pod-number": pod_number, "k8s-pod-label": labels[0]}
if self._user_props_cb: if self._user_props_cb:
@ -296,4 +324,4 @@ class K8sIntegration(Worker):
:param list(str) queue: queue name to pull from :param list(str) queue: queue name to pull from
""" """
return self.daemon(queues=[ObjectID(name=queue)], log_level=logging.INFO, foreground=True, docker=False) return self.daemon(queues=[ObjectID(name=queue)] if queue else None, log_level=logging.INFO, foreground=True, docker=False)