diff --git a/examples/function_calling_filter_pipeline.py b/examples/function_calling_filter_pipeline.py new file mode 100644 index 0000000..7367e65 --- /dev/null +++ b/examples/function_calling_filter_pipeline.py @@ -0,0 +1,191 @@ +from typing import List, Optional +from pydantic import BaseModel +from schemas import OpenAIChatMessage +import os +import requests +import json + +from utils.main import ( + get_last_user_message, + add_or_update_system_message, + get_function_specs, +) +from typing import Literal + + +class Pipeline: + def __init__(self): + # Pipeline filters are only compatible with Open WebUI + # You can think of filter pipeline as a middleware that can be used to edit the form data before it is sent to the OpenAI API. + self.type = "filter" + + # Assign a unique identifier to the pipeline. + # The identifier must be unique across all pipelines. + # The identifier must be an alphanumeric string that can include underscores or hyphens. It cannot contain spaces, special characters, slashes, or backslashes. + self.id = "function_calling_filter_pipeline" + self.name = "Function Calling Filter" + + class Valves(BaseModel): + # List target pipeline ids (models) that this filter will be connected to. + # If you want to connect this filter to all pipelines, you can set pipelines to ["*"] + pipelines: List[str] = [] + + # Assign a priority level to the filter pipeline. + # The priority level determines the order in which the filter pipelines are executed. + # The lower the number, the higher the priority. + priority: int = 0 + + # Valves for function calling + OPENAI_API_BASE_URL: str + OPENAI_API_KEY: str + TASK_MODEL: str + TEMPLATE: str + + # Initialize valves + self.valves = Valves( + **{ + "pipelines": ["*"], # Connect to all pipelines + "OPENAI_API_BASE_URL": "https://api.openai.com/v1", + "OPENAI_API_KEY": os.getenv("OPENAI_API_KEY"), + "TASK_MODEL": "gpt-3.5-turbo", + "TEMPLATE": """Use the following context as your learned knowledge, inside XML tags. + + {{CONTEXT}} + + +When answer to user: +- If you don't know, just say that you don't know. +- If you don't know when you are not sure, ask for clarification. +Avoid mentioning that you obtained the information from the context. +And answer according to the language of the user's question.""", + } + ) + + class Functions: + def get_current_weather( + self, + location: str, + unit: Literal["metric", "fahrenheit"] = "fahrenheit", + ) -> str: + """ + Get the current weather for a location. If the location is not found, return an empty string. + + :param location: The location to get the weather for. + :param unit: The unit to get the weather in. Default is fahrenheit. + :return: The current weather for the location. + """ + print(location, unit) + return f"{location}: Sunny" + + def get_user_name(self, user_id: str) -> str: + """ + Get the user's name from the user_id. + """ + print(user_id) + return "John Doe" + + self.functions = Functions() + + async def on_startup(self): + # This function is called when the server is started. + print(f"on_startup:{__name__}") + pass + + async def on_shutdown(self): + # This function is called when the server is stopped. + print(f"on_shutdown:{__name__}") + pass + + async def inlet(self, body: dict, user: Optional[dict] = None) -> dict: + if body.get("title", False): + return body + + print(f"pipe:{__name__}") + print(user) + + user_message = get_last_user_message(body["messages"]) + function_specs = get_function_specs(self.functions) + + fc_system_prompt = ( + f"Functions: {json.dumps(function_specs, indent=2)}" + + """ +If a function doesn't match the query, return an empty string. Else, pick a function, fill in the parameters from the function's schema, and return it in the format { name: \"functionName\", parameters: { key: value } }. Only pick a function if the user asks." +""" + ) + + print(fc_system_prompt) + + r = None + + try: + r = requests.post( + url=f"{self.valves.OPENAI_API_BASE_URL}/chat/completions", + json={ + "model": self.valves.TASK_MODEL, + "messages": [ + { + "role": "system", + "content": fc_system_prompt, + }, + { + "role": "user", + "content": "History:\n" + + "\n".join( + [ + f"{message['role']}: {message['content']}" + for message in body["messages"][::-1][:4] + ] + ) + + f"Query: {user_message}", + }, + ], + # TODO: dynamically add response_format? + # "response_format": {"type": "json_object"}, + }, + headers={ + "Authorization": f"Bearer {self.valves.OPENAI_API_KEY}", + "Content-Type": "application/json", + }, + stream=False, + ) + + r.raise_for_status() + + response = r.json() + content = response["choices"][0]["message"]["content"] + + if content != "": + result = json.loads(content) + print(result) + + # + if "name" in result: + function = getattr(self.functions, result["name"]) + function_result = None + try: + function_result = function(**result["parameters"]) + except Exception as e: + print(e) + + if function_result: + system_prompt = self.valves.TEMPLATE.replace( + "{{CONTEXT}}", function_result + ) + + print(system_prompt) + messages = add_or_update_system_message( + system_prompt, body["messages"] + ) + + return {**body, "messages": messages} + + except Exception as e: + print(f"Error: {e}") + + if r: + try: + print(r.json()) + except: + pass + + return body diff --git a/utils/main.py b/utils/main.py index 96026ea..7a8b55b 100644 --- a/utils/main.py +++ b/utils/main.py @@ -4,6 +4,9 @@ import time from typing import List from schemas import OpenAIChatMessage +import inspect +from typing import get_type_hints, Literal + def stream_message_template(model: str, message: str): return { @@ -42,3 +45,95 @@ def get_last_assistant_message(messages: List[dict]) -> str: return item["text"] return message["content"] return None + + +def add_or_update_system_message(content: str, messages: List[dict]): + """ + Adds a new system message at the beginning of the messages list + or updates the existing system message at the beginning. + + :param msg: The message to be added or appended. + :param messages: The list of message dictionaries. + :return: The updated list of message dictionaries. + """ + + if messages and messages[0].get("role") == "system": + messages[0]["content"] += f"{content}\n{messages[0]['content']}" + else: + # Insert at the beginning + messages.insert(0, {"role": "system", "content": content}) + + return messages + + +def doc_to_dict(docstring): + lines = docstring.split("\n") + description = lines[1].strip() + param_dict = {} + + for line in lines: + if ":param" in line: + line = line.replace(":param", "").strip() + param, desc = line.split(":", 1) + param_dict[param.strip()] = desc.strip() + ret_dict = {"description": description, "params": param_dict} + return ret_dict + + +def get_function_specs(functions) -> List[dict]: + + function_list = [ + {"name": func, "function": getattr(functions, func)} + for func in dir(functions) + if callable(getattr(functions, func)) and not func.startswith("__") + ] + + specs = [] + + for function_item in function_list: + function_name = function_item["name"] + function = function_item["function"] + + function_doc = doc_to_dict(function.__doc__ or function_name) + specs.append( + { + "name": function_name, + # TODO: multi-line desc? + "description": function_doc.get("description", function_name), + "parameters": { + "type": "object", + "properties": { + param_name: { + "type": param_annotation.__name__.lower(), + **( + { + "enum": ( + param_annotation.__args__ + if hasattr(param_annotation, "__args__") + else None + ) + } + if hasattr(param_annotation, "__args__") + else {} + ), + "description": function_doc.get("params", {}).get( + param_name, param_name + ), + } + for param_name, param_annotation in get_type_hints( + function + ).items() + if param_name != "return" + }, + "required": [ + name + for name, param in inspect.signature( + function + ).parameters.items() + if param.default is param.empty + ], + }, + } + ) + + return specs