mirror of
https://github.com/clearml/clearml-agent
synced 2025-03-13 06:58:37 +00:00
Add base-pod-number parameter to k8s glue and example
This commit is contained in:
parent
c578b37c6d
commit
8229843018
@ -88,6 +88,7 @@ class K8sIntegration(Worker):
|
|||||||
debug=False,
|
debug=False,
|
||||||
ports_mode=False,
|
ports_mode=False,
|
||||||
num_of_services=20,
|
num_of_services=20,
|
||||||
|
base_pod_num=1,
|
||||||
user_props_cb=None,
|
user_props_cb=None,
|
||||||
overrides_yaml=None,
|
overrides_yaml=None,
|
||||||
template_yaml=None,
|
template_yaml=None,
|
||||||
@ -111,6 +112,7 @@ class K8sIntegration(Worker):
|
|||||||
Requires the `num_of_services` parameter.
|
Requires the `num_of_services` parameter.
|
||||||
:param int num_of_services: Number of k8s services configured in the cluster. Required if `port_mode` is True.
|
:param int num_of_services: Number of k8s services configured in the cluster. Required if `port_mode` is True.
|
||||||
(default: 20)
|
(default: 20)
|
||||||
|
:param int base_pod_num: Used when `ports_mode` is True, sets the base pod number to a given value (default: 1)
|
||||||
:param callable user_props_cb: An Optional callable allowing additional user properties to be specified
|
:param callable user_props_cb: An Optional callable allowing additional user properties to be specified
|
||||||
when scheduling a task to run in a pod. Callable can receive an optional pod number and should return
|
when scheduling a task to run in a pod. Callable can receive an optional pod number and should return
|
||||||
a dictionary of user properties (name and value). Signature is [[Optional[int]], Dict[str,str]]
|
a dictionary of user properties (name and value). Signature is [[Optional[int]], Dict[str,str]]
|
||||||
@ -133,6 +135,7 @@ class K8sIntegration(Worker):
|
|||||||
self.log.logger.setLevel(logging.INFO)
|
self.log.logger.setLevel(logging.INFO)
|
||||||
self.ports_mode = ports_mode
|
self.ports_mode = ports_mode
|
||||||
self.num_of_services = num_of_services
|
self.num_of_services = num_of_services
|
||||||
|
self.base_pod_num = base_pod_num
|
||||||
self._edit_hyperparams_support = None
|
self._edit_hyperparams_support = None
|
||||||
self._user_props_cb = user_props_cb
|
self._user_props_cb = user_props_cb
|
||||||
self.conf_file_content = None
|
self.conf_file_content = None
|
||||||
@ -306,8 +309,10 @@ class K8sIntegration(Worker):
|
|||||||
safe_queue_name = re.sub(r'\W+', '', safe_queue_name).replace('_', '').replace('-', '')
|
safe_queue_name = re.sub(r'\W+', '', safe_queue_name).replace('_', '').replace('-', '')
|
||||||
|
|
||||||
# Search for a free pod number
|
# Search for a free pod number
|
||||||
pod_number = 1
|
pod_count = 0
|
||||||
|
pod_number = self.base_pod_num
|
||||||
while self.ports_mode:
|
while self.ports_mode:
|
||||||
|
pod_number = self.base_pod_num + pod_count
|
||||||
kubectl_cmd_new = "kubectl get pods -l {pod_label},{agent_label} -n {namespace}".format(
|
kubectl_cmd_new = "kubectl get pods -l {pod_label},{agent_label} -n {namespace}".format(
|
||||||
pod_label=self.LIMIT_POD_LABEL.format(pod_number=pod_number),
|
pod_label=self.LIMIT_POD_LABEL.format(pod_number=pod_number),
|
||||||
agent_label=self.AGENT_LABEL,
|
agent_label=self.AGENT_LABEL,
|
||||||
@ -321,7 +326,7 @@ class K8sIntegration(Worker):
|
|||||||
if not output:
|
if not output:
|
||||||
# No such pod exist so we can use the pod_number we found
|
# No such pod exist so we can use the pod_number we found
|
||||||
break
|
break
|
||||||
if pod_number >= self.num_of_services:
|
if pod_count >= self.num_of_services - 1:
|
||||||
# All pod numbers are taken, exit
|
# All pod numbers are taken, exit
|
||||||
self.log.warning(
|
self.log.warning(
|
||||||
"kubectl last result: {}\n{}\nAll k8s services are in use, task '{}' "
|
"kubectl last result: {}\n{}\nAll k8s services are in use, task '{}' "
|
||||||
@ -333,12 +338,12 @@ class K8sIntegration(Worker):
|
|||||||
self._session.api_client.tasks.enqueue(
|
self._session.api_client.tasks.enqueue(
|
||||||
task_id, queue=queue, status_reason='k8s max pod limit (no free k8s service)')
|
task_id, queue=queue, status_reason='k8s max pod limit (no free k8s service)')
|
||||||
return
|
return
|
||||||
pod_number += 1
|
pod_count += 1
|
||||||
|
|
||||||
labels = ([self.LIMIT_POD_LABEL.format(pod_number=pod_number)] if self.ports_mode else []) + [self.AGENT_LABEL]
|
labels = ([self.LIMIT_POD_LABEL.format(pod_number=pod_number)] if self.ports_mode else []) + [self.AGENT_LABEL]
|
||||||
|
|
||||||
if self.ports_mode:
|
if self.ports_mode:
|
||||||
print("Kubernetes scheduling task id={} on pod={}".format(task_id, pod_number))
|
print("Kubernetes scheduling task id={} on pod={} (pod_count={})".format(task_id, pod_number, pod_count))
|
||||||
else:
|
else:
|
||||||
print("Kubernetes scheduling task id={}".format(task_id))
|
print("Kubernetes scheduling task id={}".format(task_id))
|
||||||
|
|
||||||
@ -364,7 +369,13 @@ class K8sIntegration(Worker):
|
|||||||
|
|
||||||
user_props = {"k8s-queue": str(queue_name)}
|
user_props = {"k8s-queue": str(queue_name)}
|
||||||
if self.ports_mode:
|
if self.ports_mode:
|
||||||
user_props.update({"k8s-pod-number": pod_number, "k8s-pod-label": labels[0]})
|
user_props.update(
|
||||||
|
{
|
||||||
|
"k8s-pod-number": pod_number,
|
||||||
|
"k8s-pod-label": labels[0],
|
||||||
|
"k8s-internal-pod-count": pod_count,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
if self._user_props_cb:
|
if self._user_props_cb:
|
||||||
# noinspection PyBroadException
|
# noinspection PyBroadException
|
||||||
|
@ -24,7 +24,13 @@ def parse_args():
|
|||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"--base-port", type=int,
|
"--base-port", type=int,
|
||||||
help="Used in conjunction with ports-mode, specifies the base port exposed by the services. "
|
help="Used in conjunction with ports-mode, specifies the base port exposed by the services. "
|
||||||
"For pod #X, the port will be <base-port>+X"
|
"For pod #X, the port will be <base-port>+X. Note that pod number is calculated based on base-pod-num"
|
||||||
|
"e.g. if base-port=20000 and base-pod-num=3, the port for the first pod will be 20003"
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--base-pod-num", type=int, default=1,
|
||||||
|
help="Used in conjunction with ports-mode and base-port, specifies the base pod number to be used by the "
|
||||||
|
"service (default: %(default)s)"
|
||||||
)
|
)
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"--gateway-address", type=str, default=None,
|
"--gateway-address", type=str, default=None,
|
||||||
@ -67,9 +73,9 @@ def main():
|
|||||||
user_props_cb = k8s_user_props_cb
|
user_props_cb = k8s_user_props_cb
|
||||||
|
|
||||||
k8s = K8sIntegration(
|
k8s = K8sIntegration(
|
||||||
ports_mode=args.ports_mode, num_of_services=args.num_of_services, user_props_cb=user_props_cb,
|
ports_mode=args.ports_mode, num_of_services=args.num_of_services, base_pod_num=args.base_pod_num,
|
||||||
overrides_yaml=args.overrides_yaml, clearml_conf_file=args.pod_clearml_conf, template_yaml=args.template_yaml,
|
user_props_cb=user_props_cb, overrides_yaml=args.overrides_yaml, clearml_conf_file=args.pod_clearml_conf,
|
||||||
extra_bash_init_script=K8sIntegration.get_ssh_server_bash(
|
template_yaml=args.template_yaml, extra_bash_init_script=K8sIntegration.get_ssh_server_bash(
|
||||||
ssh_port_number=args.ssh_server_port) if args.ssh_server_port else None,
|
ssh_port_number=args.ssh_server_port) if args.ssh_server_port else None,
|
||||||
namespace=args.namespace,
|
namespace=args.namespace,
|
||||||
)
|
)
|
||||||
|
Loading…
Reference in New Issue
Block a user