Add CLEARML_MULTI_NODE_SINGLE_TASK (values -1, 0, 1, 2) for easier multi-node single Task workloads

This commit is contained in:
allegroai 2024-07-04 15:27:10 +03:00
parent 7dc601598b
commit 9594e5dddd
2 changed files with 41 additions and 2 deletions

View File

@ -28,6 +28,9 @@ SUPPRESS_UPDATE_MESSAGE_ENV_VAR = EnvEntry("CLEARML_SUPPRESS_UPDATE_MESSAGE", "T
MAX_SERIES_PER_METRIC = EnvEntry("CLEARML_MAX_SERIES_PER_METRIC", default=100, type=int)
# values are 0/None (task per node), 1/2 (multi-node reporting, colored console), -1 (only report rank 0 node)
ENV_MULTI_NODE_SINGLE_TASK = EnvEntry("CLEARML_MULTI_NODE_SINGLE_TASK", type=int, default=None)
JUPYTER_PASSWORD = EnvEntry("CLEARML_JUPYTER_PASSWORD")
# Repository detection

View File

@ -3,6 +3,7 @@ import os
import platform
import sys
import warnings
from math import ceil, log10
from time import time
import psutil
@ -12,7 +13,7 @@ from typing import Text
from .process.mp import BackgroundMonitor
from ..backend_api import Session
from ..binding.frameworks.tensorflow_bind import IsTensorboardInit
from ..config import config, ENV_MULTI_NODE_SINGLE_TASK
try:
    from .gpu import gpustat
@ -103,6 +104,31 @@ class ResourceMonitor(BackgroundMonitor):
if self._is_thread_mode_and_not_main_process():
    return
multi_node_single_task_reporting = False
report_node_as_series = False
rank = 0
world_size_digits = 0
# check if we are in multi-node reporting to the same Task
if ENV_MULTI_NODE_SINGLE_TASK.get():
# if resource monitoring is disabled, do nothing
if ENV_MULTI_NODE_SINGLE_TASK.get() < 0:
return
# we are reporting machines stats on a different machine over the same Task
multi_node_single_task_reporting = True
if ENV_MULTI_NODE_SINGLE_TASK.get() == 1:
# report per machine graph (unique title)
report_node_as_series = False
elif ENV_MULTI_NODE_SINGLE_TASK.get() == 2:
# report per machine series (i.e. merge title+series resource and have "node X" as different series)
report_node_as_series = True
# noinspection PyBroadException
try:
rank = int(os.environ.get("RANK") or 0)
world_size_digits = ceil(log10(int(os.environ.get("WORLD_SIZE") or 0)))
except Exception:
pass
seconds_since_started = 0
reported = 0
last_iteration = 0
@ -196,9 +222,19 @@ class ResourceMonitor(BackgroundMonitor):
# noinspection PyBroadException
try:
    title = self._title_gpu if k.startswith('gpu_') else self._title_machine
series = k
# 3 points after the dot
if multi_node_single_task_reporting:
if report_node_as_series:
title = "{}:{}".format(":".join(title.split(":")[:-1]), series)
series = "rank {:0{world_size_digits}d}".format(
rank, world_size_digits=world_size_digits)
else:
title = "{}:rank{:0{world_size_digits}d}".format(
title, rank, world_size_digits=world_size_digits)
value = round(v * 1000) / 1000.
self._task.get_logger().report_scalar(title=title, series=series, iteration=iteration, value=value)
except Exception:
    pass
# clear readouts if this update is not averaged