From 01736190c9d1157e2cc6111ad61ea9b6e08bcab8 Mon Sep 17 00:00:00 2001 From: clearml <> Date: Sun, 26 Jan 2025 22:45:50 +0200 Subject: [PATCH] Add support for IP overriding with CLEARML_AGENT_HOST_IP env var. Add port mapping support (requires clearml-agent v2+) with runtime port mapping value _external_host_tcp_port_mapping --- clearml/config/defs.py | 1 + clearml/task.py | 59 +++++++++++++++++++++++++++++++++++------- 2 files changed, 50 insertions(+), 10 deletions(-) diff --git a/clearml/config/defs.py b/clearml/config/defs.py index 926df011..970ba7ae 100644 --- a/clearml/config/defs.py +++ b/clearml/config/defs.py @@ -21,6 +21,7 @@ CLEARML_CACHE_DIR = EnvEntry("CLEARML_CACHE_DIR", "TRAINS_CACHE_DIR") DEBUG_SIMULATE_REMOTE_TASK = EnvEntry("CLEARML_SIMULATE_REMOTE_TASK", type=bool) DEV_DEFAULT_OUTPUT_URI = EnvEntry("CLEARML_DEFAULT_OUTPUT_URI", type=str) TASK_SET_ITERATION_OFFSET = EnvEntry("CLEARML_SET_ITERATION_OFFSET", type=int) +HOST_MACHINE_IP = EnvEntry("CLEARML_AGENT_HOST_IP", type=str) LOG_LEVEL_ENV_VAR = EnvEntry("CLEARML_LOG_LEVEL", "TRAINS_LOG_LEVEL", converter=or_(int, str)) diff --git a/clearml/task.py b/clearml/task.py index 709b5886..f0f7211f 100644 --- a/clearml/task.py +++ b/clearml/task.py @@ -79,7 +79,7 @@ from .binding.gradio_bind import PatchGradio from .binding.frameworks import WeightsFileHandler from .config import ( config, DEV_TASK_NO_REUSE, get_is_master_node, DEBUG_SIMULATE_REMOTE_TASK, DEV_DEFAULT_OUTPUT_URI, - deferred_config, TASK_SET_ITERATION_OFFSET) + deferred_config, TASK_SET_ITERATION_OFFSET, HOST_MACHINE_IP) from .config import running_remotely, get_remote_task_id from .config.cache import SessionCache from .debugging.log import LoggerRoot @@ -190,6 +190,7 @@ class Task(_Task): _external_endpoint_address_map = {"http": "_ADDRESS", "tcp": "external_address"} _external_endpoint_service_map = {"http": "EXTERNAL", "tcp": "EXTERNAL_TCP"} _external_endpoint_internal_port_map = {"http": "_PORT", "tcp": "upstream_task_port"} + _external_endpoint_host_tcp_port_mapping = {"tcp_host_mapping": "_external_host_tcp_port_mapping"} class _ConnectedParametersType(object): argparse = "argument_parser" @@ -926,29 +927,67 @@ class Task(_Task): if internal_port: self._external_endpoint_ports[protocol] = internal_port + # notice this applies for both raw tcp and http, it is so that we can + # detect the host machine exposed ports, and register them on the router + external_host_port_mapping = self._get_runtime_properties().get( + self._external_endpoint_host_tcp_port_mapping["tcp_host_mapping"]) + self._external_endpoint_ports["tcp_host_mapping"] = external_host_port_mapping + + # check if we need to parse the port mapping, only if running on "bare-metal" host machine. + if self._external_endpoint_ports.get("tcp_host_mapping"): + external_host_port_mapping = self._external_endpoint_ports.get("tcp_host_mapping") + # format is docker standard port mapping format: + # example: "out:in,out_range100-out_range102:in_range0-in_range2" + # notice `out` in this context means the host port, the one that + # the router will route external traffic to + # noinspection PyBroadException + out_port = None + # noinspection PyBroadException + try: + for port_range in external_host_port_mapping.split(","): + out_range, in_range = port_range.split(":", 1) + out_range = out_range.split("-") + in_range = in_range.split("-") + if int(in_range[0]) <= port <= int(in_range[-1]): + # we found a match: + out_port = int(out_range[0]) + (port-int(in_range[0])) + print("INFO: Task.request_external_endpoint(...) changed requested external port to {}, " + "conforming to mapped external host ports [{} -> {}]".format(out_port, port, port_range)) + break + + if not out_port: + raise ValueError("match not found defaulting to original port") + except Exception: + print("WARNING: Task.request_external_endpoint(...) failed matching requested port to " + "mapped external host port [{} to {}], " + "proceeding with original port {}".format(port, external_host_port_mapping, port)) + + # change the requested port to the one we have on the machine + if out_port: + port = out_port + # check if we are trying to change the port - currently not allowed if self._external_endpoint_ports.get(protocol): if self._external_endpoint_ports.get(protocol) == port: - # we already set this endpoint, so do nothing - return - - raise ValueError( - "Only one endpoint per protocol can be requested at the moment. Port already exposed is: {}".format( - self._external_endpoint_ports.get(protocol) + # we already set this endpoint, but we will set the values again, because maybe IP changed?! + pass + else: + raise ValueError( + "Only one endpoint per protocol can be requested at the moment. " + "Port already exposed is: {}".format(self._external_endpoint_ports.get(protocol)) ) - ) # mark for the router our request # noinspection PyProtectedMember self._set_runtime_properties( { "_SERVICE": self._external_endpoint_service_map[protocol], - self._external_endpoint_address_map[protocol]: get_private_ip(), + self._external_endpoint_address_map[protocol]: HOST_MACHINE_IP.get() or get_private_ip(), self._external_endpoint_port_map[protocol]: port, } ) # required system_tag for the router to catch the routing request - self.set_system_tags((self.get_system_tags() or []) + ["external_service"]) + self.set_system_tags(list(set((self.get_system_tags() or []) + ["external_service"]))) self._external_endpoint_ports[protocol] = port if wait: return self.wait_for_external_endpoint(