import inspect
import logging
from typing import Awaitable, Callable, get_type_hints

from open_webui.apps.webui.models.tools import Tools
from open_webui.apps.webui.models.users import UserModel
from open_webui.apps.webui.utils import load_toolkit_module_by_id
from open_webui.utils.schemas import json_schema_to_model

log = logging.getLogger(__name__)


def apply_extra_params_to_tool_function(
    function: Callable, extra_params: dict
) -> Callable[..., Awaitable]:
    sig = inspect.signature(function)
    extra_params = {
        key: value for key, value in extra_params.items() if key in sig.parameters
    }
    is_coroutine = inspect.iscoroutinefunction(function)

    async def new_function(**kwargs):
        extra_kwargs = kwargs | extra_params
        if is_coroutine:
            return await function(**extra_kwargs)
        return function(**extra_kwargs)

    return new_function


# Mutation on extra_params
def get_tools(
    webui_app, tool_ids: list[str], user: UserModel, extra_params: dict
) -> dict[str, dict]:
    tools = {}
    for tool_id in tool_ids:
        toolkit = Tools.get_tool_by_id(tool_id)
        if toolkit is None:
            continue

        module = webui_app.state.TOOLS.get(tool_id, None)
        if module is None:
            module, _ = load_toolkit_module_by_id(tool_id)
            webui_app.state.TOOLS[tool_id] = module

        extra_params["__id__"] = tool_id
        if hasattr(module, "valves") and hasattr(module, "Valves"):
            valves = Tools.get_tool_valves_by_id(tool_id) or {}
            module.valves = module.Valves(**valves)

        if hasattr(module, "UserValves"):
            extra_params["__user__"]["valves"] = module.UserValves(  # type: ignore
                **Tools.get_user_valves_by_id_and_user_id(tool_id, user.id)
            )

        for spec in toolkit.specs:
            # TODO: Fix hack for OpenAI API
            for val in spec.get("parameters", {}).get("properties", {}).values():
                if val["type"] == "str":
                    val["type"] = "string"
            function_name = spec["name"]

            # convert to function that takes only model params and inserts custom params
            original_func = getattr(module, function_name)
            callable = apply_extra_params_to_tool_function(original_func, extra_params)
            if hasattr(original_func, "__doc__"):
                callable.__doc__ = original_func.__doc__

            # TODO: This needs to be a pydantic model
            tool_dict = {
                "toolkit_id": tool_id,
                "callable": callable,
                "spec": spec,
                "pydantic_model": json_schema_to_model(spec),
                "file_handler": hasattr(module, "file_handler") and module.file_handler,
                "citation": hasattr(module, "citation") and module.citation,
            }

            # TODO: if collision, prepend toolkit name
            if function_name in tools:
                log.warning(f"Tool {function_name} already exists in another toolkit!")
                log.warning(f"Collision between {toolkit} and {tool_id}.")
                log.warning(f"Discarding {toolkit}.{function_name}")
            else:
                tools[function_name] = tool_dict
    return tools


def doc_to_dict(docstring):
    lines = docstring.split("\n")
    description = lines[1].strip()
    param_dict = {}

    for line in lines:
        if ":param" in line:
            line = line.replace(":param", "").strip()
            param, desc = line.split(":", 1)
            param_dict[param.strip()] = desc.strip()
    ret_dict = {"description": description, "params": param_dict}
    return ret_dict


def get_tools_specs(tools) -> list[dict]:
    function_list = [
        {"name": func, "function": getattr(tools, func)}
        for func in dir(tools)
        if callable(getattr(tools, func))
        and not func.startswith("__")
        and not inspect.isclass(getattr(tools, func))
    ]

    specs = []
    for function_item in function_list:
        function_name = function_item["name"]
        function = function_item["function"]

        function_doc = doc_to_dict(function.__doc__ or function_name)
        specs.append(
            {
                "name": function_name,
                # TODO: multi-line desc?
                "description": function_doc.get("description", function_name),
                "parameters": {
                    "type": "object",
                    "properties": {
                        param_name: {
                            "type": param_annotation.__name__.lower(),
                            **(
                                {
                                    "enum": (
                                        str(param_annotation.__args__)
                                        if hasattr(param_annotation, "__args__")
                                        else None
                                    )
                                }
                                if hasattr(param_annotation, "__args__")
                                else {}
                            ),
                            "description": function_doc.get("params", {}).get(
                                param_name, param_name
                            ),
                        }
                        for param_name, param_annotation in get_type_hints(
                            function
                        ).items()
                        if param_name != "return"
                        and not (
                            param_name.startswith("__") and param_name.endswith("__")
                        )
                    },
                    "required": [
                        name
                        for name, param in inspect.signature(
                            function
                        ).parameters.items()
                        if param.default is param.empty
                        and not (name.startswith("__") and name.endswith("__"))
                    ],
                },
            }
        )

    return specs