Add Task.request_external_endpoint() to request external endpoints on supported backends

This commit is contained in:
clearml 2024-10-09 22:44:19 +03:00
parent 2b6ab4edc8
commit 5e201334ba
2 changed files with 124 additions and 0 deletions

View File

@ -815,6 +815,13 @@ class Session(TokenManager):
def get_clients(cls):
return cls._client
@classmethod
def verify_feature_set(cls, feature_set):
if isinstance(feature_set, str):
feature_set = [feature_set]
if cls.feature_set not in feature_set:
raise ValueError("ClearML-server does not support requested feature set '{}'".format(feature_set))
@staticmethod
def _version_tuple(v):
v = tuple(map(int, (v.split("."))))

View File

@ -842,6 +842,123 @@ class Task(_Task):
task._set_startup_info()
return task
def request_external_endpoint(
self, port, protocol="http", wait=False, wait_interval_seconds=3.0, wait_timeout_seconds=90.0
):
# type: (int, str, bool, float, float) -> Optional[Dict]
"""
Request an external endpoint for an application
:param port: Port the application is listening to
:param protocol: As of now, only `http` is supported
:param wait: If True, wait for the endpoint to be assigned
:param wait_interval_seconds: The poll frequency when waiting for the endpoint
:param wait_timeout_seconds: If this timeout is exceeded while waiting for the endpoint,
the method will no longer wait and None will be returned
:return: If wait is False, this method will return None.
If no endpoint could be found while waiting, this mehtod returns None.
Otherwise, it returns a dictionary containing the following values:
- endpoint - raw endpoint. One might need to authenticate in order to use this endpoint
- browser_endpoint - endpoint to be used in browser. Authentication will be handled via the browser
- port - the port exposed by the application
- protocol - the protocol used by the endpoint
"""
Session.verify_feature_set("advanced")
if not getattr(self, "_external_endpoint_port", None):
self.reload()
assigned_port = self._get_runtime_properties().get("_PORT")
if assigned_port:
self._external_endpoint_port = assigned_port
if getattr(self, "_external_endpoint_port", None):
if self._external_endpoint_port != port: # noqa
raise ValueError(
"Only one endpoint can be requested at the moment. Port already exposed is: {}".format(
self._external_endpoint_port
)
)
return
# noinspection PyProtectedMember
self._set_runtime_properties(
{"_SERVICE": "EXTERNAL", "_ADDRESS": get_private_ip(), "_PORT": port}
)
self.set_system_tags((self.get_system_tags() or []) + ["external_service"])
self._external_endpoint_port = port
if wait:
return self.wait_for_external_endpoint(wait_interval_seconds=wait_interval_seconds)
return None
def wait_for_external_endpoint(self, wait_interval_seconds=3.0, wait_timeout_seconds=90.0):
# type: (float) -> Optional[Dict]
"""
Wait for an external endpoint to be assigned
:param wait_interval_seconds: The poll frequency when waiting for the endpoint
:param wait_timeout_seconds: If this timeout is exceeded while waiting for the endpoint,
the method will no longer wait
:return: If no endpoint could be found while waiting, this mehtod returns None.
Otherwise, it returns a dictionary containing the following values:
- endpoint - raw endpoint. One might need to authenticate in order to use this endpoint
- browser_endpoint - endpoint to be used in browser. Authentication will be handled via the browser
- port - the port exposed by the application
- protocol - the protocol used by the endpoint
"""
Session.verify_feature_set("advanced")
if not getattr(self, "_external_endpoint_port", None):
LoggerRoot.get_base_logger().warning("No external endpoints have been requested")
return None
start_time = time.time()
while True:
self.reload()
# noinspection PyProtectedMember
runtime_props = self._get_runtime_properties()
endpoint = runtime_props.get("endpoint")
browser_endpoint = runtime_props.get("browser_endpoint")
if not getattr(self, "_external_endpoint_port", None):
self._external_endpoint_port = runtime_props.get("_PORT")
if endpoint or browser_endpoint:
return {
"endpoint": endpoint,
"browser_endpoint": browser_endpoint,
"port": self._external_endpoint_port,
"protocol": "http",
}
if time.time() >= start_time + wait_timeout_seconds:
LoggerRoot.get_base_logger().warning("Timeout exceeded while waiting for endpoint")
return None
time.sleep(wait_interval_seconds)
def list_external_endpoints(self):
# type: () -> List[Dict]
"""
List all external endpoints assigned
:return: A list of dictionaries. Each dictionary contains the following values:
- endpoint - raw endpoint. One might need to authenticate in order to use this endpoint
- browser_endpoint - endpoint to be used in browser. Authentication will be handled via the browser
- port - the port exposed by the application
- protocol - the protocol used by the endpoint
"""
Session.verify_feature_set("advanced")
if not getattr(self, "_external_endpoint_port", None):
self.reload()
self._external_endpoint_port = self._get_runtime_properties().get("_PORT")
if not getattr(self, "_external_endpoint_port", None):
LoggerRoot.get_base_logger().warning("No external endpoints have been requested")
return []
runtime_props = self._get_runtime_properties()
endpoint = runtime_props.get("endpoint")
browser_endpoint = runtime_props.get("browser_endpoint")
return [
{
"endpoint": endpoint,
"browser_endpoint": browser_endpoint,
"port": self._external_endpoint_port,
"protocol": "http",
}
]
@classmethod
def create(
cls,