mirror of
https://github.com/clearml/clearml-agent
synced 2025-06-26 18:16:15 +00:00
Compare commits
13 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
91dfa09466 | ||
|
|
f110bbf5b4 | ||
|
|
070919973b | ||
|
|
47d35ef48f | ||
|
|
54ed234fca | ||
|
|
a26860e79f | ||
|
|
fc1abbab0b | ||
|
|
4fa61dde1f | ||
|
|
26d748a4d8 | ||
|
|
5419fd84ae | ||
|
|
d8366dedc6 | ||
|
|
cc656e2969 | ||
|
|
b65e5fed94 |
@@ -54,15 +54,17 @@
|
||||
# docker_use_activated_venv: true
|
||||
|
||||
# select python package manager:
|
||||
# currently supported: pip, conda and poetry
|
||||
# currently supported: pip, conda, uv and poetry
|
||||
# if "pip" or "conda" are used, the agent installs the required packages
|
||||
# based on the "installed packages" section of the Task. If the "installed packages" is empty,
|
||||
# it will revert to using `requirements.txt` from the repository's root directory.
|
||||
# If Poetry is selected and the root repository contains `poetry.lock` or `pyproject.toml`,
|
||||
# the "installed packages" section is ignored, and poetry is used.
|
||||
# If Poetry is selected and no lock file is found, it reverts to "pip" package manager behaviour.
|
||||
# If uv is selected and the root repository contains `uv.lock` or `pyproject.toml`,
|
||||
# the "installed packages" section is ignored, and uv is used.
|
||||
package_manager: {
|
||||
# supported options: pip, conda, poetry
|
||||
# supported options: pip, conda, poetry, uv
|
||||
type: pip,
|
||||
|
||||
# specify pip version to use (examples "<20.2", "==19.3.1", "", empty string will install the latest version)
|
||||
@@ -70,6 +72,8 @@
|
||||
# specify poetry version to use (examples "<2", "==1.1.1", "", empty string will install the latest version)
|
||||
# poetry_version: "<2",
|
||||
# poetry_install_extra_args: ["-v"]
|
||||
# uv_version: ">0.4",
|
||||
# uv_sync_extra_args: ["--all-extras"]
|
||||
|
||||
# virtual environment inherits packages from system
|
||||
system_site_packages: false,
|
||||
@@ -133,6 +137,10 @@
|
||||
# if set to true, the agent will look for the "poetry.lock" file
|
||||
# in the passed current working directory instead of the repository's root directory.
|
||||
poetry_files_from_repo_working_dir: false
|
||||
|
||||
# if set to true, the agent will look for the "uv.lock" file
|
||||
# in the passed current working directory instead of the repository's root directory.
|
||||
uv_files_from_repo_working_dir: false
|
||||
},
|
||||
|
||||
# target folder for virtual environments builds, created when executing experiment
|
||||
@@ -192,6 +200,12 @@
|
||||
# allows the following task docker args to be overridden by the extra_docker_arguments
|
||||
# protected_docker_extra_args: ["privileged", "security-opt", "network", "ipc"]
|
||||
|
||||
# Enforce filter whitelist on docker arguments, allowing only those matching these filters to be used when running
|
||||
# a task. These can also be provided using the CLEARML_AGENT_DOCKER_ARGS_FILTERS environment variable
|
||||
# (using shlex.split whitespace-separated format).
|
||||
# For example, allow only environment variables:
|
||||
# docker_args_filters: ["^--env$", "^-e$"]
|
||||
|
||||
# optional shell script to run in docker when started before the experiment is started
|
||||
# extra_docker_shell_script: ["apt-get install -y bindfs", ]
|
||||
|
||||
|
||||
@@ -134,7 +134,7 @@ class BaseField(object):
|
||||
def _validate_name(self):
|
||||
if self.name is None:
|
||||
return
|
||||
if not re.match('^[A-Za-z_](([\w\-]*)?\w+)?$', self.name):
|
||||
if not re.match(r'^[A-Za-z_](([\w\-]*)?\w+)?$', self.name):
|
||||
raise ValueError('Wrong name', self.name)
|
||||
|
||||
def structue_name(self, default):
|
||||
|
||||
@@ -19,8 +19,19 @@ class Request(ApiModel):
|
||||
_method = ENV_API_DEFAULT_REQ_METHOD.get(default="get")
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
if kwargs:
|
||||
allow_extra_fields = kwargs.pop("_allow_extra_fields_", False)
|
||||
if not allow_extra_fields and kwargs:
|
||||
raise ValueError('Unsupported keyword arguments: %s' % ', '.join(kwargs.keys()))
|
||||
elif allow_extra_fields and kwargs:
|
||||
self._extra_fields = kwargs
|
||||
else:
|
||||
self._extra_fields = {}
|
||||
|
||||
def to_dict(self, *args, **kwargs):
|
||||
res = super(Request, self).to_dict(*args, **kwargs)
|
||||
if self._extra_fields:
|
||||
res.update(self._extra_fields)
|
||||
return res
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
|
||||
@@ -327,7 +327,7 @@ class ServiceCommandSection(BaseCommandSection):
|
||||
def get_service(self, service_class):
|
||||
return service_class(config=self._session.config)
|
||||
|
||||
def _resolve_name(self, name, service=None):
|
||||
def _resolve_name(self, name, service=None, search_hidden=False):
|
||||
"""
|
||||
Resolve an object name to an object ID.
|
||||
Operation:
|
||||
@@ -349,7 +349,11 @@ class ServiceCommandSection(BaseCommandSection):
|
||||
except AttributeError:
|
||||
raise NameResolutionError('Name resolution unavailable for {}'.format(service))
|
||||
|
||||
request = request_cls.from_dict(dict(name=re.escape(name), only_fields=['name', 'id']))
|
||||
req_dict = {"name": re.escape(name), "only_fields": ['name', 'id']}
|
||||
if search_hidden:
|
||||
req_dict["_allow_extra_fields_"] = True
|
||||
req_dict["search_hidden"] = True
|
||||
request = request_cls.from_dict(req_dict)
|
||||
# from_dict will ignore unrecognised keyword arguments - not all GetAll's have only_fields
|
||||
response = getattr(self._session.send_api(request), service)
|
||||
matches = [db_object for db_object in response if name.lower() == db_object.name.lower()]
|
||||
|
||||
@@ -122,6 +122,7 @@ from clearml_agent.helper.package.external_req import ExternalRequirements, Only
|
||||
from clearml_agent.helper.package.pip_api.system import SystemPip
|
||||
from clearml_agent.helper.package.pip_api.venv import VirtualenvPip
|
||||
from clearml_agent.helper.package.poetry_api import PoetryConfig, PoetryAPI
|
||||
from clearml_agent.helper.package.uv_api import UvConfig, UvAPI
|
||||
from clearml_agent.helper.package.post_req import PostRequirement
|
||||
from clearml_agent.helper.package.priority_req import PriorityPackageRequirement, PackageCollectorRequirement, \
|
||||
CachedPackageRequirement
|
||||
@@ -756,6 +757,7 @@ class Worker(ServiceCommandSection):
|
||||
|
||||
self.is_venv_update = self._session.config.agent.venv_update.enabled
|
||||
self.poetry = PoetryConfig(self._session)
|
||||
self.uv = UvConfig(self._session)
|
||||
self.docker_image_func = None
|
||||
self._patch_docker_cmd_func = None
|
||||
self._docker_image = None
|
||||
@@ -3000,9 +3002,11 @@ class Worker(ServiceCommandSection):
|
||||
|
||||
# Add the script CWD to the python path
|
||||
if repo_info and repo_info.root and self._session.config.get('agent.force_git_root_python_path', None):
|
||||
python_path = get_python_path(repo_info.root, None, self.package_api, is_conda_env=self.is_conda)
|
||||
python_path = get_python_path(repo_info.root, None, self.package_api,
|
||||
is_conda_env=self.is_conda or self.uv.enabled)
|
||||
else:
|
||||
python_path = get_python_path(script_dir, execution.entry_point, self.package_api, is_conda_env=self.is_conda)
|
||||
python_path = get_python_path(script_dir, execution.entry_point, self.package_api,
|
||||
is_conda_env=self.is_conda or self.uv.enabled)
|
||||
if ENV_TASK_EXTRA_PYTHON_PATH.get():
|
||||
python_path = add_python_path(python_path, ENV_TASK_EXTRA_PYTHON_PATH.get())
|
||||
if python_path:
|
||||
@@ -3017,7 +3021,7 @@ class Worker(ServiceCommandSection):
|
||||
ENV_TASK_EXECUTE_AS_USER.get())
|
||||
use_execv = False
|
||||
else:
|
||||
use_execv = is_linux_platform() and not isinstance(self.package_api, (PoetryAPI, CondaAPI))
|
||||
use_execv = is_linux_platform() and not isinstance(self.package_api, (PoetryAPI, UvAPI ,CondaAPI))
|
||||
|
||||
self._session.api_client.tasks.started(
|
||||
task=current_task.id,
|
||||
@@ -3383,7 +3387,7 @@ class Worker(ServiceCommandSection):
|
||||
|
||||
# disable caching with poetry because we cannot make it install into a specific folder
|
||||
# Todo: add support for poetry caching
|
||||
if not self.poetry.enabled:
|
||||
if not self.poetry.enabled and not self.uv.enabled:
|
||||
# disable caching if we skipped the venv creation or the entire python setup
|
||||
if add_venv_folder_cache and not self._standalone_mode and (
|
||||
not ENV_AGENT_SKIP_PIP_VENV_INSTALL.get() and
|
||||
@@ -3434,6 +3438,31 @@ class Worker(ServiceCommandSection):
|
||||
except Exception as ex:
|
||||
self.log.error("failed installing poetry requirements: {}".format(ex))
|
||||
return None
|
||||
|
||||
def _install_uv_requirements(self, repo_info, working_dir=None):
|
||||
# type: (Optional[RepoInfo], Optional[str]) -> Optional[UvAPI]
|
||||
if not repo_info:
|
||||
return None
|
||||
|
||||
files_from_working_dir = self._session.config.get(
|
||||
"agent.package_manager.uv_files_from_repo_working_dir", False)
|
||||
lockfile_path = Path(repo_info.root) / ((working_dir or "") if files_from_working_dir else "")
|
||||
|
||||
try:
|
||||
if not self.uv.enabled:
|
||||
return None
|
||||
|
||||
self.uv.initialize(cwd=lockfile_path)
|
||||
api = self.uv.get_api(lockfile_path)
|
||||
if api.enabled:
|
||||
print('UV Enabled: Ignoring requested python packages, using repository uv lock file!')
|
||||
api.install()
|
||||
return api
|
||||
|
||||
print(f"Could not find pyproject.toml or uv.lock file in {lockfile_path} \n")
|
||||
except Exception as ex:
|
||||
self.log.error("failed installing uv requirements: {}".format(ex))
|
||||
return None
|
||||
|
||||
def install_requirements(
|
||||
self, execution, repo_info, requirements_manager, cached_requirements=None, cwd=None, package_api=None
|
||||
@@ -3463,6 +3492,9 @@ class Worker(ServiceCommandSection):
|
||||
package_api.cwd = cwd
|
||||
|
||||
api = self._install_poetry_requirements(repo_info, execution.working_dir)
|
||||
if not api:
|
||||
api = self._install_uv_requirements(repo_info, execution.working_dir)
|
||||
|
||||
if api:
|
||||
# update back the package manager, this hack should be fixed
|
||||
if package_api == self.package_api:
|
||||
@@ -3920,12 +3952,14 @@ class Worker(ServiceCommandSection):
|
||||
'To accelerate spin-up time set `agent.venvs_cache.path=~/.clearml/venvs-cache` :::\n')
|
||||
|
||||
# check if we have a cached folder
|
||||
if cached_requirements and not skip_pip_venv_install and self.package_api.get_cached_venv(
|
||||
requirements=cached_requirements,
|
||||
docker_cmd=execution_info.docker_cmd if execution_info else None,
|
||||
python_version=self.package_api.python,
|
||||
cuda_version=self._session.config.get("agent.cuda_version"),
|
||||
destination_folder=Path(venv_dir)
|
||||
if (cached_requirements and not skip_pip_venv_install and
|
||||
not self.poetry.enabled and not self.uv.enabled and
|
||||
self.package_api.get_cached_venv(
|
||||
requirements=cached_requirements,
|
||||
docker_cmd=execution_info.docker_cmd if execution_info else None,
|
||||
python_version=self.package_api.python,
|
||||
cuda_version=self._session.config.get("agent.cuda_version"),
|
||||
destination_folder=Path(venv_dir))
|
||||
):
|
||||
print('::: Using Cached environment {} :::'.format(self.package_api.get_last_used_entry_cache()))
|
||||
return venv_dir, requirements_manager, True
|
||||
@@ -4582,11 +4616,12 @@ class Worker(ServiceCommandSection):
|
||||
"cp -Rf {mount_ssh_ro} -T {mount_ssh}" if host_ssh_cache else "",
|
||||
"[ ! -z $(which git) ] || export CLEARML_APT_INSTALL=\"$CLEARML_APT_INSTALL git\"",
|
||||
"declare LOCAL_PYTHON",
|
||||
"[ ! -z $LOCAL_PYTHON ] || for i in {{15..5}}; do which {python_single_digit}.$i && " +
|
||||
"[ ! -z $LOCAL_PYTHON ] || for i in {{20..5}}; do which {python_single_digit}.$i && " +
|
||||
"{python_single_digit}.$i -m pip --version && " +
|
||||
"export LOCAL_PYTHON=$(which {python_single_digit}.$i) && break ; done",
|
||||
"[ ! -z $LOCAL_PYTHON ] || export CLEARML_APT_INSTALL=\"$CLEARML_APT_INSTALL {python_single_digit}-pip\"", # noqa
|
||||
"[ -z \"$CLEARML_APT_INSTALL\" ] || (apt-get update -y ; apt-get install -y $CLEARML_APT_INSTALL)",
|
||||
"rm /usr/lib/python3.*/EXTERNALLY-MANAGED", # remove PEP 668
|
||||
]
|
||||
|
||||
if preprocess_bash_script:
|
||||
@@ -4803,7 +4838,7 @@ class Worker(ServiceCommandSection):
|
||||
worker_name = '{}:cpu'.format(worker_name)
|
||||
return worker_id, worker_name
|
||||
|
||||
def _resolve_queue_names(self, queues, create_if_missing=False):
|
||||
def _resolve_queue_names(self, queues, create_if_missing=False, create_system_tags=None):
|
||||
if not queues:
|
||||
# try to look for queues with "default" tag
|
||||
try:
|
||||
@@ -4815,15 +4850,25 @@ class Worker(ServiceCommandSection):
|
||||
|
||||
queues = return_list(queues)
|
||||
if not create_if_missing:
|
||||
return [self._resolve_name(q if isinstance(q, str) else q.name, "queues") for q in queues]
|
||||
return [
|
||||
self._resolve_name(q if isinstance(q, str) else q.name, service="queues", search_hidden=True)
|
||||
for q in queues
|
||||
]
|
||||
|
||||
queue_ids = []
|
||||
for q in queues:
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
q_id = self._resolve_name(q if isinstance(q, str) else q.name, "queues")
|
||||
q_id = self._resolve_name(
|
||||
q if isinstance(q, str) else q.name, service="queues", search_hidden=True
|
||||
)
|
||||
except:
|
||||
self._session.send_api(queues_api.CreateRequest(name=q if isinstance(q, str) else q.name))
|
||||
q_id = self._resolve_name(q if isinstance(q, str) else q.name, "queues")
|
||||
self._session.send_api(
|
||||
queues_api.CreateRequest(name=q if isinstance(q, str) else q.name, system_tags=create_system_tags)
|
||||
)
|
||||
q_id = self._resolve_name(
|
||||
q if isinstance(q, str) else q.name, service="queues", search_hidden=True
|
||||
)
|
||||
queue_ids.append(q_id)
|
||||
return queue_ids
|
||||
|
||||
|
||||
@@ -161,6 +161,7 @@ ENV_AGENT_SKIP_PYTHON_ENV_INSTALL = EnvironmentConfig("CLEARML_AGENT_SKIP_PYTHON
|
||||
ENV_AGENT_FORCE_CODE_DIR = EnvironmentConfig("CLEARML_AGENT_FORCE_CODE_DIR")
|
||||
ENV_AGENT_FORCE_EXEC_SCRIPT = EnvironmentConfig("CLEARML_AGENT_FORCE_EXEC_SCRIPT")
|
||||
ENV_AGENT_FORCE_POETRY = EnvironmentConfig("CLEARML_AGENT_FORCE_POETRY", type=bool)
|
||||
ENV_AGENT_FORCE_UV = EnvironmentConfig("CLEARML_AGENT_FORCE_UV", type=bool)
|
||||
ENV_AGENT_FORCE_TASK_INIT = EnvironmentConfig("CLEARML_AGENT_FORCE_TASK_INIT", type=bool)
|
||||
ENV_DOCKER_SKIP_GPUS_FLAG = EnvironmentConfig("CLEARML_DOCKER_SKIP_GPUS_FLAG", "TRAINS_DOCKER_SKIP_GPUS_FLAG")
|
||||
ENV_AGENT_GIT_USER = EnvironmentConfig("CLEARML_AGENT_GIT_USER", "TRAINS_AGENT_GIT_USER")
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
import shlex
|
||||
|
||||
from clearml_agent.helper.environment import EnvEntry
|
||||
|
||||
ENV_START_AGENT_SCRIPT_PATH = EnvEntry("CLEARML_K8S_GLUE_START_AGENT_SCRIPT_PATH", default="~/__start_agent__.sh")
|
||||
@@ -17,4 +19,14 @@ ENV_POD_USE_IMAGE_ENTRYPOINT = EnvEntry("K8S_GLUE_POD_USE_IMAGE_ENTRYPOINT", def
|
||||
"""
|
||||
Do not inject a cmd and args to the container's image when building the k8s template (depend on the built-in image
|
||||
entrypoint)
|
||||
"""
|
||||
"""
|
||||
|
||||
ENV_KUBECTL_IGNORE_ERROR = EnvEntry("K8S_GLUE_IGNORE_KUBECTL_ERROR", default=None)
|
||||
"""
|
||||
Ignore kubectl errors matching this string pattern (allows ignoring warnings sent on stderr while
|
||||
kubectl actually works and starts the pod)
|
||||
"""
|
||||
|
||||
ENV_DEFAULT_SCHEDULER_QUEUE_TAGS = EnvEntry(
|
||||
"K8S_GLUE_DEFAULT_SCHEDULER_QUEUE_TAGS", default=["k8s-glue"], converter=shlex.split
|
||||
)
|
||||
|
||||
@@ -42,6 +42,8 @@ from clearml_agent.glue.definitions import (
|
||||
ENV_DEFAULT_EXECUTION_AGENT_ARGS,
|
||||
ENV_POD_AGENT_INSTALL_ARGS,
|
||||
ENV_POD_USE_IMAGE_ENTRYPOINT,
|
||||
ENV_KUBECTL_IGNORE_ERROR,
|
||||
ENV_DEFAULT_SCHEDULER_QUEUE_TAGS,
|
||||
)
|
||||
|
||||
|
||||
@@ -83,10 +85,11 @@ class K8sIntegration(Worker):
|
||||
for line in _CONTAINER_APT_SCRIPT_SECTION
|
||||
),
|
||||
"declare LOCAL_PYTHON",
|
||||
"[ ! -z $LOCAL_PYTHON ] || for i in {{15..5}}; do which python3.$i && python3.$i -m pip --version && "
|
||||
"[ ! -z $LOCAL_PYTHON ] || for i in {{20..5}}; do which python3.$i && python3.$i -m pip --version && "
|
||||
"export LOCAL_PYTHON=$(which python3.$i) && break ; done",
|
||||
'[ ! -z "$CLEARML_AGENT_SKIP_CONTAINER_APT" ] || [ ! -z "$LOCAL_PYTHON" ] || apt-get install -y python3-pip',
|
||||
"[ ! -z $LOCAL_PYTHON ] || export LOCAL_PYTHON=python3",
|
||||
"rm /usr/lib/python3.*/EXTERNALLY-MANAGED", # remove PEP 668
|
||||
"{extra_bash_init_cmd}",
|
||||
"[ ! -z $CLEARML_AGENT_NO_UPDATE ] || $LOCAL_PYTHON -m pip install clearml-agent{agent_install_args}",
|
||||
"{extra_docker_bash_script}",
|
||||
@@ -208,6 +211,10 @@ class K8sIntegration(Worker):
|
||||
self._session.feature_set != "basic" and self._session.check_min_server_version("3.22.3")
|
||||
)
|
||||
|
||||
self.ignore_kubectl_errors_re = (
|
||||
re.compile(ENV_KUBECTL_IGNORE_ERROR.get()) if ENV_KUBECTL_IGNORE_ERROR.get() else None
|
||||
)
|
||||
|
||||
@property
|
||||
def agent_label(self):
|
||||
return self._get_agent_label()
|
||||
@@ -466,13 +473,34 @@ class K8sIntegration(Worker):
|
||||
queue=self.k8s_pending_queue_id,
|
||||
)
|
||||
|
||||
res = self._session.api_client.tasks.enqueue(
|
||||
task_id,
|
||||
queue=self.k8s_pending_queue_id,
|
||||
status_reason='k8s pending scheduler',
|
||||
)
|
||||
if res.meta.result_code != 200:
|
||||
raise Exception(res.meta.result_msg)
|
||||
for attempt in range(2):
|
||||
res = self._session.send_request(
|
||||
"tasks",
|
||||
"enqueue",
|
||||
json={
|
||||
"task": task_id,
|
||||
"queue": self.k8s_pending_queue_id,
|
||||
"status_reason": "k8s pending scheduler",
|
||||
"update_execution_queue": False,
|
||||
}
|
||||
)
|
||||
if res.ok:
|
||||
break
|
||||
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
result_subcode = res.json()["meta"]["result_subcode"]
|
||||
result_msg = res.json()["meta"]["result_msg"]
|
||||
except Exception:
|
||||
result_subcode = None
|
||||
result_msg = res.text
|
||||
|
||||
if attempt == 0 and res.status_code == 400 and result_subcode == 701:
|
||||
# Invalid queue ID, only retry once
|
||||
self._ensure_pending_queue_exists()
|
||||
continue
|
||||
raise Exception(result_msg)
|
||||
|
||||
except Exception as e:
|
||||
self.log.error("ERROR: Could not push back task [{}] to k8s pending queue {} [{}], error: {}".format(
|
||||
task_id, self.k8s_pending_queue_name, self.k8s_pending_queue_id, e))
|
||||
@@ -611,34 +639,76 @@ class K8sIntegration(Worker):
|
||||
print("ERROR: no template for task {}, skipping".format(task_id))
|
||||
return
|
||||
|
||||
pod_name = self.pod_name_prefix + str(task_id)
|
||||
|
||||
self.apply_template_and_handle_result(
|
||||
pod_name=pod_name,
|
||||
clearml_conf_create_script=clearml_conf_create_script,
|
||||
labels=labels,
|
||||
queue=queue,
|
||||
task_id=task_id,
|
||||
namespace=namespace,
|
||||
template=template,
|
||||
docker_image=container['image'],
|
||||
docker_args=container.get('arguments'),
|
||||
docker_bash=container.get('setup_shell_script'),
|
||||
session=session,
|
||||
task_session=task_session,
|
||||
pod_number=pod_number,
|
||||
queue_name=queue_name,
|
||||
task_data=task_data,
|
||||
ports_mode=ports_mode,
|
||||
pod_count=pod_count,
|
||||
)
|
||||
|
||||
def apply_template_and_handle_result(
|
||||
self,
|
||||
pod_name,
|
||||
clearml_conf_create_script: List[str],
|
||||
labels,
|
||||
queue,
|
||||
task_id,
|
||||
namespace,
|
||||
template,
|
||||
docker_image,
|
||||
docker_args,
|
||||
docker_bash,
|
||||
session,
|
||||
task_session,
|
||||
queue_name,
|
||||
task_data,
|
||||
ports_mode,
|
||||
pod_count,
|
||||
pod_number=None,
|
||||
base_spec: dict = None,
|
||||
):
|
||||
"""Apply the provided template with all custom settings and handle bookkeeping for the reaults"""
|
||||
|
||||
output, error, pod_name = self._kubectl_apply(
|
||||
pod_name=pod_name,
|
||||
template=template,
|
||||
pod_number=pod_number,
|
||||
clearml_conf_create_script=clearml_conf_create_script,
|
||||
labels=labels,
|
||||
docker_image=container['image'],
|
||||
docker_args=container.get('arguments'),
|
||||
docker_bash=container.get('setup_shell_script'),
|
||||
docker_image=docker_image,
|
||||
docker_args=docker_args,
|
||||
docker_bash=docker_bash,
|
||||
task_id=task_id,
|
||||
queue=queue,
|
||||
namespace=namespace,
|
||||
task_token=task_session.token.encode("ascii") if task_session else None,
|
||||
base_spec=base_spec,
|
||||
)
|
||||
|
||||
print('kubectl output:\n{}\n{}'.format(error, output))
|
||||
if error:
|
||||
send_log = "Running kubectl encountered an error: {}".format(error)
|
||||
self.log.error(send_log)
|
||||
self.send_logs(task_id, send_log.splitlines())
|
||||
|
||||
# Make sure to remove the task from our k8s pending queue
|
||||
self._session.api_client.queues.remove_task(
|
||||
task=task_id,
|
||||
queue=self.k8s_pending_queue_id,
|
||||
)
|
||||
# Set task as failed
|
||||
session.api_client.tasks.failed(task_id, force=True)
|
||||
return
|
||||
if self.ignore_kubectl_errors_re and self.ignore_kubectl_errors_re.match(error):
|
||||
print(f"Ignoring error due to {ENV_KUBECTL_IGNORE_ERROR.key}")
|
||||
else:
|
||||
self._set_task_failed_while_applying(
|
||||
session, task_id, f"Running kubectl encountered an error: {error}"
|
||||
)
|
||||
return
|
||||
|
||||
if pod_name:
|
||||
self.resource_applied(
|
||||
@@ -650,6 +720,19 @@ class K8sIntegration(Worker):
|
||||
pod_number=pod_number, pod_count=pod_count, task_data=task_data
|
||||
)
|
||||
|
||||
def _set_task_failed_while_applying(self, session, task_id: str, error: str):
|
||||
send_log = "Running kubectl encountered an error: {}".format(error)
|
||||
self.log.error(send_log)
|
||||
self.send_logs(task_id, send_log.splitlines())
|
||||
|
||||
# Make sure to remove the task from our k8s pending queue
|
||||
self._session.api_client.queues.remove_task(
|
||||
task=task_id,
|
||||
queue=self.k8s_pending_queue_id,
|
||||
)
|
||||
# Set task as failed
|
||||
session.api_client.tasks.failed(task_id, force=True)
|
||||
|
||||
def set_task_info(
|
||||
self, task_id: str, task_session, task_data, queue_name: str, ports_mode: bool, pod_number, pod_count
|
||||
):
|
||||
@@ -830,6 +913,7 @@ class K8sIntegration(Worker):
|
||||
|
||||
def _kubectl_apply(
|
||||
self,
|
||||
pod_name,
|
||||
clearml_conf_create_script: List[str],
|
||||
docker_image,
|
||||
docker_args,
|
||||
@@ -841,6 +925,7 @@ class K8sIntegration(Worker):
|
||||
template,
|
||||
pod_number=None,
|
||||
task_token=None,
|
||||
base_spec: dict = None, # base values for the spec (might be overridden)
|
||||
):
|
||||
if "apiVersion" not in template:
|
||||
template["apiVersion"] = "batch/v1" if self.using_jobs else "v1"
|
||||
@@ -855,8 +940,7 @@ class K8sIntegration(Worker):
|
||||
template["kind"] = self.kind.capitalize()
|
||||
|
||||
metadata = template.setdefault('metadata', {})
|
||||
name = self.pod_name_prefix + str(task_id)
|
||||
metadata['name'] = name
|
||||
metadata['name'] = pod_name
|
||||
|
||||
def place_labels(metadata_dict):
|
||||
labels_dict = dict(pair.split('=', 1) for pair in labels)
|
||||
@@ -876,13 +960,16 @@ class K8sIntegration(Worker):
|
||||
|
||||
spec = spec_template.setdefault('spec', {})
|
||||
|
||||
if base_spec:
|
||||
merge_dicts(spec, base_spec)
|
||||
|
||||
containers = spec.setdefault('containers', [])
|
||||
spec.setdefault('restartPolicy', 'Never')
|
||||
|
||||
task_worker_id = self.get_task_worker_id(template, task_id, name, namespace, queue)
|
||||
task_worker_id = self.get_task_worker_id(template, task_id, pod_name, namespace, queue)
|
||||
|
||||
container = self._create_template_container(
|
||||
pod_name=name,
|
||||
pod_name=pod_name,
|
||||
task_id=task_id,
|
||||
docker_image=docker_image,
|
||||
docker_args=docker_args,
|
||||
@@ -928,7 +1015,7 @@ class K8sIntegration(Worker):
|
||||
finally:
|
||||
safe_remove_file(yaml_file)
|
||||
|
||||
return stringify_bash_output(output), stringify_bash_output(error), name
|
||||
return stringify_bash_output(output), stringify_bash_output(error), pod_name
|
||||
|
||||
def _process_bash_lines_response(self, bash_cmd: str, raise_error=True):
|
||||
res = get_bash_output(bash_cmd, raise_error=raise_error)
|
||||
@@ -1013,7 +1100,7 @@ class K8sIntegration(Worker):
|
||||
deleted_pods = defaultdict(list)
|
||||
for namespace in namespaces:
|
||||
if time() - self._last_pod_cleanup_per_ns[namespace] < self._min_cleanup_interval_per_ns_sec:
|
||||
# Do not try to cleanup the same namespace too quickly
|
||||
# Do not try to clean up the same namespace too quickly
|
||||
continue
|
||||
|
||||
try:
|
||||
@@ -1089,6 +1176,21 @@ class K8sIntegration(Worker):
|
||||
def check_if_suspended(self) -> bool:
|
||||
pass
|
||||
|
||||
def check_if_schedulable(self, queue: str) -> bool:
|
||||
return True
|
||||
|
||||
def _ensure_pending_queue_exists(self):
|
||||
resolved_ids = self._resolve_queue_names(
|
||||
[self.k8s_pending_queue_name],
|
||||
create_if_missing=True,
|
||||
create_system_tags=ENV_DEFAULT_SCHEDULER_QUEUE_TAGS.get()
|
||||
)
|
||||
if not resolved_ids:
|
||||
raise ValueError(
|
||||
"Failed resolving or creating k8s pending queue {}".format(self.k8s_pending_queue_name)
|
||||
)
|
||||
self.k8s_pending_queue_id = resolved_ids[0]
|
||||
|
||||
def run_tasks_loop(self, queues: List[Text], worker_params, **kwargs):
|
||||
"""
|
||||
:summary: Pull and run tasks from queues.
|
||||
@@ -1104,14 +1206,8 @@ class K8sIntegration(Worker):
|
||||
|
||||
events_service = self.get_service(Events)
|
||||
|
||||
# make sure we have a k8s pending queue
|
||||
if not self.k8s_pending_queue_id:
|
||||
resolved_ids = self._resolve_queue_names([self.k8s_pending_queue_name], create_if_missing=True)
|
||||
if not resolved_ids:
|
||||
raise ValueError(
|
||||
"Failed resolving or creating k8s pending queue {}".format(self.k8s_pending_queue_name)
|
||||
)
|
||||
self.k8s_pending_queue_id = resolved_ids[0]
|
||||
self._ensure_pending_queue_exists()
|
||||
|
||||
_last_machine_update_ts = 0
|
||||
while True:
|
||||
@@ -1143,6 +1239,9 @@ class K8sIntegration(Worker):
|
||||
sleep(self._polling_interval)
|
||||
break
|
||||
|
||||
if not self.check_if_schedulable(queue):
|
||||
continue
|
||||
|
||||
# get next task in queue
|
||||
try:
|
||||
# print(f"debug> getting tasks for queue {queue}")
|
||||
|
||||
@@ -1,11 +1,9 @@
|
||||
from time import sleep
|
||||
from typing import Dict, Tuple, Optional, List
|
||||
from typing import Dict, List
|
||||
|
||||
from clearml_agent.backend_api.session import Request
|
||||
from clearml_agent.glue.utilities import get_bash_output
|
||||
|
||||
from clearml_agent.helper.process import stringify_bash_output
|
||||
|
||||
from .daemon import K8sDaemon
|
||||
from .utilities import get_path
|
||||
from .errors import GetPodsError
|
||||
@@ -38,7 +36,11 @@ class PendingPodsDaemon(K8sDaemon):
|
||||
return get_path(pod, "metadata", "name")
|
||||
|
||||
def _get_task_id(self, pod: dict):
|
||||
return self._get_k8s_resource_name(pod).rpartition('-')[-1]
|
||||
prefix, _, value = self._get_k8s_resource_name(pod).rpartition('-')
|
||||
if len(value) > 4:
|
||||
return value
|
||||
# we assume this is a multi-node rank x (>0) pod
|
||||
return prefix.rpartition('-')[-1] or value
|
||||
|
||||
@staticmethod
|
||||
def _get_k8s_resource_namespace(pod: dict):
|
||||
@@ -239,6 +241,11 @@ class PendingPodsDaemon(K8sDaemon):
|
||||
result_msg = get_path(result.json(), 'meta', 'result_msg')
|
||||
raise Exception(result_msg or result.text)
|
||||
|
||||
self._agent.send_logs(
|
||||
task_id, ["Kubernetes Pod status: {}".format(msg)],
|
||||
session=self._session
|
||||
)
|
||||
|
||||
# update last msg for this task
|
||||
self._last_tasks_msgs[task_id] = msg
|
||||
except Exception as ex:
|
||||
|
||||
@@ -14,7 +14,7 @@ def merge_dicts(dict1, dict2, custom_merge_func=None):
|
||||
return dict2
|
||||
for k in dict2:
|
||||
if k in dict1:
|
||||
res = None
|
||||
res = _not_set
|
||||
if custom_merge_func:
|
||||
res = custom_merge_func(k, dict1[k], dict2[k], _not_set)
|
||||
dict1[k] = merge_dicts(dict1[k], dict2[k], custom_merge_func) if res is _not_set else res
|
||||
|
||||
@@ -7,7 +7,7 @@ from .requirements import SimpleSubstitution
|
||||
|
||||
class PriorityPackageRequirement(SimpleSubstitution):
|
||||
|
||||
name = ("cython", "numpy", "setuptools", "pip", )
|
||||
name = ("cython", "numpy", "setuptools", "pip", "uv", )
|
||||
optional_package_names = tuple()
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
|
||||
234
clearml_agent/helper/package/uv_api.py
Normal file
234
clearml_agent/helper/package/uv_api.py
Normal file
@@ -0,0 +1,234 @@
|
||||
from copy import deepcopy
|
||||
from functools import wraps
|
||||
|
||||
import attr
|
||||
import sys
|
||||
import os
|
||||
from pathlib2 import Path
|
||||
|
||||
from clearml_agent.definitions import ENV_AGENT_FORCE_UV
|
||||
from clearml_agent.helper.base import select_for_platform
|
||||
from clearml_agent.helper.process import Argv, DEVNULL, check_if_command_exists
|
||||
from clearml_agent.session import Session, UV
|
||||
|
||||
|
||||
def prop_guard(prop, log_prop=None):
|
||||
assert isinstance(prop, property)
|
||||
assert not log_prop or isinstance(log_prop, property)
|
||||
|
||||
def decorator(func):
|
||||
message = "%s:%s calling {}, {} = %s".format(func.__name__, prop.fget.__name__)
|
||||
|
||||
@wraps(func)
|
||||
def new_func(self, *args, **kwargs):
|
||||
prop_value = prop.fget(self)
|
||||
if log_prop:
|
||||
log_prop.fget(self).debug(
|
||||
message,
|
||||
type(self).__name__,
|
||||
"" if prop_value else " not",
|
||||
prop_value,
|
||||
)
|
||||
if prop_value:
|
||||
return func(self, *args, **kwargs)
|
||||
|
||||
return new_func
|
||||
|
||||
return decorator
|
||||
|
||||
|
||||
class UvConfig:
    """
    Configuration and process wrapper for the ``uv`` package manager.

    Resolves the python/uv binaries from the agent session configuration,
    installs (or upgrades) uv on demand, and runs uv commands inside a
    sanitized copy of the environment.
    """

    def __init__(self, session):
        # type: (Session) -> None
        self.session = session
        self._log = session.get_logger(__name__)
        # default interpreter; may be overwritten from the session
        # configuration ("agent.python_binary") in initialize()
        self._python = sys.executable
        self._initialized = False

    @property
    def log(self):
        return self._log

    @property
    def enabled(self):
        # uv is active when forced via the env var or selected in the config
        return (
            ENV_AGENT_FORCE_UV.get()
            or self.session.config["agent.package_manager.type"] == UV
        )

    _guard_enabled = prop_guard(enabled, log)

    def run(self, *args, **kwargs):
        """
        Run a uv command and return the result of ``func`` (default
        :meth:`Argv.get_output`) applied to the constructed command line.

        The child environment is a deep copy of ``os.environ`` with any
        active virtualenv/conda markers stripped, so uv operates on the
        target environment rather than the agent's own.
        """
        func = kwargs.pop("func", Argv.get_output)
        kwargs.setdefault("stdin", DEVNULL)
        kwargs["env"] = deepcopy(os.environ)
        if "VIRTUAL_ENV" in kwargs["env"] or "CONDA_PREFIX" in kwargs["env"]:
            kwargs["env"].pop("VIRTUAL_ENV", None)
            kwargs["env"].pop("CONDA_PREFIX", None)
            kwargs["env"].pop("PYTHONPATH", None)
            if hasattr(sys, "real_prefix") and hasattr(sys, "base_prefix"):
                # restore the real interpreter prefix on PATH when running
                # from inside a virtualenv (legacy virtualenv layout)
                path = ":" + kwargs["env"]["PATH"]
                path = path.replace(":" + sys.base_prefix, ":" + sys.real_prefix, 1)
                kwargs["env"]["PATH"] = path

        if self.session and self.session.config and args and args[0] == "sync":
            # Set the cache dir to venvs dir
            cache_dir = self.session.config.get("agent.venvs_dir", None)
            if cache_dir is not None:
                # bugfix: the child env was already deep-copied above, so
                # mutating os.environ alone would NOT propagate to the
                # spawned command on this call -- set it in both places
                kwargs["env"]["UV_CACHE_DIR"] = cache_dir
                os.environ["UV_CACHE_DIR"] = cache_dir

            extra_args = self.session.config.get(
                "agent.package_manager.uv_sync_extra_args", None
            )
            if extra_args:
                args = args + tuple(extra_args)

        # prefer the uv binary on PATH, fall back to module execution
        if check_if_command_exists("uv"):
            argv = Argv("uv", *args)
        else:
            argv = Argv(self._python, "-m", "uv", *args)
        self.log.debug("running: %s", argv)
        return func(argv, **kwargs)

    @_guard_enabled
    def initialize(self, cwd=None):
        """
        One-time setup: pick the configured python binary, upgrade pip,
        then install/upgrade uv honoring
        ``agent.package_manager.uv_version`` when set.
        """
        if self._initialized:
            return

        # use correct python version -- detected in Worker.install_virtualenv()
        # and written to session
        if self.session.config.get("agent.python_binary", None):
            self._python = self.session.config.get("agent.python_binary")

        version = self.session.config.get("agent.package_manager.uv_version", None)
        if version is not None:
            # get uv version
            version = str(version).replace(" ", "")
            # when no comparison operator was given, pin the exact version
            if version and not any(op in version for op in ("=", "~", "<", ">")):
                version = "==" + version
            # (we are not running it yet)
            argv = Argv(
                self._python,
                "-m",
                "pip",
                "install",
                "uv{}".format(version),
                "--upgrade",
                "--disable-pip-version-check",
            )
            # this is just for beauty and checks, we already set the version in the Argv
            if not version:
                version = "latest"
        else:
            # mark to install uv if not already installed (we are not running it yet)
            argv = Argv(
                self._python,
                "-m",
                "pip",
                "install",
                "uv",
                "--disable-pip-version-check",
            )
            version = ""

        # first upgrade pip if we need to
        try:
            from clearml_agent.helper.package.pip_api.venv import VirtualenvPip

            pip = VirtualenvPip(
                session=self.session,
                python=self._python,
                requirements_manager=None,
                path=None,
                interpreter=self._python,
            )
            pip.upgrade_pip()
        except Exception as ex:
            self.log.warning("failed upgrading pip: {}".format(ex))

        # check if we do not have a specific version and uv is found skip installation
        if not version and check_if_command_exists("uv"):
            print(
                "Notice: uv was found, no specific version required, skipping uv installation"
            )
        else:
            print("Installing / Upgrading uv package to {}".format(version))
            # now install uv
            try:
                print(argv.get_output())
            except Exception as ex:
                self.log.warning("failed installing uv: {}".format(ex))

        # all done.
        self._initialized = True

    def get_api(self, path):
        # type: (Path) -> UvAPI
        return UvAPI(self, path)
|
||||
|
||||
|
||||
@attr.s
class UvAPI(object):
    """Package-manager facade exposing uv operations for one project path."""

    config = attr.ib(type=UvConfig)
    path = attr.ib(type=Path, converter=Path)

    # presence of either file marks the repository as uv-managed
    INDICATOR_FILES = "pyproject.toml", "uv.lock"

    def install(self):
        # type: () -> bool
        """Run ``uv sync --locked`` in the project directory.

        Returns True when uv is enabled for this path, False otherwise.
        """
        if not self.enabled:
            return False
        self.config.run("sync", "--locked", cwd=str(self.path), func=Argv.check_call)
        return True

    @property
    def enabled(self):
        """True when uv is selected AND an indicator file exists in the repo."""
        if not self.config.enabled:
            return False
        return any((self.path / marker).exists() for marker in self.INDICATOR_FILES)

    def freeze(self, freeze_full_environment=False):
        """Return a ``{"pip": [...]}`` freeze listing of the project's .venv."""
        interpreter = Path(self.path) / ".venv" / select_for_platform(linux="bin/python", windows="scripts/python.exe")
        output = self.config.run("pip", "freeze", "--python", str(interpreter), cwd=str(self.path))
        # fix local filesystem reference in freeze
        from clearml_agent.external.requirements_parser.requirement import Requirement

        requirements = [Requirement.parse(entry) for entry in output.splitlines()]
        for requirement in requirements:
            if requirement.local_file and requirement.editable:
                requirement.path = str(Path(requirement.path).relative_to(self.path))
                requirement.line = "-e {}".format(requirement.path)

        return {"pip": [requirement.line for requirement in requirements]}

    def get_python_command(self, extra):
        """Build an Argv running python through uv (binary or module form)."""
        if not check_if_command_exists("uv"):
            return Argv(self.config._python, "-m", "uv", "run", "python", *extra)
        return Argv("uv", "run", "python", *extra)

    # NOTE(review): the following are intentional no-ops -- presumably kept
    # so UvAPI satisfies the common package-manager interface expected by
    # callers; confirm against the other package-manager APIs.
    def upgrade_pip(self, *args, **kwargs):
        pass

    def set_selected_package_manager(self, *args, **kwargs):
        pass

    def out_of_scope_install_package(self, *args, **kwargs):
        pass

    def install_from_file(self, *args, **kwargs):
        pass
|
||||
@@ -24,6 +24,7 @@ from clearml_agent.helper.docker_args import DockerArgsSanitizer, sanitize_urls
|
||||
from .version import __version__
|
||||
|
||||
POETRY = "poetry"
|
||||
UV = "uv"
|
||||
|
||||
|
||||
@attr.s
|
||||
|
||||
@@ -53,8 +53,9 @@ agent {
|
||||
# select python package manager:
|
||||
# currently supported: pip and conda
|
||||
# poetry is used if pip selected and repository contains poetry.lock file
|
||||
# uv is used if pip selected and repository contains uv.lock file
|
||||
package_manager: {
|
||||
# supported options: pip, conda, poetry
|
||||
# supported options: pip, conda, poetry, uv
|
||||
type: pip,
|
||||
|
||||
# specify pip version to use (examples "<20.2", "==19.3.1", "", empty string will install the latest version)
|
||||
|
||||
@@ -74,8 +74,10 @@ agent {
|
||||
# If Poetry is selected and the root repository contains `poetry.lock` or `pyproject.toml`,
|
||||
# the "installed packages" section is ignored, and poetry is used.
|
||||
# If Poetry is selected and no lock file is found, it reverts to "pip" package manager behaviour.
|
||||
# If uv is selected and the root repository contains `uv.lock` or `pyproject.toml`,
|
||||
# the "installed packages" section is ignored, and uv is used.
|
||||
package_manager: {
|
||||
# supported options: pip, conda, poetry
|
||||
# supported options: pip, conda, poetry, uv
|
||||
type: pip,
|
||||
|
||||
# specify pip version to use (examples "<20.2", "==19.3.1", "", empty string will install the latest version)
|
||||
@@ -83,6 +85,8 @@ agent {
|
||||
# specify poetry version to use (examples "<2", "==1.1.1", "", empty string will install the latest version)
|
||||
# poetry_version: "<2",
|
||||
# poetry_install_extra_args: ["-v"]
|
||||
# uv_version: ">0.4",
|
||||
# uv_sync_extra_args: ["--all-extras"]
|
||||
|
||||
# virtual environment inherits packages from system
|
||||
system_site_packages: false,
|
||||
|
||||
@@ -13,3 +13,4 @@ six>=1.13.0,<1.17.0
|
||||
typing>=3.6.4,<3.8.0 ; python_version < '3.5'
|
||||
urllib3>=1.21.1,<2
|
||||
virtualenv>=16,<21
|
||||
setuptools ; python_version > '3.11'
|
||||
|
||||
8
setup.py
8
setup.py
@@ -1,5 +1,5 @@
|
||||
"""
|
||||
ClearML - Artificial Intelligence Version Control
|
||||
ClearML Inc.
|
||||
CLEARML-AGENT DevOps for machine/deep learning
|
||||
https://github.com/allegroai/clearml-agent
|
||||
"""
|
||||
@@ -40,8 +40,8 @@ setup(
|
||||
long_description_content_type='text/markdown',
|
||||
# The project's main homepage.
|
||||
url='https://github.com/allegroai/clearml-agent',
|
||||
author='Allegroai',
|
||||
author_email='clearml@allegro.ai',
|
||||
author='clearml',
|
||||
author_email='clearml@clearml.ai',
|
||||
license='Apache License 2.0',
|
||||
classifiers=[
|
||||
'Development Status :: 5 - Production/Stable',
|
||||
@@ -56,7 +56,6 @@ setup(
|
||||
'Topic :: Scientific/Engineering :: Image Recognition',
|
||||
'Topic :: System :: Logging',
|
||||
'Topic :: System :: Monitoring',
|
||||
'Programming Language :: Python :: 3.5',
|
||||
'Programming Language :: Python :: 3.6',
|
||||
'Programming Language :: Python :: 3.7',
|
||||
'Programming Language :: Python :: 3.8',
|
||||
@@ -64,6 +63,7 @@ setup(
|
||||
'Programming Language :: Python :: 3.10',
|
||||
'Programming Language :: Python :: 3.11',
|
||||
'Programming Language :: Python :: 3.12',
|
||||
'Programming Language :: Python :: 3.13',
|
||||
'License :: OSI Approved :: Apache Software License',
|
||||
],
|
||||
|
||||
|
||||
Reference in New Issue
Block a user