Add docker port mapping parsing and reassigning feature support

Add initial component import from clearml-sdk for easier integration
This commit is contained in:
clearml 2025-02-24 13:24:35 +02:00
parent 8f28d2882a
commit 97cb47d48e
7 changed files with 293 additions and 6 deletions

View File

@ -181,6 +181,7 @@ ENV_DOCKER_HOST_MOUNT = EnvironmentConfig(
ENV_VENV_CACHE_PATH = EnvironmentConfig("CLEARML_AGENT_VENV_CACHE_PATH")
ENV_EXTRA_DOCKER_ARGS = EnvironmentConfig("CLEARML_AGENT_EXTRA_DOCKER_ARGS", type=list)
ENV_EXTRA_DOCKER_LABELS = EnvironmentConfig("CLEARML_AGENT_EXTRA_DOCKER_LABELS", type=list)
ENV_FORCE_HOST_MACHINE_IP = EnvironmentConfig("CLEARML_AGENT_FORCE_HOST_MACHINE_IP")
ENV_DEBUG_INFO = EnvironmentConfig("CLEARML_AGENT_DEBUG_INFO")
ENV_CHILD_AGENTS_COUNT_CMD = EnvironmentConfig("CLEARML_AGENT_CHILD_AGENTS_COUNT_CMD")
ENV_DOCKER_ARGS_FILTERS = EnvironmentConfig("CLEARML_AGENT_DOCKER_ARGS_FILTERS")

View File

@ -1,6 +1,6 @@
import re
import shlex
from typing import Tuple, List, TYPE_CHECKING
from typing import Tuple, List, TYPE_CHECKING, Optional
from urllib.parse import urlunparse, urlparse
from clearml_agent.definitions import (
@ -11,7 +11,10 @@ from clearml_agent.definitions import (
ENV_AGENT_AUTH_TOKEN,
ENV_DOCKER_IMAGE,
ENV_DOCKER_ARGS_HIDE_ENV,
ENV_FORCE_HOST_MACHINE_IP,
)
from clearml_agent.helper.sdk_client.utilities.networking import get_private_ip
from clearml_agent.helper.os.networking import TcpPorts
if TYPE_CHECKING:
from clearml_agent.session import Session
@ -42,6 +45,8 @@ def sanitize_urls(s: str) -> Tuple[str, bool]:
class DockerArgsSanitizer:
_machine_ip = None
@classmethod
def sanitize_docker_command(cls, session, docker_command):
# type: (Session, List[str]) -> List[str]
@ -108,14 +113,22 @@ class DockerArgsSanitizer:
return args
@staticmethod
def filter_switches(docker_args: List[str], exclude_switches: List[str]) -> List[str]:
def filter_switches(
docker_args: List[str],
exclude_switches: List[str] = None,
include_switches: List[str] = None
) -> List[str]:
assert not (include_switches and exclude_switches), "Either include_switches or exclude_switches but not both"
# shortcut if we are sure we have no matches
if (not exclude_switches or
not any("-{}".format(s) in " ".join(docker_args) for s in exclude_switches)):
if not include_switches and (
not exclude_switches or not any("-{}".format(s) in " ".join(docker_args) for s in exclude_switches)):
return docker_args
args = []
in_switch_args = True
in_switch_args = True if not include_switches else False
for token in docker_args:
if token.strip().startswith("-"):
if "=" in token:
@ -125,7 +138,10 @@ class DockerArgsSanitizer:
switch = token
in_switch_args = True
if switch.lstrip("-") in exclude_switches:
if not include_switches and switch.lstrip("-") in exclude_switches:
# if in excluded, skip the switch and following arguments
in_switch_args = False
elif not exclude_switches and switch.lstrip("-") not in include_switches:
# if in excluded, skip the switch and following arguments
in_switch_args = False
else:
@ -167,3 +183,90 @@ class DockerArgsSanitizer:
extra_docker_arguments = DockerArgsSanitizer.filter_switches(extra_docker_arguments, switches)
base_cmd += [a for a in extra_docker_arguments if a]
return base_cmd
@staticmethod
def resolve_port_mapping(config, docker_arguments: List[str]) -> Optional[tuple]:
"""
If we have port mappings in the docker cmd, this function will do two things
1. It will add an environment variable (CLEARML_AGENT_HOST_IP) with the host machines IP address
2. it will return a runtime property ("_external_host_tcp_port_mapping") on the Task with the port mapping merged
:param config:
:param docker_arguments:
:return: new docker commands with additional one to add docker
(i.e. changing the ports if needed and adding the new env var), runtime property
"""
if not docker_arguments:
return
# make a copy we are going to change it
docker_arguments = docker_arguments[:]
port_mapping_filtered = [
p for p in DockerArgsSanitizer.filter_switches(docker_arguments, include_switches=["p", "publish"])
if p and p.strip()
]
if not port_mapping_filtered:
return
# test if network=host was requested, docker will ignore published ports anyhow, so no use in parsing them
network_filtered = DockerArgsSanitizer.filter_switches(
docker_arguments, include_switches=["network", "net"])
network_filtered = [t for t in network_filtered if t.strip == "host" or "host" in t.split("=")]
# if any network is configured, we ignore it, there is nothing we can do
if network_filtered:
return
# verifying available ports, remapping if necessary
port_checks = TcpPorts()
for i_p in range(len(port_mapping_filtered)):
port_map = port_mapping_filtered[i_p]
if not port_map.strip():
continue
# skip the flag
if port_map.strip().startswith("-"):
continue
# todo: support udp?!
# example: "8080:80/udp"
if port_map.strip().split("/")[-1] == "udp":
continue
# either no type specified or tcp
ports_host, ports_in = port_map.strip().split("/")[0].split(":")[-2:]
# verify ports available
port_range = int(ports_host.split("-")[0]), int(ports_host.split("-")[-1])+1
if not all(port_checks.check_tcp_port_available(p) for p in range(port_range[0], port_range[1])):
# we need to find a new range (this is a consecutive range)
new_port_range = port_checks.find_port_range(port_range[1]-port_range[0])
if not new_port_range:
# we could not find any, leave it as it?!
break
# replace the ports,
for i in range(len(docker_arguments)):
if docker_arguments[i].strip() != port_map.strip():
continue
slash_parts = port_map.strip().split("/")
colon_parts = slash_parts[0].split(":")
colon_parts[-2] = "{}-{}".format(new_port_range[0], new_port_range[-1]) \
if len(new_port_range) > 1 else str(new_port_range[0])
docker_arguments[i] = "/".join(slash_parts[1:] + [":".join(colon_parts)])
port_mapping_filtered[i_p] = docker_arguments[i]
break
additional_cmd = []
if not DockerArgsSanitizer._machine_ip:
DockerArgsSanitizer._machine_ip = ENV_FORCE_HOST_MACHINE_IP.get() or get_private_ip(config)
if DockerArgsSanitizer._machine_ip:
additional_cmd += ["-e", "CLEARML_AGENT_HOST_IP={}".format(DockerArgsSanitizer._machine_ip)]
# sanitize, remove ip/type
ports = ",".join([":".join(t.strip().split("/")[0].split(":")[-2:])
for t in port_mapping_filtered if t.strip() and not t.strip().startswith("-")])
# update Tasks runtime
additional_task_runtime = {"_external_host_tcp_port_mapping": ports}
return docker_arguments+additional_cmd, additional_task_runtime

View File

@ -0,0 +1,42 @@
import psutil
class TcpPorts(object):
def __init__(self):
self._used_ports = sorted([i.laddr.port for i in psutil.net_connections()])
def check_tcp_port_available(self, port: int, remember_port: bool = True) -> bool:
"""
return True if the port is available
:param port: port number
:param remember_port: if True add the port into the used ports list
:return: True port is available
"""
if port in self._used_ports:
return False
if remember_port:
self._used_ports.append(port)
return True
def find_port_range(self, number_of_ports: int, remember_port: bool = True,
range_min: int = 10000, range_max: int = 60000) -> list:
ports = (i for i in range(range_min, range_max) if i not in self._used_ports)
new_allocation = []
for p in ports:
# find consecutive ports
if new_allocation and (new_allocation[-1]+1) != p:
new_allocation = []
new_allocation.append(p)
if len(new_allocation) == number_of_ports:
break
# check if we found enough
if len(new_allocation) != number_of_ports:
return []
if remember_port:
self._used_ports += new_allocation
return new_allocation

View File

@ -0,0 +1,98 @@
import requests
import socket
import subprocess
from typing import Optional
def get_private_ip(config_obj):
# type: (Config) -> str
"""
Get the private IP of this machine
:return: A string representing the IP of this machine
"""
approaches = (
_get_private_ip_from_socket,
_get_private_ip_from_subprocess,
)
for approach in approaches:
# noinspection PyBroadException
try:
return approach(config_obj)
except Exception:
continue
raise Exception("error getting private IP")
def get_public_ip(config_obj):
# type: (Config) -> Optional[str]
"""
Get the public IP of this machine. External services such as `https://api.ipify.org` or `https://ident.me`
are used to get the IP
:return: A string representing the IP of this machine or `None` if getting the IP failed
"""
# todo: add documentation in api section in conf file
public_ip_service_urls = (
(config_obj.get("api.public_ip_service_urls", None) if config_obj else None)
or ["https://api.ipify.org", "https://ident.me"]
)
for external_service in public_ip_service_urls:
ip = get_public_ip_from_external_service(external_service)
if ip:
return ip
return None
def get_public_ip_from_external_service(external_service, timeout=5):
# type: (str, Optional[int]) -> Optional[str]
"""
Get the public IP of this machine from an external service.
Fetching the IP is done via a GET request. The whole content of the request
should be the IP address
:param external_service: The address of the external service
:param timeout: The GET request timeout
:return: A string representing the IP of this machine or `None` if getting the IP failed
"""
# noinspection PyBroadException
try:
response = requests.get(external_service, timeout=timeout)
if not response.ok:
return None
ip = response.content.decode("utf8")
# check that we actually received an IP address
# noinspection PyBroadException
try:
socket.inet_pton(socket.AF_INET, ip)
return ip
except Exception:
socket.inet_pton(socket.AF_INET6, ip)
return ip
except Exception:
return None
def _get_private_ip_from_socket(config_obj):
# todo: add documentation in api section in conf file
public_ip_ping = (config_obj.get("api.public_ip_ping", None) if config_obj else None) or "8.8.8.8"
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.settimeout(0)
try:
s.connect((public_ip_ping, 1))
ip = s.getsockname()[0]
except Exception as e:
raise e
finally:
s.close()
return ip
def _get_private_ip_from_subprocess(_):
return subprocess.check_output("hostname -I", shell=True).split()[0].decode("utf-8")

View File

@ -0,0 +1,43 @@
from typing import Optional
from ..backend_api.session import Request
class TaskRuntime(object):
def __init__(self, session):
self._session = session
def get_task_runtime(self, task_id) -> Optional[dict]:
try:
res = self._session.send_request(
service='tasks', action='get_by_id', method=Request.def_method,
json={"task": task_id, "only_fields": ["runtime"]},
)
if not res.ok:
raise ValueError(f"request returned {res.status_code}")
data = res.json().get("data")
if not data or "task" not in data:
raise ValueError("empty data in result")
return data["task"].get("runtime", {})
except Exception as ex:
print(f"ERROR: Failed getting runtime properties for task {task_id}: {ex}")
def update_task_runtime(self, task_id: str, runtime: dict) -> bool:
task_runtime = self.get_task_runtime(task_id) or {}
task_runtime.update(runtime)
try:
res = self._session.send_request(
service='tasks', action='edit', method=Request.def_method,
json={
"task": task_id, "force": True, "runtime": task_runtime
},
)
if not res.ok:
raise Exception("failed setting runtime property")
return True
except Exception as ex:
print("WARNING: failed setting custom runtime properties for task '{}': {}".format(task_id, ex))
return False