mirror of
https://github.com/clearml/clearml-agent
synced 2025-05-01 10:54:11 +00:00
Add number of pods limit to k8s glue
This commit is contained in:
parent
e60a6f9d14
commit
08ff5e6db7
@ -32,7 +32,7 @@ class K8sIntegration(Worker):
|
|||||||
|
|
||||||
K8S_DEFAULT_NAMESPACE = "clearml"
|
K8S_DEFAULT_NAMESPACE = "clearml"
|
||||||
|
|
||||||
KUBECTL_APPLY_CMD = "kubectl apply -f"
|
KUBECTL_APPLY_CMD = "kubectl apply --namespace={namespace} -f"
|
||||||
|
|
||||||
KUBECTL_RUN_CMD = "kubectl run clearml-{queue_name}-id-{task_id} " \
|
KUBECTL_RUN_CMD = "kubectl run clearml-{queue_name}-id-{task_id} " \
|
||||||
"--image {docker_image} " \
|
"--image {docker_image} " \
|
||||||
@ -95,6 +95,7 @@ class K8sIntegration(Worker):
|
|||||||
clearml_conf_file=None,
|
clearml_conf_file=None,
|
||||||
extra_bash_init_script=None,
|
extra_bash_init_script=None,
|
||||||
namespace=None,
|
namespace=None,
|
||||||
|
max_pods_limit=None,
|
||||||
**kwargs
|
**kwargs
|
||||||
):
|
):
|
||||||
"""
|
"""
|
||||||
@ -122,6 +123,7 @@ class K8sIntegration(Worker):
|
|||||||
:param str clearml_conf_file: clearml.conf file to be used by the pod itself (optional)
|
:param str clearml_conf_file: clearml.conf file to be used by the pod itself (optional)
|
||||||
:param str extra_bash_init_script: Additional bash script to run before starting the Task inside the container
|
:param str extra_bash_init_script: Additional bash script to run before starting the Task inside the container
|
||||||
:param str namespace: K8S namespace to be used when creating the new pods (default: clearml)
|
:param str namespace: K8S namespace to be used when creating the new pods (default: clearml)
|
||||||
|
:param int max_pods_limit: Maximum number of pods that K8S glue can run at the same time
|
||||||
"""
|
"""
|
||||||
super(K8sIntegration, self).__init__()
|
super(K8sIntegration, self).__init__()
|
||||||
self.k8s_pending_queue_name = k8s_pending_queue_name or self.K8S_PENDING_QUEUE
|
self.k8s_pending_queue_name = k8s_pending_queue_name or self.K8S_PENDING_QUEUE
|
||||||
@ -147,6 +149,7 @@ class K8sIntegration(Worker):
|
|||||||
self.namespace = namespace or self.K8S_DEFAULT_NAMESPACE
|
self.namespace = namespace or self.K8S_DEFAULT_NAMESPACE
|
||||||
self.pod_limits = []
|
self.pod_limits = []
|
||||||
self.pod_requests = []
|
self.pod_requests = []
|
||||||
|
self.max_pods_limit = max_pods_limit if not self.ports_mode else None
|
||||||
if overrides_yaml:
|
if overrides_yaml:
|
||||||
with open(os.path.expandvars(os.path.expanduser(str(overrides_yaml))), 'rt') as f:
|
with open(os.path.expandvars(os.path.expanduser(str(overrides_yaml))), 'rt') as f:
|
||||||
overrides = yaml.load(f, Loader=getattr(yaml, 'FullLoader', None))
|
overrides = yaml.load(f, Loader=getattr(yaml, 'FullLoader', None))
|
||||||
@ -311,13 +314,19 @@ class K8sIntegration(Worker):
|
|||||||
# Search for a free pod number
|
# Search for a free pod number
|
||||||
pod_count = 0
|
pod_count = 0
|
||||||
pod_number = self.base_pod_num
|
pod_number = self.base_pod_num
|
||||||
while self.ports_mode:
|
while self.ports_mode or self.max_pods_limit:
|
||||||
pod_number = self.base_pod_num + pod_count
|
pod_number = self.base_pod_num + pod_count
|
||||||
kubectl_cmd_new = "kubectl get pods -l {pod_label},{agent_label} -n {namespace}".format(
|
if self.ports_mode:
|
||||||
pod_label=self.LIMIT_POD_LABEL.format(pod_number=pod_number),
|
kubectl_cmd_new = "kubectl get pods -l {pod_label},{agent_label} -n {namespace}".format(
|
||||||
agent_label=self.AGENT_LABEL,
|
pod_label=self.LIMIT_POD_LABEL.format(pod_number=pod_number),
|
||||||
namespace=self.namespace,
|
agent_label=self.AGENT_LABEL,
|
||||||
)
|
namespace=self.namespace,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
kubectl_cmd_new = "kubectl get pods -l {agent_label} -n {namespace} -o json".format(
|
||||||
|
agent_label=self.AGENT_LABEL,
|
||||||
|
namespace=self.namespace,
|
||||||
|
)
|
||||||
process = subprocess.Popen(kubectl_cmd_new.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
process = subprocess.Popen(kubectl_cmd_new.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||||
output, error = process.communicate()
|
output, error = process.communicate()
|
||||||
output = '' if not output else output if isinstance(output, str) else output.decode('utf-8')
|
output = '' if not output else output if isinstance(output, str) else output.decode('utf-8')
|
||||||
@ -326,18 +335,42 @@ class K8sIntegration(Worker):
|
|||||||
if not output:
|
if not output:
|
||||||
# No such pod exists so we can use the pod_number we found
|
# No such pod exists so we can use the pod_number we found
|
||||||
break
|
break
|
||||||
if pod_count >= self.num_of_services - 1:
|
|
||||||
# All pod numbers are taken, exit
|
if self.max_pods_limit:
|
||||||
|
try:
|
||||||
|
current_pod_count = len(json.loads(output).get("items", []))
|
||||||
|
except (ValueError, TypeError) as ex:
|
||||||
|
self.log.warning(
|
||||||
|
"K8S Glue pods monitor: Failed parsing kubectl output:\n{}\ntask '{}' "
|
||||||
|
"will be enqueued back to queue '{}'\nEx: {}".format(
|
||||||
|
output, task_id, queue, ex
|
||||||
|
)
|
||||||
|
)
|
||||||
|
self._session.api_client.tasks.reset(task_id)
|
||||||
|
self._session.api_client.tasks.enqueue(task_id, queue=queue, status_reason='kubectl parsing error')
|
||||||
|
return
|
||||||
|
max_count = self.max_pods_limit
|
||||||
|
else:
|
||||||
|
current_pod_count = pod_count
|
||||||
|
max_count = self.num_of_services - 1
|
||||||
|
|
||||||
|
if current_pod_count >= max_count:
|
||||||
|
# All pods are taken, exit
|
||||||
|
self.log.debug(
|
||||||
|
"kubectl last result: {}\n{}".format(error, output))
|
||||||
self.log.warning(
|
self.log.warning(
|
||||||
"kubectl last result: {}\n{}\nAll k8s services are in use, task '{}' "
|
"All k8s services are in use, task '{}' "
|
||||||
"will be enqueued back to queue '{}'".format(
|
"will be enqueued back to queue '{}'".format(
|
||||||
error, output, task_id, queue
|
task_id, queue
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
self._session.api_client.tasks.reset(task_id)
|
self._session.api_client.tasks.reset(task_id)
|
||||||
self._session.api_client.tasks.enqueue(
|
self._session.api_client.tasks.enqueue(
|
||||||
task_id, queue=queue, status_reason='k8s max pod limit (no free k8s service)')
|
task_id, queue=queue, status_reason='k8s max pod limit (no free k8s service)')
|
||||||
return
|
return
|
||||||
|
elif self.max_pods_limit:
|
||||||
|
# max pods limit hasn't been reached yet, so we can create the pod
|
||||||
|
break
|
||||||
pod_count += 1
|
pod_count += 1
|
||||||
|
|
||||||
labels = ([self.LIMIT_POD_LABEL.format(pod_number=pod_number)] if self.ports_mode else []) + [self.AGENT_LABEL]
|
labels = ([self.LIMIT_POD_LABEL.format(pod_number=pod_number)] if self.ports_mode else []) + [self.AGENT_LABEL]
|
||||||
|
Loading…
Reference in New Issue
Block a user