From 293a92f48660bac7b1dedc90ffd22803adad8ff5 Mon Sep 17 00:00:00 2001
From: allegroai <>
Date: Fri, 23 Oct 2020 01:28:22 +0300
Subject: [PATCH] Improve k8s glue, add --template-yaml

---
 examples/k8s_glue_example.py |   7 +-
 trains_agent/glue/k8s.py     | 190 ++++++++++++++++++++++++++++-------
 trains_agent/helper/dicts.py |  12 +++
 3 files changed, 172 insertions(+), 37 deletions(-)

diff --git a/examples/k8s_glue_example.py b/examples/k8s_glue_example.py
index da67bb8..9ee4bcf 100644
--- a/examples/k8s_glue_example.py
+++ b/examples/k8s_glue_example.py
@@ -34,6 +34,11 @@ def parse_args():
         "--overrides-yaml", type=str,
         help="YAML file containing pod overrides to be used when launching a new pod"
     )
+    parser.add_argument(
+        "--template-yaml", type=str,
+        help="YAML file containing the pod template. If provided, the pod will be scheduled with kubectl apply "
+             "and overrides will be ignored; otherwise it will be scheduled with kubectl run"
+    )
 
     return parser.parse_args()
 
@@ -47,7 +52,7 @@ def main():
     k8s = K8sIntegration(
         ports_mode=args.ports_mode, num_of_services=args.num_of_services, user_props_cb=user_props_cb,
-        overrides_yaml=args.overrides_yaml, trains_conf_file=args.pod_trains_conf)
+        overrides_yaml=args.overrides_yaml, trains_conf_file=args.pod_trains_conf, template_yaml=args.template_yaml)
 
     k8s.k8s_daemon(args.queue)
 
diff --git a/trains_agent/glue/k8s.py b/trains_agent/glue/k8s.py
index 72c9d3e..e4bd435 100644
--- a/trains_agent/glue/k8s.py
+++ b/trains_agent/glue/k8s.py
@@ -4,6 +4,9 @@ import base64
 import logging
 import os
 import subprocess
+import tempfile
+from copy import deepcopy
+
 import yaml
 import json
 from time import sleep
@@ -14,6 +17,8 @@ from pyhocon import HOCONConverter
 from trains_agent.commands.events import Events
 from trains_agent.commands.worker import Worker
 from trains_agent.errors import APIError
+from trains_agent.helper.base import safe_remove_file
+from trains_agent.helper.dicts import merge_dicts
 from trains_agent.helper.process import get_bash_output
 from trains_agent.helper.resource_monitor import ResourceMonitor
 from trains_agent.interface.base import ObjectID
@@ -22,6 +27,8 @@ from trains_agent.interface.base import ObjectID
 class K8sIntegration(Worker):
     K8S_PENDING_QUEUE = "k8s_scheduler"
 
+    KUBECTL_APPLY_CMD = "kubectl apply -f"
+
     KUBECTL_RUN_CMD = "kubectl run trains-id-{task_id} " \
                       "--image {docker_image} " \
                       "--restart=Never --replicas=1 " \
@@ -58,6 +65,7 @@ class K8sIntegration(Worker):
         num_of_services=20,
         user_props_cb=None,
         overrides_yaml=None,
+        template_yaml=None,
         trains_conf_file=None,
     ):
         """
@@ -76,7 +84,9 @@ class K8sIntegration(Worker):
         :param callable user_props_cb: An optional callable allowing additional user properties to be specified
             when scheduling a task to run in a pod. Callable can receive an optional pod number and should return
            a dictionary of user properties (name and value). Signature is [[Optional[int]], Dict[str,str]]
-        :param str overrides_yaml: YAML file containing the overides for the pod (optional)
+        :param str overrides_yaml: YAML file containing the overrides for the pod (optional)
+        :param str template_yaml: YAML file containing the template for the pod (optional).
+            If provided, the pod is scheduled with kubectl apply and overrides are ignored; otherwise it is scheduled with kubectl run.
         :param str trains_conf_file: trains.conf file to be used by the pod itself (optional)
         """
         super(K8sIntegration, self).__init__()
@@ -95,11 +105,34 @@ class K8sIntegration(Worker):
         self._user_props_cb = user_props_cb
         self.trains_conf_file = None
         self.overrides_json_string = None
+        self.template_dict = None
+        self.pod_limits = []
+        self.pod_requests = []
         if overrides_yaml:
             with open(os.path.expandvars(os.path.expanduser(str(overrides_yaml))), 'rt') as f:
                 overrides = yaml.load(f)
             if overrides:
+                containers = overrides.get('spec', {}).get('containers', [])
+                for c in containers:
+                    resources = {str(k).lower(): v for k, v in c.get('resources', {}).items()}
+                    if not resources:
+                        continue
+                    if resources.get('limits'):
+                        self.pod_limits += ['{}={}'.format(k, v) for k, v in resources['limits'].items()]
+                    if resources.get('requests'):
+                        self.pod_requests += ['{}={}'.format(k, v) for k, v in resources['requests'].items()]
+                # remove duplicate entries
+                self.pod_limits = list(set(self.pod_limits))
+                self.pod_requests = list(set(self.pod_requests))
+                if self.pod_limits or self.pod_requests:
+                    self.log.warning('Found pod container requests={} limits={}'.format(
+                        self.pod_requests, self.pod_limits))
+                if containers:
+                    self.log.warning('Removing containers section: {}'.format(overrides['spec'].pop('containers')))
                 self.overrides_json_string = json.dumps(overrides)
+        if template_yaml:
+            with open(os.path.expandvars(os.path.expanduser(str(template_yaml))), 'rt') as f:
+                self.template_dict = yaml.load(f)
 
         if trains_conf_file:
             with open(os.path.expandvars(os.path.expanduser(str(trains_conf_file))), 'rt') as f:
@@ -151,14 +184,17 @@ class K8sIntegration(Worker):
             return
 
         if task_data.execution.docker_cmd:
-            docker_image = task_data.execution.docker_cmd
+            docker_parts = task_data.execution.docker_cmd
         else:
-            docker_image = str(os.environ.get("TRAINS_DOCKER_IMAGE") or
+            docker_parts = str(os.environ.get("TRAINS_DOCKER_IMAGE") or
                                self._session.config.get("agent.default_docker.image", "nvidia/cuda"))
 
         # take the first part, this is the docker image name (not arguments)
-        docker_image = docker_image.split()[0]
+        docker_parts = docker_parts.split()
+        docker_image = docker_parts[0]
+        docker_args = docker_parts[1:] if len(docker_parts) > 1 else []
 
+        # get the trains.conf encoded file
         hocon_config_encoded = (
             self.trains_conf_file or HOCONConverter.to_hocon(self._session.config._config)).encode('ascii')
         create_trains_conf = "echo '{}' | base64 --decode >> ~/trains.conf && ".format(
@@ -167,22 +203,6 @@ class K8sIntegration(Worker):
             ).decode('ascii')
         )
 
-        if callable(self.kubectl_cmd):
-            kubectl_cmd = self.kubectl_cmd(task_id, docker_image, queue, task_data)
-        else:
-            kubectl_cmd = self.kubectl_cmd.format(
-                task_id=task_id,
-                docker_image=docker_image,
-                queue_id=queue
-            )
-
-        # make sure we provide a list
-        if isinstance(kubectl_cmd, str):
-            kubectl_cmd = kubectl_cmd.split()
-
-        if self.overrides_json_string:
-            kubectl_cmd += ['--overrides=' + self.overrides_json_string]
-
         if self.ports_mode:
             print("Kubernetes looking for available pod to use")
 
@@ -214,26 +234,27 @@ class K8sIntegration(Worker):
                 return
             pod_number += 1
 
-        labels = [self.AGENT_LABEL]
+        labels = ([self.LIMIT_POD_LABEL.format(pod_number=pod_number)] if self.ports_mode else []) + [self.AGENT_LABEL]
+
         if self.ports_mode:
-            labels.insert(0, self.LIMIT_POD_LABEL.format(pod_number=pod_number))
             print("Kubernetes scheduling task id={} on pod={}".format(task_id, pod_number))
         else:
             print("Kubernetes scheduling task id={}".format(task_id))
 
-        kubectl_cmd += [
-            "--labels=" + ",".join(labels),
-            "--command",
-            "--",
-            "/bin/sh",
-            "-c",
-            create_trains_conf + self.container_bash_script.format(task_id),
-        ]
-        process = subprocess.Popen(kubectl_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
-        output, error = process.communicate()
+        if self.template_dict:
+            output, error = self._kubectl_apply(
+                create_trains_conf=create_trains_conf,
+                labels=labels, docker_image=docker_image, docker_args=docker_args,
+                task_id=task_id, queue=queue)
+        else:
+            output, error = self._kubectl_run(
+                create_trains_conf=create_trains_conf,
+                labels=labels, docker_image=docker_image,
+                task_data=task_data,
+                task_id=task_id, queue=queue)
 
-        error = '' if not error else error if isinstance(error, str) else error.decode('utf-8')
-        output = '' if not output else output if isinstance(output, str) else output.decode('utf-8')
+        error = '' if not error else (error if isinstance(error, str) else error.decode('utf-8'))
+        output = '' if not output else (output if isinstance(output, str) else output.decode('utf-8'))
         print('kubectl output:\n{}\n{}'.format(error, output))
         if error:
@@ -249,9 +270,106 @@ class K8sIntegration(Worker):
                 pass
             self._set_task_user_properties(
                 task_id=task_id,
-                **user_props,
+                **user_props
             )
 
+    def _parse_docker_args(self, docker_args):
+        # type: (list) -> dict
+        kube_args = {'env': []}
+        while docker_args:
+            cmd = docker_args.pop(0).strip()
+            if cmd in ('-e', '--env',):
+                env = docker_args.pop(0).strip()
+                key, value = env.split('=', 1)
+                kube_args['env'] += [{'name': key, 'value': value}]
+            else:
+                self.log.warning('skipping docker argument {} (only -e --env supported)'.format(cmd))
+        return kube_args
+
+    def _kubectl_apply(self, create_trains_conf, docker_image, docker_args, labels, queue, task_id):
+        template = deepcopy(self.template_dict)
+        template.setdefault('apiVersion', 'v1')
+        template['kind'] = 'Pod'
+        template.setdefault('metadata', {})
+        name = 'trains-id-{task_id}'.format(task_id=task_id)
+        template['metadata']['name'] = name
+        template.setdefault('spec', {})
+        template['spec'].setdefault('containers', [])
+        if labels:
+            labels_dict = dict(pair.split('=', 1) for pair in labels)
+            template['metadata'].setdefault('labels', {})
+            template['metadata']['labels'].update(labels_dict)
+
+        container = self._parse_docker_args(docker_args)
+        container = merge_dicts(
+            container,
+            dict(name=name, image=docker_image,
+                 command=['/bin/sh'],
+                 args=['-c', create_trains_conf + self.container_bash_script.format(task_id)]))
+
+        if template['spec']['containers']:
+            template['spec']['containers'][0] = merge_dicts(template['spec']['containers'][0], container)
+        else:
+            template['spec']['containers'].append(container)
+
+        fp, yaml_file = tempfile.mkstemp(prefix='trains_k8stmpl_', suffix='.yml')
+        os.close(fp)
+        with open(yaml_file, 'wt') as f:
+            yaml.dump(template, f)
+
+        kubectl_cmd = self.KUBECTL_APPLY_CMD.format(
+            task_id=task_id,
+            docker_image=docker_image,
+            queue_id=queue,
+        )
+        # make sure we provide a list
+        if isinstance(kubectl_cmd, str):
+            kubectl_cmd = kubectl_cmd.split()
+
+        # add the template file at the end
+        kubectl_cmd += [yaml_file]
+        try:
+            process = subprocess.Popen(kubectl_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+            output, error = process.communicate()
+        except Exception as ex:
+            return None, str(ex)
+        finally:
+            safe_remove_file(yaml_file)
+
+        return output, error
+
+    def _kubectl_run(self, create_trains_conf, docker_image, labels, queue, task_data, task_id):
+        if callable(self.kubectl_cmd):
+            kubectl_cmd = self.kubectl_cmd(task_id, docker_image, queue, task_data)
+        else:
+            kubectl_cmd = self.kubectl_cmd.format(
+                task_id=task_id,
+                docker_image=docker_image,
+                queue_id=queue
+            )
+
+        # make sure we provide a list
+        if isinstance(kubectl_cmd, str):
+            kubectl_cmd = kubectl_cmd.split()
+
+        if self.overrides_json_string:
+            kubectl_cmd += ['--overrides=' + self.overrides_json_string]
+
+        if self.pod_limits:
+            kubectl_cmd += ['--limits', ",".join(self.pod_limits)]
+        if self.pod_requests:
+            kubectl_cmd += ['--requests', ",".join(self.pod_requests)]
+
+        kubectl_cmd += [
+            "--labels=" + ",".join(labels),
+            "--command",
+            "--",
+            "/bin/sh",
+            "-c",
+            create_trains_conf + self.container_bash_script.format(task_id),
+        ]
+        process = subprocess.Popen(kubectl_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+        output, error = process.communicate()
+        return output, error
+
     def run_tasks_loop(self, queues: List[Text], worker_params, **kwargs):
         """
         :summary: Pull and run tasks from queues.
@@ -277,7 +395,7 @@ class K8sIntegration(Worker):
         while True:
             # iterate over queues (priority style, queues[0] is highest)
             for queue in queues:
-                # delete old completed /failed pods
+                # delete old completed / failed pods
                 get_bash_output(self.KUBECTL_DELETE_CMD)
 
                 # get next task in queue
diff --git a/trains_agent/helper/dicts.py b/trains_agent/helper/dicts.py
index e630937..649fb9d 100644
--- a/trains_agent/helper/dicts.py
+++ b/trains_agent/helper/dicts.py
@@ -3,3 +3,15 @@ from typing import Callable, Dict, Any
 
 def filter_keys(filter_, dct):  # type: (Callable[[Any], bool], Dict) -> Dict
     return {key: value for key, value in dct.items() if filter_(key)}
+
+
+def merge_dicts(dict1, dict2):
+    """ Recursively merges dict2 into dict1 """
+    if not isinstance(dict1, dict) or not isinstance(dict2, dict):
+        return dict2
+    for k in dict2:
+        if k in dict1:
+            dict1[k] = merge_dicts(dict1[k], dict2[k])
+        else:
+            dict1[k] = dict2[k]
+    return dict1
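
Note on the merge semantics used by _kubectl_apply above: merge_dicts recursively folds dict2 into dict1, with dict2 winning on conflicting leaves, and it mutates dict1 in place, which is why the method deep-copies the user template before merging the generated task container into the template's first container entry. Below is a minimal, self-contained sketch of that merge; the template and container values are illustrative stand-ins, not part of the patch:

    from copy import deepcopy

    def merge_dicts(dict1, dict2):
        """ Recursively merges dict2 into dict1 (dict2 wins on conflicting leaves) """
        if not isinstance(dict1, dict) or not isinstance(dict2, dict):
            return dict2
        for k in dict2:
            if k in dict1:
                dict1[k] = merge_dicts(dict1[k], dict2[k])
            else:
                dict1[k] = dict2[k]
        return dict1

    # An illustrative pod template, as it might be loaded from --template-yaml
    template = {
        'apiVersion': 'v1',
        'kind': 'Pod',
        'spec': {'containers': [{'resources': {'limits': {'nvidia.com/gpu': 1}}}]},
    }

    # The container spec the glue builds for a task (values are made up here)
    container = {
        'name': 'trains-id-example',
        'image': 'nvidia/cuda',
        'command': ['/bin/sh'],
        'args': ['-c', 'echo running task'],
        'env': [{'name': 'MY_VAR', 'value': '1'}],
    }

    # _kubectl_apply deep-copies the template because merge_dicts mutates dict1
    merged = merge_dicts(deepcopy(template['spec']['containers'][0]), container)
    assert merged['resources'] == {'limits': {'nvidia.com/gpu': 1}}  # kept from the template
    assert merged['image'] == 'nvidia/cuda'                          # added from the task

With a real template the merged pod spec is produced the same way, then dumped to a temporary YAML file and submitted with kubectl apply -f.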