Compare commits

..

6 Commits

Author SHA1 Message Date
clearml
ee21944f6b Fix UV pip freeze fails 2025-04-02 14:42:02 +03:00
clearml
55790b3c3a Add support for ${CLEARML_TASK.xx} as docker args arguments parsed based on Tasks value 2025-04-02 14:41:35 +03:00
clearml
bb9ad6b213 PEP8 2025-04-02 14:40:19 +03:00
clearml
94fc0138b5 Update base images to resolve several vulnerabilities 2025-04-02 14:39:34 +03:00
clearml
f2df45cad6 Refactor 2025-04-02 14:39:14 +03:00
clearml
326ba81105 Refactor 2025-04-02 14:38:58 +03:00
13 changed files with 222 additions and 33 deletions

View File

@ -192,6 +192,9 @@
# optional arguments to pass to docker image
# these are local for this agent and will not be updated in the experiment's docker_cmd section
# Supports parsing ${CLEARML_TASK:default} and ${CLEARML_QUEUE_NAME:default} values based on Task object
# replace with real-time values.
# Example: "${CLEARML_TASK.project}", "${CLEARML_TASK.hyperparams.properties.user_key.value:default_value}"
# extra_docker_arguments: ["--ipc=host", ]
# Allow the extra docker arg to override task level docker arg (if the same argument is passed on both),

View File

@ -150,7 +150,7 @@ from clearml_agent.helper.repo import clone_repository_cached, RepoInfo, VCS, fi
from clearml_agent.helper.resource_monitor import ResourceMonitor
from clearml_agent.helper.runtime_verification import check_runtime, print_uptime_properties
from clearml_agent.helper.singleton import Singleton
from clearml_agent.helper.docker_args import DockerArgsSanitizer
from clearml_agent.helper.docker_args import DockerArgsSanitizer, DockerArgsTemplateResolver
from clearml_agent.session import Session
from .events import Events
@ -480,7 +480,7 @@ def get_task_container(session, task_id, ignore_match_rules=False, allow_force_c
},
)
if not res.ok:
raise Exception("failed setting runtime property")
raise Exception("failed setting container property")
except Exception as ex:
print("WARNING: failed setting container properties for task '{}': {}".format(task_id, ex))
@ -1240,6 +1240,15 @@ class Worker(ServiceCommandSection):
else:
print("Warning: generated docker container name is invalid: {}".format(name))
# convert template arguments now (i.e. ${CLEARML_} ), this is important for the docker arg
# resolve the Task's docker arguments before everything else, because
# unlike the vault/config these are not running as the agent's user, they are the user's,
# we need to filter them after template parsing; this limitation forces the resolution
# to happen before the `docker_image_func` call
docker_args_template_resolver = DockerArgsTemplateResolver(task_session=self._session, task_id=task_id)
if docker_params.get("docker_arguments"):
docker_params["docker_arguments"] = docker_args_template_resolver.resolve_docker_args_from_template(
full_docker_cmd=docker_params["docker_arguments"])
full_docker_cmd = self.docker_image_func(env_task_id=task_id, **docker_params)
# if we are using the default docker, update back the Task:
@ -1256,6 +1265,12 @@ class Worker(ServiceCommandSection):
except Exception:
pass
# convert template arguments now (i.e. ${CLEARML_} )
# Notice we do not parse the last part of the docker cmd because that's
# the actual command to be executed inside the docker
full_docker_cmd = docker_args_template_resolver.resolve_docker_args_from_template(
full_docker_cmd=full_docker_cmd[:-1]) + [full_docker_cmd[-1]]
# if this is services_mode, change the worker_id to a unique name
# and use full-monitoring, or it registers itself as a worker for this specific service.
# notice, the internal agent will monitor itself once the docker is up and running
@ -2711,6 +2726,11 @@ class Worker(ServiceCommandSection):
docker_image=docker_image, docker_arguments=docker_arguments, docker_bash_setup_script=docker_setup_script
)
# convert docker template arguments (i.e. ${CLEARML_} ) based on the current Task
docker_args_template_resolver = DockerArgsTemplateResolver(task_session=self._session, task_id=task_id)
full_docker_cmd = docker_args_template_resolver.resolve_docker_args_from_template(
full_docker_cmd=full_docker_cmd)
end_of_build_marker = "build.done=true"
docker_cmd_suffix = ' build --id {task_id} --install-globally; ' \
'ORG=$(stat -c "%u:%g" {conf_file}) ; chown $(whoami):$(whoami) {conf_file} ; ' \
@ -4705,7 +4725,7 @@ class Worker(ServiceCommandSection):
docker_arguments = self._resolve_docker_env_args(docker_arguments)
if extra_docker_arguments:
# we always resolve environments in the `extra_docker_arguments` becuase the admin set them (not users)
# we always resolve environments in the `extra_docker_arguments` because the admin set them (not users)
extra_docker_arguments = self._resolve_docker_env_args(extra_docker_arguments)
extra_docker_arguments = [extra_docker_arguments] \
if isinstance(extra_docker_arguments, six.string_types) else extra_docker_arguments

View File

@ -455,6 +455,9 @@ class K8sIntegration(Worker):
def ports_mode_supported_for_task(self, task_id: str, task_data):
return self.ports_mode
def get_default_docker_image(self, session, queue: str) -> str:
    """
    Return the default container image for tasks that did not specify one.

    Resolution order: the ENV_DOCKER_IMAGE environment override wins,
    otherwise fall back to the `agent.default_docker.image` config value,
    and finally to the hard-coded "nvidia/cuda" image.

    :param session: API session providing access to the agent configuration
    :param queue: queue name (currently unused here; presumably kept so
        subclasses can implement per-queue image selection - confirm)
    :return: docker image name as a string
    """
    env_override = ENV_DOCKER_IMAGE.get()
    if env_override:
        return str(env_override)
    return str(session.config.get("agent.default_docker.image", "nvidia/cuda"))
def run_one_task(self, queue: Text, task_id: Text, worker_args=None, task_session=None, **_):
print('Pulling task {} launching on kubernetes cluster'.format(task_id))
session = task_session or self._session
@ -509,9 +512,7 @@ class K8sIntegration(Worker):
container = get_task_container(session, task_id)
if not container.get('image'):
container['image'] = str(
ENV_DOCKER_IMAGE.get() or session.config.get("agent.default_docker.image", "nvidia/cuda")
)
container['image'] = self.get_default_docker_image(session, queue)
container['arguments'] = session.config.get("agent.default_docker.arguments", None)
set_task_container(
session, task_id, docker_image=container['image'], docker_arguments=container['arguments']

View File

@ -1,8 +1,12 @@
import re
import shlex
from functools import partial
from typing import Tuple, List, TYPE_CHECKING, Optional
from urllib.parse import urlunparse, urlparse
from string import Template
from clearml_agent.backend_api.services import queues as queues_api
from clearml_agent.backend_api.session import Request
from clearml_agent.definitions import (
ENV_AGENT_GIT_PASS,
ENV_AGENT_SECRET_KEY,
@ -196,7 +200,7 @@ class DockerArgsSanitizer:
(i.e. changing the ports if needed and adding the new env var), runtime property
"""
if not docker_arguments:
return
return None
# make a copy we are going to change it
docker_arguments = docker_arguments[:]
port_mapping_filtered = [
@ -205,7 +209,7 @@ class DockerArgsSanitizer:
]
if not port_mapping_filtered:
return
return None
# test if network=host was requested, docker will ignore published ports anyhow, so no use in parsing them
network_filtered = DockerArgsSanitizer.filter_switches(
@ -213,7 +217,7 @@ class DockerArgsSanitizer:
network_filtered = [t for t in network_filtered if t.strip == "host" or "host" in t.split("=")]
# if any network is configured, we ignore it, there is nothing we can do
if network_filtered:
return
return None
# verifying available ports, remapping if necessary
port_checks = TcpPorts()
@ -270,3 +274,168 @@ class DockerArgsSanitizer:
additional_task_runtime = {"_external_host_tcp_port_mapping": ports}
return docker_arguments+additional_cmd, additional_task_runtime
class CustomTemplate(Template):
    """
    Parse ${CLEARML_<something>:default} values based on the Task object and
    replace them with real-time values.

    Example: "-e project_id=${CLEARML_TASK.project}" will be replaced with the
    Task's actual project ID from the Task object: "-e project_id=<task.project>"
        "-e queue_name=${CLEARML_QUEUE_NAME}"
        "-e user_key=${CLEARML_TASK.hyperparams.properties.user_key.value:default_value}"

    It supports:
    ${CLEARML_QUEUE_NAME} - name of the queue
    # Task object nested variables:
    ${CLEARML_TASK.id}
    ${CLEARML_TASK.name}
    ${CLEARML_TASK.project}
    ${CLEARML_TASK.hyperparams.properties.user_key.value}
    """

    # allow dots (nested keys) and colons (default values) inside identifiers
    idpattern = r'(?a:[_a-z][_a-z0-9|.|:]*)'
    # only identifiers starting with this prefix are treated as ClearML templates
    prefix = "CLEARML_"
    # class-level cache of queue_id -> queue name lookups (shared across instances)
    queue_id_to_name_map = {}

    @classmethod
    def get_queue_name(cls, task_session, queue_id):
        """
        Resolve a queue ID to its name, caching results at the class level.

        :param task_session: API session used for the queues.get_by_id call
        :param queue_id: queue ID (str) to resolve
        :return: queue name (str) or None if the lookup failed
        """
        if queue_id in cls.queue_id_to_name_map:
            return cls.queue_id_to_name_map[queue_id]

        # noinspection PyBroadException
        try:
            response = task_session.send_api(queues_api.GetByIdRequest(queue=queue_id))
            cls.queue_id_to_name_map[queue_id] = response.queue.name
        except Exception:
            # lookup failed (bad ID, network, permissions) - give up and return None
            return None

        return cls.queue_id_to_name_map.get(queue_id)

    def default_custom_substitute(self, task_info, queue_name):
        """
        Substitute templates using the default resolver bound to the given
        task information dict and queue name.
        """
        return self.custom_substitute(partial(CustomTemplate.default_resolve_template, task_info, queue_name))

    def custom_substitute(self, mapping_func):
        """
        Substitute every ${CLEARML_...} occurrence in the template using
        mapping_func(key, default) for value resolution.

        Tokens without the CLEARML_ prefix, and keys mapping_func raises
        KeyError for, are left unchanged.
        """
        # Helper function for .sub()
        def convert(mo):
            named = mo.group('named') or mo.group('braced')
            if not named or not str(named).startswith(self.prefix):
                # not one of ours - leave the original token untouched
                return mo.group()
            named = named[len(self.prefix):]
            if named is not None:
                default_value = None
                try:
                    # an optional ":default" suffix provides a fallback value
                    if ":" in named:
                        named, default_value = named.split(":", 1)
                    return str(mapping_func(named, default_value))
                except KeyError:
                    # unresolvable key - keep the original token as-is
                    return mo.group()
            if mo.group('escaped') is not None:
                return self.delimiter
            if mo.group('invalid') is not None:
                return mo.group()
            raise ValueError('Unrecognized named group in pattern', self.pattern)

        return self.pattern.sub(convert, self.template)

    def substitute(self, *args, **kwds):
        # base-class entry points are disabled on purpose: use custom_substitute()
        raise ValueError("Unsupported")

    def safe_substitute(self, *args, **kwds):
        # base-class entry points are disabled on purpose: use custom_substitute()
        raise ValueError("Unsupported")

    @classmethod
    def default_resolve_template(cls, task_info, queue, key, default):
        """
        Notice CLEARML_ prefix omitted! (i.e. ${QUEUE_ID} is ${CLEARML_QUEUE_ID})
        we support:
        ${QUEUE_NAME} - name of the queue
        ${WORKER_ID} - FUTURE

        # we also support complex (nested Task) variables:
        ${TASK.id}
        ${TASK.name}
        ${TASK.project.id}
        ${TASK.project.name}
        ${TASK.hyperparams.properties.user_key.value}

        :param task_info: nested dict with task information
        :param queue: queue_id (str)
        :param key: key to be replaced
        :param default: default value, None will raise exception
        :return: string value
        :raises KeyError: when the key cannot be resolved and no default is given
        """
        try:
            parts = key.split(".")
            main_part = parts[0]
            if main_part == "QUEUE_NAME":
                if len(parts) == 1:
                    return queue or default
                raise ValueError()
            elif main_part == "QUEUE_ID":
                # future support (was a duplicated QUEUE_NAME branch - dead code)
                raise ValueError()
            elif main_part == "WORKER_ID":
                # future support
                raise ValueError()
            elif main_part == "TASK":
                # walk the nested task_info dict along the dotted path
                for part in parts[1:]:
                    task_info = task_info.get(part)
                    if task_info is None:
                        break
                if isinstance(task_info, str):
                    return task_info
                if default:
                    return default
                raise ValueError()
        except Exception:
            raise KeyError((key,))

        # unknown main key (not QUEUE_NAME / QUEUE_ID / WORKER_ID / TASK)
        raise KeyError((key,))
class DockerArgsTemplateResolver:
    """
    Resolve ${CLEARML_...} template arguments in a docker command line based
    on the current Task (task info is fetched lazily, on the first token that
    actually contains a template).
    """

    def __init__(self, task_session, task_id):
        # session used for the tasks.get_all / queues.get_by_id lookups
        self._task_session = task_session
        # lazily-fetched task information dict (None until first template hit)
        self.task_info = None
        # queue name resolved from the task's execution queue (lazy, may stay None)
        self.queue_name = None
        self.task_id = task_id

    def resolve_docker_args_from_template(self, full_docker_cmd):
        """
        Replace ${CLEARML_...} templates in all tokens except the last one
        (the last token is the command executed inside the container).

        The input list is modified in place and also returned. Tokens that
        fail to parse are kept unchanged (a warning is printed).

        :param full_docker_cmd: list of docker command tokens (may be empty/None)
        :return: the same list with templates resolved
        """
        if not full_docker_cmd or not self._task_session.check_min_api_version("2.20"):
            return full_docker_cmd

        # convert docker template arguments (i.e. ${CLEARML_} ) based on the current Task
        for i, token in enumerate(full_docker_cmd[:-1]):
            # skip tokens that obviously do not contain our template prefix
            if CustomTemplate.delimiter not in token or CustomTemplate.prefix not in token:
                continue

            if self.task_info is None:
                result = self._task_session.send_request(
                    service='tasks',
                    action='get_all',
                    version='2.20',
                    method=Request.def_method,
                    json={'id': [self.task_id], 'search_hidden': True}
                )
                # we should not fail here - fall back to an empty dict if the task was not found
                tasks = result.json().get("data", {}).get("tasks", []) or [{}]
                self.task_info = tasks[0] or {}
                queue_id = self.task_info.get("execution", {}).get("queue")
                self.queue_name = CustomTemplate.get_queue_name(self._task_session, queue_id)

            tmpl = CustomTemplate(token)
            # replace it (in place); keep the original token on any parsing failure
            try:
                full_docker_cmd[i] = tmpl.default_custom_substitute(self.task_info, self.queue_name)
            except Exception as ex:
                # fixed format string: original had "error ()" with a single placeholder,
                # so the exception was never printed
                print("Failed parsing ClearML Template argument [{}] skipped: error ({})".format(token, ex))

        return full_docker_cmd

View File

@ -247,7 +247,7 @@ class UvAPI(VirtualenvPip):
# there is a bug so we have to call pip to get the freeze because UV will return the wrong list
# packages = self.run_with_env(('freeze',), output=True).splitlines()
packages = self.lock_config.get_run_argv(
"pip", "freeze", "--python", str(Path(self.path) / "bin" / "python"), cwd=self.lockfile_path).get_output().splitlines()
"pip", "freeze", cwd=self.lockfile_path).get_output().splitlines()
# list clearml_agent as well
# packages_without_program = [package for package in packages if PROGRAM_NAME not in package]
return {'pip': packages}

View File

@ -89,9 +89,9 @@ def kill_all_child_processes(pid=None, include_parent=True):
print("\nLeaving process id {}".format(pid))
try:
parent = psutil.Process(pid)
except psutil.Error:
# could not find parent process id
print("ERROR: could not find parent process id {}".format(pid))
except psutil.Error as ex:
# could not find process id
print("ERROR: could not find process id {}: {}".format(pid, ex))
return
for child in parent.children(recursive=True):
try:
@ -113,7 +113,7 @@ def terminate_all_child_processes(pid=None, timeout=10., include_parent=True):
try:
parent = psutil.Process(pid)
except psutil.Error:
# could not find parent process id
# could not find process id
return
for child in parent.children(recursive=False):
print('Terminating child process {}'.format(child.pid))

View File

@ -13,7 +13,7 @@ api {
agent.git_user=""
agent.git_pass=""
# extra_index_url: ["https://allegroai.jfrog.io/clearml/api/pypi/public/simple"]
# extra_index_url: ["https://clearml.jfrog.io/clearml/api/pypi/public/simple"]
agent.package_manager.extra_index_url= [
]
@ -68,7 +68,7 @@ agent {
force_upgrade: false,
# additional artifact repositories to use when installing python packages
# extra_index_url: ["https://allegroai.jfrog.io/clearmlai/api/pypi/public/simple"]
# extra_index_url: ["https://clearml.jfrog.io/clearmlai/api/pypi/public/simple"]
# additional conda channels to use when installing with conda package manager
conda_channels: ["pytorch", "conda-forge", "defaults", ]

View File

@ -1,6 +1,4 @@
ARG TAG=3.7.17-alpine3.18
FROM python:${TAG} as build
FROM python:3.14-rc-alpine3.21 as build
RUN apk add --no-cache \
gcc \
@ -16,7 +14,7 @@ RUN python3 \
clearml-agent \
cryptography>=2.9
FROM python:${TAG} as target
FROM python:3.14-rc-alpine3.21 as target
WORKDIR /app

View File

@ -1,6 +1,4 @@
ARG TAG=3.7.17-slim-bullseye
FROM python:${TAG} as target
FROM python:3.10-slim-bookworm as target
ARG KUBECTL_VERSION=1.29.3

View File

@ -6,7 +6,7 @@ spec:
serviceAccountName: ""
containers:
- name: k8s-glue-container
image: allegroai/clearml-agent-k8s:aws-latest-1.21
image: clearml/clearml-agent-k8s:aws-latest-1.21
imagePullPolicy: Always
command: [
"/bin/bash",

View File

@ -6,7 +6,7 @@ spec:
serviceAccountName: ""
containers:
- name: k8s-glue-container
image: allegroai/clearml-agent-k8s:gcp-latest-1.21
image: clearml/clearml-agent-k8s:gcp-latest-1.21
imagePullPolicy: Always
command: [
"/bin/bash",

View File

@ -20,7 +20,7 @@
"This notebook defines a cloud budget (currently only AWS is supported, but feel free to expand with PRs), and spins an instance the minute a job is waiting for execution. It will also spin down idle machines, saving you some $$$ :)\n",
"\n",
"> **Note:**\n",
"> This is just an example of how you can use ClearML Agent to implement custom autoscaling. For a more structured autoscaler script, see [here](https://github.com/allegroai/clearml/blob/master/clearml/automation/auto_scaler.py).\n",
"> This is just an example of how you can use ClearML Agent to implement custom autoscaling. For a more structured autoscaler script, see [here](https://github.com/clearml/clearml/blob/master/clearml/automation/auto_scaler.py).\n",
"\n",
"Configuration steps:\n",
"- Define maximum budget to be used (instance type / number of instances).\n",

View File

@ -1,6 +1,6 @@
"""
This example assumes you have preconfigured services with selectors in the form of
"ai.allegro.agent.serial=pod-<number>" and a targetPort of 10022.
"ai.clearml.agent.serial=pod-<number>" and a targetPort of 10022.
The K8sIntegration component will label each pod accordingly.
"""
from argparse import ArgumentParser
@ -22,7 +22,7 @@ def parse_args():
action="store_true",
default=False,
help="Ports-Mode will add a label to the pod which can be used as service, in order to expose ports"
"Should not be used with max-pods"
"Should not be used with max-pods",
)
parser.add_argument(
"--num-of-services",
@ -34,15 +34,15 @@ def parse_args():
"--base-port",
type=int,
help="Used in conjunction with ports-mode, specifies the base port exposed by the services. "
"For pod #X, the port will be <base-port>+X. Note that pod number is calculated based on base-pod-num"
"e.g. if base-port=20000 and base-pod-num=3, the port for the first pod will be 20003"
"For pod #X, the port will be <base-port>+X. Note that pod number is calculated based on base-pod-num"
"e.g. if base-port=20000 and base-pod-num=3, the port for the first pod will be 20003",
)
parser.add_argument(
"--base-pod-num",
type=int,
default=1,
help="Used in conjunction with ports-mode and base-port, specifies the base pod number to be used by the "
"service (default: %(default)s)"
"service (default: %(default)s)",
)
parser.add_argument(
"--gateway-address",
@ -62,7 +62,7 @@ def parse_args():
"--template-yaml",
type=str,
help="YAML file containing pod template. If provided pod will be scheduled with kubectl apply "
"and overrides are ignored, otherwise it will be scheduled with kubectl run"
"and overrides are ignored, otherwise it will be scheduled with kubectl run",
)
parser.add_argument(
"--ssh-server-port",
@ -80,7 +80,7 @@ def parse_args():
"--max-pods",
type=int,
help="Limit the maximum number of pods that this service can run at the same time."
"Should not be used with ports-mode"
"Should not be used with ports-mode",
)
parser.add_argument(
"--use-owner-token",