Mirror of https://github.com/clearml/clearml-agent, synced 2025-06-26 18:16:15 +00:00

Compare commits

19 Commits
3e5153a068
740f90c96f
ba854aa53b
c76dfe7ce6
e551ee1eb5
eed930b9a6
4881b9638d
3585786348
afe69c822f
8d8dc4e396
553c72e06a
768ee3d2cf
30d24beb51
ee21944f6b
55790b3c3a
bb9ad6b213
94fc0138b5
f2df45cad6
326ba81105
@@ -192,6 +192,9 @@
 # optional arguments to pass to docker image
 # these are local for this agent and will not be updated in the experiment's docker_cmd section
+# Supports parsing ${CLEARML_TASK:default} and ${CLEARML_QUEUE_NAME:default} values based on Task object
+# replace with real-time values.
+# Example: "${CLEARML_TASK.project}", "${TASK.hyperparams.properties.user_key.value:default_value}"
 # extra_docker_arguments: ["--ipc=host", ]
 
 # Allow the extra docker arg to override task level docker arg (if the same argument is passed on both),
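For reference, the placeholders documented above could be combined in a single setting like the following (the values are illustrative, not taken from the diff):

    extra_docker_arguments: ["-e", "project_id=${CLEARML_TASK.project:unknown}", "-e", "queue=${CLEARML_QUEUE_NAME:default}"]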
@@ -79,7 +79,11 @@ from clearml_agent.definitions import (
     ENV_AGENT_FORCE_EXEC_SCRIPT,
     ENV_TEMP_STDOUT_FILE_DIR,
     ENV_AGENT_FORCE_TASK_INIT,
-    ENV_AGENT_DEBUG_GET_NEXT_TASK, ENV_ABORT_CALLBACK_CMD, ENV_ABORT_CALLBACK_CMD_TIMEOUT,
+    ENV_AGENT_DEBUG_GET_NEXT_TASK,
+    ENV_ABORT_CALLBACK_CMD,
+    ENV_ABORT_CALLBACK_CMD_TIMEOUT,
+    ENV_QUEUE_POLL_FREQ_SEC,
+    ENV_STATUS_REPORT_FREQ_SEC,
 )
 from clearml_agent.definitions import WORKING_REPOSITORY_DIR, PIP_EXTRA_INDICES
 from clearml_agent.errors import (
@@ -150,7 +154,7 @@ from clearml_agent.helper.repo import clone_repository_cached, RepoInfo, VCS, fi
 from clearml_agent.helper.resource_monitor import ResourceMonitor
 from clearml_agent.helper.runtime_verification import check_runtime, print_uptime_properties
 from clearml_agent.helper.singleton import Singleton
-from clearml_agent.helper.docker_args import DockerArgsSanitizer
+from clearml_agent.helper.docker_args import DockerArgsSanitizer, DockerArgsTemplateResolver
 from clearml_agent.session import Session
 from .events import Events
 
@@ -480,7 +484,7 @@ def get_task_container(session, task_id, ignore_match_rules=False, allow_force_c
                 },
             )
             if not res.ok:
-                raise Exception("failed setting runtime property")
+                raise Exception("failed setting container property")
     except Exception as ex:
         print("WARNING: failed setting container properties for task '{}': {}".format(task_id, ex))
 
@@ -865,10 +869,10 @@ class Worker(ServiceCommandSection):
     )
 
     # default poll queues every _polling_interval seconds
-    _polling_interval = 5.0
+    _polling_interval = ENV_QUEUE_POLL_FREQ_SEC.get() or 5.0
 
     # machine status update intervals, seconds
-    _machine_update_interval = 30.0
+    _machine_update_interval = ENV_STATUS_REPORT_FREQ_SEC.get() or 30.0
 
     # message printed before starting task logging,
     # it will be parsed by services_mode, to identify internal docker logging start
@@ -1240,6 +1244,15 @@ class Worker(ServiceCommandSection):
         else:
             print("Warning: generated docker container name is invalid: {}".format(name))
 
+        # convert template arguments now (i.e. ${CLEARML_} ), this is important for the docker arg
+        # resolve the Task's docker arguments before everything else, because
+        # unlike the vault/config these are not running as the agent's user, they are the user's,
+        # we need to filter them post template parsing limitation to happen before the `docker_image_func` call
+        docker_args_template_resolver = DockerArgsTemplateResolver(task_session=self._session, task_id=task_id)
+        if docker_params.get("docker_arguments"):
+            docker_params["docker_arguments"] = docker_args_template_resolver.resolve_docker_args_from_template(
+                full_docker_cmd=docker_params["docker_arguments"])
+
         full_docker_cmd = self.docker_image_func(env_task_id=task_id, **docker_params)
 
         # if we are using the default docker, update back the Task:
@@ -1256,6 +1269,12 @@ class Worker(ServiceCommandSection):
             except Exception:
                 pass
 
+        # convert template arguments now (i.e. ${CLEARML_} )
+        # Notice we do not parse the last part of the docker cmd because that's
+        # the actual command to be executed inside the docker
+        full_docker_cmd = docker_args_template_resolver.resolve_docker_args_from_template(
+            full_docker_cmd=full_docker_cmd[:-1]) + [full_docker_cmd[-1]]
+
         # if this is services_mode, change the worker_id to a unique name
         # abd use full-monitoring, ot it registers itself as a worker for this specific service.
         # notice, the internal agent will monitor itself once the docker is up and running
@@ -1376,6 +1395,10 @@ class Worker(ServiceCommandSection):
             headers=headers
         )
         if not (result.ok() and result.response):
+            try:
+                self.log.debug("Failed creating task session: %s", result.response)
+            except:
+                pass
             return
         new_session = copy(session)
         new_session.config = deepcopy(session.config)
@@ -1707,16 +1730,24 @@ class Worker(ServiceCommandSection):
     def _dynamic_gpu_get_available(self, gpu_indexes):
         # key: cast to string, value: 1 (i.e. gull GPU)
         gpu_indexes = {str(g): 1 for g in gpu_indexes}
+        worker_name = self._session.config.get("agent.worker_name", "") + ':gpu'
+
+        # only return "Our" workers (requires server API +2, otherwise the selecort pattern is ignored)
         # noinspection PyBroadException
         try:
-            response = self._session.send_api(workers_api.GetAllRequest(last_seen=600))
+            response = self._session.send_api(workers_api.GetAllRequest(
+                last_seen=600,
+                worker_pattern="{}*".format(worker_name),
+                _allow_extra_fields_=True
+            ))
         except Exception:
             return None
 
-        worker_name = self._session.config.get("agent.worker_name", "") + ':gpu'
+        # filter only our workers, in case the selector pattern above was ignored due to lower version API server
         our_workers = [
            w.id for w in response.workers
-           if w.id.startswith(worker_name) and w.id != self.worker_id]
+           if w.id.startswith(worker_name) and w.id != self.worker_id
+        ]
         gpus = {}
         allocated_gpus = {}
         gpu_pattern = re.compile(r"\d+[.]?\d*[a-z]?")
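As context for the hunk above: if the server is too old to honor the new worker_pattern selector, the list comprehension still filters client-side by worker-name prefix. A standalone sketch of that fallback, with made-up worker ids:

# Standalone sketch of the client-side fallback filter above (worker ids are made up).
worker_name = "my-machine:gpu"
self_worker_id = "my-machine:gpu0"
all_workers = ["my-machine:gpu0", "my-machine:gpu1", "other-host:gpu0"]

our_workers = [
    w for w in all_workers
    if w.startswith(worker_name) and w != self_worker_id
]
print(our_workers)  # ['my-machine:gpu1']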
@@ -2002,7 +2033,7 @@ class Worker(ServiceCommandSection):
         columns = ("id", "name", "tags")
         print("Listening to queues:")
         if dynamic_gpus:
-            columns = ("id", "name", "tags", "gpus")
+            columns = ("id", "name", "tags", "gpus (min, max)")
             for q in queues_info:
                 q['gpus'] = str(dict(dynamic_gpus).get(q['id']) or '')
         print_table(queues_info, columns=columns, titles=columns)
@@ -2711,6 +2742,11 @@ class Worker(ServiceCommandSection):
             docker_image=docker_image, docker_arguments=docker_arguments, docker_bash_setup_script=docker_setup_script
         )
 
+        # convert docker template arguments (i.e. ${CLEARML_} ) based on the current Task
+        docker_args_template_resolver = DockerArgsTemplateResolver(task_session=self._session, task_id=task_id)
+        full_docker_cmd = docker_args_template_resolver.resolve_docker_args_from_template(
+            full_docker_cmd=full_docker_cmd)
+
         end_of_build_marker = "build.done=true"
         docker_cmd_suffix = ' build --id {task_id} --install-globally; ' \
                             'ORG=$(stat -c "%u:%g" {conf_file}) ; chown $(whoami):$(whoami) {conf_file} ; ' \
@@ -3457,16 +3493,24 @@ class Worker(ServiceCommandSection):
         session = session or self._session
         try:
             if stop_reason == TaskStopReason.stopped:
-                self.log("Stopping - tasks.stop was called for task")
-                self.send_logs(task_id, ["Process aborted by user"], session=session)
-                session.send_api(
-                    tasks_api.StoppedRequest(
-                        task=task_id,
-                        status_reason="task was stopped by tasks.stop",
-                        status_message=self._task_status_change_message,
-                        force=False
-                    )
-                )
+                # do not change the status to stopped if the Task status is already failed
+                task_status = get_task(
+                    session, task_id, only_fields=["status"]
+                ).status
+                if str(task_status) == "failed":
+                    self.send_logs(task_id, ["Process aborted by user - Task status was Failed"], session=session)
+                    self.log("Stopping - task was already marked as failed")
+                else:
+                    self.send_logs(task_id, ["Process aborted by user"], session=session)
+                    self.log("Stopping - tasks.stop was called for task")
+                    session.send_api(
+                        tasks_api.StoppedRequest(
+                            task=task_id,
+                            status_reason="task was stopped by tasks.stop",
+                            status_message=self._task_status_change_message,
+                            force=False
+                        )
+                    )
 
         elif stop_reason == TaskStopReason.status_changed:
             try:
@@ -3716,7 +3760,7 @@ class Worker(ServiceCommandSection):
             # revert to venv that we used inside UV
             api = None
             self.package_api = package_api = package_api.get_venv_manager()
-        elif not api:
+        elif self._session.config.get("agent.package_manager.type", None) == "uv" and not api:
             # this means `agent.package_manager.uv_replace_pip` is set to true
             print("INFO: using UV as pip drop-in replacement")
 
@@ -4705,7 +4749,7 @@ class Worker(ServiceCommandSection):
            docker_arguments = self._resolve_docker_env_args(docker_arguments)
 
        if extra_docker_arguments:
-           # we always resolve environments in the `extra_docker_arguments` becuase the admin set them (not users)
+           # we always resolve environments in the `extra_docker_arguments` because the admin set them (not users)
           extra_docker_arguments = self._resolve_docker_env_args(extra_docker_arguments)
           extra_docker_arguments = [extra_docker_arguments] \
               if isinstance(extra_docker_arguments, six.string_types) else extra_docker_arguments
@@ -4874,7 +4918,7 @@ class Worker(ServiceCommandSection):
         "declare LOCAL_PYTHON",
         "[ ! -z $LOCAL_PYTHON ] || for i in {{20..5}}; do (which {python_single_digit}.$i 2> /dev/null || command -v {python_single_digit}.$i) && " +
         "{python_single_digit}.$i -m pip --version && " +
-        "export LOCAL_PYTHON=$(which {python_single_digit}.$i 2> /dev/null || command -v git) && break ; done",
+        "export LOCAL_PYTHON=$(which {python_single_digit}.$i 2> /dev/null || command -v {python_single_digit}.$i) && break ; done",
         "[ ! -z $LOCAL_PYTHON ] || export CLEARML_APT_INSTALL=\"$CLEARML_APT_INSTALL {python_single_digit}-pip\"",  # noqa
         "[ -z \"$CLEARML_APT_INSTALL\" ] || "
         "(apt-get update -y ; apt-get install -y $CLEARML_APT_INSTALL) || "
@@ -4888,11 +4932,13 @@ class Worker(ServiceCommandSection):
     docker_bash_script = " ; ".join([line for line in bash_script if line]) \
         if not isinstance(bash_script, str) else bash_script
 
-    # make sure that if we do not have $LOCAL_PYTHON defined
-    # we set it to python3
+    # make sure that if we do not have $LOCAL_PYTHON defined, we set it to python3
+    # notice that if $LOCAL_PYTHON -m pip fails, that means we might have a broken python in the path
+    # so we set to default path and try to set global python
     update_scheme += (
         docker_bash_script + " ; " +
         "[ ! -z $LOCAL_PYTHON ] || export LOCAL_PYTHON={python} ; " +
+        "$LOCAL_PYTHON -m pip --version > /dev/null || export LOCAL_PYTHON=$(PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin command -v python3) ; " +
         "$LOCAL_PYTHON -m pip install -U {pip_version} ; " +
         "$LOCAL_PYTHON -m pip install -U {clearml_agent_wheel} ; ").format(
             python_single_digit=python_version.split('.')[0],
@@ -256,6 +256,10 @@ ENV_GIT_CLONE_VERBOSE = EnvironmentConfig("CLEARML_AGENT_GIT_CLONE_VERBOSE", typ
 
 ENV_GPU_FRACTIONS = EnvironmentConfig("CLEARML_AGENT_GPU_FRACTIONS")
 
+ENV_QUEUE_POLL_FREQ_SEC = EnvironmentConfig("CLEARML_AGENT_QUEUE_POLL_FREQ_SEC", type=float)
+
+ENV_STATUS_REPORT_FREQ_SEC = EnvironmentConfig("CLEARML_AGENT_STATUS_REPORT_FREQ_SEC", type=float)
+
 
 class FileBuffering(IntEnum):
     """
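Taken together with the Worker defaults earlier in the diff, these definitions let the two intervals be overridden from the environment. A minimal stand-in for how the values are consumed (env_float is a simplified sketch, not the agent's EnvironmentConfig implementation):

import os

# Simplified stand-in for EnvironmentConfig(..., type=float).get(): returns the
# parsed float when the variable is set, otherwise None so callers can fall back
# to their hard-coded default (the "or 5.0" / "or 30.0" pattern shown above).
def env_float(name):
    value = os.environ.get(name)
    return float(value) if value not in (None, "") else None

_polling_interval = env_float("CLEARML_AGENT_QUEUE_POLL_FREQ_SEC") or 5.0
_machine_update_interval = env_float("CLEARML_AGENT_STATUS_REPORT_FREQ_SEC") or 30.0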
@@ -90,6 +90,7 @@ class K8sIntegration(Worker):
         '[ ! -z "$CLEARML_AGENT_SKIP_CONTAINER_APT" ] || [ ! -z "$LOCAL_PYTHON" ] || '
         'apt-get install -y python3-pip || dnf install -y python3-pip',
         "[ ! -z $LOCAL_PYTHON ] || export LOCAL_PYTHON=python3",
+        "[ ! -z $CLEARML_AGENT_NO_UPDATE ] || $LOCAL_PYTHON -m pip --version > /dev/null || export LOCAL_PYTHON=$(PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin command -v python3)",
         "rm -f /usr/lib/python3.*/EXTERNALLY-MANAGED",  # remove PEP 668
         "{extra_bash_init_cmd}",
         "[ ! -z $CLEARML_AGENT_NO_UPDATE ] || $LOCAL_PYTHON -m pip install clearml-agent{agent_install_args}",
@@ -147,6 +148,10 @@ class K8sIntegration(Worker):
         :param str extra_bash_init_script: Additional bash script to run before starting the Task inside the container
         :param str namespace: K8S namespace to be used when creating the new pods (default: clearml)
         :param int max_pods_limit: Maximum number of pods that K8S glue can run at the same time
+        :param str pod_name_prefix: Define pod name prefix for k8s (default: clearml-id-)
+        :param str limit_pod_label: Define limit pod label for k8s (default: ai.allegro.agent.serial=pod-{pod_number})
+        :param bool force_system_packages: true when running tasks in containers (i.e. docker mode or k8s glue).
+            (default: true)
         """
         super(K8sIntegration, self).__init__()
         self.kind = os.environ.get("CLEARML_K8S_GLUE_KIND", "pod").strip().lower()
@@ -455,6 +460,9 @@ class K8sIntegration(Worker):
     def ports_mode_supported_for_task(self, task_id: str, task_data):
         return self.ports_mode
 
+    def get_default_docker_image(self, session, queue: str) -> str:
+        return str(ENV_DOCKER_IMAGE.get() or session.config.get("agent.default_docker.image", "nvidia/cuda"))
+
     def run_one_task(self, queue: Text, task_id: Text, worker_args=None, task_session=None, **_):
         print('Pulling task {} launching on kubernetes cluster'.format(task_id))
         session = task_session or self._session
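Factoring the default image out into get_default_docker_image() turns it into an overridable hook. A hypothetical subclass (queue and image names are made up, the import path follows the example script) could pick a per-queue default:

from clearml_agent.glue.k8s import K8sIntegration

class PerQueueK8sGlue(K8sIntegration):
    # hypothetical override: choose the default container image by queue name
    def get_default_docker_image(self, session, queue: str) -> str:
        if queue == "gpu_queue":
            return "nvidia/cuda:11.8.0-runtime-ubuntu22.04"
        return super(PerQueueK8sGlue, self).get_default_docker_image(session, queue)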
@@ -509,9 +517,7 @@ class K8sIntegration(Worker):
 
         container = get_task_container(session, task_id)
         if not container.get('image'):
-            container['image'] = str(
-                ENV_DOCKER_IMAGE.get() or session.config.get("agent.default_docker.image", "nvidia/cuda")
-            )
+            container['image'] = self.get_default_docker_image(session, queue)
             container['arguments'] = session.config.get("agent.default_docker.arguments", None)
             set_task_container(
                 session, task_id, docker_image=container['image'], docker_arguments=container['arguments']
@@ -838,8 +844,11 @@ class K8sIntegration(Worker):
     def get_task_worker_id(self, template, task_id, pod_name, namespace, queue):
         return f"{self.worker_id}:{task_id}"
 
+    def use_image_entrypoint(self, queue: str, task_id: str, docker_image: str) -> bool:
+        return ENV_POD_USE_IMAGE_ENTRYPOINT.get()
+
     def _create_template_container(
-        self, pod_name: str, task_id: str, docker_image: str, docker_args: List[str],
+        self, pod_name: str, task_id: str, docker_image: str, docker_args: List[str], queue: str,
         docker_bash: str, clearml_conf_create_script: List[str], task_worker_id: str, task_token: str = None
     ) -> dict:
         container = self._get_docker_args(
@@ -861,7 +870,7 @@ class K8sIntegration(Worker):
         # Set worker ID
         add_or_update_env_var('CLEARML_WORKER_ID', task_worker_id)
 
-        if ENV_POD_USE_IMAGE_ENTRYPOINT.get():
+        if self.use_image_entrypoint(queue=queue, task_id=task_id, docker_image=docker_image):
             # Don't add a cmd and args, just the image
 
             # Add the task ID and token since we need it (it's usually in the init script passed to us
@@ -978,6 +987,7 @@ class K8sIntegration(Worker):
             clearml_conf_create_script=clearml_conf_create_script,
             task_worker_id=task_worker_id,
             task_token=task_token,
+            queue=queue,
         )
 
         if containers:
@@ -1291,7 +1301,7 @@ class K8sIntegration(Worker):
 
                 self.report_monitor(ResourceMonitor.StatusReport(queues=queues, queue=queue, task=task_id))
                 self.run_one_task(queue, task_id, worker_params, task_session)
-                self.report_monitor(ResourceMonitor.StatusReport(queues=self.queues))
+                self.report_monitor(ResourceMonitor.StatusReport(queues=queues))
                 break
             else:
                 # sleep and retry polling
@@ -1,8 +1,12 @@
 import re
 import shlex
+from functools import partial
 from typing import Tuple, List, TYPE_CHECKING, Optional
 from urllib.parse import urlunparse, urlparse
+from string import Template
 
+from clearml_agent.backend_api.services import queues as queues_api
+from clearml_agent.backend_api.session import Request
 from clearml_agent.definitions import (
     ENV_AGENT_GIT_PASS,
     ENV_AGENT_SECRET_KEY,
@@ -196,7 +200,7 @@ class DockerArgsSanitizer:
         (i.e. changing the ports if needed and adding the new env var), runtime property
         """
         if not docker_arguments:
-            return
+            return None
         # make a copy we are going to change it
         docker_arguments = docker_arguments[:]
         port_mapping_filtered = [
@@ -205,7 +209,7 @@ class DockerArgsSanitizer:
         ]
 
         if not port_mapping_filtered:
-            return
+            return None
 
         # test if network=host was requested, docker will ignore published ports anyhow, so no use in parsing them
         network_filtered = DockerArgsSanitizer.filter_switches(
@@ -213,7 +217,7 @@ class DockerArgsSanitizer:
         network_filtered = [t for t in network_filtered if t.strip == "host" or "host" in t.split("=")]
         # if any network is configured, we ignore it, there is nothing we can do
         if network_filtered:
-            return
+            return None
 
         # verifying available ports, remapping if necessary
         port_checks = TcpPorts()
@@ -270,3 +274,168 @@ class DockerArgsSanitizer:
         additional_task_runtime = {"_external_host_tcp_port_mapping": ports}
 
         return docker_arguments+additional_cmd, additional_task_runtime
+
+
+class CustomTemplate(Template):
+    """
+    Parse ${CLEARML_<something>:default} values based on Task object and replace with real-time value
+    Example: "-e project_id=${CLEARML_TASK.project}" will be replaced with the
+        Task actual project ID from the Task object "-e project_id=<task.project>"
+    "-e queue_name=${CLEARML_QUEUE_NAME}"
+    "-e user_key=${TASK.hyperparams.properties.user_key.value:default_value}"
+
+    It supports:
+
+    ${QUEUE_NAME} - name of the queue
+    # Task object nested variables:
+    ${TASK.id}
+    ${TASK.name}
+    ${TASK.project}
+    ${TASK.hyperparams.properties.user_key.value}
+    """
+
+    idpattern = r'(?a:[_a-z][_a-z0-9|.|:]*)'
+    prefix = "CLEARML_"
+    queue_id_to_name_map = {}
+
+    @classmethod
+    def get_queue_name(cls, task_session, queue_id):
+        if queue_id in cls.queue_id_to_name_map:
+            return cls.queue_id_to_name_map[queue_id]
+
+        # noinspection PyBroadException
+        try:
+            response = task_session.send_api(queues_api.GetByIdRequest(queue=queue_id))
+            cls.queue_id_to_name_map[queue_id] = response.queue.name
+        except Exception:
+            # if something went wrong start over from the highest priority queue
+            return None
+        return cls.queue_id_to_name_map.get(queue_id)
+
+    def default_custom_substitute(self, task_info, queue_name):
+        return self.custom_substitute(partial(CustomTemplate.default_resolve_template, task_info, queue_name))
+
+    def custom_substitute(self, mapping_func):
+        # Helper function for .sub()
+        def convert(mo):
+            named = mo.group('named') or mo.group('braced')
+            if not named or not str(named).startswith(self.prefix):
+                return mo.group()
+            named = named[len(self.prefix):]
+            if named is not None:
+                default_value = None
+                try:
+                    if ":" in named:
+                        named, default_value = named.split(":", 1)
+
+                    return str(mapping_func(named, default_value))
+                except KeyError:
+                    return mo.group()
+            if mo.group('escaped') is not None:
+                return self.delimiter
+            if mo.group('invalid') is not None:
+                return mo.group()
+            raise ValueError('Unrecognized named group in pattern', self.pattern)
+
+        return self.pattern.sub(convert, self.template)
+
+    def substitute(self, *args, **kwds):
+        raise ValueError("Unsupported")
+
+    def safe_substitute(self, *args, **kwds):
+        raise ValueError("Unsupported")
+
+    @classmethod
+    def default_resolve_template(cls, task_info, queue, key, default):
+        """
+        Notice CLEARML_ prefix omitted! (i.e. ${QUEUE_ID} is ${CLEARML_QUEUE_ID})
+
+        we support:
+        ${QUEUE_NAME} - name of the queue
+        ${WORKER_ID} - FUTURE
+
+        # we also complex variables:
+        ${TASK.id}
+        ${TASK.name}
+        ${TASK.project.id}
+        ${TASK.project.name}
+        ${TASK.hyperparams.properties.user_key.value}
+
+        :param task_info: nested dict with task information
+        :param queue: queue_id (str)
+        :param key: key to be replaced
+        :param default: default value, None will raise exception
+        :return: string value
+        """
+        try:
+            parts = key.split(".")
+            main_part = parts[0]
+            if main_part == "QUEUE_NAME":
+                if len(parts) == 1:
+                    return queue or default
+                raise ValueError()
+            elif main_part == "QUEUE_NAME":
+                # future support
+                raise ValueError()
+            elif main_part == "WORKER_ID":
+                # future support
+                raise ValueError()
+            elif main_part == "TASK":
+                for part in parts[1:]:
+
+                    task_info = task_info.get(part)
+                    if task_info is None:
+                        break
+
+                if isinstance(task_info, str):
+                    return task_info
+
+                if default:
+                    return default
+                raise ValueError()
+
+        except Exception:
+            raise KeyError((key,))
+
+        # default, nothing
+        raise KeyError((key,))
+
+
+class DockerArgsTemplateResolver:
+    def __init__(self, task_session, task_id):
+        self._task_session = task_session
+        self.task_info = None
+        self.queue_name = None
+        self.task_id = task_id
+
+    def resolve_docker_args_from_template(self, full_docker_cmd):
+        if not full_docker_cmd or not self._task_session.check_min_api_version("2.20"):
+            return full_docker_cmd
+
+        # convert docker template arguments (i.e. ${CLEARML_} ) based on the current Task
+        for i, token in enumerate(full_docker_cmd[:-1]):
+            # skip the ones which are obviously not our prefix
+            if not CustomTemplate.delimiter in token or not CustomTemplate.prefix in token:
+                continue
+
+            if self.task_info is None:
+                result = self._task_session.send_request(
+                    service='tasks',
+                    action='get_all',
+                    version='2.20',
+                    method=Request.def_method,
+                    json={'id': [self.task_id], 'search_hidden': True}
+                )
+                # we should not fail here
+                self.task_info = result.json().get("data", {}).get("tasks", [])[0] or {}
+                queue_id = self.task_info.get("execution", {}).get("queue")
+                self.queue_name = CustomTemplate.get_queue_name(self._task_session, queue_id)
+
+            tmpl = CustomTemplate(token)
+            # replace it
+            try:
+                full_docker_cmd[i] = tmpl.default_custom_substitute(self.task_info, self.queue_name)
+            except Exception as ex:
+                print("Failed parsing ClearML Template argument [{}] skipped: error ()".format(token, ex))
+
+        return full_docker_cmd
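To make the mechanism above concrete, here is a minimal, standalone sketch of the same substitution idea built directly on string.Template. It is not part of the diff; the task_info dict, keys, and values are made up, and the idpattern is a simplified variant of the one used above:

from string import Template


class DemoTemplate(Template):
    # allow dots and a ":default" suffix inside ${...}, as in CustomTemplate above
    idpattern = r'[_a-z][_a-z0-9.:]*'


def resolve(token, task_info):
    def convert(mo):
        named = mo.group('named') or mo.group('braced')
        if not named or not named.startswith("CLEARML_"):
            return mo.group()  # not one of ours, leave the token untouched
        key, _, default = named[len("CLEARML_"):].partition(":")
        if key.startswith("TASK."):
            # walk the nested task-info dict, e.g. "TASK.project" -> task_info["project"]
            value = task_info
            for part in key.split(".")[1:]:
                value = value.get(part) if isinstance(value, dict) else None
            if isinstance(value, str):
                return value
        return default or mo.group()

    tmpl = DemoTemplate(token)
    return tmpl.pattern.sub(convert, tmpl.template)


print(resolve("-e project_id=${CLEARML_TASK.project:none}", {"project": "prj-123"}))
# -> "-e project_id=prj-123"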
@@ -1,3 +1,4 @@
+import sys
 from typing import Any
 
 from ...._vendor.pathlib2 import Path
@@ -72,11 +73,20 @@ class VirtualenvPip(SystemPip, PackageManager):
                 self.python, "-m", "virtualenv", self.path, *self.create_flags()
             ).check_call()
         except Exception as ex:
-            # let's try with std library instead
-            print("WARNING: virtualenv call failed: {}\n INFO: Creating virtual environment with venv".format(ex))
-            self.session.command(
-                self.python, "-m", "venv", self.path, *self.create_flags()
-            ).check_call()
+            try:
+                # let's try with std library instead
+                print("WARNING: virtualenv call failed: {}\n INFO: Creating virtual environment with venv".format(ex))
+                self.session.command(
+                    self.python, "-m", "venv", self.path, *self.create_flags()
+                ).check_call()
+            except Exception as ex:
+                # let's try with std library instead
+                print("WARNING: virtualenv and venv failed with [{}] trying virtualenv with python [{}]".format(
+                    self.python, sys.executable))
+                self.python = str(sys.executable)
+                self.session.command(
+                    self.python, "-m", "virtualenv", self.path, *self.create_flags()
+                ).check_call()
 
         return self
 
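A condensed sketch (not the agent's actual code) of the three-stage fallback the hunk above introduces: virtualenv with the requested interpreter, then the stdlib venv module, then virtualenv with the agent's own interpreter:

import subprocess
import sys

def create_virtual_environment(python, path):
    # try each strategy in order; the first one that succeeds wins
    attempts = [
        [python, "-m", "virtualenv", path],
        [python, "-m", "venv", path],
        [sys.executable, "-m", "virtualenv", path],
    ]
    for argv in attempts:
        try:
            subprocess.check_call(argv)
            return path
        except (subprocess.CalledProcessError, OSError):
            continue
    raise RuntimeError("could not create virtual environment at {}".format(path))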
@@ -247,7 +247,7 @@ class UvAPI(VirtualenvPip):
         # there is a bug so we have to call pip to get the freeze because UV will return the wrong list
         # packages = self.run_with_env(('freeze',), output=True).splitlines()
         packages = self.lock_config.get_run_argv(
-            "pip", "freeze", "--python", str(Path(self.path) / "bin" / "python"), cwd=self.lockfile_path).get_output().splitlines()
+            "pip", "freeze", cwd=self.lockfile_path).get_output().splitlines()
         # list clearml_agent as well
         # packages_without_program = [package for package in packages if PROGRAM_NAME not in package]
         return {'pip': packages}
@@ -89,9 +89,9 @@ def kill_all_child_processes(pid=None, include_parent=True):
     print("\nLeaving process id {}".format(pid))
     try:
         parent = psutil.Process(pid)
-    except psutil.Error:
-        # could not find parent process id
-        print("ERROR: could not find parent process id {}".format(pid))
+    except psutil.Error as ex:
+        # could not find process id
+        print("ERROR: could not find process id {}: {}".format(pid, ex))
         return
     for child in parent.children(recursive=True):
         try:
@@ -113,7 +113,7 @@ def terminate_all_child_processes(pid=None, timeout=10., include_parent=True):
     try:
         parent = psutil.Process(pid)
     except psutil.Error:
-        # could not find parent process id
+        # could not find process id
         return
     for child in parent.children(recursive=False):
         print('Terminating child process {}'.format(child.pid))
@@ -2,6 +2,7 @@ from __future__ import unicode_literals, division
 
 import logging
 import re
+import os
 import shlex
 from collections import deque
 from itertools import starmap
@@ -112,7 +113,15 @@ class ResourceMonitor(object):
             active_gpus = Session.get_nvidia_visible_env()
             # None means no filtering, report all gpus
             if active_gpus and active_gpus != "all":
-                self._active_gpus = [g.strip() for g in str(active_gpus).split(',')]
+                if os.path.isdir(active_gpus):
+                    try:
+                        self._active_gpus = os.listdir(active_gpus)
+                    except OSError as e:
+                        log.warning(
+                            "Failed listing {}: {}".format(active_gpus, e)
+                        )
+                else:
+                    self._active_gpus = [g.strip() for g in active_gpus.split(",")]
         except Exception:
             pass
         self._cluster_report_interval_sec = int(session.config.get(
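A standalone sketch of the new active-GPU parsing above: the visible-devices value may hold "all", a comma-separated list of device ids, or (with some container runtimes) a directory whose entries name the visible devices. The directory behavior mirrors the hunk; any concrete path is illustrative:

import os

def parse_active_gpus(active_gpus):
    if not active_gpus or active_gpus == "all":
        return None  # None means no filtering, report all gpus
    if os.path.isdir(active_gpus):
        try:
            return os.listdir(active_gpus)  # each directory entry names a device
        except OSError:
            return None
    return [g.strip() for g in active_gpus.split(",")]

print(parse_active_gpus("0,1"))   # ['0', '1']
print(parse_active_gpus("all"))   # None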
@@ -141,7 +141,9 @@ DAEMON_ARGS = dict({
         'action': 'store_true',
     },
     '--use-owner-token': {
-        'help': 'Generate and use task owner token for the execution of the task',
+        'help': 'Run tasks under the identity of each task\'s owner: all calls made by the task code during execution '
+                'will use the owner\'s credentials instead of the agent\'s. This feature requires the agent to use a '
+                'ClearML Enterprise Server.',
         'action': 'store_true',
     }
 }, **WORKER_ARGS)
@@ -13,7 +13,7 @@ api {
 agent.git_user=""
 agent.git_pass=""
 
-# extra_index_url: ["https://allegroai.jfrog.io/clearml/api/pypi/public/simple"]
+# extra_index_url: ["https://clearml.jfrog.io/clearml/api/pypi/public/simple"]
 agent.package_manager.extra_index_url= [
 
 ]
@@ -68,7 +68,7 @@ agent {
     force_upgrade: false,
 
     # additional artifact repositories to use when installing python packages
-    # extra_index_url: ["https://allegroai.jfrog.io/clearmlai/api/pypi/public/simple"]
+    # extra_index_url: ["https://clearml.jfrog.io/clearmlai/api/pypi/public/simple"]
 
     # additional conda channels to use when installing with conda package manager
     conda_channels: ["pytorch", "conda-forge", "defaults", ]
@@ -1,6 +1,4 @@
-ARG TAG=3.7.17-alpine3.18
-
-FROM python:${TAG} as build
+FROM python:3.14-rc-alpine3.21 as build
 
 RUN apk add --no-cache \
         gcc \
@@ -16,7 +14,7 @@ RUN python3 \
         clearml-agent \
         cryptography>=2.9
 
-FROM python:${TAG} as target
+FROM python:3.14-rc-alpine3.21 as target
 
 WORKDIR /app
 
@@ -1,6 +1,4 @@
-ARG TAG=3.7.17-slim-bullseye
-
-FROM python:${TAG} as target
+FROM python:3.10-slim-bookworm as target
 
 ARG KUBECTL_VERSION=1.29.3
 
@@ -6,7 +6,7 @@ spec:
       serviceAccountName: ""
       containers:
       - name: k8s-glue-container
-        image: allegroai/clearml-agent-k8s:aws-latest-1.21
+        image: clearml/clearml-agent-k8s:aws-latest-1.21
         imagePullPolicy: Always
         command: [
           "/bin/bash",
@@ -6,7 +6,7 @@ spec:
       serviceAccountName: ""
       containers:
       - name: k8s-glue-container
-        image: allegroai/clearml-agent-k8s:gcp-latest-1.21
+        image: clearml/clearml-agent-k8s:gcp-latest-1.21
         imagePullPolicy: Always
         command: [
           "/bin/bash",
@@ -20,7 +20,7 @@
     "This notebook defines a cloud budget (currently only AWS is supported, but feel free to expand with PRs), and spins an instance the minute a job is waiting for execution. It will also spin down idle machines, saving you some $$$ :)\n",
     "\n",
     "> **Note:**\n",
-    "> This is just an example of how you can use ClearML Agent to implement custom autoscaling. For a more structured autoscaler script, see [here](https://github.com/allegroai/clearml/blob/master/clearml/automation/auto_scaler.py).\n",
+    "> This is just an example of how you can use ClearML Agent to implement custom autoscaling. For a more structured autoscaler script, see [here](https://github.com/clearml/clearml/blob/master/clearml/automation/auto_scaler.py).\n",
     "\n",
     "Configuration steps:\n",
     "- Define maximum budget to be used (instance type / number of instances).\n",
@@ -1,6 +1,6 @@
 """
 This example assumes you have preconfigured services with selectors in the form of
-"ai.allegro.agent.serial=pod-<number>" and a targetPort of 10022.
+"ai.clearml.agent.serial=pod-<number>" and a targetPort of 10022.
 The K8sIntegration component will label each pod accordingly.
 """
 from argparse import ArgumentParser
@@ -13,16 +13,26 @@ def parse_args():
     group = parser.add_mutually_exclusive_group()
 
     parser.add_argument(
-        "--queue",
-        type=str,
-        help="Queues to pull tasks from. If multiple queues, use comma separated list, e.g. 'queue1,queue2'",
+        "--k8s-pending-queue-name", type=str,
+        help="Queue name to use when task is pending in the k8s scheduler (default: %(default)s)", default="k8s_scheduler"
     )
+    parser.add_argument(
+        "--container-bash-script", type=str,
+        help="Path to the file with container bash script to be executed in k8s", default=None
+    )
+    parser.add_argument(
+        "--debug", action="store_true", default=False,
+        help="Switch logging on (default: %(default)s)"
+    )
+    parser.add_argument(
+        "--queue", type=str, help="Queues to pull tasks from. If multiple queues, use comma separated list, e.g. 'queue1,queue2'",
+    )
     group.add_argument(
         "--ports-mode",
         action="store_true",
         default=False,
         help="Ports-Mode will add a label to the pod which can be used as service, in order to expose ports"
-             "Should not be used with max-pods"
+             "Should not be used with max-pods",
     )
     parser.add_argument(
         "--num-of-services",
@@ -34,15 +44,15 @@ def parse_args():
         "--base-port",
         type=int,
         help="Used in conjunction with ports-mode, specifies the base port exposed by the services. "
-             "For pod #X, the port will be <base-port>+X. Note that pod number is calculated based on base-pod-num"
-             "e.g. if base-port=20000 and base-pod-num=3, the port for the first pod will be 20003"
+             "For pod #X, the port will be <base-port>+X. Note that pod number is calculated based on base-pod-num"
+             "e.g. if base-port=20000 and base-pod-num=3, the port for the first pod will be 20003",
     )
     parser.add_argument(
         "--base-pod-num",
         type=int,
         default=1,
         help="Used in conjunction with ports-mode and base-port, specifies the base pod number to be used by the "
-             "service (default: %(default)s)"
+             "service (default: %(default)s)",
     )
     parser.add_argument(
         "--gateway-address",
@@ -62,7 +72,7 @@ def parse_args():
         "--template-yaml",
         type=str,
         help="YAML file containing pod template. If provided pod will be scheduled with kubectl apply "
-             "and overrides are ignored, otherwise it will be scheduled with kubectl run"
+             "and overrides are ignored, otherwise it will be scheduled with kubectl run",
    )
     parser.add_argument(
         "--ssh-server-port",
@@ -80,13 +90,27 @@ def parse_args():
         "--max-pods",
         type=int,
         help="Limit the maximum number of pods that this service can run at the same time."
-             "Should not be used with ports-mode"
+             "Should not be used with ports-mode",
     )
+    parser.add_argument(
+        "--pod-name-prefix", type=str,
+        help="Define pod name prefix for k8s (default: %(default)s)", default="clearml-id-"
+    )
+    parser.add_argument(
+        "--limit-pod-label", type=str,
+        help="Define limit pod label for k8s (default: %(default)s)", default="ai.allegro.agent.serial=pod-{pod_number}"
+    )
+    parser.add_argument(
+        "--no-system-packages", action="store_true", default=False,
+        help="False when running tasks in containers (default: %(default)s)"
+    )
     parser.add_argument(
         "--use-owner-token",
         action="store_true",
         default=False,
-        help="Generate and use task owner token for the execution of each task",
+        help="Run tasks under the identity of each task's owner: all calls made by the task code during execution will "
+             "use the owner's credentials instead of the agent's. This features requires the agent to use a ClearML "
+             "Enterprise Server.",
     )
     parser.add_argument(
         "--create-queue",
@@ -111,7 +135,15 @@ def main():
 
     user_props_cb = k8s_user_props_cb
 
+    if args.container_bash_script:
+        with open(args.container_bash_script, "r") as file:
+            container_bash_script = file.read().splitlines()
+    else:
+        container_bash_script = None
+
     k8s = K8sIntegration(
+        k8s_pending_queue_name=args.k8s_pending_queue_name,
+        container_bash_script=container_bash_script,
         ports_mode=args.ports_mode,
         num_of_services=args.num_of_services,
         base_pod_num=args.base_pod_num,
@@ -124,6 +156,10 @@ def main():
         else None,
         namespace=args.namespace,
         max_pods_limit=args.max_pods or None,
+        pod_name_prefix=args.pod_name_prefix,
+        limit_pod_label=args.limit_pod_label,
+        force_system_packages=not args.no_system_packages,
+        debug=args.debug,
     )
     queue = [q.strip() for q in args.queue.split(",") if q.strip()] if args.queue else None
 
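For illustration, the new flags added to the example script might be combined like this (the queue name and script path are made up; the flags themselves come from the diff above):

    python k8s_glue_example.py --queue k8s_gpu \
        --k8s-pending-queue-name k8s_scheduler \
        --container-bash-script /opt/glue/container_setup.sh \
        --pod-name-prefix clearml-id- --debug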