Add support for k8s jobs execution

Strip whitespace from the docker container image obtained from the task in k8s apply
allegroai 2023-07-04 14:45:00 +03:00
parent 21d98afca5
commit 4c056a17b9
8 changed files with 584 additions and 233 deletions
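
The new jobs mode is selected through the CLEARML_K8S_GLUE_KIND environment variable read in K8sIntegration.__init__ (see the k8s glue diff below). A minimal sketch of switching the glue from Pods to Jobs, assuming the usual clearml_agent.glue.k8s entry point; the constructor arguments and queue name are illustrative:

import os

from clearml_agent.glue.k8s import K8sIntegration

# "pod" (the default) keeps the previous behaviour, "job" creates batch/v1 Jobs
# instead of bare Pods; any other value raises UsageError
os.environ["CLEARML_K8S_GLUE_KIND"] = "job"

glue = K8sIntegration()        # constructor arguments omitted for brevity
# glue.k8s_daemon("k8s_glue")  # main polling loop; method name assumed, queue name hypothetical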

View File

@ -12,6 +12,7 @@ import shlex
import shutil
import signal
import string
import socket
import subprocess
import sys
import traceback
@ -24,7 +25,7 @@ from functools import partial
from os.path import basename
from tempfile import mkdtemp, NamedTemporaryFile
from time import sleep, time
from typing import Text, Optional, Any, Tuple, List
from typing import Text, Optional, Any, Tuple, List, Dict, Mapping, Union
import attr
import six
@ -368,6 +369,8 @@ def get_task_container(session, task_id):
container = result.json()['data']['tasks'][0]['container'] if result.ok else {}
if container.get('arguments'):
container['arguments'] = shlex.split(str(container.get('arguments')).strip())
if container.get('image'):
container['image'] = container.get('image').strip()
except (ValueError, TypeError):
container = {}
else:
@ -635,6 +638,8 @@ class Worker(ServiceCommandSection):
_docker_fixed_user_cache = '/clearml_agent_cache'
_temp_cleanup_list = []
hostname_task_runtime_prop = "_exec_agent_hostname"
@property
def service(self):
""" Worker command service endpoint """
@ -850,6 +855,20 @@ class Worker(ServiceCommandSection):
# "Running task '{}'".format(task_id)
print(self._task_logging_start_message.format(task_id))
task_session = task_session or self._session
# noinspection PyBroadException
try:
res = task_session.send_request(
service='tasks', action='edit', method=Request.def_method,
json={
"task": task_id, "force": True, "runtime": {self.hostname_task_runtime_prop: socket.gethostname()}
},
)
if not res.ok:
raise Exception("failed setting runtime property")
except Exception as ex:
print("Warning: failed obtaining/setting hostname for task '{}': {}".format(task_id, ex))
# set task status to in_progress so we know it was popped from the queue
# noinspection PyBroadException
try:
@ -2352,6 +2371,7 @@ class Worker(ServiceCommandSection):
raise CommandFailedError("Cloning failed")
else:
# make sure this task is not stuck in an execution queue, it shouldn't have been, but just in case.
# noinspection PyBroadException
try:
res = self._session.api_client.tasks.dequeue(task=current_task.id)
if require_queue and res.meta.result_code != 200:

View File

@ -0,0 +1,15 @@
from threading import Thread
from clearml_agent.session import Session
class K8sDaemon(Thread):
def __init__(self, agent):
super(K8sDaemon, self).__init__(target=self.target)
self.daemon = True
self._agent = agent
self.log = agent.log
self._session: Session = agent._session
def target(self):
pass
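
K8sDaemon is a thin daemon-thread base that captures the agent, its logger and its session; concrete monitors override target(). A minimal sketch of a subclass under those assumptions (the class and its behaviour are hypothetical, mirroring how PendingPodsDaemon later in this commit is built):

from time import sleep

from clearml_agent.glue.daemon import K8sDaemon

class HeartbeatDaemon(K8sDaemon):
    """Hypothetical monitor that periodically logs a liveness message."""

    def __init__(self, agent, interval=30.0):
        super(HeartbeatDaemon, self).__init__(agent=agent)
        self._interval = interval

    def target(self):
        while True:
            self.log.debug("k8s glue heartbeat")
            sleep(self._interval)

# wiring mirrors PendingPodsDaemon: construct with the glue instance, then start()
# HeartbeatDaemon(agent=k8s_glue).start()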

View File

@ -0,0 +1,12 @@
class GetPodsError(Exception):
pass
class GetJobsError(Exception):
pass
class GetPodCountError(Exception):
pass

View File

@ -13,7 +13,6 @@ from collections import defaultdict, namedtuple
from copy import deepcopy
from pathlib import Path
from pprint import pformat
from threading import Thread
from time import sleep, time
from typing import Text, List, Callable, Any, Collection, Optional, Union, Iterable, Dict, Tuple, Set
@ -28,8 +27,11 @@ from clearml_agent.definitions import (
ENV_AGENT_GIT_PASS,
ENV_FORCE_SYSTEM_SITE_PACKAGES,
)
from clearml_agent.errors import APIError
from clearml_agent.errors import APIError, UsageError
from clearml_agent.glue.definitions import ENV_START_AGENT_SCRIPT_PATH
from clearml_agent.glue.errors import GetPodCountError
from clearml_agent.glue.utilities import get_path, get_bash_output
from clearml_agent.glue.pending_pods_daemon import PendingPodsDaemon
from clearml_agent.helper.base import safe_remove_file
from clearml_agent.helper.dicts import merge_dicts
from clearml_agent.helper.process import get_bash_output, stringify_bash_output
@ -38,6 +40,7 @@ from clearml_agent.interface.base import ObjectID
class K8sIntegration(Worker):
SUPPORTED_KIND = ("pod", "job")
K8S_PENDING_QUEUE = "k8s_scheduler"
K8S_DEFAULT_NAMESPACE = "clearml"
@ -46,12 +49,6 @@ class K8sIntegration(Worker):
KUBECTL_APPLY_CMD = "kubectl apply --namespace={namespace} -f"
KUBECTL_CLEANUP_DELETE_CMD = "kubectl delete pods " \
"-l={agent_label} " \
"--field-selector=status.phase!=Pending,status.phase!=Running " \
"--namespace={namespace} " \
"--output name"
BASH_INSTALL_SSH_CMD = [
"apt-get update",
"apt-get install -y openssh-server",
@ -136,6 +133,10 @@ class K8sIntegration(Worker):
:param int max_pods_limit: Maximum number of pods that K8S glue can run at the same time
"""
super(K8sIntegration, self).__init__()
self.kind = os.environ.get("CLEARML_K8S_GLUE_KIND", "pod").strip().lower()
if self.kind not in self.SUPPORTED_KIND:
raise UsageError(f"Kind '{self.kind}' not supported (expected {','.join(self.SUPPORTED_KIND)})")
self.using_jobs = self.kind == "job"
self.pod_name_prefix = pod_name_prefix or self.DEFAULT_POD_NAME_PREFIX
self.limit_pod_label = limit_pod_label or self.DEFAULT_LIMIT_POD_LABEL
self.k8s_pending_queue_name = k8s_pending_queue_name or self.K8S_PENDING_QUEUE
@ -180,11 +181,18 @@ class K8sIntegration(Worker):
self._agent_label = None
self._monitor_hanging_pods()
self._pending_pods_daemon = self._create_pending_pods_daemon(
cls_=PendingPodsDaemon,
polling_interval=self._polling_interval
)
self._pending_pods_daemon.start()
self._min_cleanup_interval_per_ns_sec = 1.0
self._last_pod_cleanup_per_ns = defaultdict(lambda: 0.)
def _create_pending_pods_daemon(self, cls_, **kwargs):
return cls_(agent=self, **kwargs)
def _load_overrides_yaml(self, overrides_yaml):
if not overrides_yaml:
return
@ -210,26 +218,33 @@ class K8sIntegration(Worker):
self.log.warning('Removing containers section: {}'.format(overrides['spec'].pop('containers')))
self.overrides_json_string = json.dumps(overrides)
def _monitor_hanging_pods(self):
_check_pod_thread = Thread(target=self._monitor_hanging_pods_daemon)
_check_pod_thread.daemon = True
_check_pod_thread.start()
@staticmethod
def _load_template_file(path):
with open(os.path.expandvars(os.path.expanduser(str(path))), 'rt') as f:
return yaml.load(f, Loader=getattr(yaml, 'FullLoader', None))
def _get_kubectl_options(self, command, extra_labels=None, filters=None, output="json", labels=None):
# type: (str, Iterable[str], Iterable[str], str, Iterable[str]) -> Dict
if not labels:
@staticmethod
def _get_path(d, *path, default=None):
try:
return functools.reduce(
lambda a, b: a[b], path, d
)
except (IndexError, KeyError):
return default
def _get_kubectl_options(self, command, extra_labels=None, filters=None, output="json", labels=None, ns=None):
# type: (str, Iterable[str], Iterable[str], str, Iterable[str], str) -> Dict
if labels is False:
labels = []
elif not labels:
labels = [self._get_agent_label()]
labels = list(labels) + (list(extra_labels) if extra_labels else [])
d = {
"-l": ",".join(labels),
"-n": str(self.namespace),
"-n": ns or str(self.namespace),
"-o": output,
}
if labels:
d["-l"] = ",".join(labels)
if filters:
d["--field-selector"] = ",".join(filters)
return d
@ -240,132 +255,6 @@ class K8sIntegration(Worker):
command=command, opts=" ".join(x for item in opts.items() for x in item)
)
def _monitor_hanging_pods_daemon(self):
last_tasks_msgs = {} # last msg updated for every task
while True:
kubectl_cmd = self.get_kubectl_command("get pods", filters=["status.phase=Pending"])
self.log.debug("Detecting hanging pods: {}".format(kubectl_cmd))
output = stringify_bash_output(get_bash_output(kubectl_cmd))
try:
output_config = json.loads(output)
except Exception as ex:
self.log.warning('K8S Glue pods monitor: Failed parsing kubectl output:\n{}\nEx: {}'.format(output, ex))
sleep(self._polling_interval)
continue
pods = output_config.get('items', [])
task_id_to_details = dict()
for pod in pods:
pod_name = pod.get('metadata', {}).get('name', None)
if not pod_name:
continue
task_id = pod_name.rpartition('-')[-1]
if not task_id:
continue
namespace = pod.get('metadata', {}).get('namespace', None)
if not namespace:
continue
task_id_to_details[task_id] = (pod_name, namespace)
msg = None
waiting = self._get_path(pod, 'status', 'containerStatuses', 0, 'state', 'waiting')
if not waiting:
condition = self._get_path(pod, 'status', 'conditions', 0)
if condition:
reason = condition.get('reason')
if reason == 'Unschedulable':
message = condition.get('message')
msg = reason + (" ({})".format(message) if message else "")
else:
reason = waiting.get("reason", None)
message = waiting.get("message", None)
msg = reason + (" ({})".format(message) if message else "")
if reason == 'ImagePullBackOff':
delete_pod_cmd = 'kubectl delete pods {} -n {}'.format(pod_name, namespace)
self.log.debug(" - deleting pod due to ImagePullBackOff: {}".format(delete_pod_cmd))
get_bash_output(delete_pod_cmd)
try:
self.log.debug(" - Detecting hanging pods: {}".format(kubectl_cmd))
self._session.api_client.tasks.failed(
task=task_id,
status_reason="K8S glue error: {}".format(msg),
status_message="Changed by K8S glue",
force=True
)
except Exception as ex:
self.log.warning(
'K8S Glue pods monitor: Failed deleting task "{}"\nEX: {}'.format(task_id, ex)
)
# clean up any msg for this task
last_tasks_msgs.pop(task_id, None)
continue
if msg and last_tasks_msgs.get(task_id, None) != msg:
try:
result = self._session.send_request(
service='tasks',
action='update',
json={"task": task_id, "status_message": "K8S glue status: {}".format(msg)},
method=Request.def_method,
async_enable=False,
)
if not result.ok:
result_msg = self._get_path(result.json(), 'meta', 'result_msg')
raise Exception(result_msg or result.text)
# update last msg for this task
last_tasks_msgs[task_id] = msg
except Exception as ex:
self.log.warning(
'K8S Glue pods monitor: Failed setting status message for task "{}"\nMSG: {}\nEX: {}'.format(
task_id, msg, ex
)
)
if task_id_to_details:
try:
result = self._session.get(
service='tasks',
action='get_all',
json={"id": list(task_id_to_details), "status": ["stopped"], "only_fields": ["id"]},
method=Request.def_method,
async_enable=False,
)
aborted_task_ids = list(filter(None, (task.get("id") for task in result["tasks"])))
for task_id in aborted_task_ids:
pod_name, namespace = task_id_to_details.get(task_id)
if not pod_name:
self.log.error("Failed locating aborted task {} in pending pods list".format(task_id))
continue
self.log.info(
"K8S Glue pods monitor: task {} was aborted by its pod {} is still pending, "
"deleting pod".format(task_id, pod_name)
)
kubectl_cmd = "kubectl delete pod {pod_name} --output name {namespace}".format(
namespace=f"--namespace={namespace}" if namespace else "", pod_name=pod_name,
).strip()
self.log.debug("Deleting aborted task pending pod: {}".format(kubectl_cmd))
output = stringify_bash_output(get_bash_output(kubectl_cmd))
if not output:
self.log.warning("K8S Glue pods monitor: failed deleting pod {}".format(pod_name))
except Exception as ex:
self.log.warning(
'K8S Glue pods monitor: failed checking aborted tasks for hanging pods: {}'.format(ex)
)
# clean up any last message for a task that wasn't seen as a pod
last_tasks_msgs = {k: v for k, v in last_tasks_msgs.items() if k in task_id_to_details}
sleep(self._polling_interval)
def _set_task_user_properties(self, task_id: str, task_session=None, **properties: str):
session = task_session or self._session
if self._edit_hyperparams_support is not True:
@ -465,6 +354,69 @@ class K8sIntegration(Worker):
except Exception as ex:
print("ERROR: Failed getting tenant for task session: {}".format(ex))
def get_jobs_info(self, info_path: str, condition: str = None, namespace=None, debug_msg: str = None)\
-> Dict[str, str]:
cond = "==".join((x.strip("=") for x in condition.partition("=")[::2]))
output = f"jsonpath='{{range .items[?(@.{cond})]}}{{@.{info_path}}}{{\" \"}}{{@.metadata.namespace}}{{\"\\n\"}}{{end}}'"
kubectl_cmd = self.get_kubectl_command("get job", output=output, ns=namespace)
if debug_msg:
self.log.debug(debug_msg.format(cmd=kubectl_cmd))
output = stringify_bash_output(get_bash_output(kubectl_cmd))
output = output.strip("'") # for Windows debugging :(
try:
data_items = dict(l.strip().partition(" ")[::2] for l in output.splitlines())
return data_items
except Exception as ex:
self.log.warning('Failed parsing kubectl output:\n{}\nEx: {}'.format(output, ex))
def get_pods_for_jobs(self, job_condition: str = None, pod_filters: List[str] = None, debug_msg: str = None):
controller_uids = self.get_jobs_info(
"spec.selector.matchLabels.controller-uid", condition=job_condition, debug_msg=debug_msg
)
if not controller_uids:
# No jobs matched the condition, so there are no pods to return
return []
pods = self.get_pods(filters=pod_filters, debug_msg=debug_msg)
return [
pod for pod in pods
if get_path(pod, "metadata", "labels", "controller-uid") in controller_uids
]
def get_pods(self, filters: List[str] = None, debug_msg: str = None):
kubectl_cmd = self.get_kubectl_command(
"get pods",
filters=filters,
labels=False if self.using_jobs else None,
)
if debug_msg:
self.log.debug(debug_msg.format(cmd=kubectl_cmd))
output = stringify_bash_output(get_bash_output(kubectl_cmd))
try:
output_config = json.loads(output)
except Exception as ex:
self.log.warning('Failed parsing kubectl output:\n{}\nEx: {}'.format(output, ex))
return
return output_config.get('items', [])
def _get_pod_count(self, extra_labels: List[str] = None, msg: str = None):
kubectl_cmd_new = self.get_kubectl_command(
f"get {self.kind}s",
extra_labels=extra_labels
)
self.log.debug("{}{}".format((msg + ": ") if msg else "", kubectl_cmd_new))
process = subprocess.Popen(kubectl_cmd_new.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, error = process.communicate()
output = stringify_bash_output(output)
error = stringify_bash_output(error)
try:
return len(json.loads(output).get("items", []))
except (ValueError, TypeError) as ex:
self.log.warning(
"K8S Glue pods monitor: Failed parsing kubectl output:\n{}\nEx: {}".format(output, ex)
)
raise GetPodCountError()
def run_one_task(self, queue: Text, task_id: Text, worker_args=None, task_session=None, **_):
print('Pulling task {} launching on kubernetes cluster'.format(task_id))
session = task_session or self._session
@ -476,6 +428,12 @@ class K8sIntegration(Worker):
print('Pushing task {} into temporary pending queue'.format(task_id))
_ = session.api_client.tasks.stop(task_id, force=True)
# Just make sure to clean up in case the task is stuck in the queue (known issue)
self._session.api_client.queues.remove_task(
task=task_id,
queue=self.k8s_pending_queue_id,
)
res = self._session.api_client.tasks.enqueue(
task_id,
queue=self.k8s_pending_queue_id,
@ -515,14 +473,14 @@ class K8sIntegration(Worker):
hocon_config_encoded = config_content.encode("ascii")
create_clearml_conf = ["echo '{}' | base64 --decode >> ~/clearml.conf".format(
clearml_conf_create_script = ["echo '{}' | base64 --decode >> ~/clearml.conf".format(
base64.b64encode(
hocon_config_encoded
).decode('ascii')
)]
if task_session:
create_clearml_conf.append(
clearml_conf_create_script.append(
"export CLEARML_AUTH_TOKEN=$(echo '{}' | base64 --decode)".format(
base64.b64encode(task_session.token.encode("ascii")).decode('ascii')
)
@ -543,23 +501,15 @@ class K8sIntegration(Worker):
while self.ports_mode or self.max_pods_limit:
pod_number = self.base_pod_num + pod_count
kubectl_cmd_new = self.get_kubectl_command(
"get pods",
extra_labels=[self.limit_pod_label.format(pod_number=pod_number)] if self.ports_mode else None
)
self.log.debug("Looking for a free pod/port: {}".format(kubectl_cmd_new))
process = subprocess.Popen(kubectl_cmd_new.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, error = process.communicate()
output = stringify_bash_output(output)
error = stringify_bash_output(error)
try:
items_count = len(json.loads(output).get("items", []))
except (ValueError, TypeError) as ex:
items_count = self._get_pod_count(
extra_labels=[self.limit_pod_label.format(pod_number=pod_number)] if self.ports_mode else None,
msg="Looking for a free pod/port"
)
except GetPodCountError:
self.log.warning(
"K8S Glue pods monitor: Failed parsing kubectl output:\n{}\ntask '{}' "
"will be enqueued back to queue '{}'\nEx: {}".format(
output, task_id, queue, ex
"K8S Glue pods monitor: task '{}' will be enqueued back to queue '{}'".format(
task_id, queue
)
)
session.api_client.tasks.stop(task_id, force=True)
@ -583,8 +533,6 @@ class K8sIntegration(Worker):
if current_pod_count >= max_count:
# All pods are taken, exit
self.log.debug(
"kubectl last result: {}\n{}".format(error, output))
self.log.warning(
"All k8s services are in use, task '{}' "
"will be enqueued back to queue '{}'".format(
@ -629,7 +577,7 @@ class K8sIntegration(Worker):
output, error = self._kubectl_apply(
template=template,
pod_number=pod_number,
create_clearml_conf=create_clearml_conf,
clearml_conf_create_script=clearml_conf_create_script,
labels=labels,
docker_image=container['image'],
docker_args=container['arguments'],
@ -639,11 +587,11 @@ class K8sIntegration(Worker):
namespace=namespace,
)
print('kubectl output:\n{}\n{}'.format(error, output))
if error:
send_log = "Running kubectl encountered an error: {}".format(error)
self.log.error(send_log)
self.send_logs(task_id, send_log.splitlines())
print('kubectl output:\n{}\n{}'.format(error, output))
if error:
send_log = "Running kubectl encountered an error: {}".format(error)
self.log.error(send_log)
self.send_logs(task_id, send_log.splitlines())
user_props = {"k8s-queue": str(queue_name)}
if self.ports_mode:
@ -704,32 +652,10 @@ class K8sIntegration(Worker):
return {target: results} if results else {}
return results
def _kubectl_apply(
self,
create_clearml_conf,
docker_image,
docker_args,
docker_bash,
labels,
queue,
task_id,
namespace,
template=None,
pod_number=None
):
template.setdefault('apiVersion', 'v1')
template['kind'] = 'Pod'
template.setdefault('metadata', {})
name = self.pod_name_prefix + str(task_id)
template['metadata']['name'] = name
template.setdefault('spec', {})
template['spec'].setdefault('containers', [])
template['spec'].setdefault('restartPolicy', 'Never')
if labels:
labels_dict = dict(pair.split('=', 1) for pair in labels)
template['metadata'].setdefault('labels', {})
template['metadata']['labels'].update(labels_dict)
def _create_template_container(
self, pod_name: str, task_id: str, docker_image: str, docker_args: List[str],
docker_bash: str, clearml_conf_create_script: List[str]
) -> dict:
container = self._get_docker_args(
docker_args,
target="env",
@ -753,7 +679,7 @@ class K8sIntegration(Worker):
agent_install_args=self.POD_AGENT_INSTALL_ARGS)
for line in container_bash_script])
extra_bash_commands = list(create_clearml_conf or [])
extra_bash_commands = list(clearml_conf_create_script or [])
start_agent_script_path = ENV_START_AGENT_SCRIPT_PATH.get() or "~/__start_agent__.sh"
@ -767,20 +693,77 @@ class K8sIntegration(Worker):
)
# Notice: we always leave with exit code 0, so pods are never restarted
container = self._merge_containers(
return self._merge_containers(
container,
dict(name=name, image=docker_image,
dict(name=pod_name, image=docker_image,
command=['/bin/bash'],
args=['-c', '{} ; exit 0'.format(' ; '.join(extra_bash_commands))])
)
if template['spec']['containers']:
template['spec']['containers'][0] = self._merge_containers(template['spec']['containers'][0], container)
def _kubectl_apply(
self,
clearml_conf_create_script: List[str],
docker_image,
docker_args,
docker_bash,
labels,
queue,
task_id,
namespace,
template=None,
pod_number=None
):
if "apiVersion" not in template:
template["apiVersion"] = "batch/v1" if self.using_jobs else "v1"
if "kind" in template:
if template["kind"].lower() != self.kind:
return (
"", f"Template kind {template['kind']} does not maych kind {self.kind.capitalize()} set for agent"
)
else:
template['spec']['containers'].append(container)
template["kind"] = self.kind.capitalize()
metadata = template.setdefault('metadata', {})
name = self.pod_name_prefix + str(task_id)
metadata['name'] = name
def place_labels(metadata_dict):
labels_dict = dict(pair.split('=', 1) for pair in labels)
metadata_dict.setdefault('labels', {}).update(labels_dict)
if labels:
# Place labels on base resource (job or single pod)
place_labels(metadata)
spec = template.setdefault('spec', {})
if self.using_jobs:
spec.setdefault('backoffLimit', 0)
spec_template = spec.setdefault('template', {})
if labels:
# Place the same labels on any pod spawned by the job
place_labels(spec_template.setdefault('metadata', {}))
spec = spec_template.setdefault('spec', {})
containers = spec.setdefault('containers', [])
spec.setdefault('restartPolicy', 'Never')
container = self._create_template_container(
pod_name=name,
task_id=task_id,
docker_image=docker_image,
docker_args=docker_args,
docker_bash=docker_bash,
clearml_conf_create_script=clearml_conf_create_script
)
if containers:
containers[0] = self._merge_containers(containers[0], container)
else:
containers.append(container)
if self._docker_force_pull:
for c in template['spec']['containers']:
for c in containers:
c.setdefault('imagePullPolicy', 'Always')
fp, yaml_file = tempfile.mkstemp(prefix='clearml_k8stmpl_', suffix='.yml')
@ -812,6 +795,83 @@ class K8sIntegration(Worker):
return stringify_bash_output(output), stringify_bash_output(error)
def _process_bash_lines_response(self, bash_cmd: str, raise_error=True):
res = get_bash_output(bash_cmd, raise_error=raise_error)
lines = [
line for line in
(r.strip().rpartition("/")[-1] for r in res.splitlines())
if line.startswith(self.pod_name_prefix)
]
return lines
def _delete_pods(self, selectors: List[str], namespace: str, msg: str = None) -> List[str]:
kubectl_cmd = \
"kubectl delete pod -l={agent_label} " \
"--namespace={namespace} --field-selector={selector} --output name".format(
selector=",".join(selectors),
agent_label=self._get_agent_label(),
namespace=namespace,
)
self.log.debug("Deleting old/failed pods{} for ns {}: {}".format(
msg or "", namespace, kubectl_cmd
))
lines = self._process_bash_lines_response(kubectl_cmd)
self.log.debug(" - deleted pods %s", ", ".join(lines))
return lines
def _delete_jobs_by_names(self, names_to_ns: Dict[str, str], msg: str = None) -> List[str]:
if not names_to_ns:
return []
ns_to_names = defaultdict(list)
for name, ns in names_to_ns.items():
ns_to_names[ns].append(name)
results = []
for ns, names in ns_to_names.items():
kubectl_cmd = "kubectl delete job --namespace={ns} --output=name {names}".format(
ns=ns, names=" ".join(names)
)
self.log.debug("Deleting jobs {}: {}".format(
msg or "", kubectl_cmd
))
lines = self._process_bash_lines_response(kubectl_cmd)
if not lines:
continue
self.log.debug(" - deleted jobs %s", ", ".join(lines))
results.extend(lines)
return results
def _delete_completed_or_failed_pods(self, namespace, msg: str = None):
if not self.using_jobs:
return self._delete_pods(
selectors=["status.phase!=Pending", "status.phase!=Running"], namespace=namespace, msg=msg
)
job_names_to_delete = {}
# locate failed pods for jobs
failed_pods = self.get_pods_for_jobs(
job_condition="status.active=1",
pod_filters=["status.phase!=Pending", "status.phase!=Running", "status.phase!=Terminating"],
debug_msg="Deleting failed pods: {cmd}"
)
if failed_pods:
job_names_to_delete = {
get_path(pod, "metadata", "labels", "job-name"): get_path(pod, "metadata", "namespace")
for pod in failed_pods
if get_path(pod, "metadata", "labels", "job-name")
}
self.log.debug(f" - found jobs with failed pods: {' '.join(job_names_to_delete)}")
completed_job_names = self.get_jobs_info(
"metadata.name", condition="status.succeeded=1", namespace=namespace, debug_msg=msg
)
if completed_job_names:
self.log.debug(f" - found completed jobs: {' '.join(completed_job_names)}")
job_names_to_delete.update(completed_job_names)
return self._delete_jobs_by_names(names_to_ns=job_names_to_delete, msg=msg)
def _cleanup_old_pods(self, namespaces, extra_msg=None):
# type: (Iterable[str], Optional[str]) -> Dict[str, List[str]]
self.log.debug("Cleaning up pods")
@ -820,23 +880,12 @@ class K8sIntegration(Worker):
if time() - self._last_pod_cleanup_per_ns[namespace] < self._min_cleanup_interval_per_ns_sec:
# Do not try to cleanup the same namespace too quickly
continue
kubectl_cmd = self.KUBECTL_CLEANUP_DELETE_CMD.format(
namespace=namespace, agent_label=self._get_agent_label()
)
self.log.debug("Deleting old/failed pods{} for ns {}: {}".format(
extra_msg or "", namespace, kubectl_cmd
))
try:
res = get_bash_output(kubectl_cmd, raise_error=True)
lines = [
line for line in
(r.strip().rpartition("/")[-1] for r in res.splitlines())
if line.startswith(self.pod_name_prefix)
]
self.log.debug(" - deleted pod(s) %s", ", ".join(lines))
deleted_pods[namespace].extend(lines)
res = self._delete_completed_or_failed_pods(namespace, extra_msg)
deleted_pods[namespace].extend(res)
except Exception as ex:
self.log.error("Failed deleting old/failed pods for ns %s: %s", namespace, str(ex))
self.log.error("Failed deleting completed/failed pods for ns %s: %s", namespace, str(ex))
finally:
self._last_pod_cleanup_per_ns[namespace] = time()
@ -857,7 +906,7 @@ class K8sIntegration(Worker):
)
tasks_to_abort = result["tasks"]
except Exception as ex:
self.log.warning('Failed getting running tasks for deleted pods: {}'.format(ex))
self.log.warning('Failed getting running tasks for deleted {}(s): {}'.format(self.kind, ex))
for task in tasks_to_abort:
task_id = task.get("id")
@ -870,15 +919,27 @@ class K8sIntegration(Worker):
self._session.get(
service='tasks',
action='dequeue',
json={"task": task_id, "force": True, "status_reason": "Pod deleted (not pending or running)",
"status_message": "Pod deleted by agent {}".format(self.worker_id or "unknown")},
json={
"task": task_id,
"force": True,
"status_reason": "Pod deleted (not pending or running)",
"status_message": "{} deleted by agent {}".format(
self.kind.capitalize(), self.worker_id or "unknown"
)
},
method=Request.def_method,
)
self._session.get(
service='tasks',
action='failed',
json={"task": task_id, "force": True, "status_reason": "Pod deleted (not pending or running)",
"status_message": "Pod deleted by agent {}".format(self.worker_id or "unknown")},
json={
"task": task_id,
"force": True,
"status_reason": "Pod deleted (not pending or running)",
"status_message": "{} deleted by agent {}".format(
self.kind.capitalize(), self.worker_id or "unknown"
)
},
method=Request.def_method,
)
except Exception as ex:
@ -919,10 +980,10 @@ class K8sIntegration(Worker):
# check if have pod limit, then check if we hit it.
if self.max_pods_limit:
if current_pods >= self.max_pods_limit:
print("Maximum pod limit reached {}/{}, sleeping for {:.1f} seconds".format(
current_pods, self.max_pods_limit, self._polling_interval))
print("Maximum {} limit reached {}/{}, sleeping for {:.1f} seconds".format(
self.kind, current_pods, self.max_pods_limit, self._polling_interval))
# delete old completed / failed pods
self._cleanup_old_pods(namespaces, " due to pod limit")
self._cleanup_old_pods(namespaces, f" due to {self.kind} limit")
# go to sleep
sleep(self._polling_interval)
continue
@ -930,7 +991,7 @@ class K8sIntegration(Worker):
# iterate over queues (priority style, queues[0] is highest)
for queue in queues:
# delete old completed / failed pods
self._cleanup_old_pods(namespaces)
self._cleanup_old_pods(namespaces, extra_msg="Cleanup cycle {cmd}")
# get next task in queue
try:
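
For orientation, in jobs mode the _kubectl_apply method above assembles a template shaped roughly like the Python dict below before writing it to a temporary YAML file and applying it with kubectl; the name prefix, labels, image and bash script are illustrative placeholders:

job_template = {
    "apiVersion": "batch/v1",                  # "v1" when kind is "pod"
    "kind": "Job",
    "metadata": {
        "name": "clearml-id-<task_id>",        # pod_name_prefix + task_id (default prefix assumed)
        "labels": {"<agent-label>": "<value>"},
    },
    "spec": {
        "backoffLimit": 0,                     # a failed task pod is never retried
        "template": {
            "metadata": {"labels": {"<agent-label>": "<value>"}},  # same labels on spawned pods
            "spec": {
                "restartPolicy": "Never",
                "containers": [{
                    "name": "clearml-id-<task_id>",
                    "image": "<task container image>",
                    "command": ["/bin/bash"],
                    "args": ["-c", "<clearml.conf setup> ; <agent install and execute> ; exit 0"],
                }],
            },
        },
    },
}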

View File

@ -0,0 +1,223 @@
from time import sleep
from typing import Dict, Tuple, Optional, List
from clearml_agent.backend_api.session import Request
from clearml_agent.glue.utilities import get_bash_output
from clearml_agent.helper.process import stringify_bash_output
from .daemon import K8sDaemon
from .utilities import get_path
from .errors import GetPodsError
class PendingPodsDaemon(K8sDaemon):
def __init__(self, polling_interval: float, agent):
super(PendingPodsDaemon, self).__init__(agent=agent)
self._polling_interval = polling_interval
self._last_tasks_msgs = {} # last msg updated for every task
def get_pods(self):
if self._agent.using_jobs:
return self._agent.get_pods_for_jobs(
job_condition="status.active=1",
pod_filters=["status.phase=Pending"],
debug_msg="Detecting pending pods: {cmd}"
)
return self._agent.get_pods(
filters=["status.phase=Pending"],
debug_msg="Detecting pending pods: {cmd}"
)
def _get_pod_name(self, pod: dict):
return get_path(pod, "metadata", "name")
def _get_k8s_resource_name(self, pod: dict):
if self._agent.using_jobs:
return get_path(pod, "metadata", "labels", "job-name")
return get_path(pod, "metadata", "name")
def _get_task_id(self, pod: dict):
return self._get_k8s_resource_name(pod).rpartition('-')[-1]
@staticmethod
def _get_k8s_resource_namespace(pod: dict):
return pod.get('metadata', {}).get('namespace', None)
def target(self):
"""
Handle pending objects (pods or jobs, depending on the agent mode).
- Delete any pending objects that are not expected to recover
- Delete any pending objects whose associated task was aborted
"""
while True:
# noinspection PyBroadException
try:
# Get pods (standalone pods if we're in pods mode, or pods associated to jobs if we're in jobs mode)
pods = self.get_pods()
if pods is None:
raise GetPodsError()
task_id_to_pod = dict()
for pod in pods:
pod_name = self._get_pod_name(pod)
if not pod_name:
continue
task_id = self._get_task_id(pod)
if not task_id:
continue
namespace = self._get_k8s_resource_namespace(pod)
if not namespace:
continue
task_id_to_pod[task_id] = pod
msg = None
tags = []
waiting = get_path(pod, 'status', 'containerStatuses', 0, 'state', 'waiting')
if not waiting:
condition = get_path(pod, 'status', 'conditions', 0)
if condition:
reason = condition.get('reason')
if reason == 'Unschedulable':
message = condition.get('message')
msg = reason + (" ({})".format(message) if message else "")
else:
reason = waiting.get("reason", None)
message = waiting.get("message", None)
msg = reason + (" ({})".format(message) if message else "")
if reason == 'ImagePullBackOff':
self.delete_k8s_resource(k8s_resource=pod, msg=reason)
try:
self._session.api_client.tasks.failed(
task=task_id,
status_reason="K8S glue error: {}".format(msg),
status_message="Changed by K8S glue",
force=True
)
self._agent.command.send_logs(
task_id, ["K8S Error: {}".format(msg)],
session=self._session
)
except Exception as ex:
self.log.warning(
'K8S Glue pending monitor: Failed deleting task "{}"\nEX: {}'.format(task_id, ex)
)
# clean up any msg for this task
self._last_tasks_msgs.pop(task_id, None)
continue
self._update_pending_task_msg(task_id, msg, tags)
if task_id_to_pod:
self._process_tasks_for_pending_pods(task_id_to_pod)
# clean up any last message for a task that wasn't seen as a pod
self._last_tasks_msgs = {k: v for k, v in self._last_tasks_msgs.items() if k in task_id_to_pod}
except GetPodsError:
pass
except Exception:
self.log.exception("Hanging pods daemon loop")
sleep(self._polling_interval)
def delete_k8s_resource(self, k8s_resource: dict, msg: str = None):
delete_cmd = "kubectl delete {kind} {name} -n {namespace} --output name".format(
kind=self._agent.kind,
name=self._get_k8s_resource_name(k8s_resource),
namespace=self._get_k8s_resource_namespace(k8s_resource)
).strip()
self.log.debug(" - deleting {} {}: {}".format(self._agent.kind, (" " + msg) if msg else "", delete_cmd))
return get_bash_output(delete_cmd).strip()
def _process_tasks_for_pending_pods(self, task_id_to_details: Dict[str, dict]):
self._handle_aborted_tasks(task_id_to_details)
def _handle_aborted_tasks(self, pending_tasks_details: Dict[str, dict]):
try:
result = self._session.get(
service='tasks',
action='get_all',
json={
"id": list(pending_tasks_details),
"status": ["stopped"],
"only_fields": ["id"]
},
method=Request.def_method,
async_enable=False,
)
aborted_task_ids = list(filter(None, (task.get("id") for task in result["tasks"])))
for task_id in aborted_task_ids:
pod = pending_tasks_details.get(task_id)
if not pod:
self.log.error("Failed locating aborted task {} in pending pods list".format(task_id))
continue
resource_name = self._get_k8s_resource_name(pod)
self.log.info(
"K8S Glue pending monitor: task {} was aborted but the k8s resource {} is still pending, "
"deleting pod".format(task_id, resource_name)
)
output = self.delete_k8s_resource(k8s_resource=pod, msg="Pending resource of an aborted task")
if not output:
self.log.warning("K8S Glue pending monitor: failed deleting resource {}".format(resource_name))
except Exception as ex:
self.log.warning(
'K8S Glue pending monitor: failed checking aborted tasks for pending resources: {}'.format(ex)
)
def _update_pending_task_msg(self, task_id: str, msg: str, tags: List[str] = None):
if not msg or self._last_tasks_msgs.get(task_id, None) == (msg, tags):
return
try:
# Make sure the task is queued
result = self._session.send_request(
service='tasks',
action='get_all',
json={"id": task_id, "only_fields": ["status"]},
method=Request.def_method,
async_enable=False,
)
if result.ok:
status = get_path(result.json(), 'data', 'tasks', 0, 'status')
# if task is in progress, change its status to enqueued
if status == "in_progress":
result = self._session.send_request(
service='tasks', action='enqueue',
json={
"task": task_id, "force": True, "queue": self._agent.k8s_pending_queue_id
},
method=Request.def_method,
async_enable=False,
)
if not result.ok:
result_msg = get_path(result.json(), 'meta', 'result_msg')
self.log.debug(
"K8S Glue pods monitor: failed forcing task status change"
" for pending task {}: {}".format(task_id, result_msg)
)
# Update task status message
payload = {"task": task_id, "status_message": "K8S glue status: {}".format(msg)}
if tags:
payload["tags"] = tags
result = self._session.send_request('tasks', 'update', json=payload, method=Request.def_method)
if not result.ok:
result_msg = get_path(result.json(), 'meta', 'result_msg')
raise Exception(result_msg or result.text)
# update last msg for this task
self._last_tasks_msgs[task_id] = (msg, tags)
except Exception as ex:
self.log.warning(
'K8S Glue pods monitor: Failed setting status message for task "{}"\nMSG: {}\nEX: {}'.format(
task_id, msg, ex
)
)
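
The pending-pods daemon is constructed and started by K8sIntegration.__init__ via _create_pending_pods_daemon (see the k8s glue diff above); a sketch of that wiring, with the polling interval value illustrative:

pending_daemon = PendingPodsDaemon(agent=k8s_glue, polling_interval=5.0)
pending_daemon.start()  # daemon thread; target() loops until the agent process exits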

View File

@ -0,0 +1,18 @@
import functools
from subprocess import DEVNULL
from clearml_agent.helper.process import get_bash_output as _get_bash_output
def get_path(d, *path, default=None):
try:
return functools.reduce(
lambda a, b: a[b], path, d
)
except (IndexError, KeyError):
return default
def get_bash_output(cmd, stderr=DEVNULL, raise_error=False):
return _get_bash_output(cmd, stderr=stderr, raise_error=raise_error)
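
get_path is the safe nested-lookup helper shared by the glue modules above; a short usage sketch on a pod dict as returned by kubectl get pods -o json (the values are illustrative):

pod = {"metadata": {"labels": {"job-name": "clearml-id-abc123"}}}
get_path(pod, "metadata", "labels", "job-name")        # -> "clearml-id-abc123"
get_path(pod, "status", "phase", default="Unknown")    # -> "Unknown" (missing key)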

View File

@ -20,20 +20,22 @@ from typing import Text, Dict, Any, Optional, AnyStr, IO, Union
import attr
import furl
import six
import yaml
from attr import fields_dict
from pathlib2 import Path
import six
from six.moves import reduce
from clearml_agent.external import pyhocon
from clearml_agent.errors import CommandFailedError
from clearml_agent.external import pyhocon
from clearml_agent.helper.dicts import filter_keys
pretty_lines = False
log = logging.getLogger(__name__)
use_powershell = os.getenv("CLEARML_AGENT_USE_POWERSHELL", None)
def which(cmd, path=None):
result = find_executable(cmd, path)
@ -52,7 +54,7 @@ def select_for_platform(linux, windows):
def bash_c():
return 'bash -c' if not is_windows_platform() else 'cmd /c'
return 'bash -c' if not is_windows_platform() else ('powershell -Command' if use_powershell else 'cmd /c')
def return_list(arg):

View File

@ -1 +1 @@
__version__ = '1.5.3rc3'
__version__ = '1.5.3rc4'