Add extra_bash_init_script to k8s glue. Default config is the raw config file (not created at runtime)

This commit is contained in:
allegroai 2020-11-11 16:31:25 +02:00
parent a61265effe
commit 40b3c1502d
2 changed files with 43 additions and 10 deletions

View File

@ -52,7 +52,9 @@ def main():
k8s = K8sIntegration( k8s = K8sIntegration(
ports_mode=args.ports_mode, num_of_services=args.num_of_services, user_props_cb=user_props_cb, ports_mode=args.ports_mode, num_of_services=args.num_of_services, user_props_cb=user_props_cb,
overrides_yaml=args.overrides_yaml, trains_conf_file=args.pod_trains_conf, template_yaml=args.template_yaml) overrides_yaml=args.overrides_yaml, trains_conf_file=args.pod_trains_conf, template_yaml=args.template_yaml,
extra_bash_init_script=K8sIntegration.get_ssh_server_bash(ssh_port_number=10022)
)
k8s.k8s_daemon(args.queue) k8s.k8s_daemon(args.queue)

View File

@ -40,15 +40,32 @@ class K8sIntegration(Worker):
"--field-selector=status.phase!=Pending,status.phase!=Running " \ "--field-selector=status.phase!=Pending,status.phase!=Running " \
"--namespace=trains" "--namespace=trains"
BASH_INSTALL_SSH_CMD = \
"apt-get install -y openssh-server ; " \
"mkdir -p /var/run/sshd ; " \
"echo 'root:training' | chpasswd ; " \
"echo 'PermitRootLogin yes' >> /etc/ssh/sshd_config ; " \
"sed -i 's/PermitRootLogin prohibit-password/PermitRootLogin yes/' /etc/ssh/sshd_config ; " \
"sed 's@session\s*required\s*pam_loginuid.so@session optional pam_loginuid.so@g' -i /etc/pam.d/sshd ; " \
"echo 'AcceptEnv TRAINS_API_ACCESS_KEY TRAINS_API_SECRET_KEY' >> /etc/ssh/sshd_config ; " \
'echo "export VISIBLE=now" >> /etc/profile ; ' \
"/usr/sbin/sshd -p {port} & ; "
CONTAINER_BASH_SCRIPT = \ CONTAINER_BASH_SCRIPT = \
"export DEBIAN_FRONTEND='noninteractive'; " \ "export DEBIAN_FRONTEND='noninteractive'; " \
"echo 'Binary::apt::APT::Keep-Downloaded-Packages \"true\";' > /etc/apt/apt.conf.d/docker-clean ; " \ "echo 'Binary::apt::APT::Keep-Downloaded-Packages \"true\";' > /etc/apt/apt.conf.d/docker-clean ; " \
"chown -R root /root/.cache/pip ; " \ "chown -R root /root/.cache/pip ; " \
"apt-get update ; " \ "apt-get update ; " \
"apt-get install -y git libsm6 libxext6 libxrender-dev libglib2.0-0 ; " \ "apt-get install -y git libsm6 libxext6 libxrender-dev libglib2.0-0 ; " \
"(which python3 && python3 -m pip --version) || apt-get install -y python3-pip ; " \ "declare LOCAL_PYTHON ; " \
"python3 -m pip install trains-agent ; " \ "for i in {{10..5}}; do which python3.$i && " \
"python3 -m trains_agent execute --full-monitoring --require-queue --id {} ; " "python3.$i -m pip --version && " \
"export LOCAL_PYTHON=$(which python3.$i) && break ; done ; " \
"[ ! -z $LOCAL_PYTHON ] || apt-get install -y python3-pip ; " \
"[ ! -z $LOCAL_PYTHON ] || export LOCAL_PYTHON=python3 ; " \
"$LOCAL_PYTHON -m pip install trains-agent ; " \
"{extra_bash_init_cmd} ; " \
"$LOCAL_PYTHON -m trains_agent execute --full-monitoring --require-queue --id {task_id} ; "
AGENT_LABEL = "TRAINS=agent" AGENT_LABEL = "TRAINS=agent"
LIMIT_POD_LABEL = "ai.allegro.agent.serial=pod-{pod_number}" LIMIT_POD_LABEL = "ai.allegro.agent.serial=pod-{pod_number}"
@ -67,6 +84,7 @@ class K8sIntegration(Worker):
overrides_yaml=None, overrides_yaml=None,
template_yaml=None, template_yaml=None,
trains_conf_file=None, trains_conf_file=None,
extra_bash_init_script=None,
): ):
""" """
Initialize the k8s integration glue layer daemon Initialize the k8s integration glue layer daemon
@ -76,6 +94,8 @@ class K8sIntegration(Worker):
example: "task={task_id} image={docker_image} queue_id={queue_id}" example: "task={task_id} image={docker_image} queue_id={queue_id}"
or a callable function: kubectl_cmd(task_id, docker_image, queue_id, task_data) or a callable function: kubectl_cmd(task_id, docker_image, queue_id, task_data)
:param str container_bash_script: container bash script to be executed in k8s (default: CONTAINER_BASH_SCRIPT) :param str container_bash_script: container bash script to be executed in k8s (default: CONTAINER_BASH_SCRIPT)
Notice this string will use format() call, if you have curly brackets they should be doubled { -> {{
Format arguments passed: {task_id} and {extra_bash_init_cmd}
:param bool debug: Switch logging on :param bool debug: Switch logging on
:param bool ports_mode: Adds a label to each pod which can be used in services in order to expose ports. :param bool ports_mode: Adds a label to each pod which can be used in services in order to expose ports.
Requires the `num_of_services` parameter. Requires the `num_of_services` parameter.
@ -88,6 +108,7 @@ class K8sIntegration(Worker):
:param str template_yaml: YAML file containing the template for the pod (optional). :param str template_yaml: YAML file containing the template for the pod (optional).
If provided the pod is scheduled with kubectl apply and overrides are ignored, otherwise with kubectl run. If provided the pod is scheduled with kubectl apply and overrides are ignored, otherwise with kubectl run.
:param str trains_conf_file: trains.conf file to be use by the pod itself (optional) :param str trains_conf_file: trains.conf file to be use by the pod itself (optional)
:param str extra_bash_init_script: Additional bash script to run before starting the Task inside the container
""" """
super(K8sIntegration, self).__init__() super(K8sIntegration, self).__init__()
self.k8s_pending_queue_name = k8s_pending_queue_name or self.K8S_PENDING_QUEUE self.k8s_pending_queue_name = k8s_pending_queue_name or self.K8S_PENDING_QUEUE
@ -106,11 +127,14 @@ class K8sIntegration(Worker):
self.trains_conf_file = None self.trains_conf_file = None
self.overrides_json_string = None self.overrides_json_string = None
self.template_dict = None self.template_dict = None
self.extra_bash_init_script = extra_bash_init_script or None
if self.extra_bash_init_script and not isinstance(self.extra_bash_init_script, str):
self.extra_bash_init_script = ' ; '.join(self.extra_bash_init_script)
self.pod_limits = [] self.pod_limits = []
self.pod_requests = [] self.pod_requests = []
if overrides_yaml: if overrides_yaml:
with open(os.path.expandvars(os.path.expanduser(str(overrides_yaml))), 'rt') as f: with open(os.path.expandvars(os.path.expanduser(str(overrides_yaml))), 'rt') as f:
overrides = yaml.load(f) overrides = yaml.load(f, Loader=getattr(yaml, 'FullLoader', None))
if overrides: if overrides:
containers = overrides.get('spec', {}).get('containers', []) containers = overrides.get('spec', {}).get('containers', [])
for c in containers: for c in containers:
@ -132,7 +156,7 @@ class K8sIntegration(Worker):
self.overrides_json_string = json.dumps(overrides) self.overrides_json_string = json.dumps(overrides)
if template_yaml: if template_yaml:
with open(os.path.expandvars(os.path.expanduser(str(template_yaml))), 'rt') as f: with open(os.path.expandvars(os.path.expanduser(str(template_yaml))), 'rt') as f:
self.template_dict = yaml.load(f) self.template_dict = yaml.load(f, Loader=getattr(yaml, 'FullLoader', None))
if trains_conf_file: if trains_conf_file:
with open(os.path.expandvars(os.path.expanduser(str(trains_conf_file))), 'rt') as f: with open(os.path.expandvars(os.path.expanduser(str(trains_conf_file))), 'rt') as f:
@ -195,8 +219,8 @@ class K8sIntegration(Worker):
docker_args = docker_parts[1:] if len(docker_parts) > 1 else [] docker_args = docker_parts[1:] if len(docker_parts) > 1 else []
# get the trains.conf encoded file # get the trains.conf encoded file
hocon_config_encoded = ( # noinspection PyProtectedMember
self.trains_conf_file or HOCONConverter.to_hocon(self._session.config._config)).encode('ascii') hocon_config_encoded = (self.trains_conf_file or self._session._config_file).encode('ascii')
create_trains_conf = "echo '{}' | base64 --decode >> ~/trains.conf && ".format( create_trains_conf = "echo '{}' | base64 --decode >> ~/trains.conf && ".format(
base64.b64encode( base64.b64encode(
hocon_config_encoded hocon_config_encoded
@ -304,7 +328,9 @@ class K8sIntegration(Worker):
container, container,
dict(name=name, image=docker_image, dict(name=name, image=docker_image,
command=['/bin/sh'], command=['/bin/sh'],
args=['-c', create_trains_conf + self.container_bash_script.format(task_id)])) args=['-c', create_trains_conf + self.container_bash_script.format(
extra_bash_init_cmd=self.extra_bash_init_script,
task_id=task_id)]))
if template['spec']['containers']: if template['spec']['containers']:
template['spec']['containers'][0] = merge_dicts(template['spec']['containers'][0], container) template['spec']['containers'][0] = merge_dicts(template['spec']['containers'][0], container)
@ -364,7 +390,8 @@ class K8sIntegration(Worker):
"--", "--",
"/bin/sh", "/bin/sh",
"-c", "-c",
create_trains_conf + self.container_bash_script.format(task_id), create_trains_conf + self.container_bash_script.format(
extra_bash_init_cmd=self.extra_bash_init_script, task_id=task_id),
] ]
process = subprocess.Popen(kubectl_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) process = subprocess.Popen(kubectl_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, error = process.communicate() output, error = process.communicate()
@ -443,3 +470,7 @@ class K8sIntegration(Worker):
:param list(str) queue: queue name to pull from :param list(str) queue: queue name to pull from
""" """
return self.daemon(queues=[ObjectID(name=queue)] if queue else None, log_level=logging.INFO, foreground=True, docker=False) return self.daemon(queues=[ObjectID(name=queue)] if queue else None, log_level=logging.INFO, foreground=True, docker=False)
@classmethod
def get_ssh_server_bash(cls, ssh_port_number):
return cls.BASH_INSTALL_SSH_CMD.format(port=ssh_port_number)