Merge branch 'open-webui:main' into main

Commit 87974ac4c5 by Quantuary, 2025-03-14 22:03:15 +10:00, committed by GitHub.
4 changed files with 304 additions and 98 deletions

View File

@@ -5,6 +5,8 @@
 # Pipelines: UI-Agnostic OpenAI API Plugin Framework
 
 > [!TIP]
+> **You probably don't need Pipelines!**
+>
 > If your goal is simply to add support for additional providers like Anthropic or basic filters, you likely don't need Pipelines. For those cases, Open WebUI Functions are a better fit—they're built in, much more convenient, and easier to configure. Pipelines, however, comes into play when you're dealing with computationally heavy tasks (e.g., running large models or complex logic) that you want to offload from your main Open WebUI instance for better performance and scalability.

View File

@@ -1,8 +1,8 @@
 """
 title: Langfuse Filter Pipeline
 author: open-webui
-date: 2024-09-27
-version: 1.4
+date: 2025-02-20
+version: 1.5
 license: MIT
 description: A filter pipeline that uses Langfuse.
 requirements: langfuse
@@ -11,12 +11,14 @@ requirements: langfuse
 from typing import List, Optional
 import os
 import uuid
+import json
 
 from utils.pipelines.main import get_last_assistant_message
 from pydantic import BaseModel
 from langfuse import Langfuse
 from langfuse.api.resources.commons.errors.unauthorized_error import UnauthorizedError
 
+
 def get_last_assistant_message_obj(messages: List[dict]) -> dict:
     for message in reversed(messages):
         if message["role"] == "assistant":
@@ -31,31 +33,48 @@ class Pipeline:
         secret_key: str
         public_key: str
         host: str
+        debug: bool = False
 
     def __init__(self):
         self.type = "filter"
         self.name = "Langfuse Filter"
         self.valves = self.Valves(
             **{
                 "pipelines": ["*"],
                 "secret_key": os.getenv("LANGFUSE_SECRET_KEY", "your-secret-key-here"),
                 "public_key": os.getenv("LANGFUSE_PUBLIC_KEY", "your-public-key-here"),
                 "host": os.getenv("LANGFUSE_HOST", "https://cloud.langfuse.com"),
+                "debug": os.getenv("DEBUG_MODE", "false").lower() == "true",
             }
         )
         self.langfuse = None
+        # Keep track of the trace and the last-created generation for each chat_id
         self.chat_traces = {}
         self.chat_generations = {}
+        self.suppressed_logs = set()
+
+    def log(self, message: str, suppress_repeats: bool = False):
+        """Logs messages to the terminal if debugging is enabled."""
+        if self.valves.debug:
+            if suppress_repeats:
+                if message in self.suppressed_logs:
+                    return
+                self.suppressed_logs.add(message)
+            print(f"[DEBUG] {message}")
 
     async def on_startup(self):
-        print(f"on_startup:{__name__}")
+        self.log(f"on_startup triggered for {__name__}")
         self.set_langfuse()
 
     async def on_shutdown(self):
-        print(f"on_shutdown:{__name__}")
+        self.log(f"on_shutdown triggered for {__name__}")
+        if self.langfuse:
             self.langfuse.flush()
 
     async def on_valves_updated(self):
+        self.log("Valves updated, resetting Langfuse client.")
         self.set_langfuse()
 
     def set_langfuse(self):
@@ -64,76 +83,161 @@ class Pipeline:
                 secret_key=self.valves.secret_key,
                 public_key=self.valves.public_key,
                 host=self.valves.host,
-                debug=False,
+                debug=self.valves.debug,
             )
             self.langfuse.auth_check()
+            self.log("Langfuse client initialized successfully.")
         except UnauthorizedError:
             print(
                 "Langfuse credentials incorrect. Please re-enter your Langfuse credentials in the pipeline settings."
             )
         except Exception as e:
-            print(f"Langfuse error: {e} Please re-enter your Langfuse credentials in the pipeline settings.")
+            print(
+                f"Langfuse error: {e} Please re-enter your Langfuse credentials in the pipeline settings."
+            )
 
     async def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
-        print(f"inlet:{__name__}")
-        print(f"Received body: {body}")
-        print(f"User: {user}")
+        """
+        Inlet handles the incoming request (usually a user message).
+        - If no trace exists yet for this chat_id, we create a new trace.
+        - If a trace does exist, we simply create a new generation for the new user message.
+        """
+        if self.valves.debug:
+            print(f"[DEBUG] Received request: {json.dumps(body, indent=2)}")
 
-        # Check for presence of required keys and generate chat_id if missing
-        if "chat_id" not in body.get("metadata", {}):
-            unique_id = f"SYSTEM MESSAGE {uuid.uuid4()}"
-            # Ensure the metadata key exists before assigning chat_id
-            if "metadata" not in body:
-                body["metadata"] = {}  # Correct this indentation
-            body["metadata"]["chat_id"] = unique_id
-            print(f"chat_id was missing, set to: {unique_id}")
+        self.log(f"Inlet function called with body: {body} and user: {user}")
+
+        metadata = body.get("metadata", {})
+
+        # ---------------------------------------------------------
+        # Prepend the system prompt from metadata to the system message:
+        model_info = metadata.get("model", {})
+        params_info = model_info.get("params", {})
+        system_prompt = params_info.get("system", "")
+
+        if system_prompt:
+            for msg in body["messages"]:
+                if msg.get("role") == "system":
+                    # Only prepend if it hasn't already been prepended:
+                    if not msg["content"].startswith("System Prompt:"):
+                        msg["content"] = f"System Prompt:\n{system_prompt}\n\n{msg['content']}"
+                    break
+        # ---------------------------------------------------------
+
+        # Fix SYSTEM MESSAGE prefix issue: Only apply for "task_generation"
+        if "chat_id" not in metadata:
+            if "task_generation" in metadata.get("type", "").lower():
+                chat_id = f"SYSTEM MESSAGE {uuid.uuid4()}"
+                self.log(f"Task Generation detected, assigned SYSTEM MESSAGE ID: {chat_id}")
+            else:
+                chat_id = str(uuid.uuid4())  # Regular chat messages
+                self.log(f"Assigned normal chat_id: {chat_id}")
+
+            metadata["chat_id"] = chat_id
+            body["metadata"] = metadata
+        else:
+            chat_id = metadata["chat_id"]
 
         required_keys = ["model", "messages"]
         missing_keys = [key for key in required_keys if key not in body]
         if missing_keys:
             error_message = f"Error: Missing keys in the request body: {', '.join(missing_keys)}"
-            print(error_message)
+            self.log(error_message)
             raise ValueError(error_message)
 
-        user_id = user.get("id") if user else None
-        user_name = user.get("name") if user else None
         user_email = user.get("email") if user else None
 
-        trace = self.langfuse.trace(
-            name=f"filter:{__name__}",
-            input=body,
-            user_id=user_email,
-            metadata={"user_name": user_name, "user_id": user_id, "chat_id": body["metadata"]["chat_id"]},
-            session_id=body["metadata"]["chat_id"],
-        )
-
-        generation = trace.generation(
-            name=body["metadata"]["chat_id"],
-            model=body["model"],
-            input=body["messages"],
-            metadata={"interface": "open-webui"},
-        )
-
-        self.chat_traces[body["metadata"]["chat_id"]] = trace
-        self.chat_generations[body["metadata"]["chat_id"]] = generation
+        # Check if we already have a trace for this chat
+        if chat_id not in self.chat_traces:
+            # Create a new trace and generation
+            self.log(f"Creating new chat trace for chat_id: {chat_id}")
+
+            trace_payload = {
+                "name": f"filter:{__name__}",
+                "input": body,
+                "user_id": user_email,
+                "metadata": {"chat_id": chat_id},
+                "session_id": chat_id,
+            }
+
+            if self.valves.debug:
+                print(f"[DEBUG] Langfuse trace request: {json.dumps(trace_payload, indent=2)}")
+
+            trace = self.langfuse.trace(**trace_payload)
+
+            generation_payload = {
+                "name": chat_id,
+                "model": body["model"],
+                "input": body["messages"],
+                "metadata": {"interface": "open-webui"},
+            }
+
+            if self.valves.debug:
+                print(f"[DEBUG] Langfuse generation request: {json.dumps(generation_payload, indent=2)}")
+
+            generation = trace.generation(**generation_payload)
+
+            self.chat_traces[chat_id] = trace
+            self.chat_generations[chat_id] = generation
+            self.log(f"Trace and generation objects successfully created for chat_id: {chat_id}")
+        else:
+            # Re-use existing trace but create a new generation for each new message
+            self.log(f"Re-using existing chat trace for chat_id: {chat_id}")
+            trace = self.chat_traces[chat_id]
+
+            new_generation_payload = {
+                "name": f"{chat_id}:{str(uuid.uuid4())}",
+                "model": body["model"],
+                "input": body["messages"],
+                "metadata": {"interface": "open-webui"},
+            }
+
+            if self.valves.debug:
+                print(f"[DEBUG] Langfuse new_generation request: {json.dumps(new_generation_payload, indent=2)}")
+
+            new_generation = trace.generation(**new_generation_payload)
+            self.chat_generations[chat_id] = new_generation
 
         return body
 
     async def outlet(self, body: dict, user: Optional[dict] = None) -> dict:
-        print(f"outlet:{__name__}")
-        print(f"Received body: {body}")
-        if body["chat_id"] not in self.chat_generations or body["chat_id"] not in self.chat_traces:
-            return body
-
-        trace = self.chat_traces[body["chat_id"]]
-        generation = self.chat_generations[body["chat_id"]]
+        """
+        Outlet handles the response body (usually the assistant message).
+        It will finalize/end the generation created for the user request.
+        """
+        self.log(f"Outlet function called with body: {body}")
+
+        chat_id = body.get("chat_id")
+
+        # If no trace or generation exist, attempt to register again
+        if chat_id not in self.chat_traces or chat_id not in self.chat_generations:
+            self.log(f"[WARNING] No matching chat trace found for chat_id: {chat_id}, attempting to re-register.")
+            return await self.inlet(body, user)
+
+        trace = self.chat_traces[chat_id]
+        generation = self.chat_generations[chat_id]
 
+        # Get the last assistant message from the conversation
         assistant_message = get_last_assistant_message(body["messages"])
-
-        # Extract usage information for models that support it
-        usage = None
         assistant_message_obj = get_last_assistant_message_obj(body["messages"])
+
+        # ---------------------------------------------------------
+        # If the outlet contains a sources array, append it after the "System Prompt:"
+        # section in the system message:
+        if assistant_message_obj and "sources" in assistant_message_obj and assistant_message_obj["sources"]:
+            for msg in body["messages"]:
+                if msg.get("role") == "system":
+                    if msg["content"].startswith("System Prompt:"):
+                        # Format the sources nicely
+                        sources_str = "\n\n".join(
+                            json.dumps(src, indent=2) for src in assistant_message_obj["sources"]
+                        )
+                        msg["content"] += f"\n\nSources:\n{sources_str}"
+                    break
+        # ---------------------------------------------------------
+
+        # Extract usage if available
+        usage = None
         if assistant_message_obj:
             info = assistant_message_obj.get("info", {})
             if isinstance(info, dict):
@@ -145,19 +249,22 @@ class Pipeline:
                     "output": output_tokens,
                     "unit": "TOKENS",
                 }
+                self.log(f"Usage data extracted: {usage}")
 
-        # Update generation
-        trace.update(
-            output=assistant_message,
-        )
-        generation.end(
-            output=assistant_message,
-            metadata={"interface": "open-webui"},
-            usage=usage,
-        )
-
-        # Clean up the chat_generations dictionary
-        del self.chat_traces[body["chat_id"]]
-        del self.chat_generations[body["chat_id"]]
+        # Optionally update the trace with the final assistant output
+        trace.update(output=assistant_message)
+
+        # End the generation with the final assistant message and updated conversation
+        generation_payload = {
+            "input": body["messages"],  # include the entire conversation
+            "metadata": {"interface": "open-webui"},
+            "usage": usage,
+        }
+
+        if self.valves.debug:
+            print(f"[DEBUG] Langfuse generation end request: {json.dumps(generation_payload, indent=2)}")
+
+        generation.end(**generation_payload)
+        self.log(f"Generation ended for chat_id: {chat_id}")
 
         return body
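
For readers following the new control flow, here is a rough driver showing how the reworked inlet/outlet pair behaves across several turns of one chat. This is a hypothetical sketch, not part of the commit: pipeline is assumed to be an instance of the filter above with valid Langfuse credentials, and the model id and addresses are made up.

import asyncio

async def demo(pipeline):
    body = {
        "model": "gpt-4o",
        "messages": [{"role": "user", "content": "hi"}],
        "metadata": {"chat_id": "chat-123"},
    }
    # Turn 1: no trace exists for "chat-123" yet, so inlet creates a
    # trace plus a generation named "chat-123".
    await pipeline.inlet(body, user={"email": "a@example.com"})
    # Turn 2, same chat_id: the trace is re-used and a fresh generation
    # named "chat-123:<uuid>" is created for the new message.
    await pipeline.inlet(body, user={"email": "a@example.com"})
    # Outlet ends the latest generation. Unlike the old code, it no longer
    # deletes the trace, so later turns stay in the same Langfuse session.
    body_out = {
        "chat_id": "chat-123",
        "messages": body["messages"] + [{"role": "assistant", "content": "hello!"}],
    }
    await pipeline.outlet(body_out, user=None)

# asyncio.run(demo(pipeline)) would exercise the flow end to end.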

View File

@@ -6,7 +6,7 @@ version: 1.4
 license: MIT
 description: A pipeline for generating text and processing images using the Anthropic API.
 requirements: requests, sseclient-py
-environment_variables: ANTHROPIC_API_KEY
+environment_variables: ANTHROPIC_API_KEY, ANTHROPIC_THINKING_BUDGET_TOKENS, ANTHROPIC_ENABLE_THINKING
 """
 
 import os
@@ -18,6 +18,17 @@ import sseclient
 
 from utils.pipelines.main import pop_system_message
 
+REASONING_EFFORT_BUDGET_TOKEN_MAP = {
+    "none": None,
+    "low": 1024,
+    "medium": 4096,
+    "high": 16384,
+    "max": 32768,
+}
+
+# Maximum combined token limit for Claude 3.7
+MAX_COMBINED_TOKENS = 64000
+
 
 class Pipeline:
     class Valves(BaseModel):
@@ -29,16 +40,20 @@
         self.name = "anthropic/"
 
         self.valves = self.Valves(
-            **{"ANTHROPIC_API_KEY": os.getenv("ANTHROPIC_API_KEY", "your-api-key-here")}
+            **{
+                "ANTHROPIC_API_KEY": os.getenv(
+                    "ANTHROPIC_API_KEY", "your-api-key-here"
+                ),
+            }
         )
-        self.url = 'https://api.anthropic.com/v1/messages'
+        self.url = "https://api.anthropic.com/v1/messages"
         self.update_headers()
 
     def update_headers(self):
         self.headers = {
-            'anthropic-version': '2023-06-01',
-            'content-type': 'application/json',
-            'x-api-key': self.valves.ANTHROPIC_API_KEY
+            "anthropic-version": "2023-06-01",
+            "content-type": "application/json",
+            "x-api-key": self.valves.ANTHROPIC_API_KEY,
         }
def get_anthropic_models(self): def get_anthropic_models(self):
@@ -48,6 +63,7 @@ class Pipeline:
             {"id": "claude-3-sonnet-20240229", "name": "claude-3-sonnet"},
             {"id": "claude-3-5-haiku-20241022", "name": "claude-3.5-haiku"},
             {"id": "claude-3-5-sonnet-20241022", "name": "claude-3.5-sonnet"},
+            {"id": "claude-3-7-sonnet-20250219", "name": "claude-3.7-sonnet"},
         ]
 
     async def on_startup(self):
@@ -87,7 +103,7 @@
     ) -> Union[str, Generator, Iterator]:
         try:
             # Remove unnecessary keys
-            for key in ['user', 'chat_id', 'title']:
+            for key in ["user", "chat_id", "title"]:
                 body.pop(key, None)
 
             system_message, messages = pop_system_message(messages)
@@ -101,28 +117,40 @@
                 if isinstance(message.get("content"), list):
                     for item in message["content"]:
                         if item["type"] == "text":
-                            processed_content.append({"type": "text", "text": item["text"]})
+                            processed_content.append(
+                                {"type": "text", "text": item["text"]}
+                            )
                         elif item["type"] == "image_url":
                             if image_count >= 5:
-                                raise ValueError("Maximum of 5 images per API call exceeded")
+                                raise ValueError(
+                                    "Maximum of 5 images per API call exceeded"
+                                )
                             processed_image = self.process_image(item["image_url"])
                             processed_content.append(processed_image)
 
                             if processed_image["source"]["type"] == "base64":
-                                image_size = len(processed_image["source"]["data"]) * 3 / 4
+                                image_size = (
+                                    len(processed_image["source"]["data"]) * 3 / 4
+                                )
                             else:
                                 image_size = 0
 
                             total_image_size += image_size
                             if total_image_size > 100 * 1024 * 1024:
-                                raise ValueError("Total size of images exceeds 100 MB limit")
+                                raise ValueError(
+                                    "Total size of images exceeds 100 MB limit"
+                                )
 
                             image_count += 1
                 else:
-                    processed_content = [{"type": "text", "text": message.get("content", "")}]
+                    processed_content = [
+                        {"type": "text", "text": message.get("content", "")}
+                    ]
 
-                processed_messages.append({"role": message["role"], "content": processed_content})
+                processed_messages.append(
+                    {"role": message["role"], "content": processed_content}
+                )
 
             # Prepare the payload
             payload = {
@@ -138,6 +166,42 @@
             }
 
             if body.get("stream", False):
+                supports_thinking = "claude-3-7" in model_id
+                reasoning_effort = body.get("reasoning_effort", "none")
+                budget_tokens = REASONING_EFFORT_BUDGET_TOKEN_MAP.get(reasoning_effort)
+
+                # Allow users to input an integer value representing budget tokens
+                if (
+                    not budget_tokens
+                    and reasoning_effort not in REASONING_EFFORT_BUDGET_TOKEN_MAP.keys()
+                ):
+                    try:
+                        budget_tokens = int(reasoning_effort)
+                    except ValueError as e:
+                        print("Failed to convert reasoning effort to int", e)
+                        budget_tokens = None
+
+                if supports_thinking and budget_tokens:
+                    # Check if the combined tokens (budget_tokens + max_tokens) exceeds the limit
+                    max_tokens = payload.get("max_tokens", 4096)
+                    combined_tokens = budget_tokens + max_tokens
+
+                    if combined_tokens > MAX_COMBINED_TOKENS:
+                        error_message = f"Error: Combined tokens (budget_tokens {budget_tokens} + max_tokens {max_tokens} = {combined_tokens}) exceeds the maximum limit of {MAX_COMBINED_TOKENS}"
+                        print(error_message)
+                        return error_message
+
+                    payload["max_tokens"] = combined_tokens
+                    payload["thinking"] = {
+                        "type": "enabled",
+                        "budget_tokens": budget_tokens,
+                    }
+
+                    # Thinking requires temperature 1.0 and does not support top_p, top_k
+                    payload["temperature"] = 1.0
+                    if "top_k" in payload:
+                        del payload["top_k"]
+                    if "top_p" in payload:
+                        del payload["top_p"]
+
                 return self.stream_response(payload)
             else:
                 return self.get_completion(payload)
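
To make the new token budgeting concrete, here is a worked sketch of the arithmetic above for reasoning_effort="high". The values come from REASONING_EFFORT_BUDGET_TOKEN_MAP; the payload itself is hypothetical.

payload = {"model": "claude-3-7-sonnet-20250219", "max_tokens": 4096, "top_p": 0.9}
budget_tokens = REASONING_EFFORT_BUDGET_TOKEN_MAP["high"]  # 16384
combined_tokens = budget_tokens + payload["max_tokens"]    # 20480 <= 64000, within limit
payload["max_tokens"] = combined_tokens
payload["thinking"] = {"type": "enabled", "budget_tokens": budget_tokens}
payload["temperature"] = 1.0  # extended thinking requires temperature 1.0
payload.pop("top_p", None)    # top_p and top_k are stripped when thinking is on
# A raw integer also works: reasoning_effort="2048" misses the map,
# is parsed with int(), and yields budget_tokens=2048.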
@@ -145,7 +209,12 @@
             return f"Error: {e}"
 
     def stream_response(self, payload: dict) -> Generator:
-        response = requests.post(self.url, headers=self.headers, json=payload, stream=True)
+        """Used for title and tag generation"""
+        try:
+            response = requests.post(
+                self.url, headers=self.headers, json=payload, stream=True
+            )
+            print(f"{response} for {payload}")
 
             if response.status_code == 200:
                 client = sseclient.SSEClient(response)
@@ -153,23 +222,51 @@
                     try:
                         data = json.loads(event.data)
                         if data["type"] == "content_block_start":
-                            yield data["content_block"]["text"]
+                            if data["content_block"]["type"] == "thinking":
+                                yield "<think>"
+                            else:
+                                yield data["content_block"]["text"]
                         elif data["type"] == "content_block_delta":
-                            yield data["delta"]["text"]
+                            if data["delta"]["type"] == "thinking_delta":
+                                yield data["delta"]["thinking"]
+                            elif data["delta"]["type"] == "signature_delta":
+                                yield "\n </think> \n\n"
+                            else:
+                                yield data["delta"]["text"]
                         elif data["type"] == "message_stop":
                             break
                     except json.JSONDecodeError:
                         print(f"Failed to parse JSON: {event.data}")
+                        yield f"Error: Failed to parse JSON response"
                     except KeyError as e:
-                        print(f"Unexpected data structure: {e}")
+                        print(f"Unexpected data structure: {e} for payload {payload}")
                         print(f"Full data: {data}")
+                        yield f"Error: Unexpected data structure: {e}"
             else:
-                raise Exception(f"Error: {response.status_code} - {response.text}")
+                error_message = f"Error: {response.status_code} - {response.text}"
+                print(error_message)
+                yield error_message
+        except Exception as e:
+            error_message = f"Error: {str(e)}"
+            print(error_message)
+            yield error_message
 
     def get_completion(self, payload: dict) -> str:
+        try:
             response = requests.post(self.url, headers=self.headers, json=payload)
+            print(response, payload)
             if response.status_code == 200:
                 res = response.json()
-                return res["content"][0]["text"] if "content" in res and res["content"] else ""
+                for content in res["content"]:
+                    if not content.get("text"):
+                        continue
+                    return content["text"]
+                return ""
             else:
-                raise Exception(f"Error: {response.status_code} - {response.text}")
+                error_message = f"Error: {response.status_code} - {response.text}"
+                print(error_message)
+                return error_message
+        except Exception as e:
+            error_message = f"Error: {str(e)}"
+            print(error_message)
+            return error_message
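
The net effect of the streaming changes is that extended-thinking output arrives inline, wrapped in <think> markers. A hypothetical event sequence and what stream_response now yields for it (pipeline and payload as in the sketch above):

# content_block_start, type "thinking"        -> "<think>"
# content_block_delta, type "thinking_delta"  -> chunks of the reasoning text
# content_block_delta, type "signature_delta" -> "\n </think> \n\n"
# content_block_start, type "text"            -> first visible text chunk
# content_block_delta, type "text_delta"      -> remaining visible text
# message_stop                                -> generator ends
for chunk in pipeline.stream_response(payload):
    print(chunk, end="", flush=True)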

View File

@@ -66,7 +66,7 @@ class Pipeline:
                     "name": model["name"] if "name" in model else model["id"],
                 }
                 for model in models["data"]
-                if "gpt" in model["id"]
+                if "gpt" in model["id"] or "o1" in model["id"] or "o3" in model["id"]
             ]
         except Exception as e:
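
A quick sanity check of what the widened predicate admits. The ids below are hypothetical examples; note the match is by substring, not exact model family:

ids = ["gpt-4o", "o1-mini", "o3-mini", "chatgpt-4o-latest", "whisper-1"]
kept = [i for i in ids if "gpt" in i or "o1" in i or "o3" in i]
print(kept)  # ['gpt-4o', 'o1-mini', 'o3-mini', 'chatgpt-4o-latest']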