Remove support for kubectl run

Allow customizing pod name prefix and limit pod label
Return deleted pods from cleanup
Some refactoring
allegroai 2022-12-05 11:40:19 +02:00
parent 3dd5973734
commit dc5e0033c8
2 changed files with 103 additions and 136 deletions
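
With kubectl run support removed, the glue layer always schedules through _kubectl_apply, and the pod name prefix and the ports-mode serial label become constructor arguments. A minimal sketch of the new arguments (values below are illustrative, not taken from this commit; the k8s_daemon entry point is assumed from existing glue usage):

    # Sketch only: argument values are illustrative
    from clearml_agent.glue.k8s import K8sIntegration

    k8s = K8sIntegration(
        ports_mode=True,
        namespace="clearml",
        pod_name_prefix="my-team-task-",                             # default: DEFAULT_POD_NAME_PREFIX ("clearml-id-")
        limit_pod_label="ai.allegro.agent.serial=pod-{pod_number}",  # default: DEFAULT_LIMIT_POD_LABEL
    )
    k8s.k8s_daemon("k8s_glue_queue")  # assumed entry point; queue name is hypothetical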

clearml_agent/glue/k8s.py

@@ -27,29 +27,25 @@ from clearml_agent.errors import APIError
 from clearml_agent.glue.definitions import ENV_START_AGENT_SCRIPT_PATH
 from clearml_agent.helper.base import safe_remove_file
 from clearml_agent.helper.dicts import merge_dicts
-from clearml_agent.helper.process import get_bash_output
+from clearml_agent.helper.process import get_bash_output, stringify_bash_output
 from clearml_agent.helper.resource_monitor import ResourceMonitor
 from clearml_agent.interface.base import ObjectID
 
 
 class K8sIntegration(Worker):
     K8S_PENDING_QUEUE = "k8s_scheduler"
     K8S_DEFAULT_NAMESPACE = "clearml"
     AGENT_LABEL = "CLEARML=agent"
-    LIMIT_POD_LABEL = "ai.allegro.agent.serial=pod-{pod_number}"
 
     KUBECTL_APPLY_CMD = "kubectl apply --namespace={namespace} -f"
 
-    KUBECTL_RUN_CMD = "kubectl run clearml-id-{task_id} " \
-                      "--image {docker_image} {docker_args} " \
-                      "--restart=Never " \
-                      "--namespace={namespace}"
-
     KUBECTL_DELETE_CMD = "kubectl delete pods " \
                          "-l={agent_label} " \
                          "--field-selector=status.phase!=Pending,status.phase!=Running " \
-                         "--namespace={namespace}"
+                         "--namespace={namespace} " \
+                         "--output name"
 
     BASH_INSTALL_SSH_CMD = [
         "apt-get update",
@@ -86,12 +82,14 @@ class K8sIntegration(Worker):
         "$LOCAL_PYTHON -m clearml_agent execute {default_execution_agent_args} --id {task_id}"
     ]
 
+    DEFAULT_POD_NAME_PREFIX = "clearml-id-"
+    DEFAULT_LIMIT_POD_LABEL = "ai.allegro.agent.serial=pod-{pod_number}"
+
     _edit_hyperparams_version = "2.9"
 
     def __init__(
             self,
             k8s_pending_queue_name=None,
-            kubectl_cmd=None,
             container_bash_script=None,
             debug=False,
             ports_mode=False,
@@ -104,15 +102,14 @@ class K8sIntegration(Worker):
             extra_bash_init_script=None,
             namespace=None,
             max_pods_limit=None,
+            pod_name_prefix=None,
+            limit_pod_label=None,
             **kwargs
     ):
         """
         Initialize the k8s integration glue layer daemon
 
         :param str k8s_pending_queue_name: queue name to use when task is pending in the k8s scheduler
-        :param str|callable kubectl_cmd: kubectl command line str, supports formatting (default: KUBECTL_RUN_CMD)
-            example: "task={task_id} image={docker_image} queue_id={queue_id}"
-            or a callable function: kubectl_cmd(task_id, docker_image, docker_args, queue_id, task_data)
         :param str container_bash_script: container bash script to be executed in k8s (default: CONTAINER_BASH_SCRIPT)
             Notice this string will use format() call, if you have curly brackets they should be doubled { -> {{
             Format arguments passed: {task_id} and {extra_bash_init_cmd}
@@ -134,9 +131,10 @@ class K8sIntegration(Worker):
         :param int max_pods_limit: Maximum number of pods that K8S glue can run at the same time
         """
         super(K8sIntegration, self).__init__()
+        self.pod_name_prefix = pod_name_prefix or self.DEFAULT_POD_NAME_PREFIX
+        self.limit_pod_label = limit_pod_label or self.DEFAULT_LIMIT_POD_LABEL
         self.k8s_pending_queue_name = k8s_pending_queue_name or self.K8S_PENDING_QUEUE
         self.k8s_pending_queue_id = None
-        self.kubectl_cmd = kubectl_cmd or self.KUBECTL_RUN_CMD
         self.container_bash_script = container_bash_script or self.CONTAINER_BASH_SCRIPT
         # Always do system packages, because by we will be running inside a docker
         self._session.config.put("agent.package_manager.system_site_packages", True)
@@ -160,27 +158,9 @@ class K8sIntegration(Worker):
         self.pod_limits = []
         self.pod_requests = []
         self.max_pods_limit = max_pods_limit if not self.ports_mode else None
-        if overrides_yaml:
-            overrides = self._load_template_file(overrides_yaml)
-            if overrides:
-                containers = overrides.get('spec', {}).get('containers', [])
-                for c in containers:
-                    resources = {str(k).lower(): v for k, v in c.get('resources', {}).items()}
-                    if not resources:
-                        continue
-                    if resources.get('limits'):
-                        self.pod_limits += ['{}={}'.format(k, v) for k, v in resources['limits'].items()]
-                    if resources.get('requests'):
-                        self.pod_requests += ['{}={}'.format(k, v) for k, v in resources['requests'].items()]
-                # remove double entries
-                self.pod_limits = list(set(self.pod_limits))
-                self.pod_requests = list(set(self.pod_requests))
-                if self.pod_limits or self.pod_requests:
-                    self.log.warning('Found pod container requests={} limits={}'.format(
-                        self.pod_limits, self.pod_requests))
-                if containers:
-                    self.log.warning('Removing containers section: {}'.format(overrides['spec'].pop('containers')))
-                self.overrides_json_string = json.dumps(overrides)
+        self._load_overrides_yaml(overrides_yaml)
+
         if template_yaml:
             self.template_dict = self._load_template_file(template_yaml)
@@ -197,6 +177,31 @@ class K8sIntegration(Worker):
         self._min_cleanup_interval_per_ns_sec = 1.0
         self._last_pod_cleanup_per_ns = defaultdict(lambda: 0.)
 
+    def _load_overrides_yaml(self, overrides_yaml):
+        if not overrides_yaml:
+            return
+        overrides = self._load_template_file(overrides_yaml)
+        if not overrides:
+            return
+        containers = overrides.get('spec', {}).get('containers', [])
+        for c in containers:
+            resources = {str(k).lower(): v for k, v in c.get('resources', {}).items()}
+            if not resources:
+                continue
+            if resources.get('limits'):
+                self.pod_limits += ['{}={}'.format(k, v) for k, v in resources['limits'].items()]
+            if resources.get('requests'):
+                self.pod_requests += ['{}={}'.format(k, v) for k, v in resources['requests'].items()]
+        # remove double entries
+        self.pod_limits = list(set(self.pod_limits))
+        self.pod_requests = list(set(self.pod_requests))
+        if self.pod_limits or self.pod_requests:
+            self.log.warning('Found pod container requests={} limits={}'.format(
+                self.pod_limits, self.pod_requests))
+        if containers:
+            self.log.warning('Removing containers section: {}'.format(overrides['spec'].pop('containers')))
+        self.overrides_json_string = json.dumps(overrides)
+
     def _monitor_hanging_pods(self):
         _check_pod_thread = Thread(target=self._monitor_hanging_pods_daemon)
         _check_pod_thread.daemon = True
@@ -216,9 +221,11 @@ class K8sIntegration(Worker):
         except (IndexError, KeyError):
             return default
 
-    def _get_kubectl_options(self, command, extra_labels=None, filters=None, output="json"):
-        # type: (str, Iterable[str], Iterable[str], str) -> Dict
-        labels = [self._get_agent_label()] + (list(extra_labels) if extra_labels else [])
+    def _get_kubectl_options(self, command, extra_labels=None, filters=None, output="json", labels=None):
+        # type: (str, Iterable[str], Iterable[str], str, Iterable[str]) -> Dict
+        if not labels:
+            labels = [self._get_agent_label()]
+        labels = list(labels) + (list(extra_labels) if extra_labels else [])
         d = {
             "-l": ",".join(labels),
             "-n": str(self.namespace),
@@ -240,8 +247,7 @@ class K8sIntegration(Worker):
         while True:
             kubectl_cmd = self.get_kubectl_command("get pods", filters=["status.phase=Pending"])
             self.log.debug("Detecting hanging pods: {}".format(kubectl_cmd))
-            output = get_bash_output(kubectl_cmd)
-            output = '' if not output else output if isinstance(output, str) else output.decode('utf-8')
+            output = stringify_bash_output(get_bash_output(kubectl_cmd))
             try:
                 output_config = json.loads(output)
             except Exception as ex:
@@ -380,8 +386,7 @@ class K8sIntegration(Worker):
                 output="jsonpath=\"{range .items[*]}{.metadata.name}{' '}{.metadata.namespace}{'\\n'}{end}\""
             )
             self.log.debug("Getting used pods: {}".format(kubectl_cmd))
-            output = get_bash_output(kubectl_cmd, raise_error=True)
-            output = '' if not output else output if isinstance(output, str) else output.decode('utf-8')
+            output = stringify_bash_output(get_bash_output(kubectl_cmd, raise_error=True))
 
             if not output:
                 # No such pod exist so we can use the pod_number we found
@@ -492,13 +497,13 @@ class K8sIntegration(Worker):
                 kubectl_cmd_new = self.get_kubectl_command(
                     "get pods",
-                    extra_labels=[self.LIMIT_POD_LABEL.format(pod_number=pod_number)] if self.ports_mode else None
+                    extra_labels=[self.limit_pod_label.format(pod_number=pod_number)] if self.ports_mode else None
                 )
                 self.log.debug("Looking for a free pod/port: {}".format(kubectl_cmd_new))
                 process = subprocess.Popen(kubectl_cmd_new.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                 output, error = process.communicate()
-                output = '' if not output else output if isinstance(output, str) else output.decode('utf-8')
-                error = '' if not error else error if isinstance(error, str) else error.decode('utf-8')
+                output = stringify_bash_output(output)
+                error = stringify_bash_output(error)
 
                 try:
                     items_count = len(json.loads(output).get("items", []))
@@ -554,36 +559,38 @@ class K8sIntegration(Worker):
         labels = self._get_pod_labels(queue, queue_name)
         if self.ports_mode:
-            labels.append(self.LIMIT_POD_LABEL.format(pod_number=pod_number))
+            labels.append(self.limit_pod_label.format(pod_number=pod_number))
 
         if self.ports_mode:
             print("Kubernetes scheduling task id={} on pod={} (pod_count={})".format(task_id, pod_number, pod_count))
         else:
             print("Kubernetes scheduling task id={}".format(task_id))
 
-        kubectl_kwargs = dict(
-            create_clearml_conf=create_clearml_conf,
-            labels=labels,
-            docker_image=container['image'],
-            docker_args=container['arguments'],
-            docker_bash=container.get('setup_shell_script'),
-            task_id=task_id,
-            queue=queue,
-        )
-
         try:
             template = self._resolve_template(task_session, task_data, queue)
         except Exception as ex:
             print("ERROR: Failed resolving template (skipping): {}".format(ex))
             return
 
-        if template:
-            output, error = self._kubectl_apply(template=template, **kubectl_kwargs)
-        else:
-            output, error = self._kubectl_run(task_data=task_data, **kubectl_kwargs)
-
-        error = '' if not error else (error if isinstance(error, str) else error.decode('utf-8'))
-        output = '' if not output else (output if isinstance(output, str) else output.decode('utf-8'))
+        try:
+            namespace = template['metadata']['namespace'] or self.namespace
+        except (KeyError, TypeError, AttributeError):
+            namespace = self.namespace
+
+        if template:
+            output, error = self._kubectl_apply(
+                template=template,
+                pod_number=pod_number,
+                create_clearml_conf=create_clearml_conf,
+                labels=labels,
+                docker_image=container['image'],
+                docker_args=container['arguments'],
+                docker_bash=container.get('setup_shell_script'),
+                task_id=task_id,
+                queue=queue,
+                namespace=namespace,
+            )
 
         print('kubectl output:\n{}\n{}'.format(error, output))
         if error:
             send_log = "Running kubectl encountered an error: {}".format(error)
@@ -649,19 +656,22 @@ class K8sIntegration(Worker):
         return results
 
     def _kubectl_apply(
-        self, create_clearml_conf, docker_image, docker_args, docker_bash, labels, queue, task_id, template=None
+        self,
+        create_clearml_conf,
+        docker_image,
+        docker_args,
+        docker_bash,
+        labels,
+        queue,
+        task_id,
+        namespace,
+        template=None,
+        pod_number=None
     ):
-        template = template or deepcopy(self.template_dict)
-
-        try:
-            namespace = template['metadata']['namespace'] or self.namespace
-        except (KeyError, TypeError, AttributeError):
-            namespace = self.namespace
-
         template.setdefault('apiVersion', 'v1')
         template['kind'] = 'Pod'
         template.setdefault('metadata', {})
-        name = 'clearml-id-{task_id}'.format(task_id=task_id)
+        name = self.pod_name_prefix + str(task_id)
         template['metadata']['name'] = name
         template.setdefault('spec', {})
         template['spec'].setdefault('containers', [])
@@ -751,81 +761,34 @@ class K8sIntegration(Worker):
         finally:
             safe_remove_file(yaml_file)
 
-        return output, error
-
-    def _kubectl_run(
-        self, create_clearml_conf, docker_image, docker_args, docker_bash, labels, queue, task_data, task_id
-    ):
-        if callable(self.kubectl_cmd):
-            kubectl_cmd = self.kubectl_cmd(task_id, docker_image, docker_args, queue, task_data)
-        else:
-            kubectl_cmd = self.kubectl_cmd.format(
-                task_id=task_id,
-                docker_image=docker_image,
-                docker_args=" ".join(self._get_docker_args(
-                    docker_args, flags={"-e", "--env"}, convert=lambda env: '--env={}'.format(env))
-                ),
-                queue_id=queue,
-                namespace=self.namespace,
-            )
-
-        # make sure we provide a list
-        if isinstance(kubectl_cmd, str):
-            kubectl_cmd = kubectl_cmd.split()
-
-        if self.overrides_json_string:
-            kubectl_cmd += ['--overrides=' + self.overrides_json_string]
-
-        if self.pod_limits:
-            kubectl_cmd += ['--limits', ",".join(self.pod_limits)]
-
-        if self.pod_requests:
-            kubectl_cmd += ['--requests', ",".join(self.pod_requests)]
-
-        if self._docker_force_pull and not any(x.startswith("--image-pull-policy=") for x in kubectl_cmd):
-            kubectl_cmd += ["--image-pull-policy='always'"]
-
-        container_bash_script = [self.container_bash_script] if isinstance(self.container_bash_script, str) \
-            else self.container_bash_script
-
-        container_bash_script = [
-            line.format(
-                extra_bash_init_cmd=self.extra_bash_init_script or "",
-                extra_docker_bash_script=docker_bash or "",
-                task_id=task_id,
-                default_execution_agent_args=self.DEFAULT_EXECUTION_AGENT_ARGS,
-                agent_install_args=self.POD_AGENT_INSTALL_ARGS
-            )
-            for line in container_bash_script
-        ]
-
-        kubectl_cmd += [
-            "--labels=" + ",".join(labels),
-            "--command",
-            "--",
-            "/bin/sh",
-            "-c",
-            "{} ; {}".format(
-                " ; ".join(create_clearml_conf or []),
-                ' ; '.join(line for line in container_bash_script if line.strip())
-            ),
-        ]
-
-        process = subprocess.Popen(kubectl_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
-        output, error = process.communicate()
-
-        return output, error
+        return stringify_bash_output(output), stringify_bash_output(error)
 
     def _cleanup_old_pods(self, namespaces, extra_msg=None):
-        # type: (Iterable[str], Optional[str]) -> None
+        # type: (Iterable[str], Optional[str]) -> Dict[str, List[str]]
         self.log.debug("Cleaning up pods")
+        deleted_pods = defaultdict(list)
         for namespace in namespaces:
             if time() - self._last_pod_cleanup_per_ns[namespace] < self._min_cleanup_interval_per_ns_sec:
                 # Do not try to cleanup the same namespace too quickly
                 continue
 
             kubectl_cmd = self.KUBECTL_DELETE_CMD.format(namespace=namespace, agent_label=self._get_agent_label())
-            self.log.debug("Deleting old/failed pods{}: {}".format(
-                extra_msg or "",
-                kubectl_cmd
-            ))
-            get_bash_output(kubectl_cmd)
-            self._last_pod_cleanup_per_ns[namespace] = time()
+            self.log.debug("Deleting old/failed pods{} for ns {}: {}".format(
+                extra_msg or "", namespace, kubectl_cmd
+            ))
+            try:
+                res = get_bash_output(kubectl_cmd, raise_error=True)
+                lines = [
+                    line for line in
+                    (r.strip().rpartition("/")[-1] for r in res.splitlines())
+                    if line.startswith(self.pod_name_prefix)
+                ]
+                self.log.debug(" - deleted pod(s) %s", ", ".join(lines))
+                deleted_pods[namespace].extend(lines)
+            except Exception as ex:
+                self.log.error("Failed deleting old/failed pods for ns %s: %s", namespace, str(ex))
+            finally:
+                self._last_pod_cleanup_per_ns[namespace] = time()
+
+        return deleted_pods
 
     def run_tasks_loop(self, queues: List[Text], worker_params, **kwargs):
         """

clearml_agent/helper/process.py

@@ -43,6 +43,10 @@ def get_bash_output(cmd, strip=False, stderr=subprocess.STDOUT, stdin=False, rai
     return output if not strip or not output else output.strip()
 
 
+def stringify_bash_output(value):
+    return '' if not value else (value if isinstance(value, str) else value.decode('utf-8'))
+
+
 def terminate_process(pid, timeout=10., ignore_zombie=True, include_children=False):
     # noinspection PyBroadException
     try:
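
The new stringify_bash_output helper normalizes subprocess output that may arrive as None, bytes, or str; a quick usage sketch based on the definition above:

    from clearml_agent.helper.process import stringify_bash_output

    assert stringify_bash_output(None) == ''
    assert stringify_bash_output(b'pod/clearml-id-123\n') == 'pod/clearml-id-123\n'
    assert stringify_bash_output('already text') == 'already text'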