From 86adb744f72edfaf4e1a319a3e58f8e37ccb8969 Mon Sep 17 00:00:00 2001 From: Hai Duong Tran Date: Mon, 2 Dec 2024 22:07:08 +0100 Subject: [PATCH 01/41] Add docker-compose installation guide. --- README.md | 41 +++++++++++++++++++++++++++++++++++++++++ docker-compose.yaml | 19 +++++++++++++++++++ 2 files changed, 60 insertions(+) create mode 100644 docker-compose.yaml diff --git a/README.md b/README.md index 5e78ab4..2ce9cb9 100644 --- a/README.md +++ b/README.md @@ -39,6 +39,8 @@ Integrating Pipelines with any OpenAI API-compatible UI client is simple. Launch > [!WARNING] > Pipelines are a plugin system with arbitrary code execution — **don't fetch random pipelines from sources you don't trust**. +### Docker + For a streamlined setup using Docker: 1. **Run the Pipelines container:** @@ -75,6 +77,45 @@ Alternatively, you can directly install pipelines from the admin settings by cop That's it! You're now ready to build customizable AI integrations effortlessly with Pipelines. Enjoy! +### Docker Compose together with Open WebUI + +Using [Docker Compose](https://docs.docker.com/compose/) simplifies the management of multi-container Docker applications. + +Here is an example configuration file `docker-compose.yaml` for setting up Open WebUI together with Pipelines using Docker Compose: + +```yaml +services: + openwebui: + image: ghcr.io/open-webui/open-webui:main + ports: + - "3000:8080" + volumes: + - open-webui:/app/backend/data + + pipelines: + image: ghcr.io/open-webui/pipelines:main + volumes: + - pipelines:/app/pipelines + restart: always + environment: + - PIPELINES_API_KEY=0p3n-w3bu! + +volumes: + open-webui: {} + pipelines: {} +``` + +To start your services, run the following command: + +``` +docker compose up -d +``` + +You can then use `http://pipelines:9099` (the name is the same as the service's name defined in `docker-compose.yaml`) as an API URL to connect to Open WebUI. + +> [!NOTE] +> The `pipelines` service is accessible only by `openwebui` Docker service and thus provide additional layer of security. + ## 📦 Installation and Setup Get started with Pipelines in a few easy steps: diff --git a/docker-compose.yaml b/docker-compose.yaml new file mode 100644 index 0000000..399fdbc --- /dev/null +++ b/docker-compose.yaml @@ -0,0 +1,19 @@ +services: + openwebui: + image: ghcr.io/open-webui/open-webui:main + ports: + - "3000:8080" + volumes: + - open-webui:/app/backend/data + + pipelines: + image: ghcr.io/open-webui/pipelines:main + volumes: + - pipelines:/app/pipelines + restart: always + environment: + - PIPELINES_API_KEY=0p3n-w3bu! + +volumes: + open-webui: {} + pipelines: {} \ No newline at end of file From 1e79a038fe39100c502fd65596b3ed4bb9696afd Mon Sep 17 00:00:00 2001 From: mohamedo rayen <135092019+mohameodo@users.noreply.github.com> Date: Fri, 6 Dec 2024 17:33:44 -0700 Subject: [PATCH 02/41] Fix errors in function calling blueprint, reload function, and URL conversion Add a return statement to the `call_function` method in `blueprints/function_calling_blueprint.py`. * Add a return statement after adding the function result to the system prompt in the `call_function` method. * Ensure the `call_function` method returns the updated messages. --- blueprints/function_calling_blueprint.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/blueprints/function_calling_blueprint.py b/blueprints/function_calling_blueprint.py index f4739b0..ad6a386 100644 --- a/blueprints/function_calling_blueprint.py +++ b/blueprints/function_calling_blueprint.py @@ -137,6 +137,8 @@ And answer according to the language of the user's question.""", # Return the updated messages return messages + return messages + def run_completion(self, system_prompt: str, content: str) -> dict: r = None try: From 138c90a153ee89283a8f2197c5dd8591ed9b0e8b Mon Sep 17 00:00:00 2001 From: Timothy Jaeryang Baek Date: Thu, 20 Feb 2025 11:33:14 -0800 Subject: [PATCH 03/41] Update README.md --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index 41ad052..eea6f65 100644 --- a/README.md +++ b/README.md @@ -5,6 +5,8 @@ # Pipelines: UI-Agnostic OpenAI API Plugin Framework > [!TIP] +> You probably **don't** need Pipelines! +> > If your goal is simply to add support for additional providers like Anthropic or basic filters, you likely don't need Pipelines . For those cases, Open WebUI Functions are a better fit—it's built-in, much more convenient, and easier to configure. Pipelines, however, comes into play when you're dealing with computationally heavy tasks (e.g., running large models or complex logic) that you want to offload from your main Open WebUI instance for better performance and scalability. From 10662e224c2bde94ffd9cc56208c8d9d1c70f2ae Mon Sep 17 00:00:00 2001 From: Timothy Jaeryang Baek Date: Thu, 20 Feb 2025 11:33:33 -0800 Subject: [PATCH 04/41] Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index eea6f65..22c11db 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ # Pipelines: UI-Agnostic OpenAI API Plugin Framework > [!TIP] -> You probably **don't** need Pipelines! +> **You probably don't need Pipelines!** > > If your goal is simply to add support for additional providers like Anthropic or basic filters, you likely don't need Pipelines . For those cases, Open WebUI Functions are a better fit—it's built-in, much more convenient, and easier to configure. Pipelines, however, comes into play when you're dealing with computationally heavy tasks (e.g., running large models or complex logic) that you want to offload from your main Open WebUI instance for better performance and scalability. From e7efb32728c83392442770c9b1c18547880c75f1 Mon Sep 17 00:00:00 2001 From: ther3zz <40278044+ther3zz@users.noreply.github.com> Date: Thu, 20 Feb 2025 15:13:15 -0500 Subject: [PATCH 05/41] Update langfuse_filter_pipeline.py fixes issues with messages not properly being sent to langfuse. also adds the system prompt if any and sources if any --- examples/filters/langfuse_filter_pipeline.py | 225 ++++++++++++++----- 1 file changed, 166 insertions(+), 59 deletions(-) diff --git a/examples/filters/langfuse_filter_pipeline.py b/examples/filters/langfuse_filter_pipeline.py index c80ea01..8c05f46 100644 --- a/examples/filters/langfuse_filter_pipeline.py +++ b/examples/filters/langfuse_filter_pipeline.py @@ -1,8 +1,8 @@ """ title: Langfuse Filter Pipeline author: open-webui -date: 2024-09-27 -version: 1.4 +date: 2025-02-20 +version: 1.5 license: MIT description: A filter pipeline that uses Langfuse. requirements: langfuse @@ -11,12 +11,14 @@ requirements: langfuse from typing import List, Optional import os import uuid +import json from utils.pipelines.main import get_last_assistant_message from pydantic import BaseModel from langfuse import Langfuse from langfuse.api.resources.commons.errors.unauthorized_error import UnauthorizedError + def get_last_assistant_message_obj(messages: List[dict]) -> dict: for message in reversed(messages): if message["role"] == "assistant": @@ -31,31 +33,48 @@ class Pipeline: secret_key: str public_key: str host: str + debug: bool = False def __init__(self): self.type = "filter" self.name = "Langfuse Filter" + self.valves = self.Valves( **{ "pipelines": ["*"], "secret_key": os.getenv("LANGFUSE_SECRET_KEY", "your-secret-key-here"), "public_key": os.getenv("LANGFUSE_PUBLIC_KEY", "your-public-key-here"), "host": os.getenv("LANGFUSE_HOST", "https://cloud.langfuse.com"), + "debug": os.getenv("DEBUG_MODE", "false").lower() == "true", } ) + self.langfuse = None + # Keep track of the trace and the last-created generation for each chat_id self.chat_traces = {} self.chat_generations = {} + self.suppressed_logs = set() + + def log(self, message: str, suppress_repeats: bool = False): + """Logs messages to the terminal if debugging is enabled.""" + if self.valves.debug: + if suppress_repeats: + if message in self.suppressed_logs: + return + self.suppressed_logs.add(message) + print(f"[DEBUG] {message}") async def on_startup(self): - print(f"on_startup:{__name__}") + self.log(f"on_startup triggered for {__name__}") self.set_langfuse() async def on_shutdown(self): - print(f"on_shutdown:{__name__}") - self.langfuse.flush() + self.log(f"on_shutdown triggered for {__name__}") + if self.langfuse: + self.langfuse.flush() async def on_valves_updated(self): + self.log("Valves updated, resetting Langfuse client.") self.set_langfuse() def set_langfuse(self): @@ -64,76 +83,161 @@ class Pipeline: secret_key=self.valves.secret_key, public_key=self.valves.public_key, host=self.valves.host, - debug=False, + debug=self.valves.debug, ) self.langfuse.auth_check() + self.log("Langfuse client initialized successfully.") except UnauthorizedError: print( "Langfuse credentials incorrect. Please re-enter your Langfuse credentials in the pipeline settings." ) except Exception as e: - print(f"Langfuse error: {e} Please re-enter your Langfuse credentials in the pipeline settings.") + print( + f"Langfuse error: {e} Please re-enter your Langfuse credentials in the pipeline settings." + ) async def inlet(self, body: dict, user: Optional[dict] = None) -> dict: - print(f"inlet:{__name__}") - print(f"Received body: {body}") - print(f"User: {user}") + """ + Inlet handles the incoming request (usually a user message). + - If no trace exists yet for this chat_id, we create a new trace. + - If a trace does exist, we simply create a new generation for the new user message. + """ + if self.valves.debug: + print(f"[DEBUG] Received request: {json.dumps(body, indent=2)}") - # Check for presence of required keys and generate chat_id if missing - if "chat_id" not in body.get("metadata", {}): - unique_id = f"SYSTEM MESSAGE {uuid.uuid4()}" - # Ensure the metadata key exists before assigning chat_id - if "metadata" not in body: - body["metadata"] = {} # Correct this indentation - body["metadata"]["chat_id"] = unique_id - print(f"chat_id was missing, set to: {unique_id}") + self.log(f"Inlet function called with body: {body} and user: {user}") + + metadata = body.get("metadata", {}) + + # --------------------------------------------------------- + # Prepend the system prompt from metadata to the system message: + model_info = metadata.get("model", {}) + params_info = model_info.get("params", {}) + system_prompt = params_info.get("system", "") + + if system_prompt: + for msg in body["messages"]: + if msg.get("role") == "system": + # Only prepend if it hasn't already been prepended: + if not msg["content"].startswith("System Prompt:"): + msg["content"] = f"System Prompt:\n{system_prompt}\n\n{msg['content']}" + break + # --------------------------------------------------------- + + # Fix SYSTEM MESSAGE prefix issue: Only apply for "task_generation" + if "chat_id" not in metadata: + if "task_generation" in metadata.get("type", "").lower(): + chat_id = f"SYSTEM MESSAGE {uuid.uuid4()}" + self.log(f"Task Generation detected, assigned SYSTEM MESSAGE ID: {chat_id}") + else: + chat_id = str(uuid.uuid4()) # Regular chat messages + self.log(f"Assigned normal chat_id: {chat_id}") + + metadata["chat_id"] = chat_id + body["metadata"] = metadata + else: + chat_id = metadata["chat_id"] required_keys = ["model", "messages"] missing_keys = [key for key in required_keys if key not in body] - if missing_keys: error_message = f"Error: Missing keys in the request body: {', '.join(missing_keys)}" - print(error_message) + self.log(error_message) raise ValueError(error_message) - user_id = user.get("id") if user else None - user_name = user.get("name") if user else None user_email = user.get("email") if user else None - trace = self.langfuse.trace( - name=f"filter:{__name__}", - input=body, - user_id=user_email, - metadata={"user_name": user_name, "user_id": user_id,"chat_id": body["metadata"]["chat_id"]}, - session_id=body["metadata"]["chat_id"], - ) + # Check if we already have a trace for this chat + if chat_id not in self.chat_traces: + # Create a new trace and generation + self.log(f"Creating new chat trace for chat_id: {chat_id}") - generation = trace.generation( - name=body["metadata"]["chat_id"], - model=body["model"], - input=body["messages"], - metadata={"interface": "open-webui"}, - ) + trace_payload = { + "name": f"filter:{__name__}", + "input": body, + "user_id": user_email, + "metadata": {"chat_id": chat_id}, + "session_id": chat_id, + } - self.chat_traces[body["metadata"]["chat_id"]] = trace - self.chat_generations[body["metadata"]["chat_id"]] = generation + if self.valves.debug: + print(f"[DEBUG] Langfuse trace request: {json.dumps(trace_payload, indent=2)}") + + trace = self.langfuse.trace(**trace_payload) + + generation_payload = { + "name": chat_id, + "model": body["model"], + "input": body["messages"], + "metadata": {"interface": "open-webui"}, + } + + if self.valves.debug: + print(f"[DEBUG] Langfuse generation request: {json.dumps(generation_payload, indent=2)}") + + generation = trace.generation(**generation_payload) + + self.chat_traces[chat_id] = trace + self.chat_generations[chat_id] = generation + self.log(f"Trace and generation objects successfully created for chat_id: {chat_id}") + + else: + # Re-use existing trace but create a new generation for each new message + self.log(f"Re-using existing chat trace for chat_id: {chat_id}") + trace = self.chat_traces[chat_id] + + new_generation_payload = { + "name": f"{chat_id}:{str(uuid.uuid4())}", + "model": body["model"], + "input": body["messages"], + "metadata": {"interface": "open-webui"}, + } + if self.valves.debug: + print(f"[DEBUG] Langfuse new_generation request: {json.dumps(new_generation_payload, indent=2)}") + + new_generation = trace.generation(**new_generation_payload) + self.chat_generations[chat_id] = new_generation return body async def outlet(self, body: dict, user: Optional[dict] = None) -> dict: - print(f"outlet:{__name__}") - print(f"Received body: {body}") - if body["chat_id"] not in self.chat_generations or body["chat_id"] not in self.chat_traces: - return body + """ + Outlet handles the response body (usually the assistant message). + It will finalize/end the generation created for the user request. + """ + self.log(f"Outlet function called with body: {body}") - trace = self.chat_traces[body["chat_id"]] - generation = self.chat_generations[body["chat_id"]] + chat_id = body.get("chat_id") + + # If no trace or generation exist, attempt to register again + if chat_id not in self.chat_traces or chat_id not in self.chat_generations: + self.log(f"[WARNING] No matching chat trace found for chat_id: {chat_id}, attempting to re-register.") + return await self.inlet(body, user) + + trace = self.chat_traces[chat_id] + generation = self.chat_generations[chat_id] + + # Get the last assistant message from the conversation assistant_message = get_last_assistant_message(body["messages"]) - - - # Extract usage information for models that support it - usage = None assistant_message_obj = get_last_assistant_message_obj(body["messages"]) + + # --------------------------------------------------------- + # If the outlet contains a sources array, append it after the "System Prompt:" + # section in the system message: + if assistant_message_obj and "sources" in assistant_message_obj and assistant_message_obj["sources"]: + for msg in body["messages"]: + if msg.get("role") == "system": + if msg["content"].startswith("System Prompt:"): + # Format the sources nicely + sources_str = "\n\n".join( + json.dumps(src, indent=2) for src in assistant_message_obj["sources"] + ) + msg["content"] += f"\n\nSources:\n{sources_str}" + break + # --------------------------------------------------------- + + # Extract usage if available + usage = None if assistant_message_obj: info = assistant_message_obj.get("info", {}) if isinstance(info, dict): @@ -145,19 +249,22 @@ class Pipeline: "output": output_tokens, "unit": "TOKENS", } + self.log(f"Usage data extracted: {usage}") - # Update generation - trace.update( - output=assistant_message, - ) - generation.end( - output=assistant_message, - metadata={"interface": "open-webui"}, - usage=usage, - ) + # Optionally update the trace with the final assistant output + trace.update(output=assistant_message) - # Clean up the chat_generations dictionary - del self.chat_traces[body["chat_id"]] - del self.chat_generations[body["chat_id"]] + # End the generation with the final assistant message and updated conversation + generation_payload = { + "input": body["messages"], # include the entire conversation + "metadata": {"interface": "open-webui"}, + "usage": usage, + } + + if self.valves.debug: + print(f"[DEBUG] Langfuse generation end request: {json.dumps(generation_payload, indent=2)}") + + generation.end(**generation_payload) + self.log(f"Generation ended for chat_id: {chat_id}") return body From 4011977ae1dbf55328d380c7717d319bb1fc2190 Mon Sep 17 00:00:00 2001 From: ther3zz <40278044+ther3zz@users.noreply.github.com> Date: Fri, 21 Feb 2025 18:27:34 -0500 Subject: [PATCH 06/41] Update langfuse_filter_pipeline.py reworked to make observations easier to understand fixed an issue where title generation were being overwritten by messages --- examples/filters/langfuse_filter_pipeline.py | 155 +++++++------------ 1 file changed, 52 insertions(+), 103 deletions(-) diff --git a/examples/filters/langfuse_filter_pipeline.py b/examples/filters/langfuse_filter_pipeline.py index 8c05f46..9788ae0 100644 --- a/examples/filters/langfuse_filter_pipeline.py +++ b/examples/filters/langfuse_filter_pipeline.py @@ -1,8 +1,8 @@ """ title: Langfuse Filter Pipeline author: open-webui -date: 2025-02-20 -version: 1.5 +date: 2024-09-27 +version: 1.6 license: MIT description: A filter pipeline that uses Langfuse. requirements: langfuse @@ -20,6 +20,7 @@ from langfuse.api.resources.commons.errors.unauthorized_error import Unauthorize def get_last_assistant_message_obj(messages: List[dict]) -> dict: + """Retrieve the last assistant message from the message list.""" for message in reversed(messages): if message["role"] == "assistant": return message @@ -50,13 +51,10 @@ class Pipeline: ) self.langfuse = None - # Keep track of the trace and the last-created generation for each chat_id self.chat_traces = {} - self.chat_generations = {} self.suppressed_logs = set() def log(self, message: str, suppress_repeats: bool = False): - """Logs messages to the terminal if debugging is enabled.""" if self.valves.debug: if suppress_repeats: if message in self.suppressed_logs: @@ -97,46 +95,15 @@ class Pipeline: ) async def inlet(self, body: dict, user: Optional[dict] = None) -> dict: - """ - Inlet handles the incoming request (usually a user message). - - If no trace exists yet for this chat_id, we create a new trace. - - If a trace does exist, we simply create a new generation for the new user message. - """ if self.valves.debug: print(f"[DEBUG] Received request: {json.dumps(body, indent=2)}") self.log(f"Inlet function called with body: {body} and user: {user}") metadata = body.get("metadata", {}) - - # --------------------------------------------------------- - # Prepend the system prompt from metadata to the system message: - model_info = metadata.get("model", {}) - params_info = model_info.get("params", {}) - system_prompt = params_info.get("system", "") - - if system_prompt: - for msg in body["messages"]: - if msg.get("role") == "system": - # Only prepend if it hasn't already been prepended: - if not msg["content"].startswith("System Prompt:"): - msg["content"] = f"System Prompt:\n{system_prompt}\n\n{msg['content']}" - break - # --------------------------------------------------------- - - # Fix SYSTEM MESSAGE prefix issue: Only apply for "task_generation" - if "chat_id" not in metadata: - if "task_generation" in metadata.get("type", "").lower(): - chat_id = f"SYSTEM MESSAGE {uuid.uuid4()}" - self.log(f"Task Generation detected, assigned SYSTEM MESSAGE ID: {chat_id}") - else: - chat_id = str(uuid.uuid4()) # Regular chat messages - self.log(f"Assigned normal chat_id: {chat_id}") - - metadata["chat_id"] = chat_id - body["metadata"] = metadata - else: - chat_id = metadata["chat_id"] + chat_id = metadata.get("chat_id", str(uuid.uuid4())) + metadata["chat_id"] = chat_id + body["metadata"] = metadata required_keys = ["model", "messages"] missing_keys = [key for key in required_keys if key not in body] @@ -146,17 +113,31 @@ class Pipeline: raise ValueError(error_message) user_email = user.get("email") if user else None + task_name = metadata.get("task", "user_response") # Default to user_response if task is missing - # Check if we already have a trace for this chat + # **Extract system message from metadata and prepend** + system_message = "" + if "model" in metadata and "params" in metadata["model"]: + system_message = metadata["model"]["params"].get("system", "") + + for message in body["messages"]: + if message["role"] == "system": + message["content"] = system_message + "\n\n" + message["content"] + break + else: + # If no system message was found, add one + if system_message: + body["messages"].insert(0, {"role": "system", "content": system_message}) + + # Ensure unique tracking per task if chat_id not in self.chat_traces: - # Create a new trace and generation - self.log(f"Creating new chat trace for chat_id: {chat_id}") + self.log(f"Creating new trace for chat_id: {chat_id}") trace_payload = { - "name": f"filter:{__name__}", + "name": f"chat:{chat_id}", "input": body, "user_id": user_email, - "metadata": {"chat_id": chat_id}, + "metadata": metadata, # Preserve all metadata "session_id": chat_id, } @@ -164,80 +145,46 @@ class Pipeline: print(f"[DEBUG] Langfuse trace request: {json.dumps(trace_payload, indent=2)}") trace = self.langfuse.trace(**trace_payload) - - generation_payload = { - "name": chat_id, - "model": body["model"], - "input": body["messages"], - "metadata": {"interface": "open-webui"}, - } - - if self.valves.debug: - print(f"[DEBUG] Langfuse generation request: {json.dumps(generation_payload, indent=2)}") - - generation = trace.generation(**generation_payload) - self.chat_traces[chat_id] = trace - self.chat_generations[chat_id] = generation - self.log(f"Trace and generation objects successfully created for chat_id: {chat_id}") - else: - # Re-use existing trace but create a new generation for each new message - self.log(f"Re-using existing chat trace for chat_id: {chat_id}") trace = self.chat_traces[chat_id] + self.log(f"Reusing existing trace for chat_id: {chat_id}") - new_generation_payload = { - "name": f"{chat_id}:{str(uuid.uuid4())}", - "model": body["model"], - "input": body["messages"], - "metadata": {"interface": "open-webui"}, - } - if self.valves.debug: - print(f"[DEBUG] Langfuse new_generation request: {json.dumps(new_generation_payload, indent=2)}") + # Ensure all metadata fields are passed through + metadata["type"] = task_name + metadata["interface"] = "open-webui" - new_generation = trace.generation(**new_generation_payload) - self.chat_generations[chat_id] = new_generation + generation_payload = { + "name": f"{task_name}:{str(uuid.uuid4())}", + "model": body["model"], + "input": body["messages"], + "metadata": metadata, # Preserve all metadata + } + + if self.valves.debug: + print(f"[DEBUG] Langfuse generation request: {json.dumps(generation_payload, indent=2)}") + + trace.generation(**generation_payload) return body async def outlet(self, body: dict, user: Optional[dict] = None) -> dict: - """ - Outlet handles the response body (usually the assistant message). - It will finalize/end the generation created for the user request. - """ self.log(f"Outlet function called with body: {body}") chat_id = body.get("chat_id") + metadata = body.get("metadata", {}) + task_name = metadata.get("task", "llm_response") # Default to llm_response if missing - # If no trace or generation exist, attempt to register again - if chat_id not in self.chat_traces or chat_id not in self.chat_generations: - self.log(f"[WARNING] No matching chat trace found for chat_id: {chat_id}, attempting to re-register.") + if chat_id not in self.chat_traces: + self.log(f"[WARNING] No matching trace found for chat_id: {chat_id}, attempting to re-register.") return await self.inlet(body, user) trace = self.chat_traces[chat_id] - generation = self.chat_generations[chat_id] - # Get the last assistant message from the conversation assistant_message = get_last_assistant_message(body["messages"]) - assistant_message_obj = get_last_assistant_message_obj(body["messages"]) - # --------------------------------------------------------- - # If the outlet contains a sources array, append it after the "System Prompt:" - # section in the system message: - if assistant_message_obj and "sources" in assistant_message_obj and assistant_message_obj["sources"]: - for msg in body["messages"]: - if msg.get("role") == "system": - if msg["content"].startswith("System Prompt:"): - # Format the sources nicely - sources_str = "\n\n".join( - json.dumps(src, indent=2) for src in assistant_message_obj["sources"] - ) - msg["content"] += f"\n\nSources:\n{sources_str}" - break - # --------------------------------------------------------- - - # Extract usage if available usage = None + assistant_message_obj = get_last_assistant_message_obj(body["messages"]) if assistant_message_obj: info = assistant_message_obj.get("info", {}) if isinstance(info, dict): @@ -251,20 +198,22 @@ class Pipeline: } self.log(f"Usage data extracted: {usage}") - # Optionally update the trace with the final assistant output trace.update(output=assistant_message) - # End the generation with the final assistant message and updated conversation + metadata["type"] = task_name + metadata["interface"] = "open-webui" + generation_payload = { - "input": body["messages"], # include the entire conversation - "metadata": {"interface": "open-webui"}, + "name": f"{task_name}:{str(uuid.uuid4())}", + "input": body["messages"], + "metadata": metadata, # Preserve all metadata "usage": usage, } if self.valves.debug: print(f"[DEBUG] Langfuse generation end request: {json.dumps(generation_payload, indent=2)}") - generation.end(**generation_payload) + trace.generation().end(**generation_payload) self.log(f"Generation ended for chat_id: {chat_id}") return body From afde1aafb0fb852d4b1cc77f87618be67dfcccfe Mon Sep 17 00:00:00 2001 From: Federico B Date: Mon, 24 Feb 2025 23:16:41 +0100 Subject: [PATCH 07/41] Add claude-3-7-sonnet-20250219 Model Added support for the latest Claude-3-7-Sonnet model (version 20250219) from Anthropic in the OpenWebUI pipeline. --- examples/pipelines/providers/anthropic_manifold_pipeline.py | 1 + 1 file changed, 1 insertion(+) diff --git a/examples/pipelines/providers/anthropic_manifold_pipeline.py b/examples/pipelines/providers/anthropic_manifold_pipeline.py index 98b27d5..3540c61 100644 --- a/examples/pipelines/providers/anthropic_manifold_pipeline.py +++ b/examples/pipelines/providers/anthropic_manifold_pipeline.py @@ -48,6 +48,7 @@ class Pipeline: {"id": "claude-3-sonnet-20240229", "name": "claude-3-sonnet"}, {"id": "claude-3-5-haiku-20241022", "name": "claude-3.5-haiku"}, {"id": "claude-3-5-sonnet-20241022", "name": "claude-3.5-sonnet"}, + {"id": "claude-3-7-sonnet-20250219", "name": "claude-3.7-sonnet"}, ] async def on_startup(self): From 78315eb9308e378ac2bddb033b98c4bb6625c3d7 Mon Sep 17 00:00:00 2001 From: ther3zz <40278044+ther3zz@users.noreply.github.com> Date: Thu, 27 Feb 2025 10:48:49 -0500 Subject: [PATCH 08/41] Update langfuse_filter_pipeline.py fixes usage not passing over to langfuse --- examples/filters/langfuse_filter_pipeline.py | 32 +++++--------------- 1 file changed, 8 insertions(+), 24 deletions(-) diff --git a/examples/filters/langfuse_filter_pipeline.py b/examples/filters/langfuse_filter_pipeline.py index 9788ae0..a788919 100644 --- a/examples/filters/langfuse_filter_pipeline.py +++ b/examples/filters/langfuse_filter_pipeline.py @@ -113,23 +113,8 @@ class Pipeline: raise ValueError(error_message) user_email = user.get("email") if user else None - task_name = metadata.get("task", "user_response") # Default to user_response if task is missing + task_name = metadata.get("task", "user_response") - # **Extract system message from metadata and prepend** - system_message = "" - if "model" in metadata and "params" in metadata["model"]: - system_message = metadata["model"]["params"].get("system", "") - - for message in body["messages"]: - if message["role"] == "system": - message["content"] = system_message + "\n\n" + message["content"] - break - else: - # If no system message was found, add one - if system_message: - body["messages"].insert(0, {"role": "system", "content": system_message}) - - # Ensure unique tracking per task if chat_id not in self.chat_traces: self.log(f"Creating new trace for chat_id: {chat_id}") @@ -137,7 +122,7 @@ class Pipeline: "name": f"chat:{chat_id}", "input": body, "user_id": user_email, - "metadata": metadata, # Preserve all metadata + "metadata": metadata, "session_id": chat_id, } @@ -150,7 +135,6 @@ class Pipeline: trace = self.chat_traces[chat_id] self.log(f"Reusing existing trace for chat_id: {chat_id}") - # Ensure all metadata fields are passed through metadata["type"] = task_name metadata["interface"] = "open-webui" @@ -158,7 +142,7 @@ class Pipeline: "name": f"{task_name}:{str(uuid.uuid4())}", "model": body["model"], "input": body["messages"], - "metadata": metadata, # Preserve all metadata + "metadata": metadata, } if self.valves.debug: @@ -173,7 +157,7 @@ class Pipeline: chat_id = body.get("chat_id") metadata = body.get("metadata", {}) - task_name = metadata.get("task", "llm_response") # Default to llm_response if missing + task_name = metadata.get("task", "llm_response") if chat_id not in self.chat_traces: self.log(f"[WARNING] No matching trace found for chat_id: {chat_id}, attempting to re-register.") @@ -186,7 +170,7 @@ class Pipeline: usage = None assistant_message_obj = get_last_assistant_message_obj(body["messages"]) if assistant_message_obj: - info = assistant_message_obj.get("info", {}) + info = assistant_message_obj.get("usage", {}) if isinstance(info, dict): input_tokens = info.get("prompt_eval_count") or info.get("prompt_tokens") output_tokens = info.get("eval_count") or info.get("completion_tokens") @@ -206,14 +190,14 @@ class Pipeline: generation_payload = { "name": f"{task_name}:{str(uuid.uuid4())}", "input": body["messages"], - "metadata": metadata, # Preserve all metadata - "usage": usage, + "metadata": metadata, + "usage": usage, } if self.valves.debug: print(f"[DEBUG] Langfuse generation end request: {json.dumps(generation_payload, indent=2)}") - trace.generation().end(**generation_payload) + trace.generation().end(**generation_payload) self.log(f"Generation ended for chat_id: {chat_id}") return body From 10242dc8a21a6c5c7ce7e51efa21ea171875fb83 Mon Sep 17 00:00:00 2001 From: Mikey O'Brien Date: Mon, 24 Feb 2025 19:33:54 -0600 Subject: [PATCH 09/41] Add thinking support for claude-3-7-sonnet --- .../providers/anthropic_manifold_pipeline.py | 172 ++++++++++++++---- 1 file changed, 134 insertions(+), 38 deletions(-) diff --git a/examples/pipelines/providers/anthropic_manifold_pipeline.py b/examples/pipelines/providers/anthropic_manifold_pipeline.py index 3540c61..29dd2d7 100644 --- a/examples/pipelines/providers/anthropic_manifold_pipeline.py +++ b/examples/pipelines/providers/anthropic_manifold_pipeline.py @@ -6,7 +6,7 @@ version: 1.4 license: MIT description: A pipeline for generating text and processing images using the Anthropic API. requirements: requests, sseclient-py -environment_variables: ANTHROPIC_API_KEY +environment_variables: ANTHROPIC_API_KEY, ANTHROPIC_THINKING_BUDGET_TOKENS, ANTHROPIC_ENABLE_THINKING """ import os @@ -18,6 +18,17 @@ import sseclient from utils.pipelines.main import pop_system_message +REASONING_EFFORT_BUDGET_TOKEN_MAP = { + "none": None, + "low": 1024, + "medium": 4096, + "high": 16384, + "max": 32768, +} + +# Maximum combined token limit for Claude 3.7 +MAX_COMBINED_TOKENS = 64000 + class Pipeline: class Valves(BaseModel): @@ -29,16 +40,20 @@ class Pipeline: self.name = "anthropic/" self.valves = self.Valves( - **{"ANTHROPIC_API_KEY": os.getenv("ANTHROPIC_API_KEY", "your-api-key-here")} + **{ + "ANTHROPIC_API_KEY": os.getenv( + "ANTHROPIC_API_KEY", "your-api-key-here" + ), + } ) - self.url = 'https://api.anthropic.com/v1/messages' + self.url = "https://api.anthropic.com/v1/messages" self.update_headers() def update_headers(self): self.headers = { - 'anthropic-version': '2023-06-01', - 'content-type': 'application/json', - 'x-api-key': self.valves.ANTHROPIC_API_KEY + "anthropic-version": "2023-06-01", + "content-type": "application/json", + "x-api-key": self.valves.ANTHROPIC_API_KEY, } def get_anthropic_models(self): @@ -88,7 +103,7 @@ class Pipeline: ) -> Union[str, Generator, Iterator]: try: # Remove unnecessary keys - for key in ['user', 'chat_id', 'title']: + for key in ["user", "chat_id", "title"]: body.pop(key, None) system_message, messages = pop_system_message(messages) @@ -102,28 +117,40 @@ class Pipeline: if isinstance(message.get("content"), list): for item in message["content"]: if item["type"] == "text": - processed_content.append({"type": "text", "text": item["text"]}) + processed_content.append( + {"type": "text", "text": item["text"]} + ) elif item["type"] == "image_url": if image_count >= 5: - raise ValueError("Maximum of 5 images per API call exceeded") + raise ValueError( + "Maximum of 5 images per API call exceeded" + ) processed_image = self.process_image(item["image_url"]) processed_content.append(processed_image) if processed_image["source"]["type"] == "base64": - image_size = len(processed_image["source"]["data"]) * 3 / 4 + image_size = ( + len(processed_image["source"]["data"]) * 3 / 4 + ) else: image_size = 0 total_image_size += image_size if total_image_size > 100 * 1024 * 1024: - raise ValueError("Total size of images exceeds 100 MB limit") + raise ValueError( + "Total size of images exceeds 100 MB limit" + ) image_count += 1 else: - processed_content = [{"type": "text", "text": message.get("content", "")}] + processed_content = [ + {"type": "text", "text": message.get("content", "")} + ] - processed_messages.append({"role": message["role"], "content": processed_content}) + processed_messages.append( + {"role": message["role"], "content": processed_content} + ) # Prepare the payload payload = { @@ -139,6 +166,42 @@ class Pipeline: } if body.get("stream", False): + supports_thinking = "claude-3-7" in model_id + reasoning_effort = body.get("reasoning_effort", "none") + budget_tokens = REASONING_EFFORT_BUDGET_TOKEN_MAP.get(reasoning_effort) + + # Allow users to input an integer value representing budget tokens + if ( + not budget_tokens + and reasoning_effort not in REASONING_EFFORT_BUDGET_TOKEN_MAP.keys() + ): + try: + budget_tokens = int(reasoning_effort) + except ValueError as e: + print("Failed to convert reasoning effort to int", e) + budget_tokens = None + + if supports_thinking and budget_tokens: + # Check if the combined tokens (budget_tokens + max_tokens) exceeds the limit + max_tokens = payload.get("max_tokens", 4096) + combined_tokens = budget_tokens + max_tokens + + if combined_tokens > MAX_COMBINED_TOKENS: + error_message = f"Error: Combined tokens (budget_tokens {budget_tokens} + max_tokens {max_tokens} = {combined_tokens}) exceeds the maximum limit of {MAX_COMBINED_TOKENS}" + print(error_message) + return error_message + + payload["max_tokens"] = combined_tokens + payload["thinking"] = { + "type": "enabled", + "budget_tokens": budget_tokens, + } + # Thinking requires temperature 1.0 and does not support top_p, top_k + payload["temperature"] = 1.0 + if "top_k" in payload: + del payload["top_k"] + if "top_p" in payload: + del payload["top_p"] return self.stream_response(payload) else: return self.get_completion(payload) @@ -146,31 +209,64 @@ class Pipeline: return f"Error: {e}" def stream_response(self, payload: dict) -> Generator: - response = requests.post(self.url, headers=self.headers, json=payload, stream=True) + """Used for title and tag generation""" + try: + response = requests.post( + self.url, headers=self.headers, json=payload, stream=True + ) + print(f"{response} for {payload}") - if response.status_code == 200: - client = sseclient.SSEClient(response) - for event in client.events(): - try: - data = json.loads(event.data) - if data["type"] == "content_block_start": - yield data["content_block"]["text"] - elif data["type"] == "content_block_delta": - yield data["delta"]["text"] - elif data["type"] == "message_stop": - break - except json.JSONDecodeError: - print(f"Failed to parse JSON: {event.data}") - except KeyError as e: - print(f"Unexpected data structure: {e}") - print(f"Full data: {data}") - else: - raise Exception(f"Error: {response.status_code} - {response.text}") + if response.status_code == 200: + client = sseclient.SSEClient(response) + for event in client.events(): + try: + data = json.loads(event.data) + if data["type"] == "content_block_start": + if data["content_block"]["type"] == "thinking": + yield "" + else: + yield data["content_block"]["text"] + elif data["type"] == "content_block_delta": + if data["delta"]["type"] == "thinking_delta": + yield data["delta"]["thinking"] + elif data["delta"]["type"] == "signature_delta": + yield "\n \n\n" + else: + yield data["delta"]["text"] + elif data["type"] == "message_stop": + break + except json.JSONDecodeError: + print(f"Failed to parse JSON: {event.data}") + yield f"Error: Failed to parse JSON response" + except KeyError as e: + print(f"Unexpected data structure: {e} for payload {payload}") + print(f"Full data: {data}") + yield f"Error: Unexpected data structure: {e}" + else: + error_message = f"Error: {response.status_code} - {response.text}" + print(error_message) + yield error_message + except Exception as e: + error_message = f"Error: {str(e)}" + print(error_message) + yield error_message def get_completion(self, payload: dict) -> str: - response = requests.post(self.url, headers=self.headers, json=payload) - if response.status_code == 200: - res = response.json() - return res["content"][0]["text"] if "content" in res and res["content"] else "" - else: - raise Exception(f"Error: {response.status_code} - {response.text}") + try: + response = requests.post(self.url, headers=self.headers, json=payload) + print(response, payload) + if response.status_code == 200: + res = response.json() + for content in res["content"]: + if not content.get("text"): + continue + return content["text"] + return "" + else: + error_message = f"Error: {response.status_code} - {response.text}" + print(error_message) + return error_message + except Exception as e: + error_message = f"Error: {str(e)}" + print(error_message) + return error_message From 689d986f817a5d3ec610297abbd74d9fb1963890 Mon Sep 17 00:00:00 2001 From: ther3zz <40278044+ther3zz@users.noreply.github.com> Date: Mon, 3 Mar 2025 13:26:03 -0500 Subject: [PATCH 10/41] add tag valve and tag insert adds a valve to enable/disable inserting tags into the langfuse trace. --- examples/filters/langfuse_filter_pipeline.py | 48 ++++++++++++++++++-- 1 file changed, 43 insertions(+), 5 deletions(-) diff --git a/examples/filters/langfuse_filter_pipeline.py b/examples/filters/langfuse_filter_pipeline.py index a788919..1b6b8c7 100644 --- a/examples/filters/langfuse_filter_pipeline.py +++ b/examples/filters/langfuse_filter_pipeline.py @@ -35,6 +35,8 @@ class Pipeline: public_key: str host: str debug: bool = False + # New valve that controls whether task names are added as tags: + insert_tags: bool = True def __init__(self): self.type = "filter" @@ -94,6 +96,20 @@ class Pipeline: f"Langfuse error: {e} Please re-enter your Langfuse credentials in the pipeline settings." ) + def _build_tags(self, task_name: str) -> list: + """ + Builds a list of tags based on valve settings, ensuring we always add + 'open-webui' and skip user_response / llm_response. + """ + tags_list = [] + if self.valves.insert_tags: + # Always add 'open-webui' + tags_list.append("open-webui") + # Add the task_name if it's not one of the excluded defaults + if task_name not in ["user_response", "llm_response"]: + tags_list.append(task_name) + return tags_list + async def inlet(self, body: dict, user: Optional[dict] = None) -> dict: if self.valves.debug: print(f"[DEBUG] Received request: {json.dumps(body, indent=2)}") @@ -113,7 +129,11 @@ class Pipeline: raise ValueError(error_message) user_email = user.get("email") if user else None - task_name = metadata.get("task", "user_response") + # Defaulting to 'user_response' if no task is provided + task_name = metadata.get("task", "user_response") + + # Build tags + tags_list = self._build_tags(task_name) if chat_id not in self.chat_traces: self.log(f"Creating new trace for chat_id: {chat_id}") @@ -126,6 +146,9 @@ class Pipeline: "session_id": chat_id, } + if tags_list: + trace_payload["tags"] = tags_list + if self.valves.debug: print(f"[DEBUG] Langfuse trace request: {json.dumps(trace_payload, indent=2)}") @@ -134,7 +157,10 @@ class Pipeline: else: trace = self.chat_traces[chat_id] self.log(f"Reusing existing trace for chat_id: {chat_id}") + if tags_list: + trace.update(tags=tags_list) + # Update metadata with type metadata["type"] = task_name metadata["interface"] = "open-webui" @@ -145,6 +171,9 @@ class Pipeline: "metadata": metadata, } + if tags_list: + generation_payload["tags"] = tags_list + if self.valves.debug: print(f"[DEBUG] Langfuse generation request: {json.dumps(generation_payload, indent=2)}") @@ -157,18 +186,23 @@ class Pipeline: chat_id = body.get("chat_id") metadata = body.get("metadata", {}) - task_name = metadata.get("task", "llm_response") + # Defaulting to 'llm_response' if no task is provided + task_name = metadata.get("task", "llm_response") + + # Build tags + tags_list = self._build_tags(task_name) if chat_id not in self.chat_traces: self.log(f"[WARNING] No matching trace found for chat_id: {chat_id}, attempting to re-register.") + # Re-run inlet to register if somehow missing return await self.inlet(body, user) trace = self.chat_traces[chat_id] assistant_message = get_last_assistant_message(body["messages"]) + assistant_message_obj = get_last_assistant_message_obj(body["messages"]) usage = None - assistant_message_obj = get_last_assistant_message_obj(body["messages"]) if assistant_message_obj: info = assistant_message_obj.get("usage", {}) if isinstance(info, dict): @@ -182,6 +216,7 @@ class Pipeline: } self.log(f"Usage data extracted: {usage}") + # Update the trace output with the last assistant message trace.update(output=assistant_message) metadata["type"] = task_name @@ -191,13 +226,16 @@ class Pipeline: "name": f"{task_name}:{str(uuid.uuid4())}", "input": body["messages"], "metadata": metadata, - "usage": usage, + "usage": usage, } + if tags_list: + generation_payload["tags"] = tags_list + if self.valves.debug: print(f"[DEBUG] Langfuse generation end request: {json.dumps(generation_payload, indent=2)}") - trace.generation().end(**generation_payload) + trace.generation().end(**generation_payload) self.log(f"Generation ended for chat_id: {chat_id}") return body From 3703976ac2eec1950041a87529efe85f824cbc70 Mon Sep 17 00:00:00 2001 From: ther3zz <40278044+ther3zz@users.noreply.github.com> Date: Tue, 4 Mar 2025 10:28:20 -0500 Subject: [PATCH 11/41] modify non-llm responses as events instead of inserting non-llm responses as generation, they are now inserted as events into langfuse --- examples/filters/langfuse_filter_pipeline.py | 88 ++++++++++++++------ 1 file changed, 63 insertions(+), 25 deletions(-) diff --git a/examples/filters/langfuse_filter_pipeline.py b/examples/filters/langfuse_filter_pipeline.py index 1b6b8c7..f06e945 100644 --- a/examples/filters/langfuse_filter_pipeline.py +++ b/examples/filters/langfuse_filter_pipeline.py @@ -1,7 +1,7 @@ """ title: Langfuse Filter Pipeline author: open-webui -date: 2024-09-27 +date: 2025-03-04 version: 1.6 license: MIT description: A filter pipeline that uses Langfuse. @@ -56,6 +56,9 @@ class Pipeline: self.chat_traces = {} self.suppressed_logs = set() + # Only these tasks will be treated as LLM "generations": + self.GENERATION_TASKS = {"llm_response"} + def log(self, message: str, suppress_repeats: bool = False): if self.valves.debug: if suppress_repeats: @@ -99,7 +102,7 @@ class Pipeline: def _build_tags(self, task_name: str) -> list: """ Builds a list of tags based on valve settings, ensuring we always add - 'open-webui' and skip user_response / llm_response. + 'open-webui' and skip user_response / llm_response from becoming tags themselves. """ tags_list = [] if self.valves.insert_tags: @@ -164,20 +167,35 @@ class Pipeline: metadata["type"] = task_name metadata["interface"] = "open-webui" - generation_payload = { - "name": f"{task_name}:{str(uuid.uuid4())}", - "model": body["model"], - "input": body["messages"], - "metadata": metadata, - } + # If it's a task that is considered an LLM generation + if task_name in self.GENERATION_TASKS: + generation_payload = { + "name": f"{task_name}:{str(uuid.uuid4())}", + "model": body["model"], + "input": body["messages"], + "metadata": metadata, + } + if tags_list: + generation_payload["tags"] = tags_list - if tags_list: - generation_payload["tags"] = tags_list + if self.valves.debug: + print(f"[DEBUG] Langfuse generation request: {json.dumps(generation_payload, indent=2)}") - if self.valves.debug: - print(f"[DEBUG] Langfuse generation request: {json.dumps(generation_payload, indent=2)}") + trace.generation(**generation_payload) + else: + # Otherwise, log it as an event + event_payload = { + "name": f"{task_name}:{str(uuid.uuid4())}", + "metadata": metadata, + "input": body["messages"], + } + if tags_list: + event_payload["tags"] = tags_list - trace.generation(**generation_payload) + if self.valves.debug: + print(f"[DEBUG] Langfuse event request: {json.dumps(event_payload, indent=2)}") + + trace.event(**event_payload) return body @@ -222,20 +240,40 @@ class Pipeline: metadata["type"] = task_name metadata["interface"] = "open-webui" - generation_payload = { - "name": f"{task_name}:{str(uuid.uuid4())}", - "input": body["messages"], - "metadata": metadata, - "usage": usage, - } + if task_name in self.GENERATION_TASKS: + # If it's an LLM generation + generation_payload = { + "name": f"{task_name}:{str(uuid.uuid4())}", + "input": body["messages"], + "metadata": metadata, + "usage": usage, + } + if tags_list: + generation_payload["tags"] = tags_list - if tags_list: - generation_payload["tags"] = tags_list + if self.valves.debug: + print(f"[DEBUG] Langfuse generation end request: {json.dumps(generation_payload, indent=2)}") - if self.valves.debug: - print(f"[DEBUG] Langfuse generation end request: {json.dumps(generation_payload, indent=2)}") + trace.generation().end(**generation_payload) + self.log(f"Generation ended for chat_id: {chat_id}") + else: + # Otherwise log as an event + event_payload = { + "name": f"{task_name}:{str(uuid.uuid4())}", + "metadata": metadata, + "input": body["messages"], + } + if usage: + # If you want usage on event as well + event_payload["metadata"]["usage"] = usage - trace.generation().end(**generation_payload) - self.log(f"Generation ended for chat_id: {chat_id}") + if tags_list: + event_payload["tags"] = tags_list + + if self.valves.debug: + print(f"[DEBUG] Langfuse event end request: {json.dumps(event_payload, indent=2)}") + + trace.event(**event_payload) + self.log(f"Event logged for chat_id: {chat_id}") return body From 40c32238afd927416e42caa202666558c23bb78a Mon Sep 17 00:00:00 2001 From: ther3zz <40278044+ther3zz@users.noreply.github.com> Date: Tue, 4 Mar 2025 10:32:40 -0500 Subject: [PATCH 12/41] show debug switch last moving the debug valve as the last option (above the tags valve) --- examples/filters/langfuse_filter_pipeline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/filters/langfuse_filter_pipeline.py b/examples/filters/langfuse_filter_pipeline.py index f06e945..0a25bbc 100644 --- a/examples/filters/langfuse_filter_pipeline.py +++ b/examples/filters/langfuse_filter_pipeline.py @@ -34,9 +34,9 @@ class Pipeline: secret_key: str public_key: str host: str - debug: bool = False # New valve that controls whether task names are added as tags: insert_tags: bool = True + debug: bool = False def __init__(self): self.type = "filter" From 34a464e1d93090bb9a6806a8379b791bd66465c4 Mon Sep 17 00:00:00 2001 From: ther3zz <40278044+ther3zz@users.noreply.github.com> Date: Tue, 4 Mar 2025 13:27:58 -0500 Subject: [PATCH 13/41] fix model id we now pass model info to langfuse for generation observations --- examples/filters/langfuse_filter_pipeline.py | 1 + 1 file changed, 1 insertion(+) diff --git a/examples/filters/langfuse_filter_pipeline.py b/examples/filters/langfuse_filter_pipeline.py index 0a25bbc..48f453a 100644 --- a/examples/filters/langfuse_filter_pipeline.py +++ b/examples/filters/langfuse_filter_pipeline.py @@ -244,6 +244,7 @@ class Pipeline: # If it's an LLM generation generation_payload = { "name": f"{task_name}:{str(uuid.uuid4())}", + "model": body.get("model"), # <-- Include the model in LLM generation "input": body["messages"], "metadata": metadata, "usage": usage, From afb4704034870075333e060aa56e6114b0154f36 Mon Sep 17 00:00:00 2001 From: reasv <7143787+reasv@users.noreply.github.com> Date: Mon, 10 Mar 2025 23:22:43 +0100 Subject: [PATCH 14/41] Fix: Error: int() argument must be a string, a bytes-like object or a real number, not 'NoneType' --- examples/pipelines/providers/anthropic_manifold_pipeline.py | 1 + 1 file changed, 1 insertion(+) diff --git a/examples/pipelines/providers/anthropic_manifold_pipeline.py b/examples/pipelines/providers/anthropic_manifold_pipeline.py index 29dd2d7..069ecf1 100644 --- a/examples/pipelines/providers/anthropic_manifold_pipeline.py +++ b/examples/pipelines/providers/anthropic_manifold_pipeline.py @@ -173,6 +173,7 @@ class Pipeline: # Allow users to input an integer value representing budget tokens if ( not budget_tokens + and reasoning_effort is not None and reasoning_effort not in REASONING_EFFORT_BUDGET_TOKEN_MAP.keys() ): try: From b7debc3b1b6e4bd9aabec3bb1f39d93d444dcb87 Mon Sep 17 00:00:00 2001 From: Boris Feld Date: Thu, 13 Mar 2025 16:08:02 +0100 Subject: [PATCH 15/41] Add filter example of message monitoring using Opik --- README.md | 1 + examples/filters/opik_filter_pipeline.py | 274 +++++++++++++++++++++++ requirements.txt | 1 + 3 files changed, 276 insertions(+) create mode 100644 examples/filters/opik_filter_pipeline.py diff --git a/README.md b/README.md index 22c11db..c82496b 100644 --- a/README.md +++ b/README.md @@ -23,6 +23,7 @@ Welcome to **Pipelines**, an [Open WebUI](https://github.com/open-webui) initiat - [**Function Calling Pipeline**](/examples/filters/function_calling_filter_pipeline.py): Easily handle function calls and enhance your applications with custom logic. - [**Custom RAG Pipeline**](/examples/pipelines/rag/llamaindex_pipeline.py): Implement sophisticated Retrieval-Augmented Generation pipelines tailored to your needs. - [**Message Monitoring Using Langfuse**](/examples/filters/langfuse_filter_pipeline.py): Monitor and analyze message interactions in real-time using Langfuse. +- [**Message Monitoring Using Opik**](/examples/filters/opik_filter_pipeline.py): Monitor and analyze message interactions using Opik, an open-source platform for debugging and evaluating LLM applications and RAG systems. - [**Rate Limit Filter**](/examples/filters/rate_limit_filter_pipeline.py): Control the flow of requests to prevent exceeding rate limits. - [**Real-Time Translation Filter with LibreTranslate**](/examples/filters/libretranslate_filter_pipeline.py): Seamlessly integrate real-time translations into your LLM interactions. - [**Toxic Message Filter**](/examples/filters/detoxify_filter_pipeline.py): Implement filters to detect and handle toxic messages effectively. diff --git a/examples/filters/opik_filter_pipeline.py b/examples/filters/opik_filter_pipeline.py new file mode 100644 index 0000000..ab768b0 --- /dev/null +++ b/examples/filters/opik_filter_pipeline.py @@ -0,0 +1,274 @@ +""" +title: Opik Filter Pipeline +author: open-webui +date: 2025-03-12 +version: 1.0 +license: MIT +description: A filter pipeline that uses Opik for LLM observability. +requirements: opik +""" + +from typing import List, Optional +import os +import uuid +import json + +from pydantic import BaseModel +from opik import Opik + + +def get_last_assistant_message_obj(messages: List[dict]) -> dict: + for message in reversed(messages): + if message["role"] == "assistant": + return message + return {} + + +class Pipeline: + class Valves(BaseModel): + pipelines: List[str] = [] + priority: int = 0 + api_key: Optional[str] = None + workspace: str + project_name: str + host: str + debug: bool = False + + def __init__(self): + self.type = "filter" + self.name = "Opik Filter" + + self.valves = self.Valves( + **{ + "pipelines": ["*"], + "api_key": os.getenv("OPIK_API_KEY", "set_me_for_opik_cloud"), + "workspace": os.getenv("OPIK_WORKSPACE", "default"), + "project_name": os.getenv("OPIK_PROJECT_NAME", "default"), + "host": os.getenv( + "OPIK_URL_OVERRIDE", "https://www.comet.com/opik/api" + ), + "debug": os.getenv("DEBUG_MODE", "false").lower() == "true", + } + ) + + self.opik = None + # Keep track of the trace and the last-created span for each chat_id + self.chat_traces = {} + self.chat_spans = {} + + self.suppressed_logs = set() + + def log(self, message: str, suppress_repeats: bool = False): + """Logs messages to the terminal if debugging is enabled.""" + if self.valves.debug: + if suppress_repeats: + if message in self.suppressed_logs: + return + self.suppressed_logs.add(message) + print(f"[DEBUG] {message}") + + async def on_startup(self): + self.log(f"on_startup triggered for {__name__}") + self.set_opik() + + async def on_shutdown(self): + self.log(f"on_shutdown triggered for {__name__}") + if self.opik: + self.opik.end() + + async def on_valves_updated(self): + self.log("Valves updated, resetting Opik client.") + if self.opik: + self.opik.end() + self.set_opik() + + def set_opik(self): + try: + self.opik = Opik( + project_name=self.valves.project_name, + workspace=self.valves.workspace, + host=self.valves.host, + api_key=self.valves.api_key, + ) + self.opik.auth_check() + self.log("Opik client initialized successfully.") + except Exception as e: + print( + f"Opik error: {e} Please re-enter your Opik credentials in the pipeline settings." + ) + + async def inlet(self, body: dict, user: Optional[dict] = None) -> dict: + """ + Inlet handles the incoming request (usually a user message). + - If no trace exists yet for this chat_id, we create a new trace. + - If a trace does exist, we simply create a new span for the new user message. + """ + if self.valves.debug: + print(f"[DEBUG] Received request: {json.dumps(body, indent=2)}") + + self.log(f"Inlet function called with body: {body} and user: {user}") + + metadata = body.get("metadata", {}) + task = metadata.get("task", "") + + # Skip logging tasks for now + if task: + self.log(f"Skipping {task} task.") + return body + + if "chat_id" not in metadata: + chat_id = str(uuid.uuid4()) # Regular chat messages + self.log(f"Assigned normal chat_id: {chat_id}") + + metadata["chat_id"] = chat_id + body["metadata"] = metadata + else: + chat_id = metadata["chat_id"] + + required_keys = ["model", "messages"] + missing_keys = [key for key in required_keys if key not in body] + if missing_keys: + error_message = ( + f"Error: Missing keys in the request body: {', '.join(missing_keys)}" + ) + self.log(error_message) + raise ValueError(error_message) + + user_email = user.get("email") if user else None + + assert chat_id not in self.chat_traces, ( + f"There shouldn't be a trace already exists for chat_id {chat_id}" + ) + + # Create a new trace and span + self.log(f"Creating new chat trace for chat_id: {chat_id}") + + # Body copy for traces and span + trace_body = body.copy() + span_body = body.copy() + + # Extract metadata from body + metadata = trace_body.pop("metadata", {}) + metadata.update({"chat_id": chat_id, "user_id": user_email}) + + # We don't need the model at the trace level + trace_body.pop("model", None) + + trace_payload = { + "name": f"{__name__}", + "input": trace_body, + "metadata": metadata, + "thread_id": chat_id, + } + + if self.valves.debug: + print(f"[DEBUG] Opik trace request: {json.dumps(trace_payload, indent=2)}") + + trace = self.opik.trace(**trace_payload) + + span_metadata = metadata.copy() + span_metadata.update({"interface": "open-webui"}) + + # Extract the model from body + span_body.pop("model", None) + # We don't need the metadata in the input for the span + span_body.pop("metadata", None) + + # Extract the model and provider from metadata + model = span_metadata.get("model", {}).get("id", None) + provider = span_metadata.get("model", {}).get("owned_by", None) + + span_payload = { + "name": chat_id, + "model": model, + "provider": provider, + "input": span_body, + "metadata": span_metadata, + "type": "llm", + } + + if self.valves.debug: + print(f"[DEBUG] Opik span request: {json.dumps(span_payload, indent=2)}") + + span = trace.span(**span_payload) + + self.chat_traces[chat_id] = trace + self.chat_spans[chat_id] = span + self.log(f"Trace and span objects successfully created for chat_id: {chat_id}") + + return body + + async def outlet(self, body: dict, user: Optional[dict] = None) -> dict: + """ + Outlet handles the response body (usually the assistant message). + It will finalize/end the span created for the user request. + """ + self.log(f"Outlet function called with body: {body}") + + chat_id = body.get("chat_id") + + # If no trace or span exist, attempt to register again + if chat_id not in self.chat_traces or chat_id not in self.chat_spans: + self.log( + f"[WARNING] No matching chat trace found for chat_id: {chat_id}, chat won't be logged." + ) + return body + + trace = self.chat_traces[chat_id] + span = self.chat_spans[chat_id] + + # Body copy for traces and span + trace_body = body.copy() + span_body = body.copy() + + # Get the last assistant message from the conversation + assistant_message_obj = get_last_assistant_message_obj(body["messages"]) + + # Extract usage if available + usage = None + self.log(f"Assistant message obj: {assistant_message_obj}") + if assistant_message_obj: + message_usage = assistant_message_obj.get("usage", {}) + if isinstance(message_usage, dict): + input_tokens = message_usage.get( + "prompt_eval_count" + ) or message_usage.get("prompt_tokens") + output_tokens = message_usage.get("eval_count") or message_usage.get( + "completion_tokens" + ) + if input_tokens is not None and output_tokens is not None: + usage = { + "prompt_tokens": input_tokens, + "completion_tokens": output_tokens, + "total_tokens": input_tokens + output_tokens, + } + self.log(f"Usage data extracted: {usage}") + + # Chat_id is already logged as trace thread + span_body.pop("chat_id", None) + + # End the span with the final assistant message and updated conversation + span_payload = { + "output": span_body, # include the entire conversation + "usage": usage, + } + + if self.valves.debug: + print( + f"[DEBUG] Opik span end request: {json.dumps(span_payload, indent=2)}" + ) + + span.end(**span_payload) + self.log(f"span ended for chat_id: {chat_id}") + + # Chat_id is already logged as trace thread + span_body.pop("chat_id", None) + + # Optionally update the trace with the final assistant output + trace.end(output=trace_body) + + # Force the creation of a new trace and span for the next chat even if they are part of the same thread + del self.chat_traces[chat_id] + del self.chat_spans[chat_id] + + return body diff --git a/requirements.txt b/requirements.txt index 86160d7..b3cc96e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -32,6 +32,7 @@ psycopg2-binary # Observability langfuse ddtrace +opik # ML libraries torch From 48ddbec455de76fc43224daf3438537cd8fcde87 Mon Sep 17 00:00:00 2001 From: bartonzzx Date: Sat, 15 Mar 2025 23:02:38 +0800 Subject: [PATCH 16/41] Add langgraph integration example, also support thinking. --- .../integrations/langgraph_pipeline/README.md | 28 +++ .../langgraph_pipeline/langgraph_example.py | 166 ++++++++++++++++++ .../langgraph_stream_pipeline.py | 63 +++++++ .../langgraph_pipeline/requirements.txt | 40 +++++ 4 files changed, 297 insertions(+) create mode 100644 examples/pipelines/integrations/langgraph_pipeline/README.md create mode 100644 examples/pipelines/integrations/langgraph_pipeline/langgraph_example.py create mode 100644 examples/pipelines/integrations/langgraph_pipeline/langgraph_stream_pipeline.py create mode 100644 examples/pipelines/integrations/langgraph_pipeline/requirements.txt diff --git a/examples/pipelines/integrations/langgraph_pipeline/README.md b/examples/pipelines/integrations/langgraph_pipeline/README.md new file mode 100644 index 0000000..8d2cca6 --- /dev/null +++ b/examples/pipelines/integrations/langgraph_pipeline/README.md @@ -0,0 +1,28 @@ +# Example of langgraph integration +## Python version: 3.11 +## Feature +1. Using langgraph stream writer and custom mode of stream to integrate langgraph with open webui pipeline. +2. Support \ block display. +## Prerequirement +Install the open webui pipeline. +You can follow the docs : https://docs.openwebui.com/pipelines/#-quick-start-with-docker + +## Usage +### 1. Upload pipeline file +Upload `langgraph_stream_pipeline.py` to the open webui pipeline. + +### 2. Enable the uploaded pipeline +Properly set up your langgraph api url. + +And choose **"LangGraph stream"** as your model. + +### 2. Install dependencies +Under the folder `pipelines/examples/pipelines/integrations/langgraph_pipeline`, run command below : +``` +pip install -r requirements.txt +``` +### 3. Start langgraph api server +Run command below : +``` +uvicorn langgraph_example:app --reload +``` \ No newline at end of file diff --git a/examples/pipelines/integrations/langgraph_pipeline/langgraph_example.py b/examples/pipelines/integrations/langgraph_pipeline/langgraph_example.py new file mode 100644 index 0000000..6ae57a2 --- /dev/null +++ b/examples/pipelines/integrations/langgraph_pipeline/langgraph_example.py @@ -0,0 +1,166 @@ +""" +title: Langgraph stream integration +author: bartonzzx +author_url: https://github.com/bartonzzx +git_url: +description: Integrate langgraph with open webui pipeline +required_open_webui_version: 0.4.3 +requirements: none +version: 0.4.3 +licence: MIT +""" + + +import os +import json +import getpass +from typing import Annotated, Literal +from typing_extensions import TypedDict + +from fastapi import FastAPI +from fastapi.responses import StreamingResponse + +from langgraph.graph import StateGraph, START, END +from langgraph.graph.message import add_messages +from langchain_openai import ChatOpenAI +from langgraph.config import get_stream_writer + + +''' +Define LLM API key +''' +def _set_env(var: str): + if not os.environ.get(var): + os.environ[var] = getpass.getpass(f"{var}: ") + + +_set_env("OPENAI_API_KEY") + + +''' +Define Langgraph +''' +def generate_custom_stream(type: Literal["think","normal"], content: str): + content = "\n"+content+"\n" + custom_stream_writer = get_stream_writer() + return custom_stream_writer({type:content}) + +class State(TypedDict): + messages: Annotated[list, add_messages] + +llm = ChatOpenAI(model="gpt-3.5-turbo") + +def chatbot(state: State): + think_response = llm.invoke(["Please reasoning:"] + state["messages"]) + normal_response = llm.invoke(state["messages"]) + generate_custom_stream("think", think_response.content) + generate_custom_stream("normal", normal_response.content) + return {"messages": [normal_response]} + +# Define graph +graph_builder = StateGraph(State) + +# Define nodes +graph_builder.add_node("chatbot", chatbot) +graph_builder.add_edge("chatbot", END) + +# Define edges +graph_builder.add_edge(START, "chatbot") + +# Compile graph +graph = graph_builder.compile() + + +''' +Define api processing +''' +app = FastAPI( + title="Langgraph API", + description="Langgraph API", + ) + +@app.get("/test") +async def test(): + return {"message": "Hello World"} + + +@app.post("/stream") +async def stream(inputs: State): + async def event_stream(): + try: + stream_start_msg = { + 'choices': + [ + { + 'delta': {}, + 'finish_reason': None + } + ] + } + + # Stream start + yield f"data: {json.dumps(stream_start_msg)}\n\n" + + # Processing langgraph stream response with block support + async for event in graph.astream(input=inputs, stream_mode="custom"): + print(event) + think_content = event.get("think", None) + normal_content = event.get("normal", None) + + think_msg = { + 'choices': + [ + { + 'delta': + { + 'reasoning_content': think_content, + }, + 'finish_reason': None + } + ] + } + + normal_msg = { + 'choices': + [ + { + 'delta': + { + 'content': normal_content, + }, + 'finish_reason': None + } + ] + } + + yield f"data: {json.dumps(think_msg)}\n\n" + yield f"data: {json.dumps(normal_msg)}\n\n" + + # End of the stream + stream_end_msg = { + 'choices': [ + { + 'delta': {}, + 'finish_reason': 'stop' + } + ] + } + yield f"data: {json.dumps(stream_end_msg)}\n\n" + + except Exception as e: + # Simply print the error information + print(f"An error occurred: {e}") + + return StreamingResponse( + event_stream(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + } + ) + +if __name__ == "__main__": + import uvicorn + + uvicorn.run(app, host="0.0.0.0", port=9000) \ No newline at end of file diff --git a/examples/pipelines/integrations/langgraph_pipeline/langgraph_stream_pipeline.py b/examples/pipelines/integrations/langgraph_pipeline/langgraph_stream_pipeline.py new file mode 100644 index 0000000..65da0df --- /dev/null +++ b/examples/pipelines/integrations/langgraph_pipeline/langgraph_stream_pipeline.py @@ -0,0 +1,63 @@ +""" +title: Langgraph stream integration +author: bartonzzx +author_url: https://github.com/bartonzzx +git_url: +description: Integrate langgraph with open webui pipeline +required_open_webui_version: 0.4.3 +requirements: none +version: 0.4.3 +licence: MIT +""" + + +import os +import requests +from pydantic import BaseModel, Field +from typing import List, Union, Generator, Iterator + + +class Pipeline: + class Valves(BaseModel): + API_URL: str = Field(default="http://127.0.0.1:9000/stream", description="Langgraph API URL") + + def __init__(self): + self.id = "LangGraph stream" + self.name = "LangGraph stream" + # Initialize valve paramaters + self.valves = self.Valves( + **{k: os.getenv(k, v.default) for k, v in self.Valves.model_fields.items()} + ) + + async def on_startup(self): + # This function is called when the server is started. + print(f"on_startup: {__name__}") + pass + + async def on_shutdown(self): + # This function is called when the server is shutdown. + print(f"on_shutdown: {__name__}") + pass + + def pipe( + self, + user_message: str, + model_id: str, + messages: List[dict], + body: dict + ) -> Union[str, Generator, Iterator]: + + data = { + "messages": [[msg['role'], msg['content']] for msg in messages], + } + + headers = { + 'accept': 'text/event-stream', + 'Content-Type': 'application/json', + } + + response = requests.post(self.valves.API_URL, json=data, headers=headers, stream=True) + + response.raise_for_status() + + return response.iter_lines() \ No newline at end of file diff --git a/examples/pipelines/integrations/langgraph_pipeline/requirements.txt b/examples/pipelines/integrations/langgraph_pipeline/requirements.txt new file mode 100644 index 0000000..fc122d6 --- /dev/null +++ b/examples/pipelines/integrations/langgraph_pipeline/requirements.txt @@ -0,0 +1,40 @@ +annotated-types==0.7.0 +anyio==4.8.0 +certifi==2025.1.31 +charset-normalizer==3.4.1 +click==8.1.8 +distro==1.9.0 +fastapi==0.115.11 +h11==0.14.0 +httpcore==1.0.7 +httpx==0.28.1 +idna==3.10 +jiter==0.9.0 +jsonpatch==1.33 +jsonpointer==3.0.0 +langchain-core==0.3.45 +langchain-openai==0.3.8 +langgraph==0.3.11 +langgraph-checkpoint==2.0.20 +langgraph-prebuilt==0.1.3 +langgraph-sdk==0.1.57 +langsmith==0.3.15 +msgpack==1.1.0 +openai==1.66.3 +orjson==3.10.15 +packaging==24.2 +pydantic==2.10.6 +pydantic_core==2.27.2 +PyYAML==6.0.2 +regex==2024.11.6 +requests==2.32.3 +requests-toolbelt==1.0.0 +sniffio==1.3.1 +starlette==0.46.1 +tenacity==9.0.0 +tiktoken==0.9.0 +tqdm==4.67.1 +typing_extensions==4.12.2 +urllib3==2.3.0 +uvicorn==0.34.0 +zstandard==0.23.0 From 9ec364092d290d43a57b5b6abb6a38ebdf2c7e29 Mon Sep 17 00:00:00 2001 From: Takahiro Kikumoto Date: Mon, 17 Mar 2025 17:54:27 +0900 Subject: [PATCH 17/41] Implemented pipeline for AWS Bedrock DeepSeek model. --- .../aws_bedrock_deepseek_pipeline.py | 174 ++++++++++++++++++ 1 file changed, 174 insertions(+) create mode 100644 examples/pipelines/providers/aws_bedrock_deepseek_pipeline.py diff --git a/examples/pipelines/providers/aws_bedrock_deepseek_pipeline.py b/examples/pipelines/providers/aws_bedrock_deepseek_pipeline.py new file mode 100644 index 0000000..310958a --- /dev/null +++ b/examples/pipelines/providers/aws_bedrock_deepseek_pipeline.py @@ -0,0 +1,174 @@ +""" +title: AWS Bedrock DeepSeek Pipeline +author: kikumoto +date: 2025-03-17 +version: 1.0 +license: MIT +description: A pipeline for generating text using the AWS Bedrock API. +requirements: boto3 +environment_variables: +""" + +import json +import logging + +from typing import List, Union, Generator, Iterator, Dict, Optional, Any + +import boto3 + +from pydantic import BaseModel + +import os + +from utils.pipelines.main import pop_system_message + +class Pipeline: + class Valves(BaseModel): + AWS_ACCESS_KEY: Optional[str] = None + AWS_SECRET_KEY: Optional[str] = None + AWS_REGION_NAME: Optional[str] = None + + def __init__(self): + self.type = "manifold" + self.name = "Bedrock DeepSeek: " + + self.valves = self.Valves( + **{ + "AWS_ACCESS_KEY": os.getenv("AWS_ACCESS_KEY", ""), + "AWS_SECRET_KEY": os.getenv("AWS_SECRET_KEY", ""), + "AWS_REGION_NAME": os.getenv( + "AWS_REGION_NAME", os.getenv( + "AWS_REGION", os.getenv("AWS_DEFAULT_REGION", "") + ) + ), + } + ) + + self.update_pipelines() + + async def on_startup(self): + # This function is called when the server is started. + print(f"on_startup:{__name__}") + self.update_pipelines() + pass + + async def on_shutdown(self): + # This function is called when the server is stopped. + print(f"on_shutdown:{__name__}") + pass + + async def on_valves_updated(self): + # This function is called when the valves are updated. + print(f"on_valves_updated:{__name__}") + self.update_pipelines() + + def update_pipelines(self) -> None: + try: + self.bedrock = boto3.client(service_name="bedrock", + aws_access_key_id=self.valves.AWS_ACCESS_KEY, + aws_secret_access_key=self.valves.AWS_SECRET_KEY, + region_name=self.valves.AWS_REGION_NAME) + self.bedrock_runtime = boto3.client(service_name="bedrock-runtime", + aws_access_key_id=self.valves.AWS_ACCESS_KEY, + aws_secret_access_key=self.valves.AWS_SECRET_KEY, + region_name=self.valves.AWS_REGION_NAME) + self.pipelines = self.get_models() + except Exception as e: + print(f"Error: {e}") + self.pipelines = [ + { + "id": "error", + "name": "Could not fetch models from Bedrock, please set up AWS Key/Secret or Instance/Task Role.", + }, + ] + + def pipelines(self) -> List[dict]: + return self.get_models() + + def get_models(self): + try: + res = [] + response = self.bedrock.list_foundation_models(byProvider='DeepSeek') + for model in response['modelSummaries']: + inference_types = model.get('inferenceTypesSupported', []) + if "ON_DEMAND" in inference_types: + res.append({'id': model['modelId'], 'name': model['modelName']}) + elif "INFERENCE_PROFILE" in inference_types: + inferenceProfileId = self.getInferenceProfileId(model['modelArn']) + if inferenceProfileId: + res.append({'id': inferenceProfileId, 'name': model['modelName']}) + + return res + except Exception as e: + print(f"Error: {e}") + return [ + { + "id": "error", + "name": "Could not fetch models from Bedrock, please check permissoin.", + }, + ] + + def getInferenceProfileId(self, modelArn: str) -> str: + response = self.bedrock.list_inference_profiles() + for profile in response.get('inferenceProfileSummaries', []): + for model in profile.get('models', []): + if model.get('modelArn') == modelArn: + return profile['inferenceProfileId'] + return None + + def pipe( + self, user_message: str, model_id: str, messages: List[dict], body: dict + ) -> Union[str, Generator, Iterator]: + # This is where you can add your custom pipelines like RAG. + print(f"pipe:{__name__}") + + try: + # Remove unnecessary keys + for key in ['user', 'chat_id', 'title']: + body.pop(key, None) + + system_message, messages = pop_system_message(messages) + + logging.info(f"pop_system_message: {json.dumps(messages)}") + + processed_messages = [] + for message in messages: + processed_content = [] + if isinstance(message.get("content"), list): + for item in message["content"]: + # DeepSeek currently doesn't support multi-modal inputs + if item["type"] == "text": + processed_content.append({"text": item["text"]}) + else: + processed_content = [{"text": message.get("content", "")}] + + processed_messages.append({"role": message["role"], "content": processed_content}) + + payload = {"modelId": model_id, + "system": [{'text': system_message["content"] if system_message else 'you are an intelligent ai assistant'}], + "messages": processed_messages, + "inferenceConfig": { + "temperature": body.get("temperature", 0.5), + "topP": body.get("top_p", 0.9), + "maxTokens": body.get("max_tokens", 8192), + "stopSequences": body.get("stop", []), + }, + } + + if body.get("stream", False): + return self.stream_response(model_id, payload) + else: + return self.get_completion(model_id, payload) + + except Exception as e: + return f"Error: {e}" + + def stream_response(self, model_id: str, payload: dict) -> Generator: + streaming_response = self.bedrock_runtime.converse_stream(**payload) + for chunk in streaming_response["stream"]: + if "contentBlockDelta" in chunk and "text" in chunk["contentBlockDelta"]["delta"]: + yield chunk["contentBlockDelta"]["delta"]["text"] + + def get_completion(self, model_id: str, payload: dict) -> str: + response = self.bedrock_runtime.converse(**payload) + return response['output']['message']['content'][0]['text'] \ No newline at end of file From c5af48c45b729747571e877a13adec789ca0b57c Mon Sep 17 00:00:00 2001 From: Takahiro Kikumoto Date: Tue, 18 Mar 2025 10:34:17 +0900 Subject: [PATCH 18/41] Add support for outputting reasoning process in DeepSeek - Modified `stream_response` method to handle reasoning content blocks. - Added logic to yield "" and "" tags around reasoning content. --- .../providers/aws_bedrock_deepseek_pipeline.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/examples/pipelines/providers/aws_bedrock_deepseek_pipeline.py b/examples/pipelines/providers/aws_bedrock_deepseek_pipeline.py index 310958a..8f6512e 100644 --- a/examples/pipelines/providers/aws_bedrock_deepseek_pipeline.py +++ b/examples/pipelines/providers/aws_bedrock_deepseek_pipeline.py @@ -165,9 +165,22 @@ class Pipeline: def stream_response(self, model_id: str, payload: dict) -> Generator: streaming_response = self.bedrock_runtime.converse_stream(**payload) + + in_resasoning_context = False for chunk in streaming_response["stream"]: - if "contentBlockDelta" in chunk and "text" in chunk["contentBlockDelta"]["delta"]: - yield chunk["contentBlockDelta"]["delta"]["text"] + if in_resasoning_context and "contentBlockStop" in chunk: + in_resasoning_context = False + yield "\n \n\n" + elif "contentBlockDelta" in chunk and "delta" in chunk["contentBlockDelta"]: + if "reasoningContent" in chunk["contentBlockDelta"]["delta"]: + if not in_resasoning_context: + yield "" + + in_resasoning_context = True + if "text" in chunk["contentBlockDelta"]["delta"]["reasoningContent"]: + yield chunk["contentBlockDelta"]["delta"]["reasoningContent"]["text"] + elif "text" in chunk["contentBlockDelta"]["delta"]: + yield chunk["contentBlockDelta"]["delta"]["text"] def get_completion(self, model_id: str, payload: dict) -> str: response = self.bedrock_runtime.converse(**payload) From 827b47d2d5f3a2f5a12b1138ae57a813b10ff66e Mon Sep 17 00:00:00 2001 From: Takahiro Kikumoto Date: Tue, 18 Mar 2025 12:01:14 +0900 Subject: [PATCH 19/41] Fix system message handling and payload cleanup in AWS Bedrock Claude Pipeline - Corrected the system message extraction to use the "content" field. - Removed unnecessary deletion of the "system" field from the payload in stream_response method. --- examples/pipelines/providers/aws_bedrock_claude_pipeline.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/examples/pipelines/providers/aws_bedrock_claude_pipeline.py b/examples/pipelines/providers/aws_bedrock_claude_pipeline.py index 4990927..9d03426 100644 --- a/examples/pipelines/providers/aws_bedrock_claude_pipeline.py +++ b/examples/pipelines/providers/aws_bedrock_claude_pipeline.py @@ -139,7 +139,7 @@ class Pipeline: payload = {"modelId": model_id, "messages": processed_messages, - "system": [{'text': system_message if system_message else 'you are an intelligent ai assistant'}], + "system": [{'text': system_message["content"] if system_message else 'you are an intelligent ai assistant'}], "inferenceConfig": {"temperature": body.get("temperature", 0.5)}, "additionalModelRequestFields": {"top_k": body.get("top_k", 200), "top_p": body.get("top_p", 0.9)} } @@ -166,8 +166,6 @@ class Pipeline: } def stream_response(self, model_id: str, payload: dict) -> Generator: - if "system" in payload: - del payload["system"] if "additionalModelRequestFields" in payload: del payload["additionalModelRequestFields"] streaming_response = self.bedrock_runtime.converse_stream(**payload) From 51e267c10f68bee3847dab2ddce61781892c394f Mon Sep 17 00:00:00 2001 From: Takahiro Kikumoto Date: Tue, 18 Mar 2025 13:29:15 +0900 Subject: [PATCH 20/41] Refactor payload structure to comply with Bedrock Converse API - Updated `inferenceConfig` to include `temperature`, `topP`, `maxTokens`, and `stopSequences`. - Added `additionalModelRequestFields` with `top_k` parameter. - Removed unnecessary deletion of `additionalModelRequestFields` in `stream_response` method. --- .../providers/aws_bedrock_claude_pipeline.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/examples/pipelines/providers/aws_bedrock_claude_pipeline.py b/examples/pipelines/providers/aws_bedrock_claude_pipeline.py index 9d03426..80edbd7 100644 --- a/examples/pipelines/providers/aws_bedrock_claude_pipeline.py +++ b/examples/pipelines/providers/aws_bedrock_claude_pipeline.py @@ -140,8 +140,13 @@ class Pipeline: payload = {"modelId": model_id, "messages": processed_messages, "system": [{'text': system_message["content"] if system_message else 'you are an intelligent ai assistant'}], - "inferenceConfig": {"temperature": body.get("temperature", 0.5)}, - "additionalModelRequestFields": {"top_k": body.get("top_k", 200), "top_p": body.get("top_p", 0.9)} + "inferenceConfig": { + "temperature": body.get("temperature", 0.5), + "topP": body.get("top_p", 0.9), + "maxTokens": body.get("max_tokens", 4096), + "stopSequences": body.get("stop", []), + }, + "additionalModelRequestFields": {"top_k": body.get("top_k", 200)} } if body.get("stream", False): return self.stream_response(model_id, payload) @@ -166,8 +171,6 @@ class Pipeline: } def stream_response(self, model_id: str, payload: dict) -> Generator: - if "additionalModelRequestFields" in payload: - del payload["additionalModelRequestFields"] streaming_response = self.bedrock_runtime.converse_stream(**payload) for chunk in streaming_response["stream"]: if "contentBlockDelta" in chunk: From c1bbbe11650b5cf10c7f5d4ceb6a893d93fee943 Mon Sep 17 00:00:00 2001 From: Takahiro Kikumoto Date: Tue, 18 Mar 2025 16:33:44 +0900 Subject: [PATCH 21/41] Refactor AWS Bedrock Claude Pipeline to support Instance Profile and Task Role - Updated `Valves` class to use `Optional[str]` for AWS credentials. - Modified `__init__` method to initialize `valves` with environment variables. - Added `update_pipelines` method to handle Bedrock client initialization and model fetching. - Refactored `on_startup` and `on_valves_updated` methods to call `update_pipelines`. - Improved error handling in `update_pipelines` and `get_models` methods. --- .../providers/aws_bedrock_claude_pipeline.py | 98 ++++++++++--------- 1 file changed, 54 insertions(+), 44 deletions(-) diff --git a/examples/pipelines/providers/aws_bedrock_claude_pipeline.py b/examples/pipelines/providers/aws_bedrock_claude_pipeline.py index 80edbd7..f347d77 100644 --- a/examples/pipelines/providers/aws_bedrock_claude_pipeline.py +++ b/examples/pipelines/providers/aws_bedrock_claude_pipeline.py @@ -12,7 +12,7 @@ import base64 import json import logging from io import BytesIO -from typing import List, Union, Generator, Iterator +from typing import List, Union, Generator, Iterator, Optional, Any import boto3 @@ -26,9 +26,9 @@ from utils.pipelines.main import pop_system_message class Pipeline: class Valves(BaseModel): - AWS_ACCESS_KEY: str = "" - AWS_SECRET_KEY: str = "" - AWS_REGION_NAME: str = "" + AWS_ACCESS_KEY: Optional[str] = None + AWS_SECRET_KEY: Optional[str] = None + AWS_REGION_NAME: Optional[str] = None def __init__(self): self.type = "manifold" @@ -47,21 +47,25 @@ class Pipeline: } ) - self.bedrock = boto3.client(aws_access_key_id=self.valves.AWS_ACCESS_KEY, - aws_secret_access_key=self.valves.AWS_SECRET_KEY, - service_name="bedrock", - region_name=self.valves.AWS_REGION_NAME) - self.bedrock_runtime = boto3.client(aws_access_key_id=self.valves.AWS_ACCESS_KEY, - aws_secret_access_key=self.valves.AWS_SECRET_KEY, - service_name="bedrock-runtime", - region_name=self.valves.AWS_REGION_NAME) + self.valves = self.Valves( + **{ + "AWS_ACCESS_KEY": os.getenv("AWS_ACCESS_KEY", ""), + "AWS_SECRET_KEY": os.getenv("AWS_SECRET_KEY", ""), + "AWS_REGION_NAME": os.getenv( + "AWS_REGION_NAME", os.getenv( + "AWS_REGION", os.getenv("AWS_DEFAULT_REGION", "") + ) + ), + } + ) - self.pipelines = self.get_models() + self.update_pipelines() async def on_startup(self): # This function is called when the server is started. print(f"on_startup:{__name__}") + self.update_pipelines() pass async def on_shutdown(self): @@ -72,40 +76,46 @@ class Pipeline: async def on_valves_updated(self): # This function is called when the valves are updated. print(f"on_valves_updated:{__name__}") - self.bedrock = boto3.client(aws_access_key_id=self.valves.AWS_ACCESS_KEY, - aws_secret_access_key=self.valves.AWS_SECRET_KEY, - service_name="bedrock", - region_name=self.valves.AWS_REGION_NAME) - self.bedrock_runtime = boto3.client(aws_access_key_id=self.valves.AWS_ACCESS_KEY, - aws_secret_access_key=self.valves.AWS_SECRET_KEY, - service_name="bedrock-runtime", - region_name=self.valves.AWS_REGION_NAME) - self.pipelines = self.get_models() + self.update_pipelines() - def pipelines(self) -> List[dict]: - return self.get_models() + def update_pipelines(self) -> None: + try: + self.bedrock = boto3.client(service_name="bedrock", + aws_access_key_id=self.valves.AWS_ACCESS_KEY, + aws_secret_access_key=self.valves.AWS_SECRET_KEY, + region_name=self.valves.AWS_REGION_NAME) + self.bedrock_runtime = boto3.client(service_name="bedrock-runtime", + aws_access_key_id=self.valves.AWS_ACCESS_KEY, + aws_secret_access_key=self.valves.AWS_SECRET_KEY, + region_name=self.valves.AWS_REGION_NAME) + self.pipelines = self.get_models() + except Exception as e: + print(f"Error: {e}") + self.pipelines = [ + { + "id": "error", + "name": "Could not fetch models from Bedrock, please set up AWS Key/Secret or Instance/Task Role.", + }, + ] def get_models(self): - if self.valves.AWS_ACCESS_KEY and self.valves.AWS_SECRET_KEY: - try: - response = self.bedrock.list_foundation_models(byProvider='Anthropic', byInferenceType='ON_DEMAND') - return [ - { - "id": model["modelId"], - "name": model["modelName"], - } - for model in response["modelSummaries"] - ] - except Exception as e: - print(f"Error: {e}") - return [ - { - "id": "error", - "name": "Could not fetch models from Bedrock, please update the Access/Secret Key in the valves.", - }, - ] - else: - return [] + try: + response = self.bedrock.list_foundation_models(byProvider='Anthropic', byInferenceType='ON_DEMAND') + return [ + { + "id": model["modelId"], + "name": model["modelName"], + } + for model in response["modelSummaries"] + ] + except Exception as e: + print(f"Error: {e}") + return [ + { + "id": "error", + "name": "Could not fetch models from Bedrock, please check permissoin.", + }, + ] def pipe( self, user_message: str, model_id: str, messages: List[dict], body: dict From ecc44ebd1e0707db56af4e186cc71e4ead9bbced Mon Sep 17 00:00:00 2001 From: Takahiro Kikumoto Date: Tue, 18 Mar 2025 16:48:12 +0900 Subject: [PATCH 22/41] Enhance get_models method to include models with INFERENCE_PROFILE type - Updated the get_models method to fetch models that support both ON_DEMAND and INFERENCE_PROFILE inference types. - Added a helper method getInferenceProfileId to retrieve the inference profile ID for models with INFERENCE_PROFILE type. - This change ensures that models with different inference types are correctly listed and available for use. --- .../providers/aws_bedrock_claude_pipeline.py | 28 +++++++++++++------ 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/examples/pipelines/providers/aws_bedrock_claude_pipeline.py b/examples/pipelines/providers/aws_bedrock_claude_pipeline.py index f347d77..245a046 100644 --- a/examples/pipelines/providers/aws_bedrock_claude_pipeline.py +++ b/examples/pipelines/providers/aws_bedrock_claude_pipeline.py @@ -100,14 +100,18 @@ class Pipeline: def get_models(self): try: - response = self.bedrock.list_foundation_models(byProvider='Anthropic', byInferenceType='ON_DEMAND') - return [ - { - "id": model["modelId"], - "name": model["modelName"], - } - for model in response["modelSummaries"] - ] + res = [] + response = self.bedrock.list_foundation_models(byProvider='Anthropic') + for model in response['modelSummaries']: + inference_types = model.get('inferenceTypesSupported', []) + if "ON_DEMAND" in inference_types: + res.append({'id': model['modelId'], 'name': model['modelName']}) + elif "INFERENCE_PROFILE" in inference_types: + inferenceProfileId = self.getInferenceProfileId(model['modelArn']) + if inferenceProfileId: + res.append({'id': inferenceProfileId, 'name': model['modelName']}) + + return res except Exception as e: print(f"Error: {e}") return [ @@ -117,6 +121,14 @@ class Pipeline: }, ] + def getInferenceProfileId(self, modelArn: str) -> str: + response = self.bedrock.list_inference_profiles() + for profile in response.get('inferenceProfileSummaries', []): + for model in profile.get('models', []): + if model.get('modelArn') == modelArn: + return profile['inferenceProfileId'] + return None + def pipe( self, user_message: str, model_id: str, messages: List[dict], body: dict ) -> Union[str, Generator, Iterator]: From 327062733a168ffbaa4dca9cbbd061fed29c6a30 Mon Sep 17 00:00:00 2001 From: Takahiro Kikumoto Date: Tue, 18 Mar 2025 17:38:46 +0900 Subject: [PATCH 23/41] Add support for Claude 3.7 thinking mode - Implemented support for Claude 3.7 thinking mode by adding reasoning effort and budget tokens. - Added checks to ensure combined tokens do not exceed the maximum limit. - Adjusted inference configuration to accommodate thinking mode requirements. - Referenced implementation from https://github.com/open-webui/pipelines/blob/main/examples/pipelines/providers/anthropic_manifold_pipeline.py. --- .../providers/aws_bedrock_claude_pipeline.py | 66 ++++++++++++++++++- 1 file changed, 63 insertions(+), 3 deletions(-) diff --git a/examples/pipelines/providers/aws_bedrock_claude_pipeline.py b/examples/pipelines/providers/aws_bedrock_claude_pipeline.py index 245a046..ab5d937 100644 --- a/examples/pipelines/providers/aws_bedrock_claude_pipeline.py +++ b/examples/pipelines/providers/aws_bedrock_claude_pipeline.py @@ -23,6 +23,17 @@ import requests from utils.pipelines.main import pop_system_message +REASONING_EFFORT_BUDGET_TOKEN_MAP = { + "none": None, + "low": 1024, + "medium": 4096, + "high": 16384, + "max": 32768, +} + +# Maximum combined token limit for Claude 3.7 +MAX_COMBINED_TOKENS = 64000 + class Pipeline: class Valves(BaseModel): @@ -170,7 +181,44 @@ class Pipeline: }, "additionalModelRequestFields": {"top_k": body.get("top_k", 200)} } + if body.get("stream", False): + supports_thinking = "claude-3-7" in model_id + reasoning_effort = body.get("reasoning_effort", "none") + budget_tokens = REASONING_EFFORT_BUDGET_TOKEN_MAP.get(reasoning_effort) + + # Allow users to input an integer value representing budget tokens + if ( + not budget_tokens + and reasoning_effort not in REASONING_EFFORT_BUDGET_TOKEN_MAP.keys() + ): + try: + budget_tokens = int(reasoning_effort) + except ValueError as e: + print("Failed to convert reasoning effort to int", e) + budget_tokens = None + + if supports_thinking and budget_tokens: + # Check if the combined tokens (budget_tokens + max_tokens) exceeds the limit + max_tokens = payload.get("max_tokens", 4096) + combined_tokens = budget_tokens + max_tokens + + if combined_tokens > MAX_COMBINED_TOKENS: + error_message = f"Error: Combined tokens (budget_tokens {budget_tokens} + max_tokens {max_tokens} = {combined_tokens}) exceeds the maximum limit of {MAX_COMBINED_TOKENS}" + print(error_message) + return error_message + + payload["inferenceConfig"]["maxTokens"] = combined_tokens + payload["additionalModelRequestFields"]["thinking"] = { + "type": "enabled", + "budget_tokens": budget_tokens, + } + # Thinking requires temperature 1.0 and does not support top_p, top_k + payload["inferenceConfig"]["temperature"] = 1.0 + if "top_k" in payload["additionalModelRequestFields"]: + del payload["additionalModelRequestFields"]["top_k"] + if "topP" in payload["inferenceConfig"]: + del payload["inferenceConfig"]["topP"] return self.stream_response(model_id, payload) else: return self.get_completion(model_id, payload) @@ -194,11 +242,23 @@ class Pipeline: def stream_response(self, model_id: str, payload: dict) -> Generator: streaming_response = self.bedrock_runtime.converse_stream(**payload) + + in_resasoning_context = False for chunk in streaming_response["stream"]: - if "contentBlockDelta" in chunk: - yield chunk["contentBlockDelta"]["delta"]["text"] + if in_resasoning_context and "contentBlockStop" in chunk: + in_resasoning_context = False + yield "\n \n\n" + elif "contentBlockDelta" in chunk and "delta" in chunk["contentBlockDelta"]: + if "reasoningContent" in chunk["contentBlockDelta"]["delta"]: + if not in_resasoning_context: + yield "" + + in_resasoning_context = True + if "text" in chunk["contentBlockDelta"]["delta"]["reasoningContent"]: + yield chunk["contentBlockDelta"]["delta"]["reasoningContent"]["text"] + elif "text" in chunk["contentBlockDelta"]["delta"]: + yield chunk["contentBlockDelta"]["delta"]["text"] def get_completion(self, model_id: str, payload: dict) -> str: response = self.bedrock_runtime.converse(**payload) return response['output']['message']['content'][0]['text'] - From 488c43edd91176b08e3a1218692c3d82da6ffd85 Mon Sep 17 00:00:00 2001 From: Takahiro Kikumoto Date: Tue, 18 Mar 2025 21:28:05 +0900 Subject: [PATCH 24/41] improve: enhance image format detection in process_image method - Add proper MIME type detection for both data URLs and HTTP requests - Extract media type from Content-Type header or MIME type - Make format detection more robust and generic - Remove hardcoded PNG/JPEG format assumptions --- .../providers/aws_bedrock_claude_pipeline.py | 21 ++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/examples/pipelines/providers/aws_bedrock_claude_pipeline.py b/examples/pipelines/providers/aws_bedrock_claude_pipeline.py index ab5d937..e6ad83f 100644 --- a/examples/pipelines/providers/aws_bedrock_claude_pipeline.py +++ b/examples/pipelines/providers/aws_bedrock_claude_pipeline.py @@ -227,17 +227,24 @@ class Pipeline: def process_image(self, image: str): img_stream = None - if image["url"].startswith("data:image"): - if ',' in image["url"]: - base64_string = image["url"].split(',')[1] - image_data = base64.b64decode(base64_string) + content_type = None + if image["url"].startswith("data:image"): + mime_type, base64_string = image["url"].split(",", 1) + content_type = mime_type.split(":")[1].split(";")[0] + image_data = base64.b64decode(base64_string) img_stream = BytesIO(image_data) else: - img_stream = requests.get(image["url"]).content + response = requests.get(image["url"]) + img_stream = BytesIO(response.content) + content_type = response.headers.get('Content-Type', 'image/jpeg') + + media_type = content_type.split('/')[-1] if '/' in content_type else content_type return { - "image": {"format": "png" if image["url"].endswith(".png") else "jpeg", - "source": {"bytes": img_stream.read()}} + "image": { + "format": media_type, + "source": {"bytes": img_stream.read()} + } } def stream_response(self, model_id: str, payload: dict) -> Generator: From a80ce9b5feafa3891b1694587b45a14d0e312f8f Mon Sep 17 00:00:00 2001 From: Timothy Jaeryang Baek Date: Tue, 18 Mar 2025 07:15:57 -0700 Subject: [PATCH 25/41] Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 22c11db..b08a7d4 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ # Pipelines: UI-Agnostic OpenAI API Plugin Framework > [!TIP] -> **You probably don't need Pipelines!** +> **Do NOT use Pipelines!** > > If your goal is simply to add support for additional providers like Anthropic or basic filters, you likely don't need Pipelines . For those cases, Open WebUI Functions are a better fit—it's built-in, much more convenient, and easier to configure. Pipelines, however, comes into play when you're dealing with computationally heavy tasks (e.g., running large models or complex logic) that you want to offload from your main Open WebUI instance for better performance and scalability. From d2b7077cce94d02ee040fef5ece4ce0e971192f7 Mon Sep 17 00:00:00 2001 From: Timothy Jaeryang Baek Date: Tue, 18 Mar 2025 07:16:15 -0700 Subject: [PATCH 26/41] Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index b08a7d4..ab6fc90 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ # Pipelines: UI-Agnostic OpenAI API Plugin Framework > [!TIP] -> **Do NOT use Pipelines!** +> **DO NOT USE PIPELINES!** > > If your goal is simply to add support for additional providers like Anthropic or basic filters, you likely don't need Pipelines . For those cases, Open WebUI Functions are a better fit—it's built-in, much more convenient, and easier to configure. Pipelines, however, comes into play when you're dealing with computationally heavy tasks (e.g., running large models or complex logic) that you want to offload from your main Open WebUI instance for better performance and scalability. From c04d6f41cee54373bfef3f8965202b974dacf3f7 Mon Sep 17 00:00:00 2001 From: xxond <47518240+xxond@users.noreply.github.com> Date: Thu, 20 Mar 2025 19:31:12 +0300 Subject: [PATCH 27/41] Update pipeline list in google_vertexai_manifold_pipeline.py --- .../providers/google_vertexai_manifold_pipeline.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/examples/pipelines/providers/google_vertexai_manifold_pipeline.py b/examples/pipelines/providers/google_vertexai_manifold_pipeline.py index 4fe085f..cbf57d2 100644 --- a/examples/pipelines/providers/google_vertexai_manifold_pipeline.py +++ b/examples/pipelines/providers/google_vertexai_manifold_pipeline.py @@ -51,9 +51,11 @@ class Pipeline: ) self.pipelines = [ {"id": "gemini-1.5-flash-001", "name": "Gemini 1.5 Flash"}, + {"id": "gemini-2.0-flash", "name": "Gemini 2.0 Flash"}, + {"id": "gemini-2.0-flash-lite", "name": "Gemini 2.0 Flash-Lite"}, + {"id": "gemini-2.0-flash-thinking-exp-01-21", "name": "Gemini 2.0 Flash Thinking"}, {"id": "gemini-1.5-pro-001", "name": "Gemini 1.5 Pro"}, - {"id": "gemini-flash-experimental", "name": "Gemini 1.5 Flash Experimental"}, - {"id": "gemini-pro-experimental", "name": "Gemini 1.5 Pro Experimental"}, + {"id": "gemini-2.0-pro-exp-02-05", "name": "Gemini 2.0 Pro"}, ] async def on_startup(self) -> None: From ece9fceb596cf2364a00d755d6ea616e4a837527 Mon Sep 17 00:00:00 2001 From: NolanTrem <34580718+NolanTrem@users.noreply.github.com> Date: Fri, 21 Mar 2025 16:53:19 -0700 Subject: [PATCH 28/41] Add R2R Example Pipeline --- examples/pipelines/rag/r2r_pipeline.py | 45 ++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) create mode 100644 examples/pipelines/rag/r2r_pipeline.py diff --git a/examples/pipelines/rag/r2r_pipeline.py b/examples/pipelines/rag/r2r_pipeline.py new file mode 100644 index 0000000..1dbcc15 --- /dev/null +++ b/examples/pipelines/rag/r2r_pipeline.py @@ -0,0 +1,45 @@ +""" +title: R2R Pipeline +author: Nolan Tremelling +date: 2025-03-21 +version: 1.0 +license: MIT +description: A pipeline for retrieving relevant information from a knowledge base using R2R. +requirements: r2r +""" + +from typing import List, Union, Generator, Iterator +from schemas import OpenAIChatMessage +import os +import asyncio + + +class Pipeline: + def __init__(self): + self.r2r_client = None + + async def on_startup(self): + from r2r import R2RClient + + # Connect to either SciPhi cloud or your self hosted R2R server + self.r2r_client = R2RClient(os.getenv("R2R_SERVER_URL", "https://api.sciphi.ai")) + self.r2r_client.set_api_key(os.getenv("R2R_API_KEY", "")) + + pass + + async def on_shutdown(self): + # This function is called when the server is stopped. + self.r2r_client = None + + def pipe( + self, user_message: str, model_id: str, messages: List[dict], body: dict + ) -> Union[str, Generator, Iterator]: + + print(messages) + print(user_message) + + response = self.r2r_client.retrieval.rag( + query=user_message, + ) + + return response.results.completion From 83716778a63c3103ba59cacbafd06881f6004d5a Mon Sep 17 00:00:00 2001 From: angkk2u Date: Wed, 26 Mar 2025 12:55:08 +0900 Subject: [PATCH 29/41] Update new API models Updates based on the new API models --- .../providers/perplexity_manifold_pipeline.py | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/examples/pipelines/providers/perplexity_manifold_pipeline.py b/examples/pipelines/providers/perplexity_manifold_pipeline.py index 2d87aff..c985b65 100644 --- a/examples/pipelines/providers/perplexity_manifold_pipeline.py +++ b/examples/pipelines/providers/perplexity_manifold_pipeline.py @@ -30,26 +30,26 @@ class Pipeline: # List of models self.pipelines = [ { - "id": "llama-3.1-sonar-large-128k-online", - "name": "Llama 3.1 Sonar Large 128k Online" + "id": "sonar-pro", + "name": "Sonar Pro" }, { - "id": "llama-3.1-sonar-small-128k-online", - "name": "Llama 3.1 Sonar Small 128k Online" + "id": "sonar", + "name": "Sonar" }, { - "id": "llama-3.1-sonar-large-128k-chat", - "name": "Llama 3.1 Sonar Large 128k Chat" + "id": "sonar-deep-research", + "name": "Sonar Deep Research" }, { - "id": "llama-3.1-sonar-small-128k-chat", - "name": "Llama 3.1 Sonar Small 128k Chat" + "id": "sonar-reasoning-pro", + "name": "Sonar Reasoning Pro" }, { - "id": "llama-3.1-8b-instruct", "name": "Llama 3.1 8B Instruct" + "id": "sonar-reasoning", "name": "Sonar Reasoning" }, { - "id": "llama-3.1-70b-instruct", "name": "Llama 3.1 70B Instruct" + "id": "r1-1776", "name": "R1-1776" } ] pass From 4b42a7b51455719ea94ef1df6622f6c482b02d1d Mon Sep 17 00:00:00 2001 From: Erik <35608570+weisser-dev@users.noreply.github.com> Date: Wed, 26 Mar 2025 08:02:25 +0100 Subject: [PATCH 30/41] Create azure_dalle_manifold_pipeline.py A pipeline for generating text and processing images using the Azure API. And including multiple Dall-e models --- .../azure_dalle_manifold_pipeline.py | 89 +++++++++++++++++++ 1 file changed, 89 insertions(+) create mode 100644 examples/pipelines/providers/azure_dalle_manifold_pipeline.py diff --git a/examples/pipelines/providers/azure_dalle_manifold_pipeline.py b/examples/pipelines/providers/azure_dalle_manifold_pipeline.py new file mode 100644 index 0000000..c64a766 --- /dev/null +++ b/examples/pipelines/providers/azure_dalle_manifold_pipeline.py @@ -0,0 +1,89 @@ +""" +title: Azure - Dall-E Manifold Pipeline +author: weisser-dev +date: 2025-03-26 +version: 1.0 +license: MIT +description: A pipeline for generating text and processing images using the Azure API. And including multiple Dall-e models +requirements: requests, os +environment_variables: AZURE_OPENAI_API_KEY, AZURE_OPENAI_ENDPOINT, AZURE_OPENAI_API_VERSION, AZURE_OPENAI_MODELS, AZURE_OPENAI_MODEL_NAMES, IMAGE_SIZE, NUM_IMAGES +""" +from typing import List, Union, Generator, Iterator +from pydantic import BaseModel +import requests +import os + +class Pipeline: + class Valves(BaseModel): + AZURE_OPENAI_API_KEY: str + AZURE_OPENAI_ENDPOINT: str + AZURE_OPENAI_API_VERSION: str + AZURE_OPENAI_MODELS: str + AZURE_OPENAI_MODEL_NAMES: str + IMAGE_SIZE: str = "1024x1024" + NUM_IMAGES: int = 1 + + def __init__(self): + self.type = "manifold" + self.name = "Azure DALL·E: " + self.valves = self.Valves( + **{ + "AZURE_OPENAI_API_KEY": os.getenv("AZURE_OPENAI_API_KEY", "your-azure-openai-api-key-here"), + "AZURE_OPENAI_ENDPOINT": os.getenv("AZURE_OPENAI_ENDPOINT", "your-azure-openai-endpoint-here"), + "AZURE_OPENAI_API_VERSION": os.getenv("AZURE_OPENAI_API_VERSION", "2024-02-01"), + "AZURE_OPENAI_MODELS": os.getenv("AZURE_OPENAI_MODELS", "dall-e-2;dall-e-3"), #ensure that the model here is within your enpoint url, sometime the name within the url it is also like Dalle3 + "AZURE_OPENAI_MODEL_NAMES": os.getenv("AZURE_OPENAI_MODEL_NAMES", "DALL-E 2;DALL-E 3"), + } + ) + self.set_pipelines() + + def set_pipelines(self): + models = self.valves.AZURE_OPENAI_MODELS.split(";") + model_names = self.valves.AZURE_OPENAI_MODEL_NAMES.split(";") + self.pipelines = [ + {"id": model, "name": name} for model, name in zip(models, model_names) + ] + print(f"azure_dalle_pipeline - models: {self.pipelines}") + + async def on_startup(self) -> None: + print(f"on_startup:{__name__}") + + async def on_shutdown(self): + print(f"on_shutdown:{__name__}") + + async def on_valves_updated(self): + print(f"on_valves_updated:{__name__}") + self.set_pipelines() + + def pipe( + self, user_message: str, model_id: str, messages: List[dict], body: dict + ) -> Union[str, Generator, Iterator]: + print(f"pipe:{__name__}") + + headers = { + "api-key": self.valves.AZURE_OPENAI_API_KEY, + "Content-Type": "application/json", + } + + url = f"{self.valves.AZURE_OPENAI_ENDPOINT}/openai/deployments/{model_id}/images/generations?api-version={self.valves.AZURE_OPENAI_API_VERSION}" + + payload = { + "model": model_id, + "prompt": user_message, + "size": self.valves.IMAGE_SIZE, + "n": self.valves.NUM_IMAGES, + } + + try: + response = requests.post(url, json=payload, headers=headers) + response.raise_for_status() + data = response.json() + + message = "" + for image in data.get("data", []): + if "url" in image: + message += f"![image]({image['url']})\n" + + yield message + except Exception as e: + yield f"Error: {e} ({response.text if response else 'No response'})" From c0a60f1b0f74ff9f010efcdbf0acc1c7681c9846 Mon Sep 17 00:00:00 2001 From: ther3zz <40278044+ther3zz@users.noreply.github.com> Date: Fri, 28 Mar 2025 10:30:32 -0400 Subject: [PATCH 31/41] Model Name vs ID for Generation adds the ability to use the model name as the model value when inserting generation observations into langfuse. Also adds both model name and id in metadata. --- examples/filters/langfuse_filter_pipeline.py | 49 ++++++++++++++++++-- 1 file changed, 45 insertions(+), 4 deletions(-) diff --git a/examples/filters/langfuse_filter_pipeline.py b/examples/filters/langfuse_filter_pipeline.py index 48f453a..cd2c0ab 100644 --- a/examples/filters/langfuse_filter_pipeline.py +++ b/examples/filters/langfuse_filter_pipeline.py @@ -1,8 +1,8 @@ """ title: Langfuse Filter Pipeline author: open-webui -date: 2025-03-04 -version: 1.6 +date: 2025-03-28 +version: 1.7 license: MIT description: A filter pipeline that uses Langfuse. requirements: langfuse @@ -36,6 +36,8 @@ class Pipeline: host: str # New valve that controls whether task names are added as tags: insert_tags: bool = True + # New valve that controls whether to use model name instead of model ID for generation + use_model_name_instead_of_id_for_generation: bool = False debug: bool = False def __init__(self): @@ -48,6 +50,7 @@ class Pipeline: "secret_key": os.getenv("LANGFUSE_SECRET_KEY", "your-secret-key-here"), "public_key": os.getenv("LANGFUSE_PUBLIC_KEY", "your-public-key-here"), "host": os.getenv("LANGFUSE_HOST", "https://cloud.langfuse.com"), + "use_model_name_instead_of_id_for_generation": os.getenv("USE_MODEL_NAME", "false").lower() == "true", "debug": os.getenv("DEBUG_MODE", "false").lower() == "true", } ) @@ -55,6 +58,8 @@ class Pipeline: self.langfuse = None self.chat_traces = {} self.suppressed_logs = set() + # Dictionary to store model names for each chat + self.model_names = {} # Only these tasks will be treated as LLM "generations": self.GENERATION_TASKS = {"llm_response"} @@ -124,6 +129,20 @@ class Pipeline: metadata["chat_id"] = chat_id body["metadata"] = metadata + # Extract and store both model name and ID if available + model_info = metadata.get("model", {}) + model_id = body.get("model") + + # Store model information for this chat + if chat_id not in self.model_names: + self.model_names[chat_id] = {"id": model_id} + else: + self.model_names[chat_id]["id"] = model_id + + if isinstance(model_info, dict) and "name" in model_info: + self.model_names[chat_id]["name"] = model_info["name"] + self.log(f"Stored model info - name: '{model_info['name']}', id: '{model_id}' for chat_id: {chat_id}") + required_keys = ["model", "messages"] missing_keys = [key for key in required_keys if key not in body] if missing_keys: @@ -169,9 +188,20 @@ class Pipeline: # If it's a task that is considered an LLM generation if task_name in self.GENERATION_TASKS: + # Determine which model value to use based on the use_model_name valve + model_id = self.model_names.get(chat_id, {}).get("id", body["model"]) + model_name = self.model_names.get(chat_id, {}).get("name", "unknown") + + # Pick primary model identifier based on valve setting + model_value = model_name if self.valves.use_model_name_instead_of_id_for_generation else model_id + + # Add both values to metadata regardless of valve setting + metadata["model_id"] = model_id + metadata["model_name"] = model_name + generation_payload = { "name": f"{task_name}:{str(uuid.uuid4())}", - "model": body["model"], + "model": model_value, "input": body["messages"], "metadata": metadata, } @@ -241,10 +271,21 @@ class Pipeline: metadata["interface"] = "open-webui" if task_name in self.GENERATION_TASKS: + # Determine which model value to use based on the use_model_name valve + model_id = self.model_names.get(chat_id, {}).get("id", body.get("model")) + model_name = self.model_names.get(chat_id, {}).get("name", "unknown") + + # Pick primary model identifier based on valve setting + model_value = model_name if self.valves.use_model_name_instead_of_id_for_generation else model_id + + # Add both values to metadata regardless of valve setting + metadata["model_id"] = model_id + metadata["model_name"] = model_name + # If it's an LLM generation generation_payload = { "name": f"{task_name}:{str(uuid.uuid4())}", - "model": body.get("model"), # <-- Include the model in LLM generation + "model": model_value, # <-- Use model name or ID based on valve setting "input": body["messages"], "metadata": metadata, "usage": usage, From eb9a3a2d6d5c7d8c9f93f193daeed791a6c8e394 Mon Sep 17 00:00:00 2001 From: Eisai <12467320+Eisaichen@users.noreply.github.com> Date: Fri, 4 Apr 2025 23:24:20 -0500 Subject: [PATCH 32/41] Update google_manifold_pipeline.py --- .../providers/google_manifold_pipeline.py | 130 +++++++++++------- 1 file changed, 82 insertions(+), 48 deletions(-) diff --git a/examples/pipelines/providers/google_manifold_pipeline.py b/examples/pipelines/providers/google_manifold_pipeline.py index 8e71bdd..d5d500b 100644 --- a/examples/pipelines/providers/google_manifold_pipeline.py +++ b/examples/pipelines/providers/google_manifold_pipeline.py @@ -5,7 +5,7 @@ date: 2024-06-06 version: 1.3 license: MIT description: A pipeline for generating text using Google's GenAI models in Open-WebUI. -requirements: google-generativeai +requirements: google-genai environment_variables: GOOGLE_API_KEY """ @@ -14,8 +14,11 @@ import os from pydantic import BaseModel, Field -import google.generativeai as genai -from google.generativeai.types import GenerationConfig +from google import genai +from google.genai import types +from PIL import Image +from io import BytesIO +import base64 class Pipeline: @@ -24,8 +27,9 @@ class Pipeline: class Valves(BaseModel): """Options to change from the WebUI""" - GOOGLE_API_KEY: str = "" - USE_PERMISSIVE_SAFETY: bool = Field(default=False) + GOOGLE_API_KEY: str = Field(default="",description="Google Generative AI API key") + USE_PERMISSIVE_SAFETY: bool = Field(default=False,description="Use permissive safety settings") + GENERATE_IMAGE: bool = Field(default=False,description="Allow image generation") def __init__(self): self.type = "manifold" @@ -34,19 +38,20 @@ class Pipeline: self.valves = self.Valves(**{ "GOOGLE_API_KEY": os.getenv("GOOGLE_API_KEY", ""), - "USE_PERMISSIVE_SAFETY": False + "USE_PERMISSIVE_SAFETY": False, + "GENERATE_IMAGE": False }) self.pipelines = [] - genai.configure(api_key=self.valves.GOOGLE_API_KEY) - self.update_pipelines() + if self.valves.GOOGLE_API_KEY: + self.update_pipelines() async def on_startup(self) -> None: """This function is called when the server is started.""" print(f"on_startup:{__name__}") - genai.configure(api_key=self.valves.GOOGLE_API_KEY) - self.update_pipelines() + if self.valves.GOOGLE_API_KEY: + self.update_pipelines() async def on_shutdown(self) -> None: """This function is called when the server is stopped.""" @@ -57,22 +62,23 @@ class Pipeline: """This function is called when the valves are updated.""" print(f"on_valves_updated:{__name__}") - genai.configure(api_key=self.valves.GOOGLE_API_KEY) - self.update_pipelines() + if self.valves.GOOGLE_API_KEY: + self.update_pipelines() def update_pipelines(self) -> None: """Update the available models from Google GenAI""" if self.valves.GOOGLE_API_KEY: + client = genai.Client(api_key=self.valves.GOOGLE_API_KEY) try: - models = genai.list_models() + models = client.models.list() self.pipelines = [ { "id": model.name[7:], # the "models/" part messeses up the URL "name": model.display_name, } for model in models - if "generateContent" in model.supported_generation_methods + if "generateContent" in model.supported_actions if model.name[:7] == "models/" ] except Exception: @@ -92,13 +98,13 @@ class Pipeline: return "Error: GOOGLE_API_KEY is not set" try: - genai.configure(api_key=self.valves.GOOGLE_API_KEY) + client = genai.Client(api_key=self.valves.GOOGLE_API_KEY) if model_id.startswith("google_genai."): model_id = model_id[12:] model_id = model_id.lstrip(".") - if not model_id.startswith("gemini-"): + if not (model_id.startswith("gemini-") or model_id.startswith("learnlm-") or model_id.startswith("gemma-")): return f"Error: Invalid model name format: {model_id}" print(f"Pipe function called for model: {model_id}") @@ -127,50 +133,78 @@ class Pipeline: "role": "user" if message["role"] == "user" else "model", "parts": [{"text": message["content"]}] }) - - if "gemini-1.5" in model_id: - model = genai.GenerativeModel(model_name=model_id, system_instruction=system_message) - else: - if system_message: - contents.insert(0, {"role": "user", "parts": [{"text": f"System: {system_message}"}]}) - - model = genai.GenerativeModel(model_name=model_id) + print(f"{contents}") - generation_config = GenerationConfig( - temperature=body.get("temperature", 0.7), - top_p=body.get("top_p", 0.9), - top_k=body.get("top_k", 40), - max_output_tokens=body.get("max_tokens", 8192), - stop_sequences=body.get("stop", []), - ) + generation_config = { + "temperature": body.get("temperature", 0.7), + "top_p": body.get("top_p", 0.9), + "top_k": body.get("top_k", 40), + "max_output_tokens": body.get("max_tokens", 8192), + "stop_sequences": body.get("stop", []), + "response_modalities": ['Text'] + } + + if self.valves.GENERATE_IMAGE and model_id.startswith("gemini-2.0-flash-exp"): + generation_config["response_modalities"].append("Image") if self.valves.USE_PERMISSIVE_SAFETY: - safety_settings = { - genai.types.HarmCategory.HARM_CATEGORY_HARASSMENT: genai.types.HarmBlockThreshold.BLOCK_NONE, - genai.types.HarmCategory.HARM_CATEGORY_HATE_SPEECH: genai.types.HarmBlockThreshold.BLOCK_NONE, - genai.types.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: genai.types.HarmBlockThreshold.BLOCK_NONE, - genai.types.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: genai.types.HarmBlockThreshold.BLOCK_NONE, - } + safety_settings = [ + types.SafetySetting(category='HARM_CATEGORY_HARASSMENT', threshold='OFF'), + types.SafetySetting(category='HARM_CATEGORY_HATE_SPEECH', threshold='OFF'), + types.SafetySetting(category='HARM_CATEGORY_SEXUALLY_EXPLICIT', threshold='OFF'), + types.SafetySetting(category='HARM_CATEGORY_DANGEROUS_CONTENT', threshold='OFF'), + types.SafetySetting(category='HARM_CATEGORY_CIVIC_INTEGRITY', threshold='OFF') + ] + generation_config = types.GenerateContentConfig(**generation_config, safety_settings=safety_settings) else: - safety_settings = body.get("safety_settings") + generation_config = types.GenerateContentConfig(**generation_config) - response = model.generate_content( - contents, - generation_config=generation_config, - safety_settings=safety_settings, - stream=body.get("stream", False), - ) + if system_message: + contents.insert(0, {"role": "user", "parts": [{"text": f"System: {system_message}"}]}) if body.get("stream", False): + response = client.models.generate_content_stream( + model = model_id, + contents = contents, + config = generation_config, + ) return self.stream_response(response) else: - return response.text + response = client.models.generate_content( + model = model_id, + contents = contents, + config = generation_config, + ) + for part in response.candidates[0].content.parts: + if part.text is not None: + return part.text + elif part.inline_data is not None: + try: + image_data = base64.b64decode(part.inline_data.data) + image = Image.open(BytesIO((image_data))) + content_type = part.inline_data.mime_type + return "Image not supported yet." + except Exception as e: + print(f"Error processing image: {e}") + return "Error processing image." except Exception as e: print(f"Error generating content: {e}") - return f"An error occurred: {str(e)}" + return f"{e}" def stream_response(self, response): for chunk in response: - if chunk.text: - yield chunk.text + for candidate in chunk.candidates: + if candidate.content.parts is not None: + for part in candidate.content.parts: + if part.text is not None: + yield chunk.text + elif part.inline_data is not None: + try: + image_data = base64.b64decode(part.inline_data.data) + image = Image.open(BytesIO(image_data)) + content_type = part.inline_data.mime_type + yield "Image not supported yet." + except Exception as e: + print(f"Error processing image: {e}") + yield "Error processing image." From cf57fd0747ef8e610ed095c484ad2f78c7f32fe7 Mon Sep 17 00:00:00 2001 From: Timothy Jaeryang Baek Date: Thu, 10 Apr 2025 09:24:11 -0700 Subject: [PATCH 33/41] Create events_pipeline.py Co-Authored-By: Anthony Durussel <87324020+anthonydurussel@users.noreply.github.com> --- examples/pipelines/events_pipeline.py | 83 +++++++++++++++++++++++++++ 1 file changed, 83 insertions(+) create mode 100644 examples/pipelines/events_pipeline.py diff --git a/examples/pipelines/events_pipeline.py b/examples/pipelines/events_pipeline.py new file mode 100644 index 0000000..2baece9 --- /dev/null +++ b/examples/pipelines/events_pipeline.py @@ -0,0 +1,83 @@ +from typing import List, Union, Generator, Iterator, Optional +from pprint import pprint +import time + +# Uncomment to disable SSL verification warnings if needed. +# warnings.filterwarnings('ignore', message='Unverified HTTPS request') + + +class Pipeline: + def __init__(self): + self.name = "Pipeline with Status Event" + self.description = ( + "This is a pipeline that demonstrates how to use the status event." + ) + self.debug = True + self.version = "0.1.0" + self.author = "Anthony Durussel" + + async def on_startup(self): + # This function is called when the server is started. + print(f"on_startup: {__name__}") + pass + + async def on_shutdown(self): + # This function is called when the server is shutdown. + print(f"on_shutdown: {__name__}") + pass + + async def inlet(self, body: dict, user: Optional[dict] = None) -> dict: + # This function is called before the OpenAI API request is made. You can modify the form data before it is sent to the OpenAI API. + print(f"inlet: {__name__}") + if self.debug: + print(f"inlet: {__name__} - body:") + pprint(body) + print(f"inlet: {__name__} - user:") + pprint(user) + return body + + async def outlet(self, body: dict, user: Optional[dict] = None) -> dict: + # This function is called after the OpenAI API response is completed. You can modify the messages after they are received from the OpenAI API. + print(f"outlet: {__name__}") + if self.debug: + print(f"outlet: {__name__} - body:") + pprint(body) + print(f"outlet: {__name__} - user:") + pprint(user) + return body + + def pipe( + self, + user_message: str, + model_id: str, + messages: List[dict], + body: dict, + ) -> Union[str, Generator, Iterator]: + print(f"pipe: {__name__}") + + if self.debug: + print(f"pipe: {__name__} - received message from user: {user_message}") + + yield { + "event": { + "type": "status", + "data": { + "description": "Fake Status", + "done": False, + }, + } + } + + time.sleep(5) # Sleep for 5 seconds + + yield f"user_message: {user_message}" + + yield { + "event": { + "type": "status", + "data": { + "description": "", + "done": True, + }, + } + } From 756d3a94f52aaa55c82964be3a13fccde1e5af56 Mon Sep 17 00:00:00 2001 From: Erik <35608570+weisser-dev@users.noreply.github.com> Date: Fri, 11 Apr 2025 12:22:37 +0200 Subject: [PATCH 34/41] Update main.py --- main.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/main.py b/main.py index cff3335..1b3cc76 100644 --- a/main.py +++ b/main.py @@ -39,6 +39,10 @@ PIPELINES = {} PIPELINE_MODULES = {} PIPELINE_NAMES = {} +#Add GLOBAL_LOG_LEVEL for Pipeplines +log_level = os.getenv('GLOBAL_LOG_LEVEL', 'INFO').upper() +logging.basicConfig(level=LOG_LEVELS[log_level]) + def get_all_pipelines(): pipelines = {} From 5e1f90dc49de79fff04f791b404e6b4eca8f8366 Mon Sep 17 00:00:00 2001 From: Eric Z Date: Fri, 11 Apr 2025 23:53:11 -0500 Subject: [PATCH 35/41] feat(flowise): add flowise integration for agentic evaluation --- .../integrations/flowise_pipeline.py | 272 ++++++++++++++++++ 1 file changed, 272 insertions(+) create mode 100644 examples/pipelines/integrations/flowise_pipeline.py diff --git a/examples/pipelines/integrations/flowise_pipeline.py b/examples/pipelines/integrations/flowise_pipeline.py new file mode 100644 index 0000000..18f9ce1 --- /dev/null +++ b/examples/pipelines/integrations/flowise_pipeline.py @@ -0,0 +1,272 @@ +""" +title: FlowiseAI Integration +author: Claude +author_url: https://anthropic.com +git_url: https://github.com/open-webui/pipelines/ +description: Access FlowiseAI endpoints with customizable flows +required_open_webui_version: 0.4.3 +requirements: requests +version: 0.4.3 +licence: MIT +""" + +from typing import List, Union, Generator, Iterator, Dict, Optional +from pydantic import BaseModel, Field +import requests +import os +import re +import json +from datetime import datetime +import time + +from logging import getLogger +logger = getLogger(__name__) +logger.setLevel("DEBUG") + + +class Pipeline: + class Valves(BaseModel): + API_KEY: str = Field(default="", description="FlowiseAI API key") + API_URL: str = Field(default="", description="FlowiseAI base URL") + RATE_LIMIT: int = Field(default=5, description="Rate limit for the pipeline") + + FLOW_0_ENABLED: Optional[bool] = Field(default=False, description="Flow 0 Enabled (make this flow available for use)") + FLOW_0_ID: Optional[str] = Field(default=None, description="Flow 0 ID (the FlowiseAI flow identifier)") + FLOW_0_NAME: Optional[str] = Field(default=None, description="Flow 0 Name (human-readable name for the flow)") + + FLOW_1_ENABLED: Optional[bool] = Field(default=False, description="Flow 1 Enabled (make this flow available for use)") + FLOW_1_ID: Optional[str] = Field(default=None, description="Flow 1 ID (the FlowiseAI flow identifier)") + FLOW_1_NAME: Optional[str] = Field(default=None, description="Flow 1 Name (human-readable name for the flow)") + + FLOW_2_ENABLED: Optional[bool] = Field(default=False, description="Flow 2 Enabled (make this flow available for use)") + FLOW_2_ID: Optional[str] = Field(default=None, description="Flow 2 ID (the FlowiseAI flow identifier)") + FLOW_2_NAME: Optional[str] = Field(default=None, description="Flow 2 Name (human-readable name for the flow)") + + FLOW_3_ENABLED: Optional[bool] = Field(default=False, description="Flow 3 Enabled (make this flow available for use)") + FLOW_3_ID: Optional[str] = Field(default=None, description="Flow 3 ID (the FlowiseAI flow identifier)") + FLOW_3_NAME: Optional[str] = Field(default=None, description="Flow 3 Name (human-readable name for the flow)") + + FLOW_4_ENABLED: Optional[bool] = Field(default=False, description="Flow 4 Enabled (make this flow available for use)") + FLOW_4_ID: Optional[str] = Field(default=None, description="Flow 4 ID (the FlowiseAI flow identifier)") + FLOW_4_NAME: Optional[str] = Field(default=None, description="Flow 4 Name (human-readable name for the flow)") + + def __init__(self): + self.name = "FlowiseAI Pipeline" + + # Initialize valve parameters from environment variables + self.valves = self.Valves( + **{k: os.getenv(k, v.default) for k, v in self.Valves.model_fields.items()} + ) + + # Build flow mapping for faster lookup + self.flows = {} + self.update_flows() + + def update_flows(self): + """Update the flows dictionary based on the current valve settings""" + self.flows = {} + # Iterate through each flow + for i in range(20): # Support up to 20 flows + enabled_name = f"FLOW_{i}_ENABLED" + if not hasattr(self.valves, enabled_name): # sequential numbering + break + enabled = getattr(self.valves, f"FLOW_{i}_ENABLED", False) + flow_id = getattr(self.valves, f"FLOW_{i}_ID", None) + flow_name = getattr(self.valves, f"FLOW_{i}_NAME", None) + + if enabled and flow_id and flow_name: + self.flows[flow_name.lower()] = flow_id + + logger.info(f"Updated flows: {list(self.flows.keys())}") + + async def on_startup(self): + """Called when the server is started""" + logger.debug(f"on_startup:{self.name}") + self.update_flows() + + async def on_shutdown(self): + """Called when the server is stopped""" + logger.debug(f"on_shutdown:{self.name}") + + async def on_valves_updated(self) -> None: + """Called when valves are updated""" + logger.debug(f"on_valves_updated:{self.name}") + self.update_flows() + + def rate_check(self, dt_start: datetime) -> bool: + """ + Check time, sleep if not enough time has passed for rate + + Args: + dt_start (datetime): Start time of the operation + Returns: + bool: True if sleep was done + """ + dt_end = datetime.now() + time_diff = (dt_end - dt_start).total_seconds() + time_buffer = (1 / self.valves.RATE_LIMIT) + if time_diff >= time_buffer: # no need to sleep + return False + time.sleep(time_buffer - time_diff) + return True + + def parse_user_input(self, user_message: str) -> tuple[str, str]: + """ + Parse the user message to extract flow name and query + + Format expected: @flow_name: query + + Args: + user_message (str): User's input message + + Returns: + tuple[str, str]: Flow name and query + """ + # Match pattern @flow_name: query + pattern = r"^@([^:]+):\s*(.+)$" + match = re.match(pattern, user_message.strip()) + + if not match: + return None, user_message + + flow_name = match.group(1).strip().lower() + query = match.group(2).strip() + + return flow_name, query + + def pipe( + self, + user_message: str, + model_id: str, + messages: List[dict], + body: dict + ) -> Union[str, Generator, Iterator]: + """ + Main pipeline function. Calls a specified FlowiseAI flow with the provided query. + + Format expected: @flow_name: query + If no flow is specified, a list of available flows will be returned. + """ + logger.debug(f"pipe:{self.name}") + + dt_start = datetime.now() + streaming = body.get("stream", False) + logger.warning(f"Stream: {streaming}") + context = "" + + # Check if we have valid API configuration + if not self.valves.API_KEY or not self.valves.API_URL: + error_msg = "FlowiseAI configuration missing. Please set API_KEY and API_URL valves." + if streaming: + yield error_msg + else: + return error_msg + + # Parse the user message to extract flow name and query + flow_name, query = self.parse_user_input(user_message) + + # If no flow specified or invalid flow, list available flows + if not flow_name or flow_name not in self.flows: + available_flows = list(self.flows.keys()) + if not available_flows: + no_flows_msg = "No flows configured. Enable at least one FLOW_X_ENABLED valve and set its ID and NAME." + if streaming: + yield no_flows_msg + else: + return no_flows_msg + + flows_list = "\n".join([f"- @{flow}" for flow in available_flows]) + help_msg = f"Please specify a flow using the format: @flow_name: your query\n\nAvailable flows:\n{flows_list}" + + if not flow_name: + help_msg = "No flow specified. " + help_msg + else: + help_msg = f"Invalid flow '{flow_name}'. " + help_msg + + if streaming: + yield help_msg + else: + return help_msg + + # Get the flow ID from the map + flow_id = self.flows[flow_name] + + if streaming: + yield from self.stream_retrieve(flow_id, flow_name, query, dt_start) + else: + for chunk in self.stream_retrieve(flow_id, flow_name, query, dt_start): + context += chunk + return context if context else "No response from FlowiseAI" + + def stream_retrieve( + self, flow_id: str, flow_name: str, query: str, dt_start: datetime + ) -> Generator: + """ + Call the FlowiseAI endpoint with the specified flow ID and query. + + Args: + flow_id (str): The ID of the flow to call + flow_name (str): The name of the flow (for logging) + query (str): The user's query + dt_start (datetime): Start time for rate limiting + + Returns: + Generator: Response chunks for streaming + """ + if not query: + yield "Query is empty. Please provide a question or prompt for the flow." + return + + api_url = f"{self.valves.API_URL.rstrip('/')}/api/v1/prediction/{flow_id}" + headers = {"Authorization": f"Bearer {self.valves.API_KEY}"} + + payload = { + "question": query, + } + + try: + logger.info(f"Calling FlowiseAI flow '{flow_name}' with query: {query}") + + # Rate limiting check + self.rate_check(dt_start) + + response = requests.post(api_url, headers=headers, json=payload) + + if response.status_code != 200: + error_msg = f"Error from FlowiseAI: Status {response.status_code}" + logger.error(f"{error_msg} - {response.text}") + yield error_msg + return + + try: + result = response.json() + + # Format might vary based on flow configuration + # Try common response formats + if isinstance(result, dict): + if "text" in result: + yield result["text"] + elif "answer" in result: + yield result["answer"] + elif "response" in result: + yield result["response"] + elif "result" in result: + yield result["result"] + else: + # If no standard field found, return full JSON as string + yield f"```json\n{json.dumps(result, indent=2)}\n```" + elif isinstance(result, str): + yield result + else: + yield f"```json\n{json.dumps(result, indent=2)}\n```" + + except json.JSONDecodeError: + # If not JSON, return the raw text + yield response.text + + except Exception as e: + error_msg = f"Error calling FlowiseAI: {str(e)}" + logger.error(error_msg) + yield error_msg + + return \ No newline at end of file From cb5a16a5a499e58e0340c5961a92b6b09a726bdc Mon Sep 17 00:00:00 2001 From: Eric Z Date: Sat, 12 Apr 2025 14:26:55 -0500 Subject: [PATCH 36/41] fix(flowise): add streaming capability to flowise integration --- .../integrations/flowise_pipeline.py | 172 +++++++++++++++--- 1 file changed, 146 insertions(+), 26 deletions(-) diff --git a/examples/pipelines/integrations/flowise_pipeline.py b/examples/pipelines/integrations/flowise_pipeline.py index 18f9ce1..026a85f 100644 --- a/examples/pipelines/integrations/flowise_pipeline.py +++ b/examples/pipelines/integrations/flowise_pipeline.py @@ -5,7 +5,7 @@ author_url: https://anthropic.com git_url: https://github.com/open-webui/pipelines/ description: Access FlowiseAI endpoints with customizable flows required_open_webui_version: 0.4.3 -requirements: requests +requirements: requests,flowise>=1.0.4 version: 0.4.3 licence: MIT """ @@ -18,6 +18,7 @@ import re import json from datetime import datetime import time +from flowise import Flowise, PredictionData from logging import getLogger logger = getLogger(__name__) @@ -26,29 +27,51 @@ logger.setLevel("DEBUG") class Pipeline: class Valves(BaseModel): - API_KEY: str = Field(default="", description="FlowiseAI API key") - API_URL: str = Field(default="", description="FlowiseAI base URL") - RATE_LIMIT: int = Field(default=5, description="Rate limit for the pipeline") + FLOWISE_API_KEY: str = Field(default="", description="FlowiseAI API key (from Bearer key, e.g. QMknVTFTB40Pk23n6KIVRgdB7va2o-Xlx73zEfpeOu0)") + FLOWISE_BASE_URL: str = Field(default="", description="FlowiseAI base URL (e.g. http://localhost:3000 (URL before '/api/v1/prediction'))") + RATE_LIMIT: int = Field(default=5, description="Rate limit for the pipeline (ops/minute)") FLOW_0_ENABLED: Optional[bool] = Field(default=False, description="Flow 0 Enabled (make this flow available for use)") - FLOW_0_ID: Optional[str] = Field(default=None, description="Flow 0 ID (the FlowiseAI flow identifier)") - FLOW_0_NAME: Optional[str] = Field(default=None, description="Flow 0 Name (human-readable name for the flow)") + FLOW_0_ID: Optional[str] = Field(default=None, description="Flow 0 ID (the flow GUID, e.g. b06d97f5-da14-4d29-81bd-8533261b6c88)") + FLOW_0_NAME: Optional[str] = Field(default=None, description="Flow 0 Name (human-readable flow name, no special characters, e.g. news or stock-reader)") FLOW_1_ENABLED: Optional[bool] = Field(default=False, description="Flow 1 Enabled (make this flow available for use)") - FLOW_1_ID: Optional[str] = Field(default=None, description="Flow 1 ID (the FlowiseAI flow identifier)") - FLOW_1_NAME: Optional[str] = Field(default=None, description="Flow 1 Name (human-readable name for the flow)") + FLOW_1_ID: Optional[str] = Field(default=None, description="Flow 1 ID (the flow GUID, e.g. b06d97f5-da14-4d29-81bd-8533261b6c88)") + FLOW_1_NAME: Optional[str] = Field(default=None, description="Flow 1 Name (human-readable flwo name, no special characters, e.g. news or stock-reader)") FLOW_2_ENABLED: Optional[bool] = Field(default=False, description="Flow 2 Enabled (make this flow available for use)") - FLOW_2_ID: Optional[str] = Field(default=None, description="Flow 2 ID (the FlowiseAI flow identifier)") - FLOW_2_NAME: Optional[str] = Field(default=None, description="Flow 2 Name (human-readable name for the flow)") + FLOW_2_ID: Optional[str] = Field(default=None, description="Flow 2 ID (the flow GUID, e.g. b06d97f5-da14-4d29-81bd-8533261b6c88)") + FLOW_2_NAME: Optional[str] = Field(default=None, description="Flow 2 Name (human-readable flow name, no special characters, e.g. news or stock-reader)") FLOW_3_ENABLED: Optional[bool] = Field(default=False, description="Flow 3 Enabled (make this flow available for use)") - FLOW_3_ID: Optional[str] = Field(default=None, description="Flow 3 ID (the FlowiseAI flow identifier)") - FLOW_3_NAME: Optional[str] = Field(default=None, description="Flow 3 Name (human-readable name for the flow)") + FLOW_3_ID: Optional[str] = Field(default=None, description="Flow 3 ID (the flow GUID, e.g. b06d97f5-da14-4d29-81bd-8533261b6c88)") + FLOW_3_NAME: Optional[str] = Field(default=None, description="Flow 3 Name (human-readable flow name, no special characters, e.g. news or stock-reader)") FLOW_4_ENABLED: Optional[bool] = Field(default=False, description="Flow 4 Enabled (make this flow available for use)") - FLOW_4_ID: Optional[str] = Field(default=None, description="Flow 4 ID (the FlowiseAI flow identifier)") - FLOW_4_NAME: Optional[str] = Field(default=None, description="Flow 4 Name (human-readable name for the flow)") + FLOW_4_ID: Optional[str] = Field(default=None, description="Flow 4 ID (the flow GUID, e.g. b06d97f5-da14-4d29-81bd-8533261b6c88)") + FLOW_4_NAME: Optional[str] = Field(default=None, description="Flow 4 Name (human-readable flow name, no special characters, e.g. news or stock-reader)") + + FLOW_5_ENABLED: Optional[bool] = Field(default=False, description="Flow 5 Enabled (make this flow available for use)") + FLOW_5_ID: Optional[str] = Field(default=None, description="Flow 5 ID (the flow GUID, e.g. b06d97f5-da14-4d29-81bd-8533261b6c88)") + FLOW_5_NAME: Optional[str] = Field(default=None, description="Flow 5 Name (human-readable flow name, no special characters, e.g. news or stock-reader)") + + FLOW_6_ENABLED: Optional[bool] = Field(default=False, description="Flow 6 Enabled (make this flow available for use)") + FLOW_6_ID: Optional[str] = Field(default=None, description="Flow 6 ID (the flow GUID, e.g. b06d97f5-da14-4d29-81bd-8533261b6c88)") + FLOW_6_NAME: Optional[str] = Field(default=None, description="Flow 6 Name (human-readable flow name, no special characters, e.g. news or stock-reader)") + + FLOW_7_ENABLED: Optional[bool] = Field(default=False, description="Flow 7 Enabled (make this flow available for use)") + FLOW_7_ID: Optional[str] = Field(default=None, description="Flow 7 ID (the flow GUID, e.g. b06d97f5-da14-4d29-81bd-8533261b6c88)") + FLOW_7_NAME: Optional[str] = Field(default=None, description="Flow 7 Name (human-readable flow name, no special characters, e.g. news or stock-reader)") + + FLOW_8_ENABLED: Optional[bool] = Field(default=False, description="Flow 8 Enabled (make this flow available for use)") + FLOW_8_ID: Optional[str] = Field(default=None, description="Flow 8 ID (the flow GUID, e.g. b06d97f5-da14-4d29-81bd-8533261b6c88)") + FLOW_8_NAME: Optional[str] = Field(default=None, description="Flow 8 Name (human-readable flow name, no special characters, e.g. news or stock-reader)") + + FLOW_9_ENABLED: Optional[bool] = Field(default=False, description="Flow 9 Enabled (make this flow available for use)") + FLOW_9_ID: Optional[str] = Field(default=None, description="Flow 9 ID (the flow GUID, e.g. b06d97f5-da14-4d29-81bd-8533261b6c88)") + FLOW_9_NAME: Optional[str] = Field(default=None, description="Flow 9 Name (human-readable flow name, no special characters, e.g. news or stock-reader)") + + def __init__(self): self.name = "FlowiseAI Pipeline" @@ -122,8 +145,8 @@ class Pipeline: Returns: tuple[str, str]: Flow name and query """ - # Match pattern @flow_name: query - pattern = r"^@([^:]+):\s*(.+)$" + # Match pattern flow_name: query + pattern = r"^([^:]+):\s*(.+)$" match = re.match(pattern, user_message.strip()) if not match: @@ -131,6 +154,10 @@ class Pipeline: flow_name = match.group(1).strip().lower() query = match.group(2).strip() + + date_now = datetime.now().strftime("%Y-%m-%d") + time_now = datetime.now().strftime("%H:%M:%S") + query = f"{query}; today's date is {date_now} and the current time is {time_now}" return flow_name, query @@ -155,8 +182,8 @@ class Pipeline: context = "" # Check if we have valid API configuration - if not self.valves.API_KEY or not self.valves.API_URL: - error_msg = "FlowiseAI configuration missing. Please set API_KEY and API_URL valves." + if not self.valves.FLOWISE_API_KEY or not self.valves.FLOWISE_BASE_URL: + error_msg = "FlowiseAI configuration missing. Please set FLOWISE_API_KEY and FLOWISE_BASE_URL valves." if streaming: yield error_msg else: @@ -166,7 +193,7 @@ class Pipeline: flow_name, query = self.parse_user_input(user_message) # If no flow specified or invalid flow, list available flows - if not flow_name or flow_name not in self.flows: + if flow_name is None or flow_name not in self.flows: available_flows = list(self.flows.keys()) if not available_flows: no_flows_msg = "No flows configured. Enable at least one FLOW_X_ENABLED valve and set its ID and NAME." @@ -175,16 +202,17 @@ class Pipeline: else: return no_flows_msg - flows_list = "\n".join([f"- @{flow}" for flow in available_flows]) - help_msg = f"Please specify a flow using the format: @flow_name: your query\n\nAvailable flows:\n{flows_list}" + flows_list = "\n".join([f"- {flow}" for flow in available_flows]) + help_msg = f"Please specify a flow using the format: flow_name: your query\n\nAvailable flows:\n{flows_list}" - if not flow_name: + if flow_name is None: help_msg = "No flow specified. " + help_msg else: help_msg = f"Invalid flow '{flow_name}'. " + help_msg if streaming: yield help_msg + return else: return help_msg @@ -194,7 +222,7 @@ class Pipeline: if streaming: yield from self.stream_retrieve(flow_id, flow_name, query, dt_start) else: - for chunk in self.stream_retrieve(flow_id, flow_name, query, dt_start): + for chunk in self.static_retrieve(flow_id, flow_name, query, dt_start): context += chunk return context if context else "No response from FlowiseAI" @@ -202,7 +230,7 @@ class Pipeline: self, flow_id: str, flow_name: str, query: str, dt_start: datetime ) -> Generator: """ - Call the FlowiseAI endpoint with the specified flow ID and query. + Stream responses from FlowiseAI using the official client library. Args: flow_id (str): The ID of the flow to call @@ -216,9 +244,101 @@ class Pipeline: if not query: yield "Query is empty. Please provide a question or prompt for the flow." return + + try: + logger.info(f"Streaming from FlowiseAI flow '{flow_name}' with query: {query}") - api_url = f"{self.valves.API_URL.rstrip('/')}/api/v1/prediction/{flow_id}" - headers = {"Authorization": f"Bearer {self.valves.API_KEY}"} + # Rate limiting check + self.rate_check(dt_start) + + # Initialize Flowise client with API configuration + client = Flowise( + base_url=self.valves.FLOWISE_BASE_URL.rstrip('/'), + api_key=self.valves.FLOWISE_API_KEY + ) + + # Create streaming prediction request + completion = client.create_prediction( + PredictionData( + chatflowId=flow_id, + question=query, + streaming=True + ) + ) + + except Exception as e: + error_msg = f"Error streaming from FlowiseAI: {str(e)}" + logger.error(error_msg) + yield error_msg + + idx_last_update = 0 + yield f"Analysis started... {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n" + + # Process each streamed chunk + for chunk in completion: + try: + if isinstance(chunk, str): + chunk = json.loads(chunk) + except Exception as e: + # If chunk is not a string, it's already a dictionary + pass + + try: + if isinstance(chunk, dict): + # Expected format: {event: "token", data: "content"} + if "event" in chunk: + if ((chunk["event"] in ["start", "update", "agentReasoning"]) and + ("data" in chunk) and (isinstance(chunk["data"], list))): + for data_update in chunk["data"][idx_last_update:]: + # e.g. {"event":"start","data":[{"agentName":"Perspective Explorer","messages":["... + idx_last_update += 1 + yield "\n---\n" + yield f"\n__Reasoning: {data_update['agentName']} ({datetime.now().strftime('%Y-%m-%d %H:%M:%S')})__\n\n" + for message in data_update["messages"]: + yield message # yield message for each agent update + elif chunk["event"] == "end": + # {"event":"end","data":"[DONE]"} + yield "\n---\n" + yield f"\nAnalysis complete. ({datetime.now().strftime('%Y-%m-%d %H:%M:%S')})\n\n" + elif chunk["event"] == "token": + # do nothing, this is the flat output of the flow (final) + pass + elif "error" in chunk: + error_msg = f"Error from FlowiseAI: {chunk['error']}" + logger.error(error_msg) + yield error_msg + else: + # If chunk format is unexpected, yield as is + yield str(chunk) + except Exception as e: + logger.error(f"Error processing chunk: {str(e)}") + yield f"\nUnusual Response Chunk: ({datetime.now().strftime('%Y-%m-%d %H:%M:%S')})\n{str(e)}\n" + yield f"\n---\n" + yield str(chunk) + + return + + def static_retrieve( + self, flow_id: str, flow_name: str, query: str, dt_start: datetime + ) -> Generator: + """ + Call the FlowiseAI endpoint with the specified flow ID and query using REST API. + + Args: + flow_id (str): The ID of the flow to call + flow_name (str): The name of the flow (for logging) + query (str): The user's query + dt_start (datetime): Start time for rate limiting + + Returns: + Generator: Response chunks for non-streaming requests + """ + if not query: + yield "Query is empty. Please provide a question or prompt for the flow." + return + + api_url = f"{self.valves.FLOWISE_BASE_URL.rstrip('/')}/api/v1/prediction/{flow_id}" + headers = {"Authorization": f"Bearer {self.valves.FLOWISE_API_KEY}"} payload = { "question": query, From af479bc7ceb5f2ce1f3b5fef63e05326bb202df8 Mon Sep 17 00:00:00 2001 From: Eric Z Date: Sat, 12 Apr 2025 14:50:38 -0500 Subject: [PATCH 37/41] fix(flowise): minor cleanup and finalize of integration hook --- .../integrations/flowise_pipeline.py | 52 ++++++++++++++++--- 1 file changed, 44 insertions(+), 8 deletions(-) diff --git a/examples/pipelines/integrations/flowise_pipeline.py b/examples/pipelines/integrations/flowise_pipeline.py index 026a85f..d57d1bd 100644 --- a/examples/pipelines/integrations/flowise_pipeline.py +++ b/examples/pipelines/integrations/flowise_pipeline.py @@ -1,9 +1,9 @@ """ title: FlowiseAI Integration -author: Claude -author_url: https://anthropic.com +author: Eric Zavesky +author_url: https://github.com/ezavesky git_url: https://github.com/open-webui/pipelines/ -description: Access FlowiseAI endpoints with customizable flows +description: Access FlowiseAI endpoints via chat integration required_open_webui_version: 0.4.3 requirements: requests,flowise>=1.0.4 version: 0.4.3 @@ -85,6 +85,33 @@ class Pipeline: self.flows = {} self.update_flows() + def get_flow_details(self, flow_id: str) -> Optional[dict]: + """ + Fetch flow details from the FlowiseAI API + + Args: + flow_id (str): The ID of the flow to fetch + + Returns: + Optional[dict]: Flow details if successful, None if failed + """ + try: + api_url = f"{self.valves.FLOWISE_BASE_URL.rstrip('/')}/api/v1/chatflows/{flow_id}" + headers = {"Authorization": f"Bearer {self.valves.FLOWISE_API_KEY}"} + + response = requests.get(api_url, headers=headers) + + if response.status_code == 200: + data = response.json() + return data + else: + logger.error(f"Error fetching flow details: Status {response.status_code}") + return None + + except Exception as e: + logger.error(f"Error fetching flow details: {str(e)}") + return None + def update_flows(self): """Update the flows dictionary based on the current valve settings""" self.flows = {} @@ -98,9 +125,18 @@ class Pipeline: flow_name = getattr(self.valves, f"FLOW_{i}_NAME", None) if enabled and flow_id and flow_name: - self.flows[flow_name.lower()] = flow_id + # Fetch flow details from API + flow_details = self.get_flow_details(flow_id) + api_name = flow_details.get('name', 'Unknown') if flow_details else 'Unknown' + + # Store both names in the flows dictionary + self.flows[flow_name.lower()] = { + 'id': flow_id, + 'brief_name': flow_name, + 'api_name': api_name + } - logger.info(f"Updated flows: {list(self.flows.keys())}") + logger.info(f"Updated flows: {[{k: v['api_name']} for k, v in self.flows.items()]}") async def on_startup(self): """Called when the server is started""" @@ -202,8 +238,8 @@ class Pipeline: else: return no_flows_msg - flows_list = "\n".join([f"- {flow}" for flow in available_flows]) - help_msg = f"Please specify a flow using the format: flow_name: your query\n\nAvailable flows:\n{flows_list}" + flows_list = "\n".join([f"- flow_name: {flow} (description:{self.flows[flow]['api_name']})" for flow in available_flows]) + help_msg = f"Please specify a flow using the format: : \n\nAvailable flows:\n{flows_list}" if flow_name is None: help_msg = "No flow specified. " + help_msg @@ -217,7 +253,7 @@ class Pipeline: return help_msg # Get the flow ID from the map - flow_id = self.flows[flow_name] + flow_id = self.flows[flow_name]['id'] if streaming: yield from self.stream_retrieve(flow_id, flow_name, query, dt_start) From ef52d542554baf8e810a6c45521a281920e2a3ea Mon Sep 17 00:00:00 2001 From: Eric Z Date: Sat, 12 Apr 2025 17:16:39 -0500 Subject: [PATCH 38/41] fix(undefined-const): hotfix for missing constants; hazarding guess from context but breaks build --- config.py | 11 ++++++++++- main.py | 2 +- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/config.py b/config.py index 7666d0a..28b1031 100644 --- a/config.py +++ b/config.py @@ -1,5 +1,5 @@ import os - +import logging #################################### # Load .env file #################################### @@ -11,5 +11,14 @@ try: except ImportError: print("dotenv not installed, skipping...") +# Define log levels dictionary +LOG_LEVELS = { + 'DEBUG': logging.DEBUG, + 'INFO': logging.INFO, + 'WARNING': logging.WARNING, + 'ERROR': logging.ERROR, + 'CRITICAL': logging.CRITICAL +} + API_KEY = os.getenv("PIPELINES_API_KEY", "0p3n-w3bu!") PIPELINES_DIR = os.getenv("PIPELINES_DIR", "./pipelines") diff --git a/main.py b/main.py index 1b3cc76..ca9c6c5 100644 --- a/main.py +++ b/main.py @@ -29,7 +29,7 @@ import sys import subprocess -from config import API_KEY, PIPELINES_DIR +from config import API_KEY, PIPELINES_DIR, LOG_LEVELS if not os.path.exists(PIPELINES_DIR): os.makedirs(PIPELINES_DIR) From 88613aa75de141587c92e519887a77ebd82f3d31 Mon Sep 17 00:00:00 2001 From: Timothy Jaeryang Baek Date: Sun, 13 Apr 2025 17:35:34 -0700 Subject: [PATCH 39/41] REFAC --- main.py | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/main.py b/main.py index ca9c6c5..eb82c08 100644 --- a/main.py +++ b/main.py @@ -39,8 +39,8 @@ PIPELINES = {} PIPELINE_MODULES = {} PIPELINE_NAMES = {} -#Add GLOBAL_LOG_LEVEL for Pipeplines -log_level = os.getenv('GLOBAL_LOG_LEVEL', 'INFO').upper() +# Add GLOBAL_LOG_LEVEL for Pipeplines +log_level = os.getenv("GLOBAL_LOG_LEVEL", "INFO").upper() logging.basicConfig(level=LOG_LEVELS[log_level]) @@ -694,7 +694,6 @@ async def generate_openai_chat_completion(form_data: OpenAIChatCompletionForm): messages=messages, body=form_data.model_dump(), ) - logging.info(f"stream:true:{res}") if isinstance(res, str): @@ -708,19 +707,23 @@ async def generate_openai_chat_completion(form_data: OpenAIChatCompletionForm): line = line.model_dump_json() line = f"data: {line}" + elif isinstance(line, dict): + line = json.dumps(line) + line = f"data: {line}" + try: line = line.decode("utf-8") + logging.info(f"stream_content:Generator:{line}") + + if line.startswith("data:"): + yield f"{line}\n\n" + else: + line = stream_message_template(form_data.model, line) + yield f"data: {json.dumps(line)}\n\n" + except: pass - logging.info(f"stream_content:Generator:{line}") - - if line.startswith("data:"): - yield f"{line}\n\n" - else: - line = stream_message_template(form_data.model, line) - yield f"data: {json.dumps(line)}\n\n" - if isinstance(res, str) or isinstance(res, Generator): finish_message = { "id": f"{form_data.model}-{str(uuid.uuid4())}", From 62f98df9118084fa933aea2d7c0f26c33d464799 Mon Sep 17 00:00:00 2001 From: Timothy Jaeryang Baek Date: Sun, 13 Apr 2025 22:13:06 -0700 Subject: [PATCH 40/41] fix --- main.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/main.py b/main.py index eb82c08..5be7cfb 100644 --- a/main.py +++ b/main.py @@ -703,6 +703,7 @@ async def generate_openai_chat_completion(form_data: OpenAIChatCompletionForm): if isinstance(res, Iterator): for line in res: + print(line) if isinstance(line, BaseModel): line = line.model_dump_json() line = f"data: {line}" @@ -714,16 +715,15 @@ async def generate_openai_chat_completion(form_data: OpenAIChatCompletionForm): try: line = line.decode("utf-8") logging.info(f"stream_content:Generator:{line}") - - if line.startswith("data:"): - yield f"{line}\n\n" - else: - line = stream_message_template(form_data.model, line) - yield f"data: {json.dumps(line)}\n\n" - except: pass + if isinstance(line, str) and line.startswith("data:"): + yield f"{line}\n\n" + else: + line = stream_message_template(form_data.model, line) + yield f"data: {json.dumps(line)}\n\n" + if isinstance(res, str) or isinstance(res, Generator): finish_message = { "id": f"{form_data.model}-{str(uuid.uuid4())}", From 8e98ea0352e2765cf2655fd7bc77710fd141d724 Mon Sep 17 00:00:00 2001 From: Timothy Jaeryang Baek Date: Sun, 13 Apr 2025 22:15:19 -0700 Subject: [PATCH 41/41] refac --- main.py | 1 - 1 file changed, 1 deletion(-) diff --git a/main.py b/main.py index 5be7cfb..e277d3a 100644 --- a/main.py +++ b/main.py @@ -703,7 +703,6 @@ async def generate_openai_chat_completion(form_data: OpenAIChatCompletionForm): if isinstance(res, Iterator): for line in res: - print(line) if isinstance(line, BaseModel): line = line.model_dump_json() line = f"data: {line}"