Improve k8s glue: add --template-yaml

This commit is contained in:
allegroai 2020-10-23 01:28:22 +03:00
parent 6bad2b5352
commit 293a92f486
3 changed files with 172 additions and 37 deletions

View File

@ -34,6 +34,11 @@ def parse_args():
"--overrides-yaml", type=str,
help="YAML file containing pod overrides to be used when launching a new pod"
)
parser.add_argument(
"--template-yaml", type=str,
help="YAML file containing pod template. If provided pod will be scheduled with kubectl apply "
"and overrides are ignored, otherwise it will be scheduled with kubectl run"
)
return parser.parse_args()
@ -47,7 +52,7 @@ def main():
k8s = K8sIntegration(
ports_mode=args.ports_mode, num_of_services=args.num_of_services, user_props_cb=user_props_cb,
overrides_yaml=args.overrides_yaml, trains_conf_file=args.pod_trains_conf)
overrides_yaml=args.overrides_yaml, trains_conf_file=args.pod_trains_conf, template_yaml=args.template_yaml)
k8s.k8s_daemon(args.queue)

View File

@ -4,6 +4,9 @@ import base64
import logging
import os
import subprocess
import tempfile
from copy import deepcopy
import yaml
import json
from time import sleep
@ -14,6 +17,8 @@ from pyhocon import HOCONConverter
from trains_agent.commands.events import Events
from trains_agent.commands.worker import Worker
from trains_agent.errors import APIError
from trains_agent.helper.base import safe_remove_file
from trains_agent.helper.dicts import merge_dicts
from trains_agent.helper.process import get_bash_output
from trains_agent.helper.resource_monitor import ResourceMonitor
from trains_agent.interface.base import ObjectID
@ -22,6 +27,8 @@ from trains_agent.interface.base import ObjectID
class K8sIntegration(Worker):
K8S_PENDING_QUEUE = "k8s_scheduler"
KUBECTL_APPLY_CMD = "kubectl apply -f"
KUBECTL_RUN_CMD = "kubectl run trains-id-{task_id} " \
"--image {docker_image} " \
"--restart=Never --replicas=1 " \
@ -58,6 +65,7 @@ class K8sIntegration(Worker):
num_of_services=20,
user_props_cb=None,
overrides_yaml=None,
template_yaml=None,
trains_conf_file=None,
):
"""
@ -76,7 +84,9 @@ class K8sIntegration(Worker):
:param callable user_props_cb: An Optional callable allowing additional user properties to be specified
when scheduling a task to run in a pod. Callable can receive an optional pod number and should return
a dictionary of user properties (name and value). Signature is [[Optional[int]], Dict[str,str]]
:param str overrides_yaml: YAML file containing the overides for the pod (optional)
:param str overrides_yaml: YAML file containing the overrides for the pod (optional)
:param str template_yaml: YAML file containing the template for the pod (optional).
If provided the pod is scheduled with kubectl apply and overrides are ignored, otherwise with kubectl run.
:param str trains_conf_file: trains.conf file to be used by the pod itself (optional)
"""
super(K8sIntegration, self).__init__()
@ -95,11 +105,34 @@ class K8sIntegration(Worker):
self._user_props_cb = user_props_cb
self.trains_conf_file = None
self.overrides_json_string = None
self.template_dict = None
self.pod_limits = []
self.pod_requests = []
if overrides_yaml:
with open(os.path.expandvars(os.path.expanduser(str(overrides_yaml))), 'rt') as f:
overrides = yaml.load(f)
if overrides:
containers = overrides.get('spec', {}).get('containers', [])
for c in containers:
resources = {str(k).lower(): v for k, v in c.get('resources', {}).items()}
if not resources:
continue
if resources.get('limits'):
self.pod_limits += ['{}={}'.format(k, v) for k, v in resources['limits'].items()]
if resources.get('requests'):
self.pod_requests += ['{}={}'.format(k, v) for k, v in resources['requests'].items()]
# remove double entries
self.pod_limits = list(set(self.pod_limits))
self.pod_requests = list(set(self.pod_requests))
if self.pod_limits or self.pod_requests:
self.log.warning('Found pod container requests={} limits={}'.format(
self.pod_limits, self.pod_requests))
if containers:
self.log.warning('Removing containers section: {}'.format(overrides['spec'].pop('containers')))
self.overrides_json_string = json.dumps(overrides)
if template_yaml:
with open(os.path.expandvars(os.path.expanduser(str(template_yaml))), 'rt') as f:
self.template_dict = yaml.load(f)
if trains_conf_file:
with open(os.path.expandvars(os.path.expanduser(str(trains_conf_file))), 'rt') as f:
@ -151,14 +184,17 @@ class K8sIntegration(Worker):
return
if task_data.execution.docker_cmd:
docker_image = task_data.execution.docker_cmd
docker_parts = task_data.execution.docker_cmd
else:
docker_image = str(os.environ.get("TRAINS_DOCKER_IMAGE") or
docker_parts = str(os.environ.get("TRAINS_DOCKER_IMAGE") or
self._session.config.get("agent.default_docker.image", "nvidia/cuda"))
# take the first part, this is the docker image name (not arguments)
docker_image = docker_image.split()[0]
docker_parts = docker_parts.split()
docker_image = docker_parts[0]
docker_args = docker_parts[1:] if len(docker_parts) > 1 else []
# get the trains.conf encoded file
hocon_config_encoded = (
self.trains_conf_file or HOCONConverter.to_hocon(self._session.config._config)).encode('ascii')
create_trains_conf = "echo '{}' | base64 --decode >> ~/trains.conf && ".format(
@ -167,22 +203,6 @@ class K8sIntegration(Worker):
).decode('ascii')
)
if callable(self.kubectl_cmd):
kubectl_cmd = self.kubectl_cmd(task_id, docker_image, queue, task_data)
else:
kubectl_cmd = self.kubectl_cmd.format(
task_id=task_id,
docker_image=docker_image,
queue_id=queue
)
# make sure we provide a list
if isinstance(kubectl_cmd, str):
kubectl_cmd = kubectl_cmd.split()
if self.overrides_json_string:
kubectl_cmd += ['--overrides=' + self.overrides_json_string]
if self.ports_mode:
print("Kubernetes looking for available pod to use")
@ -214,26 +234,27 @@ class K8sIntegration(Worker):
return
pod_number += 1
labels = [self.AGENT_LABEL]
labels = ([self.LIMIT_POD_LABEL.format(pod_number=pod_number)] if self.ports_mode else []) + [self.AGENT_LABEL]
if self.ports_mode:
labels.insert(0, self.LIMIT_POD_LABEL.format(pod_number=pod_number))
print("Kubernetes scheduling task id={} on pod={}".format(task_id, pod_number))
else:
print("Kubernetes scheduling task id={}".format(task_id))
kubectl_cmd += [
"--labels=" + ",".join(labels),
"--command",
"--",
"/bin/sh",
"-c",
create_trains_conf + self.container_bash_script.format(task_id),
]
process = subprocess.Popen(kubectl_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, error = process.communicate()
if self.template_dict:
output, error = self._kubectl_apply(
create_trains_conf=create_trains_conf,
labels=labels, docker_image=docker_image, docker_args=docker_args,
task_id=task_id, queue=queue)
else:
output, error = self._kubectl_run(
create_trains_conf=create_trains_conf,
labels=labels, docker_image=docker_image,
task_data=task_data,
task_id=task_id, queue=queue)
error = '' if not error else error if isinstance(error, str) else error.decode('utf-8')
output = '' if not output else output if isinstance(output, str) else output.decode('utf-8')
error = '' if not error else (error if isinstance(error, str) else error.decode('utf-8'))
output = '' if not output else (output if isinstance(output, str) else output.decode('utf-8'))
print('kubectl output:\n{}\n{}'.format(error, output))
if error:
@ -249,9 +270,106 @@ class K8sIntegration(Worker):
pass
self._set_task_user_properties(
task_id=task_id,
**user_props,
**user_props
)
def _parse_docker_args(self, docker_args):
# type: (list) -> dict
kube_args = {'env': []}
while docker_args:
cmd = docker_args.pop().strip()
if cmd in ('-e', '--env',):
env = docker_args.pop().strip()
key, value = env.split('=', 1)
kube_args[key] += {key: value}
else:
self.log.warning('skipping docker argument {} (only -e --env supported)'.format(cmd))
return kube_args
def _kubectl_apply(self, create_trains_conf, docker_image, docker_args, labels, queue, task_id):
    """
    Schedule the task as a pod with ``kubectl apply``, based on the pod template.

    A deep copy of ``self.template_dict`` is completed with the pod name, labels and
    the task container (image, command, args and any docker ``-e``/``--env`` variables),
    written to a temporary YAML file and passed to ``kubectl apply -f``.

    :param str create_trains_conf: shell snippet prepended to the container bash script
    :param str docker_image: image to run in the pod's first container
    :param list docker_args: extra docker arguments, passed to ``_parse_docker_args``
    :param list labels: ``key=value`` label strings added to the pod metadata
    :param queue: queue id, made available to the kubectl command format
    :param str task_id: task id, used for the pod / container name and the bash script
    :return: ``(output, error)`` as returned by the kubectl process, or
        ``(None, <exception text>)`` if the process could not be run
    """
    template = deepcopy(self.template_dict)
    template.setdefault('apiVersion', 'v1')
    template['kind'] = 'Pod'
    template.setdefault('metadata', {})
    name = 'trains-id-{task_id}'.format(task_id=task_id)
    template['metadata']['name'] = name
    template.setdefault('spec', {})
    template['spec'].setdefault('containers', [])
    if labels:
        labels_dict = dict(pair.split('=', 1) for pair in labels)
        template['metadata'].setdefault('labels', {})
        template['metadata']['labels'].update(labels_dict)

    container = self._parse_docker_args(docker_args)
    # merge_dicts replaces lists instead of extending them, so an empty 'env'
    # here would wipe the env section defined in the template's container
    if not container.get('env'):
        container.pop('env', None)
    container = merge_dicts(
        container,
        dict(name=name, image=docker_image,
             command=['/bin/sh'],
             args=['-c', create_trains_conf + self.container_bash_script.format(task_id)]))

    # the task container always takes the first container slot of the template
    if template['spec']['containers']:
        template['spec']['containers'][0] = merge_dicts(template['spec']['containers'][0], container)
    else:
        template['spec']['containers'].append(container)

    fp, yaml_file = tempfile.mkstemp(prefix='trains_k8stmpl_', suffix='.yml')
    try:
        os.close(fp)
        with open(yaml_file, 'wt') as f:
            yaml.dump(template, f)

        kubectl_cmd = self.KUBECTL_APPLY_CMD.format(
            task_id=task_id,
            docker_image=docker_image,
            queue_id=queue,
        )
        # make sure we provide a list
        if isinstance(kubectl_cmd, str):
            kubectl_cmd = kubectl_cmd.split()

        # add the template file at the end
        kubectl_cmd += [yaml_file]
        try:
            process = subprocess.Popen(kubectl_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            output, error = process.communicate()
        except Exception as ex:
            return None, str(ex)
    finally:
        # always remove the temporary template, even if writing it failed
        safe_remove_file(yaml_file)

    return output, error
def _kubectl_run(self, create_trains_conf, docker_image, labels, queue, task_data, task_id):
    """
    Schedule the task as a pod with the configured ``kubectl run`` style command.

    ``self.kubectl_cmd`` is either a callable returning the command or a format
    string; the resulting command is extended with the overrides, limits, requests,
    labels and the shell script to execute inside the pod.

    :param str create_trains_conf: shell snippet prepended to the container bash script
    :param str docker_image: image name given to the kubectl command
    :param list labels: label strings, joined with commas into ``--labels=``
    :param queue: queue id given to the kubectl command
    :param task_data: task object, only passed on to a callable ``kubectl_cmd``
    :param str task_id: task id given to the kubectl command and the bash script
    :return: ``(output, error)`` as returned by the process ``communicate()`` call
    """
    cmd_source = self.kubectl_cmd
    if callable(cmd_source):
        kubectl_cmd = cmd_source(task_id, docker_image, queue, task_data)
    else:
        kubectl_cmd = cmd_source.format(task_id=task_id, docker_image=docker_image, queue_id=queue)

    # a string command is tokenized so Popen always receives a list
    if isinstance(kubectl_cmd, str):
        kubectl_cmd = kubectl_cmd.split()

    # optional flags, in the same order they are expected on the command line
    optional_flags = []
    if self.overrides_json_string:
        optional_flags.append('--overrides=' + self.overrides_json_string)
    if self.pod_limits:
        optional_flags.extend(['--limits', ",".join(self.pod_limits)])
    if self.pod_requests:
        optional_flags.extend(['--requests', ",".join(self.pod_requests)])

    pod_script = create_trains_conf + self.container_bash_script.format(task_id)
    kubectl_cmd += optional_flags + [
        "--labels=" + ",".join(labels),
        "--command",
        "--",
        "/bin/sh",
        "-c",
        pod_script,
    ]

    launched = subprocess.Popen(kubectl_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    output, error = launched.communicate()
    return output, error
def run_tasks_loop(self, queues: List[Text], worker_params, **kwargs):
"""
:summary: Pull and run tasks from queues.
@ -277,7 +395,7 @@ class K8sIntegration(Worker):
while True:
# iterate over queues (priority style, queues[0] is highest)
for queue in queues:
# delete old completed /failed pods
# delete old completed / failed pods
get_bash_output(self.KUBECTL_DELETE_CMD)
# get next task in queue

View File

@ -3,3 +3,15 @@ from typing import Callable, Dict, Any
def filter_keys(filter_, dct):  # type: (Callable[[Any], bool], Dict) -> Dict
    """ Return a new dict holding only the items of dct whose key passes filter_ """
    kept = {}
    for name, item in dct.items():
        if filter_(name):
            kept[name] = item
    return kept
def merge_dicts(dict1, dict2):
    """
    Recursively merge dict2 into dict1 (in place) and return the result.

    When either argument is not a dict, dict2 is returned as is, so any
    non-dict value (lists included) coming from dict2 replaces the one in dict1.
    """
    both_are_dicts = isinstance(dict1, dict) and isinstance(dict2, dict)
    if not both_are_dicts:
        return dict2

    for key, incoming in dict2.items():
        # keys present on both sides are merged one level deeper, new keys are copied over
        dict1[key] = merge_dicts(dict1[key], incoming) if key in dict1 else incoming
    return dict1