Detect and delete "stuck" k8s pods k8s glue

This commit is contained in:
allegroai 2021-01-10 12:37:13 +02:00
parent 8fdb87f1f5
commit b7f87fb8d3
2 changed files with 75 additions and 20 deletions

View File

@ -1,18 +1,21 @@
from __future__ import print_function, division, unicode_literals
import base64
import functools
import json
import logging
import os
import re
import subprocess
import tempfile
from copy import deepcopy
import yaml
import json
from pathlib import Path
from threading import Thread
from time import sleep
from typing import Text, List
import yaml
from clearml_agent.commands.events import Events
from clearml_agent.commands.worker import Worker
from clearml_agent.definitions import ENV_DOCKER_IMAGE
@ -86,8 +89,9 @@ class K8sIntegration(Worker):
user_props_cb=None,
overrides_yaml=None,
template_yaml=None,
trains_conf_file=None,
clearml_conf_file=None,
extra_bash_init_script=None,
**kwargs
):
"""
Initialize the k8s integration glue layer daemon
@ -110,7 +114,7 @@ class K8sIntegration(Worker):
:param str overrides_yaml: YAML file containing the overrides for the pod (optional)
:param str template_yaml: YAML file containing the template for the pod (optional).
If provided the pod is scheduled with kubectl apply and overrides are ignored, otherwise with kubectl run.
:param str trains_conf_file: clearml.conf file to be use by the pod itself (optional)
:param str clearml_conf_file: clearml.conf file to be use by the pod itself (optional)
:param str extra_bash_init_script: Additional bash script to run before starting the Task inside the container
"""
super(K8sIntegration, self).__init__()
@ -127,7 +131,7 @@ class K8sIntegration(Worker):
self.num_of_services = num_of_services
self._edit_hyperparams_support = None
self._user_props_cb = user_props_cb
self.trains_conf_file = None
self.conf_file_content = None
self.overrides_json_string = None
self.template_dict = None
self.extra_bash_init_script = extra_bash_init_script or None
@ -161,11 +165,57 @@ class K8sIntegration(Worker):
with open(os.path.expandvars(os.path.expanduser(str(template_yaml))), 'rt') as f:
self.template_dict = yaml.load(f, Loader=getattr(yaml, 'FullLoader', None))
if trains_conf_file:
with open(os.path.expandvars(os.path.expanduser(str(trains_conf_file))), 'rt') as f:
self.trains_conf_file = f.read()
clearml_conf_file = clearml_conf_file or kwargs.get('trains_conf_file')
if clearml_conf_file:
with open(os.path.expandvars(os.path.expanduser(str(clearml_conf_file))), 'rt') as f:
self.conf_file_content = f.read()
# make sure we use system packages!
self.trains_conf_file += '\nagent.package_manager.system_site_packages=true\n'
self.conf_file_content += '\nagent.package_manager.system_site_packages=true\n'
self._monitor_hanging_pods()
def _monitor_hanging_pods(self):
_check_pod_thread = Thread(target=self._monitor_hanging_pods_daemon)
_check_pod_thread.daemon = True
_check_pod_thread.start()
def _monitor_hanging_pods_daemon(self):
while True:
output = get_bash_output('kubectl get pods -n clearml -o=JSON')
output = '' if not output else output if isinstance(output, str) else output.decode('utf-8')
try:
output_config = json.loads(output)
except Exception as ex:
self.log.warning('K8S Glue pods monitor: Failed parsing kubectl output:\n{}\nEx: {}'.format(output, ex))
sleep(self._polling_interval)
continue
pods = output_config.get('items', [])
for pod in pods:
try:
reason = functools.reduce(
lambda a, b: a[b], ('status', 'containerStatuses', 0, 'state', 'waiting', 'reason'), pod
)
except (IndexError, KeyError):
continue
if reason == 'ImagePullBackOff':
pod_name = pod.get('metadata', {}).get('name', None)
if pod_name:
task_id = pod_name.rpartition('-')[-1]
delete_pod_cmd = 'kubectl delete pods {} -n clearml'.format(pod_name)
get_bash_output(delete_pod_cmd)
try:
self._session.api_client.tasks.failed(
task=task_id,
status_reason="K8S glue error due to ImagePullBackOff",
status_message="Changed by K8S glue",
force=True
)
except Exception as ex:
self.log.warning(
'K8S Glue pods monitor: Failed deleting task "{}"\nEX: {}'.format(task_id, ex)
)
sleep(self._polling_interval)
def _set_task_user_properties(self, task_id: str, **properties: str):
if self._edit_hyperparams_support is not True:
@ -225,8 +275,11 @@ class K8sIntegration(Worker):
# get the clearml.conf encoded file
# noinspection PyProtectedMember
hocon_config_encoded = (self.trains_conf_file or self._session._config_file).encode('ascii')
create_trains_conf = "echo '{}' | base64 --decode >> ~/clearml.conf".format(
hocon_config_encoded = (
self.conf_file_content
or Path(self._session._config_file).read_text()
).encode("ascii")
create_clearml_conf = "echo '{}' | base64 --decode >> ~/clearml.conf".format(
base64.b64encode(
hocon_config_encoded
).decode('ascii')
@ -283,12 +336,12 @@ class K8sIntegration(Worker):
if self.template_dict:
output, error = self._kubectl_apply(
create_trains_conf=create_trains_conf,
create_clearml_conf=create_clearml_conf,
labels=labels, docker_image=docker_image, docker_args=docker_args,
task_id=task_id, queue=queue, queue_name=safe_queue_name)
else:
output, error = self._kubectl_run(
create_trains_conf=create_trains_conf,
create_clearml_conf=create_clearml_conf,
labels=labels, docker_image=docker_image,
task_data=task_data,
task_id=task_id, queue=queue, queue_name=safe_queue_name)
@ -297,7 +350,9 @@ class K8sIntegration(Worker):
output = '' if not output else (output if isinstance(output, str) else output.decode('utf-8'))
print('kubectl output:\n{}\n{}'.format(error, output))
if error:
self.log.error("Running kubectl encountered an error: {}".format(error))
send_log = "Running kubectl encountered an error: {}".format(error)
self.log.error(send_log)
self.send_logs(task_id, send_log.splitlines())
user_props = {"k8s-queue": str(queue_name)}
if self.ports_mode:
@ -330,7 +385,7 @@ class K8sIntegration(Worker):
self.log.warning('skipping docker argument {} (only -e --env supported)'.format(cmd))
return kube_args
def _kubectl_apply(self, create_trains_conf, docker_image, docker_args, labels, queue, task_id, queue_name):
def _kubectl_apply(self, create_clearml_conf, docker_image, docker_args, labels, queue, task_id, queue_name):
template = deepcopy(self.template_dict)
template.setdefault('apiVersion', 'v1')
template['kind'] = 'Pod'
@ -364,7 +419,7 @@ class K8sIntegration(Worker):
container,
dict(name=name, image=docker_image,
command=['/bin/bash'],
args=['-c', '{} ; {}'.format(create_trains_conf, create_init_script)])
args=['-c', '{} ; {}'.format(create_clearml_conf, create_init_script)])
)
if template['spec']['containers']:
@ -398,7 +453,7 @@ class K8sIntegration(Worker):
return output, error
def _kubectl_run(self, create_trains_conf, docker_image, labels, queue, task_data, task_id, queue_name):
def _kubectl_run(self, create_clearml_conf, docker_image, labels, queue, task_data, task_id, queue_name):
if callable(self.kubectl_cmd):
kubectl_cmd = self.kubectl_cmd(task_id, docker_image, queue, task_data, queue_name)
else:
@ -430,7 +485,7 @@ class K8sIntegration(Worker):
"--",
"/bin/sh",
"-c",
"{} ; {}".format(create_trains_conf, container_bash_script.format(
"{} ; {}".format(create_clearml_conf, container_bash_script.format(
extra_bash_init_cmd=self.extra_bash_init_script, task_id=task_id)),
]
process = subprocess.Popen(kubectl_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

View File

@ -64,7 +64,7 @@ def main():
k8s = K8sIntegration(
ports_mode=args.ports_mode, num_of_services=args.num_of_services, user_props_cb=user_props_cb,
overrides_yaml=args.overrides_yaml, trains_conf_file=args.pod_trains_conf, template_yaml=args.template_yaml,
overrides_yaml=args.overrides_yaml, clearml_conf_file=args.pod_clearml_conf, template_yaml=args.template_yaml,
extra_bash_init_script=K8sIntegration.get_ssh_server_bash(
ssh_port_number=args.ssh_server_port) if args.ssh_server_port else None
)