diff --git a/blueprints/function_calling_blueprint.py b/blueprints/function_calling_blueprint.py
new file mode 100644
index 0000000..aeae241
--- /dev/null
+++ b/blueprints/function_calling_blueprint.py
@@ -0,0 +1,172 @@
+from typing import List, Optional
+from pydantic import BaseModel
+from schemas import OpenAIChatMessage
+import os
+import requests
+import json
+
+from utils.main import (
+ get_last_user_message,
+ add_or_update_system_message,
+ get_tools_specs,
+)
+
+
+class Pipeline:
+ class Valves(BaseModel):
+ # List target pipeline ids (models) that this filter will be connected to.
+ # If you want to connect this filter to all pipelines, you can set pipelines to ["*"]
+ pipelines: List[str] = []
+
+ # Assign a priority level to the filter pipeline.
+ # The priority level determines the order in which the filter pipelines are executed.
+ # The lower the number, the higher the priority.
+ priority: int = 0
+
+ # Valves for function calling
+ OPENAI_API_BASE_URL: str
+ OPENAI_API_KEY: str
+ TASK_MODEL: str
+ TEMPLATE: str
+
+ def __init__(self):
+ # Pipeline filters are only compatible with Open WebUI
+ # You can think of filter pipeline as a middleware that can be used to edit the form data before it is sent to the OpenAI API.
+ self.type = "filter"
+
+ # Assign a unique identifier to the pipeline.
+ # The identifier must be unique across all pipelines.
+ # The identifier must be an alphanumeric string that can include underscores or hyphens. It cannot contain spaces, special characters, slashes, or backslashes.
+ self.id = "function_calling_blueprint"
+ self.name = "Function Calling Blueprint"
+
+ # Initialize valves
+ self.valves = self.Valves(
+ **{
+ "pipelines": ["*"], # Connect to all pipelines
+ "OPENAI_API_BASE_URL": os.getenv(
+ "OPENAI_API_BASE_URL", "https://api.openai.com/v1"
+ ),
+ "OPENAI_API_KEY": os.getenv("OPENAI_API_KEY", "YOUR_OPENAI_API_KEY"),
+ "TASK_MODEL": os.getenv("TASK_MODEL", "gpt-3.5-turbo"),
+ "TEMPLATE": """Use the following context as your learned knowledge, inside XML tags.
+
+ {{CONTEXT}}
+
+
+When answer to user:
+- If you don't know, just say that you don't know.
+- If you don't know when you are not sure, ask for clarification.
+Avoid mentioning that you obtained the information from the context.
+And answer according to the language of the user's question.""",
+ }
+ )
+
+ async def on_startup(self):
+ # This function is called when the server is started.
+ print(f"on_startup:{__name__}")
+ pass
+
+ async def on_shutdown(self):
+ # This function is called when the server is stopped.
+ print(f"on_shutdown:{__name__}")
+ pass
+
+ async def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
+ # If title generation is requested, skip the function calling filter
+ if body.get("title", False):
+ return body
+
+ print(f"pipe:{__name__}")
+ print(user)
+
+ # Get the last user message
+ user_message = get_last_user_message(body["messages"])
+
+ # Get the tools specs
+ tools_specs = get_tools_specs(self.tools)
+
+ # System prompt for function calling
+ fc_system_prompt = (
+ f"Tools: {json.dumps(tools_specs, indent=2)}"
+ + """
+If a function tool doesn't match the query, return an empty string. Else, pick a function tool, fill in the parameters from the function tool's schema, and return it in the format { "name": \"functionName\", "parameters": { "key": "value" } }. Only pick a function if the user asks. Only return the object. Do not return any other text."
+"""
+ )
+
+ r = None
+ try:
+ # Call the OpenAI API to get the function response
+ r = requests.post(
+ url=f"{self.valves.OPENAI_API_BASE_URL}/chat/completions",
+ json={
+ "model": self.valves.TASK_MODEL,
+ "messages": [
+ {
+ "role": "system",
+ "content": fc_system_prompt,
+ },
+ {
+ "role": "user",
+ "content": "History:\n"
+ + "\n".join(
+ [
+ f"{message['role']}: {message['content']}"
+ for message in body["messages"][::-1][:4]
+ ]
+ )
+ + f"Query: {user_message}",
+ },
+ ],
+ # TODO: dynamically add response_format?
+ # "response_format": {"type": "json_object"},
+ },
+ headers={
+ "Authorization": f"Bearer {self.valves.OPENAI_API_KEY}",
+ "Content-Type": "application/json",
+ },
+ stream=False,
+ )
+ r.raise_for_status()
+
+ response = r.json()
+ content = response["choices"][0]["message"]["content"]
+
+ # Parse the function response
+ if content != "":
+ result = json.loads(content)
+ print(result)
+
+ # Call the function
+ if "name" in result:
+ function = getattr(self.tools, result["name"])
+ function_result = None
+ try:
+ function_result = function(**result["parameters"])
+ except Exception as e:
+ print(e)
+
+ # Add the function result to the system prompt
+ if function_result:
+ system_prompt = self.valves.TEMPLATE.replace(
+ "{{CONTEXT}}", function_result
+ )
+
+ print(system_prompt)
+ messages = add_or_update_system_message(
+ system_prompt, body["messages"]
+ )
+
+ # Return the updated messages
+ return {**body, "messages": messages}
+
+ except Exception as e:
+ print(f"Error: {e}")
+
+ if r:
+ try:
+ print(r.json())
+ except:
+ pass
+
+ return body
diff --git a/examples/function_calling_filter_pipeline.py b/examples/function_calling_filter_pipeline.py
index 43bae4d..8db3da5 100644
--- a/examples/function_calling_filter_pipeline.py
+++ b/examples/function_calling_filter_pipeline.py
@@ -1,231 +1,80 @@
-from typing import List, Optional
-from pydantic import BaseModel
-from schemas import OpenAIChatMessage
import os
import requests
-import json
-
-from utils.main import (
- get_last_user_message,
- add_or_update_system_message,
- get_function_specs,
-)
-from typing import Literal
+from typing import Literal, List, Optional
+from blueprints.function_calling_blueprint import Pipeline as FunctionCallingBlueprint
-class Pipeline:
+class Pipeline(FunctionCallingBlueprint):
+ class Valves(FunctionCallingBlueprint.Valves):
+ # Add your custom parameters here
+ OPENWEATHERMAP_API_KEY: str = ""
+ pass
+
+ class Tools:
+ def __init__(self, pipeline) -> None:
+ self.pipeline = pipeline
+
+ def get_current_weather(
+ self,
+ location: str,
+ unit: Literal["metric", "fahrenheit"] = "fahrenheit",
+ ) -> str:
+ """
+ Get the current weather for a location. If the location is not found, return an empty string.
+
+ :param location: The location to get the weather for.
+ :param unit: The unit to get the weather in. Default is fahrenheit.
+ :return: The current weather for the location.
+ """
+
+ # https://openweathermap.org/api
+
+ if self.pipeline.valves.OPENWEATHERMAP_API_KEY == "":
+ return "OpenWeatherMap API Key not set, ask the user to set it up."
+ else:
+ units = "imperial" if unit == "fahrenheit" else "metric"
+ params = {
+ "q": location,
+ "appid": self.pipeline.valves.OPENWEATHERMAP_API_KEY,
+ "units": units,
+ }
+
+ response = requests.get(
+ "http://api.openweathermap.org/data/2.5/weather", params=params
+ )
+ response.raise_for_status() # Raises an HTTPError for bad responses
+ data = response.json()
+
+ weather_description = data["weather"][0]["description"]
+ temperature = data["main"]["temp"]
+
+ return f"{location}: {weather_description.capitalize()}, {temperature}°{unit.capitalize()[0]}"
+
+ def calculator(self, equation: str) -> str:
+ """
+ Calculate the result of an equation.
+
+ :param equation: The equation to calculate.
+ """
+
+ # Avoid using eval in production code
+ # https://nedbatchelder.com/blog/201206/eval_really_is_dangerous.html
+ try:
+ result = eval(equation)
+ return f"{equation} = {result}"
+ except Exception as e:
+ print(e)
+ return "Invalid equation"
+
def __init__(self):
- # Pipeline filters are only compatible with Open WebUI
- # You can think of filter pipeline as a middleware that can be used to edit the form data before it is sent to the OpenAI API.
- self.type = "filter"
-
- # Assign a unique identifier to the pipeline.
- # The identifier must be unique across all pipelines.
- # The identifier must be an alphanumeric string that can include underscores or hyphens. It cannot contain spaces, special characters, slashes, or backslashes.
- self.id = "function_calling_filter_pipeline"
- self.name = "Function Calling Filter"
-
- class Valves(BaseModel):
- # List target pipeline ids (models) that this filter will be connected to.
- # If you want to connect this filter to all pipelines, you can set pipelines to ["*"]
- pipelines: List[str] = []
-
- # Assign a priority level to the filter pipeline.
- # The priority level determines the order in which the filter pipelines are executed.
- # The lower the number, the higher the priority.
- priority: int = 0
-
- # Valves for function calling
- OPENAI_API_BASE_URL: str
- OPENAI_API_KEY: str
- TASK_MODEL: str
- TEMPLATE: str
-
- OPENWEATHERMAP_API_KEY: str = ""
-
- # Initialize valves
- self.valves = Valves(
+ super().__init__()
+ self.id = "my_tools_pipeline"
+ self.name = "My Tools Pipeline"
+ self.valves = self.Valves(
**{
+ **self.valves.model_dump(),
"pipelines": ["*"], # Connect to all pipelines
- "OPENAI_API_BASE_URL": "https://api.openai.com/v1",
- "OPENAI_API_KEY": os.getenv("OPENAI_API_KEY", "YOUR_OPENAI_API_KEY"),
- "TASK_MODEL": "gpt-3.5-turbo",
- "TEMPLATE": """Use the following context as your learned knowledge, inside XML tags.
-
- {{CONTEXT}}
-
-
-When answer to user:
-- If you don't know, just say that you don't know.
-- If you don't know when you are not sure, ask for clarification.
-Avoid mentioning that you obtained the information from the context.
-And answer according to the language of the user's question.""",
- }
+ "OPENWEATHERMAP_API_KEY": os.getenv("OPENWEATHERMAP_API_KEY", ""),
+ },
)
-
- class Functions:
- def __init__(self, pipeline) -> None:
- self.pipeline = pipeline
-
- def get_current_weather(
- self,
- location: str,
- unit: Literal["metric", "fahrenheit"] = "fahrenheit",
- ) -> str:
- """
- Get the current weather for a location. If the location is not found, return an empty string.
-
- :param location: The location to get the weather for.
- :param unit: The unit to get the weather in. Default is fahrenheit.
- :return: The current weather for the location.
- """
-
- # https://openweathermap.org/api
-
- if self.pipeline.valves.OPENWEATHERMAP_API_KEY == "":
- return "OpenWeatherMap API Key not set, ask the user to set it up."
- else:
- units = "imperial" if unit == "fahrenheit" else "metric"
- params = {
- "q": location,
- "appid": self.pipeline.valves.OPENWEATHERMAP_API_KEY,
- "units": units,
- }
-
- response = requests.get(
- "http://api.openweathermap.org/data/2.5/weather", params=params
- )
- response.raise_for_status() # Raises an HTTPError for bad responses
- data = response.json()
-
- weather_description = data["weather"][0]["description"]
- temperature = data["main"]["temp"]
-
- return f"{location}: {weather_description.capitalize()}, {temperature}°{unit.capitalize()[0]}"
-
- def calculator(self, equation: str) -> str:
- """
- Calculate the result of an equation.
-
- :param equation: The equation to calculate.
- """
-
- # Avoid using eval in production code
- # https://nedbatchelder.com/blog/201206/eval_really_is_dangerous.html
- try:
- result = eval(equation)
- return f"{equation} = {result}"
- except Exception as e:
- print(e)
- return "Invalid equation"
-
- self.functions = Functions(self)
-
- async def on_startup(self):
- # This function is called when the server is started.
- print(f"on_startup:{__name__}")
- pass
-
- async def on_shutdown(self):
- # This function is called when the server is stopped.
- print(f"on_shutdown:{__name__}")
- pass
-
- async def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
- # If title generation is requested, skip the function calling filter
- if body.get("title", False):
- return body
-
- print(f"pipe:{__name__}")
- print(user)
-
- # Get the last user message
- user_message = get_last_user_message(body["messages"])
-
- # Get the function specs
- function_specs = get_function_specs(self.functions)
-
- # System prompt for function calling
- fc_system_prompt = (
- f"Functions: {json.dumps(function_specs, indent=2)}"
- + """
-If a function doesn't match the query, return an empty string. Else, pick a function, fill in the parameters from the function's schema, and return it in the format { "name": \"functionName\", "parameters": { "key": "value" } }. Only pick a function if the user asks. Only return the object. Do not return any other text."
-"""
- )
-
- r = None
- try:
- # Call the OpenAI API to get the function response
- r = requests.post(
- url=f"{self.valves.OPENAI_API_BASE_URL}/chat/completions",
- json={
- "model": self.valves.TASK_MODEL,
- "messages": [
- {
- "role": "system",
- "content": fc_system_prompt,
- },
- {
- "role": "user",
- "content": "History:\n"
- + "\n".join(
- [
- f"{message['role']}: {message['content']}"
- for message in body["messages"][::-1][:4]
- ]
- )
- + f"Query: {user_message}",
- },
- ],
- # TODO: dynamically add response_format?
- # "response_format": {"type": "json_object"},
- },
- headers={
- "Authorization": f"Bearer {self.valves.OPENAI_API_KEY}",
- "Content-Type": "application/json",
- },
- stream=False,
- )
- r.raise_for_status()
-
- response = r.json()
- content = response["choices"][0]["message"]["content"]
-
- # Parse the function response
- if content != "":
- result = json.loads(content)
- print(result)
-
- # Call the function
- if "name" in result:
- function = getattr(self.functions, result["name"])
- function_result = None
- try:
- function_result = function(**result["parameters"])
- except Exception as e:
- print(e)
-
- # Add the function result to the system prompt
- if function_result:
- system_prompt = self.valves.TEMPLATE.replace(
- "{{CONTEXT}}", function_result
- )
-
- print(system_prompt)
- messages = add_or_update_system_message(
- system_prompt, body["messages"]
- )
-
- # Return the updated messages
- return {**body, "messages": messages}
-
- except Exception as e:
- print(f"Error: {e}")
-
- if r:
- try:
- print(r.json())
- except:
- pass
-
- return body
+ self.tools = self.Tools(self)
diff --git a/main.py b/main.py
index f095089..ebd41c7 100644
--- a/main.py
+++ b/main.py
@@ -203,7 +203,6 @@ async def get_models():
Returns the available pipelines
"""
app.state.PIPELINES = get_all_pipelines()
-
return {
"data": [
{
diff --git a/utils/main.py b/utils/main.py
index 7a8b55b..f5830b9 100644
--- a/utils/main.py
+++ b/utils/main.py
@@ -80,12 +80,11 @@ def doc_to_dict(docstring):
return ret_dict
-def get_function_specs(functions) -> List[dict]:
-
+def get_tools_specs(tools) -> List[dict]:
function_list = [
- {"name": func, "function": getattr(functions, func)}
- for func in dir(functions)
- if callable(getattr(functions, func)) and not func.startswith("__")
+ {"name": func, "function": getattr(tools, func)}
+ for func in dir(tools)
+ if callable(getattr(tools, func)) and not func.startswith("__")
]
specs = []