From 97cb47d48e3b86a518bbccd2ad1b002f239b101b Mon Sep 17 00:00:00 2001 From: clearml <> Date: Mon, 24 Feb 2025 13:24:35 +0200 Subject: [PATCH] Add docker port mapping parsing and reassigning feature support Add initial component import from clearml-sdk for easier integration --- clearml_agent/definitions.py | 1 + clearml_agent/helper/docker_args.py | 115 +++++++++++++++++- clearml_agent/helper/os/networking.py | 42 +++++++ clearml_agent/helper/sdk_client/__init__.py | 0 .../helper/sdk_client/utilities/__init__.py | 0 .../helper/sdk_client/utilities/networking.py | 98 +++++++++++++++ clearml_agent/helper/task_runtime.py | 43 +++++++ 7 files changed, 293 insertions(+), 6 deletions(-) create mode 100644 clearml_agent/helper/os/networking.py create mode 100644 clearml_agent/helper/sdk_client/__init__.py create mode 100644 clearml_agent/helper/sdk_client/utilities/__init__.py create mode 100644 clearml_agent/helper/sdk_client/utilities/networking.py create mode 100644 clearml_agent/helper/task_runtime.py diff --git a/clearml_agent/definitions.py b/clearml_agent/definitions.py index d115147..80bdced 100644 --- a/clearml_agent/definitions.py +++ b/clearml_agent/definitions.py @@ -181,6 +181,7 @@ ENV_DOCKER_HOST_MOUNT = EnvironmentConfig( ENV_VENV_CACHE_PATH = EnvironmentConfig("CLEARML_AGENT_VENV_CACHE_PATH") ENV_EXTRA_DOCKER_ARGS = EnvironmentConfig("CLEARML_AGENT_EXTRA_DOCKER_ARGS", type=list) ENV_EXTRA_DOCKER_LABELS = EnvironmentConfig("CLEARML_AGENT_EXTRA_DOCKER_LABELS", type=list) +ENV_FORCE_HOST_MACHINE_IP = EnvironmentConfig("CLEARML_AGENT_FORCE_HOST_MACHINE_IP") ENV_DEBUG_INFO = EnvironmentConfig("CLEARML_AGENT_DEBUG_INFO") ENV_CHILD_AGENTS_COUNT_CMD = EnvironmentConfig("CLEARML_AGENT_CHILD_AGENTS_COUNT_CMD") ENV_DOCKER_ARGS_FILTERS = EnvironmentConfig("CLEARML_AGENT_DOCKER_ARGS_FILTERS") diff --git a/clearml_agent/helper/docker_args.py b/clearml_agent/helper/docker_args.py index 0421251..4094d96 100644 --- a/clearml_agent/helper/docker_args.py +++ b/clearml_agent/helper/docker_args.py @@ -1,6 +1,6 @@ import re import shlex -from typing import Tuple, List, TYPE_CHECKING +from typing import Tuple, List, TYPE_CHECKING, Optional from urllib.parse import urlunparse, urlparse from clearml_agent.definitions import ( @@ -11,7 +11,10 @@ from clearml_agent.definitions import ( ENV_AGENT_AUTH_TOKEN, ENV_DOCKER_IMAGE, ENV_DOCKER_ARGS_HIDE_ENV, + ENV_FORCE_HOST_MACHINE_IP, ) +from clearml_agent.helper.sdk_client.utilities.networking import get_private_ip +from clearml_agent.helper.os.networking import TcpPorts if TYPE_CHECKING: from clearml_agent.session import Session @@ -42,6 +45,8 @@ def sanitize_urls(s: str) -> Tuple[str, bool]: class DockerArgsSanitizer: + _machine_ip = None + @classmethod def sanitize_docker_command(cls, session, docker_command): # type: (Session, List[str]) -> List[str] @@ -108,14 +113,22 @@ class DockerArgsSanitizer: return args @staticmethod - def filter_switches(docker_args: List[str], exclude_switches: List[str]) -> List[str]: + def filter_switches( + docker_args: List[str], + exclude_switches: List[str] = None, + include_switches: List[str] = None + ) -> List[str]: + + assert not (include_switches and exclude_switches), "Either include_switches or exclude_switches but not both" + # shortcut if we are sure we have no matches - if (not exclude_switches or - not any("-{}".format(s) in " ".join(docker_args) for s in exclude_switches)): + if not include_switches and ( + not exclude_switches or not any("-{}".format(s) in " ".join(docker_args) for s in exclude_switches)): return docker_args args = [] - in_switch_args = True + in_switch_args = True if not include_switches else False + for token in docker_args: if token.strip().startswith("-"): if "=" in token: @@ -125,7 +138,10 @@ class DockerArgsSanitizer: switch = token in_switch_args = True - if switch.lstrip("-") in exclude_switches: + if not include_switches and switch.lstrip("-") in exclude_switches: + # if in excluded, skip the switch and following arguments + in_switch_args = False + elif not exclude_switches and switch.lstrip("-") not in include_switches: # if in excluded, skip the switch and following arguments in_switch_args = False else: @@ -167,3 +183,90 @@ class DockerArgsSanitizer: extra_docker_arguments = DockerArgsSanitizer.filter_switches(extra_docker_arguments, switches) base_cmd += [a for a in extra_docker_arguments if a] return base_cmd + + @staticmethod + def resolve_port_mapping(config, docker_arguments: List[str]) -> Optional[tuple]: + """ + If we have port mappings in the docker cmd, this function will do two things + 1. It will add an environment variable (CLEARML_AGENT_HOST_IP) with the host machines IP address + 2. it will return a runtime property ("_external_host_tcp_port_mapping") on the Task with the port mapping merged + :param config: + :param docker_arguments: + :return: new docker commands with additional one to add docker + (i.e. changing the ports if needed and adding the new env var), runtime property + """ + if not docker_arguments: + return + # make a copy we are going to change it + docker_arguments = docker_arguments[:] + port_mapping_filtered = [ + p for p in DockerArgsSanitizer.filter_switches(docker_arguments, include_switches=["p", "publish"]) + if p and p.strip() + ] + + if not port_mapping_filtered: + return + + # test if network=host was requested, docker will ignore published ports anyhow, so no use in parsing them + network_filtered = DockerArgsSanitizer.filter_switches( + docker_arguments, include_switches=["network", "net"]) + network_filtered = [t for t in network_filtered if t.strip == "host" or "host" in t.split("=")] + # if any network is configured, we ignore it, there is nothing we can do + if network_filtered: + return + + # verifying available ports, remapping if necessary + port_checks = TcpPorts() + for i_p in range(len(port_mapping_filtered)): + port_map = port_mapping_filtered[i_p] + if not port_map.strip(): + continue + # skip the flag + if port_map.strip().startswith("-"): + continue + + # todo: support udp?! + # example: "8080:80/udp" + if port_map.strip().split("/")[-1] == "udp": + continue + + # either no type specified or tcp + ports_host, ports_in = port_map.strip().split("/")[0].split(":")[-2:] + # verify ports available + port_range = int(ports_host.split("-")[0]), int(ports_host.split("-")[-1])+1 + if not all(port_checks.check_tcp_port_available(p) for p in range(port_range[0], port_range[1])): + # we need to find a new range (this is a consecutive range) + new_port_range = port_checks.find_port_range(port_range[1]-port_range[0]) + + if not new_port_range: + # we could not find any, leave it as it?! + break + + # replace the ports, + for i in range(len(docker_arguments)): + if docker_arguments[i].strip() != port_map.strip(): + continue + slash_parts = port_map.strip().split("/") + colon_parts = slash_parts[0].split(":") + colon_parts[-2] = "{}-{}".format(new_port_range[0], new_port_range[-1]) \ + if len(new_port_range) > 1 else str(new_port_range[0]) + + docker_arguments[i] = "/".join(slash_parts[1:] + [":".join(colon_parts)]) + port_mapping_filtered[i_p] = docker_arguments[i] + break + + additional_cmd = [] + if not DockerArgsSanitizer._machine_ip: + DockerArgsSanitizer._machine_ip = ENV_FORCE_HOST_MACHINE_IP.get() or get_private_ip(config) + + if DockerArgsSanitizer._machine_ip: + additional_cmd += ["-e", "CLEARML_AGENT_HOST_IP={}".format(DockerArgsSanitizer._machine_ip)] + + # sanitize, remove ip/type + ports = ",".join([":".join(t.strip().split("/")[0].split(":")[-2:]) + for t in port_mapping_filtered if t.strip() and not t.strip().startswith("-")]) + + # update Tasks runtime + additional_task_runtime = {"_external_host_tcp_port_mapping": ports} + + return docker_arguments+additional_cmd, additional_task_runtime diff --git a/clearml_agent/helper/os/networking.py b/clearml_agent/helper/os/networking.py new file mode 100644 index 0000000..6bc7728 --- /dev/null +++ b/clearml_agent/helper/os/networking.py @@ -0,0 +1,42 @@ +import psutil + + +class TcpPorts(object): + + def __init__(self): + self._used_ports = sorted([i.laddr.port for i in psutil.net_connections()]) + + def check_tcp_port_available(self, port: int, remember_port: bool = True) -> bool: + """ + return True if the port is available + :param port: port number + :param remember_port: if True add the port into the used ports list + :return: True port is available + """ + if port in self._used_ports: + return False + if remember_port: + self._used_ports.append(port) + return True + + def find_port_range(self, number_of_ports: int, remember_port: bool = True, + range_min: int = 10000, range_max: int = 60000) -> list: + ports = (i for i in range(range_min, range_max) if i not in self._used_ports) + new_allocation = [] + for p in ports: + # find consecutive ports + if new_allocation and (new_allocation[-1]+1) != p: + new_allocation = [] + + new_allocation.append(p) + if len(new_allocation) == number_of_ports: + break + + # check if we found enough + if len(new_allocation) != number_of_ports: + return [] + + if remember_port: + self._used_ports += new_allocation + + return new_allocation diff --git a/clearml_agent/helper/sdk_client/__init__.py b/clearml_agent/helper/sdk_client/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/clearml_agent/helper/sdk_client/utilities/__init__.py b/clearml_agent/helper/sdk_client/utilities/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/clearml_agent/helper/sdk_client/utilities/networking.py b/clearml_agent/helper/sdk_client/utilities/networking.py new file mode 100644 index 0000000..2c27b2e --- /dev/null +++ b/clearml_agent/helper/sdk_client/utilities/networking.py @@ -0,0 +1,98 @@ +import requests +import socket +import subprocess +from typing import Optional + + +def get_private_ip(config_obj): + # type: (Config) -> str + """ + Get the private IP of this machine + + :return: A string representing the IP of this machine + """ + approaches = ( + _get_private_ip_from_socket, + _get_private_ip_from_subprocess, + ) + + for approach in approaches: + # noinspection PyBroadException + try: + return approach(config_obj) + except Exception: + continue + + raise Exception("error getting private IP") + + +def get_public_ip(config_obj): + # type: (Config) -> Optional[str] + """ + Get the public IP of this machine. External services such as `https://api.ipify.org` or `https://ident.me` + are used to get the IP + + :return: A string representing the IP of this machine or `None` if getting the IP failed + """ + + # todo: add documentation in api section in conf file + public_ip_service_urls = ( + (config_obj.get("api.public_ip_service_urls", None) if config_obj else None) + or ["https://api.ipify.org", "https://ident.me"] + ) + for external_service in public_ip_service_urls: + ip = get_public_ip_from_external_service(external_service) + if ip: + return ip + return None + + +def get_public_ip_from_external_service(external_service, timeout=5): + # type: (str, Optional[int]) -> Optional[str] + """ + Get the public IP of this machine from an external service. + Fetching the IP is done via a GET request. The whole content of the request + should be the IP address + + :param external_service: The address of the external service + :param timeout: The GET request timeout + + :return: A string representing the IP of this machine or `None` if getting the IP failed + """ + # noinspection PyBroadException + try: + response = requests.get(external_service, timeout=timeout) + if not response.ok: + return None + ip = response.content.decode("utf8") + # check that we actually received an IP address + # noinspection PyBroadException + try: + socket.inet_pton(socket.AF_INET, ip) + return ip + except Exception: + socket.inet_pton(socket.AF_INET6, ip) + return ip + except Exception: + return None + + +def _get_private_ip_from_socket(config_obj): + + # todo: add documentation in api section in conf file + public_ip_ping = (config_obj.get("api.public_ip_ping", None) if config_obj else None) or "8.8.8.8" + + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + s.settimeout(0) + try: + s.connect((public_ip_ping, 1)) + ip = s.getsockname()[0] + except Exception as e: + raise e + finally: + s.close() + return ip + + +def _get_private_ip_from_subprocess(_): + return subprocess.check_output("hostname -I", shell=True).split()[0].decode("utf-8") diff --git a/clearml_agent/helper/task_runtime.py b/clearml_agent/helper/task_runtime.py new file mode 100644 index 0000000..814cff5 --- /dev/null +++ b/clearml_agent/helper/task_runtime.py @@ -0,0 +1,43 @@ +from typing import Optional + +from ..backend_api.session import Request + + +class TaskRuntime(object): + + def __init__(self, session): + self._session = session + + def get_task_runtime(self, task_id) -> Optional[dict]: + try: + res = self._session.send_request( + service='tasks', action='get_by_id', method=Request.def_method, + json={"task": task_id, "only_fields": ["runtime"]}, + ) + if not res.ok: + raise ValueError(f"request returned {res.status_code}") + data = res.json().get("data") + if not data or "task" not in data: + raise ValueError("empty data in result") + return data["task"].get("runtime", {}) + except Exception as ex: + print(f"ERROR: Failed getting runtime properties for task {task_id}: {ex}") + + def update_task_runtime(self, task_id: str, runtime: dict) -> bool: + task_runtime = self.get_task_runtime(task_id) or {} + task_runtime.update(runtime) + + try: + res = self._session.send_request( + service='tasks', action='edit', method=Request.def_method, + json={ + "task": task_id, "force": True, "runtime": task_runtime + }, + ) + if not res.ok: + raise Exception("failed setting runtime property") + return True + except Exception as ex: + print("WARNING: failed setting custom runtime properties for task '{}': {}".format(task_id, ex)) + + return False