open-webui/backend/open_webui/utils/tools.py

203 lines
6.4 KiB
Python
Raw Normal View History

2024-06-11 03:39:55 +00:00
import inspect
2024-08-19 09:53:12 +00:00
import logging
2024-11-21 17:19:56 +00:00
import re
from typing import Any, Awaitable, Callable, get_type_hints
from functools import update_wrapper, partial
2024-08-19 09:53:12 +00:00
2024-12-13 04:22:17 +00:00
from fastapi import Request
from pydantic import BaseModel, Field, create_model
2024-11-21 17:19:56 +00:00
from langchain_core.utils.function_calling import convert_to_openai_function
2024-12-13 04:22:17 +00:00
2024-12-10 08:54:13 +00:00
from open_webui.models.tools import Tools
from open_webui.models.users import UserModel
2024-12-12 02:36:59 +00:00
from open_webui.utils.plugin import load_tools_module_by_id
2024-08-19 15:27:38 +00:00
2024-08-19 09:53:12 +00:00
log = logging.getLogger(__name__)
def apply_extra_params_to_tool_function(
function: Callable, extra_params: dict
) -> Callable[..., Awaitable]:
2024-11-21 17:41:35 +00:00
sig = inspect.signature(function)
extra_params = {k: v for k, v in extra_params.items() if k in sig.parameters}
2024-11-21 17:19:56 +00:00
partial_func = partial(function, **extra_params)
if inspect.iscoroutinefunction(function):
update_wrapper(partial_func, function)
return partial_func
2024-08-19 09:53:12 +00:00
2024-11-21 17:19:56 +00:00
async def new_function(*args, **kwargs):
return partial_func(*args, **kwargs)
update_wrapper(new_function, function)
2024-08-19 09:53:12 +00:00
return new_function
# Mutation on extra_params
def get_tools(
2024-12-13 04:22:17 +00:00
request: Request, tool_ids: list[str], user: UserModel, extra_params: dict
2024-08-19 09:53:12 +00:00
) -> dict[str, dict]:
2024-11-17 02:31:13 +00:00
tools_dict = {}
2024-08-19 09:53:12 +00:00
for tool_id in tool_ids:
2024-11-17 02:31:13 +00:00
tools = Tools.get_tool_by_id(tool_id)
if tools is None:
2024-08-19 09:53:12 +00:00
continue
2024-12-13 04:22:17 +00:00
module = request.app.state.TOOLS.get(tool_id, None)
2024-08-19 09:53:12 +00:00
if module is None:
2024-11-17 01:54:38 +00:00
module, _ = load_tools_module_by_id(tool_id)
2024-12-13 04:22:17 +00:00
request.app.state.TOOLS[tool_id] = module
2024-08-19 09:53:12 +00:00
extra_params["__id__"] = tool_id
if hasattr(module, "valves") and hasattr(module, "Valves"):
valves = Tools.get_tool_valves_by_id(tool_id) or {}
module.valves = module.Valves(**valves)
if hasattr(module, "UserValves"):
extra_params["__user__"]["valves"] = module.UserValves( # type: ignore
**Tools.get_user_valves_by_id_and_user_id(tool_id, user.id)
)
2024-11-17 02:31:13 +00:00
for spec in tools.specs:
# Remove internal parameters
spec["parameters"]["properties"] = {
key: val
for key, val in spec["parameters"]["properties"].items()
if not key.startswith("__")
}
2024-08-19 09:53:12 +00:00
function_name = spec["name"]
# convert to function that takes only model params and inserts custom params
2024-08-19 15:27:21 +00:00
original_func = getattr(module, function_name)
callable = apply_extra_params_to_tool_function(original_func, extra_params)
2024-08-19 09:53:12 +00:00
# TODO: This needs to be a pydantic model
tool_dict = {
"toolkit_id": tool_id,
"callable": callable,
"spec": spec,
2024-11-21 17:19:56 +00:00
"pydantic_model": function_to_pydantic_model(callable),
2024-08-19 09:53:12 +00:00
"file_handler": hasattr(module, "file_handler") and module.file_handler,
"citation": hasattr(module, "citation") and module.citation,
}
# TODO: if collision, prepend toolkit name
2024-11-17 02:31:13 +00:00
if function_name in tools_dict:
log.warning(f"Tool {function_name} already exists in another tools!")
log.warning(f"Collision between {tools} and {tool_id}.")
log.warning(f"Discarding {tools}.{function_name}")
2024-08-19 09:53:12 +00:00
else:
2024-11-17 02:31:13 +00:00
tools_dict[function_name] = tool_dict
return tools_dict
2024-06-11 03:39:55 +00:00
2024-11-22 16:25:52 +00:00
def parse_description(docstring: str | None) -> str:
"""
Parse a function's docstring to extract the description.
Args:
docstring (str): The docstring to parse.
Returns:
str: The description.
"""
if not docstring:
return ""
lines = [line.strip() for line in docstring.strip().split("\n")]
description_lines: list[str] = []
for line in lines:
if re.match(r":param", line) or re.match(r":return", line):
break
description_lines.append(line)
return "\n".join(description_lines)
2024-11-21 17:19:56 +00:00
def parse_docstring(docstring):
"""
Parse a function's docstring to extract parameter descriptions in reST format.
Args:
docstring (str): The docstring to parse.
Returns:
dict: A dictionary where keys are parameter names and values are descriptions.
"""
if not docstring:
return {}
# Regex to match `:param name: description` format
param_pattern = re.compile(r":param (\w+):\s*(.+)")
param_descriptions = {}
for line in docstring.splitlines():
match = param_pattern.match(line.strip())
2024-11-22 20:51:16 +00:00
if not match:
continue
param_name, param_description = match.groups()
if param_name.startswith("__"):
continue
param_descriptions[param_name] = param_description
2024-11-21 17:19:56 +00:00
return param_descriptions
2024-06-11 03:39:55 +00:00
2024-11-21 17:19:56 +00:00
def function_to_pydantic_model(func: Callable) -> type[BaseModel]:
"""
Converts a Python function's type hints and docstring to a Pydantic model,
including support for nested types, default values, and descriptions.
2024-06-11 03:39:55 +00:00
2024-11-21 17:19:56 +00:00
Args:
func: The function whose type hints and docstring should be converted.
model_name: The name of the generated Pydantic model.
Returns:
A Pydantic model class.
"""
type_hints = get_type_hints(func)
signature = inspect.signature(func)
parameters = signature.parameters
docstring = func.__doc__
descriptions = parse_docstring(docstring)
2024-11-22 16:25:52 +00:00
tool_description = parse_description(docstring)
2024-11-21 17:19:56 +00:00
field_defs = {}
for name, param in parameters.items():
type_hint = type_hints.get(name, Any)
default_value = param.default if param.default is not param.empty else ...
description = descriptions.get(name, None)
if not description:
field_defs[name] = type_hint, default_value
continue
field_defs[name] = type_hint, Field(default_value, description=description)
2024-11-22 16:25:52 +00:00
model = create_model(func.__name__, **field_defs)
model.__doc__ = tool_description
return model
2024-11-21 17:19:56 +00:00
def get_callable_attributes(tool: object) -> list[Callable]:
return [
getattr(tool, func)
for func in dir(tool)
if callable(getattr(tool, func))
2024-06-22 09:29:22 +00:00
and not func.startswith("__")
2024-11-21 17:19:56 +00:00
and not inspect.isclass(getattr(tool, func))
2024-06-11 03:39:55 +00:00
]
2024-11-21 17:19:56 +00:00
def get_tools_specs(tool_class: object) -> list[dict]:
function_list = get_callable_attributes(tool_class)
models = map(function_to_pydantic_model, function_list)
return [convert_to_openai_function(tool) for tool in models]