From 3cb201d2d6400f93ee0d1d9dac46382ffcc8ef64 Mon Sep 17 00:00:00 2001 From: Lorenzo <57605930+lmtr0@users.noreply.github.com> Date: Wed, 19 Feb 2025 09:32:27 -0300 Subject: [PATCH 1/6] Update openai_manifold_pipeline.py added support for o1 and o3 models --- examples/pipelines/providers/openai_manifold_pipeline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/pipelines/providers/openai_manifold_pipeline.py b/examples/pipelines/providers/openai_manifold_pipeline.py index 0d667f5..e46bf04 100644 --- a/examples/pipelines/providers/openai_manifold_pipeline.py +++ b/examples/pipelines/providers/openai_manifold_pipeline.py @@ -66,7 +66,7 @@ class Pipeline: "name": model["name"] if "name" in model else model["id"], } for model in models["data"] - if "gpt" in model["id"] + if "gpt" in model["id"] or "o1" in model["id"] or "o3" in model["id"] ] except Exception as e: From 138c90a153ee89283a8f2197c5dd8591ed9b0e8b Mon Sep 17 00:00:00 2001 From: Timothy Jaeryang Baek Date: Thu, 20 Feb 2025 11:33:14 -0800 Subject: [PATCH 2/6] Update README.md --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index 41ad052..eea6f65 100644 --- a/README.md +++ b/README.md @@ -5,6 +5,8 @@ # Pipelines: UI-Agnostic OpenAI API Plugin Framework > [!TIP] +> You probably **don't** need Pipelines! +> > If your goal is simply to add support for additional providers like Anthropic or basic filters, you likely don't need Pipelines . For those cases, Open WebUI Functions are a better fit—it's built-in, much more convenient, and easier to configure. Pipelines, however, comes into play when you're dealing with computationally heavy tasks (e.g., running large models or complex logic) that you want to offload from your main Open WebUI instance for better performance and scalability. From 10662e224c2bde94ffd9cc56208c8d9d1c70f2ae Mon Sep 17 00:00:00 2001 From: Timothy Jaeryang Baek Date: Thu, 20 Feb 2025 11:33:33 -0800 Subject: [PATCH 3/6] Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index eea6f65..22c11db 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ # Pipelines: UI-Agnostic OpenAI API Plugin Framework > [!TIP] -> You probably **don't** need Pipelines! +> **You probably don't need Pipelines!** > > If your goal is simply to add support for additional providers like Anthropic or basic filters, you likely don't need Pipelines . For those cases, Open WebUI Functions are a better fit—it's built-in, much more convenient, and easier to configure. Pipelines, however, comes into play when you're dealing with computationally heavy tasks (e.g., running large models or complex logic) that you want to offload from your main Open WebUI instance for better performance and scalability. From e7efb32728c83392442770c9b1c18547880c75f1 Mon Sep 17 00:00:00 2001 From: ther3zz <40278044+ther3zz@users.noreply.github.com> Date: Thu, 20 Feb 2025 15:13:15 -0500 Subject: [PATCH 4/6] Update langfuse_filter_pipeline.py fixes issues with messages not properly being sent to langfuse. also adds the system prompt if any and sources if any --- examples/filters/langfuse_filter_pipeline.py | 225 ++++++++++++++----- 1 file changed, 166 insertions(+), 59 deletions(-) diff --git a/examples/filters/langfuse_filter_pipeline.py b/examples/filters/langfuse_filter_pipeline.py index c80ea01..8c05f46 100644 --- a/examples/filters/langfuse_filter_pipeline.py +++ b/examples/filters/langfuse_filter_pipeline.py @@ -1,8 +1,8 @@ """ title: Langfuse Filter Pipeline author: open-webui -date: 2024-09-27 -version: 1.4 +date: 2025-02-20 +version: 1.5 license: MIT description: A filter pipeline that uses Langfuse. requirements: langfuse @@ -11,12 +11,14 @@ requirements: langfuse from typing import List, Optional import os import uuid +import json from utils.pipelines.main import get_last_assistant_message from pydantic import BaseModel from langfuse import Langfuse from langfuse.api.resources.commons.errors.unauthorized_error import UnauthorizedError + def get_last_assistant_message_obj(messages: List[dict]) -> dict: for message in reversed(messages): if message["role"] == "assistant": @@ -31,31 +33,48 @@ class Pipeline: secret_key: str public_key: str host: str + debug: bool = False def __init__(self): self.type = "filter" self.name = "Langfuse Filter" + self.valves = self.Valves( **{ "pipelines": ["*"], "secret_key": os.getenv("LANGFUSE_SECRET_KEY", "your-secret-key-here"), "public_key": os.getenv("LANGFUSE_PUBLIC_KEY", "your-public-key-here"), "host": os.getenv("LANGFUSE_HOST", "https://cloud.langfuse.com"), + "debug": os.getenv("DEBUG_MODE", "false").lower() == "true", } ) + self.langfuse = None + # Keep track of the trace and the last-created generation for each chat_id self.chat_traces = {} self.chat_generations = {} + self.suppressed_logs = set() + + def log(self, message: str, suppress_repeats: bool = False): + """Logs messages to the terminal if debugging is enabled.""" + if self.valves.debug: + if suppress_repeats: + if message in self.suppressed_logs: + return + self.suppressed_logs.add(message) + print(f"[DEBUG] {message}") async def on_startup(self): - print(f"on_startup:{__name__}") + self.log(f"on_startup triggered for {__name__}") self.set_langfuse() async def on_shutdown(self): - print(f"on_shutdown:{__name__}") - self.langfuse.flush() + self.log(f"on_shutdown triggered for {__name__}") + if self.langfuse: + self.langfuse.flush() async def on_valves_updated(self): + self.log("Valves updated, resetting Langfuse client.") self.set_langfuse() def set_langfuse(self): @@ -64,76 +83,161 @@ class Pipeline: secret_key=self.valves.secret_key, public_key=self.valves.public_key, host=self.valves.host, - debug=False, + debug=self.valves.debug, ) self.langfuse.auth_check() + self.log("Langfuse client initialized successfully.") except UnauthorizedError: print( "Langfuse credentials incorrect. Please re-enter your Langfuse credentials in the pipeline settings." ) except Exception as e: - print(f"Langfuse error: {e} Please re-enter your Langfuse credentials in the pipeline settings.") + print( + f"Langfuse error: {e} Please re-enter your Langfuse credentials in the pipeline settings." + ) async def inlet(self, body: dict, user: Optional[dict] = None) -> dict: - print(f"inlet:{__name__}") - print(f"Received body: {body}") - print(f"User: {user}") + """ + Inlet handles the incoming request (usually a user message). + - If no trace exists yet for this chat_id, we create a new trace. + - If a trace does exist, we simply create a new generation for the new user message. + """ + if self.valves.debug: + print(f"[DEBUG] Received request: {json.dumps(body, indent=2)}") - # Check for presence of required keys and generate chat_id if missing - if "chat_id" not in body.get("metadata", {}): - unique_id = f"SYSTEM MESSAGE {uuid.uuid4()}" - # Ensure the metadata key exists before assigning chat_id - if "metadata" not in body: - body["metadata"] = {} # Correct this indentation - body["metadata"]["chat_id"] = unique_id - print(f"chat_id was missing, set to: {unique_id}") + self.log(f"Inlet function called with body: {body} and user: {user}") + + metadata = body.get("metadata", {}) + + # --------------------------------------------------------- + # Prepend the system prompt from metadata to the system message: + model_info = metadata.get("model", {}) + params_info = model_info.get("params", {}) + system_prompt = params_info.get("system", "") + + if system_prompt: + for msg in body["messages"]: + if msg.get("role") == "system": + # Only prepend if it hasn't already been prepended: + if not msg["content"].startswith("System Prompt:"): + msg["content"] = f"System Prompt:\n{system_prompt}\n\n{msg['content']}" + break + # --------------------------------------------------------- + + # Fix SYSTEM MESSAGE prefix issue: Only apply for "task_generation" + if "chat_id" not in metadata: + if "task_generation" in metadata.get("type", "").lower(): + chat_id = f"SYSTEM MESSAGE {uuid.uuid4()}" + self.log(f"Task Generation detected, assigned SYSTEM MESSAGE ID: {chat_id}") + else: + chat_id = str(uuid.uuid4()) # Regular chat messages + self.log(f"Assigned normal chat_id: {chat_id}") + + metadata["chat_id"] = chat_id + body["metadata"] = metadata + else: + chat_id = metadata["chat_id"] required_keys = ["model", "messages"] missing_keys = [key for key in required_keys if key not in body] - if missing_keys: error_message = f"Error: Missing keys in the request body: {', '.join(missing_keys)}" - print(error_message) + self.log(error_message) raise ValueError(error_message) - user_id = user.get("id") if user else None - user_name = user.get("name") if user else None user_email = user.get("email") if user else None - trace = self.langfuse.trace( - name=f"filter:{__name__}", - input=body, - user_id=user_email, - metadata={"user_name": user_name, "user_id": user_id,"chat_id": body["metadata"]["chat_id"]}, - session_id=body["metadata"]["chat_id"], - ) + # Check if we already have a trace for this chat + if chat_id not in self.chat_traces: + # Create a new trace and generation + self.log(f"Creating new chat trace for chat_id: {chat_id}") - generation = trace.generation( - name=body["metadata"]["chat_id"], - model=body["model"], - input=body["messages"], - metadata={"interface": "open-webui"}, - ) + trace_payload = { + "name": f"filter:{__name__}", + "input": body, + "user_id": user_email, + "metadata": {"chat_id": chat_id}, + "session_id": chat_id, + } - self.chat_traces[body["metadata"]["chat_id"]] = trace - self.chat_generations[body["metadata"]["chat_id"]] = generation + if self.valves.debug: + print(f"[DEBUG] Langfuse trace request: {json.dumps(trace_payload, indent=2)}") + + trace = self.langfuse.trace(**trace_payload) + + generation_payload = { + "name": chat_id, + "model": body["model"], + "input": body["messages"], + "metadata": {"interface": "open-webui"}, + } + + if self.valves.debug: + print(f"[DEBUG] Langfuse generation request: {json.dumps(generation_payload, indent=2)}") + + generation = trace.generation(**generation_payload) + + self.chat_traces[chat_id] = trace + self.chat_generations[chat_id] = generation + self.log(f"Trace and generation objects successfully created for chat_id: {chat_id}") + + else: + # Re-use existing trace but create a new generation for each new message + self.log(f"Re-using existing chat trace for chat_id: {chat_id}") + trace = self.chat_traces[chat_id] + + new_generation_payload = { + "name": f"{chat_id}:{str(uuid.uuid4())}", + "model": body["model"], + "input": body["messages"], + "metadata": {"interface": "open-webui"}, + } + if self.valves.debug: + print(f"[DEBUG] Langfuse new_generation request: {json.dumps(new_generation_payload, indent=2)}") + + new_generation = trace.generation(**new_generation_payload) + self.chat_generations[chat_id] = new_generation return body async def outlet(self, body: dict, user: Optional[dict] = None) -> dict: - print(f"outlet:{__name__}") - print(f"Received body: {body}") - if body["chat_id"] not in self.chat_generations or body["chat_id"] not in self.chat_traces: - return body + """ + Outlet handles the response body (usually the assistant message). + It will finalize/end the generation created for the user request. + """ + self.log(f"Outlet function called with body: {body}") - trace = self.chat_traces[body["chat_id"]] - generation = self.chat_generations[body["chat_id"]] + chat_id = body.get("chat_id") + + # If no trace or generation exist, attempt to register again + if chat_id not in self.chat_traces or chat_id not in self.chat_generations: + self.log(f"[WARNING] No matching chat trace found for chat_id: {chat_id}, attempting to re-register.") + return await self.inlet(body, user) + + trace = self.chat_traces[chat_id] + generation = self.chat_generations[chat_id] + + # Get the last assistant message from the conversation assistant_message = get_last_assistant_message(body["messages"]) - - - # Extract usage information for models that support it - usage = None assistant_message_obj = get_last_assistant_message_obj(body["messages"]) + + # --------------------------------------------------------- + # If the outlet contains a sources array, append it after the "System Prompt:" + # section in the system message: + if assistant_message_obj and "sources" in assistant_message_obj and assistant_message_obj["sources"]: + for msg in body["messages"]: + if msg.get("role") == "system": + if msg["content"].startswith("System Prompt:"): + # Format the sources nicely + sources_str = "\n\n".join( + json.dumps(src, indent=2) for src in assistant_message_obj["sources"] + ) + msg["content"] += f"\n\nSources:\n{sources_str}" + break + # --------------------------------------------------------- + + # Extract usage if available + usage = None if assistant_message_obj: info = assistant_message_obj.get("info", {}) if isinstance(info, dict): @@ -145,19 +249,22 @@ class Pipeline: "output": output_tokens, "unit": "TOKENS", } + self.log(f"Usage data extracted: {usage}") - # Update generation - trace.update( - output=assistant_message, - ) - generation.end( - output=assistant_message, - metadata={"interface": "open-webui"}, - usage=usage, - ) + # Optionally update the trace with the final assistant output + trace.update(output=assistant_message) - # Clean up the chat_generations dictionary - del self.chat_traces[body["chat_id"]] - del self.chat_generations[body["chat_id"]] + # End the generation with the final assistant message and updated conversation + generation_payload = { + "input": body["messages"], # include the entire conversation + "metadata": {"interface": "open-webui"}, + "usage": usage, + } + + if self.valves.debug: + print(f"[DEBUG] Langfuse generation end request: {json.dumps(generation_payload, indent=2)}") + + generation.end(**generation_payload) + self.log(f"Generation ended for chat_id: {chat_id}") return body From afde1aafb0fb852d4b1cc77f87618be67dfcccfe Mon Sep 17 00:00:00 2001 From: Federico B Date: Mon, 24 Feb 2025 23:16:41 +0100 Subject: [PATCH 5/6] Add claude-3-7-sonnet-20250219 Model Added support for the latest Claude-3-7-Sonnet model (version 20250219) from Anthropic in the OpenWebUI pipeline. --- examples/pipelines/providers/anthropic_manifold_pipeline.py | 1 + 1 file changed, 1 insertion(+) diff --git a/examples/pipelines/providers/anthropic_manifold_pipeline.py b/examples/pipelines/providers/anthropic_manifold_pipeline.py index 98b27d5..3540c61 100644 --- a/examples/pipelines/providers/anthropic_manifold_pipeline.py +++ b/examples/pipelines/providers/anthropic_manifold_pipeline.py @@ -48,6 +48,7 @@ class Pipeline: {"id": "claude-3-sonnet-20240229", "name": "claude-3-sonnet"}, {"id": "claude-3-5-haiku-20241022", "name": "claude-3.5-haiku"}, {"id": "claude-3-5-sonnet-20241022", "name": "claude-3.5-sonnet"}, + {"id": "claude-3-7-sonnet-20250219", "name": "claude-3.7-sonnet"}, ] async def on_startup(self): From 10242dc8a21a6c5c7ce7e51efa21ea171875fb83 Mon Sep 17 00:00:00 2001 From: Mikey O'Brien Date: Mon, 24 Feb 2025 19:33:54 -0600 Subject: [PATCH 6/6] Add thinking support for claude-3-7-sonnet --- .../providers/anthropic_manifold_pipeline.py | 172 ++++++++++++++---- 1 file changed, 134 insertions(+), 38 deletions(-) diff --git a/examples/pipelines/providers/anthropic_manifold_pipeline.py b/examples/pipelines/providers/anthropic_manifold_pipeline.py index 3540c61..29dd2d7 100644 --- a/examples/pipelines/providers/anthropic_manifold_pipeline.py +++ b/examples/pipelines/providers/anthropic_manifold_pipeline.py @@ -6,7 +6,7 @@ version: 1.4 license: MIT description: A pipeline for generating text and processing images using the Anthropic API. requirements: requests, sseclient-py -environment_variables: ANTHROPIC_API_KEY +environment_variables: ANTHROPIC_API_KEY, ANTHROPIC_THINKING_BUDGET_TOKENS, ANTHROPIC_ENABLE_THINKING """ import os @@ -18,6 +18,17 @@ import sseclient from utils.pipelines.main import pop_system_message +REASONING_EFFORT_BUDGET_TOKEN_MAP = { + "none": None, + "low": 1024, + "medium": 4096, + "high": 16384, + "max": 32768, +} + +# Maximum combined token limit for Claude 3.7 +MAX_COMBINED_TOKENS = 64000 + class Pipeline: class Valves(BaseModel): @@ -29,16 +40,20 @@ class Pipeline: self.name = "anthropic/" self.valves = self.Valves( - **{"ANTHROPIC_API_KEY": os.getenv("ANTHROPIC_API_KEY", "your-api-key-here")} + **{ + "ANTHROPIC_API_KEY": os.getenv( + "ANTHROPIC_API_KEY", "your-api-key-here" + ), + } ) - self.url = 'https://api.anthropic.com/v1/messages' + self.url = "https://api.anthropic.com/v1/messages" self.update_headers() def update_headers(self): self.headers = { - 'anthropic-version': '2023-06-01', - 'content-type': 'application/json', - 'x-api-key': self.valves.ANTHROPIC_API_KEY + "anthropic-version": "2023-06-01", + "content-type": "application/json", + "x-api-key": self.valves.ANTHROPIC_API_KEY, } def get_anthropic_models(self): @@ -88,7 +103,7 @@ class Pipeline: ) -> Union[str, Generator, Iterator]: try: # Remove unnecessary keys - for key in ['user', 'chat_id', 'title']: + for key in ["user", "chat_id", "title"]: body.pop(key, None) system_message, messages = pop_system_message(messages) @@ -102,28 +117,40 @@ class Pipeline: if isinstance(message.get("content"), list): for item in message["content"]: if item["type"] == "text": - processed_content.append({"type": "text", "text": item["text"]}) + processed_content.append( + {"type": "text", "text": item["text"]} + ) elif item["type"] == "image_url": if image_count >= 5: - raise ValueError("Maximum of 5 images per API call exceeded") + raise ValueError( + "Maximum of 5 images per API call exceeded" + ) processed_image = self.process_image(item["image_url"]) processed_content.append(processed_image) if processed_image["source"]["type"] == "base64": - image_size = len(processed_image["source"]["data"]) * 3 / 4 + image_size = ( + len(processed_image["source"]["data"]) * 3 / 4 + ) else: image_size = 0 total_image_size += image_size if total_image_size > 100 * 1024 * 1024: - raise ValueError("Total size of images exceeds 100 MB limit") + raise ValueError( + "Total size of images exceeds 100 MB limit" + ) image_count += 1 else: - processed_content = [{"type": "text", "text": message.get("content", "")}] + processed_content = [ + {"type": "text", "text": message.get("content", "")} + ] - processed_messages.append({"role": message["role"], "content": processed_content}) + processed_messages.append( + {"role": message["role"], "content": processed_content} + ) # Prepare the payload payload = { @@ -139,6 +166,42 @@ class Pipeline: } if body.get("stream", False): + supports_thinking = "claude-3-7" in model_id + reasoning_effort = body.get("reasoning_effort", "none") + budget_tokens = REASONING_EFFORT_BUDGET_TOKEN_MAP.get(reasoning_effort) + + # Allow users to input an integer value representing budget tokens + if ( + not budget_tokens + and reasoning_effort not in REASONING_EFFORT_BUDGET_TOKEN_MAP.keys() + ): + try: + budget_tokens = int(reasoning_effort) + except ValueError as e: + print("Failed to convert reasoning effort to int", e) + budget_tokens = None + + if supports_thinking and budget_tokens: + # Check if the combined tokens (budget_tokens + max_tokens) exceeds the limit + max_tokens = payload.get("max_tokens", 4096) + combined_tokens = budget_tokens + max_tokens + + if combined_tokens > MAX_COMBINED_TOKENS: + error_message = f"Error: Combined tokens (budget_tokens {budget_tokens} + max_tokens {max_tokens} = {combined_tokens}) exceeds the maximum limit of {MAX_COMBINED_TOKENS}" + print(error_message) + return error_message + + payload["max_tokens"] = combined_tokens + payload["thinking"] = { + "type": "enabled", + "budget_tokens": budget_tokens, + } + # Thinking requires temperature 1.0 and does not support top_p, top_k + payload["temperature"] = 1.0 + if "top_k" in payload: + del payload["top_k"] + if "top_p" in payload: + del payload["top_p"] return self.stream_response(payload) else: return self.get_completion(payload) @@ -146,31 +209,64 @@ class Pipeline: return f"Error: {e}" def stream_response(self, payload: dict) -> Generator: - response = requests.post(self.url, headers=self.headers, json=payload, stream=True) + """Used for title and tag generation""" + try: + response = requests.post( + self.url, headers=self.headers, json=payload, stream=True + ) + print(f"{response} for {payload}") - if response.status_code == 200: - client = sseclient.SSEClient(response) - for event in client.events(): - try: - data = json.loads(event.data) - if data["type"] == "content_block_start": - yield data["content_block"]["text"] - elif data["type"] == "content_block_delta": - yield data["delta"]["text"] - elif data["type"] == "message_stop": - break - except json.JSONDecodeError: - print(f"Failed to parse JSON: {event.data}") - except KeyError as e: - print(f"Unexpected data structure: {e}") - print(f"Full data: {data}") - else: - raise Exception(f"Error: {response.status_code} - {response.text}") + if response.status_code == 200: + client = sseclient.SSEClient(response) + for event in client.events(): + try: + data = json.loads(event.data) + if data["type"] == "content_block_start": + if data["content_block"]["type"] == "thinking": + yield "" + else: + yield data["content_block"]["text"] + elif data["type"] == "content_block_delta": + if data["delta"]["type"] == "thinking_delta": + yield data["delta"]["thinking"] + elif data["delta"]["type"] == "signature_delta": + yield "\n \n\n" + else: + yield data["delta"]["text"] + elif data["type"] == "message_stop": + break + except json.JSONDecodeError: + print(f"Failed to parse JSON: {event.data}") + yield f"Error: Failed to parse JSON response" + except KeyError as e: + print(f"Unexpected data structure: {e} for payload {payload}") + print(f"Full data: {data}") + yield f"Error: Unexpected data structure: {e}" + else: + error_message = f"Error: {response.status_code} - {response.text}" + print(error_message) + yield error_message + except Exception as e: + error_message = f"Error: {str(e)}" + print(error_message) + yield error_message def get_completion(self, payload: dict) -> str: - response = requests.post(self.url, headers=self.headers, json=payload) - if response.status_code == 200: - res = response.json() - return res["content"][0]["text"] if "content" in res and res["content"] else "" - else: - raise Exception(f"Error: {response.status_code} - {response.text}") + try: + response = requests.post(self.url, headers=self.headers, json=payload) + print(response, payload) + if response.status_code == 200: + res = response.json() + for content in res["content"]: + if not content.get("text"): + continue + return content["text"] + return "" + else: + error_message = f"Error: {response.status_code} - {response.text}" + print(error_message) + return error_message + except Exception as e: + error_message = f"Error: {str(e)}" + print(error_message) + return error_message