Add CLEARML_MULTI_NODE_SINGLE_TASK (values -1, 0, 1, 2) for easier multi-node single Task workloads

This commit is contained in:
allegroai 2024-07-24 17:42:25 +03:00
parent 93df021108
commit ab9b9db0c9
2 changed files with 16 additions and 1 deletions

View File

@ -22,6 +22,9 @@ ENV_INITIAL_CONNECT_RETRY_OVERRIDE = EnvEntry(
'CLEARML_AGENT_INITIAL_CONNECT_RETRY_OVERRIDE', default=True, converter=safe_text_to_bool
)
ENV_FORCE_MAX_API_VERSION = EnvEntry("CLEARML_AGENT_FORCE_MAX_API_VERSION", type=str)
# Multi-node single-Task mode, read from CLEARML_MULTI_NODE_SINGLE_TASK (int, unset by default).
# Accepted values:
#   0 / None - task per node
#   1 / 2    - multi-node reporting, colored console
#   -1       - only report rank 0 node
ENV_MULTI_NODE_SINGLE_TASK = EnvEntry("CLEARML_MULTI_NODE_SINGLE_TASK", type=int, default=None)
"""
Experimental option to set the request method for all API requests and auth login.

View File

@ -38,7 +38,7 @@ from clearml_agent.backend_api.services import workers as workers_api
from clearml_agent.backend_api.session import CallResult, Request
from clearml_agent.backend_api.session.defs import (
ENV_ENABLE_ENV_CONFIG_SECTION, ENV_ENABLE_FILES_CONFIG_SECTION,
ENV_VENV_CONFIGURED, ENV_PROPAGATE_EXITCODE, )
ENV_VENV_CONFIGURED, ENV_PROPAGATE_EXITCODE, ENV_MULTI_NODE_SINGLE_TASK, )
from clearml_agent.backend_config import Config
from clearml_agent.backend_config.defs import UptimeConf
from clearml_agent.backend_config.utils import apply_environment, apply_files
@ -2063,6 +2063,18 @@ class Worker(ServiceCommandSection):
lines_buffer = defaultdict(list)
def report_lines(lines, source):
# support colored multi-node reporting on the same Task for easier debugging
if lines and ENV_MULTI_NODE_SINGLE_TASK.get() and ENV_MULTI_NODE_SINGLE_TASK.get() > 0:
# noinspection PyBroadException
try:
rank = int(os.environ.get("RANK") or 0)
except Exception:
rank = 0
if rank:
# see ANSI color: https://en.wikipedia.org/wiki/ANSI_escape_code#8-bit
# Only the "RANK x:" line is colored, to preserve the original color reporting
lines = ["\033[38;5;{}mRANK {}:\033[0m\n".format(20+(rank % 210), rank)] + lines
if not self._truncate_task_output_files:
# non-buffered
return self.send_logs(task_id, lines, session=session)