Fix k8s glue debug mode, refactoring

This commit is contained in:
allegroai 2022-08-01 18:55:49 +03:00
parent 1d1ffd17fb
commit 820ab4dc0c

View File

@ -135,7 +135,8 @@ class K8sIntegration(Worker):
# Add debug logging
if debug:
self.log.logger.disabled = False
self.log.logger.setLevel(logging.INFO)
self.log.logger.setLevel(logging.DEBUG)
self.log.logger.addHandler(logging.StreamHandler())
self.ports_mode = ports_mode
self.num_of_services = num_of_services
self.base_pod_num = base_pod_num
@ -152,8 +153,7 @@ class K8sIntegration(Worker):
self.pod_requests = []
self.max_pods_limit = max_pods_limit if not self.ports_mode else None
if overrides_yaml:
with open(os.path.expandvars(os.path.expanduser(str(overrides_yaml))), 'rt') as f:
overrides = yaml.load(f, Loader=getattr(yaml, 'FullLoader', None))
overrides = self._load_template_file(overrides_yaml)
if overrides:
containers = overrides.get('spec', {}).get('containers', [])
for c in containers:
@ -174,8 +174,7 @@ class K8sIntegration(Worker):
self.log.warning('Removing containers section: {}'.format(overrides['spec'].pop('containers')))
self.overrides_json_string = json.dumps(overrides)
if template_yaml:
with open(os.path.expandvars(os.path.expanduser(str(template_yaml))), 'rt') as f:
self.template_dict = yaml.load(f, Loader=getattr(yaml, 'FullLoader', None))
self.template_dict = self._load_template_file(template_yaml)
clearml_conf_file = clearml_conf_file or kwargs.get('trains_conf_file')
@ -194,6 +193,11 @@ class K8sIntegration(Worker):
_check_pod_thread.daemon = True
_check_pod_thread.start()
@staticmethod
def _load_template_file(path):
with open(os.path.expandvars(os.path.expanduser(str(path))), 'rt') as f:
return yaml.load(f, Loader=getattr(yaml, 'FullLoader', None))
@staticmethod
def _get_path(d, *path, default=None):
try:
@ -203,13 +207,27 @@ class K8sIntegration(Worker):
except (IndexError, KeyError):
return default
def _get_kubectl_options(self, command, extra_labels=None):
labels = [self._get_agent_label()] + (list(extra_labels) if extra_labels else [])
return {
"-l": ",".join(labels),
"-n": str(self.namespace),
"-o": "json"
}
def get_kubectl_command(self, command, extra_labels=None):
opts = self._get_kubectl_options(command, extra_labels)
return 'kubectl {command} {opts}'.format(
command=command, opts=" ".join(x for item in opts.items() for x in item)
)
def _monitor_hanging_pods_daemon(self):
last_tasks_msgs = {} # last msg updated for every task
while True:
output = get_bash_output('kubectl get pods -n {namespace} -o=JSON'.format(
namespace=self.namespace
))
kubectl_cmd = self.get_kubectl_command("get pods")
self.log.debug("Detecting hanging pods: {}".format(kubectl_cmd))
output = get_bash_output(kubectl_cmd)
output = '' if not output else output if isinstance(output, str) else output.decode('utf-8')
try:
output_config = json.loads(output)
@ -231,6 +249,10 @@ class K8sIntegration(Worker):
if not task_id:
continue
namespace = pod.get('metadata', {}).get('namespace', None)
if not namespace:
continue
task_ids.add(task_id)
msg = None
@ -250,7 +272,7 @@ class K8sIntegration(Worker):
msg = reason + (" ({})".format(message) if message else "")
if reason == 'ImagePullBackOff':
delete_pod_cmd = 'kubectl delete pods {} -n {}'.format(pod_name, self.namespace)
delete_pod_cmd = 'kubectl delete pods {} -n {}'.format(pod_name, namespace)
get_bash_output(delete_pod_cmd)
try:
self._session.api_client.tasks.failed(
@ -336,13 +358,11 @@ class K8sIntegration(Worker):
return self._agent_label
def _get_number_used_pods(self):
def _get_used_pods(self):
# noinspection PyBroadException
try:
kubectl_cmd_new = "kubectl get pods -l {agent_label} -n {namespace} -o json".format(
agent_label=self._get_agent_label(),
namespace=self.namespace,
)
kubectl_cmd_new = self.get_kubectl_command("get pods")
self.log.debug("Getting used pods: {}".format(kubectl_cmd_new))
process = subprocess.Popen(kubectl_cmd_new.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, error = process.communicate()
output = '' if not output else output if isinstance(output, str) else output.decode('utf-8')
@ -350,17 +370,20 @@ class K8sIntegration(Worker):
if not output:
# No such pod exist so we can use the pod_number we found
return 0
return 0, {}
try:
current_pod_count = len(json.loads(output).get("items", []))
except (ValueError, TypeError) as ex:
return -1
items = json.loads(output).get("items", [])
current_pod_count = len(items)
namespaces = {item["metadata"]["namespace"] for item in items}
except (KeyError, ValueError, TypeError, AttributeError) as ex:
print("Failed parsing used pods command response for cleanup: {}".format(ex))
return -1, {}
return current_pod_count
return current_pod_count, namespaces
except Exception as ex:
print('Failed getting number of used pods: {}'.format(ex))
return -2
print('Failed obtaining used pods information: {}'.format(ex))
return -2, {}
def run_one_task(self, queue: Text, task_id: Text, worker_args=None, task_session=None, **_):
print('Pulling task {} launching on kubernetes cluster'.format(task_id))
@ -426,39 +449,36 @@ class K8sIntegration(Worker):
pod_number = self.base_pod_num
while self.ports_mode or self.max_pods_limit:
pod_number = self.base_pod_num + pod_count
if self.ports_mode:
kubectl_cmd_new = "kubectl get pods -l {pod_label},{agent_label} -n {namespace}".format(
pod_label=self.LIMIT_POD_LABEL.format(pod_number=pod_number),
agent_label=self._get_agent_label(),
namespace=self.namespace,
)
else:
kubectl_cmd_new = "kubectl get pods -l {agent_label} -n {namespace} -o json".format(
agent_label=self._get_agent_label(),
namespace=self.namespace,
)
kubectl_cmd_new = self.get_kubectl_command(
"get pods",
extra_labels=[self.LIMIT_POD_LABEL.format(pod_number=pod_number)] if self.ports_mode else None
)
self.log.debug("Looking for a free pod/port: {}".format(kubectl_cmd_new))
process = subprocess.Popen(kubectl_cmd_new.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, error = process.communicate()
output = '' if not output else output if isinstance(output, str) else output.decode('utf-8')
error = '' if not error else error if isinstance(error, str) else error.decode('utf-8')
if not output:
# No such pod exist so we can use the pod_number we found
try:
items_count = len(json.loads(output).get("items", []))
except (ValueError, TypeError) as ex:
self.log.warning(
"K8S Glue pods monitor: Failed parsing kubectl output:\n{}\ntask '{}' "
"will be enqueued back to queue '{}'\nEx: {}".format(
output, task_id, queue, ex
)
)
self._session.api_client.tasks.stop(task_id, force=True)
self._session.api_client.tasks.enqueue(task_id, queue=queue, status_reason='kubectl parsing error')
return
if not items_count:
# No such pod exist so we can use the pod_number we found (result exists but with no items)
break
if self.max_pods_limit:
try:
current_pod_count = len(json.loads(output).get("items", []))
except (ValueError, TypeError) as ex:
self.log.warning(
"K8S Glue pods monitor: Failed parsing kubectl output:\n{}\ntask '{}' "
"will be enqueued back to queue '{}'\nEx: {}".format(
output, task_id, queue, ex
)
)
self._session.api_client.tasks.stop(task_id, force=True)
self._session.api_client.tasks.enqueue(task_id, queue=queue, status_reason='kubectl parsing error')
return
current_pod_count = items_count
max_count = self.max_pods_limit
else:
current_pod_count = pod_count
@ -483,10 +503,9 @@ class K8sIntegration(Worker):
break
pod_count += 1
labels = ([self.LIMIT_POD_LABEL.format(pod_number=pod_number)] if self.ports_mode else []) + \
[self._get_agent_label()]
labels.append("clearml-agent-queue={}".format(self._safe_k8s_label_value(queue)))
labels.append("clearml-agent-queue-name={}".format(self._safe_k8s_label_value(queue_name)))
labels = self._get_pod_labels(queue, queue_name)
if self.ports_mode:
labels.append(self.LIMIT_POD_LABEL.format(pod_number=pod_number))
if self.ports_mode:
print("Kubernetes scheduling task id={} on pod={} (pod_count={})".format(task_id, pod_number, pod_count))
@ -503,7 +522,11 @@ class K8sIntegration(Worker):
queue=queue
)
template = self._resolve_template(task_session, task_data, queue)
try:
template = self._resolve_template(task_session, task_data, queue)
except Exception as ex:
print("ERROR: Failed resolving template (skipping): {}".format(ex))
return
if template:
output, error = self._kubectl_apply(template=template, **kubectl_kwargs)
@ -542,6 +565,13 @@ class K8sIntegration(Worker):
**user_props
)
def _get_pod_labels(self, queue, queue_name):
return [
self._get_agent_label(),
"clearml-agent-queue={}".format(self._safe_k8s_label_value(queue)),
"clearml-agent-queue-name={}".format(self._safe_k8s_label_value(queue_name))
]
def _get_docker_args(self, docker_args, flags, target=None, convert=None):
# type: (List[str], Collection[str], Optional[str], Callable[[str], Any]) -> Union[dict, List[str]]
"""
@ -740,16 +770,19 @@ class K8sIntegration(Worker):
_last_machine_update_ts = 0
while True:
# Get used pods and namespaces
current_pods, namespaces = self._get_used_pods()
# check if have pod limit, then check if we hit it.
if self.max_pods_limit:
current_pods = self._get_number_used_pods()
if current_pods >= self.max_pods_limit:
print("Maximum pod limit reached {}/{}, sleeping for {:.1f} seconds".format(
current_pods, self.max_pods_limit, self._polling_interval))
# delete old completed / failed pods
get_bash_output(
self.KUBECTL_DELETE_CMD.format(namespace=self.namespace, selector=self._get_agent_label())
)
for namespace in namespaces:
kubectl_cmd = self.KUBECTL_DELETE_CMD.format(namespace=namespace, selector=self._get_agent_label())
self.log.debug("Deleting old/failed pods due to pod limit: {}".format(kubectl_cmd))
get_bash_output(kubectl_cmd)
# go to sleep
sleep(self._polling_interval)
continue
@ -757,9 +790,10 @@ class K8sIntegration(Worker):
# iterate over queues (priority style, queues[0] is highest)
for queue in queues:
# delete old completed / failed pods
get_bash_output(
self.KUBECTL_DELETE_CMD.format(namespace=self.namespace, selector=self._get_agent_label())
)
for namespace in namespaces:
kubectl_cmd = self.KUBECTL_DELETE_CMD.format(namespace=namespace, selector=self._get_agent_label())
self.log.debug("Deleting old/failed pods: {}".format(kubectl_cmd))
get_bash_output(kubectl_cmd)
# get next task in queue
try: