Add K8s glue support for limited number of services exposing ports

This commit is contained in:
allegroai 2020-10-20 14:17:30 +03:00
parent 89a3020c5e
commit 92fc8e838f
2 changed files with 122 additions and 19 deletions

View File

@ -0,0 +1,34 @@
"""
This example assumes you have preconfigured services with selectors in the form of
"ai.allegro.agent.serial=pod-<number>" and a targetPort of 10022.
The K8sIntegration component will label each pod accordingly.
"""
from argparse import ArgumentParser
from trains_agent.glue.k8s import K8sIntegration
def parse_args():
    """Parse and return the command-line options for the k8s glue example.

    Options:
        --queue: name of the queue to pull tasks from.
        --ports-mode: flag enabling the pod-labeling ports mode.
        --num-of-services: number of preconfigured k8s services (default 20).
    """
    arg_parser = ArgumentParser()
    arg_parser.add_argument("--queue", type=str, help="Queue to pull tasks from")
    arg_parser.add_argument(
        "--ports-mode",
        action='store_true',
        default=False,
        help="Ports-mode will add a label to the pod which can be used in services in order to expose ports",
    )
    arg_parser.add_argument(
        "--num-of-services",
        type=int,
        default=20,
        help="Specify the number of k8s services to be used. Use only with ports-mode.",
    )
    return arg_parser.parse_args()
def main():
    """Entry point: build the glue layer from CLI options and start the daemon."""
    options = parse_args()
    glue = K8sIntegration(
        ports_mode=options.ports_mode,
        num_of_services=options.num_of_services,
    )
    # Blocks, pulling tasks from the requested queue and scheduling them on k8s.
    glue.k8s_daemon(options.queue)


if __name__ == "__main__":
    main()

View File

@ -1,5 +1,6 @@
from __future__ import print_function, division, unicode_literals from __future__ import print_function, division, unicode_literals
import base64
import logging import logging
import os import os
import subprocess import subprocess
@ -12,39 +13,58 @@ from trains_agent.commands.events import Events
from trains_agent.commands.worker import Worker from trains_agent.commands.worker import Worker
from trains_agent.helper.process import get_bash_output from trains_agent.helper.process import get_bash_output
from trains_agent.helper.resource_monitor import ResourceMonitor from trains_agent.helper.resource_monitor import ResourceMonitor
from trains_agent.interface.base import ObjectID
class K8sIntegration(Worker): class K8sIntegration(Worker):
K8S_PENDING_QUEUE = "k8s_scheduler" K8S_PENDING_QUEUE = "k8s_scheduler"
KUBECTL_RUN_CMD = "kubectl run trains_id_{task_id} " \ KUBECTL_RUN_CMD = "kubectl run trains-id-{task_id} " \
"--image {docker_image} " \ "--image {docker_image} " \
"--restart=Never --replicas=1 " \ "--restart=Never --replicas=1 " \
"--generator=run-pod/v1" "--generator=run-pod/v1 " \
"--namespace=trains"
KUBECTL_DELETE_CMD = "kubectl delete pods " \ KUBECTL_DELETE_CMD = "kubectl delete pods " \
"--selector=TRAINS=agent " \ "--selector=TRAINS=agent " \
"--field-selector=status.phase!=Pending,status.phase!=Running" "--field-selector=status.phase!=Pending,status.phase!=Running " \
"--namespace=trains"
CONTAINER_BASH_SCRIPT = \ CONTAINER_BASH_SCRIPT = \
"export DEBIAN_FRONTEND='noninteractive'; " \
"echo 'Binary::apt::APT::Keep-Downloaded-Packages \"true\";' > /etc/apt/apt.conf.d/docker-clean ; " \ "echo 'Binary::apt::APT::Keep-Downloaded-Packages \"true\";' > /etc/apt/apt.conf.d/docker-clean ; " \
"chown -R root /root/.cache/pip ; " \ "chown -R root /root/.cache/pip ; " \
"apt-get update ; " \ "apt-get update ; " \
"apt-get install -y git libsm6 libxext6 libxrender-dev libglib2.0-0 ; " \ "apt-get install -y git libsm6 libxext6 libxrender-dev libglib2.0-0 ; " \
"(which python3 && python3 -m pip --version) || apt-get install -y python3-pip ; " \ "(which python3 && python3 -m pip --version) || apt-get install -y python3-pip ; " \
"python3 -m pip install trains-agent ; " \ "python3 -m pip install trains-agent ; " \
"python3 -m trains_agent execute --full-monitoring --require-queue --id {}" "python3 -m trains_agent execute --full-monitoring --require-queue --id {} ; "
def __init__(self, k8s_pending_queue_name=None, kubectl_cmd=None, container_bash_script=None, debug=False): AGENT_LABEL = "TRAINS=agent"
LIMIT_POD_LABEL = "ai.allegro.agent.serial=pod-{pod_number}"
def __init__(
self,
k8s_pending_queue_name=None,
kubectl_cmd=None,
container_bash_script=None,
debug=False,
ports_mode=False,
num_of_services=20,
):
""" """
Initialize the k8s integration glue layer daemon Initialize the k8s integration glue layer daemon
:param str k8s_pending_queue_name: queue name to use when task is pending in the k8s scheduler :param str k8s_pending_queue_name: queue name to use when task is pending in the k8s scheduler
:param str|callable kubectl_cmd: kubectl command line str, supports formating (default: KUBECTL_RUN_CMD) :param str|callable kubectl_cmd: kubectl command line str, supports formatting (default: KUBECTL_RUN_CMD)
example: "task={task_id} image={docker_image} queue_id={queue_id}" example: "task={task_id} image={docker_image} queue_id={queue_id}"
or a callable function: kubectl_cmd(task_id, docker_image, queue_id, task_data) or a callable function: kubectl_cmd(task_id, docker_image, queue_id, task_data)
:param str container_bash_script: container bash script to be executed in k8s (default: CONTAINER_BASH_SCRIPT) :param str container_bash_script: container bash script to be executed in k8s (default: CONTAINER_BASH_SCRIPT)
:param bool debug: Switch logging on :param bool debug: Switch logging on
:param bool ports_mode: Adds a label to each pod which can be used in services in order to expose ports.
Requires the `num_of_services` parameter.
:param int num_of_services: Number of k8s services configured in the cluster. Required if `port_mode` is True.
(default: 20)
""" """
super(K8sIntegration, self).__init__() super(K8sIntegration, self).__init__()
self.k8s_pending_queue_name = k8s_pending_queue_name or self.K8S_PENDING_QUEUE self.k8s_pending_queue_name = k8s_pending_queue_name or self.K8S_PENDING_QUEUE
@ -56,12 +76,15 @@ class K8sIntegration(Worker):
if debug: if debug:
self.log.logger.disabled = False self.log.logger.disabled = False
self.log.logger.setLevel(logging.INFO) self.log.logger.setLevel(logging.INFO)
self.ports_mode = ports_mode
self.num_of_services = num_of_services
def run_one_task(self, queue: Text, task_id: Text, worker_args=None): def run_one_task(self, queue: Text, task_id: Text, worker_args=None, **_):
task_data = self._session.api_client.tasks.get_all(id=[task_id])[0] task_data = self._session.api_client.tasks.get_all(id=[task_id])[0]
# push task into the k8s queue, so we have visibility on pending tasks in the k8s scheduler # push task into the k8s queue, so we have visibility on pending tasks in the k8s scheduler
try: try:
self._session.api_client.tasks.reset(task_id)
self._session.api_client.tasks.enqueue(task_id, queue=self.k8s_pending_queue_name, self._session.api_client.tasks.enqueue(task_id, queue=self.k8s_pending_queue_name,
status_reason='k8s pending scheduler') status_reason='k8s pending scheduler')
except Exception as e: except Exception as e:
@ -78,28 +101,74 @@ class K8sIntegration(Worker):
# take the first part, this is the docker image name (not arguments) # take the first part, this is the docker image name (not arguments)
docker_image = docker_image.split()[0] docker_image = docker_image.split()[0]
create_trains_conf = "echo '{}' >> ~/trains.conf && ".format( hocon_config_encoded = HOCONConverter.to_hocon(
HOCONConverter.to_hocon(self._session.config._config)) self._session.config._config
).encode('ascii')
create_trains_conf = "echo '{}' | base64 --decode >> ~/trains.conf && ".format(
base64.b64encode(
hocon_config_encoded
).decode('ascii')
)
if callable(self.kubectl_cmd): if callable(self.kubectl_cmd):
kubectl_cmd = self.kubectl_cmd(task_id, docker_image, queue, task_data) kubectl_cmd = self.kubectl_cmd(task_id, docker_image, queue, task_data)
else: else:
kubectl_cmd = self.kubectl_cmd.format(task_id=task_id, docker_image=docker_image, queue_id=queue) kubectl_cmd = self.kubectl_cmd.format(
task_id=task_id,
docker_image=docker_image,
queue_id=queue
)
# make sure we gave a list # Search for a free pod number
pod_number = 1
while self.ports_mode:
kubectl_cmd_new = "kubectl get pods -l {pod_label},{agent_label} -n trains".format(
pod_label=self.LIMIT_POD_LABEL.format(pod_number=pod_number),
agent_label=self.AGENT_LABEL
)
process = subprocess.Popen(kubectl_cmd_new.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, error = process.communicate()
if not output:
# No such pod exist so we can use the pod_number we found
break
if pod_number >= self.num_of_services:
# All pod numbers are taken, exit
self.log.info(
"All k8s services are in use, task '{}' will be enqueued back to queue '{}'".format(
task_id, queue
)
)
self._session.api_client.tasks.reset(task_id)
self._session.api_client.tasks.enqueue(task_id, queue=queue)
return
pod_number += 1
# make sure we provide a list
if isinstance(kubectl_cmd, str): if isinstance(kubectl_cmd, str):
kubectl_cmd = kubectl_cmd.split() kubectl_cmd = kubectl_cmd.split()
kubectl_cmd += ["--labels=TRAINS=agent", "--command", "--", "/bin/sh", "-c", labels = [self.AGENT_LABEL]
create_trains_conf + self.container_bash_script.format(task_id)] message = "K8s scheduling experiment task id={}".format(task_id)
if self.ports_mode:
labels.insert(0, self.LIMIT_POD_LABEL.format(pod_number=pod_number))
message += " pod #{}".format(pod_number)
kubectl_cmd += [
"--labels=" + ",".join(labels),
"--command",
"--",
"/bin/sh",
"-c",
create_trains_conf + self.container_bash_script.format(task_id),
]
process = subprocess.Popen(kubectl_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) process = subprocess.Popen(kubectl_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, error = process.communicate() output, error = process.communicate()
self.log.info("K8s scheduling experiment task id={}".format(task_id)) self.log.info(message)
if error: if error:
self.log.error("Running kubectl encountered an error: {}".format( self.log.error("Running kubectl encountered an error: {}".format(
error if isinstance(error, str) else error.decode())) error if isinstance(error, str) else error.decode()))
def run_tasks_loop(self, queues: List[Text], worker_params): def run_tasks_loop(self, queues: List[Text], worker_params, **kwargs):
""" """
:summary: Pull and run tasks from queues. :summary: Pull and run tasks from queues.
:description: 1. Go through ``queues`` by order. :description: 1. Go through ``queues`` by order.
@ -160,15 +229,15 @@ class K8sIntegration(Worker):
if self._session.config["agent.reload_config"]: if self._session.config["agent.reload_config"]:
self.reload_config() self.reload_config()
def k8s_daemon(self, queues): def k8s_daemon(self, queue):
""" """
Start the k8s Glue service. Start the k8s Glue service.
This service will be pulling tasks from *queues* and scheduling them for execution using kubectl. This service will be pulling tasks from *queue* and scheduling them for execution using kubectl.
Notice all scheduled tasks are pushed back into K8S_PENDING_QUEUE, Notice all scheduled tasks are pushed back into K8S_PENDING_QUEUE,
and popped when execution actually starts. This creates full visibility into the k8s scheduler. and popped when execution actually starts. This creates full visibility into the k8s scheduler.
Manually popping a task from the K8S_PENDING_QUEUE, Manually popping a task from the K8S_PENDING_QUEUE,
will cause the k8s scheduler to skip the execution once the scheduled tasks needs to be executed will cause the k8s scheduler to skip the execution once the scheduled tasks needs to be executed
:param list(str) queues: List of queue names to pull from :param list(str) queue: queue name to pull from
""" """
return self.daemon(queues=queues, log_level=logging.INFO, foreground=True, docker=False) return self.daemon(queues=[ObjectID(name=queue)], log_level=logging.INFO, foreground=True, docker=False)