mirror of
https://github.com/clearml/clearml-agent
synced 2025-06-26 18:16:15 +00:00
Compare commits
7 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
26aa50f1b5 | ||
|
|
8b4f1eefc2 | ||
|
|
97c2e21dcc | ||
|
|
918dd39b87 | ||
|
|
7776e906c4 | ||
|
|
1bf865ec08 | ||
|
|
3f1ce847dc |
@@ -197,7 +197,7 @@ with `--cpu-only`).
|
||||
|
||||
If no flag is set, and `NVIDIA_VISIBLE_DEVICES` variable doesn't exist, all GPU's will be allocated for
|
||||
the `clearml-agent` <br>
|
||||
If `--cpu-only` flag is set, or `NVIDIA_VISIBLE_DEVICES` is an empty string (""), no gpu will be allocated for
|
||||
If `--cpu-only` flag is set, or `NVIDIA_VISIBLE_DEVICES="none"`, no gpu will be allocated for
|
||||
the `clearml-agent`
|
||||
|
||||
Example: spin two agents, one per gpu on the same machine:
|
||||
|
||||
@@ -235,7 +235,8 @@
|
||||
docker_internal_mounts {
|
||||
sdk_cache: "/clearml_agent_cache"
|
||||
apt_cache: "/var/cache/apt/archives"
|
||||
ssh_folder: "/root/.ssh"
|
||||
ssh_folder: "~/.ssh"
|
||||
ssh_ro_folder: "/.ssh"
|
||||
pip_cache: "/root/.cache/pip"
|
||||
poetry_cache: "/root/.cache/pypoetry"
|
||||
vcs_cache: "/root/.clearml/vcs-cache"
|
||||
|
||||
@@ -1379,6 +1379,9 @@ class Worker(ServiceCommandSection):
|
||||
|
||||
self._session.print_configuration()
|
||||
|
||||
def resolve_daemon_queue_names(self, queues, create_if_missing=False):
|
||||
return self._resolve_queue_names(queues=queues, create_if_missing=create_if_missing)
|
||||
|
||||
def daemon(self, queues, log_level, foreground=False, docker=False, detached=False, order_fairness=False, **kwargs):
|
||||
self._apply_extra_configuration()
|
||||
|
||||
@@ -1421,7 +1424,7 @@ class Worker(ServiceCommandSection):
|
||||
|
||||
# if we do not need to create queues, make sure they are valid
|
||||
# match previous behaviour when we validated queue names before everything else
|
||||
queues = self._resolve_queue_names(queues, create_if_missing=kwargs.get('create_queue', False))
|
||||
queues = self.resolve_daemon_queue_names(queues, create_if_missing=kwargs.get('create_queue', False))
|
||||
|
||||
queues_info = [
|
||||
q.to_dict()
|
||||
@@ -3542,6 +3545,7 @@ class Worker(ServiceCommandSection):
|
||||
mounted_vcs_cache = temp_config.get("agent.vcs_cache.path")
|
||||
mounted_venvs_cache = temp_config.get("agent.venvs_cache.path", "")
|
||||
mount_ssh = temp_config.get("agent.docker_internal_mounts.ssh_folder", None)
|
||||
mount_ssh_ro = temp_config.get("agent.docker_internal_mounts.ssh_ro_folder", None)
|
||||
mount_apt_cache = temp_config.get("agent.docker_internal_mounts.apt_cache", None)
|
||||
mount_pip_cache = temp_config.get("agent.docker_internal_mounts.pip_cache", None)
|
||||
mount_poetry_cache = temp_config.get("agent.docker_internal_mounts.poetry_cache", None)
|
||||
@@ -3573,6 +3577,7 @@ class Worker(ServiceCommandSection):
|
||||
preprocess_bash_script=preprocess_bash_script,
|
||||
install_opencv_libs=install_opencv_libs,
|
||||
mount_ssh=mount_ssh,
|
||||
mount_ssh_ro=mount_ssh_ro,
|
||||
mount_apt_cache=mount_apt_cache,
|
||||
mount_pip_cache=mount_pip_cache,
|
||||
mount_poetry_cache=mount_poetry_cache,
|
||||
@@ -3626,7 +3631,7 @@ class Worker(ServiceCommandSection):
|
||||
auth_token=None,
|
||||
worker_tags=None,
|
||||
name=None,
|
||||
mount_ssh=None, mount_apt_cache=None, mount_pip_cache=None, mount_poetry_cache=None,
|
||||
mount_ssh=None, mount_ssh_ro=None, mount_apt_cache=None, mount_pip_cache=None, mount_poetry_cache=None,
|
||||
env_task_id=None,
|
||||
):
|
||||
self.debug("Constructing docker command", context="docker")
|
||||
@@ -3770,7 +3775,7 @@ class Worker(ServiceCommandSection):
|
||||
clearml_agent_wheel = 'clearml-agent{specify_version}'.format(specify_version=specify_version)
|
||||
|
||||
mount_ssh = mount_ssh or '/root/.ssh'
|
||||
mount_ssh_ro = "{}_ro".format(mount_ssh.rstrip("/"))
|
||||
mount_ssh_ro = mount_ssh_ro or "{}_ro".format(mount_ssh.rstrip("/"))
|
||||
mount_apt_cache = mount_apt_cache or '/var/cache/apt/archives'
|
||||
mount_pip_cache = mount_pip_cache or '/root/.cache/pip'
|
||||
mount_poetry_cache = mount_poetry_cache or '/root/.cache/pypoetry'
|
||||
|
||||
@@ -11,6 +11,7 @@ import subprocess
|
||||
import tempfile
|
||||
from copy import deepcopy
|
||||
from pathlib import Path
|
||||
from pprint import pformat
|
||||
from threading import Thread
|
||||
from time import sleep
|
||||
from typing import Text, List, Callable, Any, Collection, Optional, Union
|
||||
@@ -75,8 +76,8 @@ class K8sIntegration(Worker):
|
||||
"export LOCAL_PYTHON=$(which python3.$i) && break ; done",
|
||||
"[ ! -z $LOCAL_PYTHON ] || apt-get install -y python3-pip",
|
||||
"[ ! -z $LOCAL_PYTHON ] || export LOCAL_PYTHON=python3",
|
||||
"$LOCAL_PYTHON -m pip install clearml-agent",
|
||||
"{extra_bash_init_cmd}",
|
||||
"$LOCAL_PYTHON -m pip install clearml-agent",
|
||||
"{extra_docker_bash_script}",
|
||||
"$LOCAL_PYTHON -m clearml_agent execute --full-monitoring --require-queue --id {task_id}"
|
||||
]
|
||||
@@ -130,6 +131,7 @@ class K8sIntegration(Worker):
|
||||
"""
|
||||
super(K8sIntegration, self).__init__()
|
||||
self.k8s_pending_queue_name = k8s_pending_queue_name or self.K8S_PENDING_QUEUE
|
||||
self.k8s_pending_queue_id = None
|
||||
self.kubectl_cmd = kubectl_cmd or self.KUBECTL_RUN_CMD
|
||||
self.container_bash_script = container_bash_script or self.CONTAINER_BASH_SCRIPT
|
||||
# Always do system packages, because by we will be running inside a docker
|
||||
@@ -394,17 +396,17 @@ class K8sIntegration(Worker):
|
||||
# push task into the k8s queue, so we have visibility on pending tasks in the k8s scheduler
|
||||
try:
|
||||
print('Pushing task {} into temporary pending queue'.format(task_id))
|
||||
res = self._session.api_client.tasks.stop(task_id, force=True)
|
||||
_ = self._session.api_client.tasks.stop(task_id, force=True)
|
||||
res = self._session.api_client.tasks.enqueue(
|
||||
task_id,
|
||||
queue=self.k8s_pending_queue_name,
|
||||
queue=self.k8s_pending_queue_id,
|
||||
status_reason='k8s pending scheduler',
|
||||
)
|
||||
if res.meta.result_code != 200:
|
||||
raise Exception(res.meta.result_msg)
|
||||
except Exception as e:
|
||||
self.log.error("ERROR: Could not push back task [{}] to k8s pending queue [{}], error: {}".format(
|
||||
task_id, self.k8s_pending_queue_name, e))
|
||||
self.log.error("ERROR: Could not push back task [{}] to k8s pending queue {} [{}], error: {}".format(
|
||||
task_id, self.k8s_pending_queue_name, self.k8s_pending_queue_id, e))
|
||||
return
|
||||
|
||||
container = get_task_container(self._session, task_id)
|
||||
@@ -679,6 +681,8 @@ class K8sIntegration(Worker):
|
||||
with open(yaml_file, 'wt') as f:
|
||||
yaml.dump(template, f)
|
||||
|
||||
self.log.debug("Applying template:\n{}".format(pformat(template, indent=2)))
|
||||
|
||||
kubectl_cmd = self.KUBECTL_APPLY_CMD.format(
|
||||
task_id=task_id,
|
||||
docker_image=docker_image,
|
||||
@@ -765,13 +769,13 @@ class K8sIntegration(Worker):
|
||||
events_service = self.get_service(Events)
|
||||
|
||||
# make sure we have a k8s pending queue
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
self._session.api_client.queues.create(self.k8s_pending_queue_name)
|
||||
except Exception:
|
||||
pass
|
||||
# get queue id
|
||||
self.k8s_pending_queue_name = self._resolve_name(self.k8s_pending_queue_name, "queues")
|
||||
if not self.k8s_pending_queue_id:
|
||||
resolved_ids = self._resolve_queue_names([self.k8s_pending_queue_name], create_if_missing=True)
|
||||
if not resolved_ids:
|
||||
raise ValueError(
|
||||
"Failed resolving or creating k8s pending queue {}".format(self.k8s_pending_queue_name)
|
||||
)
|
||||
self.k8s_pending_queue_id = resolved_ids[0]
|
||||
|
||||
_last_machine_update_ts = 0
|
||||
while True:
|
||||
|
||||
Reference in New Issue
Block a user