Allow providing namespace in k8s glue and k8s glue example

This commit is contained in:
allegroai 2021-01-20 19:01:03 +02:00
parent 5a94a4048e
commit 0462af6a3d
2 changed files with 24 additions and 9 deletions

View File

@ -30,18 +30,20 @@ from clearml_agent.interface.base import ObjectID
class K8sIntegration(Worker): class K8sIntegration(Worker):
K8S_PENDING_QUEUE = "k8s_scheduler" K8S_PENDING_QUEUE = "k8s_scheduler"
K8S_DEFAULT_NAMESPACE = "clearml"
KUBECTL_APPLY_CMD = "kubectl apply -f" KUBECTL_APPLY_CMD = "kubectl apply -f"
KUBECTL_RUN_CMD = "kubectl run clearml-{queue_name}-id-{task_id} " \ KUBECTL_RUN_CMD = "kubectl run clearml-{queue_name}-id-{task_id} " \
"--image {docker_image} " \ "--image {docker_image} " \
"--restart=Never --replicas=1 " \ "--restart=Never --replicas=1 " \
"--generator=run-pod/v1 " \ "--generator=run-pod/v1 " \
"--namespace=clearml" "--namespace={namespace}"
KUBECTL_DELETE_CMD = "kubectl delete pods " \ KUBECTL_DELETE_CMD = "kubectl delete pods " \
"--selector=TRAINS=agent " \ "--selector=TRAINS=agent " \
"--field-selector=status.phase!=Pending,status.phase!=Running " \ "--field-selector=status.phase!=Pending,status.phase!=Running " \
"--namespace=clearml" "--namespace={namespace}"
BASH_INSTALL_SSH_CMD = [ BASH_INSTALL_SSH_CMD = [
"apt-get install -y openssh-server", "apt-get install -y openssh-server",
@ -91,6 +93,7 @@ class K8sIntegration(Worker):
template_yaml=None, template_yaml=None,
clearml_conf_file=None, clearml_conf_file=None,
extra_bash_init_script=None, extra_bash_init_script=None,
namespace=None,
**kwargs **kwargs
): ):
""" """
@ -116,6 +119,7 @@ class K8sIntegration(Worker):
If provided the pod is scheduled with kubectl apply and overrides are ignored, otherwise with kubectl run. If provided the pod is scheduled with kubectl apply and overrides are ignored, otherwise with kubectl run.
:param str clearml_conf_file: clearml.conf file to be used by the pod itself (optional) :param str clearml_conf_file: clearml.conf file to be used by the pod itself (optional)
:param str extra_bash_init_script: Additional bash script to run before starting the Task inside the container :param str extra_bash_init_script: Additional bash script to run before starting the Task inside the container
:param str namespace: K8S namespace to be used when creating the new pods (default: clearml)
""" """
super(K8sIntegration, self).__init__() super(K8sIntegration, self).__init__()
self.k8s_pending_queue_name = k8s_pending_queue_name or self.K8S_PENDING_QUEUE self.k8s_pending_queue_name = k8s_pending_queue_name or self.K8S_PENDING_QUEUE
@ -137,6 +141,7 @@ class K8sIntegration(Worker):
self.extra_bash_init_script = extra_bash_init_script or None self.extra_bash_init_script = extra_bash_init_script or None
if self.extra_bash_init_script and not isinstance(self.extra_bash_init_script, str): if self.extra_bash_init_script and not isinstance(self.extra_bash_init_script, str):
self.extra_bash_init_script = ' ; '.join(self.extra_bash_init_script) # noqa self.extra_bash_init_script = ' ; '.join(self.extra_bash_init_script) # noqa
self.namespace = namespace or self.K8S_DEFAULT_NAMESPACE
self.pod_limits = [] self.pod_limits = []
self.pod_requests = [] self.pod_requests = []
if overrides_yaml: if overrides_yaml:
@ -182,7 +187,9 @@ class K8sIntegration(Worker):
def _monitor_hanging_pods_daemon(self): def _monitor_hanging_pods_daemon(self):
while True: while True:
output = get_bash_output('kubectl get pods -n clearml -o=JSON') output = get_bash_output('kubectl get pods -n {namespace} -o=JSON'.format(
namespace=self.namespace
))
output = '' if not output else output if isinstance(output, str) else output.decode('utf-8') output = '' if not output else output if isinstance(output, str) else output.decode('utf-8')
try: try:
output_config = json.loads(output) output_config = json.loads(output)
@ -202,7 +209,7 @@ class K8sIntegration(Worker):
pod_name = pod.get('metadata', {}).get('name', None) pod_name = pod.get('metadata', {}).get('name', None)
if pod_name: if pod_name:
task_id = pod_name.rpartition('-')[-1] task_id = pod_name.rpartition('-')[-1]
delete_pod_cmd = 'kubectl delete pods {} -n clearml'.format(pod_name) delete_pod_cmd = 'kubectl delete pods {} -n {}'.format(pod_name, self.namespace)
get_bash_output(delete_pod_cmd) get_bash_output(delete_pod_cmd)
try: try:
self._session.api_client.tasks.failed( self._session.api_client.tasks.failed(
@ -301,9 +308,10 @@ class K8sIntegration(Worker):
# Search for a free pod number # Search for a free pod number
pod_number = 1 pod_number = 1
while self.ports_mode: while self.ports_mode:
kubectl_cmd_new = "kubectl get pods -l {pod_label},{agent_label} -n clearml".format( kubectl_cmd_new = "kubectl get pods -l {pod_label},{agent_label} -n {namespace}".format(
pod_label=self.LIMIT_POD_LABEL.format(pod_number=pod_number), pod_label=self.LIMIT_POD_LABEL.format(pod_number=pod_number),
agent_label=self.AGENT_LABEL agent_label=self.AGENT_LABEL,
namespace=self.namespace,
) )
process = subprocess.Popen(kubectl_cmd_new.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE) process = subprocess.Popen(kubectl_cmd_new.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, error = process.communicate() output, error = process.communicate()
@ -436,6 +444,7 @@ class K8sIntegration(Worker):
task_id=task_id, task_id=task_id,
docker_image=docker_image, docker_image=docker_image,
queue_id=queue, queue_id=queue,
namespace=self.namespace
) )
# make sure we provide a list # make sure we provide a list
if isinstance(kubectl_cmd, str): if isinstance(kubectl_cmd, str):
@ -461,7 +470,8 @@ class K8sIntegration(Worker):
queue_name=queue_name, queue_name=queue_name,
task_id=task_id, task_id=task_id,
docker_image=docker_image, docker_image=docker_image,
queue_id=queue queue_id=queue,
namespace=self.namespace,
) )
# make sure we provide a list # make sure we provide a list
if isinstance(kubectl_cmd, str): if isinstance(kubectl_cmd, str):
@ -519,7 +529,7 @@ class K8sIntegration(Worker):
# iterate over queues (priority style, queues[0] is highest) # iterate over queues (priority style, queues[0] is highest)
for queue in queues: for queue in queues:
# delete old completed / failed pods # delete old completed / failed pods
get_bash_output(self.KUBECTL_DELETE_CMD) get_bash_output(self.KUBECTL_DELETE_CMD.format(namespace=self.namespace))
# get next task in queue # get next task in queue
try: try:

View File

@ -47,6 +47,10 @@ def parse_args():
"--ssh-server-port", type=int, default=0, "--ssh-server-port", type=int, default=0,
help="If non-zero, every pod will also start an SSH server on the selected port (default: zero, not active)" help="If non-zero, every pod will also start an SSH server on the selected port (default: zero, not active)"
) )
parser.add_argument(
"--namespace", type=str,
help="Specify the namespace in which pods will be created (default: %(default)s)", default="clearml"
)
return parser.parse_args() return parser.parse_args()
@ -66,7 +70,8 @@ def main():
ports_mode=args.ports_mode, num_of_services=args.num_of_services, user_props_cb=user_props_cb, ports_mode=args.ports_mode, num_of_services=args.num_of_services, user_props_cb=user_props_cb,
overrides_yaml=args.overrides_yaml, clearml_conf_file=args.pod_clearml_conf, template_yaml=args.template_yaml, overrides_yaml=args.overrides_yaml, clearml_conf_file=args.pod_clearml_conf, template_yaml=args.template_yaml,
extra_bash_init_script=K8sIntegration.get_ssh_server_bash( extra_bash_init_script=K8sIntegration.get_ssh_server_bash(
ssh_port_number=args.ssh_server_port) if args.ssh_server_port else None ssh_port_number=args.ssh_server_port) if args.ssh_server_port else None,
namespace=args.namespace,
) )
k8s.k8s_daemon(args.queue) k8s.k8s_daemon(args.queue)