mirror of
https://github.com/clearml/clearml-agent
synced 2025-06-26 18:16:15 +00:00
Compare commits
53 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3273f76b46 | ||
|
|
9af0f9fe41 | ||
|
|
205cd47cb9 | ||
|
|
0ff428bb96 | ||
|
|
bf8d9c96e9 | ||
|
|
a88487ff25 | ||
|
|
785e22dc87 | ||
|
|
6a2b778d53 | ||
|
|
b2c3702830 | ||
|
|
6302d43990 | ||
|
|
760bbca74e | ||
|
|
e63fd31420 | ||
|
|
2ff9985db7 | ||
|
|
b8c762401b | ||
|
|
99e1e54f94 | ||
|
|
a4d3b5bad6 | ||
|
|
b21665ed6e | ||
|
|
f877aa96e2 | ||
|
|
f99344d194 | ||
|
|
d9f2a1999a | ||
|
|
79d0abe707 | ||
|
|
6213ef4c02 | ||
|
|
aef6aa9fc8 | ||
|
|
0bb267115b | ||
|
|
f89a92556f | ||
|
|
8ba4d75e80 | ||
|
|
edc333ba5f | ||
|
|
2f0553b873 | ||
|
|
b2a4bf08ac | ||
|
|
f18c6b809f | ||
|
|
cd5b4d2186 | ||
|
|
5f1bab6711 | ||
|
|
ab9b9db0c9 | ||
|
|
93df021108 | ||
|
|
700ae85de0 | ||
|
|
f367c5a571 | ||
|
|
ebc5944b44 | ||
|
|
8f41002845 | ||
|
|
7e8670d57f | ||
|
|
77de343863 | ||
|
|
6b31883e45 | ||
|
|
e48b4756fa | ||
|
|
47147e3237 | ||
|
|
41fc4ec646 | ||
|
|
441e5a73b2 | ||
|
|
27ed6821c4 | ||
|
|
10c6629982 | ||
|
|
6fb48a4c6e | ||
|
|
105ade31f1 | ||
|
|
502e266b6b | ||
|
|
cd9a3b9f4e | ||
|
|
4179ac5234 | ||
|
|
98cc0d86ba |
2
.gitignore
vendored
2
.gitignore
vendored
@@ -14,3 +14,5 @@ dist/
|
||||
# VSCode
|
||||
.vscode
|
||||
|
||||
# MirrorD
|
||||
.mirrord
|
||||
|
||||
@@ -66,7 +66,7 @@
|
||||
type: pip,
|
||||
|
||||
# specify pip version to use (examples "<20.2", "==19.3.1", "", empty string will install the latest version)
|
||||
pip_version: ["<20.2 ; python_version < '3.10'", "<22.3 ; python_version >= '3.10'"],
|
||||
pip_version: ["<20.2 ; python_version < '3.10'", "<22.3 ; python_version >= '3.10' and python_version <= '3.11'", ">=23,<24.3 ; python_version >= '3.12'"]
|
||||
# specify poetry version to use (examples "<2", "==1.1.1", "", empty string will install the latest version)
|
||||
# poetry_version: "<2",
|
||||
# poetry_install_extra_args: ["-v"]
|
||||
@@ -80,6 +80,14 @@
|
||||
# additional artifact repositories to use when installing python packages
|
||||
# extra_index_url: ["https://allegroai.jfrog.io/clearml/api/pypi/public/simple"]
|
||||
|
||||
# turn on the "--use-deprecated=legacy-resolver" flag for pip, to avoid package dependency version mismatch
|
||||
# is any version restrictions are matched we add the "--use-deprecated=legacy-resolver" flag
|
||||
# example: pip_legacy_resolver = [">=20.3,<24.3", ">99"]
|
||||
# if pip==20.2 or pip==29.0 is installed we do nothing,
|
||||
# if pip==21.1 or pip==101.1 is installed the flag is added
|
||||
# disable the feature by passing an empty list
|
||||
pip_legacy_resolver = [">=20.3,<24.3"]
|
||||
|
||||
# control the pytorch wheel resolving algorithm, options are: "pip", "direct", "none"
|
||||
# Override with environment variable CLEARML_AGENT_PACKAGE_PYTORCH_RESOLVE
|
||||
# "pip" (default): would automatically detect the cuda version, and supply pip with the correct
|
||||
@@ -218,6 +226,76 @@
|
||||
|
||||
# optional arguments to pass to docker image
|
||||
# arguments: ["--ipc=host", ]
|
||||
|
||||
# Choose the default docker based on the Task properties,
|
||||
# Notice: Enterprise feature, ignored otherwise
|
||||
# Examples: 'script.requirements', 'script.binary', 'script.repository', 'script.branch', 'project'
|
||||
# Notice: Matching is done via regular expression, for example "^searchme$" will match exactly "searchme" string
|
||||
"match_rules": [
|
||||
{
|
||||
"image": "python:3.6-bullseye",
|
||||
"arguments": "--ipc=host",
|
||||
"match": {
|
||||
"script": {
|
||||
"binary": "python3.6$",
|
||||
},
|
||||
}
|
||||
},
|
||||
{
|
||||
"image": "python:3.7-bullseye",
|
||||
"arguments": "--ipc=host",
|
||||
"match": {
|
||||
"script": {
|
||||
"binary": "python3.7$",
|
||||
},
|
||||
}
|
||||
},
|
||||
{
|
||||
"image": "python:3.8-bullseye",
|
||||
"arguments": "--ipc=host",
|
||||
"match": {
|
||||
"script": {
|
||||
"binary": "python3.8$",
|
||||
},
|
||||
}
|
||||
},
|
||||
{
|
||||
"image": "python:3.9-bullseye",
|
||||
"arguments": "--ipc=host",
|
||||
"match": {
|
||||
"script": {
|
||||
"binary": "python3.9$",
|
||||
},
|
||||
}
|
||||
},
|
||||
{
|
||||
"image": "python:3.10-bullseye",
|
||||
"arguments": "--ipc=host",
|
||||
"match": {
|
||||
"script": {
|
||||
"binary": "python3.10$",
|
||||
},
|
||||
}
|
||||
},
|
||||
{
|
||||
"image": "python:3.11-bullseye",
|
||||
"arguments": "--ipc=host",
|
||||
"match": {
|
||||
"script": {
|
||||
"binary": "python3.11$",
|
||||
},
|
||||
}
|
||||
},
|
||||
{
|
||||
"image": "python:3.12-bullseye",
|
||||
"arguments": "--ipc=host",
|
||||
"match": {
|
||||
"script": {
|
||||
"binary": "python3.12$",
|
||||
},
|
||||
}
|
||||
},
|
||||
]
|
||||
}
|
||||
|
||||
# set the OS environments based on the Task's Environment section before launching the Task process.
|
||||
@@ -289,6 +367,7 @@
|
||||
pip_cache: "/root/.cache/pip"
|
||||
poetry_cache: "/root/.cache/pypoetry"
|
||||
vcs_cache: "/root/.clearml/vcs-cache"
|
||||
venvs_cache: "/root/.clearml/venvs-cache"
|
||||
venv_build: "~/.clearml/venvs-builds"
|
||||
pip_download: "/root/.clearml/pip-download-cache"
|
||||
}
|
||||
|
||||
@@ -22,6 +22,9 @@ ENV_INITIAL_CONNECT_RETRY_OVERRIDE = EnvEntry(
|
||||
'CLEARML_AGENT_INITIAL_CONNECT_RETRY_OVERRIDE', default=True, converter=safe_text_to_bool
|
||||
)
|
||||
ENV_FORCE_MAX_API_VERSION = EnvEntry("CLEARML_AGENT_FORCE_MAX_API_VERSION", type=str)
|
||||
# values are 0/None (task per node), 1/2 (multi-node reporting, colored console), -1 (only report rank 0 node)
|
||||
ENV_MULTI_NODE_SINGLE_TASK = EnvEntry("CLEARML_MULTI_NODE_SINGLE_TASK", type=int, default=None)
|
||||
|
||||
|
||||
"""
|
||||
Experimental option to set the request method for all API requests and auth login.
|
||||
|
||||
@@ -64,6 +64,8 @@ class Session(TokenManager):
|
||||
default_key = "EGRTCO8JMSIGI6S39GTP43NFWXDQOW"
|
||||
default_secret = "x!XTov_G-#vspE*Y(h$Anm&DIc5Ou-F)jsl$PdOyj5wG1&E!Z8"
|
||||
force_max_api_version = ENV_FORCE_MAX_API_VERSION.get()
|
||||
server_version = "1.0.0"
|
||||
user_id = None
|
||||
|
||||
# TODO: add requests.codes.gateway_timeout once we support async commits
|
||||
_retry_codes = [
|
||||
@@ -191,6 +193,8 @@ class Session(TokenManager):
|
||||
|
||||
Session.api_version = str(api_version)
|
||||
Session.feature_set = str(token_dict.get('feature_set', self.feature_set) or "basic")
|
||||
Session.server_version = token_dict.get('server_version', self.server_version)
|
||||
Session.user_id = (token_dict.get("identity") or {}).get("user") or ""
|
||||
except (jwt.DecodeError, ValueError):
|
||||
pass
|
||||
|
||||
@@ -256,8 +260,9 @@ class Session(TokenManager):
|
||||
def parse(vault):
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
print("Loaded {} vault: {}".format(
|
||||
print("Loaded {} vault{}: {}".format(
|
||||
vault.get("scope", ""),
|
||||
"" if not self.user_id else " for user {}".format(self.user_id),
|
||||
(vault.get("description", None) or "")[:50] or vault.get("id", ""))
|
||||
)
|
||||
d = vault.get("data", None)
|
||||
@@ -341,11 +346,12 @@ class Session(TokenManager):
|
||||
if self._propagate_exceptions_on_send:
|
||||
raise
|
||||
sleep_time = sys_random.uniform(*self._request_exception_retry_timeout)
|
||||
self._logger.error(
|
||||
"{} exception sending {} {}: {} (retrying in {:.1f}sec)".format(
|
||||
type(ex).__name__, method.upper(), url, str(ex), sleep_time
|
||||
if self._logger:
|
||||
self._logger.error(
|
||||
"{} exception sending {} {}: {} (retrying in {:.1f}sec)".format(
|
||||
type(ex).__name__, method.upper(), url, str(ex), sleep_time
|
||||
)
|
||||
)
|
||||
)
|
||||
time.sleep(sleep_time)
|
||||
continue
|
||||
|
||||
@@ -364,11 +370,12 @@ class Session(TokenManager):
|
||||
res.status_code == requests.codes.service_unavailable
|
||||
and self.config.get("api.http.wait_on_maintenance_forever", True)
|
||||
):
|
||||
self._logger.warning(
|
||||
"Service unavailable: {} is undergoing maintenance, retrying...".format(
|
||||
host
|
||||
if self._logger:
|
||||
self._logger.warning(
|
||||
"Service unavailable: {} is undergoing maintenance, retrying...".format(
|
||||
host
|
||||
)
|
||||
)
|
||||
)
|
||||
continue
|
||||
break
|
||||
self._session_requests += 1
|
||||
@@ -649,11 +656,14 @@ class Session(TokenManager):
|
||||
"""
|
||||
Return True if Session.api_version is greater or equal >= to min_api_version
|
||||
"""
|
||||
def version_tuple(v):
|
||||
v = tuple(map(int, (v.split("."))))
|
||||
return v + (0,) * max(0, 3 - len(v))
|
||||
return version_tuple(cls.api_version) >= version_tuple(str(min_api_version))
|
||||
|
||||
@classmethod
|
||||
def check_min_server_version(cls, min_server_version):
|
||||
"""
|
||||
Return True if Session.server_version is greater or equal >= to min_server_version
|
||||
"""
|
||||
return version_tuple(cls.server_version) >= version_tuple(str(min_server_version))
|
||||
def _do_refresh_token(self, current_token, exp=None):
|
||||
""" TokenManager abstract method implementation.
|
||||
Here we ignore the old token and simply obtain a new token.
|
||||
@@ -731,3 +741,8 @@ class Session(TokenManager):
|
||||
def propagate_exceptions_on_send(self, value):
|
||||
# type: (bool) -> None
|
||||
self._propagate_exceptions_on_send = value
|
||||
|
||||
|
||||
def version_tuple(v):
|
||||
v = tuple(map(int, (v.split("."))))
|
||||
return v + (0,) * max(0, 3 - len(v))
|
||||
|
||||
@@ -53,7 +53,7 @@ def apply_files(config):
|
||||
target_fmt = data.get("target_format", "string")
|
||||
overwrite = bool(data.get("overwrite", True))
|
||||
contents = data.get("contents")
|
||||
mode = data.get("mode")
|
||||
mode = data.get("mode", None)
|
||||
|
||||
target = Path(expanduser(expandvars(path)))
|
||||
|
||||
|
||||
@@ -1,14 +1,16 @@
|
||||
import json
|
||||
import re
|
||||
import shlex
|
||||
from copy import copy
|
||||
|
||||
from clearml_agent.backend_api.session import Request
|
||||
from clearml_agent.helper.docker_args import DockerArgsSanitizer
|
||||
from clearml_agent.helper.package.requirements import (
|
||||
RequirementsManager, MarkerRequirement,
|
||||
compare_version_rules, )
|
||||
|
||||
|
||||
def resolve_default_container(session, task_id, container_config):
|
||||
def resolve_default_container(session, task_id, container_config, ignore_match_rules=False):
|
||||
container_lookup = session.config.get('agent.default_docker.match_rules', None)
|
||||
if not session.check_min_api_version("2.13") or not container_lookup:
|
||||
return container_config
|
||||
@@ -17,6 +19,12 @@ def resolve_default_container(session, task_id, container_config):
|
||||
try:
|
||||
session.verify_feature_set('advanced')
|
||||
except ValueError:
|
||||
# ignoring matching rules only supported in higher tiers
|
||||
return container_config
|
||||
|
||||
if ignore_match_rules:
|
||||
print("INFO: default docker command line override, ignoring default docker container match rules")
|
||||
# ignoring matching rules only supported in higher tiers
|
||||
return container_config
|
||||
|
||||
result = session.send_request(
|
||||
@@ -159,9 +167,10 @@ def resolve_default_container(session, task_id, container_config):
|
||||
if not container_config.get('image'):
|
||||
container_config['image'] = entry.get('image', None)
|
||||
if not container_config.get('arguments'):
|
||||
container_config['arguments'] = entry.get('arguments', None)
|
||||
container_config['arguments'] = shlex.split(str(container_config.get('arguments') or '').strip())
|
||||
print('Matching default container with rule:\n{}'.format(json.dumps(entry)))
|
||||
container_config['arguments'] = entry.get('arguments', None) or ''
|
||||
if isinstance(container_config.get('arguments'), str):
|
||||
container_config['arguments'] = shlex.split(str(container_config.get('arguments') or '').strip())
|
||||
print('INFO: Matching default container with rule:\n{}'.format(json.dumps(entry)))
|
||||
return container_config
|
||||
|
||||
return container_config
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -167,6 +167,7 @@ ENV_AGENT_GIT_USER = EnvironmentConfig("CLEARML_AGENT_GIT_USER", "TRAINS_AGENT_G
|
||||
ENV_AGENT_GIT_PASS = EnvironmentConfig("CLEARML_AGENT_GIT_PASS", "TRAINS_AGENT_GIT_PASS")
|
||||
ENV_AGENT_GIT_HOST = EnvironmentConfig("CLEARML_AGENT_GIT_HOST", "TRAINS_AGENT_GIT_HOST")
|
||||
ENV_AGENT_DISABLE_SSH_MOUNT = EnvironmentConfig("CLEARML_AGENT_DISABLE_SSH_MOUNT", type=bool)
|
||||
ENV_AGENT_DEBUG_GET_NEXT_TASK = EnvironmentConfig("CLEARML_AGENT_DEBUG_GET_NEXT_TASK", type=bool)
|
||||
ENV_SSH_AUTH_SOCK = EnvironmentConfig("SSH_AUTH_SOCK")
|
||||
ENV_TASK_EXECUTE_AS_USER = EnvironmentConfig("CLEARML_AGENT_EXEC_USER", "TRAINS_AGENT_EXEC_USER")
|
||||
ENV_TASK_EXTRA_PYTHON_PATH = EnvironmentConfig("CLEARML_AGENT_EXTRA_PYTHON_PATH", "TRAINS_AGENT_EXTRA_PYTHON_PATH")
|
||||
|
||||
@@ -9,3 +9,12 @@ Script will be appended to the specified file.
|
||||
ENV_DEFAULT_EXECUTION_AGENT_ARGS = EnvEntry("K8S_GLUE_DEF_EXEC_AGENT_ARGS", default="--full-monitoring --require-queue")
|
||||
ENV_POD_AGENT_INSTALL_ARGS = EnvEntry("K8S_GLUE_POD_AGENT_INSTALL_ARGS", default="", lstrip=False)
|
||||
ENV_POD_MONITOR_LOG_BATCH_SIZE = EnvEntry("K8S_GLUE_POD_MONITOR_LOG_BATCH_SIZE", default=5, converter=int)
|
||||
ENV_POD_MONITOR_DISABLE_ENQUEUE_ON_PREEMPTION = EnvEntry(
|
||||
"K8S_GLUE_POD_MONITOR_DISABLE_ENQUEUE_ON_PREEMPTION", default=False, converter=bool
|
||||
)
|
||||
|
||||
ENV_POD_USE_IMAGE_ENTRYPOINT = EnvEntry("K8S_GLUE_POD_USE_IMAGE_ENTRYPOINT", default=False, converter=bool)
|
||||
"""
|
||||
Do not inject a cmd and args to the container's image when building the k8s template (depend on the built-in image
|
||||
entrypoint)
|
||||
"""
|
||||
@@ -25,6 +25,7 @@ from clearml_agent.definitions import (
|
||||
ENV_AGENT_GIT_USER,
|
||||
ENV_AGENT_GIT_PASS,
|
||||
ENV_FORCE_SYSTEM_SITE_PACKAGES,
|
||||
ENV_AGENT_DEBUG_GET_NEXT_TASK,
|
||||
)
|
||||
from clearml_agent.errors import APIError, UsageError
|
||||
from clearml_agent.glue.errors import GetPodCountError
|
||||
@@ -40,6 +41,7 @@ from clearml_agent.glue.definitions import (
|
||||
ENV_START_AGENT_SCRIPT_PATH,
|
||||
ENV_DEFAULT_EXECUTION_AGENT_ARGS,
|
||||
ENV_POD_AGENT_INSTALL_ARGS,
|
||||
ENV_POD_USE_IMAGE_ENTRYPOINT,
|
||||
)
|
||||
|
||||
|
||||
@@ -67,16 +69,23 @@ class K8sIntegration(Worker):
|
||||
'echo "ldconfig" >> /etc/profile',
|
||||
"/usr/sbin/sshd -p {port}"]
|
||||
|
||||
CONTAINER_BASH_SCRIPT = [
|
||||
_CONTAINER_APT_SCRIPT_SECTION = [
|
||||
"export DEBIAN_FRONTEND='noninteractive'",
|
||||
"echo 'Binary::apt::APT::Keep-Downloaded-Packages \"true\";' > /etc/apt/apt.conf.d/docker-clean",
|
||||
"chown -R root /root/.cache/pip",
|
||||
"apt-get update",
|
||||
"apt-get install -y git libsm6 libxext6 libxrender-dev libglib2.0-0",
|
||||
]
|
||||
|
||||
CONTAINER_BASH_SCRIPT = [
|
||||
*(
|
||||
'[ ! -z "$CLEARML_AGENT_SKIP_CONTAINER_APT" ] || {}'.format(line)
|
||||
for line in _CONTAINER_APT_SCRIPT_SECTION
|
||||
),
|
||||
"declare LOCAL_PYTHON",
|
||||
"[ ! -z $LOCAL_PYTHON ] || for i in {{15..5}}; do which python3.$i && python3.$i -m pip --version && "
|
||||
"export LOCAL_PYTHON=$(which python3.$i) && break ; done",
|
||||
"[ ! -z $LOCAL_PYTHON ] || apt-get install -y python3-pip",
|
||||
'[ ! -z "$CLEARML_AGENT_SKIP_CONTAINER_APT" ] || [ ! -z "$LOCAL_PYTHON" ] || apt-get install -y python3-pip',
|
||||
"[ ! -z $LOCAL_PYTHON ] || export LOCAL_PYTHON=python3",
|
||||
"{extra_bash_init_cmd}",
|
||||
"[ ! -z $CLEARML_AGENT_NO_UPDATE ] || $LOCAL_PYTHON -m pip install clearml-agent{agent_install_args}",
|
||||
@@ -98,6 +107,7 @@ class K8sIntegration(Worker):
|
||||
num_of_services=20,
|
||||
base_pod_num=1,
|
||||
user_props_cb=None,
|
||||
runtime_cb=None,
|
||||
overrides_yaml=None,
|
||||
template_yaml=None,
|
||||
clearml_conf_file=None,
|
||||
@@ -125,6 +135,7 @@ class K8sIntegration(Worker):
|
||||
:param callable user_props_cb: An Optional callable allowing additional user properties to be specified
|
||||
when scheduling a task to run in a pod. Callable can receive an optional pod number and should return
|
||||
a dictionary of user properties (name and value). Signature is [[Optional[int]], Dict[str,str]]
|
||||
:param callable runtime_cb: An Optional callable allowing additional task runtime to be specified (see user_props_cb)
|
||||
:param str overrides_yaml: YAML file containing the overrides for the pod (optional)
|
||||
:param str template_yaml: YAML file containing the template for the pod (optional).
|
||||
If provided the pod is scheduled with kubectl apply and overrides are ignored, otherwise with kubectl run.
|
||||
@@ -159,6 +170,7 @@ class K8sIntegration(Worker):
|
||||
self.base_pod_num = base_pod_num
|
||||
self._edit_hyperparams_support = None
|
||||
self._user_props_cb = user_props_cb
|
||||
self._runtime_cb = runtime_cb
|
||||
self.conf_file_content = None
|
||||
self.overrides_json_string = None
|
||||
self.template_dict = None
|
||||
@@ -192,6 +204,14 @@ class K8sIntegration(Worker):
|
||||
self._min_cleanup_interval_per_ns_sec = 1.0
|
||||
self._last_pod_cleanup_per_ns = defaultdict(lambda: 0.)
|
||||
|
||||
self._server_supports_same_state_transition = (
|
||||
self._session.feature_set != "basic" and self._session.check_min_server_version("3.22.3")
|
||||
)
|
||||
|
||||
@property
|
||||
def agent_label(self):
|
||||
return self._get_agent_label()
|
||||
|
||||
def _create_daemon_instance(self, cls_, **kwargs):
|
||||
return cls_(agent=self, **kwargs)
|
||||
|
||||
@@ -424,6 +444,9 @@ class K8sIntegration(Worker):
|
||||
""" Called when a resource (pod/job) was applied """
|
||||
pass
|
||||
|
||||
def ports_mode_supported_for_task(self, task_id: str, task_data):
|
||||
return self.ports_mode
|
||||
|
||||
def run_one_task(self, queue: Text, task_id: Text, worker_args=None, task_session=None, **_):
|
||||
print('Pulling task {} launching on kubernetes cluster'.format(task_id))
|
||||
session = task_session or self._session
|
||||
@@ -433,7 +456,9 @@ class K8sIntegration(Worker):
|
||||
if self._is_same_tenant(task_session):
|
||||
try:
|
||||
print('Pushing task {} into temporary pending queue'.format(task_id))
|
||||
_ = session.api_client.tasks.stop(task_id, force=True, status_reason="moving to k8s pending queue")
|
||||
|
||||
if not self._server_supports_same_state_transition:
|
||||
_ = session.api_client.tasks.stop(task_id, force=True, status_reason="moving to k8s pending queue")
|
||||
|
||||
# Just make sure to clean up in case the task is stuck in the queue (known issue)
|
||||
self._session.api_client.queues.remove_task(
|
||||
@@ -493,8 +518,10 @@ class K8sIntegration(Worker):
|
||||
)
|
||||
)
|
||||
|
||||
if self.ports_mode:
|
||||
ports_mode = False
|
||||
if self.ports_mode_supported_for_task(task_id, task_data):
|
||||
print("Kubernetes looking for available pod to use")
|
||||
ports_mode = True
|
||||
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
@@ -505,12 +532,12 @@ class K8sIntegration(Worker):
|
||||
# Search for a free pod number
|
||||
pod_count = 0
|
||||
pod_number = self.base_pod_num
|
||||
while self.ports_mode or self.max_pods_limit:
|
||||
while ports_mode or self.max_pods_limit:
|
||||
pod_number = self.base_pod_num + pod_count
|
||||
|
||||
try:
|
||||
items_count = self._get_pod_count(
|
||||
extra_labels=[self.limit_pod_label.format(pod_number=pod_number)] if self.ports_mode else None,
|
||||
extra_labels=[self.limit_pod_label.format(pod_number=pod_number)] if ports_mode else None,
|
||||
msg="Looking for a free pod/port"
|
||||
)
|
||||
except GetPodCountError:
|
||||
@@ -560,11 +587,11 @@ class K8sIntegration(Worker):
|
||||
break
|
||||
pod_count += 1
|
||||
|
||||
labels = self._get_pod_labels(queue, queue_name)
|
||||
if self.ports_mode:
|
||||
labels = self._get_pod_labels(queue, queue_name, task_data)
|
||||
if ports_mode:
|
||||
labels.append(self.limit_pod_label.format(pod_number=pod_number))
|
||||
|
||||
if self.ports_mode:
|
||||
if ports_mode:
|
||||
print("Kubernetes scheduling task id={} on pod={} (pod_count={})".format(task_id, pod_number, pod_count))
|
||||
else:
|
||||
print("Kubernetes scheduling task id={}".format(task_id))
|
||||
@@ -595,6 +622,7 @@ class K8sIntegration(Worker):
|
||||
task_id=task_id,
|
||||
queue=queue,
|
||||
namespace=namespace,
|
||||
task_token=task_session.token.encode("ascii") if task_session else None,
|
||||
)
|
||||
|
||||
print('kubectl output:\n{}\n{}'.format(error, output))
|
||||
@@ -602,6 +630,14 @@ class K8sIntegration(Worker):
|
||||
send_log = "Running kubectl encountered an error: {}".format(error)
|
||||
self.log.error(send_log)
|
||||
self.send_logs(task_id, send_log.splitlines())
|
||||
|
||||
# Make sure to remove the task from our k8s pending queue
|
||||
self._session.api_client.queues.remove_task(
|
||||
task=task_id,
|
||||
queue=self.k8s_pending_queue_id,
|
||||
)
|
||||
# Set task as failed
|
||||
session.api_client.tasks.failed(task_id, force=True)
|
||||
return
|
||||
|
||||
if pod_name:
|
||||
@@ -609,25 +645,41 @@ class K8sIntegration(Worker):
|
||||
resource_name=pod_name, namespace=namespace, task_id=task_id, session=session
|
||||
)
|
||||
|
||||
self.set_task_info(
|
||||
task_id=task_id, task_session=task_session, queue_name=queue_name, ports_mode=ports_mode,
|
||||
pod_number=pod_number, pod_count=pod_count, task_data=task_data
|
||||
)
|
||||
|
||||
def set_task_info(
|
||||
self, task_id: str, task_session, task_data, queue_name: str, ports_mode: bool, pod_number, pod_count
|
||||
):
|
||||
user_props = {"k8s-queue": str(queue_name)}
|
||||
if self.ports_mode:
|
||||
user_props.update(
|
||||
{
|
||||
"k8s-pod-number": pod_number,
|
||||
"k8s-pod-label": labels[0],
|
||||
"k8s-internal-pod-count": pod_count,
|
||||
"k8s-agent": self._get_agent_label(),
|
||||
}
|
||||
)
|
||||
runtime = {}
|
||||
if ports_mode:
|
||||
agent_label = self._get_agent_label()
|
||||
user_props.update({
|
||||
"k8s-pod-number": pod_number,
|
||||
"k8s-pod-label": agent_label, # backwards-compatibility / legacy
|
||||
"k8s-internal-pod-count": pod_count,
|
||||
"k8s-agent": agent_label,
|
||||
})
|
||||
|
||||
if self._user_props_cb:
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
custom_props = self._user_props_cb(pod_number) if self.ports_mode else self._user_props_cb()
|
||||
custom_props = self._user_props_cb(pod_number) if ports_mode else self._user_props_cb()
|
||||
user_props.update(custom_props)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if self._runtime_cb:
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
custom_runtime = self._runtime_cb(pod_number) if ports_mode else self._runtime_cb()
|
||||
runtime.update(custom_runtime)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if user_props:
|
||||
self._set_task_user_properties(
|
||||
task_id=task_id,
|
||||
@@ -635,7 +687,38 @@ class K8sIntegration(Worker):
|
||||
**user_props
|
||||
)
|
||||
|
||||
def _get_pod_labels(self, queue, queue_name):
|
||||
if runtime:
|
||||
task_runtime = self._get_task_runtime(task_id) or {}
|
||||
task_runtime.update(runtime)
|
||||
|
||||
try:
|
||||
res = task_session.send_request(
|
||||
service='tasks', action='edit', method=Request.def_method,
|
||||
json={
|
||||
"task": task_id, "force": True, "runtime": task_runtime
|
||||
},
|
||||
)
|
||||
if not res.ok:
|
||||
raise Exception("failed setting runtime property")
|
||||
except Exception as ex:
|
||||
print("WARNING: failed setting custom runtime properties for task '{}': {}".format(task_id, ex))
|
||||
|
||||
def _get_task_runtime(self, task_id) -> Optional[dict]:
|
||||
try:
|
||||
res = self._session.send_request(
|
||||
service='tasks', action='get_by_id', method=Request.def_method,
|
||||
json={"task": task_id, "only_fields": ["runtime"]},
|
||||
)
|
||||
if not res.ok:
|
||||
raise ValueError(f"request returned {res.status_code}")
|
||||
data = res.json().get("data")
|
||||
if not data or "task" not in data:
|
||||
raise ValueError("empty data in result")
|
||||
return data["task"].get("runtime", {})
|
||||
except Exception as ex:
|
||||
print(f"ERROR: Failed getting runtime properties for task {task_id}: {ex}")
|
||||
|
||||
def _get_pod_labels(self, queue, queue_name, task_data):
|
||||
return [
|
||||
self._get_agent_label(),
|
||||
"{}={}".format(self.QUEUE_LABEL, self._safe_k8s_label_value(queue)),
|
||||
@@ -673,7 +756,7 @@ class K8sIntegration(Worker):
|
||||
|
||||
def _create_template_container(
|
||||
self, pod_name: str, task_id: str, docker_image: str, docker_args: List[str],
|
||||
docker_bash: str, clearml_conf_create_script: List[str], task_worker_id: str
|
||||
docker_bash: str, clearml_conf_create_script: List[str], task_worker_id: str, task_token: str = None
|
||||
) -> dict:
|
||||
container = self._get_docker_args(
|
||||
docker_args,
|
||||
@@ -682,16 +765,32 @@ class K8sIntegration(Worker):
|
||||
convert=lambda env: {'name': env.partition("=")[0], 'value': env.partition("=")[2]},
|
||||
)
|
||||
|
||||
# Set worker ID
|
||||
env_vars = container.get('env', [])
|
||||
found_worker_id = False
|
||||
for entry in env_vars:
|
||||
if entry.get('name') == 'CLEARML_WORKER_ID':
|
||||
entry['name'] = task_worker_id
|
||||
found_worker_id = True
|
||||
if not found_worker_id:
|
||||
container['env'] = env_vars + [{'name': 'CLEARML_WORKER_ID', 'value': task_worker_id}]
|
||||
def add_or_update_env_var(name, value):
|
||||
env_vars = container.get('env', [])
|
||||
for entry in env_vars:
|
||||
if entry.get('name') == name:
|
||||
entry['value'] = value
|
||||
break
|
||||
else:
|
||||
container['env'] = env_vars + [{'name': name, 'value': value}]
|
||||
|
||||
# Set worker ID
|
||||
add_or_update_env_var('CLEARML_WORKER_ID', task_worker_id)
|
||||
|
||||
if ENV_POD_USE_IMAGE_ENTRYPOINT.get():
|
||||
# Don't add a cmd and args, just the image
|
||||
|
||||
# Add the task ID and token since we need it (it's usually in the init script passed to us
|
||||
add_or_update_env_var('CLEARML_TASK_ID', task_id)
|
||||
if task_token:
|
||||
# TODO: find a way to base64 encode the token
|
||||
add_or_update_env_var('CLEARML_AUTH_TOKEN', task_token)
|
||||
|
||||
return self._merge_containers(
|
||||
container, dict(name=pod_name, image=docker_image)
|
||||
)
|
||||
|
||||
# Create bash script for container and
|
||||
container_bash_script = [self.container_bash_script] if isinstance(self.container_bash_script, str) \
|
||||
else self.container_bash_script
|
||||
|
||||
@@ -740,7 +839,8 @@ class K8sIntegration(Worker):
|
||||
task_id,
|
||||
namespace,
|
||||
template,
|
||||
pod_number=None
|
||||
pod_number=None,
|
||||
task_token=None,
|
||||
):
|
||||
if "apiVersion" not in template:
|
||||
template["apiVersion"] = "batch/v1" if self.using_jobs else "v1"
|
||||
@@ -771,7 +871,7 @@ class K8sIntegration(Worker):
|
||||
spec.setdefault('backoffLimit', 0)
|
||||
spec_template = spec.setdefault('template', {})
|
||||
if labels:
|
||||
# Place same labels fro any pod spawned by the job
|
||||
# Place same labels for any pod spawned by the job
|
||||
place_labels(spec_template.setdefault('metadata', {}))
|
||||
|
||||
spec = spec_template.setdefault('spec', {})
|
||||
@@ -788,7 +888,8 @@ class K8sIntegration(Worker):
|
||||
docker_args=docker_args,
|
||||
docker_bash=docker_bash,
|
||||
clearml_conf_create_script=clearml_conf_create_script,
|
||||
task_worker_id=task_worker_id
|
||||
task_worker_id=task_worker_id,
|
||||
task_token=task_token,
|
||||
)
|
||||
|
||||
if containers:
|
||||
@@ -935,7 +1036,7 @@ class K8sIntegration(Worker):
|
||||
result = self._session.get(
|
||||
service='tasks',
|
||||
action='get_all',
|
||||
json={"id": task_ids, "status": ["in_progress", "queued"], "only_fields": ["id", "status"]},
|
||||
json={"id": task_ids, "status": ["in_progress", "queued"], "only_fields": ["id", "status", "status_reason"]},
|
||||
method=Request.def_method,
|
||||
)
|
||||
tasks_to_abort = result["tasks"]
|
||||
@@ -945,9 +1046,13 @@ class K8sIntegration(Worker):
|
||||
for task in tasks_to_abort:
|
||||
task_id = task.get("id")
|
||||
status = task.get("status")
|
||||
status_reason = (task.get("status_reason") or "").lower()
|
||||
if not task_id or not status:
|
||||
self.log.warning('Failed getting task information: id={}, status={}'.format(task_id, status))
|
||||
continue
|
||||
if status == "queued" and "pushed back by policy manager" in status_reason:
|
||||
# Task was pushed back to policy queue by policy manager, don't touch it
|
||||
continue
|
||||
try:
|
||||
if status == "queued":
|
||||
self._session.get(
|
||||
@@ -981,6 +1086,9 @@ class K8sIntegration(Worker):
|
||||
|
||||
return deleted_pods
|
||||
|
||||
def check_if_suspended(self) -> bool:
|
||||
pass
|
||||
|
||||
def run_tasks_loop(self, queues: List[Text], worker_params, **kwargs):
|
||||
"""
|
||||
:summary: Pull and run tasks from queues.
|
||||
@@ -992,6 +1100,8 @@ class K8sIntegration(Worker):
|
||||
:param worker_params: Worker command line arguments
|
||||
:type worker_params: ``clearml_agent.helper.process.WorkerParams``
|
||||
"""
|
||||
# print("debug> running tasks loop")
|
||||
|
||||
events_service = self.get_service(Events)
|
||||
|
||||
# make sure we have a k8s pending queue
|
||||
@@ -1023,12 +1133,19 @@ class K8sIntegration(Worker):
|
||||
continue
|
||||
|
||||
# iterate over queues (priority style, queues[0] is highest)
|
||||
# print("debug> iterating over queues")
|
||||
for queue in queues:
|
||||
# delete old completed / failed pods
|
||||
self._cleanup_old_pods(namespaces, extra_msg="Cleanup cycle {cmd}")
|
||||
|
||||
if self.check_if_suspended():
|
||||
print("Agent is suspended, sleeping for {:.1f} seconds".format(self._polling_interval))
|
||||
sleep(self._polling_interval)
|
||||
break
|
||||
|
||||
# get next task in queue
|
||||
try:
|
||||
# print(f"debug> getting tasks for queue {queue}")
|
||||
response = self._get_next_task(queue=queue, get_task_info=self._impersonate_as_task_owner)
|
||||
except Exception as e:
|
||||
print("Warning: Could not access task queue [{}], error: {}".format(queue, e))
|
||||
|
||||
@@ -9,6 +9,7 @@ from clearml_agent.helper.process import stringify_bash_output
|
||||
from .daemon import K8sDaemon
|
||||
from .utilities import get_path
|
||||
from .errors import GetPodsError
|
||||
from .definitions import ENV_POD_MONITOR_DISABLE_ENQUEUE_ON_PREEMPTION
|
||||
|
||||
|
||||
class PendingPodsDaemon(K8sDaemon):
|
||||
@@ -17,16 +18,16 @@ class PendingPodsDaemon(K8sDaemon):
|
||||
self._polling_interval = polling_interval
|
||||
self._last_tasks_msgs = {} # last msg updated for every task
|
||||
|
||||
def get_pods(self, pod_name=None):
|
||||
def get_pods(self, pod_name=None, debug_msg="Detecting pending pods: {cmd}"):
|
||||
filters = ["status.phase=Pending"]
|
||||
if pod_name:
|
||||
filters.append(f"metadata.name={pod_name}")
|
||||
|
||||
if self._agent.using_jobs:
|
||||
return self._agent.get_pods_for_jobs(
|
||||
job_condition="status.active=1", pod_filters=filters, debug_msg="Detecting pending pods: {cmd}"
|
||||
job_condition="status.active=1", pod_filters=filters, debug_msg=debug_msg
|
||||
)
|
||||
return self._agent.get_pods(filters=filters, debug_msg="Detecting pending pods: {cmd}")
|
||||
return self._agent.get_pods(filters=filters, debug_msg=debug_msg)
|
||||
|
||||
def _get_pod_name(self, pod: dict):
|
||||
return get_path(pod, "metadata", "name")
|
||||
@@ -72,6 +73,11 @@ class PendingPodsDaemon(K8sDaemon):
|
||||
if not namespace:
|
||||
continue
|
||||
|
||||
updated_pod = self.get_pods(pod_name=pod_name, debug_msg="Refreshing pod information: {cmd}")
|
||||
if not updated_pod:
|
||||
continue
|
||||
pod = updated_pod[0]
|
||||
|
||||
task_id_to_pod[task_id] = pod
|
||||
|
||||
msg = None
|
||||
@@ -190,32 +196,39 @@ class PendingPodsDaemon(K8sDaemon):
|
||||
if not msg or self._last_tasks_msgs.get(task_id, None) == (msg, tags):
|
||||
return
|
||||
try:
|
||||
# Make sure the task is queued
|
||||
result = self._session.send_request(
|
||||
service='tasks',
|
||||
action='get_all',
|
||||
json={"id": task_id, "only_fields": ["status"]},
|
||||
method=Request.def_method,
|
||||
async_enable=False,
|
||||
)
|
||||
if result.ok:
|
||||
status = get_path(result.json(), 'data', 'tasks', 0, 'status')
|
||||
# if task is in progress, change its status to enqueued
|
||||
if status == "in_progress":
|
||||
result = self._session.send_request(
|
||||
service='tasks', action='enqueue',
|
||||
json={
|
||||
"task": task_id, "force": True, "queue": self._agent.k8s_pending_queue_id
|
||||
},
|
||||
method=Request.def_method,
|
||||
async_enable=False,
|
||||
)
|
||||
if not result.ok:
|
||||
result_msg = get_path(result.json(), 'meta', 'result_msg')
|
||||
self.log.debug(
|
||||
"K8S Glue pods monitor: failed forcing task status change"
|
||||
" for pending task {}: {}".format(task_id, result_msg)
|
||||
if ENV_POD_MONITOR_DISABLE_ENQUEUE_ON_PREEMPTION.get():
|
||||
# This disables the option to enqueue the task which is supposed to sync the ClearML task status
|
||||
# in case the pod was preempted. In some cases this does not happen due to preemption but due to
|
||||
# cluster communication lag issues that cause us not to discover the pod is no longer pending and
|
||||
# enqueue the task when it's actually already running, thus essentially killing the task
|
||||
pass
|
||||
else:
|
||||
# Make sure the task is queued
|
||||
result = self._session.send_request(
|
||||
service='tasks',
|
||||
action='get_all',
|
||||
json={"id": task_id, "only_fields": ["status"]},
|
||||
method=Request.def_method,
|
||||
async_enable=False,
|
||||
)
|
||||
if result.ok:
|
||||
status = get_path(result.json(), 'data', 'tasks', 0, 'status')
|
||||
# if task is in progress, change its status to enqueued
|
||||
if status == "in_progress":
|
||||
result = self._session.send_request(
|
||||
service='tasks', action='enqueue',
|
||||
json={
|
||||
"task": task_id, "force": True, "queue": self._agent.k8s_pending_queue_id
|
||||
},
|
||||
method=Request.def_method,
|
||||
async_enable=False,
|
||||
)
|
||||
if not result.ok:
|
||||
result_msg = get_path(result.json(), 'meta', 'result_msg')
|
||||
self.log.debug(
|
||||
"K8S Glue pods monitor: failed forcing task status change"
|
||||
" for pending task {}: {}".format(task_id, result_msg)
|
||||
)
|
||||
|
||||
# Update task status message
|
||||
payload = {"task": task_id, "status_message": "K8S glue status: {}".format(msg)}
|
||||
|
||||
@@ -543,6 +543,36 @@ def convert_cuda_version_to_int_10_base_str(cuda_version):
|
||||
return str(int(float(cuda_version)*10))
|
||||
|
||||
|
||||
def get_python_version(python_executable, log=None):
|
||||
from clearml_agent.helper.process import Argv
|
||||
try:
|
||||
output = Argv(python_executable, "--version").get_output(
|
||||
stderr=subprocess.STDOUT
|
||||
)
|
||||
except subprocess.CalledProcessError as ex:
|
||||
# Windows returns 9009 code and suggests to install Python from Windows Store
|
||||
if is_windows_platform() and ex.returncode == 9009:
|
||||
if log:
|
||||
log.debug("version not found: {}".format(ex))
|
||||
else:
|
||||
if log:
|
||||
log.warning("error getting %s version: %s", python_executable, ex)
|
||||
return None
|
||||
except FileNotFoundError as ex:
|
||||
if log:
|
||||
log.debug("version not found: {}".format(ex))
|
||||
return None
|
||||
|
||||
match = re.search(r"Python ({}(?:\.\d+)*)".format(r"\d+"), output)
|
||||
if match:
|
||||
if log:
|
||||
log.debug("Found: {}".format(python_executable))
|
||||
# only return major.minor version
|
||||
return ".".join(str(match.group(1)).split(".")[:2])
|
||||
|
||||
return None
|
||||
|
||||
|
||||
class NonStrictAttrs(object):
|
||||
|
||||
@classmethod
|
||||
|
||||
@@ -69,7 +69,7 @@ def or_(*converters, **kwargs):
|
||||
return wrapper
|
||||
|
||||
|
||||
def strtobool (val):
|
||||
def strtobool(val):
|
||||
"""Convert a string representation of truth to true (1) or false (0).
|
||||
|
||||
True values are 'y', 'yes', 't', 'true', 'on', and '1'; false values
|
||||
|
||||
@@ -29,9 +29,12 @@ class PackageManager(object):
|
||||
_config_cache_max_entries = 'agent.venvs_cache.max_entries'
|
||||
_config_cache_free_space_threshold = 'agent.venvs_cache.free_space_threshold_gb'
|
||||
_config_cache_lock_timeout = 'agent.venvs_cache.lock_timeout'
|
||||
_config_pip_legacy_resolver = 'agent.package_manager.pip_legacy_resolver'
|
||||
|
||||
def __init__(self):
|
||||
self._cache_manager = None
|
||||
self._existing_packages = []
|
||||
self._base_install_flags = []
|
||||
|
||||
@abc.abstractproperty
|
||||
def bin(self):
|
||||
@@ -79,6 +82,23 @@ class PackageManager(object):
|
||||
# type: (Iterable[Text]) -> None
|
||||
pass
|
||||
|
||||
def add_extra_install_flags(self, extra_flags): # type: (List[str]) -> None
|
||||
if extra_flags:
|
||||
extra_flags = [
|
||||
e for e in extra_flags if e not in list(self._base_install_flags)
|
||||
]
|
||||
self._base_install_flags = list(self._base_install_flags) + list(extra_flags)
|
||||
|
||||
def remove_extra_install_flags(self, extra_flags): # type: (List[str]) -> bool
|
||||
if extra_flags:
|
||||
_base_install_flags = [
|
||||
e for e in self._base_install_flags if e not in list(extra_flags)
|
||||
]
|
||||
if self._base_install_flags != _base_install_flags:
|
||||
self._base_install_flags = _base_install_flags
|
||||
return True
|
||||
return False
|
||||
|
||||
def upgrade_pip(self):
|
||||
result = self._install(
|
||||
*select_for_platform(
|
||||
@@ -87,19 +107,58 @@ class PackageManager(object):
|
||||
),
|
||||
"--upgrade"
|
||||
)
|
||||
packages = self.run_with_env(('list',), output=True).splitlines()
|
||||
# p.split is ('pip', 'x.y.z')
|
||||
pip = [p.split() for p in packages if len(p.split()) == 2 and p.split()[0] == 'pip']
|
||||
if pip:
|
||||
# noinspection PyBroadException
|
||||
|
||||
packages = (self.freeze(freeze_full_environment=True) or dict()).get("pip")
|
||||
if packages:
|
||||
from clearml_agent.helper.package.requirements import RequirementsManager
|
||||
from .requirements import MarkerRequirement, SimpleVersion
|
||||
|
||||
# store existing packages so that we can check if we can skip preinstalled packages
|
||||
# we will only check "@ file" "@ vcs" for exact match
|
||||
self._existing_packages = RequirementsManager.parse_requirements_section_to_marker_requirements(
|
||||
packages, skip_local_file_validation=True)
|
||||
|
||||
try:
|
||||
from .requirements import MarkerRequirement
|
||||
pip = pip[0][1].split('.')
|
||||
MarkerRequirement.pip_new_version = bool(int(pip[0]) >= 20)
|
||||
except Exception:
|
||||
pass
|
||||
pip_pkg = next(p for p in self._existing_packages if p.name == "pip")
|
||||
except StopIteration:
|
||||
pip_pkg = None
|
||||
|
||||
# check if we need to list the pip version as well
|
||||
if pip_pkg:
|
||||
MarkerRequirement.pip_new_version = SimpleVersion.compare_versions(pip_pkg.version, ">=", "20")
|
||||
|
||||
# add --use-deprecated=legacy-resolver to pip install to avoid mismatched packages issues
|
||||
self._add_legacy_resolver_flag(pip_pkg.version)
|
||||
|
||||
return result
|
||||
|
||||
def _add_legacy_resolver_flag(self, pip_pkg_version):
|
||||
if not self.session.config.get(self._config_pip_legacy_resolver, None):
|
||||
return
|
||||
|
||||
from .requirements import SimpleVersion
|
||||
|
||||
match_versions = self.session.config.get(self._config_pip_legacy_resolver)
|
||||
matched = False
|
||||
for rule in match_versions:
|
||||
matched = False
|
||||
# make sure we match all the parts of the rule
|
||||
for a_version in rule.split(","):
|
||||
o, v = SimpleVersion.split_op_version(a_version.strip())
|
||||
matched = SimpleVersion.compare_versions(pip_pkg_version, o, v)
|
||||
if not matched:
|
||||
break
|
||||
# if the rule is fully matched we have a match
|
||||
if matched:
|
||||
break
|
||||
|
||||
legacy_resolver_flags = ["--use-deprecated=legacy-resolver"]
|
||||
if matched:
|
||||
print("INFO: Using legacy resolver for PIP to avoid inconsistency with package versions!")
|
||||
self.add_extra_install_flags(legacy_resolver_flags)
|
||||
elif self.remove_extra_install_flags(legacy_resolver_flags):
|
||||
print("INFO: removing pip legacy resolver!")
|
||||
|
||||
def get_python_command(self, extra=()):
|
||||
# type: (...) -> Executable
|
||||
return Argv(self.bin, *extra)
|
||||
@@ -149,6 +208,18 @@ class PackageManager(object):
|
||||
return False
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
try:
|
||||
from .requirements import Requirement, MarkerRequirement
|
||||
req = MarkerRequirement(Requirement.parse(package_name))
|
||||
|
||||
# if pip was part of the requirements, make sure we update the flags
|
||||
# add --use-deprecated=legacy-resolver to pip install to avoid mismatched packages issues
|
||||
if req.name == "pip" and req.version:
|
||||
PackageManager._selected_manager._add_legacy_resolver_flag(req.version)
|
||||
except Exception as e:
|
||||
print("WARNING: Error while parsing pip version legacy [{}]".format(e))
|
||||
|
||||
return True
|
||||
|
||||
@classmethod
|
||||
@@ -219,6 +290,8 @@ class PackageManager(object):
|
||||
if not self._get_cache_manager():
|
||||
return
|
||||
|
||||
print('Adding venv into cache: {}'.format(source_folder))
|
||||
|
||||
try:
|
||||
keys = self._generate_reqs_hash_keys(requirements, docker_cmd, python_version, cuda_version)
|
||||
return self._get_cache_manager().add_entry(
|
||||
|
||||
@@ -208,7 +208,6 @@ class CondaAPI(PackageManager):
|
||||
except Exception as ex:
|
||||
print("WARNING: Failed using base conda environment, reverting to new environment: {}".format(ex))
|
||||
|
||||
|
||||
command = Argv(
|
||||
self.conda,
|
||||
"create",
|
||||
@@ -273,7 +272,7 @@ class CondaAPI(PackageManager):
|
||||
Conda seems to load "vcruntime140.dll" from all its environment on startup.
|
||||
This means environment have to be deleted using 'conda env remove'.
|
||||
If necessary, conda can be fooled into deleting a partially-deleted environment by creating an empty file
|
||||
in '<ENV>\conda-meta\history' (value found in 'conda.gateways.disk.test.PREFIX_MAGIC_FILE').
|
||||
in '<ENV>\\conda-meta\\history' (value found in 'conda.gateways.disk.test.PREFIX_MAGIC_FILE').
|
||||
Otherwise, it complains that said directory is not a conda environment.
|
||||
|
||||
See: https://github.com/conda/conda/issues/7682
|
||||
@@ -801,6 +800,25 @@ class CondaAPI(PackageManager):
|
||||
return conda_env
|
||||
return base_conda_env
|
||||
|
||||
def add_cached_venv(self, *args, **kwargs):
|
||||
"""
|
||||
Copy the local venv folder into the venv cache (keys are based on the requirements+python+docker).
|
||||
"""
|
||||
# do not cache if this is a base conda environment
|
||||
if self.conda_env_as_base_docker or self.use_conda_base_env:
|
||||
return
|
||||
return super().add_cached_venv(*args, **kwargs)
|
||||
|
||||
def get_cached_venv(self, *args, **kwargs):
|
||||
"""
|
||||
Copy a cached copy of the venv (based on the requirements) into destination_folder.
|
||||
Return None if failed or cached entry does not exist
|
||||
"""
|
||||
# do not cache if this is a base conda environment
|
||||
if self.conda_env_as_base_docker or self.use_conda_base_env:
|
||||
return
|
||||
return super().get_cached_venv(*args, **kwargs)
|
||||
|
||||
|
||||
# enable hashing with cmp=False because pdb fails on un-hashable exceptions
|
||||
exception = attrs(str=True, cmp=False)
|
||||
|
||||
@@ -97,7 +97,7 @@ class SystemPip(PackageManager):
|
||||
return Argv(self.bin, '-m', 'pip', '--disable-pip-version-check', *command)
|
||||
|
||||
def install_flags(self):
|
||||
indices_args = tuple(
|
||||
base_args = tuple(self._base_install_flags or []) + tuple(
|
||||
chain.from_iterable(('--extra-index-url', x) for x in PIP_EXTRA_INDICES)
|
||||
)
|
||||
|
||||
@@ -105,7 +105,7 @@ class SystemPip(PackageManager):
|
||||
ENV_PIP_EXTRA_INSTALL_FLAGS.get() or \
|
||||
self.session.config.get("agent.package_manager.extra_pip_install_flags", None)
|
||||
|
||||
return (indices_args + tuple(extra_pip_flags)) if extra_pip_flags else indices_args
|
||||
return (base_args + tuple(extra_pip_flags)) if extra_pip_flags else base_args
|
||||
|
||||
def download_flags(self):
|
||||
indices_args = tuple(
|
||||
|
||||
@@ -37,7 +37,9 @@ class VirtualenvPip(SystemPip, PackageManager):
|
||||
|
||||
def load_requirements(self, requirements):
|
||||
if isinstance(requirements, dict) and requirements.get("pip"):
|
||||
requirements["pip"] = self.requirements_manager.replace(requirements["pip"])
|
||||
requirements["pip"] = self.requirements_manager.replace(
|
||||
requirements["pip"], existing_packages=self._existing_packages
|
||||
)
|
||||
super(VirtualenvPip, self).load_requirements(requirements)
|
||||
self.requirements_manager.post_install(self.session, package_manager=self)
|
||||
|
||||
@@ -64,9 +66,18 @@ class VirtualenvPip(SystemPip, PackageManager):
|
||||
Only valid if instantiated with path.
|
||||
Use self.python as self.bin does not exist.
|
||||
"""
|
||||
self.session.command(
|
||||
self.python, "-m", "virtualenv", self.path, *self.create_flags()
|
||||
).check_call()
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
self.session.command(
|
||||
self.python, "-m", "virtualenv", self.path, *self.create_flags()
|
||||
).check_call()
|
||||
except Exception as ex:
|
||||
# let's try with std library instead
|
||||
print("WARNING: virtualenv call failed: {}\n INFO: Creating virtual environment with venv".format(ex))
|
||||
self.session.command(
|
||||
self.python, "-m", "venv", self.path, *self.create_flags()
|
||||
).check_call()
|
||||
|
||||
return self
|
||||
|
||||
def remove(self):
|
||||
|
||||
@@ -53,12 +53,18 @@ class PriorityPackageRequirement(SimpleSubstitution):
|
||||
if not self._replaced_packages:
|
||||
return list_of_requirements
|
||||
|
||||
# we assume that both pip & setup tools are not in list_of_requirements, and we need to add them
|
||||
|
||||
if "pip" in self._replaced_packages:
|
||||
full_freeze = PackageManager.out_of_scope_freeze(freeze_full_environment=True)
|
||||
# now let's look for pip
|
||||
pips = [line for line in full_freeze.get("pip", []) if line.split("==")[0] == "pip"]
|
||||
if pips and "pip" in list_of_requirements:
|
||||
list_of_requirements["pip"] = [pips[0]] + list_of_requirements["pip"]
|
||||
if not full_freeze:
|
||||
if "pip" in list_of_requirements:
|
||||
list_of_requirements["pip"] = [self._replaced_packages["pip"]] + list_of_requirements["pip"]
|
||||
else:
|
||||
# now let's look for pip
|
||||
pips = [line for line in full_freeze.get("pip", []) if str(line.split("==")[0]).strip() == "pip"]
|
||||
if pips and "pip" in list_of_requirements:
|
||||
list_of_requirements["pip"] = [pips[0]] + list_of_requirements["pip"]
|
||||
|
||||
if "setuptools" in self._replaced_packages:
|
||||
try:
|
||||
@@ -87,6 +93,20 @@ class PriorityPackageRequirement(SimpleSubstitution):
|
||||
return list_of_requirements
|
||||
|
||||
|
||||
class CachedPackageRequirement(PriorityPackageRequirement):
|
||||
|
||||
name = ("setuptools", "pip", )
|
||||
optional_package_names = tuple()
|
||||
|
||||
def replace(self, req):
|
||||
"""
|
||||
Put the requirement in the list for later conversion
|
||||
:raises: ValueError if version is pre-release
|
||||
"""
|
||||
self._replaced_packages[req.name] = req.line
|
||||
return Text(req)
|
||||
|
||||
|
||||
class PackageCollectorRequirement(SimpleSubstitution):
|
||||
"""
|
||||
This RequirementSubstitution class will allow you to have multiple instances of the same
|
||||
|
||||
@@ -19,7 +19,7 @@ import logging
|
||||
from clearml_agent.definitions import PIP_EXTRA_INDICES
|
||||
from clearml_agent.helper.base import (
|
||||
warning, is_conda, which, join_lines, is_windows_platform,
|
||||
convert_cuda_version_to_int_10_base_str, )
|
||||
convert_cuda_version_to_int_10_base_str, dump_yaml, )
|
||||
from clearml_agent.helper.process import Argv, PathLike
|
||||
from clearml_agent.helper.gpu.gpustat import get_driver_cuda_version
|
||||
from clearml_agent.session import Session, normalize_cuda_version
|
||||
@@ -94,6 +94,12 @@ class MarkerRequirement(object):
|
||||
def __repr__(self):
|
||||
return '{self.__class__.__name__}[{self}]'.format(self=self)
|
||||
|
||||
def __eq__(self, other):
|
||||
return isinstance(other, MarkerRequirement) and str(self) == str(other)
|
||||
|
||||
def __hash__(self):
|
||||
return str(self).__hash__()
|
||||
|
||||
def format_specs(self, num_parts=None, max_num_parts=None):
|
||||
max_num_parts = max_num_parts or num_parts
|
||||
if max_num_parts is None or not self.specs:
|
||||
@@ -116,6 +122,10 @@ class MarkerRequirement(object):
|
||||
def specs(self): # type: () -> List[Tuple[Text, Text]]
|
||||
return self.req.specs
|
||||
|
||||
@property
|
||||
def version(self): # type: () -> Text
|
||||
return self.specs[0][1] if self.specs else ""
|
||||
|
||||
@specs.setter
|
||||
def specs(self, value): # type: (List[Tuple[Text, Text]]) -> None
|
||||
self.req.specs = value
|
||||
@@ -143,6 +153,8 @@ class MarkerRequirement(object):
|
||||
If the requested version is 1.2 the self.spec should be 1.2*
|
||||
etc.
|
||||
|
||||
usage: it returns the value of the following comparison: requested_version "op" self.version
|
||||
|
||||
:param str requested_version:
|
||||
:param str op: '==', '>', '>=', '<=', '<', '~='
|
||||
:param int num_parts: number of parts to compare
|
||||
@@ -152,7 +164,7 @@ class MarkerRequirement(object):
|
||||
if not self.specs:
|
||||
return True
|
||||
|
||||
version = self.specs[0][1]
|
||||
version = self.version
|
||||
op = (op or self.specs[0][0]).strip()
|
||||
|
||||
return SimpleVersion.compare_versions(
|
||||
@@ -170,11 +182,21 @@ class MarkerRequirement(object):
|
||||
self.req.local_file = False
|
||||
return True
|
||||
|
||||
def validate_local_file_ref(self):
|
||||
def is_local_package_ref(self):
|
||||
# if local file does not exist, remove the reference to it
|
||||
if self.vcs or self.editable or self.path or not self.local_file or not self.name or \
|
||||
not self.uri or not self.uri.startswith("file://"):
|
||||
return False
|
||||
return True
|
||||
|
||||
def is_vcs_ref(self):
|
||||
return bool(self.vcs)
|
||||
|
||||
def validate_local_file_ref(self):
|
||||
# if local file does not exist, remove the reference to it
|
||||
if not self.is_local_package_ref():
|
||||
return
|
||||
|
||||
local_path = Path(self.uri[len("file://"):])
|
||||
if not local_path.exists():
|
||||
local_path = Path(unquote(self.uri)[len("file://"):])
|
||||
@@ -221,6 +243,19 @@ class SimpleVersion:
|
||||
_local_version_separators = re.compile(r"[\._-]")
|
||||
_regex = re.compile(r"^\s*" + VERSION_PATTERN + r"\s*$", re.VERBOSE | re.IGNORECASE)
|
||||
|
||||
@classmethod
|
||||
def split_op_version(cls, line):
|
||||
"""
|
||||
Split a string in the form of ">=1.2.3" into a (op, version), i.e. (">=", "1.2.3")
|
||||
Notice is calling with only a version string (e.g. "1.2.3") default operator is "=="
|
||||
which means you get ("==", "1.2.3")
|
||||
:param line: string examples: "<=0.1.2"
|
||||
:return: tuple of (op, version) example ("<=", "0.1.2")
|
||||
"""
|
||||
match = r"\s*([>=<~!]*)\s*(\S*)\s*"
|
||||
groups = re.match(match, line).groups()
|
||||
return groups[0] or "==", groups[1]
|
||||
|
||||
@classmethod
|
||||
def compare_versions(cls, version_a, op, version_b, ignore_sub_versions=True, num_parts=3):
|
||||
"""
|
||||
@@ -624,14 +659,54 @@ class RequirementsManager(object):
|
||||
return handler.replace(req)
|
||||
return None
|
||||
|
||||
def replace(self, requirements): # type: (Text) -> Text
|
||||
def replace(
|
||||
self,
|
||||
requirements, # type: Text
|
||||
existing_packages=None, # type: List[MarkerRequirement]
|
||||
pkg_skip_existing_local=True, # type: bool
|
||||
pkg_skip_existing_vcs=True, # type: bool
|
||||
pkg_skip_existing=True, # type: bool
|
||||
): # type: (...) -> Text
|
||||
parsed_requirements = self.parse_requirements_section_to_marker_requirements(
|
||||
requirements=requirements, cwd=self._cwd)
|
||||
requirements=requirements, cwd=self._cwd, skip_local_file_validation=True)
|
||||
|
||||
if parsed_requirements and existing_packages:
|
||||
skipped_packages = None
|
||||
if pkg_skip_existing:
|
||||
skipped_packages = set(parsed_requirements) & set(existing_packages)
|
||||
elif pkg_skip_existing_local or pkg_skip_existing_vcs:
|
||||
existing_packages = [
|
||||
p for p in existing_packages if (
|
||||
(pkg_skip_existing_local and p.is_local_package_ref()) or
|
||||
(pkg_skip_existing_vcs and p.is_vcs_ref())
|
||||
)
|
||||
]
|
||||
skipped_packages = set(parsed_requirements) & set(existing_packages)
|
||||
|
||||
if skipped_packages:
|
||||
# maintain order
|
||||
num_skipped_packages = len(parsed_requirements)
|
||||
parsed_requirements = [p for p in parsed_requirements if p not in skipped_packages]
|
||||
num_skipped_packages -= len(parsed_requirements)
|
||||
print("Skipping {} pre-installed packages:\n{}Remaining {} additional packages to install".format(
|
||||
num_skipped_packages,
|
||||
dump_yaml(sorted([str(p) for p in skipped_packages])),
|
||||
len(parsed_requirements)
|
||||
))
|
||||
|
||||
# nothing to install!
|
||||
if not parsed_requirements:
|
||||
return ""
|
||||
|
||||
# sanity check
|
||||
if not parsed_requirements:
|
||||
# return the original requirements just in case
|
||||
return requirements
|
||||
|
||||
# remove local file reference that do not exist
|
||||
for p in parsed_requirements:
|
||||
p.validate_local_file_ref()
|
||||
|
||||
def replace_one(i, req):
|
||||
# type: (int, MarkerRequirement) -> Optional[Text]
|
||||
try:
|
||||
@@ -805,7 +880,7 @@ class RequirementsManager(object):
|
||||
normalize_cuda_version(cudnn_version or 0))
|
||||
|
||||
@staticmethod
|
||||
def parse_requirements_section_to_marker_requirements(requirements, cwd=None):
|
||||
def parse_requirements_section_to_marker_requirements(requirements, cwd=None, skip_local_file_validation=False):
|
||||
def safe_parse(req_str):
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
@@ -815,7 +890,8 @@ class RequirementsManager(object):
|
||||
|
||||
def create_req(x):
|
||||
r = MarkerRequirement(x)
|
||||
r.validate_local_file_ref()
|
||||
if not skip_local_file_validation:
|
||||
r.validate_local_file_ref()
|
||||
return r
|
||||
|
||||
if not requirements:
|
||||
|
||||
@@ -322,6 +322,8 @@ class VCS(object):
|
||||
return
|
||||
|
||||
# rewrite ssh URLs only if either ssh port or ssh user are forced in config
|
||||
# TODO: fix, when url is in the form of `git@domain.com:user/project.git` we will fail to get scheme
|
||||
# need to add ssh:// and replace first ":" with / , unless port is specified
|
||||
if parsed_url.scheme == "ssh" and (
|
||||
self.session.config.get('agent.force_git_ssh_port', None) or
|
||||
self.session.config.get('agent.force_git_ssh_user', None)
|
||||
@@ -595,7 +597,8 @@ class Git(VCS):
|
||||
)
|
||||
|
||||
def pull(self):
|
||||
self.call("fetch", "--all", "--recurse-submodules", cwd=self.location)
|
||||
self._set_ssh_url()
|
||||
self.call("fetch", "--all", "--tags", "--recurse-submodules", cwd=self.location)
|
||||
|
||||
def _git_pass_auth_wrapper(self, func, *args, **kwargs):
|
||||
try:
|
||||
@@ -777,7 +780,22 @@ def clone_repository_cached(session, execution, destination):
|
||||
# We clone the entire repository, not a specific branch
|
||||
vcs.clone() # branch=execution.branch)
|
||||
|
||||
vcs.pull()
|
||||
print("pulling git")
|
||||
try:
|
||||
vcs.pull()
|
||||
except Exception as ex:
|
||||
print("git pull failed: {}".format(ex))
|
||||
if (
|
||||
session.config.get("agent.vcs_cache.enabled", False) and
|
||||
session.config.get("agent.vcs_cache.clone_on_pull_fail", False)
|
||||
):
|
||||
print("pulling git failed, re-cloning: {}".format(no_password_url))
|
||||
rm_tree(cached_repo_path)
|
||||
vcs.clone()
|
||||
else:
|
||||
raise ex
|
||||
print("pulling git completed")
|
||||
|
||||
rm_tree(destination)
|
||||
shutil.copytree(Text(cached_repo_path), Text(clone_folder),
|
||||
symlinks=select_for_platform(linux=True, windows=False),
|
||||
@@ -918,7 +936,7 @@ def _locate_future_import(lines):
|
||||
|
||||
|
||||
def patch_add_task_init_call(local_filename):
|
||||
if not local_filename or not Path(local_filename).is_file():
|
||||
if not local_filename or not Path(local_filename).is_file() or not str(local_filename).lower().endswith(".py"):
|
||||
return
|
||||
|
||||
idx_a = 0
|
||||
|
||||
@@ -401,6 +401,7 @@ class ResourceMonitor(object):
                    fractions = self._fractions_handler.fractions
                    stats["gpu_fraction_{}".format(report_index)] = \
                        (fractions[i] if i < len(fractions) else fractions[-1]) if fractions else 1.0
                    report_index += 1

        except Exception as ex:
            # something happened and we can't use gpu stats,
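The reported fraction falls back to the last configured value for any extra GPU index, and to a full GPU when no fractions are configured; a quick illustration:

    # quick illustration of the fallback expression above
    fractions = [0.5, 0.25]
    [(fractions[i] if i < len(fractions) else fractions[-1]) if fractions else 1.0 for i in range(4)]
    # -> [0.5, 0.25, 0.25, 0.25]; with fractions == [] every entry would be 1.0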
@@ -438,6 +439,7 @@ class ResourceMonitor(object):
class GpuFractionsHandler:
    _number_re = re.compile(r"^clear\.ml/fraction(-\d+)?$")
    _mig_re = re.compile(r"^nvidia\.com/mig-(?P<compute>[0-9]+)g\.(?P<memory>[0-9]+)gb$")
    _frac_gpu_injector_re = re.compile(r"^clearml-injector/fraction$")

    _gpu_name_to_memory_gb = {
        "A30": 24,
@@ -514,10 +516,14 @@ class GpuFractionsHandler:
            return 0

    @classmethod
    def encode_fractions(cls, limits: dict) -> str:
        if any(cls._number_re.match(x) for x in (limits or {})):
            return ",".join(str(v) for k, v in sorted(limits.items()) if cls._number_re.match(k))
        return ",".join(("{}:{}".format(k, v) for k, v in (limits or {}).items() if cls._mig_re.match(k)))
    def encode_fractions(cls, limits: dict, annotations: dict) -> str:
        if limits:
            if any(cls._number_re.match(x) for x in (limits or {})):
                return ",".join(str(v) for k, v in sorted(limits.items()) if cls._number_re.match(k))
            return ",".join(("{}:{}".format(k, v) for k, v in (limits or {}).items() if cls._mig_re.match(k)))
        elif annotations:
            if any(cls._frac_gpu_injector_re.match(x) for x in (annotations or {})):
                return ",".join(str(v) for k, v in sorted(annotations.items()) if cls._frac_gpu_injector_re.match(k))

    @staticmethod
    def decode_fractions(fractions: str) -> Union[List[float], Dict[str, int]]:
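Based on the regex patterns above, a sketch of what the extended encode_fractions returns (inputs are illustrative):

    # illustrative inputs matching the three patterns above
    GpuFractionsHandler.encode_fractions({"clear.ml/fraction-0": "0.5", "clear.ml/fraction-1": "0.25"}, {})
    # -> "0.5,0.25"
    GpuFractionsHandler.encode_fractions({"nvidia.com/mig-1g.5gb": 2}, {})
    # -> "nvidia.com/mig-1g.5gb:2"
    GpuFractionsHandler.encode_fractions({}, {"clearml-injector/fraction": "0.500"})
    # -> "0.500"  (annotations are only consulted when no limits are set)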
@@ -44,6 +44,11 @@ WORKER_ARGS = {
}

DAEMON_ARGS = dict({
    '--polling-interval': {
        'help': 'Polling interval in seconds. Minimum is 5 (default 5)',
        'type': int,
        'default': 5,
    },
    '--foreground': {
        'help': 'Pipe full log to stdout/stderr, should not be used if running in background',
        'action': 'store_true',
@@ -62,7 +67,10 @@ DAEMON_ARGS = dict({
        'group': 'Docker support',
    },
    '--queue': {
        'help': 'Queue ID(s)/Name(s) to pull tasks from (\'default\' queue)',
        'help': 'Queue ID(s)/Name(s) to pull tasks from (\'default\' queue).'
                ' Note that the queue list order determines priority, with the first listed queue having the'
                ' highest priority. To change this behavior, use --order-fairness to pull from each queue in a'
                ' round-robin order',
        'nargs': '+',
        'default': tuple(),
        'dest': 'queues',
@@ -107,8 +115,11 @@ DAEMON_ARGS = dict({
    '--dynamic-gpus': {
        'help': 'Allow to dynamically allocate gpus based on queue properties, '
                'configure with \'--queue <queue_name>=<num_gpus>\'.'
                ' Example: \'--dynamic-gpus --gpus 0-3 --queue dual_gpus=2 single_gpu=1\''
                ' Example Opportunistic: \'--dynamic-gpus --gpus 0-3 --queue dual_gpus=2 max_quad_gpus=1-4 \'',
                ' Example: \'--dynamic-gpus --gpus 0-3 --queue dual_gpus=2 single_gpu=1\'.'
                ' Example Opportunistic: \'--dynamic-gpus --gpus 0-3 --queue dual_gpus=2 max_quad_gpus=1-4\'.'
                ' Note that the queue list order determines priority, with the first listed queue having the'
                ' highest priority. To change this behavior, use --order-fairness to pull from each queue in a'
                ' round-robin order',
        'action': 'store_true',
    },
    '--uptime': {
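How the documented priority rules play out on the command line (queue names are illustrative):

    # 'high_priority' is drained before 'default'
    clearml-agent daemon --queue high_priority default
    # round-robin between the two queues instead
    clearml-agent daemon --queue q1 q2 --order-fairness
    # dynamic GPU allocation, as per the help text above
    clearml-agent daemon --dynamic-gpus --gpus 0-3 --queue dual_gpus=2 single_gpu=1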
@@ -1 +1 @@
__version__ = '1.8.0'
__version__ = '1.9.2'

@@ -1,4 +1,4 @@
FROM ubuntu:18.04
FROM ubuntu:22.04

USER root
WORKDIR /root

@@ -4,7 +4,8 @@ curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip
unzip awscliv2.zip
./aws/install

curl -o kubectl https://amazon-eks.s3-us-west-2.amazonaws.com/1.21.2/2021-07-05/bin/linux/amd64/kubectl
curl -o kubectl https://s3.us-west-2.amazonaws.com/amazon-eks/1.29.3/2024-04-19/bin/linux/amd64/kubectl
#curl -o kubectl https://amazon-eks.s3-us-west-2.amazonaws.com/1.21.2/2021-07-05/bin/linux/amd64/kubectl
#curl -o kubectl https://amazon-eks.s3.us-west-2.amazonaws.com/1.19.6/2021-01-05/bin/linux/amd64/kubectl
chmod +x ./kubectl && mkdir -p $HOME/bin && cp ./kubectl $HOME/bin/kubectl && export PATH=$PATH:$HOME/bin


@@ -1,4 +1,4 @@
FROM ubuntu:18.04
FROM ubuntu:22.04

USER root
WORKDIR /root

@@ -1,6 +1,6 @@
#!/bin/bash

curl -LO https://dl.k8s.io/release/v1.21.0/bin/linux/amd64/kubectl
curl -LO https://dl.k8s.io/release/v1.29.3/bin/linux/amd64/kubectl

install -o root -g root -m 0755 kubectl /usr/local/bin/kubectl
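A quick client-side sanity check after either install path (standard kubectl flag):

    kubectl version --client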
@@ -20,7 +20,7 @@ FROM python:${TAG} as target

WORKDIR /app

ARG KUBECTL_VERSION=1.24.0
ARG KUBECTL_VERSION=1.29.3

# Not sure about these ENV vars
# ENV LC_ALL=en_US.UTF-8

@@ -2,7 +2,7 @@ ARG TAG=3.7.17-slim-bullseye

FROM python:${TAG} as target

ARG KUBECTL_VERSION=1.22.4
ARG KUBECTL_VERSION=1.29.3

WORKDIR /app


@@ -1,4 +1,4 @@
FROM ubuntu:18.04
FROM ubuntu:22.04

USER root
WORKDIR /root
@@ -33,4 +33,10 @@ if [ -z "$CLEARML_AGENT_NO_UPDATE" ]; then
  fi
fi

clearml-agent daemon $DAEMON_OPTIONS --queue $QUEUES --docker "${CLEARML_AGENT_DEFAULT_BASE_DOCKER:-$TRAINS_AGENT_DEFAULT_BASE_DOCKER}" --cpu-only ${CLEARML_AGENT_EXTRA_ARGS:-$TRAINS_AGENT_EXTRA_ARGS}
DOCKER_ARGS="--docker \"${CLEARML_AGENT_DEFAULT_BASE_DOCKER:-$TRAINS_AGENT_DEFAULT_BASE_DOCKER}\""

if [ -n "$CLEARML_AGENT_NO_DOCKER" ]; then
  DOCKER_ARGS=""
fi

clearml-agent daemon $DAEMON_OPTIONS --queue $QUEUES $DOCKER_ARGS --cpu-only ${CLEARML_AGENT_EXTRA_ARGS:-$TRAINS_AGENT_EXTRA_ARGS}
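With this change, setting CLEARML_AGENT_NO_DOCKER (the variable tested in the script above) before the entrypoint runs drops the --docker flag entirely, e.g.:

    # illustrative: run the services agent without docker mode
    export CLEARML_AGENT_NO_DOCKER=1
    # the entrypoint then executes: clearml-agent daemon $DAEMON_OPTIONS --queue $QUEUES --cpu-only ...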
@@ -161,6 +161,9 @@ agent {
    vcs_cache: {
        enabled: true,
        path: ~/.clearml/vcs-cache

        # if git pull fails, always revert to re-cloning the repo; this protects against old user name changes
        # clone_on_pull_fail: false
    },

    # DEPRECATED: please use `venvs_cache` and set `venvs_cache.path`
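To opt in to the re-clone-on-failure behavior added in clone_repository_cached above, uncommenting the flag is enough; a minimal clearml.conf snippet:

    agent {
        vcs_cache: {
            enabled: true,
            path: ~/.clearml/vcs-cache
            # re-clone automatically whenever `git pull` fails inside the cached copy
            clone_on_pull_fail: true
        }
    }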
@@ -305,6 +308,7 @@ agent {
    #     pip_cache: "/root/.cache/pip"
    #     poetry_cache: "/root/.cache/pypoetry"
    #     vcs_cache: "/root/.clearml/vcs-cache"
    #     venvs_cache: "/root/.clearml/venvs-cache"
    #     venv_build: "~/.clearml/venvs-builds"
    #     pip_download: "/root/.clearml/pip-download-cache"
    # }
@@ -13,65 +13,86 @@ def parse_args():
    group = parser.add_mutually_exclusive_group()

    parser.add_argument(
        "--queue", type=str, help="Queues to pull tasks from. If multiple queues, use comma separated list, e.g. 'queue1,queue2'",
        "--queue",
        type=str,
        help="Queues to pull tasks from. If multiple queues, use comma separated list, e.g. 'queue1,queue2'",
    )
    group.add_argument(
        "--ports-mode", action='store_true', default=False,
        "--ports-mode",
        action="store_true",
        default=False,
        help="Ports-Mode will add a label to the pod which can be used as service, in order to expose ports."
             " Should not be used with max-pods"
    )
    parser.add_argument(
        "--num-of-services", type=int, default=20,
        help="Specify the number of k8s services to be used. Use only with ports-mode."
        "--num-of-services",
        type=int,
        default=20,
        help="Specify the number of k8s services to be used. Use only with ports-mode.",
    )
    parser.add_argument(
        "--base-port", type=int,
        "--base-port",
        type=int,
        help="Used in conjunction with ports-mode, specifies the base port exposed by the services. "
             "For pod #X, the port will be <base-port>+X. Note that pod number is calculated based on base-pod-num,"
             " e.g. if base-port=20000 and base-pod-num=3, the port for the first pod will be 20003"
    )
    parser.add_argument(
        "--base-pod-num", type=int, default=1,
        "--base-pod-num",
        type=int,
        default=1,
        help="Used in conjunction with ports-mode and base-port, specifies the base pod number to be used by the "
             "service (default: %(default)s)"
    )
    parser.add_argument(
        "--gateway-address", type=str, default=None,
        help="Used in conjunction with ports-mode, specify the external address of the k8s ingress / ELB"
        "--gateway-address",
        type=str,
        default=None,
        help="Used in conjunction with ports-mode, specify the external address of the k8s ingress / ELB",
    )
    parser.add_argument(
        "--pod-clearml-conf", type=str,
        help="Configuration file to be used by the pod itself (if not provided, current configuration is used)"
        "--pod-clearml-conf",
        type=str,
        help="Configuration file to be used by the pod itself (if not provided, current configuration is used)",
    )
    parser.add_argument(
        "--overrides-yaml", type=str,
        help="YAML file containing pod overrides to be used when launching a new pod"
        "--overrides-yaml", type=str, help="YAML file containing pod overrides to be used when launching a new pod"
    )
    parser.add_argument(
        "--template-yaml", type=str,
        "--template-yaml",
        type=str,
        help="YAML file containing pod template. If provided pod will be scheduled with kubectl apply "
             "and overrides are ignored, otherwise it will be scheduled with kubectl run"
    )
    parser.add_argument(
        "--ssh-server-port", type=int, default=0,
        help="If non-zero, every pod will also start an SSH server on the selected port (default: zero, not active)"
        "--ssh-server-port",
        type=int,
        default=0,
        help="If non-zero, every pod will also start an SSH server on the selected port (default: zero, not active)",
    )
    parser.add_argument(
        "--namespace", type=str,
        help="Specify the namespace in which pods will be created (default: %(default)s)", default="clearml"
        "--namespace",
        type=str,
        help="Specify the namespace in which pods will be created (default: %(default)s)",
        default="clearml",
    )
    group.add_argument(
        "--max-pods", type=int,
        "--max-pods",
        type=int,
        help="Limit the maximum number of pods that this service can run at the same time."
             " Should not be used with ports-mode"
    )
    parser.add_argument(
        "--use-owner-token", action="store_true", default=False,
        help="Generate and use task owner token for the execution of each task"
        "--use-owner-token",
        action="store_true",
        default=False,
        help="Generate and use task owner token for the execution of each task",
    )
    parser.add_argument(
        "--create-queue", action="store_true", default=False,
        help="Create the queue if it does not exist (default: %(default)s)"
        "--create-queue",
        action="store_true",
        default=False,
        help="Create the queue if it does not exist (default: %(default)s)",
    )
    return parser.parse_args()
@@ -81,23 +102,32 @@ def main():

    user_props_cb = None
    if args.ports_mode and args.base_port:

        def k8s_user_props_cb(pod_number=0):
            user_prop = {"k8s-pod-port": args.base_port + pod_number}
            if args.gateway_address:
                user_prop["k8s-gateway-address"] = args.gateway_address
            return user_prop

        user_props_cb = k8s_user_props_cb

    k8s = K8sIntegration(
        ports_mode=args.ports_mode, num_of_services=args.num_of_services, base_pod_num=args.base_pod_num,
        user_props_cb=user_props_cb, overrides_yaml=args.overrides_yaml, clearml_conf_file=args.pod_clearml_conf,
        template_yaml=args.template_yaml, extra_bash_init_script=K8sIntegration.get_ssh_server_bash(
            ssh_port_number=args.ssh_server_port) if args.ssh_server_port else None,
        namespace=args.namespace, max_pods_limit=args.max_pods or None,
        ports_mode=args.ports_mode,
        num_of_services=args.num_of_services,
        base_pod_num=args.base_pod_num,
        user_props_cb=user_props_cb,
        overrides_yaml=args.overrides_yaml,
        clearml_conf_file=args.pod_clearml_conf,
        template_yaml=args.template_yaml,
        extra_bash_init_script=K8sIntegration.get_ssh_server_bash(ssh_port_number=args.ssh_server_port)
        if args.ssh_server_port
        else None,
        namespace=args.namespace,
        max_pods_limit=args.max_pods or None,
    )
    args.queue = [q.strip() for q in args.queue.split(",") if q.strip()]
    queue = [q.strip() for q in args.queue.split(",") if q.strip()] if args.queue else None

    k8s.k8s_daemon(args.queue, use_owner_token=args.use_owner_token, create_queue=args.create_queue)
    k8s.k8s_daemon(queue, use_owner_token=args.use_owner_token, create_queue=args.create_queue)


if __name__ == "__main__":
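Putting the reworked argument handling together, a typical launch of the glue layer might look like this (the script name k8s_glue_example.py is an assumption about the repo's examples folder; all values illustrative):

    # ports-mode with 10 services, exposing base port 20000
    python k8s_glue_example.py --queue queue1,queue2 --ports-mode --num-of-services 10 \
        --base-port 20000 --namespace clearml --create-queue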