Compare commits


6 Commits

Author   SHA1        Message                                                                                    Date
clearml  ee21944f6b  Fix UV pip freeze fails                                                                    2025-04-02 14:42:02 +03:00
clearml  55790b3c3a  Add support for ${CLEARML_TASK.xx} as docker args arguments parsed based on Tasks value    2025-04-02 14:41:35 +03:00
clearml  bb9ad6b213  PEP8                                                                                       2025-04-02 14:40:19 +03:00
clearml  94fc0138b5  Update base images to resolve several vulnerabilities                                      2025-04-02 14:39:34 +03:00
clearml  f2df45cad6  Refactor                                                                                   2025-04-02 14:39:14 +03:00
clearml  326ba81105  Refactor                                                                                   2025-04-02 14:38:58 +03:00
13 changed files with 222 additions and 33 deletions

View File

@@ -192,6 +192,9 @@
 # optional arguments to pass to docker image
 # these are local for this agent and will not be updated in the experiment's docker_cmd section
+# Supports parsing ${CLEARML_TASK:default} and ${CLEARML_QUEUE_NAME:default} values based on Task object
+# replace with real-time values.
+# Example: "${CLEARML_TASK.project}", "${TASK.hyperparams.properties.user_key.value:default_value}"
 # extra_docker_arguments: ["--ipc=host", ]
 # Allow the extra docker arg to override task level docker arg (if the same argument is passed on both),
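Taken together with the resolver added below, this lets an admin inject task metadata into every container, e.g. (hypothetical variable names, in the conf file's own syntax):

    extra_docker_arguments: ["-e", "CURRENT_PROJECT=${CLEARML_TASK.project}", "-e", "QUEUE=${CLEARML_QUEUE_NAME:unknown}"]

Note the resolver only substitutes keys carrying the CLEARML_ prefix, so the bare ${TASK...} form shown in the comment's example would be left unsubstituted as written.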

View File

@@ -150,7 +150,7 @@ from clearml_agent.helper.repo import clone_repository_cached, RepoInfo, VCS, fi
 from clearml_agent.helper.resource_monitor import ResourceMonitor
 from clearml_agent.helper.runtime_verification import check_runtime, print_uptime_properties
 from clearml_agent.helper.singleton import Singleton
-from clearml_agent.helper.docker_args import DockerArgsSanitizer
+from clearml_agent.helper.docker_args import DockerArgsSanitizer, DockerArgsTemplateResolver
 from clearml_agent.session import Session
 from .events import Events

@@ -480,7 +480,7 @@ def get_task_container(session, task_id, ignore_match_rules=False, allow_force_c
         },
     )
     if not res.ok:
-        raise Exception("failed setting runtime property")
+        raise Exception("failed setting container property")
 except Exception as ex:
     print("WARNING: failed setting container properties for task '{}': {}".format(task_id, ex))
@@ -1240,6 +1240,15 @@ class Worker(ServiceCommandSection):
 else:
     print("Warning: generated docker container name is invalid: {}".format(name))

+# convert template arguments now (i.e. ${CLEARML_} ), this is important for the docker arg
+# resolve the Task's docker arguments before everything else, because
+# unlike the vault/config these are not running as the agent's user, they are the user's,
+# we need to filter them post template parsing limitation to happen before the `docker_image_func` call
+docker_args_template_resolver = DockerArgsTemplateResolver(task_session=self._session, task_id=task_id)
+if docker_params.get("docker_arguments"):
+    docker_params["docker_arguments"] = docker_args_template_resolver.resolve_docker_args_from_template(
+        full_docker_cmd=docker_params["docker_arguments"])
+
 full_docker_cmd = self.docker_image_func(env_task_id=task_id, **docker_params)

 # if we are using the default docker, update back the Task:
@@ -1256,6 +1265,12 @@ class Worker(ServiceCommandSection):
 except Exception:
     pass

+# convert template arguments now (i.e. ${CLEARML_} )
+# Notice we do not parse the last part of the docker cmd because that's
+# the actual command to be executed inside the docker
+full_docker_cmd = docker_args_template_resolver.resolve_docker_args_from_template(
+    full_docker_cmd=full_docker_cmd[:-1]) + [full_docker_cmd[-1]]
+
 # if this is services_mode, change the worker_id to a unique name
 # abd use full-monitoring, ot it registers itself as a worker for this specific service.
 # notice, the internal agent will monitor itself once the docker is up and running
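The [:-1] slice above encodes a simple rule: every docker argument is template-resolved except the final element, which is the command executed inside the container. A minimal self-contained sketch of that rule, with a stand-in for resolve_docker_args_from_template:

    def resolve_all_but_last(resolve, full_docker_cmd):
        # resolve templates in every token except the trailing in-container command
        return resolve(full_docker_cmd[:-1]) + [full_docker_cmd[-1]]

    def demo_resolve(tokens):
        # stand-in for DockerArgsTemplateResolver.resolve_docker_args_from_template
        return [t.replace("${CLEARML_TASK.project}", "prj42") for t in tokens]

    cmd = ["docker", "run", "-e", "P=${CLEARML_TASK.project}", "python train.py ${CLEARML_TASK.project}"]
    print(resolve_all_but_last(demo_resolve, cmd))
    # -> ['docker', 'run', '-e', 'P=prj42', 'python train.py ${CLEARML_TASK.project}']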
@@ -2711,6 +2726,11 @@ class Worker(ServiceCommandSection):
     docker_image=docker_image, docker_arguments=docker_arguments, docker_bash_setup_script=docker_setup_script
 )

+# convert docker template arguments (i.e. ${CLEARML_} ) based on the current Task
+docker_args_template_resolver = DockerArgsTemplateResolver(task_session=self._session, task_id=task_id)
+full_docker_cmd = docker_args_template_resolver.resolve_docker_args_from_template(
+    full_docker_cmd=full_docker_cmd)
+
 end_of_build_marker = "build.done=true"
 docker_cmd_suffix = ' build --id {task_id} --install-globally; ' \
     'ORG=$(stat -c "%u:%g" {conf_file}) ; chown $(whoami):$(whoami) {conf_file} ; ' \
@@ -4705,7 +4725,7 @@ class Worker(ServiceCommandSection):
 docker_arguments = self._resolve_docker_env_args(docker_arguments)
 if extra_docker_arguments:
-    # we always resolve environments in the `extra_docker_arguments` becuase the admin set them (not users)
+    # we always resolve environments in the `extra_docker_arguments` because the admin set them (not users)
     extra_docker_arguments = self._resolve_docker_env_args(extra_docker_arguments)
     extra_docker_arguments = [extra_docker_arguments] \
         if isinstance(extra_docker_arguments, six.string_types) else extra_docker_arguments
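_resolve_docker_env_args itself is not part of this diff; as an assumption about its behavior, the idea is to expand environment-variable references in admin-supplied arguments from the agent's own environment, roughly:

    import os
    from string import Template

    def resolve_env_args(args):
        # expand $VAR / ${VAR} from the agent process environment; unknown names stay as-is
        return [Template(arg).safe_substitute(os.environ) for arg in args]

    os.environ["ADMIN_TOKEN"] = "s3cr3t"
    print(resolve_env_args(["-e", "TOKEN=$ADMIN_TOKEN", "-e", "KEEP=${NOT_SET}"]))
    # -> ['-e', 'TOKEN=s3cr3t', '-e', 'KEEP=${NOT_SET}']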

View File

@@ -455,6 +455,9 @@ class K8sIntegration(Worker):
 def ports_mode_supported_for_task(self, task_id: str, task_data):
     return self.ports_mode

+def get_default_docker_image(self, session, queue: str) -> str:
+    return str(ENV_DOCKER_IMAGE.get() or session.config.get("agent.default_docker.image", "nvidia/cuda"))
+
 def run_one_task(self, queue: Text, task_id: Text, worker_args=None, task_session=None, **_):
     print('Pulling task {} launching on kubernetes cluster'.format(task_id))
     session = task_session or self._session

@@ -509,9 +512,7 @@ class K8sIntegration(Worker):
 container = get_task_container(session, task_id)
 if not container.get('image'):
-    container['image'] = str(
-        ENV_DOCKER_IMAGE.get() or session.config.get("agent.default_docker.image", "nvidia/cuda")
-    )
+    container['image'] = self.get_default_docker_image(session, queue)
     container['arguments'] = session.config.get("agent.default_docker.arguments", None)
     set_task_container(
         session, task_id, docker_image=container['image'], docker_arguments=container['arguments']
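The new helper mainly gives subclasses a single override point (the otherwise-unused queue argument is presumably the hook for queue-specific defaults). The resolution order it encodes, sketched standalone under the assumption that ENV_DOCKER_IMAGE is backed by an environment variable such as CLEARML_DOCKER_IMAGE:

    import os

    def default_docker_image(config, env_var="CLEARML_DOCKER_IMAGE"):
        # env var wins, then the agent config entry, then the hard-coded fallback
        return str(os.getenv(env_var) or config.get("agent.default_docker.image") or "nvidia/cuda")

    print(default_docker_image({}))                                             # -> nvidia/cuda
    print(default_docker_image({"agent.default_docker.image": "python:3.10"}))  # -> python:3.10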

View File

@@ -1,8 +1,12 @@
 import re
 import shlex
+from functools import partial
 from typing import Tuple, List, TYPE_CHECKING, Optional
 from urllib.parse import urlunparse, urlparse
+from string import Template

+from clearml_agent.backend_api.services import queues as queues_api
+from clearml_agent.backend_api.session import Request
 from clearml_agent.definitions import (
     ENV_AGENT_GIT_PASS,
     ENV_AGENT_SECRET_KEY,

@@ -196,7 +200,7 @@ class DockerArgsSanitizer:
     (i.e. changing the ports if needed and adding the new env var), runtime property
     """
     if not docker_arguments:
-        return
+        return None
     # make a copy we are going to change it
     docker_arguments = docker_arguments[:]
     port_mapping_filtered = [

@@ -205,7 +209,7 @@ class DockerArgsSanitizer:
     ]
     if not port_mapping_filtered:
-        return
+        return None

     # test if network=host was requested, docker will ignore published ports anyhow, so no use in parsing them
     network_filtered = DockerArgsSanitizer.filter_switches(

@@ -213,7 +217,7 @@ class DockerArgsSanitizer:
     network_filtered = [t for t in network_filtered if t.strip == "host" or "host" in t.split("=")]
     # if any network is configured, we ignore it, there is nothing we can do
     if network_filtered:
-        return
+        return None

     # verifying available ports, remapping if necessary
     port_checks = TcpPorts()
@@ -270,3 +274,168 @@ class DockerArgsSanitizer:
     additional_task_runtime = {"_external_host_tcp_port_mapping": ports}
     return docker_arguments+additional_cmd, additional_task_runtime
+
+
+class CustomTemplate(Template):
+    """
+    Parse ${CLEARML_<something>:default} values based on Task object and replace with real-time value
+    Example: "-e project_id=${CLEARML_TASK.project}" will be replaced with the
+    Task actual project ID from the Task object "-e project_id=<task.project>"
+    "-e queue_name=${CLEARML_QUEUE_NAME}"
+    "-e user_key=${TASK.hyperparams.properties.user_key.value:default_value}"
+
+    It supports:
+    ${QUEUE_NAME} - name of the queue
+    # Task object nested variables:
+    ${TASK.id}
+    ${TASK.name}
+    ${TASK.project}
+    ${TASK.hyperparams.properties.user_key.value}
+    """
+
+    idpattern = r'(?a:[_a-z][_a-z0-9|.|:]*)'
+    prefix = "CLEARML_"
+    queue_id_to_name_map = {}
+
+    @classmethod
+    def get_queue_name(cls, task_session, queue_id):
+        if queue_id in cls.queue_id_to_name_map:
+            return cls.queue_id_to_name_map[queue_id]
+
+        # noinspection PyBroadException
+        try:
+            response = task_session.send_api(queues_api.GetByIdRequest(queue=queue_id))
+            cls.queue_id_to_name_map[queue_id] = response.queue.name
+        except Exception:
+            # if something went wrong start over from the highest priority queue
+            return None
+
+        return cls.queue_id_to_name_map.get(queue_id)
+
+    def default_custom_substitute(self, task_info, queue_name):
+        return self.custom_substitute(partial(CustomTemplate.default_resolve_template, task_info, queue_name))
+
+    def custom_substitute(self, mapping_func):
+        # Helper function for .sub()
+        def convert(mo):
+            named = mo.group('named') or mo.group('braced')
+            if not named or not str(named).startswith(self.prefix):
+                return mo.group()
+            named = named[len(self.prefix):]
+            if named is not None:
+                default_value = None
+                try:
+                    if ":" in named:
+                        named, default_value = named.split(":", 1)
+                    return str(mapping_func(named, default_value))
+                except KeyError:
+                    return mo.group()
+            if mo.group('escaped') is not None:
+                return self.delimiter
+            if mo.group('invalid') is not None:
+                return mo.group()
+            raise ValueError('Unrecognized named group in pattern', self.pattern)
+
+        return self.pattern.sub(convert, self.template)
+
+    def substitute(self, *args, **kwds):
+        raise ValueError("Unsupported")
+
+    def safe_substitute(self, *args, **kwds):
+        raise ValueError("Unsupported")
+
+    @classmethod
+    def default_resolve_template(cls, task_info, queue, key, default):
+        """
+        Notice CLEARML_ prefix omitted! (i.e. ${QUEUE_ID} is ${CLEARML_QUEUE_ID})
+        we support:
+        ${QUEUE_NAME} - name of the queue
+        ${WORKER_ID} - FUTURE
+
+        # we also complex variables:
+        ${TASK.id}
+        ${TASK.name}
+        ${TASK.project.id}
+        ${TASK.project.name}
+        ${TASK.hyperparams.properties.user_key.value}
+
+        :param task_info: nested dict with task information
+        :param queue: queue_id (str)
+        :param key: key to be replaced
+        :param default: default value, None will raise exception
+        :return: string value
+        """
+        try:
+            parts = key.split(".")
+            main_part = parts[0]
+            if main_part == "QUEUE_NAME":
+                if len(parts) == 1:
+                    return queue or default
+                raise ValueError()
+            elif main_part == "QUEUE_NAME":
+                # future support
+                raise ValueError()
+            elif main_part == "WORKER_ID":
+                # future support
+                raise ValueError()
+            elif main_part == "TASK":
+                for part in parts[1:]:
+                    task_info = task_info.get(part)
+                    if task_info is None:
+                        break
+
+                if isinstance(task_info, str):
+                    return task_info
+
+                if default:
+                    return default
+                raise ValueError()
+        except Exception:
+            raise KeyError((key,))
+
+        # default, nothing
+        raise KeyError((key,))
+
+
+class DockerArgsTemplateResolver:
+    def __init__(self, task_session, task_id):
+        self._task_session = task_session
+        self.task_info = None
+        self.queue_name = None
+        self.task_id = task_id
+
+    def resolve_docker_args_from_template(self, full_docker_cmd):
+        if not full_docker_cmd or not self._task_session.check_min_api_version("2.20"):
+            return full_docker_cmd
+
+        # convert docker template arguments (i.e. ${CLEARML_} ) based on the current Task
+        for i, token in enumerate(full_docker_cmd[:-1]):
+            # skip the ones which are obviously not our prefix
+            if not CustomTemplate.delimiter in token or not CustomTemplate.prefix in token:
+                continue
+
+            if self.task_info is None:
+                result = self._task_session.send_request(
+                    service='tasks',
+                    action='get_all',
+                    version='2.20',
+                    method=Request.def_method,
+                    json={'id': [self.task_id], 'search_hidden': True}
+                )
+                # we should not fail here
+                self.task_info = result.json().get("data", {}).get("tasks", [])[0] or {}
+                queue_id = self.task_info.get("execution", {}).get("queue")
+                self.queue_name = CustomTemplate.get_queue_name(self._task_session, queue_id)
+
+            tmpl = CustomTemplate(token)
+            # replace it
+            try:
+                full_docker_cmd[i] = tmpl.default_custom_substitute(self.task_info, self.queue_name)
+            except Exception as ex:
+                print("Failed parsing ClearML Template argument [{}] skipped: error ()".format(token, ex))

+        return full_docker_cmd
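To make the substitution flow concrete, here is a compressed, self-contained sketch of what CustomTemplate does for the supported keys. The task_info dict mimics a tasks.get_all response; the real hyperparameter layout may differ, and unlike the real class this sketch does not go through default_resolve_template or raise KeyError:

    from string import Template

    class DemoTemplate(Template):
        # same idpattern as CustomTemplate: names may carry dots and a ":default" suffix
        idpattern = r'(?a:[_a-z][_a-z0-9|.|:]*)'
        prefix = "CLEARML_"

        def resolve(self, task_info, queue_name):
            def convert(mo):
                named = mo.group('named') or mo.group('braced')
                if not named or not named.startswith(self.prefix):
                    return mo.group()  # not a CLEARML_ template, leave untouched
                key, _, default = named[len(self.prefix):].partition(":")
                if key == "QUEUE_NAME":
                    return queue_name or default
                if key.startswith("TASK."):
                    value = task_info
                    for part in key.split(".")[1:]:
                        value = value.get(part) if isinstance(value, dict) else None
                    return value if isinstance(value, str) else (default or mo.group())
                return mo.group()
            return self.pattern.sub(convert, self.template)

    task_info = {"id": "abc123", "project": "prj42",
                 "hyperparams": {"properties": {"user_key": {"value": "secret"}}}}
    args = ["-e", "project_id=${CLEARML_TASK.project}",
            "-e", "user_key=${CLEARML_TASK.hyperparams.properties.user_key.value:fallback}",
            "-e", "missing=${CLEARML_TASK.no_such_field:default_value}"]
    print([DemoTemplate(a).resolve(task_info, "gpu-queue") for a in args])
    # -> ['-e', 'project_id=prj42', '-e', 'user_key=secret', '-e', 'missing=default_value']

The real implementation additionally disables substitute()/safe_substitute() and signals unresolvable keys with KeyError so the original token is kept verbatim.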

View File

@@ -247,7 +247,7 @@ class UvAPI(VirtualenvPip):
 # there is a bug so we have to call pip to get the freeze because UV will return the wrong list
 # packages = self.run_with_env(('freeze',), output=True).splitlines()
 packages = self.lock_config.get_run_argv(
-    "pip", "freeze", "--python", str(Path(self.path) / "bin" / "python"), cwd=self.lockfile_path).get_output().splitlines()
+    "pip", "freeze", cwd=self.lockfile_path).get_output().splitlines()
 # list clearml_agent as well
 # packages_without_program = [package for package in packages if PROGRAM_NAME not in package]
 return {'pip': packages}
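The workaround shells out to pip instead of trusting uv's freeze listing. As an assumption about the underlying intent, the dependable way to enumerate an environment is to ask its own interpreter:

    import subprocess
    from pathlib import Path

    def freeze_venv(venv_path):
        # ask the environment's own interpreter (assumes pip is installed inside it)
        python = Path(venv_path) / "bin" / "python"
        return subprocess.check_output([str(python), "-m", "pip", "freeze"], text=True).splitlines()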

View File

@@ -89,9 +89,9 @@ def kill_all_child_processes(pid=None, include_parent=True):
 print("\nLeaving process id {}".format(pid))
 try:
     parent = psutil.Process(pid)
-except psutil.Error:
-    # could not find parent process id
-    print("ERROR: could not find parent process id {}".format(pid))
+except psutil.Error as ex:
+    # could not find process id
+    print("ERROR: could not find process id {}: {}".format(pid, ex))
     return
 for child in parent.children(recursive=True):
     try:

@@ -113,7 +113,7 @@ def terminate_all_child_processes(pid=None, timeout=10., include_parent=True):
 try:
     parent = psutil.Process(pid)
 except psutil.Error:
-    # could not find parent process id
+    # could not find process id
     return
 for child in parent.children(recursive=False):
     print('Terminating child process {}'.format(child.pid))
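For context, both helpers touched here walk a process tree with psutil; a minimal sketch of that pattern, using the clarified error message:

    import psutil

    def kill_tree(pid, include_parent=True):
        try:
            parent = psutil.Process(pid)
        except psutil.Error as ex:
            # could not find process id
            print("ERROR: could not find process id {}: {}".format(pid, ex))
            return
        for child in parent.children(recursive=True):
            try:
                child.kill()
            except psutil.Error:
                pass
        if include_parent:
            try:
                parent.kill()
            except psutil.Error:
                pass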

View File

@@ -13,7 +13,7 @@ api {
 agent.git_user=""
 agent.git_pass=""
-# extra_index_url: ["https://allegroai.jfrog.io/clearml/api/pypi/public/simple"]
+# extra_index_url: ["https://clearml.jfrog.io/clearml/api/pypi/public/simple"]
 agent.package_manager.extra_index_url= [
 ]

@@ -68,7 +68,7 @@ agent {
 force_upgrade: false,
 # additional artifact repositories to use when installing python packages
-# extra_index_url: ["https://allegroai.jfrog.io/clearmlai/api/pypi/public/simple"]
+# extra_index_url: ["https://clearml.jfrog.io/clearmlai/api/pypi/public/simple"]
 # additional conda channels to use when installing with conda package manager
 conda_channels: ["pytorch", "conda-forge", "defaults", ]

View File

@@ -1,6 +1,4 @@
-ARG TAG=3.7.17-alpine3.18
-FROM python:${TAG} as build
+FROM python:3.14-rc-alpine3.21 as build

 RUN apk add --no-cache \
     gcc \

@@ -16,7 +14,7 @@ RUN python3 \
     clearml-agent \
     cryptography>=2.9

-FROM python:${TAG} as target
+FROM python:3.14-rc-alpine3.21 as target

 WORKDIR /app

View File

@@ -1,6 +1,4 @@
-ARG TAG=3.7.17-slim-bullseye
-FROM python:${TAG} as target
+FROM python:3.10-slim-bookworm as target

 ARG KUBECTL_VERSION=1.29.3

View File

@@ -6,7 +6,7 @@ spec:
 serviceAccountName: ""
 containers:
 - name: k8s-glue-container
-  image: allegroai/clearml-agent-k8s:aws-latest-1.21
+  image: clearml/clearml-agent-k8s:aws-latest-1.21
   imagePullPolicy: Always
   command: [
     "/bin/bash",

View File

@@ -6,7 +6,7 @@ spec:
 serviceAccountName: ""
 containers:
 - name: k8s-glue-container
-  image: allegroai/clearml-agent-k8s:gcp-latest-1.21
+  image: clearml/clearml-agent-k8s:gcp-latest-1.21
   imagePullPolicy: Always
   command: [
     "/bin/bash",

View File

@@ -20,7 +20,7 @@
 "This notebook defines a cloud budget (currently only AWS is supported, but feel free to expand with PRs), and spins an instance the minute a job is waiting for execution. It will also spin down idle machines, saving you some $$$ :)\n",
 "\n",
 "> **Note:**\n",
-"> This is just an example of how you can use ClearML Agent to implement custom autoscaling. For a more structured autoscaler script, see [here](https://github.com/allegroai/clearml/blob/master/clearml/automation/auto_scaler.py).\n",
+"> This is just an example of how you can use ClearML Agent to implement custom autoscaling. For a more structured autoscaler script, see [here](https://github.com/clearml/clearml/blob/master/clearml/automation/auto_scaler.py).\n",
 "\n",
 "Configuration steps:\n",
 "- Define maximum budget to be used (instance type / number of instances).\n",

View File

@@ -1,6 +1,6 @@
 """
 This example assumes you have preconfigured services with selectors in the form of
-"ai.allegro.agent.serial=pod-<number>" and a targetPort of 10022.
+"ai.clearml.agent.serial=pod-<number>" and a targetPort of 10022.
 The K8sIntegration component will label each pod accordingly.
 """
 from argparse import ArgumentParser

@@ -22,7 +22,7 @@ def parse_args():
     action="store_true",
     default=False,
     help="Ports-Mode will add a label to the pod which can be used as service, in order to expose ports"
-         "Should not be used with max-pods"
+         "Should not be used with max-pods",
 )
 parser.add_argument(
     "--num-of-services",

@@ -34,15 +34,15 @@ def parse_args():
     "--base-port",
     type=int,
     help="Used in conjunction with ports-mode, specifies the base port exposed by the services. "
          "For pod #X, the port will be <base-port>+X. Note that pod number is calculated based on base-pod-num"
-         "e.g. if base-port=20000 and base-pod-num=3, the port for the first pod will be 20003"
+         "e.g. if base-port=20000 and base-pod-num=3, the port for the first pod will be 20003",
 )
 parser.add_argument(
     "--base-pod-num",
     type=int,
     default=1,
     help="Used in conjunction with ports-mode and base-port, specifies the base pod number to be used by the "
-         "service (default: %(default)s)"
+         "service (default: %(default)s)",
 )
 parser.add_argument(
     "--gateway-address",

@@ -62,7 +62,7 @@ def parse_args():
     "--template-yaml",
     type=str,
     help="YAML file containing pod template. If provided pod will be scheduled with kubectl apply "
-         "and overrides are ignored, otherwise it will be scheduled with kubectl run"
+         "and overrides are ignored, otherwise it will be scheduled with kubectl run",
 )
 parser.add_argument(
     "--ssh-server-port",

@@ -80,7 +80,7 @@ def parse_args():
     "--max-pods",
     type=int,
     help="Limit the maximum number of pods that this service can run at the same time."
-         "Should not be used with ports-mode"
+         "Should not be used with ports-mode",
 )
 parser.add_argument(
     "--use-owner-token",