Compare commits

...

13 Commits

Author SHA1 Message Date
Ilia Meshcheriakov
3e5153a068 Add command line arguments for k8s_glue_example.py (#196)
Co-authored-by: Meshcheryakov Ilya <i.meshcheryakov@mts.ai>
Co-authored-by: Jake Henning <59198928+jkhenning@users.noreply.github.com>
2025-06-25 18:13:59 +03:00
pollfly
740f90c96f Add note to --use-owner-token help (#235) 2025-06-25 14:34:32 +03:00
clearml
ba854aa53b Fix fall back to system python should not change the python bin inside the new venv 2025-06-23 12:37:38 +03:00
clearml
c76dfe7ce6 Optimize dynamic GPU to query only relevant workers (requires clearml-server >=v2.0.0, otherwise the selection argument is ignored) 2025-06-22 22:42:30 +03:00
clearml
e551ee1eb5 Fix fallback to system path python3 if we failed to have pip used with the selected python - this could happen if pre-installed python is in path and it does not contain pip package (e.g. example NIM containers) 2025-06-22 22:41:20 +03:00
clearml
eed930b9a6 Fix UV as pip drop-in replacement print 2025-06-22 22:40:32 +03:00
clearml
4881b9638d Fix do not set Aborted task if it is already set to Failed 2025-06-22 22:39:55 +03:00
clearml
3585786348 Fix installing venv from the agent's python binary when the selected python failed - this could be the cause of missing pip or venv in the selected python 2025-06-22 22:39:29 +03:00
clearml
afe69c822f Add CLEARML_AGENT_QUEUE_POLL_FREQ_SEC and CLEARML_AGENT_STATUS_REPORT_FREQ_SEC env vars to customize agent behavior 2025-06-22 22:38:44 +03:00
clearml
8d8dc4e396 Add better debug logging when task session creation fails 2025-06-22 22:37:59 +03:00
clearml
553c72e06a Support NVIDIA_VISIBLE_DEVICES using volume mounts 2025-06-22 22:37:27 +03:00
clearml
768ee3d2cf Fix potential issue with agent not sending queues in status report 2025-06-22 22:36:49 +03:00
clearml
30d24beb51 Refactor env var 2025-06-22 22:35:56 +03:00
7 changed files with 130 additions and 34 deletions

View File

@@ -79,7 +79,11 @@ from clearml_agent.definitions import (
ENV_AGENT_FORCE_EXEC_SCRIPT,
ENV_TEMP_STDOUT_FILE_DIR,
ENV_AGENT_FORCE_TASK_INIT,
ENV_AGENT_DEBUG_GET_NEXT_TASK, ENV_ABORT_CALLBACK_CMD, ENV_ABORT_CALLBACK_CMD_TIMEOUT,
ENV_AGENT_DEBUG_GET_NEXT_TASK,
ENV_ABORT_CALLBACK_CMD,
ENV_ABORT_CALLBACK_CMD_TIMEOUT,
ENV_QUEUE_POLL_FREQ_SEC,
ENV_STATUS_REPORT_FREQ_SEC,
)
from clearml_agent.definitions import WORKING_REPOSITORY_DIR, PIP_EXTRA_INDICES
from clearml_agent.errors import (
@@ -865,10 +869,10 @@ class Worker(ServiceCommandSection):
)
# default poll queues every _polling_interval seconds
_polling_interval = 5.0
_polling_interval = ENV_QUEUE_POLL_FREQ_SEC.get() or 5.0
# machine status update intervals, seconds
_machine_update_interval = 30.0
_machine_update_interval = ENV_STATUS_REPORT_FREQ_SEC.get() or 30.0
# message printed before starting task logging,
# it will be parsed by services_mode, to identify internal docker logging start
@@ -1391,6 +1395,10 @@ class Worker(ServiceCommandSection):
headers=headers
)
if not (result.ok() and result.response):
try:
self.log.debug("Failed creating task session: %s", result.response)
except:
pass
return
new_session = copy(session)
new_session.config = deepcopy(session.config)
@@ -1722,16 +1730,24 @@ class Worker(ServiceCommandSection):
def _dynamic_gpu_get_available(self, gpu_indexes):
# key: cast to string, value: 1 (i.e. gull GPU)
gpu_indexes = {str(g): 1 for g in gpu_indexes}
worker_name = self._session.config.get("agent.worker_name", "") + ':gpu'
# only return "Our" workers (requires server API +2, otherwise the selecort pattern is ignored)
# noinspection PyBroadException
try:
response = self._session.send_api(workers_api.GetAllRequest(last_seen=600))
response = self._session.send_api(workers_api.GetAllRequest(
last_seen=600,
worker_pattern="{}*".format(worker_name),
_allow_extra_fields_=True
))
except Exception:
return None
worker_name = self._session.config.get("agent.worker_name", "") + ':gpu'
# filter only our workers, in case the selector pattern above was ignored due to lower version API server
our_workers = [
w.id for w in response.workers
if w.id.startswith(worker_name) and w.id != self.worker_id]
if w.id.startswith(worker_name) and w.id != self.worker_id
]
gpus = {}
allocated_gpus = {}
gpu_pattern = re.compile(r"\d+[.]?\d*[a-z]?")
@@ -2017,7 +2033,7 @@ class Worker(ServiceCommandSection):
columns = ("id", "name", "tags")
print("Listening to queues:")
if dynamic_gpus:
columns = ("id", "name", "tags", "gpus")
columns = ("id", "name", "tags", "gpus (min, max)")
for q in queues_info:
q['gpus'] = str(dict(dynamic_gpus).get(q['id']) or '')
print_table(queues_info, columns=columns, titles=columns)
@@ -3477,16 +3493,24 @@ class Worker(ServiceCommandSection):
session = session or self._session
try:
if stop_reason == TaskStopReason.stopped:
self.log("Stopping - tasks.stop was called for task")
self.send_logs(task_id, ["Process aborted by user"], session=session)
session.send_api(
tasks_api.StoppedRequest(
task=task_id,
status_reason="task was stopped by tasks.stop",
status_message=self._task_status_change_message,
force=False
# do not change the status to stopped if the Task status is already failed
task_status = get_task(
session, task_id, only_fields=["status"]
).status
if str(task_status) == "failed":
self.send_logs(task_id, ["Process aborted by user - Task status was Failed"], session=session)
self.log("Stopping - task was already marked as failed")
else:
self.send_logs(task_id, ["Process aborted by user"], session=session)
self.log("Stopping - tasks.stop was called for task")
session.send_api(
tasks_api.StoppedRequest(
task=task_id,
status_reason="task was stopped by tasks.stop",
status_message=self._task_status_change_message,
force=False
)
)
)
elif stop_reason == TaskStopReason.status_changed:
try:
@@ -3736,7 +3760,7 @@ class Worker(ServiceCommandSection):
# revert to venv that we used inside UV
api = None
self.package_api = package_api = package_api.get_venv_manager()
elif not api:
elif self._session.config.get("agent.package_manager.type", None) == "uv" and not api:
# this means `agent.package_manager.uv_replace_pip` is set to true
print("INFO: using UV as pip drop-in replacement")
@@ -4894,7 +4918,7 @@ class Worker(ServiceCommandSection):
"declare LOCAL_PYTHON",
"[ ! -z $LOCAL_PYTHON ] || for i in {{20..5}}; do (which {python_single_digit}.$i 2> /dev/null || command -v {python_single_digit}.$i) && " +
"{python_single_digit}.$i -m pip --version && " +
"export LOCAL_PYTHON=$(which {python_single_digit}.$i 2> /dev/null || command -v git) && break ; done",
"export LOCAL_PYTHON=$(which {python_single_digit}.$i 2> /dev/null || command -v {python_single_digit}.$i) && break ; done",
"[ ! -z $LOCAL_PYTHON ] || export CLEARML_APT_INSTALL=\"$CLEARML_APT_INSTALL {python_single_digit}-pip\"", # noqa
"[ -z \"$CLEARML_APT_INSTALL\" ] || "
"(apt-get update -y ; apt-get install -y $CLEARML_APT_INSTALL) || "
@@ -4908,11 +4932,13 @@ class Worker(ServiceCommandSection):
docker_bash_script = " ; ".join([line for line in bash_script if line]) \
if not isinstance(bash_script, str) else bash_script
# make sure that if we do not have $LOCAL_PYTHON defined
# we set it to python3
# make sure that if we do not have $LOCAL_PYTHON defined, we set it to python3
# notice that if $LOCAL_PYTHON -m pip fails, that means we might have a broken python in the path
# so we set to default path and try to set global python
update_scheme += (
docker_bash_script + " ; " +
"[ ! -z $LOCAL_PYTHON ] || export LOCAL_PYTHON={python} ; " +
"$LOCAL_PYTHON -m pip --version > /dev/null || export LOCAL_PYTHON=$(PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin command -v python3) ; " +
"$LOCAL_PYTHON -m pip install -U {pip_version} ; " +
"$LOCAL_PYTHON -m pip install -U {clearml_agent_wheel} ; ").format(
python_single_digit=python_version.split('.')[0],

View File

@@ -256,6 +256,10 @@ ENV_GIT_CLONE_VERBOSE = EnvironmentConfig("CLEARML_AGENT_GIT_CLONE_VERBOSE", typ
ENV_GPU_FRACTIONS = EnvironmentConfig("CLEARML_AGENT_GPU_FRACTIONS")
ENV_QUEUE_POLL_FREQ_SEC = EnvironmentConfig("CLEARML_AGENT_QUEUE_POLL_FREQ_SEC", type=float)
ENV_STATUS_REPORT_FREQ_SEC = EnvironmentConfig("CLEARML_AGENT_STATUS_REPORT_FREQ_SEC", type=float)
class FileBuffering(IntEnum):
"""

View File

@@ -90,6 +90,7 @@ class K8sIntegration(Worker):
'[ ! -z "$CLEARML_AGENT_SKIP_CONTAINER_APT" ] || [ ! -z "$LOCAL_PYTHON" ] || '
'apt-get install -y python3-pip || dnf install -y python3-pip',
"[ ! -z $LOCAL_PYTHON ] || export LOCAL_PYTHON=python3",
"[ ! -z $CLEARML_AGENT_NO_UPDATE ] || $LOCAL_PYTHON -m pip --version > /dev/null || export LOCAL_PYTHON=$(PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin command -v python3)",
"rm -f /usr/lib/python3.*/EXTERNALLY-MANAGED", # remove PEP 668
"{extra_bash_init_cmd}",
"[ ! -z $CLEARML_AGENT_NO_UPDATE ] || $LOCAL_PYTHON -m pip install clearml-agent{agent_install_args}",
@@ -147,6 +148,10 @@ class K8sIntegration(Worker):
:param str extra_bash_init_script: Additional bash script to run before starting the Task inside the container
:param str namespace: K8S namespace to be used when creating the new pods (default: clearml)
:param int max_pods_limit: Maximum number of pods that K8S glue can run at the same time
:param str pod_name_prefix: Define pod name prefix for k8s (default: clearml-id-)
:param str limit_pod_label: Define limit pod label for k8s (default: ai.allegro.agent.serial=pod-{pod_number})
:param bool force_system_packages: true when running tasks in containers (i.e. docker mode or k8s glue).
(default: true)
"""
super(K8sIntegration, self).__init__()
self.kind = os.environ.get("CLEARML_K8S_GLUE_KIND", "pod").strip().lower()
@@ -839,8 +844,11 @@ class K8sIntegration(Worker):
def get_task_worker_id(self, template, task_id, pod_name, namespace, queue):
return f"{self.worker_id}:{task_id}"
def use_image_entrypoint(self, queue: str, task_id: str, docker_image: str) -> bool:
return ENV_POD_USE_IMAGE_ENTRYPOINT.get()
def _create_template_container(
self, pod_name: str, task_id: str, docker_image: str, docker_args: List[str],
self, pod_name: str, task_id: str, docker_image: str, docker_args: List[str], queue: str,
docker_bash: str, clearml_conf_create_script: List[str], task_worker_id: str, task_token: str = None
) -> dict:
container = self._get_docker_args(
@@ -862,7 +870,7 @@ class K8sIntegration(Worker):
# Set worker ID
add_or_update_env_var('CLEARML_WORKER_ID', task_worker_id)
if ENV_POD_USE_IMAGE_ENTRYPOINT.get():
if self.use_image_entrypoint(queue=queue, task_id=task_id, docker_image=docker_image):
# Don't add a cmd and args, just the image
# Add the task ID and token since we need it (it's usually in the init script passed to us
@@ -979,6 +987,7 @@ class K8sIntegration(Worker):
clearml_conf_create_script=clearml_conf_create_script,
task_worker_id=task_worker_id,
task_token=task_token,
queue=queue,
)
if containers:
@@ -1292,7 +1301,7 @@ class K8sIntegration(Worker):
self.report_monitor(ResourceMonitor.StatusReport(queues=queues, queue=queue, task=task_id))
self.run_one_task(queue, task_id, worker_params, task_session)
self.report_monitor(ResourceMonitor.StatusReport(queues=self.queues))
self.report_monitor(ResourceMonitor.StatusReport(queues=queues))
break
else:
# sleep and retry polling

View File

@@ -1,3 +1,4 @@
import sys
from typing import Any
from ...._vendor.pathlib2 import Path
@@ -72,11 +73,20 @@ class VirtualenvPip(SystemPip, PackageManager):
self.python, "-m", "virtualenv", self.path, *self.create_flags()
).check_call()
except Exception as ex:
# let's try with std library instead
print("WARNING: virtualenv call failed: {}\n INFO: Creating virtual environment with venv".format(ex))
self.session.command(
self.python, "-m", "venv", self.path, *self.create_flags()
).check_call()
try:
# let's try with std library instead
print("WARNING: virtualenv call failed: {}\n INFO: Creating virtual environment with venv".format(ex))
self.session.command(
self.python, "-m", "venv", self.path, *self.create_flags()
).check_call()
except Exception as ex:
# let's try with std library instead
print("WARNING: virtualenv and venv failed with [{}] trying virtualenv with python [{}]".format(
self.python, sys.executable))
self.python = str(sys.executable)
self.session.command(
self.python, "-m", "virtualenv", self.path, *self.create_flags()
).check_call()
return self

View File

@@ -2,6 +2,7 @@ from __future__ import unicode_literals, division
import logging
import re
import os
import shlex
from collections import deque
from itertools import starmap
@@ -112,7 +113,15 @@ class ResourceMonitor(object):
active_gpus = Session.get_nvidia_visible_env()
# None means no filtering, report all gpus
if active_gpus and active_gpus != "all":
self._active_gpus = [g.strip() for g in str(active_gpus).split(',')]
if os.path.isdir(active_gpus):
try:
self._active_gpus = os.listdir(active_gpus)
except OSError as e:
log.warning(
"Failed listing {}: {}".format(active_gpus, e)
)
else:
self._active_gpus = [g.strip() for g in active_gpus.split(",")]
except Exception:
pass
self._cluster_report_interval_sec = int(session.config.get(

View File

@@ -141,7 +141,9 @@ DAEMON_ARGS = dict({
'action': 'store_true',
},
'--use-owner-token': {
'help': 'Generate and use task owner token for the execution of the task',
'help': 'Run tasks under the identity of each task\'s owner: all calls made by the task code during execution '
'will use the owner\'s credentials instead of the agent\'s. This feature requires the agent to use a '
'ClearML Enterprise Server.',
'action': 'store_true',
}
}, **WORKER_ARGS)

View File

@@ -13,9 +13,19 @@ def parse_args():
group = parser.add_mutually_exclusive_group()
parser.add_argument(
"--queue",
type=str,
help="Queues to pull tasks from. If multiple queues, use comma separated list, e.g. 'queue1,queue2'",
"--k8s-pending-queue-name", type=str,
help="Queue name to use when task is pending in the k8s scheduler (default: %(default)s)", default="k8s_scheduler"
)
parser.add_argument(
"--container-bash-script", type=str,
help="Path to the file with container bash script to be executed in k8s", default=None
)
parser.add_argument(
"--debug", action="store_true", default=False,
help="Switch logging on (default: %(default)s)"
)
parser.add_argument(
"--queue", type=str, help="Queues to pull tasks from. If multiple queues, use comma separated list, e.g. 'queue1,queue2'",
)
group.add_argument(
"--ports-mode",
@@ -82,11 +92,25 @@ def parse_args():
help="Limit the maximum number of pods that this service can run at the same time."
"Should not be used with ports-mode",
)
parser.add_argument(
"--pod-name-prefix", type=str,
help="Define pod name prefix for k8s (default: %(default)s)", default="clearml-id-"
)
parser.add_argument(
"--limit-pod-label", type=str,
help="Define limit pod label for k8s (default: %(default)s)", default="ai.allegro.agent.serial=pod-{pod_number}"
)
parser.add_argument(
"--no-system-packages", action="store_true", default=False,
help="False when running tasks in containers (default: %(default)s)"
)
parser.add_argument(
"--use-owner-token",
action="store_true",
default=False,
help="Generate and use task owner token for the execution of each task",
help="Run tasks under the identity of each task's owner: all calls made by the task code during execution will "
"use the owner's credentials instead of the agent's. This features requires the agent to use a ClearML "
"Enterprise Server.",
)
parser.add_argument(
"--create-queue",
@@ -111,7 +135,15 @@ def main():
user_props_cb = k8s_user_props_cb
if args.container_bash_script:
with open(args.container_bash_script, "r") as file:
container_bash_script = file.read().splitlines()
else:
container_bash_script = None
k8s = K8sIntegration(
k8s_pending_queue_name=args.k8s_pending_queue_name,
container_bash_script=container_bash_script,
ports_mode=args.ports_mode,
num_of_services=args.num_of_services,
base_pod_num=args.base_pod_num,
@@ -124,6 +156,10 @@ def main():
else None,
namespace=args.namespace,
max_pods_limit=args.max_pods or None,
pod_name_prefix=args.pod_name_prefix,
limit_pod_label=args.limit_pod_label,
force_system_packages=not args.no_system_packages,
debug=args.debug,
)
queue = [q.strip() for q in args.queue.split(",") if q.strip()] if args.queue else None