clearml-agent/clearml_agent/glue/pending_pods_daemon.py
2023-08-24 18:51:27 +03:00

224 lines
9.5 KiB
Python

from time import sleep
from typing import Dict, Tuple, Optional, List
from clearml_agent.backend_api.session import Request
from clearml_agent.glue.utilities import get_bash_output
from clearml_agent.helper.process import stringify_bash_output
from .daemon import K8sDaemon
from .utilities import get_path
from .errors import GetPodsError
class PendingPodsDaemon(K8sDaemon):
    """
    Background loop that watches pending k8s pods created for tasks.

    On every polling cycle it:
    - reports the pending reason of each pod on the associated task's status message
    - deletes resources stuck on ``ImagePullBackOff`` and marks their task as failed
    - deletes pending resources whose task is reported with status ``stopped``

    NOTE(review): ``self._agent``, ``self._session`` and ``self.log`` are not set here;
    presumably they are provided by ``K8sDaemon`` - confirm against the base class.
    """

    def __init__(self, polling_interval: float, agent):
        """
        :param polling_interval: Seconds to sleep between two polling cycles
        :param agent: The k8s glue agent, forwarded to the base daemon
        """
        super().__init__(agent=agent)
        self._polling_interval = polling_interval
        # task id -> (msg, tags) last reported for the task, used to skip redundant updates
        self._last_tasks_msgs = {}

    def get_pods(self):
        """
        Query the agent for pods in the ``Pending`` phase.

        In jobs mode only pods belonging to active jobs are requested.

        :return: Whatever the agent returns; ``target`` treats ``None`` as a failed query
        """
        if self._agent.using_jobs:
            return self._agent.get_pods_for_jobs(
                job_condition="status.active=1",
                pod_filters=["status.phase=Pending"],
                debug_msg="Detecting pending pods: {cmd}"
            )
        return self._agent.get_pods(
            filters=["status.phase=Pending"],
            debug_msg="Detecting pending pods: {cmd}"
        )

    def _get_pod_name(self, pod: dict):
        """Return the pod's ``metadata.name``."""
        return get_path(pod, "metadata", "name")

    def _get_k8s_resource_name(self, pod: dict):
        """
        Return the name of the k8s resource managed for this pod: the owning job
        (taken from the ``job-name`` label) in jobs mode, otherwise the pod itself.
        """
        if self._agent.using_jobs:
            return get_path(pod, "metadata", "labels", "job-name")
        return get_path(pod, "metadata", "name")

    def _get_task_id(self, pod: dict) -> Optional[str]:
        """
        Extract the task id from the resource name (the text following the last ``-``).

        :return: The task id, or None if the resource name is missing
        """
        resource_name = self._get_k8s_resource_name(pod)
        if not resource_name:
            # e.g. a pod without a "job-name" label in jobs mode; the caller skips such pods
            return None
        return resource_name.rpartition('-')[-1]

    @staticmethod
    def _get_k8s_resource_namespace(pod: dict):
        """Return the pod's ``metadata.namespace``, or None if not present."""
        return pod.get('metadata', {}).get('namespace', None)

    @staticmethod
    def _format_status_msg(reason: Optional[str], message: Optional[str]) -> str:
        """
        Build ``"<reason> (<message>)"``, omitting the parenthesized part if there is
        no message. A missing reason is treated as an empty string instead of raising.
        """
        return (reason or "") + (" ({})".format(message) if message else "")

    def target(self):
        """
        Handle pending objects (pods or jobs, depending on the agent mode).
        - Delete any pending objects that are not expected to recover
        - Delete any pending objects for whom the associated task was aborted
        """
        while True:
            # noinspection PyBroadException
            try:
                # Get pods (standalone pods if we're in pods mode, or pods associated to jobs if we're in jobs mode)
                pods = self.get_pods()
                if pods is None:
                    raise GetPodsError()

                task_id_to_pod = dict()

                for pod in pods:
                    # skip pods that cannot be mapped to a task or a namespace
                    pod_name = self._get_pod_name(pod)
                    if not pod_name:
                        continue

                    task_id = self._get_task_id(pod)
                    if not task_id:
                        continue

                    namespace = self._get_k8s_resource_namespace(pod)
                    if not namespace:
                        continue

                    task_id_to_pod[task_id] = pod

                    msg = None
                    tags = []

                    waiting = get_path(pod, 'status', 'containerStatuses', 0, 'state', 'waiting')
                    if not waiting:
                        # no waiting container: only an unschedulable pod is reported
                        condition = get_path(pod, 'status', 'conditions', 0)
                        if condition:
                            reason = condition.get('reason')
                            if reason == 'Unschedulable':
                                msg = self._format_status_msg(reason, condition.get('message'))
                    else:
                        reason = waiting.get("reason", None)
                        msg = self._format_status_msg(reason, waiting.get("message", None))

                        if reason == 'ImagePullBackOff':
                            # not expected to recover: delete the resource and fail the task
                            self.delete_k8s_resource(k8s_resource=pod, msg=reason)
                            try:
                                self._session.api_client.tasks.failed(
                                    task=task_id,
                                    status_reason="K8S glue error: {}".format(msg),
                                    status_message="Changed by K8S glue",
                                    force=True
                                )
                                self._agent.send_logs(
                                    task_id, ["K8S Error: {}".format(msg)],
                                    session=self._session
                                )
                            except Exception as ex:
                                self.log.warning(
                                    'K8S Glue pending monitor: Failed deleting task "{}"\nEX: {}'.format(task_id, ex)
                                )

                            # clean up any msg for this task
                            self._last_tasks_msgs.pop(task_id, None)
                            continue

                    self._update_pending_task_msg(task_id, msg, tags)

                if task_id_to_pod:
                    self._process_tasks_for_pending_pods(task_id_to_pod)

                # clean up any last message for a task that wasn't seen as a pod
                self._last_tasks_msgs = {k: v for k, v in self._last_tasks_msgs.items() if k in task_id_to_pod}

            except GetPodsError:
                # failed getting the pods list: deliberately ignored, retry on the next cycle
                pass
            except Exception:
                self.log.exception("Hanging pods daemon loop")

            sleep(self._polling_interval)

    def delete_k8s_resource(self, k8s_resource: dict, msg: Optional[str] = None):
        """
        Delete the k8s resource (of the agent's kind) associated with a pod using kubectl.

        :param k8s_resource: The pod dict, used to resolve the resource name and namespace
        :param msg: Optional reason, only used in the debug log line
        :return: The stripped output of the delete command (callers treat empty output as failure)
        """
        delete_cmd = "kubectl delete {kind} {name} -n {namespace} --output name".format(
            kind=self._agent.kind,
            name=self._get_k8s_resource_name(k8s_resource),
            namespace=self._get_k8s_resource_namespace(k8s_resource)
        ).strip()
        self.log.debug(" - deleting {} {}: {}".format(self._agent.kind, (" " + msg) if msg else "", delete_cmd))
        return get_bash_output(delete_cmd).strip()

    def _process_tasks_for_pending_pods(self, task_id_to_details: Dict[str, dict]):
        """
        Process all tasks that currently have a pending pod.

        :param task_id_to_details: Mapping of task id to its pending pod dict
        """
        self._handle_aborted_tasks(task_id_to_details)

    def _handle_aborted_tasks(self, pending_tasks_details: Dict[str, dict]):
        """
        Delete the pending resources of tasks the server reports with status "stopped".

        Any failure is logged as a warning and otherwise ignored (best effort).

        :param pending_tasks_details: Mapping of task id to its pending pod dict
        """
        try:
            result = self._session.get(
                service='tasks',
                action='get_all',
                json={
                    "id": list(pending_tasks_details),
                    "status": ["stopped"],
                    "only_fields": ["id"]
                },
                method=Request.def_method,
                async_enable=False,
            )
            aborted_task_ids = list(filter(None, (task.get("id") for task in result["tasks"])))

            for task_id in aborted_task_ids:
                pod = pending_tasks_details.get(task_id)
                if not pod:
                    self.log.error("Failed locating aborted task {} in pending pods list".format(task_id))
                    continue
                resource_name = self._get_k8s_resource_name(pod)
                self.log.info(
                    "K8S Glue pending monitor: task {} was aborted but the k8s resource {} is still pending, "
                    "deleting pod".format(task_id, resource_name)
                )
                output = self.delete_k8s_resource(k8s_resource=pod, msg="Pending resource of an aborted task")
                if not output:
                    self.log.warning("K8S Glue pending monitor: failed deleting resource {}".format(resource_name))
        except Exception as ex:
            self.log.warning(
                'K8S Glue pending monitor: failed checking aborted tasks for pending resources: {}'.format(ex)
            )

    def _update_pending_task_msg(self, task_id: str, msg: Optional[str], tags: Optional[List[str]] = None):
        """
        Report the pending reason on the task's status message (and tags, if any).

        Nothing is sent if there is no message or if the same (msg, tags) pair was
        already reported for this task. A task found "in_progress" is first forced
        back to the agent's pending queue. Failures are logged as warnings.

        :param task_id: The task to update
        :param msg: The pending reason to report
        :param tags: Optional tags to set on the task
        """
        if not msg or self._last_tasks_msgs.get(task_id, None) == (msg, tags):
            return
        try:
            # Make sure the task is queued
            result = self._session.send_request(
                service='tasks',
                action='get_all',
                json={"id": task_id, "only_fields": ["status"]},
                method=Request.def_method,
                async_enable=False,
            )
            if result.ok:
                status = get_path(result.json(), 'data', 'tasks', 0, 'status')
                # if task is in progress, change its status to enqueued
                if status == "in_progress":
                    result = self._session.send_request(
                        service='tasks', action='enqueue',
                        json={
                            "task": task_id, "force": True, "queue": self._agent.k8s_pending_queue_id
                        },
                        method=Request.def_method,
                        async_enable=False,
                    )
                    if not result.ok:
                        result_msg = get_path(result.json(), 'meta', 'result_msg')
                        self.log.debug(
                            "K8S Glue pods monitor: failed forcing task status change"
                            " for pending task {}: {}".format(task_id, result_msg)
                        )

            # Update task status message
            payload = {"task": task_id, "status_message": "K8S glue status: {}".format(msg)}
            if tags:
                payload["tags"] = tags
            result = self._session.send_request('tasks', 'update', json=payload, method=Request.def_method)
            if not result.ok:
                result_msg = get_path(result.json(), 'meta', 'result_msg')
                raise Exception(result_msg or result.text)

            # update last msg for this task; stored as the same (msg, tags) pair the early-exit
            # check above compares against (tags are copied so later mutation can't alter the record)
            self._last_tasks_msgs[task_id] = (msg, tags if tags is None else list(tags))
        except Exception as ex:
            self.log.warning(
                'K8S Glue pods monitor: Failed setting status message for task "{}"\nMSG: {}\nEX: {}'.format(
                    task_id, msg, ex
                )
            )