mirror of
https://github.com/clearml/clearml
synced 2025-03-13 15:20:50 +00:00
Add support for the ClearML HTTP router using Task.get_http_router()
This commit is contained in:
parent
749a80a70a
commit
ba492dd65d
0
clearml/router/__init__.py
Normal file
0
clearml/router/__init__.py
Normal file
235
clearml/router/endpoint_telemetry.py
Normal file
235
clearml/router/endpoint_telemetry.py
Normal file
@ -0,0 +1,235 @@
|
||||
from ..task import Task
|
||||
from ..utilities.resource_monitor import ResourceMonitor
|
||||
import uuid
|
||||
import time
|
||||
import copy
|
||||
from threading import Thread
|
||||
|
||||
|
||||
class EndpointTelemetry:
|
||||
BACKEND_STAT_MAP = {
|
||||
"cpu_usage_*": "cpu_usage",
|
||||
"cpu_temperature_*": "cpu_temperature",
|
||||
"disk_free_percent": "disk_free_home",
|
||||
"io_read_mbs": "disk_read",
|
||||
"io_write_mbs": "disk_write",
|
||||
"network_tx_mbs": "network_tx",
|
||||
"network_rx_mbs": "network_rx",
|
||||
"memory_free_gb": "memory_free",
|
||||
"memory_used_gb": "memory_used",
|
||||
"gpu_temperature_*": "gpu_temperature",
|
||||
"gpu_mem_used_gb_*": "gpu_memory_used",
|
||||
"gpu_mem_free_gb_*": "gpu_memory_free",
|
||||
"gpu_utilization_*": "gpu_usage",
|
||||
}
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
endpoint_name="endpoint",
|
||||
model_name="model",
|
||||
model=None,
|
||||
model_url=None,
|
||||
model_source=None,
|
||||
model_version=None,
|
||||
app_id=None,
|
||||
app_instance=None,
|
||||
tags=None,
|
||||
system_tags=None,
|
||||
container_id=None,
|
||||
input_size=None,
|
||||
input_type="str",
|
||||
report_statistics=True,
|
||||
endpoint_url=None,
|
||||
preprocess_artifact=None,
|
||||
force_register=False
|
||||
):
|
||||
self.report_window = 30
|
||||
self._previous_readouts = {}
|
||||
self._previous_readouts_ts = time.time()
|
||||
self._num_readouts = 0
|
||||
self.container_info = {
|
||||
"container_id": container_id or str(uuid.uuid4()).replace("-", ""),
|
||||
"endpoint_name": endpoint_name,
|
||||
"model_name": model_name,
|
||||
"model_source": model_source,
|
||||
"model_version": model_version,
|
||||
"preprocess_artifact": preprocess_artifact,
|
||||
"input_type": str(input_type),
|
||||
"input_size": str(input_size),
|
||||
"tags": tags,
|
||||
"system_tags": system_tags,
|
||||
"endpoint_url": endpoint_url
|
||||
}
|
||||
references = []
|
||||
if app_id:
|
||||
references.append({"type": "app_id", "value": app_id})
|
||||
if app_instance:
|
||||
references.append({"type": "app_instance", "value": app_instance})
|
||||
references.append({"type": "task", "value": Task.current_task().id})
|
||||
if model:
|
||||
references.append({"type": "model", "value": model})
|
||||
if model_url:
|
||||
references.append({"type": "url", "value": model_url})
|
||||
self.container_info["reference"] = references
|
||||
self.session = Task._get_default_session()
|
||||
self.requests_num = 0
|
||||
self.requests_num_window = 0
|
||||
self.requests_num_prev_window = 0
|
||||
self.latency_sum_window = 0
|
||||
self.uptime_timestamp = time.time()
|
||||
self.last_request_time = None
|
||||
# use readily available resource monitor, otherwise create one (can happen in spawned subprocesses)
|
||||
self.resource_monitor = Task.current_task()._resource_monitor or ResourceMonitor(Task.current_task())
|
||||
if not container_id and not force_register:
|
||||
self.register_container()
|
||||
self._stop_container_status_report_daemon = False
|
||||
if report_statistics:
|
||||
Thread(target=self.container_status_report_daemon, daemon=True).start()
|
||||
|
||||
def stop(self):
|
||||
self._stop_container_status_report_daemon = True
|
||||
|
||||
def update(
|
||||
self,
|
||||
endpoint_name=None,
|
||||
model_name=None,
|
||||
model=None,
|
||||
model_url=None,
|
||||
model_source=None,
|
||||
model_version=None,
|
||||
tags=None,
|
||||
system_tags=None,
|
||||
input_size=None,
|
||||
input_type=None,
|
||||
endpoint_url=None,
|
||||
preprocess_artifact=None,
|
||||
):
|
||||
update_dict = {}
|
||||
if endpoint_name is not None:
|
||||
update_dict["endpoint_name"] = endpoint_name
|
||||
if model_name is not None:
|
||||
update_dict["model_name"] = model_name
|
||||
if model_source is not None:
|
||||
update_dict["model_source"] = model_source
|
||||
if model_version is not None:
|
||||
update_dict["model_version"] = model_version
|
||||
if preprocess_artifact is not None:
|
||||
update_dict["preprocess_artifact"] = preprocess_artifact
|
||||
if input_type is not None:
|
||||
update_dict["input_type"] = input_type
|
||||
if input_size is not None:
|
||||
update_dict["input_size"] = input_size
|
||||
if tags is not None:
|
||||
update_dict["tags"] = tags
|
||||
if system_tags is not None:
|
||||
update_dict["system_tags"] = system_tags
|
||||
if endpoint_url is not None:
|
||||
update_dict["endpoint_url"] = endpoint_url
|
||||
self.container_info.update(update_dict)
|
||||
references_to_add = {}
|
||||
if model:
|
||||
references_to_add["model"] = {"type": "model", "value": model}
|
||||
if model_url:
|
||||
references_to_add["model_url"] = {"type": "url", "value": model_url}
|
||||
for reference in self.container_info["reference"]:
|
||||
if reference["type"] in references_to_add:
|
||||
reference["value"] = references_to_add[reference["type"]]["value"]
|
||||
references_to_add.pop(reference["type"], None)
|
||||
self.container_info["reference"].extend(list(references_to_add.values()))
|
||||
|
||||
def register_container(self):
|
||||
result = self.session.send_request("serving", "register_container", json=self.container_info)
|
||||
if result.status_code != 200:
|
||||
print("Failed registering container: {}".format(result.json()))
|
||||
|
||||
def wait_for_endpoint_url(self):
|
||||
while not self.container_info.get("endpoint_url"):
|
||||
Task.current_task().reload()
|
||||
endpoint = Task.current_task()._get_runtime_properties().get("endpoint")
|
||||
if endpoint:
|
||||
self.container_info["endpoint_url"] = endpoint
|
||||
self.uptime_timestamp = time.time()
|
||||
else:
|
||||
time.sleep(1)
|
||||
|
||||
def get_machine_stats(self):
|
||||
def create_general_key(old_key):
|
||||
return "{}_*".format(old_key)
|
||||
|
||||
stats = self.resource_monitor._machine_stats()
|
||||
elapsed = time.time() - self._previous_readouts_ts
|
||||
self._previous_readouts_ts = time.time()
|
||||
updates = {}
|
||||
for k, v in stats.items():
|
||||
if k.endswith("_mbs"):
|
||||
v = (v - self._previous_readouts.get(k, v)) / elapsed
|
||||
updates[k] = v
|
||||
self._previous_readouts = copy.deepcopy(stats)
|
||||
stats.update(updates)
|
||||
self._num_readouts += 1
|
||||
|
||||
preprocessed_stats = {}
|
||||
ordered_keys = sorted(stats.keys())
|
||||
for k in ordered_keys:
|
||||
v = stats[k]
|
||||
if k in ["memory_used_gb", "memory_free_gb"]:
|
||||
v *= 1024
|
||||
if isinstance(v, float):
|
||||
v = round(v, 3)
|
||||
stat_key = self.BACKEND_STAT_MAP.get(k)
|
||||
if stat_key:
|
||||
preprocessed_stats[stat_key] = v
|
||||
else:
|
||||
general_key = create_general_key(k)
|
||||
if general_key.startswith("gpu"):
|
||||
prev_general_key = general_key
|
||||
general_key = "_".join(["gpu"] + general_key.split("_")[2:])
|
||||
if general_key == "gpu_mem_used_gb_*":
|
||||
gpu_index = prev_general_key.split("_")[1]
|
||||
mem_usage = min(stats["gpu_{}_mem_usage".format(gpu_index)], 99.99)
|
||||
mem_free = stats["gpu_{}_mem_free_gb".format(gpu_index)]
|
||||
v = (mem_usage * mem_free) / (100 - mem_usage)
|
||||
if general_key in ["gpu_mem_used_gb_*", "gpu_mem_free_gb_*"]:
|
||||
v *= 1024
|
||||
general_key = self.BACKEND_STAT_MAP.get(general_key)
|
||||
if general_key:
|
||||
preprocessed_stats.setdefault(general_key, []).append(v)
|
||||
return preprocessed_stats
|
||||
|
||||
def container_status_report_daemon(self):
|
||||
while not self._stop_container_status_report_daemon:
|
||||
self.container_status_report()
|
||||
time.sleep(self.report_window)
|
||||
|
||||
def container_status_report(self):
|
||||
self.wait_for_endpoint_url()
|
||||
status_report = {**self.container_info}
|
||||
status_report["uptime_sec"] = int(time.time() - self.uptime_timestamp)
|
||||
status_report["requests_num"] = self.requests_num
|
||||
status_report["requests_min"] = self.requests_num_window + self.requests_num_prev_window
|
||||
status_report["latency_ms"] = (
|
||||
0 if (self.requests_num_window == 0) else (self.latency_sum_window / self.requests_num_window)
|
||||
)
|
||||
status_report["machine_stats"] = self.get_machine_stats()
|
||||
self.requests_num_prev_window = self.requests_num_window
|
||||
self.requests_num_window = 0
|
||||
self.latency_sum_window = 0
|
||||
self.latency_num_window = 0
|
||||
result = self.session.send_request("serving", "container_status_report", json=status_report)
|
||||
if result.status_code != 200:
|
||||
print("Failed sending status report: {}".format(result.json()))
|
||||
|
||||
def update_last_request_time(self):
|
||||
self.last_request_time = time.time()
|
||||
|
||||
def update_statistics(self):
|
||||
self.requests_num += 1
|
||||
self.requests_num_window += 1
|
||||
latency = (time.time() - self.last_request_time) * 1000
|
||||
self.latency_sum_window += latency
|
||||
|
||||
def on_request(self):
|
||||
self.update_last_request_time()
|
||||
|
||||
def on_response(self):
|
||||
self.update_statistics()
|
184
clearml/router/fastapi_proxy.py
Normal file
184
clearml/router/fastapi_proxy.py
Normal file
@ -0,0 +1,184 @@
|
||||
from fastapi import FastAPI, Request, Response
|
||||
from typing import Optional
|
||||
from multiprocessing import Process
|
||||
from starlette.middleware.base import BaseHTTPMiddleware
|
||||
from starlette.routing import Match
|
||||
import functools
|
||||
import threading
|
||||
import httpx
|
||||
import uvicorn
|
||||
from .route import Route
|
||||
from ..utilities.process.mp import SafeQueue
|
||||
|
||||
|
||||
class FastAPIProxy:
|
||||
ALL_REST_METHODS = ["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"]
|
||||
|
||||
def __init__(self, port, workers=None, default_target=None):
|
||||
self.app = None
|
||||
self.routes = {}
|
||||
self.port = port
|
||||
self.message_queue = SafeQueue()
|
||||
self.uvicorn_subprocess = None
|
||||
self.workers = workers
|
||||
self._default_target = default_target
|
||||
self._default_session = None
|
||||
self._in_subprocess = False
|
||||
|
||||
def _create_default_route(self):
|
||||
proxy = self
|
||||
|
||||
class DefaultRouteMiddleware(BaseHTTPMiddleware):
|
||||
async def dispatch(self, request: Request, call_next):
|
||||
scope = {
|
||||
"type": "http",
|
||||
"method": request.method,
|
||||
"path": request.url.path,
|
||||
"root_path": "",
|
||||
"headers": request.headers.raw,
|
||||
"query_string": request.url.query.encode("utf-8"),
|
||||
"client": request.client,
|
||||
"server": request.scope.get("server"),
|
||||
"scheme": request.url.scheme,
|
||||
"extensions": request.scope.get("extensions", {}),
|
||||
"app": request.scope.get("app"),
|
||||
}
|
||||
for route in proxy.app.router.routes:
|
||||
if route.matches(scope)[0] == Match.FULL:
|
||||
return await call_next(request)
|
||||
proxied_response = await proxy._default_session.request(
|
||||
method=request.method,
|
||||
url=proxy._default_target + request.url.path,
|
||||
headers=dict(request.headers),
|
||||
content=await request.body(),
|
||||
params=request.query_params,
|
||||
)
|
||||
return Response(
|
||||
content=proxied_response.content,
|
||||
headers=dict(proxied_response.headers),
|
||||
status_code=proxied_response.status_code,
|
||||
)
|
||||
self.app.add_middleware(DefaultRouteMiddleware)
|
||||
|
||||
async def proxy(
|
||||
self,
|
||||
request: Request,
|
||||
path: Optional[str] = None,
|
||||
source_path: Optional[str] = None,
|
||||
):
|
||||
route_data = self.routes.get(source_path)
|
||||
if not route_data:
|
||||
return Response(status_code=404)
|
||||
|
||||
request = route_data.on_request(request)
|
||||
proxied_response = await route_data.session.request(
|
||||
method=request.method,
|
||||
url=f"{route_data.target_url}/{path}" if path else route_data.target_url,
|
||||
headers=dict(request.headers),
|
||||
content=await request.body(),
|
||||
params=request.query_params,
|
||||
)
|
||||
proxied_response = Response(
|
||||
content=proxied_response.content,
|
||||
headers=dict(proxied_response.headers),
|
||||
status_code=proxied_response.status_code,
|
||||
)
|
||||
return route_data.on_response(proxied_response, request)
|
||||
|
||||
def add_route(
|
||||
self,
|
||||
source,
|
||||
target,
|
||||
request_callback=None,
|
||||
response_callback=None,
|
||||
endpoint_telemetry=True
|
||||
):
|
||||
if not self._in_subprocess:
|
||||
self.message_queue.put(
|
||||
{
|
||||
"method": "add_route",
|
||||
"kwargs": {
|
||||
"source": source,
|
||||
"target": target,
|
||||
"request_callback": request_callback,
|
||||
"response_callback": response_callback,
|
||||
"endpoint_telemetry": endpoint_telemetry
|
||||
},
|
||||
}
|
||||
)
|
||||
return
|
||||
should_add_route = False
|
||||
if source not in self.routes:
|
||||
should_add_route = True
|
||||
else:
|
||||
self.routes[source].stop_endpoint_telemetry()
|
||||
self.routes[source] = Route(
|
||||
target,
|
||||
request_callback=request_callback,
|
||||
response_callback=response_callback,
|
||||
session=httpx.AsyncClient(timeout=None),
|
||||
)
|
||||
if endpoint_telemetry is True:
|
||||
endpoint_telemetry = {}
|
||||
if endpoint_telemetry is not False:
|
||||
self.routes[source].set_endpoint_telemetry_args(**endpoint_telemetry)
|
||||
if self._in_subprocess:
|
||||
self.routes[source].start_endpoint_telemetry()
|
||||
if should_add_route:
|
||||
self.app.add_api_route(
|
||||
source,
|
||||
functools.partial(
|
||||
self.proxy,
|
||||
source_path=source,
|
||||
),
|
||||
methods=self.ALL_REST_METHODS,
|
||||
)
|
||||
self.app.add_api_route(
|
||||
source.rstrip("/") + "/{path:path}",
|
||||
functools.partial(
|
||||
self.proxy,
|
||||
source_path=source,
|
||||
),
|
||||
methods=self.ALL_REST_METHODS,
|
||||
)
|
||||
return self.routes[source]
|
||||
|
||||
def remove_route(self, source):
|
||||
if not self._in_subprocess:
|
||||
self.message_queue.put({"method": "remove_route", "kwargs": {"source": source}})
|
||||
return
|
||||
route = self.routes.get(source)
|
||||
if route:
|
||||
route.stop_endpoint_telemetry()
|
||||
if source in self.routes:
|
||||
# we are not popping the key to prevent calling self.app.add_api_route multiple times
|
||||
# when self.add_route is called on the same source_path after removal
|
||||
self.routes[source] = None
|
||||
|
||||
def _start(self):
|
||||
self._in_subprocess = True
|
||||
self.app = FastAPI()
|
||||
if self._default_target:
|
||||
self._default_session = httpx.AsyncClient(timeout=None)
|
||||
self._create_default_route()
|
||||
for route in self.routes.values():
|
||||
route.start_endpoint_telemetry()
|
||||
threading.Thread(target=self._rpc_manager, daemon=True).start()
|
||||
uvicorn.run(self.app, port=self.port, host="0.0.0.0", workers=self.workers)
|
||||
|
||||
def _rpc_manager(self):
|
||||
while True:
|
||||
message = self.message_queue.get()
|
||||
if message["method"] == "add_route":
|
||||
self.add_route(**message["kwargs"])
|
||||
elif message["method"] == "remove_route":
|
||||
self.remove_route(**message["kwargs"])
|
||||
|
||||
def start(self):
|
||||
self.uvicorn_subprocess = Process(target=self._start)
|
||||
self.uvicorn_subprocess.start()
|
||||
|
||||
def stop(self):
|
||||
if self.uvicorn_subprocess:
|
||||
self.uvicorn_subprocess.terminate()
|
||||
self.uvicorn_subprocess = None
|
35
clearml/router/proxy.py
Normal file
35
clearml/router/proxy.py
Normal file
@ -0,0 +1,35 @@
|
||||
from .fastapi_proxy import FastAPIProxy
|
||||
|
||||
|
||||
class HttpProxy:
|
||||
DEFAULT_PORT = 9000
|
||||
|
||||
def __init__(self, port=None, workers=None, default_target=None):
|
||||
# at the moment, only a fastapi proxy is supported
|
||||
self.base_proxy = FastAPIProxy(port or self.DEFAULT_PORT, workers=workers, default_target=default_target)
|
||||
self.base_proxy.start()
|
||||
self.port = port
|
||||
self.routes = {}
|
||||
|
||||
def add_route(self, source, target, request_callback=None, response_callback=None, endpoint_telemetry=True):
|
||||
self.routes[source] = self.base_proxy.add_route(
|
||||
source=source,
|
||||
target=target,
|
||||
request_callback=request_callback,
|
||||
response_callback=response_callback,
|
||||
endpoint_telemetry=endpoint_telemetry,
|
||||
)
|
||||
return self.routes[source]
|
||||
|
||||
def remove_route(self, source):
|
||||
self.routes.pop(source, None)
|
||||
self.base_proxy.remove_route(source)
|
||||
|
||||
def get_routes(self):
|
||||
return self.routes
|
||||
|
||||
def start(self):
|
||||
self.base_proxy.start()
|
||||
|
||||
def stop(self):
|
||||
self.base_proxy.stop()
|
79
clearml/router/route.py
Normal file
79
clearml/router/route.py
Normal file
@ -0,0 +1,79 @@
|
||||
from .endpoint_telemetry import EndpointTelemetry
|
||||
|
||||
|
||||
class Route:
|
||||
def __init__(self, target_url, request_callback=None, response_callback=None, session=None):
|
||||
self.target_url = target_url
|
||||
self.request_callback = request_callback
|
||||
self.response_callback = response_callback
|
||||
self.session = session
|
||||
self.persistent_state = {}
|
||||
self._endpoint_telemetry = None
|
||||
self._endpoint_telemetry_args = None
|
||||
|
||||
def set_endpoint_telemetry_args(
|
||||
self,
|
||||
endpoint_name="endpoint",
|
||||
model_name="model",
|
||||
model=None,
|
||||
model_url=None,
|
||||
model_source=None,
|
||||
model_version=None,
|
||||
app_id=None,
|
||||
app_instance=None,
|
||||
tags=None,
|
||||
system_tags=None,
|
||||
container_id=None,
|
||||
input_size=None,
|
||||
input_type="str",
|
||||
report_statistics=True,
|
||||
endpoint_url=None,
|
||||
preprocess_artifact=None,
|
||||
force_register=False
|
||||
):
|
||||
self._endpoint_telemetry_args = dict(
|
||||
endpoint_name=endpoint_name,
|
||||
model_name=model_name,
|
||||
model=model,
|
||||
model_url=model_url,
|
||||
model_source=model_source,
|
||||
model_version=model_version,
|
||||
app_id=app_id,
|
||||
app_instance=app_instance,
|
||||
tags=tags,
|
||||
system_tags=system_tags,
|
||||
container_id=container_id,
|
||||
input_size=input_size,
|
||||
input_type=input_type,
|
||||
report_statistics=report_statistics,
|
||||
endpoint_url=endpoint_url,
|
||||
preprocess_artifact=preprocess_artifact,
|
||||
force_register=force_register
|
||||
)
|
||||
|
||||
def start_endpoint_telemetry(self):
|
||||
if self._endpoint_telemetry is not None or self._endpoint_telemetry_args is None:
|
||||
return
|
||||
self._endpoint_telemetry = EndpointTelemetry(**self._endpoint_telemetry_args)
|
||||
|
||||
def stop_endpoint_telemetry(self):
|
||||
if self._endpoint_telemetry is None:
|
||||
return
|
||||
self._endpoint_telemetry.stop()
|
||||
self._endpoint_telemetry = None
|
||||
|
||||
def on_request(self, request):
|
||||
new_request = request
|
||||
if self.request_callback:
|
||||
new_request = self.request_callback(request, persistent_state=self.persistent_state) or request
|
||||
if self._endpoint_telemetry:
|
||||
self._endpoint_telemetry.on_request()
|
||||
return new_request
|
||||
|
||||
def on_response(self, response, request):
|
||||
new_response = response
|
||||
if self.response_callback:
|
||||
new_response = self.response_callback(response, request, persistent_state=self.persistent_state) or response
|
||||
if self._endpoint_telemetry:
|
||||
self._endpoint_telemetry.on_response()
|
||||
return new_response
|
203
clearml/router/router.py
Normal file
203
clearml/router/router.py
Normal file
@ -0,0 +1,203 @@
|
||||
from typing import Optional, Callable, Dict, Union # noqa
|
||||
from fastapi import Request, Response # noqa
|
||||
from .proxy import HttpProxy
|
||||
|
||||
|
||||
class HttpRouter:
|
||||
"""
|
||||
A router class to manage HTTP routing for an application.
|
||||
Allows the creation, deployment, and management of local and external endpoints,
|
||||
as well as the configuration of a local proxy for traffic routing.
|
||||
|
||||
Example usage:
|
||||
|
||||
.. code-block:: py
|
||||
def request_callback(request, persistent_state):
|
||||
persistent_state["last_request_time"] = time.time()
|
||||
|
||||
def response_callback(response, request, persistent_state):
|
||||
print("Latency:", time.time() - persistent_state["last_request_time"])
|
||||
if urllib.parse.urlparse(str(request.url).rstrip("/")).path == "/modify":
|
||||
new_content = response.body.replace(b"modify", b"modified")
|
||||
headers = copy.deepcopy(response.headers)
|
||||
headers["Content-Length"] = str(len(new_content))
|
||||
return Response(status_code=response.status_code, headers=headers, content=new_content)
|
||||
|
||||
router = Task.current_task().get_http_router()
|
||||
router.set_local_proxy_parameters(incoming_port=9000)
|
||||
router.create_local_route(
|
||||
source="/",
|
||||
target="http://localhost:8000",
|
||||
request_callback=request_callback,
|
||||
response_callback=response_callback,
|
||||
endpoint_telemetry={"model": "MyModel"}
|
||||
)
|
||||
router.deploy(wait=True)
|
||||
"""
|
||||
_instance = None
|
||||
|
||||
def __init__(self, task):
|
||||
"""
|
||||
Do not use directly. Use `Task.get_router` instead
|
||||
"""
|
||||
self._task = task
|
||||
self._external_endpoint_port = None
|
||||
self._proxy = None
|
||||
self._proxy_params = {"port": HttpProxy.DEFAULT_PORT}
|
||||
|
||||
def set_local_proxy_parameters(self, incoming_port=None, default_target=None):
|
||||
# type: (Optional[int], Optional[str]) -> ()
|
||||
"""
|
||||
Set the parameters with which the local proxy is initialized
|
||||
|
||||
:param incoming_port: The incoming port of the proxy
|
||||
:param default_target: If None, no default target is set. Otherwise, route all traffic
|
||||
that doesn't match a local route created via `create_local_route` to this target
|
||||
"""
|
||||
self._proxy_params["port"] = incoming_port or HttpProxy.DEFAULT_PORT
|
||||
self._proxy_params["default_target"] = default_target
|
||||
|
||||
def start_local_proxy(self):
|
||||
"""
|
||||
Start the local proxy without deploying the router, i.e. requesting an external endpoint
|
||||
"""
|
||||
self._proxy = self._proxy or HttpProxy(**self._proxy_params)
|
||||
|
||||
def create_local_route(
|
||||
self,
|
||||
source, # type: str
|
||||
target, # type: str
|
||||
request_callback=None, # type: Callable[Request, Dict]
|
||||
response_callback=None, # type: Callable[Response, Request, Dict]
|
||||
endpoint_telemetry=True # type: Union[bool, Dict]
|
||||
):
|
||||
"""
|
||||
Create a local route from a source to a target through a proxy. If no proxy instance
|
||||
exists, one is automatically created.
|
||||
This function enables routing traffic between the source and target endpoints, optionally
|
||||
allowing custom callbacks to handle requests and responses or to gather telemetry data
|
||||
at the endpoint.
|
||||
To customize proxy parameters, use the `Router.set_local_proxy_parameters` method.
|
||||
By default, the proxy binds to port 9000 for incoming requests.
|
||||
|
||||
:param source: The source path for routing the traffic. For example, `/` will intercept
|
||||
all the traffic sent to the proxy, while `/example` will only intercept the calls
|
||||
that have `/example` as the path prefix.
|
||||
:param target: The target URL where the intercepted traffic is routed.
|
||||
:param request_callback: A function used to process each request before it is forwarded to the target.
|
||||
The callback must have the following parameters:
|
||||
- request - The intercepted FastAPI request
|
||||
- persistent_state - A dictionary meant to be used as a caching utility object.
|
||||
Shared with `response_callback`
|
||||
The callback can return a FastAPI Request, in which case this request will be forwarded to the target
|
||||
:param response_callback: A function used to process each response before it is returned by the proxy.
|
||||
The callback must have the following parameters:
|
||||
- response - The FastAPI response
|
||||
- request - The FastAPI request (after being preprocessed by the proxy)
|
||||
- persistent_state - A dictionary meant to be used as a caching utility object.
|
||||
Shared with `request_callback`
|
||||
The callback can return a FastAPI Response, in which case this response will be forwarded
|
||||
:param endpoint_telemetry: If True, enable endpoint telemetry. If False, disable it.
|
||||
If a dictionary is passed, enable endpoint telemetry with custom parameters.
|
||||
The parameters are:
|
||||
- endpoint_url - URL to the endpoint, mandatory if no external URL has been requested
|
||||
- endpoint_name - name of the endpoint
|
||||
- model_name - name of the model served by the endpoint
|
||||
- model - referenced model
|
||||
- model_url - URL to the model
|
||||
- model_source - Source of the model
|
||||
- model_version - Model version
|
||||
- app_id - App ID, if used inside a ClearML app
|
||||
- app_instance - The ID of the instance the ClearML app is running
|
||||
- tags - ClearML tags
|
||||
- system_tags - ClearML system tags
|
||||
- container_id - Container ID, should be unique
|
||||
- input_size - input size of the model
|
||||
- input_type - input type expected by the model/endpoint
|
||||
- report_statistics - whether or not to report statistics
|
||||
"""
|
||||
self.start_local_proxy()
|
||||
self._proxy.add_route(
|
||||
source,
|
||||
target,
|
||||
request_callback=request_callback,
|
||||
response_callback=response_callback,
|
||||
endpoint_telemetry=endpoint_telemetry,
|
||||
)
|
||||
|
||||
def remove_local_route(self, source):
|
||||
# type: (str) -> ()
|
||||
"""
|
||||
Remove a local route. If endpoint telemetry is enabled for that route, disable it
|
||||
|
||||
:param source: Remove route based on the source path used to route the traffic
|
||||
"""
|
||||
if self._proxy:
|
||||
self._proxy.remove_route(source)
|
||||
|
||||
def deploy(
|
||||
self, wait=False, wait_interval_seconds=3.0, wait_timeout_seconds=90.0
|
||||
):
|
||||
# type: (Optional[int], str, bool, float, float) -> Optional[Dict]
|
||||
"""
|
||||
Start the local HTTP proxy and request an external endpoint for an application
|
||||
|
||||
:param port: Port the application is listening to. If no port is supplied, a local proxy
|
||||
will be created. To control the proxy parameters, use `Router.set_local_proxy_parameters`.
|
||||
To control create local routes through the proxy, use `Router.create_local_route`.
|
||||
By default, the incoming port bound by the proxy is 9000
|
||||
:param protocol: As of now, only `http` is supported
|
||||
:param wait: If True, wait for the endpoint to be assigned
|
||||
:param wait_interval_seconds: The poll frequency when waiting for the endpoint
|
||||
:param wait_timeout_seconds: If this timeout is exceeded while waiting for the endpoint,
|
||||
the method will no longer wait and None will be returned
|
||||
|
||||
:return: If wait is False, this method will return None.
|
||||
If no endpoint could be found while waiting, this mehtod returns None.
|
||||
Otherwise, it returns a dictionary containing the following values:
|
||||
- endpoint - raw endpoint. One might need to authenticate in order to use this endpoint
|
||||
- browser_endpoint - endpoint to be used in browser. Authentication will be handled via the browser
|
||||
- port - the port exposed by the application
|
||||
- protocol - the protocol used by the endpo"int
|
||||
"""
|
||||
self._proxy = self._proxy or HttpProxy(**self._proxy_params)
|
||||
return self._task.request_external_endpoint(
|
||||
port=self._proxy.port,
|
||||
protocol="http",
|
||||
wait=wait,
|
||||
wait_interval_seconds=wait_interval_seconds,
|
||||
wait_timeout_seconds=wait_timeout_seconds,
|
||||
)
|
||||
|
||||
def wait_for_external_endpoint(self, wait_interval_seconds=3.0, wait_timeout_seconds=90.0):
|
||||
# type: (float) -> Optional[Dict]
|
||||
"""
|
||||
Wait for an external endpoint to be assigned
|
||||
|
||||
:param wait_interval_seconds: The poll frequency when waiting for the endpoint
|
||||
:param wait_timeout_seconds: If this timeout is exceeded while waiting for the endpoint,
|
||||
the method will no longer wait
|
||||
|
||||
:return: If no endpoint could be found while waiting, this mehtod returns None.
|
||||
Otherwise, it returns a dictionary containing the following values:
|
||||
- endpoint - raw endpoint. One might need to authenticate in order to use this endpoint
|
||||
- browser_endpoint - endpoint to be used in browser. Authentication will be handled via the browser
|
||||
- port - the port exposed by the application
|
||||
- protocol - the protocol used by the endpoint
|
||||
"""
|
||||
return self._task.wait_for_external_endpoint(
|
||||
protocol="http", wait_interval_seconds=wait_interval_seconds, wait_timeout_seconds=wait_timeout_seconds
|
||||
)
|
||||
|
||||
def list_external_endpoints(self):
|
||||
# type: () -> List[Dict]
|
||||
"""
|
||||
List all external endpoints assigned
|
||||
|
||||
:return: A list of dictionaries. Each dictionary contains the following values:
|
||||
- endpoint - raw endpoint. One might need to authenticate in order to use this endpoint
|
||||
- browser_endpoint - endpoint to be used in browser. Authentication will be handled via the browser
|
||||
- port - the port exposed by the application
|
||||
- protocol - the protocol used by the endpoint
|
||||
"""
|
||||
return self._task.list_external_endpoints(protocol="http")
|
@ -113,6 +113,7 @@ if TYPE_CHECKING:
|
||||
import pandas
|
||||
import numpy
|
||||
from PIL import Image
|
||||
from .router.router import HttpRouter
|
||||
|
||||
# Forward declaration to help linters
|
||||
TaskInstance = TypeVar("TaskInstance", bound="Task")
|
||||
@ -224,6 +225,7 @@ class Task(_Task):
|
||||
self._calling_filename = None
|
||||
self._remote_functions_generated = {}
|
||||
self._external_endpoint_ports = {}
|
||||
self._http_router = None
|
||||
# register atexit, so that we mark the task as stopped
|
||||
self._at_exit_called = False
|
||||
|
||||
@ -848,6 +850,49 @@ class Task(_Task):
|
||||
task._set_startup_info()
|
||||
return task
|
||||
|
||||
def get_http_router(self):
|
||||
# type: () -> HttpRouter
|
||||
"""
|
||||
Retrieve an instance of `HttpRouter` to manage an external HTTP endpoint and intercept traffic.
|
||||
The `HttpRouter` serves as a traffic manager, enabling the creation and configuration of local and external
|
||||
routesto redirect, monitor, or manipulate HTTP requests and responses. It is designed to handle routing
|
||||
needs such via a proxy setup which handles request/response interception and telemetry reporting for
|
||||
applications that require HTTP endpoint management.
|
||||
|
||||
Example usage:
|
||||
|
||||
.. code-block:: py
|
||||
def request_callback(request, persistent_state):
|
||||
persistent_state["last_request_time"] = time.time()
|
||||
|
||||
def response_callback(response, request, persistent_state):
|
||||
print("Latency:", time.time() - persistent_state["last_request_time"])
|
||||
if urllib.parse.urlparse(str(request.url).rstrip("/")).path == "/modify":
|
||||
new_content = response.body.replace(b"modify", b"modified")
|
||||
headers = copy.deepcopy(response.headers)
|
||||
headers["Content-Length"] = str(len(new_content))
|
||||
return Response(status_code=response.status_code, headers=headers, content=new_content)
|
||||
|
||||
router = Task.current_task().get_http_router()
|
||||
router.set_local_proxy_parameters(incoming_port=9000)
|
||||
router.create_local_route(
|
||||
source="/",
|
||||
target="http://localhost:8000",
|
||||
request_callback=request_callback,
|
||||
response_callback=response_callback,
|
||||
endpoint_telemetry={"model": "MyModel"}
|
||||
)
|
||||
router.deploy(wait=True)
|
||||
"""
|
||||
try:
|
||||
from .router.router import HttpRouter # noqa
|
||||
except ImportError:
|
||||
raise UsageError("Could not import `HttpRouter`. Please run `pip install clearml[router]`")
|
||||
|
||||
if self._http_router is None:
|
||||
self._http_router = HttpRouter(self)
|
||||
return self._http_router
|
||||
|
||||
def request_external_endpoint(
|
||||
self, port, protocol="http", wait=False, wait_interval_seconds=3.0, wait_timeout_seconds=90.0
|
||||
):
|
||||
@ -996,11 +1041,14 @@ class Task(_Task):
|
||||
return None
|
||||
time.sleep(wait_interval_seconds)
|
||||
|
||||
def list_external_endpoints(self):
|
||||
# type: () -> List[Dict]
|
||||
def list_external_endpoints(self, protocol=None):
|
||||
# type: (Optional[str]) -> List[Dict]
|
||||
"""
|
||||
List all external endpoints assigned
|
||||
|
||||
:param protocol: If None, list all external endpoints. Otherwise, only list endpoints
|
||||
that use this protocol
|
||||
|
||||
:return: A list of dictionaries. Each dictionary contains the following values:
|
||||
- endpoint - raw endpoint. One might need to authenticate in order to use this endpoint
|
||||
- browser_endpoint - endpoint to be used in browser. Authentication will be handled via the browser
|
||||
@ -1010,7 +1058,8 @@ class Task(_Task):
|
||||
Session.verify_feature_set("advanced")
|
||||
runtime_props = self._get_runtime_properties()
|
||||
results = []
|
||||
for protocol in ["http", "tcp"]:
|
||||
protocols = [protocol] if protocol is not None else ["http", "tcp"]
|
||||
for protocol in protocols:
|
||||
internal_port = runtime_props.get(self._external_endpoint_internal_port_map[protocol])
|
||||
if internal_port:
|
||||
self._external_endpoint_ports[protocol] = internal_port
|
||||
|
42
examples/router/http_router.py
Normal file
42
examples/router/http_router.py
Normal file
@ -0,0 +1,42 @@
|
||||
"""
|
||||
Example on how to use the ClearML HTTP router.
|
||||
For this example, you would first need a webserver to route the traffic to:
|
||||
`simple_webserver.py` launches such a server. Running the script will start a
|
||||
webserver, bound to localhost:8000.
|
||||
|
||||
Then, when running this example, it creates a router which binds to 0.0.0.0:9000.
|
||||
A local route is then created, which will proxy all traffic from
|
||||
`http://<PRIVATE_IP>:9000/example_source` to `http://localhost:8000/serve`.
|
||||
|
||||
Trafic can be intercepted both on request and response via callbacks. See
|
||||
`request_callback` and `response_callback`.
|
||||
|
||||
By default, the route traffic is monitored and telemetry is sent to the ClearML
|
||||
server. To disable this, pass `endpoint_telemetry=False` when creating the route
|
||||
"""
|
||||
|
||||
import time
|
||||
from clearml import Task
|
||||
|
||||
|
||||
def request_callback(request, persistent_state):
|
||||
persistent_state["last_request_time"] = time.time()
|
||||
|
||||
|
||||
def response_callback(response, request, persistent_state):
|
||||
print("Latency:", time.time() - persistent_state["last_request_time"])
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
task = Task.init(project_name="Router Example", task_name="Router Example")
|
||||
router = task.get_http_router()
|
||||
router.set_local_proxy_parameters(incoming_port=9000, default_target="http://localhost:8000")
|
||||
router.create_local_route(
|
||||
source="/example_source",
|
||||
target="http://localhost:8000/serve", # route traffic to this address
|
||||
request_callback=request_callback, # intercept requests
|
||||
response_callback=response_callback, # intercept responses
|
||||
endpoint_telemetry={"model": "MyModel"} # set this to False to disable telemetry
|
||||
)
|
||||
router.deploy(wait=True)
|
||||
# run `curl http://localhost:9000/example_source/1`
|
2
examples/router/requirements.txt
Normal file
2
examples/router/requirements.txt
Normal file
@ -0,0 +1,2 @@
|
||||
clearml
|
||||
clearml[router]
|
44
examples/router/simple_webserver.py
Normal file
44
examples/router/simple_webserver.py
Normal file
@ -0,0 +1,44 @@
|
||||
"""
|
||||
A simple webserver, used as a tool to showcase the capabilities of
|
||||
ClearML HTTP router. See `http_router.py` for more details.
|
||||
"""
|
||||
|
||||
|
||||
from fastapi import FastAPI, HTTPException
|
||||
from pydantic import BaseModel
|
||||
import uvicorn
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
actions = {
|
||||
"1": {"name": "Action 1", "description": "This is model action 1"},
|
||||
"2": {"name": "Action 2", "description": "This is model action 2"},
|
||||
"3": {"name": "Action 3", "description": "This is model action 3"},
|
||||
}
|
||||
|
||||
|
||||
class Item(BaseModel):
|
||||
name: str
|
||||
description: str
|
||||
|
||||
|
||||
@app.get("/")
|
||||
def read_root():
|
||||
return {"message": "Welcome to the FastAPI application!"}
|
||||
|
||||
|
||||
@app.get("/serve/{action}", response_model=Item)
|
||||
def read_item(action: str):
|
||||
if action in actions:
|
||||
return actions[action]
|
||||
else:
|
||||
raise HTTPException(status_code=404, detail="Item not found")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
uvicorn.run(
|
||||
"simple_webserver:app",
|
||||
host="127.0.0.1",
|
||||
port=8000,
|
||||
reload=True
|
||||
)
|
Loading…
Reference in New Issue
Block a user