open-webui/backend/open_webui/utils/tools.py

174 lines
6.2 KiB
Python
Raw Normal View History

2024-06-11 03:39:55 +00:00
import inspect
2024-08-19 09:53:12 +00:00
import logging
from typing import Awaitable, Callable, get_type_hints
from open_webui.apps.webui.models.tools import Tools
from open_webui.apps.webui.models.users import UserModel
2024-11-17 01:54:38 +00:00
from open_webui.apps.webui.utils import load_tools_module_by_id
from open_webui.utils.schemas import json_schema_to_model
2024-08-19 15:27:38 +00:00
2024-08-19 09:53:12 +00:00
log = logging.getLogger(__name__)
def apply_extra_params_to_tool_function(
    function: Callable, extra_params: dict
) -> Callable[..., Awaitable]:
    """Wrap *function* in a coroutine that injects matching extra parameters.

    Only the entries of ``extra_params`` whose key is a parameter name in the
    signature of ``function`` are kept; the rest are dropped. The returned
    coroutine function accepts keyword arguments only, merges the kept extras
    over them (an extra wins when both supply the same key) and calls
    ``function``. The result is awaited when ``function`` is a coroutine
    function and returned as-is otherwise.

    :param function: the tool function to wrap (sync or async).
    :param extra_params: candidate keyword arguments to inject.
    :return: an async callable taking ``**kwargs``.
    """
    accepted_names = inspect.signature(function).parameters

    # Snapshot the relevant extras now; later changes to the top level of the
    # caller's dict do not affect the wrapper (nested objects are still shared).
    injected = {}
    for name, value in extra_params.items():
        if name in accepted_names:
            injected[name] = value

    needs_await = inspect.iscoroutinefunction(function)

    async def new_function(**kwargs):
        call_kwargs = {**kwargs, **injected}
        outcome = function(**call_kwargs)
        if needs_await:
            return await outcome
        return outcome

    return new_function
def get_tools(
    webui_app, tool_ids: list[str], user: UserModel, extra_params: dict
) -> dict[str, dict]:
    """Resolve toolkit ids into a mapping of callable tool entries.

    For every id in ``tool_ids`` that ``Tools.get_tool_by_id`` can find, the
    toolkit module is taken from ``webui_app.state.TOOLS`` (loaded through
    ``load_tools_module_by_id`` and cached there on a miss), its valves are
    refreshed, and each of its specs is turned into an entry holding the
    wrapped callable, the cleaned spec and related metadata.

    ``extra_params`` is never modified: each toolkit gets its own copy with
    ``__id__`` set to the toolkit id and, when the module defines
    ``UserValves``, its own ``__user__`` dict carrying that toolkit's
    ``valves``. Sharing one ``__user__`` dict between toolkits would make every
    wrapped tool see the valves of whichever toolkit was processed last.

    :param webui_app: application object exposing ``state.TOOLS`` (module cache).
    :param tool_ids: ids of the toolkits to load.
    :param user: the requesting user; ``user.id`` selects the user valves.
    :param extra_params: extra keyword arguments offered to every tool function.
    :return: ``{function_name: tool_entry}``; on a name collision the first
        toolkit that registered the name wins and the later one is discarded.
    """
    tools_dict = {}

    for tool_id in tool_ids:
        tools = Tools.get_tool_by_id(tool_id)
        if tools is None:
            continue

        module = webui_app.state.TOOLS.get(tool_id, None)
        if module is None:
            module, _ = load_tools_module_by_id(tool_id)
            webui_app.state.TOOLS[tool_id] = module

        # Per-toolkit copy so neither the caller's dict nor other toolkits'
        # wrapped functions are affected by this toolkit's values.
        tool_params = {**extra_params, "__id__": tool_id}

        if hasattr(module, "valves") and hasattr(module, "Valves"):
            valves = Tools.get_tool_valves_by_id(tool_id) or {}
            module.valves = module.Valves(**valves)

        if hasattr(module, "UserValves"):
            user_valves = (
                Tools.get_user_valves_by_id_and_user_id(tool_id, user.id) or {}
            )
            # Fresh ``__user__`` dict: the nested dict in ``extra_params`` is
            # shared by reference, so writing into it would leak these valves.
            tool_params["__user__"] = {
                **(extra_params.get("__user__") or {}),
                "valves": module.UserValves(**user_valves),  # type: ignore
            }

        for spec in tools.specs:
            parameters = spec.get("parameters") or {}
            properties = parameters.get("properties")
            if properties is not None:
                # TODO: Fix hack for OpenAI API
                for val in properties.values():
                    if val.get("type") == "str":
                        val["type"] = "string"

                # Remove internal parameters
                parameters["properties"] = {
                    key: val
                    for key, val in properties.items()
                    if not key.startswith("__")
                }

            function_name = spec["name"]

            # convert to function that takes only model params and inserts custom params
            original_func = getattr(module, function_name)
            tool_callable = apply_extra_params_to_tool_function(
                original_func, tool_params
            )
            if hasattr(original_func, "__doc__"):
                tool_callable.__doc__ = original_func.__doc__

            # TODO: This needs to be a pydantic model
            tool_dict = {
                "toolkit_id": tool_id,
                "callable": tool_callable,
                "spec": spec,
                "pydantic_model": json_schema_to_model(spec),
                "file_handler": hasattr(module, "file_handler")
                and module.file_handler,
                "citation": hasattr(module, "citation") and module.citation,
            }

            # TODO: if collision, prepend toolkit name
            if function_name in tools_dict:
                existing_id = tools_dict[function_name]["toolkit_id"]
                log.warning(f"Tool {function_name} already exists in another tools!")
                log.warning(f"Collision between {existing_id} and {tool_id}.")
                log.warning(f"Discarding {tool_id}.{function_name}")
            else:
                tools_dict[function_name] = tool_dict

    return tools_dict
2024-06-11 03:39:55 +00:00
def doc_to_dict(docstring):
    """Extract a short description and ``:param`` entries from a docstring.

    The description is the first non-blank line that is not a reST field
    (a line starting with ``:``). For the usual layout, where the text starts
    on the line after the opening quotes, this is the same line the previous
    ``lines[1]`` lookup returned, but single-line docstrings and bare function
    names no longer raise ``IndexError``.

    Parameter lines have the form ``:param name: text`` or
    ``:param type name: text``; the last word before the colon is used as the
    parameter name. A ``:param`` line without a closing colon is ignored
    instead of raising ``ValueError``.

    :param docstring: the raw docstring text (``None`` is treated as empty).
    :return: ``{"description": str, "params": {name: description}}``.
    """
    field_prefix = ":param"
    description = ""
    param_dict = {}

    for raw_line in (docstring or "").split("\n"):
        line = raw_line.strip()
        if not line:
            continue

        if line.startswith(field_prefix):
            # Split only on the first colon after the prefix so that colons
            # inside the description text (e.g. URLs) are preserved.
            head, colon, desc = line[len(field_prefix):].partition(":")
            words = head.split()
            if colon and words:
                param_dict[words[-1]] = desc.strip()
            continue

        if not description and not line.startswith(":"):
            description = line

    return {"description": description, "params": param_dict}
2024-08-14 12:46:31 +00:00
def get_tools_specs(tools) -> list[dict]:
    """Describe every public callable attribute of *tools* as a function spec.

    An attribute is included when it is callable, its name does not start with
    ``__`` and it is not a class. For each one the docstring (or, when there is
    none, the attribute name) is parsed with ``doc_to_dict`` and a dict with
    ``name``, ``description`` and ``parameters`` is produced.

    Parameter properties come from ``get_type_hints``; the ``required`` list
    comes from ``inspect.signature`` and holds the parameters without a
    default. Names wrapped in double underscores (``__like_this__``) are left
    out of both, and the ``return`` hint is left out of the properties.

    :param tools: object whose callable attributes are the tool functions.
    :return: one spec dict per discovered function, in ``dir()`` order.
    """
    # Phase 1: discover the candidate functions before building any spec.
    discovered = []
    for attr_name in dir(tools):
        member = getattr(tools, attr_name)
        if not callable(member):
            continue
        if attr_name.startswith("__") or inspect.isclass(member):
            continue
        discovered.append((attr_name, member))

    # Phase 2: turn each function into its spec.
    specs = []
    for function_name, function in discovered:
        function_doc = doc_to_dict(function.__doc__ or function_name)
        # TODO: multi-line desc?
        description = function_doc.get("description", function_name)
        documented_params = function_doc.get("params", {})

        properties = {}
        for param_name, param_annotation in get_type_hints(function).items():
            if param_name == "return":
                continue
            if param_name.startswith("__") and param_name.endswith("__"):
                continue

            # Key order matters for the emitted dict: type, enum, description.
            prop = {"type": param_annotation.__name__.lower()}
            if hasattr(param_annotation, "__args__"):
                # The enum is the string form of the annotation's type args.
                prop["enum"] = str(param_annotation.__args__)
            prop["description"] = documented_params.get(param_name, param_name)
            properties[param_name] = prop

        required = []
        for name, param in inspect.signature(function).parameters.items():
            if param.default is not param.empty:
                continue
            if name.startswith("__") and name.endswith("__"):
                continue
            required.append(name)

        specs.append(
            {
                "name": function_name,
                "description": description,
                "parameters": {
                    "type": "object",
                    "properties": properties,
                    "required": required,
                },
            }
        )

    return specs