mirror of
https://github.com/clearml/clearml
synced 2025-02-07 05:18:50 +00:00
Add support for IP overriding with CLEARML_AGENT_HOST_IP env var.
Add port mapping support (requires clearml-agent v2+) with runtime port mapping value _external_host_tcp_port_mapping
This commit is contained in:
parent
7899630d66
commit
01736190c9
@ -21,6 +21,7 @@ CLEARML_CACHE_DIR = EnvEntry("CLEARML_CACHE_DIR", "TRAINS_CACHE_DIR")
|
|||||||
DEBUG_SIMULATE_REMOTE_TASK = EnvEntry("CLEARML_SIMULATE_REMOTE_TASK", type=bool)
|
DEBUG_SIMULATE_REMOTE_TASK = EnvEntry("CLEARML_SIMULATE_REMOTE_TASK", type=bool)
|
||||||
DEV_DEFAULT_OUTPUT_URI = EnvEntry("CLEARML_DEFAULT_OUTPUT_URI", type=str)
|
DEV_DEFAULT_OUTPUT_URI = EnvEntry("CLEARML_DEFAULT_OUTPUT_URI", type=str)
|
||||||
TASK_SET_ITERATION_OFFSET = EnvEntry("CLEARML_SET_ITERATION_OFFSET", type=int)
|
TASK_SET_ITERATION_OFFSET = EnvEntry("CLEARML_SET_ITERATION_OFFSET", type=int)
|
||||||
|
HOST_MACHINE_IP = EnvEntry("CLEARML_AGENT_HOST_IP", type=str)
|
||||||
|
|
||||||
LOG_LEVEL_ENV_VAR = EnvEntry("CLEARML_LOG_LEVEL", "TRAINS_LOG_LEVEL", converter=or_(int, str))
|
LOG_LEVEL_ENV_VAR = EnvEntry("CLEARML_LOG_LEVEL", "TRAINS_LOG_LEVEL", converter=or_(int, str))
|
||||||
|
|
||||||
|
@ -79,7 +79,7 @@ from .binding.gradio_bind import PatchGradio
|
|||||||
from .binding.frameworks import WeightsFileHandler
|
from .binding.frameworks import WeightsFileHandler
|
||||||
from .config import (
|
from .config import (
|
||||||
config, DEV_TASK_NO_REUSE, get_is_master_node, DEBUG_SIMULATE_REMOTE_TASK, DEV_DEFAULT_OUTPUT_URI,
|
config, DEV_TASK_NO_REUSE, get_is_master_node, DEBUG_SIMULATE_REMOTE_TASK, DEV_DEFAULT_OUTPUT_URI,
|
||||||
deferred_config, TASK_SET_ITERATION_OFFSET)
|
deferred_config, TASK_SET_ITERATION_OFFSET, HOST_MACHINE_IP)
|
||||||
from .config import running_remotely, get_remote_task_id
|
from .config import running_remotely, get_remote_task_id
|
||||||
from .config.cache import SessionCache
|
from .config.cache import SessionCache
|
||||||
from .debugging.log import LoggerRoot
|
from .debugging.log import LoggerRoot
|
||||||
@ -190,6 +190,7 @@ class Task(_Task):
|
|||||||
_external_endpoint_address_map = {"http": "_ADDRESS", "tcp": "external_address"}
|
_external_endpoint_address_map = {"http": "_ADDRESS", "tcp": "external_address"}
|
||||||
_external_endpoint_service_map = {"http": "EXTERNAL", "tcp": "EXTERNAL_TCP"}
|
_external_endpoint_service_map = {"http": "EXTERNAL", "tcp": "EXTERNAL_TCP"}
|
||||||
_external_endpoint_internal_port_map = {"http": "_PORT", "tcp": "upstream_task_port"}
|
_external_endpoint_internal_port_map = {"http": "_PORT", "tcp": "upstream_task_port"}
|
||||||
|
_external_endpoint_host_tcp_port_mapping = {"tcp_host_mapping": "_external_host_tcp_port_mapping"}
|
||||||
|
|
||||||
class _ConnectedParametersType(object):
|
class _ConnectedParametersType(object):
|
||||||
argparse = "argument_parser"
|
argparse = "argument_parser"
|
||||||
@ -926,29 +927,67 @@ class Task(_Task):
|
|||||||
if internal_port:
|
if internal_port:
|
||||||
self._external_endpoint_ports[protocol] = internal_port
|
self._external_endpoint_ports[protocol] = internal_port
|
||||||
|
|
||||||
|
# notice this applies for both raw tcp and http, it is so that we can
|
||||||
|
# detect the host machine exposed ports, and register them on the router
|
||||||
|
external_host_port_mapping = self._get_runtime_properties().get(
|
||||||
|
self._external_endpoint_host_tcp_port_mapping["tcp_host_mapping"])
|
||||||
|
self._external_endpoint_ports["tcp_host_mapping"] = external_host_port_mapping
|
||||||
|
|
||||||
|
# check if we need to parse the port mapping, only if running on "bare-metal" host machine.
|
||||||
|
if self._external_endpoint_ports.get("tcp_host_mapping"):
|
||||||
|
external_host_port_mapping = self._external_endpoint_ports.get("tcp_host_mapping")
|
||||||
|
# format is docker standard port mapping format:
|
||||||
|
# example: "out:in,out_range100-out_range102:in_range0-in_range2"
|
||||||
|
# notice `out` in this context means the host port, the one that
|
||||||
|
# the router will route external traffic to
|
||||||
|
# noinspection PyBroadException
|
||||||
|
out_port = None
|
||||||
|
# noinspection PyBroadException
|
||||||
|
try:
|
||||||
|
for port_range in external_host_port_mapping.split(","):
|
||||||
|
out_range, in_range = port_range.split(":", 1)
|
||||||
|
out_range = out_range.split("-")
|
||||||
|
in_range = in_range.split("-")
|
||||||
|
if int(in_range[0]) <= port <= int(in_range[-1]):
|
||||||
|
# we found a match:
|
||||||
|
out_port = int(out_range[0]) + (port-int(in_range[0]))
|
||||||
|
print("INFO: Task.request_external_endpoint(...) changed requested external port to {}, "
|
||||||
|
"conforming to mapped external host ports [{} -> {}]".format(out_port, port, port_range))
|
||||||
|
break
|
||||||
|
|
||||||
|
if not out_port:
|
||||||
|
raise ValueError("match not found defaulting to original port")
|
||||||
|
except Exception:
|
||||||
|
print("WARNING: Task.request_external_endpoint(...) failed matching requested port to "
|
||||||
|
"mapped external host port [{} to {}], "
|
||||||
|
"proceeding with original port {}".format(port, external_host_port_mapping, port))
|
||||||
|
|
||||||
|
# change the requested port to the one we have on the machine
|
||||||
|
if out_port:
|
||||||
|
port = out_port
|
||||||
|
|
||||||
# check if we are trying to change the port - currently not allowed
|
# check if we are trying to change the port - currently not allowed
|
||||||
if self._external_endpoint_ports.get(protocol):
|
if self._external_endpoint_ports.get(protocol):
|
||||||
if self._external_endpoint_ports.get(protocol) == port:
|
if self._external_endpoint_ports.get(protocol) == port:
|
||||||
# we already set this endpoint, so do nothing
|
# we already set this endpoint, but we will set the values again, because maybe IP changed?!
|
||||||
return
|
pass
|
||||||
|
else:
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
"Only one endpoint per protocol can be requested at the moment. Port already exposed is: {}".format(
|
"Only one endpoint per protocol can be requested at the moment. "
|
||||||
self._external_endpoint_ports.get(protocol)
|
"Port already exposed is: {}".format(self._external_endpoint_ports.get(protocol))
|
||||||
)
|
)
|
||||||
)
|
|
||||||
|
|
||||||
# mark for the router our request
|
# mark for the router our request
|
||||||
# noinspection PyProtectedMember
|
# noinspection PyProtectedMember
|
||||||
self._set_runtime_properties(
|
self._set_runtime_properties(
|
||||||
{
|
{
|
||||||
"_SERVICE": self._external_endpoint_service_map[protocol],
|
"_SERVICE": self._external_endpoint_service_map[protocol],
|
||||||
self._external_endpoint_address_map[protocol]: get_private_ip(),
|
self._external_endpoint_address_map[protocol]: HOST_MACHINE_IP.get() or get_private_ip(),
|
||||||
self._external_endpoint_port_map[protocol]: port,
|
self._external_endpoint_port_map[protocol]: port,
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
# required system_tag for the router to catch the routing request
|
# required system_tag for the router to catch the routing request
|
||||||
self.set_system_tags((self.get_system_tags() or []) + ["external_service"])
|
self.set_system_tags(list(set((self.get_system_tags() or []) + ["external_service"])))
|
||||||
self._external_endpoint_ports[protocol] = port
|
self._external_endpoint_ports[protocol] = port
|
||||||
if wait:
|
if wait:
|
||||||
return self.wait_for_external_endpoint(
|
return self.wait_for_external_endpoint(
|
||||||
|
Loading…
Reference in New Issue
Block a user