mirror of
https://github.com/open-webui/open-webui
synced 2025-04-26 09:10:15 +00:00
Some checks are pending
Deploy to HuggingFace Spaces / check-secret (push) Waiting to run
Deploy to HuggingFace Spaces / deploy (push) Blocked by required conditions
Create and publish Docker images with specific build args / build-main-image (linux/amd64) (push) Waiting to run
Create and publish Docker images with specific build args / build-main-image (linux/arm64) (push) Waiting to run
Create and publish Docker images with specific build args / build-cuda-image (linux/amd64) (push) Waiting to run
Create and publish Docker images with specific build args / build-cuda-image (linux/arm64) (push) Waiting to run
Create and publish Docker images with specific build args / build-ollama-image (linux/amd64) (push) Waiting to run
Create and publish Docker images with specific build args / build-ollama-image (linux/arm64) (push) Waiting to run
Create and publish Docker images with specific build args / merge-main-images (push) Blocked by required conditions
Create and publish Docker images with specific build args / merge-cuda-images (push) Blocked by required conditions
Create and publish Docker images with specific build args / merge-ollama-images (push) Blocked by required conditions
Python CI / Format Backend (3.11) (push) Waiting to run
Frontend Build / Format & Build Frontend (push) Waiting to run
Frontend Build / Frontend Unit Tests (push) Waiting to run
554 lines
19 KiB
Python
554 lines
19 KiB
Python
import inspect
|
|
import logging
|
|
import re
|
|
import inspect
|
|
import aiohttp
|
|
import asyncio
|
|
|
|
from typing import Any, Awaitable, Callable, get_type_hints, Dict, List, Union, Optional
|
|
from functools import update_wrapper, partial
|
|
|
|
|
|
from fastapi import Request
|
|
from pydantic import BaseModel, Field, create_model
|
|
from langchain_core.utils.function_calling import convert_to_openai_function
|
|
|
|
|
|
from open_webui.models.tools import Tools
|
|
from open_webui.models.users import UserModel
|
|
from open_webui.utils.plugin import load_tools_module_by_id
|
|
|
|
import copy
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
def get_async_tool_function_and_apply_extra_params(
|
|
function: Callable, extra_params: dict
|
|
) -> Callable[..., Awaitable]:
|
|
sig = inspect.signature(function)
|
|
extra_params = {k: v for k, v in extra_params.items() if k in sig.parameters}
|
|
partial_func = partial(function, **extra_params)
|
|
|
|
if inspect.iscoroutinefunction(function):
|
|
update_wrapper(partial_func, function)
|
|
return partial_func
|
|
else:
|
|
# Make it a coroutine function
|
|
async def new_function(*args, **kwargs):
|
|
return partial_func(*args, **kwargs)
|
|
|
|
update_wrapper(new_function, function)
|
|
return new_function
|
|
|
|
|
|
def get_tools(
|
|
request: Request, tool_ids: list[str], user: UserModel, extra_params: dict
|
|
) -> dict[str, dict]:
|
|
tools_dict = {}
|
|
|
|
for tool_id in tool_ids:
|
|
tool = Tools.get_tool_by_id(tool_id)
|
|
if tool is None:
|
|
if tool_id.startswith("server:"):
|
|
server_idx = int(tool_id.split(":")[1])
|
|
tool_server_connection = (
|
|
request.app.state.config.TOOL_SERVER_CONNECTIONS[server_idx]
|
|
)
|
|
tool_server_data = request.app.state.TOOL_SERVERS[server_idx]
|
|
specs = tool_server_data.get("specs", [])
|
|
|
|
for spec in specs:
|
|
function_name = spec["name"]
|
|
|
|
auth_type = tool_server_connection.get("auth_type", "bearer")
|
|
token = None
|
|
|
|
if auth_type == "bearer":
|
|
token = tool_server_connection.get("key", "")
|
|
elif auth_type == "session":
|
|
token = request.state.token.credentials
|
|
|
|
def make_tool_function(function_name, token, tool_server_data):
|
|
async def tool_function(**kwargs):
|
|
print(
|
|
f"Executing tool function {function_name} with params: {kwargs}"
|
|
)
|
|
return await execute_tool_server(
|
|
token=token,
|
|
url=tool_server_data["url"],
|
|
name=function_name,
|
|
params=kwargs,
|
|
server_data=tool_server_data,
|
|
)
|
|
|
|
return tool_function
|
|
|
|
tool_function = make_tool_function(
|
|
function_name, token, tool_server_data
|
|
)
|
|
|
|
callable = get_async_tool_function_and_apply_extra_params(
|
|
tool_function,
|
|
{},
|
|
)
|
|
|
|
tool_dict = {
|
|
"tool_id": tool_id,
|
|
"callable": callable,
|
|
"spec": spec,
|
|
}
|
|
|
|
# TODO: if collision, prepend toolkit name
|
|
if function_name in tools_dict:
|
|
log.warning(
|
|
f"Tool {function_name} already exists in another tools!"
|
|
)
|
|
log.warning(f"Discarding {tool_id}.{function_name}")
|
|
else:
|
|
tools_dict[function_name] = tool_dict
|
|
else:
|
|
continue
|
|
else:
|
|
module = request.app.state.TOOLS.get(tool_id, None)
|
|
if module is None:
|
|
module, _ = load_tools_module_by_id(tool_id)
|
|
request.app.state.TOOLS[tool_id] = module
|
|
|
|
extra_params["__id__"] = tool_id
|
|
|
|
# Set valves for the tool
|
|
if hasattr(module, "valves") and hasattr(module, "Valves"):
|
|
valves = Tools.get_tool_valves_by_id(tool_id) or {}
|
|
module.valves = module.Valves(**valves)
|
|
if hasattr(module, "UserValves"):
|
|
extra_params["__user__"]["valves"] = module.UserValves( # type: ignore
|
|
**Tools.get_user_valves_by_id_and_user_id(tool_id, user.id)
|
|
)
|
|
|
|
for spec in tool.specs:
|
|
# TODO: Fix hack for OpenAI API
|
|
# Some times breaks OpenAI but others don't. Leaving the comment
|
|
for val in spec.get("parameters", {}).get("properties", {}).values():
|
|
if val["type"] == "str":
|
|
val["type"] = "string"
|
|
|
|
# Remove internal reserved parameters (e.g. __id__, __user__)
|
|
spec["parameters"]["properties"] = {
|
|
key: val
|
|
for key, val in spec["parameters"]["properties"].items()
|
|
if not key.startswith("__")
|
|
}
|
|
|
|
# convert to function that takes only model params and inserts custom params
|
|
function_name = spec["name"]
|
|
tool_function = getattr(module, function_name)
|
|
callable = get_async_tool_function_and_apply_extra_params(
|
|
tool_function, extra_params
|
|
)
|
|
|
|
# TODO: Support Pydantic models as parameters
|
|
if callable.__doc__ and callable.__doc__.strip() != "":
|
|
s = re.split(":(param|return)", callable.__doc__, 1)
|
|
spec["description"] = s[0]
|
|
else:
|
|
spec["description"] = function_name
|
|
|
|
tool_dict = {
|
|
"tool_id": tool_id,
|
|
"callable": callable,
|
|
"spec": spec,
|
|
# Misc info
|
|
"metadata": {
|
|
"file_handler": hasattr(module, "file_handler")
|
|
and module.file_handler,
|
|
"citation": hasattr(module, "citation") and module.citation,
|
|
},
|
|
}
|
|
|
|
# TODO: if collision, prepend toolkit name
|
|
if function_name in tools_dict:
|
|
log.warning(
|
|
f"Tool {function_name} already exists in another tools!"
|
|
)
|
|
log.warning(f"Discarding {tool_id}.{function_name}")
|
|
else:
|
|
tools_dict[function_name] = tool_dict
|
|
|
|
return tools_dict
|
|
|
|
|
|
def parse_description(docstring: str | None) -> str:
|
|
"""
|
|
Parse a function's docstring to extract the description.
|
|
|
|
Args:
|
|
docstring (str): The docstring to parse.
|
|
|
|
Returns:
|
|
str: The description.
|
|
"""
|
|
|
|
if not docstring:
|
|
return ""
|
|
|
|
lines = [line.strip() for line in docstring.strip().split("\n")]
|
|
description_lines: list[str] = []
|
|
|
|
for line in lines:
|
|
if re.match(r":param", line) or re.match(r":return", line):
|
|
break
|
|
|
|
description_lines.append(line)
|
|
|
|
return "\n".join(description_lines)
|
|
|
|
|
|
def parse_docstring(docstring):
|
|
"""
|
|
Parse a function's docstring to extract parameter descriptions in reST format.
|
|
|
|
Args:
|
|
docstring (str): The docstring to parse.
|
|
|
|
Returns:
|
|
dict: A dictionary where keys are parameter names and values are descriptions.
|
|
"""
|
|
if not docstring:
|
|
return {}
|
|
|
|
# Regex to match `:param name: description` format
|
|
param_pattern = re.compile(r":param (\w+):\s*(.+)")
|
|
param_descriptions = {}
|
|
|
|
for line in docstring.splitlines():
|
|
match = param_pattern.match(line.strip())
|
|
if not match:
|
|
continue
|
|
param_name, param_description = match.groups()
|
|
if param_name.startswith("__"):
|
|
continue
|
|
param_descriptions[param_name] = param_description
|
|
|
|
return param_descriptions
|
|
|
|
|
|
def function_to_pydantic_model(func: Callable) -> type[BaseModel]:
|
|
"""
|
|
Converts a Python function's type hints and docstring to a Pydantic model,
|
|
including support for nested types, default values, and descriptions.
|
|
|
|
Args:
|
|
func: The function whose type hints and docstring should be converted.
|
|
model_name: The name of the generated Pydantic model.
|
|
|
|
Returns:
|
|
A Pydantic model class.
|
|
"""
|
|
type_hints = get_type_hints(func)
|
|
signature = inspect.signature(func)
|
|
parameters = signature.parameters
|
|
|
|
docstring = func.__doc__
|
|
descriptions = parse_docstring(docstring)
|
|
|
|
tool_description = parse_description(docstring)
|
|
|
|
field_defs = {}
|
|
for name, param in parameters.items():
|
|
type_hint = type_hints.get(name, Any)
|
|
default_value = param.default if param.default is not param.empty else ...
|
|
description = descriptions.get(name, None)
|
|
if not description:
|
|
field_defs[name] = type_hint, default_value
|
|
continue
|
|
field_defs[name] = type_hint, Field(default_value, description=description)
|
|
|
|
model = create_model(func.__name__, **field_defs)
|
|
model.__doc__ = tool_description
|
|
|
|
return model
|
|
|
|
|
|
def get_callable_attributes(tool: object) -> list[Callable]:
|
|
return [
|
|
getattr(tool, func)
|
|
for func in dir(tool)
|
|
if callable(getattr(tool, func))
|
|
and not func.startswith("__")
|
|
and not inspect.isclass(getattr(tool, func))
|
|
]
|
|
|
|
|
|
def get_tools_specs(tool_class: object) -> list[dict]:
|
|
function_model_list = map(
|
|
function_to_pydantic_model, get_callable_attributes(tool_class)
|
|
)
|
|
return [
|
|
convert_to_openai_function(function_model)
|
|
for function_model in function_model_list
|
|
]
|
|
|
|
|
|
def resolve_schema(schema, components):
|
|
"""
|
|
Recursively resolves a JSON schema using OpenAPI components.
|
|
"""
|
|
if not schema:
|
|
return {}
|
|
|
|
if "$ref" in schema:
|
|
ref_path = schema["$ref"]
|
|
ref_parts = ref_path.strip("#/").split("/")
|
|
resolved = components
|
|
for part in ref_parts[1:]: # Skip the initial 'components'
|
|
resolved = resolved.get(part, {})
|
|
return resolve_schema(resolved, components)
|
|
|
|
resolved_schema = copy.deepcopy(schema)
|
|
|
|
# Recursively resolve inner schemas
|
|
if "properties" in resolved_schema:
|
|
for prop, prop_schema in resolved_schema["properties"].items():
|
|
resolved_schema["properties"][prop] = resolve_schema(
|
|
prop_schema, components
|
|
)
|
|
|
|
if "items" in resolved_schema:
|
|
resolved_schema["items"] = resolve_schema(resolved_schema["items"], components)
|
|
|
|
return resolved_schema
|
|
|
|
|
|
def convert_openapi_to_tool_payload(openapi_spec):
|
|
"""
|
|
Converts an OpenAPI specification into a custom tool payload structure.
|
|
|
|
Args:
|
|
openapi_spec (dict): The OpenAPI specification as a Python dict.
|
|
|
|
Returns:
|
|
list: A list of tool payloads.
|
|
"""
|
|
tool_payload = []
|
|
|
|
for path, methods in openapi_spec.get("paths", {}).items():
|
|
for method, operation in methods.items():
|
|
tool = {
|
|
"type": "function",
|
|
"name": operation.get("operationId"),
|
|
"description": operation.get(
|
|
"description", operation.get("summary", "No description available.")
|
|
),
|
|
"parameters": {"type": "object", "properties": {}, "required": []},
|
|
}
|
|
|
|
# Extract path and query parameters
|
|
for param in operation.get("parameters", []):
|
|
param_name = param["name"]
|
|
param_schema = param.get("schema", {})
|
|
tool["parameters"]["properties"][param_name] = {
|
|
"type": param_schema.get("type"),
|
|
"description": param_schema.get("description", ""),
|
|
}
|
|
if param.get("required"):
|
|
tool["parameters"]["required"].append(param_name)
|
|
|
|
# Extract and resolve requestBody if available
|
|
request_body = operation.get("requestBody")
|
|
if request_body:
|
|
content = request_body.get("content", {})
|
|
json_schema = content.get("application/json", {}).get("schema")
|
|
if json_schema:
|
|
resolved_schema = resolve_schema(
|
|
json_schema, openapi_spec.get("components", {})
|
|
)
|
|
|
|
if resolved_schema.get("properties"):
|
|
tool["parameters"]["properties"].update(
|
|
resolved_schema["properties"]
|
|
)
|
|
if "required" in resolved_schema:
|
|
tool["parameters"]["required"] = list(
|
|
set(
|
|
tool["parameters"]["required"]
|
|
+ resolved_schema["required"]
|
|
)
|
|
)
|
|
elif resolved_schema.get("type") == "array":
|
|
tool["parameters"] = resolved_schema # special case for array
|
|
|
|
tool_payload.append(tool)
|
|
|
|
return tool_payload
|
|
|
|
|
|
async def get_tool_server_data(token: str, url: str) -> Dict[str, Any]:
|
|
headers = {
|
|
"Accept": "application/json",
|
|
"Content-Type": "application/json",
|
|
}
|
|
if token:
|
|
headers["Authorization"] = f"Bearer {token}"
|
|
|
|
error = None
|
|
try:
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.get(url, headers=headers) as response:
|
|
if response.status != 200:
|
|
error_body = await response.json()
|
|
raise Exception(error_body)
|
|
res = await response.json()
|
|
except Exception as err:
|
|
print("Error:", err)
|
|
if isinstance(err, dict) and "detail" in err:
|
|
error = err["detail"]
|
|
else:
|
|
error = str(err)
|
|
raise Exception(error)
|
|
|
|
data = {
|
|
"openapi": res,
|
|
"info": res.get("info", {}),
|
|
"specs": convert_openapi_to_tool_payload(res),
|
|
}
|
|
|
|
print("Fetched data:", data)
|
|
return data
|
|
|
|
|
|
async def get_tool_servers_data(
|
|
servers: List[Dict[str, Any]], session_token: Optional[str] = None
|
|
) -> List[Dict[str, Any]]:
|
|
# Prepare list of enabled servers along with their original index
|
|
server_entries = []
|
|
for idx, server in enumerate(servers):
|
|
if server.get("config", {}).get("enable"):
|
|
url_path = server.get("path", "openapi.json")
|
|
full_url = f"{server.get('url')}/{url_path}"
|
|
|
|
auth_type = server.get("auth_type", "bearer")
|
|
token = None
|
|
|
|
if auth_type == "bearer":
|
|
token = server.get("key", "")
|
|
elif auth_type == "session":
|
|
token = session_token
|
|
server_entries.append((idx, server, full_url, token))
|
|
|
|
# Create async tasks to fetch data
|
|
tasks = [get_tool_server_data(token, url) for (_, _, url, token) in server_entries]
|
|
|
|
# Execute tasks concurrently
|
|
responses = await asyncio.gather(*tasks, return_exceptions=True)
|
|
|
|
# Build final results with index and server metadata
|
|
results = []
|
|
for (idx, server, url, _), response in zip(server_entries, responses):
|
|
if isinstance(response, Exception):
|
|
print(f"Failed to connect to {url} OpenAPI tool server")
|
|
continue
|
|
|
|
results.append(
|
|
{
|
|
"idx": idx,
|
|
"url": server.get("url"),
|
|
"openapi": response.get("openapi"),
|
|
"info": response.get("info"),
|
|
"specs": response.get("specs"),
|
|
}
|
|
)
|
|
|
|
return results
|
|
|
|
|
|
async def execute_tool_server(
|
|
token: str, url: str, name: str, params: Dict[str, Any], server_data: Dict[str, Any]
|
|
) -> Any:
|
|
error = None
|
|
try:
|
|
openapi = server_data.get("openapi", {})
|
|
paths = openapi.get("paths", {})
|
|
|
|
matching_route = None
|
|
for route_path, methods in paths.items():
|
|
for http_method, operation in methods.items():
|
|
if isinstance(operation, dict) and operation.get("operationId") == name:
|
|
matching_route = (route_path, methods)
|
|
break
|
|
if matching_route:
|
|
break
|
|
|
|
if not matching_route:
|
|
raise Exception(f"No matching route found for operationId: {name}")
|
|
|
|
route_path, methods = matching_route
|
|
|
|
method_entry = None
|
|
for http_method, operation in methods.items():
|
|
if operation.get("operationId") == name:
|
|
method_entry = (http_method.lower(), operation)
|
|
break
|
|
|
|
if not method_entry:
|
|
raise Exception(f"No matching method found for operationId: {name}")
|
|
|
|
http_method, operation = method_entry
|
|
|
|
path_params = {}
|
|
query_params = {}
|
|
body_params = {}
|
|
|
|
for param in operation.get("parameters", []):
|
|
param_name = param["name"]
|
|
param_in = param["in"]
|
|
if param_name in params:
|
|
if param_in == "path":
|
|
path_params[param_name] = params[param_name]
|
|
elif param_in == "query":
|
|
query_params[param_name] = params[param_name]
|
|
|
|
final_url = f"{url}{route_path}"
|
|
for key, value in path_params.items():
|
|
final_url = final_url.replace(f"{{{key}}}", str(value))
|
|
|
|
if query_params:
|
|
query_string = "&".join(f"{k}={v}" for k, v in query_params.items())
|
|
final_url = f"{final_url}?{query_string}"
|
|
|
|
if operation.get("requestBody", {}).get("content"):
|
|
if params:
|
|
body_params = params
|
|
else:
|
|
raise Exception(
|
|
f"Request body expected for operation '{name}' but none found."
|
|
)
|
|
|
|
headers = {"Content-Type": "application/json"}
|
|
|
|
if token:
|
|
headers["Authorization"] = f"Bearer {token}"
|
|
|
|
async with aiohttp.ClientSession() as session:
|
|
request_method = getattr(session, http_method.lower())
|
|
|
|
if http_method in ["post", "put", "patch"]:
|
|
async with request_method(
|
|
final_url, json=body_params, headers=headers
|
|
) as response:
|
|
if response.status >= 400:
|
|
text = await response.text()
|
|
raise Exception(f"HTTP error {response.status}: {text}")
|
|
return await response.json()
|
|
else:
|
|
async with request_method(final_url, headers=headers) as response:
|
|
if response.status >= 400:
|
|
text = await response.text()
|
|
raise Exception(f"HTTP error {response.status}: {text}")
|
|
return await response.json()
|
|
|
|
except Exception as err:
|
|
error = str(err)
|
|
print("API Request Error:", error)
|
|
return {"error": error}
|