Merge branch 'allegroai:master' into master

Author: Liron Ilouz, 2024-04-01 16:59:15 +03:00 (committed by GitHub)
Commit: 05600504b6
15 changed files with 1590 additions and 1278 deletions


@ -92,7 +92,7 @@
# pytorch_resolve: "pip"
# additional conda channels to use when installing with conda package manager
conda_channels: ["pytorch", "conda-forge", "defaults", ]
conda_channels: ["pytorch", "conda-forge", "nvidia", "defaults", ]
# If set to true, Task's "installed packages" are ignored,
# and the repository's "requirements.txt" is used instead


@ -140,7 +140,7 @@
vcs_repo_detect_async: true
# Store uncommitted git/hg source code diff in experiment manifest when training in development mode
# This stores "git diff" or "hg diff" into the experiment's "script.requirements.diff" section
# This stores the "git diff" into the experiment's "script.requirements.diff" section
store_uncommitted_code_diff: true
# Support stopping an experiment in case it was externally stopped, status was changed or task was reset


@ -11,6 +11,7 @@ ENV_AUTH_TOKEN = EnvEntry("CLEARML_AUTH_TOKEN")
ENV_VERBOSE = EnvEntry("CLEARML_API_VERBOSE", "TRAINS_API_VERBOSE", type=bool, default=False)
ENV_HOST_VERIFY_CERT = EnvEntry("CLEARML_API_HOST_VERIFY_CERT", "TRAINS_API_HOST_VERIFY_CERT", type=bool, default=True)
ENV_CONDA_ENV_PACKAGE = EnvEntry("CLEARML_CONDA_ENV_PACKAGE", "TRAINS_CONDA_ENV_PACKAGE")
ENV_USE_CONDA_BASE_ENV = EnvEntry("CLEARML_USE_CONDA_BASE_ENV", type=bool)
ENV_NO_DEFAULT_SERVER = EnvEntry("CLEARML_NO_DEFAULT_SERVER", "TRAINS_NO_DEFAULT_SERVER", type=bool, default=True)
ENV_DISABLE_VAULT_SUPPORT = EnvEntry('CLEARML_AGENT_DISABLE_VAULT_SUPPORT', type=bool)
ENV_ENABLE_ENV_CONFIG_SECTION = EnvEntry('CLEARML_AGENT_ENABLE_ENV_CONFIG_SECTION', type=bool)
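A hedged example of opting into the new ENV_USE_CONDA_BASE_ENV entry from the process environment before the agent starts (the config-file equivalent, use_conda_base_env, appears in the clearml.conf diff further down); the import path is assumed from this repo layout.

import os

# set before clearml-agent is launched; the EnvEntry above parses it with type=bool
os.environ["CLEARML_USE_CONDA_BASE_ENV"] = "1"

from clearml_agent.backend_api.session.defs import ENV_USE_CONDA_BASE_ENV  # path assumed
print(ENV_USE_CONDA_BASE_ENV.get())  # expected to resolve to True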


@ -1396,7 +1396,7 @@ class Worker(ServiceCommandSection):
def _setup_dynamic_gpus(self, gpu_queues):
available_gpus = self.get_runtime_properties()
if available_gpus is None:
raise ValueError("Dynamic GPU allocation is not supported by the ClearML-server")
raise ValueError("Dynamic GPU allocation is not supported by your ClearML-server")
available_gpus = [prop["value"] for prop in available_gpus if prop["key"] == 'available_gpus']
if available_gpus:
gpus = []
@ -1413,7 +1413,9 @@ class Worker(ServiceCommandSection):
if not self.set_runtime_properties(
key='available_gpus', value=','.join(str(g) for g in available_gpus)):
raise ValueError("Dynamic GPU allocation is not supported by the ClearML-server")
raise ValueError("Dynamic GPU allocation is not supported by your ClearML-server")
self.cluster_report_monitor(available_gpus=available_gpus, gpu_queues=gpu_queues)
return available_gpus, gpu_queues
@ -1810,7 +1812,7 @@ class Worker(ServiceCommandSection):
available_gpus = self._dynamic_gpu_get_available(gpu_indexes)
if not self.set_runtime_properties(
key='available_gpus', value=','.join(str(g) for g in available_gpus)):
raise ValueError("Dynamic GPU allocation is not supported by the ClearML-server")
raise ValueError("Dynamic GPU allocation is not supported by your ClearML-server")
def report_monitor(self, report):
if not self.monitor:
@ -1819,6 +1821,13 @@ class Worker(ServiceCommandSection):
self.monitor.set_report(report)
self.monitor.send_report()
def cluster_report_monitor(self, available_gpus, gpu_queues):
if not self.monitor:
self.new_monitor()
self.monitor.setup_cluster_report(
worker_id=self.worker_id, available_gpus=available_gpus, gpu_queues=gpu_queues
)
def stop_monitor(self):
if self.monitor:
self.monitor.stop()
@ -2062,6 +2071,7 @@ class Worker(ServiceCommandSection):
service_mode_internal_agent_started = True
filter_lines = printed_lines[:i+1]
elif line.startswith(log_control_end_msg):
service_mode_internal_agent_started = True
return filter_lines, service_mode_internal_agent_started, 0
return filter_lines, service_mode_internal_agent_started, None
@ -4280,7 +4290,8 @@ class Worker(ServiceCommandSection):
if docker_bash_setup_script and docker_bash_setup_script.strip('\n '):
extra_shell_script = (extra_shell_script or '') + \
' ; '.join(line.strip()
for line in docker_bash_setup_script.split('\n') if line.strip()) + \
for line in docker_bash_setup_script.split('\n')
if line.strip() and not line.lstrip().startswith("#")) + \
' ; '
self.debug(
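A hedged illustration of the new filtering above: comment lines in docker_bash_setup_script are no longer folded into the single ';'-joined shell command (the script contents below are made up).

docker_bash_setup_script = """
# install extra tools
apt-get update
apt-get install -y vim
"""

extra_shell_script = ' ; '.join(
    line.strip()
    for line in docker_bash_setup_script.split('\n')
    if line.strip() and not line.lstrip().startswith("#")
) + ' ; '

print(extra_shell_script)  # -> "apt-get update ; apt-get install -y vim ; "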
@ -4504,10 +4515,15 @@ class Worker(ServiceCommandSection):
if self._session.feature_set == "basic":
raise ValueError("Server does not support --use-owner-token option")
role = self._session.get_decoded_token(self._session.token).get("identity", {}).get("role", None)
if role and role not in ["admin", "root", "system"]:
identity = self._session.get_decoded_token(self._session.token).get("identity", {})
role = identity.get("role", None)
try:
service_account_type = int(identity.get("service_account_type", 0))
except ValueError:
service_account_type = 0
if role and (role not in ["admin", "root", "system"] and service_account_type < 2):
raise ValueError(
"User role not suitable for --use-owner-token option (requires at least admin,"
"User role not suitable for --use-owner-token option (requires at least admin or service account,"
" found {})".format(role)
)
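The new check broadens --use-owner-token to service accounts. A minimal sketch of the decision logic, factored into a standalone function for illustration; only the identity/role/service_account_type token fields come from the diff above, the rest is illustrative.

def owner_token_allowed(identity):
    # admins/root/system always pass; otherwise a service account of type >= 2 is accepted
    role = identity.get("role")
    try:
        service_account_type = int(identity.get("service_account_type", 0))
    except (TypeError, ValueError):
        service_account_type = 0
    return (not role) or role in ("admin", "root", "system") or service_account_type >= 2

# example: a plain user token is rejected, a type-2 service account is accepted
assert not owner_token_allowed({"role": "user"})
assert owner_token_allowed({"role": "user", "service_account_type": 2})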


@ -248,6 +248,8 @@ ENV_TEMP_STDOUT_FILE_DIR = EnvironmentConfig("CLEARML_AGENT_TEMP_STDOUT_FILE_DIR
ENV_GIT_CLONE_VERBOSE = EnvironmentConfig("CLEARML_AGENT_GIT_CLONE_VERBOSE", type=bool)
ENV_GPU_FRACTIONS = EnvironmentConfig("CLEARML_AGENT_GPU_FRACTIONS")
class FileBuffering(IntEnum):
"""


@ -39,7 +39,7 @@ LOCAL_REGEX = re.compile(
class Requirement(object):
"""
Represents a single requirementfrom clearml_agent.external.requirements_parser.requirement import Requirement
Represents a single requirement from clearml_agent.external.requirements_parser.requirement import Requirement
Typically instances of this class are created with ``Requirement.parse``.
For local file requirements, there's no verification that the file
@ -214,6 +214,7 @@ class Requirement(object):
def parse(cls, line):
"""
Parses a Requirement from a line of a requirement file.
This is the main entry point for parsing a single requirements line (not parse_line!)
:param line: a line of a requirement file
:returns: a Requirement instance for the given line
@ -226,7 +227,7 @@ class Requirement(object):
return cls.parse_editable(
re.sub(r'^(-e|--editable=?)\s*', '', line))
elif '@' in line and ('#' not in line or line.index('#') > line.index('@')):
# Allegro bug fix: support 'name @ git+' entries
# ClearML bug fix: support 'name @ git+' entries
name, uri = line.split('@', 1)
name = name.strip()
uri = uri.strip()


@ -570,7 +570,7 @@ class K8sIntegration(Worker):
print("Kubernetes scheduling task id={}".format(task_id))
try:
template = self._resolve_template(task_session, task_data, queue)
template = self._resolve_template(task_session, task_data, queue, task_id)
except Exception as ex:
print("ERROR: Failed resolving template (skipping): {}".format(ex))
return
@ -1095,8 +1095,9 @@ class K8sIntegration(Worker):
:param list(str) queue: queue name to pull from
"""
queues = queue if isinstance(queue, (list, tuple)) else ([queue] if queue else None)
return self.daemon(
queues=[ObjectID(name=q) for q in queue] if queue else None,
queues=[ObjectID(name=q) for q in queues] if queues else None,
log_level=logging.INFO, foreground=True, docker=False, **kwargs,
)
@ -1105,7 +1106,7 @@ class K8sIntegration(Worker):
self._session, queue=queue, get_task_info=get_task_info
)
def _resolve_template(self, task_session, task_data, queue):
def _resolve_template(self, task_session, task_data, queue, task_id):
if self.template_dict:
return deepcopy(self.template_dict)


@ -57,6 +57,21 @@ class GPUStat(object):
"""
return self.entry['uuid']
@property
def mig_index(self):
"""
Returns the index of the MIG partition (as in nvidia-smi).
"""
return self.entry.get("mig_index")
@property
def mig_uuid(self):
"""
Returns the uuid of the MIG partition returned by nvidia-smi when running in MIG mode,
e.g. MIG-12345678-abcd-abcd-uuid-123456abcdef
"""
return self.entry.get("mig_uuid")
@property
def name(self):
"""
@ -161,6 +176,7 @@ class GPUStatCollection(object):
_initialized = False
_device_count = None
_gpu_device_info = {}
_mig_device_info = {}
def __init__(self, gpu_list, driver_version=None, driver_cuda_version=None):
self.gpus = gpu_list
@ -191,7 +207,7 @@ class GPUStatCollection(object):
return b.decode() # for python3, to unicode
return b
def get_gpu_info(index, handle):
def get_gpu_info(index, handle, is_mig=False):
"""Get one GPU information specified by nvml handle"""
def get_process_info(nv_process):
@ -227,12 +243,14 @@ class GPUStatCollection(object):
pass
return process
if not GPUStatCollection._gpu_device_info.get(index):
device_info = GPUStatCollection._mig_device_info if is_mig else GPUStatCollection._gpu_device_info
if not device_info.get(index):
name = _decode(N.nvmlDeviceGetName(handle))
uuid = _decode(N.nvmlDeviceGetUUID(handle))
GPUStatCollection._gpu_device_info[index] = (name, uuid)
device_info[index] = (name, uuid)
name, uuid = GPUStatCollection._gpu_device_info[index]
name, uuid = device_info[index]
try:
temperature = N.nvmlDeviceGetTemperature(
@ -328,8 +346,36 @@ class GPUStatCollection(object):
for index in range(GPUStatCollection._device_count):
handle = N.nvmlDeviceGetHandleByIndex(index)
gpu_info = get_gpu_info(index, handle)
gpu_stat = GPUStat(gpu_info)
gpu_list.append(gpu_stat)
mig_cnt = 0
# noinspection PyBroadException
try:
mig_cnt = N.nvmlDeviceGetMaxMigDeviceCount(handle)
except Exception:
pass
if mig_cnt <= 0:
gpu_list.append(GPUStat(gpu_info))
continue
got_mig_info = False
for mig_index in range(mig_cnt):
try:
mig_handle = N.nvmlDeviceGetMigDeviceHandleByIndex(handle, mig_index)
mig_info = get_gpu_info(mig_index, mig_handle, is_mig=True)
mig_info["mig_name"] = mig_info["name"]
mig_info["name"] = gpu_info["name"]
mig_info["mig_index"] = mig_info["index"]
mig_info["mig_uuid"] = mig_info["uuid"]
mig_info["index"] = gpu_info["index"]
mig_info["uuid"] = gpu_info["uuid"]
mig_info["temperature.gpu"] = gpu_info["temperature.gpu"]
mig_info["fan.speed"] = gpu_info["fan.speed"]
gpu_list.append(GPUStat(mig_info))
got_mig_info = True
except Exception as e:
pass
if not got_mig_info:
gpu_list.append(GPUStat(gpu_info))
# 2. additional info (driver version, etc).
if get_driver_info:
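A hedged usage sketch of the MIG-aware collection above: when MIG is enabled, each partition now surfaces as its own GPUStat entry that carries the parent GPU identity plus the new mig_index/mig_uuid fields. The import path is assumed from this repo layout.

from clearml_agent.helper.gpu import gpustat  # import path assumed

stats = gpustat.new_query()
for g in stats.gpus:
    if getattr(g, "mig_uuid", None):
        # one entry per MIG partition; index/uuid still point at the parent GPU
        print("GPU {} ({}) MIG slice {}: {}".format(g.index, g.uuid, g.mig_index, g.mig_uuid))
    else:
        print("GPU {} ({}): no MIG partitions".format(g.index, g.uuid))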

File diff suppressed because it is too large


@ -27,7 +27,7 @@ from clearml_agent.session import Session
from .base import PackageManager
from .pip_api.venv import VirtualenvPip
from .requirements import RequirementsManager, MarkerRequirement
from ...backend_api.session.defs import ENV_CONDA_ENV_PACKAGE
from ...backend_api.session.defs import ENV_CONDA_ENV_PACKAGE, ENV_USE_CONDA_BASE_ENV
package_normalize = partial(re.compile(r"""\[version=['"](.*)['"]\]""").sub, r"\1")
@ -78,6 +78,11 @@ class CondaAPI(PackageManager):
self.path = path
self.env_read_only = False
self.extra_channels = self.session.config.get('agent.package_manager.conda_channels', [])
# install into base conda environment (should only be used if running in docker mode)
self.use_conda_base_env = ENV_USE_CONDA_BASE_ENV.get(
default=self.session.config.get('agent.package_manager.use_conda_base_env', None)
)
# notice this will not install any additional packages into the selected environment
self.conda_env_as_base_docker = \
self.session.config.get('agent.package_manager.conda_env_as_base_docker', None) or \
bool(ENV_CONDA_ENV_PACKAGE.get())
@ -128,16 +133,38 @@ class CondaAPI(PackageManager):
def bin(self):
return self.pip.bin
def _parse_package_marker_match_python_ver(self, line=None, marker_req=None):
if line:
marker_req = MarkerRequirement(Requirement.parse(line))
try:
mock_req = MarkerRequirement(Requirement.parse(marker_req.marker.replace("'", "").replace("\"", "")))
except Exception as ex:
print("WARNING: failed parsing, assuming package is okay {}".format(ex))
return marker_req
if not mock_req.compare_version(requested_version=self.python):
print("SKIPPING package `{}` not required python version {}".format(marker_req.tostr(), self.python))
return None
return marker_req
# noinspection SpellCheckingInspection
def upgrade_pip(self):
# do not change pip version if pre built environement is used
if self.env_read_only:
print('Conda environment in read-only mode, skipping pip upgrade.')
return ''
pip_versions = []
for req_pip_line in self.pip.get_pip_versions():
req = self._parse_package_marker_match_python_ver(line=req_pip_line)
if req:
pip_versions.append(req.tostr(markers=False))
return self._install(
*select_for_platform(
windows=self.pip.get_pip_versions(),
linux=self.pip.get_pip_versions()
windows=pip_versions,
linux=pip_versions
)
)
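A hedged illustration of the marker filtering that upgrade_pip() now applies to its pinned pip versions, reproduced with the third-party packaging library instead of the agent's own MarkerRequirement; the pins and python version are example values.

from packaging.markers import Marker

python_version = "3.8"
req_pip_lines = [
    "pip==20.1 ; python_version < '3.10'",
    "pip==23.3 ; python_version >= '3.10'",
]

kept = []
for line in req_pip_lines:
    spec, _, marker = (part.strip() for part in line.partition(";"))
    # keep the pin only if its python_version marker matches the target interpreter
    if not marker or Marker(marker).evaluate({"python_version": python_version}):
        kept.append(spec)

print(kept)  # -> ['pip==20.1']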
@ -172,6 +199,15 @@ class CondaAPI(PackageManager):
else:
raise ValueError("Could not restore Conda environment, cannot find {}".format(
self.conda_pre_build_env_path))
elif self.use_conda_base_env:
try:
base_path = Path(self.conda).parent.parent.as_posix()
print("Using base conda environment at {}".format(base_path))
self._init_existing_environment(base_path, is_readonly=False)
return self
except Exception as ex:
print("WARNING: Failed using base conda environment, reverting to new environment: {}".format(ex))
command = Argv(
self.conda,
@ -199,10 +235,25 @@ class CondaAPI(PackageManager):
return self
def _init_existing_environment(self, conda_pre_build_env_path):
def _init_existing_environment(self, conda_pre_build_env_path, is_readonly=True):
print("Using pre-existing Conda environment from {}".format(conda_pre_build_env_path))
self.path = Path(conda_pre_build_env_path)
self.source = ("conda", "activate", self.path.as_posix())
conda_env = self._get_conda_sh()
self.source = CommandSequence(('source', conda_env.as_posix()), self.source)
conda_packages_json = json.loads(
self._run_command((self.conda, "list", "--json", "-p", self.path), raw=True))
try:
for package in conda_packages_json:
if package.get("name") == "python" and package.get("version"):
self.python = ".".join(package.get("version").split(".")[:2])
print("Existing conda environment, found python version {}".format(self.python))
break
except Exception as ex:
print("WARNING: failed detecting existing conda python version: {}".format(ex))
self.pip = CondaPip(
session=self.session,
source=self.source,
@ -210,9 +261,9 @@ class CondaAPI(PackageManager):
requirements_manager=self.requirements_manager,
path=self.path,
)
conda_env = self._get_conda_sh()
self.source = self.pip.source = CommandSequence(('source', conda_env.as_posix()), self.source)
self.env_read_only = True
self.pip.source = self.source
self.env_read_only = is_readonly
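A hedged illustration of the python-version detection added above, applied to a hand-written stand-in for `conda list --json` output (the package list is an example).

conda_packages_json = [
    {"name": "python", "version": "3.10.13", "channel": "conda-forge"},
    {"name": "pip", "version": "23.3.1", "channel": "conda-forge"},
]

python = None
for package in conda_packages_json:
    if package.get("name") == "python" and package.get("version"):
        # keep only the major.minor part, as in _init_existing_environment() above
        python = ".".join(package.get("version").split(".")[:2])
        break

print(python)  # -> "3.10"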
def remove(self):
"""
@ -498,7 +549,7 @@ class CondaAPI(PackageManager):
if '.' not in m.specs[0][1]:
continue
if m.name.lower() == 'cudatoolkit':
if m.name.lower() in ('cudatoolkit', 'cuda-toolkit'):
# skip cuda if we are running on CPU
if not cuda_version:
continue
@ -525,10 +576,22 @@ class CondaAPI(PackageManager):
has_torch = True
m.req.name = 'tensorflow-gpu' if cuda_version > 0 else 'tensorflow'
# push the clearml packages into the pip_requirements
if "clearml" in m.req.name and "clearml" not in self.extra_channels:
if self.session.debug_mode:
print("info: moving `{}` packages to `pip` section".format(m.req))
pip_requirements.append(m)
continue
reqs.append(m)
if not has_cudatoolkit and cuda_version:
m = MarkerRequirement(Requirement.parse("cudatoolkit == {}".format(cuda_version_full)))
# nvidia channel is using `cuda-toolkit` and has newer versions of cuda,
# older cuda can be picked from conda-forge (<12)
if "nvidia" in self.extra_channels:
m = MarkerRequirement(Requirement.parse("cuda-toolkit == {}".format(cuda_version_full)))
else:
m = MarkerRequirement(Requirement.parse("cudatoolkit == {}".format(cuda_version_full)))
has_cudatoolkit = True
reqs.append(m)
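A hedged sketch of the channel-dependent CUDA requirement selection introduced above; the channel list mirrors the new default and cuda_version_full is an example value.

extra_channels = ["pytorch", "conda-forge", "nvidia", "defaults"]
cuda_version_full = "12.2"

if "nvidia" in extra_channels:
    # the nvidia channel publishes the newer `cuda-toolkit` packages (including CUDA 12+)
    cuda_req = "cuda-toolkit == {}".format(cuda_version_full)
else:
    # older CUDA (<12) can still be picked up as `cudatoolkit` from conda-forge
    cuda_req = "cudatoolkit == {}".format(cuda_version_full)

print(cuda_req)  # -> "cuda-toolkit == 12.2"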
@ -588,21 +651,30 @@ class CondaAPI(PackageManager):
if r.name and not r.name.startswith('_') and not requirements.get('conda', None):
r.name = r.name.replace('_', '-')
if has_cudatoolkit and r.specs and len(r.specs[0]) > 1 and r.name == 'cudatoolkit':
if has_cudatoolkit and r.specs and len(r.specs[0]) > 1 and r.name in ('cudatoolkit', 'cuda-toolkit'):
# select specific cuda version if it came from the requirements
r.specs = [(r.specs[0][0].replace('==', '='), r.specs[0][1].split('.post')[0])]
elif r.specs and r.specs[0] and len(r.specs[0]) > 1:
# remove .post from version numbers it fails with ~= version, and change == to ~=
r.specs = [(r.specs[0][0].replace('==', '~='), r.specs[0][1].split('.post')[0])]
r.specs = [(s[0].replace('==', '~='), s[1].split('.post')[0]) for s in r.specs]
while reqs:
# notice, we give conda more freedom in version selection, to help it choose best combination
def clean_ver(ar):
if not ar.specs:
return ar.tostr()
ar.specs = [(ar.specs[0][0], ar.specs[0][1] + '.0' if '.' not in ar.specs[0][1] else ar.specs[0][1])]
return ar.tostr()
conda_env['dependencies'] = [clean_ver(r) for r in reqs]
markers = None
if ar.marker:
# check if we really need it based on python version
ar = self._parse_package_marker_match_python_ver(marker_req=ar)
if not ar:
# empty lines should be skipped
return ""
# if we do make sure we note that we ignored markers
print("WARNING: ignoring marker in `{}`".format(ar.tostr()))
markers = False
if ar.specs:
ar.specs = [(s[0], s[1] + '.0' if '.' not in s[1] else s[1]) for s in ar.specs]
return ar.tostr(markers=markers)
conda_env['dependencies'] = [clean_ver(r) for r in reqs if clean_ver(r)]
with self.temp_file("conda_env", yaml.dump(conda_env), suffix=".yml") as name:
print('Conda: Trying to install requirements:\n{}'.format(conda_env['dependencies']))
if self.session.debug_mode:


@ -670,8 +670,7 @@ class PytorchRequirement(SimpleSubstitution):
return MarkerRequirement(Requirement.parse(self._fix_setuptools))
return None
@classmethod
def get_torch_index_url(cls, cuda_version, nightly=False):
def get_torch_index_url(self, cuda_version, nightly=False):
# noinspection PyBroadException
try:
cuda = int(cuda_version)
@ -681,39 +680,39 @@ class PytorchRequirement(SimpleSubstitution):
if nightly:
for c in range(cuda, max(-1, cuda-15), -1):
# then try the nightly builds, it might be there...
torch_url = cls.nightly_extra_index_url_template.format(c)
torch_url = self.nightly_extra_index_url_template.format(c)
# noinspection PyBroadException
try:
if requests.get(torch_url, timeout=10).ok:
print('Torch nightly CUDA {} index page found'.format(c))
cls.torch_index_url_lookup[c] = torch_url
return cls.torch_index_url_lookup[c], c
self.torch_index_url_lookup[c] = torch_url
return self.torch_index_url_lookup[c], c
except Exception:
pass
return
# first check if key is valid
if cuda in cls.torch_index_url_lookup:
return cls.torch_index_url_lookup[cuda], cuda
if cuda in self.torch_index_url_lookup:
return self.torch_index_url_lookup[cuda], cuda
# then try a new cuda version page
for c in range(cuda, max(-1, cuda-15), -1):
torch_url = cls.extra_index_url_template.format(c)
torch_url = self.extra_index_url_template.format(c)
# noinspection PyBroadException
try:
if requests.get(torch_url, timeout=10).ok:
print('Torch CUDA {} index page found, adding `{}`'.format(c, torch_url))
cls.torch_index_url_lookup[c] = torch_url
return cls.torch_index_url_lookup[c], c
self.torch_index_url_lookup[c] = torch_url
return self.torch_index_url_lookup[c], c
except Exception:
pass
keys = sorted(cls.torch_index_url_lookup.keys(), reverse=True)
keys = sorted(self.torch_index_url_lookup.keys(), reverse=True)
for k in keys:
if k <= cuda:
return cls.torch_index_url_lookup[k], k
return self.torch_index_url_lookup[k], k
# return default - zero
return cls.torch_index_url_lookup[0], 0
return self.torch_index_url_lookup[0], 0
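A hedged sketch of the final fallback in get_torch_index_url(): pick the closest known CUDA index page whose version does not exceed the requested one (the lookup contents and URLs below are illustrative).

torch_index_url_lookup = {
    118: "https://download.pytorch.org/whl/cu118",
    121: "https://download.pytorch.org/whl/cu121",
}
cuda = 120  # requested CUDA version with no exact match

keys = sorted(torch_index_url_lookup.keys(), reverse=True)
for k in keys:
    if k <= cuda:
        url, matched = torch_index_url_lookup[k], k
        break
else:
    url, matched = None, 0  # the real code falls back to its default (zero) entry here

print(matched, url)  # -> 118 https://download.pytorch.org/whl/cu118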
MAP = {
"windows": {


@ -1,19 +1,20 @@
from __future__ import unicode_literals, division
import logging
import os
import re
import shlex
from collections import deque
from itertools import starmap
from threading import Thread, Event
from time import time
from typing import Text, Sequence
from typing import Sequence, List, Union, Dict, Optional
import attr
import psutil
from pathlib2 import Path
from clearml_agent.definitions import ENV_WORKER_TAGS, ENV_GPU_FRACTIONS
from clearml_agent.session import Session
from clearml_agent.definitions import ENV_WORKER_TAGS
try:
from .gpu import gpustat
@ -54,6 +55,14 @@ class ResourceMonitor(object):
if value is not None
}
@attr.s
class ClusterReport:
cluster_key = attr.ib(type=str)
max_gpus = attr.ib(type=int, default=None)
max_workers = attr.ib(type=int, default=None)
max_cpus = attr.ib(type=int, default=None)
resource_groups = attr.ib(type=Sequence[str], factory=list)
def __init__(
self,
session, # type: Session
@ -61,7 +70,7 @@ class ResourceMonitor(object):
sample_frequency_per_sec=2.0,
report_frequency_sec=30.0,
first_report_sec=None,
worker_tags=None,
worker_tags=None
):
self.session = session
self.queue = deque(maxlen=1)
@ -79,7 +88,13 @@ class ResourceMonitor(object):
self._gpustat_fail = 0
self._gpustat = gpustat
self._active_gpus = None
self._default_gpu_utilization = session.config.get("agent.resource_monitoring.default_gpu_utilization", 100)
# allow default_gpu_utilization as null in the config, in which case we don't log anything
if self._default_gpu_utilization is not None:
self._default_gpu_utilization = int(self._default_gpu_utilization)
self._gpu_utilization_warning_sent = False
self._disk_use_path = str(session.config.get("agent.resource_monitoring.disk_use_path", None) or Path.home())
self._fractions_handler = GpuFractionsHandler() if session.feature_set != "basic" else None
if not worker_tags and ENV_WORKER_TAGS.get():
worker_tags = shlex.split(ENV_WORKER_TAGS.get())
self._worker_tags = worker_tags
@ -92,6 +107,7 @@ class ResourceMonitor(object):
else:
# None means no filtering, report all gpus
self._active_gpus = None
# noinspection PyBroadException
try:
active_gpus = Session.get_nvidia_visible_env()
# None means no filtering, report all gpus
@ -99,6 +115,10 @@ class ResourceMonitor(object):
self._active_gpus = [g.strip() for g in str(active_gpus).split(',')]
except Exception:
pass
self._cluster_report_interval_sec = int(session.config.get(
"agent.resource_monitoring.cluster_report_interval_sec", 60
))
self._cluster_report = None
def set_report(self, report):
# type: (ResourceMonitor.StatusReport) -> ()
@ -130,6 +150,7 @@ class ResourceMonitor(object):
)
log.debug("sending report: %s", report)
# noinspection PyBroadException
try:
self.session.get(service="workers", action="status_report", **report)
except Exception:
@ -137,7 +158,76 @@ class ResourceMonitor(object):
return False
return True
def send_cluster_report(self) -> bool:
if not self.session.feature_set == "basic":
return False
# noinspection PyBroadException
try:
properties = {
"max_cpus": self._cluster_report.max_cpus,
"max_gpus": self._cluster_report.max_gpus,
"max_workers": self._cluster_report.max_workers,
}
payload = {
"key": self._cluster_report.cluster_key,
"timestamp": int(time() * 1000),
"timeout": int(self._cluster_report_interval_sec * 2),
# "resource_groups": self._cluster_report.resource_groups, # yet to be supported
"properties": {k: v for k, v in properties.items() if v is not None},
}
self.session.post(service="workers", action="cluster_report", **payload)
except Exception as ex:
log.warning("Failed sending cluster report: %s", ex)
return False
return True
def setup_cluster_report(self, available_gpus, gpu_queues, worker_id=None, cluster_key=None, resource_groups=None):
# type: (List[int], Dict[str, int], Optional[str], Optional[str], Optional[List[str]]) -> ()
"""
Set up a cluster report for the enterprise server dashboard feature.
If a worker_id is provided, cluster_key and resource_groups are inferred from it.
"""
if self.session.feature_set == "basic":
return
if not worker_id and not cluster_key:
print("Error: cannot set up dashboard reporting - worker_id or cluster key are required")
return
# noinspection PyBroadException
try:
if not cluster_key:
worker_id_parts = worker_id.split(":")
if len(worker_id_parts) < 3:
cluster_key = self.session.config.get("agent.resource_dashboard.default_cluster_name", "onprem")
resource_group = ":".join((cluster_key, worker_id_parts[0]))
print(
'WARNING: your worker ID "{}" is not suitable for proper resource dashboard reporting, please '
'set up agent.worker_name to be at least two colon-separated parts (i.e. "<category>:<name>"). '
'Using "{}" as the resource dashboard category and "{}" as the resource group.'.format(
worker_id, cluster_key, resource_group
)
)
else:
cluster_key = worker_id_parts[0]
resource_group = ":".join((worker_id_parts[:2]))
resource_groups = [resource_group]
self._cluster_report = ResourceMonitor.ClusterReport(
cluster_key=cluster_key,
max_gpus=len(available_gpus),
max_workers=len(available_gpus) // min(x for x, _ in gpu_queues.values()),
resource_groups=resource_groups
)
self.send_cluster_report()
except Exception as ex:
print("Error: failed setting cluster report: {}".format(ex))
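A hedged illustration of the worker_id parsing performed by setup_cluster_report() above (the worker id and default cluster name are made-up examples).

worker_id = "onprem-cluster:gpu-node-7:0"

parts = worker_id.split(":")
if len(parts) >= 3:
    cluster_key = parts[0]                   # -> "onprem-cluster"
    resource_group = ":".join(parts[:2])     # -> "onprem-cluster:gpu-node-7"
else:
    # falls back to agent.resource_dashboard.default_cluster_name ("onprem" by default)
    cluster_key = "onprem"
    resource_group = ":".join((cluster_key, parts[0]))

print(cluster_key, resource_group)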
def _daemon(self):
last_cluster_report = 0
seconds_since_started = 0
reported = 0
try:
@ -154,7 +244,7 @@ class ResourceMonitor(object):
try:
self._update_readouts()
except Exception as ex:
log.warning("failed getting machine stats: %s", report_error(ex))
log.error("failed getting machine stats: %s", report_error(ex))
self._failure()
seconds_since_started += int(round(time() - last_report))
@ -177,6 +267,15 @@ class ResourceMonitor(object):
# count reported iterations
reported += 1
if (
self._cluster_report and
self._cluster_report_interval_sec
and time() - last_cluster_report > self._cluster_report_interval_sec
):
if self.send_cluster_report():
last_cluster_report = time()
except Exception as ex:
log.exception("Error reporting monitoring info: %s", str(ex))
@ -265,25 +364,47 @@ class ResourceMonitor(object):
if self._active_gpus is not False and self._gpustat:
try:
gpu_stat = self._gpustat.new_query()
report_index = 0
for i, g in enumerate(gpu_stat.gpus):
# only monitor the active gpu's, if none were selected, monitor everything
if self._active_gpus:
uuid = getattr(g, "uuid", None)
if str(i) not in self._active_gpus and (not uuid or uuid not in self._active_gpus):
mig_uuid = getattr(g, "mig_uuid", None)
if (
str(g.index) not in self._active_gpus
and (not uuid or uuid not in self._active_gpus)
and (not mig_uuid or mig_uuid not in self._active_gpus)
):
continue
stats["gpu_temperature_{:d}".format(i)] = g["temperature.gpu"]
stats["gpu_utilization_{:d}".format(i)] = g["utilization.gpu"]
stats["gpu_mem_usage_{:d}".format(i)] = (
stats["gpu_temperature_{}".format(report_index)] = g["temperature.gpu"]
if g["utilization.gpu"] is not None:
stats["gpu_utilization_{}".format(report_index)] = g["utilization.gpu"]
elif self._default_gpu_utilization is not None:
stats["gpu_utilization_{}".format(report_index)] = self._default_gpu_utilization
if getattr(g, "mig_index", None) is None and not self._gpu_utilization_warning_sent:
# this shouldn't happen for non-MIGs, warn the user about it
log.error("Failed fetching GPU utilization")
self._gpu_utilization_warning_sent = True
stats["gpu_mem_usage_{}".format(report_index)] = (
100.0 * g["memory.used"] / g["memory.total"]
)
# already in MBs
stats["gpu_mem_free_{:d}".format(i)] = (
stats["gpu_mem_free_{}".format(report_index)] = (
g["memory.total"] - g["memory.used"]
)
stats["gpu_mem_used_%d" % i] = g["memory.used"]
stats["gpu_mem_used_{}".format(report_index)] = g["memory.used"] or 0
if self._fractions_handler:
fractions = self._fractions_handler.fractions
stats["gpu_fraction_{}".format(report_index)] = \
(fractions[i] if i < len(fractions) else fractions[-1]) if fractions else 1.0
except Exception as ex:
# something happened and we can't use gpu stats,
log.warning("failed getting machine stats: %s", report_error(ex))
log.error("failed getting machine stats: %s", report_error(ex))
self._failure()
return stats
@ -296,19 +417,137 @@ class ResourceMonitor(object):
)
self._gpustat = None
BACKEND_STAT_MAP = {"cpu_usage_*": "cpu_usage",
"cpu_temperature_*": "cpu_temperature",
"disk_free_percent": "disk_free_home",
"io_read_mbs": "disk_read",
"io_write_mbs": "disk_write",
"network_tx_mbs": "network_tx",
"network_rx_mbs": "network_rx",
"memory_free": "memory_free",
"memory_used": "memory_used",
"gpu_temperature_*": "gpu_temperature",
"gpu_mem_used_*": "gpu_memory_used",
"gpu_mem_free_*": "gpu_memory_free",
"gpu_utilization_*": "gpu_usage"}
BACKEND_STAT_MAP = {
"cpu_usage_*": "cpu_usage",
"cpu_temperature_*": "cpu_temperature",
"disk_free_percent": "disk_free_home",
"io_read_mbs": "disk_read",
"io_write_mbs": "disk_write",
"network_tx_mbs": "network_tx",
"network_rx_mbs": "network_rx",
"memory_free": "memory_free",
"memory_used": "memory_used",
"gpu_temperature_*": "gpu_temperature",
"gpu_mem_used_*": "gpu_memory_used",
"gpu_mem_free_*": "gpu_memory_free",
"gpu_utilization_*": "gpu_usage",
"gpu_fraction_*": "gpu_fraction"
}
class GpuFractionsHandler:
_number_re = re.compile(r"^clear\.ml/fraction(-\d+)?$")
_mig_re = re.compile(r"^nvidia\.com/mig-(?P<compute>[0-9]+)g\.(?P<memory>[0-9]+)gb$")
_gpu_name_to_memory_gb = {
"A30": 24,
"NVIDIA A30": 24,
"A100-SXM4-40GB": 40,
"NVIDIA-A100-40GB-PCIe": 40,
"NVIDIA A100-40GB-PCIe": 40,
"NVIDIA-A100-SXM4-40GB": 40,
"NVIDIA A100-SXM4-40GB": 40,
"NVIDIA-A100-SXM4-80GB": 79,
"NVIDIA A100-SXM4-80GB": 79,
"NVIDIA-A100-80GB-PCIe": 79,
"NVIDIA A100-80GB-PCIe": 79,
}
def __init__(self):
self._total_memory_gb = [
self._gpu_name_to_memory_gb.get(name, 0)
for name in (self._get_gpu_names() or [])
]
self._fractions = self._get_fractions()
@property
def fractions(self) -> List[float]:
return self._fractions
def _get_fractions(self) -> List[float]:
if not self._total_memory_gb:
# Can't compute
return [1.0]
fractions = (ENV_GPU_FRACTIONS.get() or "").strip()
if not fractions:
# No fractions
return [1.0]
decoded_fractions = self.decode_fractions(fractions)
if isinstance(decoded_fractions, list):
return decoded_fractions
totals = []
for i, (fraction, count) in enumerate(decoded_fractions.items()):
m = self._mig_re.match(fraction)
if not m:
continue
try:
total_gb = self._total_memory_gb[i] if i < len(self._total_memory_gb) else self._total_memory_gb[-1]
if not total_gb:
continue
totals.append((int(m.group("memory")) * count) / total_gb)
except ValueError:
pass
if not totals:
log.warning("Fractions count is empty for {}".format(fractions))
return [1.0]
return totals
@classmethod
def extract_custom_limits(cls, limits: dict):
for k, v in list(limits.items() or []):
if cls._number_re.match(k):
limits.pop(k, None)
@classmethod
def get_simple_fractions_total(cls, limits: dict) -> float:
try:
if any(cls._number_re.match(x) for x in limits):
return sum(float(v) for k, v in limits.items() if cls._number_re.match(k))
except Exception as ex:
log.error("Failed summing up fractions from {}: {}".format(limits, ex))
return 0
@classmethod
def encode_fractions(cls, limits: dict) -> str:
if any(cls._number_re.match(x) for x in (limits or {})):
return ",".join(str(v) for k, v in sorted(limits.items()) if cls._number_re.match(k))
return ",".join(("{}:{}".format(k, v) for k, v in (limits or {}).items() if cls._mig_re.match(k)))
@staticmethod
def decode_fractions(fractions: str) -> Union[List[float], Dict[str, int]]:
try:
items = [f.strip() for f in fractions.strip().split(",")]
tuples = [(k.strip(), v.strip()) for k, v in (f.partition(":")[::2] for f in items)]
if all(not v for _, v in tuples):
# comma-separated float fractions
return [float(k) for k, _ in tuples]
# comma-separated slice:count items
return {
k.strip(): int(v.strip())
for k, v in tuples
}
except Exception as ex:
log.error("Failed decoding GPU fractions '{}': {}".format(fractions, ex))
return {}
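A hedged usage sketch for the two encodings decode_fractions() accepts; the import path is assumed from this repo layout and the fraction strings are examples.

from clearml_agent.helper.resource_monitor import GpuFractionsHandler  # path assumed

# comma-separated float fractions -> list of floats
print(GpuFractionsHandler.decode_fractions("0.5, 0.25"))
# -> [0.5, 0.25]

# comma-separated slice:count items -> dict of {slice: count}
print(GpuFractionsHandler.decode_fractions("nvidia.com/mig-1g.10gb:2,nvidia.com/mig-2g.20gb:1"))
# -> {'nvidia.com/mig-1g.10gb': 2, 'nvidia.com/mig-2g.20gb': 1}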
@staticmethod
def _get_gpu_names():
# noinspection PyBroadException
try:
gpus = gpustat.new_query().gpus
names = [g["name"] for g in gpus]
print("GPU names: {}".format(names))
return names
except Exception as ex:
log.error("Failed getting GPU names: {}".format(ex))
def report_error(ex):


@ -361,7 +361,7 @@ sdk {
vcs_repo_detect_async: true
# Store uncommitted git/hg source code diff in experiment manifest when training in development mode
# This stores "git diff" or "hg diff" into the experiment's "script.requirements.diff" section
# This stores the "git diff" into the experiment's "script.requirements.diff" section
store_uncommitted_code_diff: true
# Support stopping an experiment in case it was externally stopped, status was changed or task was reset


@ -108,10 +108,17 @@ agent {
# pytorch_resolve: "pip"
# additional conda channels to use when installing with conda package manager
conda_channels: ["pytorch", "conda-forge", "defaults", ]
conda_channels: ["pytorch", "conda-forge", "nvidia", "defaults", ]
# conda_full_env_update: false
# notice this will not install any additional packages into the selected environment, should be used in
# conjunction with CLEARML_CONDA_ENV_PACKAGE which points to an existing conda environment directory
# conda_env_as_base_docker: false
# install into base conda environment
# (should only be used if running in docker mode, because it will change the conda base environment)
# use_conda_base_env: false
# set the priority packages to be installed before the rest of the required packages
# Note: this only controls the installation order of existing requirement packages (and does not add additional packages)
# priority_packages: ["cython", "numpy", "setuptools", ]
@ -294,6 +301,7 @@ agent {
# sdk_cache: "/clearml_agent_cache"
# apt_cache: "/var/cache/apt/archives"
# ssh_folder: "/root/.ssh"
# ssh_ro_folder: "/.ssh"
# pip_cache: "/root/.cache/pip"
# poetry_cache: "/root/.cache/pypoetry"
# vcs_cache: "/root/.clearml/vcs-cache"
@ -451,7 +459,7 @@ sdk {
vcs_repo_detect_async: True
# Store uncommitted git/hg source code diff in experiment manifest when training in development mode
# This stores "git diff" or "hg diff" into the experiment's "script.requirements.diff" section
# This stores the "git diff" into the experiment's "script.requirements.diff" section
store_uncommitted_code_diff_on_train: True
# Support stopping an experiment in case it was externally stopped, status was changed or task was reset


@ -1,15 +1,15 @@
attrs>=18.0,<23.0.0
attrs>=18.0,<24.0.0
enum34>=0.9,<1.2.0 ; python_version < '3.6'
furl>=2.0.0,<2.2.0
jsonschema>=2.6.0,<5.0.0
pathlib2>=2.3.0,<2.4.0
psutil>=3.4.2,<5.10.0
pyparsing>=2.0.3,<3.1.0
pyparsing>=2.0.3,<3.2.0
python-dateutil>=2.4.2,<2.9.0
pyjwt>=2.4.0,<2.7.0
pyjwt>=2.4.0,<2.9.0
PyYAML>=3.12,<6.1
requests>=2.20.0,<=2.31.0
six>=1.13.0,<1.17.0
typing>=3.6.4,<3.8.0 ; python_version < '3.5'
urllib3>=1.21.1,<1.27.0
urllib3>=1.21.1,<2
virtualenv>=16,<21