mirror of
https://github.com/open-webui/pipelines
synced 2025-06-26 18:15:58 +00:00
Merge branch 'open-webui:main' into main
This commit is contained in:
@@ -1,8 +1,8 @@
|
||||
"""
|
||||
title: Langfuse Filter Pipeline
|
||||
author: open-webui
|
||||
date: 2024-09-27
|
||||
version: 1.4
|
||||
date: 2025-03-28
|
||||
version: 1.7
|
||||
license: MIT
|
||||
description: A filter pipeline that uses Langfuse.
|
||||
requirements: langfuse
|
||||
@@ -11,13 +11,16 @@ requirements: langfuse
|
||||
from typing import List, Optional
|
||||
import os
|
||||
import uuid
|
||||
import json
|
||||
|
||||
from utils.pipelines.main import get_last_assistant_message
|
||||
from pydantic import BaseModel
|
||||
from langfuse import Langfuse
|
||||
from langfuse.api.resources.commons.errors.unauthorized_error import UnauthorizedError
|
||||
|
||||
|
||||
def get_last_assistant_message_obj(messages: List[dict]) -> dict:
|
||||
"""Retrieve the last assistant message from the message list."""
|
||||
for message in reversed(messages):
|
||||
if message["role"] == "assistant":
|
||||
return message
|
||||
@@ -31,31 +34,55 @@ class Pipeline:
|
||||
secret_key: str
|
||||
public_key: str
|
||||
host: str
|
||||
# New valve that controls whether task names are added as tags:
|
||||
insert_tags: bool = True
|
||||
# New valve that controls whether to use model name instead of model ID for generation
|
||||
use_model_name_instead_of_id_for_generation: bool = False
|
||||
debug: bool = False
|
||||
|
||||
def __init__(self):
    """Set the filter identity, read the valves from the environment and create empty per-chat state."""
    self.type = "filter"
    self.name = "Langfuse Filter"

    def env_flag(variable: str) -> bool:
        # A flag counts as enabled only when the variable spells "true" (any letter case).
        return os.getenv(variable, "false").lower() == "true"

    valve_settings = dict(
        pipelines=["*"],
        secret_key=os.getenv("LANGFUSE_SECRET_KEY", "your-secret-key-here"),
        public_key=os.getenv("LANGFUSE_PUBLIC_KEY", "your-public-key-here"),
        host=os.getenv("LANGFUSE_HOST", "https://cloud.langfuse.com"),
        use_model_name_instead_of_id_for_generation=env_flag("USE_MODEL_NAME"),
        debug=env_flag("DEBUG_MODE"),
    )
    self.valves = self.Valves(**valve_settings)

    # No client yet; the attribute exists so later code can test it for truthiness.
    self.langfuse = None

    # Per-chat bookkeeping, keyed by chat_id.
    self.chat_traces = {}
    self.chat_generations = {}
    # Model id/name remembered for each chat.
    self.model_names = {}

    # Messages already printed through log(..., suppress_repeats=True).
    self.suppressed_logs = set()

    # Only these tasks will be treated as LLM "generations":
    self.GENERATION_TASKS = {"llm_response"}
|
||||
|
||||
def log(self, message: str, suppress_repeats: bool = False):
    """Print ``message`` with a ``[DEBUG]`` prefix when the debug valve is on.

    Args:
        message: Text to print.
        suppress_repeats: When true, a message that was already printed through
            this path is dropped; otherwise it is remembered so that later
            repeats are dropped.
    """
    if not self.valves.debug:
        return

    if suppress_repeats:
        already_seen = message in self.suppressed_logs
        if already_seen:
            return
        self.suppressed_logs.add(message)

    print(f"[DEBUG] {message}")
|
||||
|
||||
async def on_startup(self):
    """Announce start-up, then let ``set_langfuse`` prepare the Langfuse client."""
    module_name = __name__
    print(f"on_startup:{module_name}")
    self.log(f"on_startup triggered for {module_name}")
    self.set_langfuse()
|
||||
|
||||
async def on_shutdown(self):
    """Flush pending Langfuse data when the server shuts down.

    ``self.langfuse`` can still be ``None`` at this point (it starts out as
    ``None``), so the flush is guarded. Calling ``self.langfuse.flush()``
    unconditionally would raise ``AttributeError`` during shutdown. The client
    is flushed once.
    """
    print(f"on_shutdown:{__name__}")
    self.log(f"on_shutdown triggered for {__name__}")
    if self.langfuse:
        self.langfuse.flush()
|
||||
|
||||
async def on_valves_updated(self):
    """Re-run ``set_langfuse`` so the client reflects the current valve values."""
    notice = "Valves updated, resetting Langfuse client."
    self.log(notice)
    self.set_langfuse()
|
||||
|
||||
def set_langfuse(self):
|
||||
@@ -64,78 +91,168 @@ class Pipeline:
|
||||
secret_key=self.valves.secret_key,
|
||||
public_key=self.valves.public_key,
|
||||
host=self.valves.host,
|
||||
debug=False,
|
||||
debug=self.valves.debug,
|
||||
)
|
||||
self.langfuse.auth_check()
|
||||
self.log("Langfuse client initialized successfully.")
|
||||
except UnauthorizedError:
|
||||
print(
|
||||
"Langfuse credentials incorrect. Please re-enter your Langfuse credentials in the pipeline settings."
|
||||
)
|
||||
except Exception as e:
|
||||
print(f"Langfuse error: {e} Please re-enter your Langfuse credentials in the pipeline settings.")
|
||||
print(
|
||||
f"Langfuse error: {e} Please re-enter your Langfuse credentials in the pipeline settings."
|
||||
)
|
||||
|
||||
def _build_tags(self, task_name: str) -> list:
    """
    Return the tags to attach for ``task_name``.

    With the ``insert_tags`` valve off the result is empty. Otherwise it always
    starts with 'open-webui', followed by the task name unless that name is one
    of the defaults (user_response / llm_response), which never become tags.
    """
    if not self.valves.insert_tags:
        return []

    default_tasks = ("user_response", "llm_response")
    tags = ["open-webui"]
    if task_name not in default_tasks:
        tags.append(task_name)
    return tags
|
||||
|
||||
async def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
|
||||
print(f"inlet:{__name__}")
|
||||
print(f"Received body: {body}")
|
||||
print(f"User: {user}")
|
||||
if self.valves.debug:
|
||||
print(f"[DEBUG] Received request: {json.dumps(body, indent=2)}")
|
||||
|
||||
# Check for presence of required keys and generate chat_id if missing
|
||||
if "chat_id" not in body.get("metadata", {}):
|
||||
unique_id = f"SYSTEM MESSAGE {uuid.uuid4()}"
|
||||
# Ensure the metadata key exists before assigning chat_id
|
||||
if "metadata" not in body:
|
||||
body["metadata"] = {} # Correct this indentation
|
||||
body["metadata"]["chat_id"] = unique_id
|
||||
print(f"chat_id was missing, set to: {unique_id}")
|
||||
self.log(f"Inlet function called with body: {body} and user: {user}")
|
||||
|
||||
metadata = body.get("metadata", {})
|
||||
chat_id = metadata.get("chat_id", str(uuid.uuid4()))
|
||||
metadata["chat_id"] = chat_id
|
||||
body["metadata"] = metadata
|
||||
|
||||
# Extract and store both model name and ID if available
|
||||
model_info = metadata.get("model", {})
|
||||
model_id = body.get("model")
|
||||
|
||||
# Store model information for this chat
|
||||
if chat_id not in self.model_names:
|
||||
self.model_names[chat_id] = {"id": model_id}
|
||||
else:
|
||||
self.model_names[chat_id]["id"] = model_id
|
||||
|
||||
if isinstance(model_info, dict) and "name" in model_info:
|
||||
self.model_names[chat_id]["name"] = model_info["name"]
|
||||
self.log(f"Stored model info - name: '{model_info['name']}', id: '{model_id}' for chat_id: {chat_id}")
|
||||
|
||||
required_keys = ["model", "messages"]
|
||||
missing_keys = [key for key in required_keys if key not in body]
|
||||
|
||||
if missing_keys:
|
||||
error_message = f"Error: Missing keys in the request body: {', '.join(missing_keys)}"
|
||||
print(error_message)
|
||||
self.log(error_message)
|
||||
raise ValueError(error_message)
|
||||
|
||||
user_id = user.get("id") if user else None
|
||||
user_name = user.get("name") if user else None
|
||||
user_email = user.get("email") if user else None
|
||||
# Defaulting to 'user_response' if no task is provided
|
||||
task_name = metadata.get("task", "user_response")
|
||||
|
||||
trace = self.langfuse.trace(
|
||||
name=f"filter:{__name__}",
|
||||
input=body,
|
||||
user_id=user_email,
|
||||
metadata={"user_name": user_name, "user_id": user_id,"chat_id": body["metadata"]["chat_id"]},
|
||||
session_id=body["metadata"]["chat_id"],
|
||||
)
|
||||
# Build tags
|
||||
tags_list = self._build_tags(task_name)
|
||||
|
||||
generation = trace.generation(
|
||||
name=body["metadata"]["chat_id"],
|
||||
model=body["model"],
|
||||
input=body["messages"],
|
||||
metadata={"interface": "open-webui"},
|
||||
)
|
||||
if chat_id not in self.chat_traces:
|
||||
self.log(f"Creating new trace for chat_id: {chat_id}")
|
||||
|
||||
self.chat_traces[body["metadata"]["chat_id"]] = trace
|
||||
self.chat_generations[body["metadata"]["chat_id"]] = generation
|
||||
trace_payload = {
|
||||
"name": f"chat:{chat_id}",
|
||||
"input": body,
|
||||
"user_id": user_email,
|
||||
"metadata": metadata,
|
||||
"session_id": chat_id,
|
||||
}
|
||||
|
||||
if tags_list:
|
||||
trace_payload["tags"] = tags_list
|
||||
|
||||
if self.valves.debug:
|
||||
print(f"[DEBUG] Langfuse trace request: {json.dumps(trace_payload, indent=2)}")
|
||||
|
||||
trace = self.langfuse.trace(**trace_payload)
|
||||
self.chat_traces[chat_id] = trace
|
||||
else:
|
||||
trace = self.chat_traces[chat_id]
|
||||
self.log(f"Reusing existing trace for chat_id: {chat_id}")
|
||||
if tags_list:
|
||||
trace.update(tags=tags_list)
|
||||
|
||||
# Update metadata with type
|
||||
metadata["type"] = task_name
|
||||
metadata["interface"] = "open-webui"
|
||||
|
||||
# If it's a task that is considered an LLM generation
|
||||
if task_name in self.GENERATION_TASKS:
|
||||
# Determine which model value to use based on the use_model_name valve
|
||||
model_id = self.model_names.get(chat_id, {}).get("id", body["model"])
|
||||
model_name = self.model_names.get(chat_id, {}).get("name", "unknown")
|
||||
|
||||
# Pick primary model identifier based on valve setting
|
||||
model_value = model_name if self.valves.use_model_name_instead_of_id_for_generation else model_id
|
||||
|
||||
# Add both values to metadata regardless of valve setting
|
||||
metadata["model_id"] = model_id
|
||||
metadata["model_name"] = model_name
|
||||
|
||||
generation_payload = {
|
||||
"name": f"{task_name}:{str(uuid.uuid4())}",
|
||||
"model": model_value,
|
||||
"input": body["messages"],
|
||||
"metadata": metadata,
|
||||
}
|
||||
if tags_list:
|
||||
generation_payload["tags"] = tags_list
|
||||
|
||||
if self.valves.debug:
|
||||
print(f"[DEBUG] Langfuse generation request: {json.dumps(generation_payload, indent=2)}")
|
||||
|
||||
trace.generation(**generation_payload)
|
||||
else:
|
||||
# Otherwise, log it as an event
|
||||
event_payload = {
|
||||
"name": f"{task_name}:{str(uuid.uuid4())}",
|
||||
"metadata": metadata,
|
||||
"input": body["messages"],
|
||||
}
|
||||
if tags_list:
|
||||
event_payload["tags"] = tags_list
|
||||
|
||||
if self.valves.debug:
|
||||
print(f"[DEBUG] Langfuse event request: {json.dumps(event_payload, indent=2)}")
|
||||
|
||||
trace.event(**event_payload)
|
||||
|
||||
return body
|
||||
|
||||
async def outlet(self, body: dict, user: Optional[dict] = None) -> dict:
|
||||
print(f"outlet:{__name__}")
|
||||
print(f"Received body: {body}")
|
||||
if body["chat_id"] not in self.chat_generations or body["chat_id"] not in self.chat_traces:
|
||||
return body
|
||||
self.log(f"Outlet function called with body: {body}")
|
||||
|
||||
chat_id = body.get("chat_id")
|
||||
metadata = body.get("metadata", {})
|
||||
# Defaulting to 'llm_response' if no task is provided
|
||||
task_name = metadata.get("task", "llm_response")
|
||||
|
||||
# Build tags
|
||||
tags_list = self._build_tags(task_name)
|
||||
|
||||
if chat_id not in self.chat_traces:
|
||||
self.log(f"[WARNING] No matching trace found for chat_id: {chat_id}, attempting to re-register.")
|
||||
# Re-run inlet to register if somehow missing
|
||||
return await self.inlet(body, user)
|
||||
|
||||
trace = self.chat_traces[chat_id]
|
||||
|
||||
trace = self.chat_traces[body["chat_id"]]
|
||||
generation = self.chat_generations[body["chat_id"]]
|
||||
assistant_message = get_last_assistant_message(body["messages"])
|
||||
|
||||
|
||||
# Extract usage information for models that support it
|
||||
usage = None
|
||||
assistant_message_obj = get_last_assistant_message_obj(body["messages"])
|
||||
|
||||
usage = None
|
||||
if assistant_message_obj:
|
||||
info = assistant_message_obj.get("info", {})
|
||||
info = assistant_message_obj.get("usage", {})
|
||||
if isinstance(info, dict):
|
||||
input_tokens = info.get("prompt_eval_count") or info.get("prompt_tokens")
|
||||
output_tokens = info.get("eval_count") or info.get("completion_tokens")
|
||||
@@ -145,19 +262,60 @@ class Pipeline:
|
||||
"output": output_tokens,
|
||||
"unit": "TOKENS",
|
||||
}
|
||||
self.log(f"Usage data extracted: {usage}")
|
||||
|
||||
# Update generation
|
||||
trace.update(
|
||||
output=assistant_message,
|
||||
)
|
||||
generation.end(
|
||||
output=assistant_message,
|
||||
metadata={"interface": "open-webui"},
|
||||
usage=usage,
|
||||
)
|
||||
# Update the trace output with the last assistant message
|
||||
trace.update(output=assistant_message)
|
||||
|
||||
# Clean up the chat_generations dictionary
|
||||
del self.chat_traces[body["chat_id"]]
|
||||
del self.chat_generations[body["chat_id"]]
|
||||
metadata["type"] = task_name
|
||||
metadata["interface"] = "open-webui"
|
||||
|
||||
if task_name in self.GENERATION_TASKS:
|
||||
# Determine which model value to use based on the use_model_name valve
|
||||
model_id = self.model_names.get(chat_id, {}).get("id", body.get("model"))
|
||||
model_name = self.model_names.get(chat_id, {}).get("name", "unknown")
|
||||
|
||||
# Pick primary model identifier based on valve setting
|
||||
model_value = model_name if self.valves.use_model_name_instead_of_id_for_generation else model_id
|
||||
|
||||
# Add both values to metadata regardless of valve setting
|
||||
metadata["model_id"] = model_id
|
||||
metadata["model_name"] = model_name
|
||||
|
||||
# If it's an LLM generation
|
||||
generation_payload = {
|
||||
"name": f"{task_name}:{str(uuid.uuid4())}",
|
||||
"model": model_value, # <-- Use model name or ID based on valve setting
|
||||
"input": body["messages"],
|
||||
"metadata": metadata,
|
||||
"usage": usage,
|
||||
}
|
||||
if tags_list:
|
||||
generation_payload["tags"] = tags_list
|
||||
|
||||
if self.valves.debug:
|
||||
print(f"[DEBUG] Langfuse generation end request: {json.dumps(generation_payload, indent=2)}")
|
||||
|
||||
trace.generation().end(**generation_payload)
|
||||
self.log(f"Generation ended for chat_id: {chat_id}")
|
||||
else:
|
||||
# Otherwise log as an event
|
||||
event_payload = {
|
||||
"name": f"{task_name}:{str(uuid.uuid4())}",
|
||||
"metadata": metadata,
|
||||
"input": body["messages"],
|
||||
}
|
||||
if usage:
|
||||
# If you want usage on event as well
|
||||
event_payload["metadata"]["usage"] = usage
|
||||
|
||||
if tags_list:
|
||||
event_payload["tags"] = tags_list
|
||||
|
||||
if self.valves.debug:
|
||||
print(f"[DEBUG] Langfuse event end request: {json.dumps(event_payload, indent=2)}")
|
||||
|
||||
trace.event(**event_payload)
|
||||
self.log(f"Event logged for chat_id: {chat_id}")
|
||||
|
||||
return body
|
||||
|
||||
274
examples/filters/opik_filter_pipeline.py
Normal file
274
examples/filters/opik_filter_pipeline.py
Normal file
@@ -0,0 +1,274 @@
|
||||
"""
|
||||
title: Opik Filter Pipeline
|
||||
author: open-webui
|
||||
date: 2025-03-12
|
||||
version: 1.0
|
||||
license: MIT
|
||||
description: A filter pipeline that uses Opik for LLM observability.
|
||||
requirements: opik
|
||||
"""
|
||||
|
||||
from typing import List, Optional
|
||||
import os
|
||||
import uuid
|
||||
import json
|
||||
|
||||
from pydantic import BaseModel
|
||||
from opik import Opik
|
||||
|
||||
|
||||
def get_last_assistant_message_obj(messages: List[dict]) -> dict:
    """Return the most recent message whose role is ``"assistant"``.

    The list is scanned from the end. Messages without a ``"role"`` key are
    skipped instead of raising ``KeyError``, and a missing or empty list is
    treated as "no assistant message".

    Args:
        messages: Chat messages, oldest first.

    Returns:
        The matching message dict, or an empty dict when there is none.
    """
    for message in reversed(messages or []):
        if message.get("role") == "assistant":
            return message
    return {}
|
||||
|
||||
|
||||
class Pipeline:
    """Filter pipeline that reports chat traffic to Opik.

    ``inlet`` opens one trace and one ``llm`` span for each incoming chat
    request; ``outlet`` closes both with the response body and, when the last
    assistant message carries it, token usage.
    """

    class Valves(BaseModel):
        # Pipelines this filter is attached to; __init__ passes ["*"].
        pipelines: List[str] = []
        priority: int = 0
        api_key: Optional[str] = None
        workspace: str
        project_name: str
        host: str
        # Enables the [DEBUG] output produced by log() and the payload dumps.
        debug: bool = False

    def __init__(self):
        """Read the valves from the environment and create empty per-chat state."""
        self.type = "filter"
        self.name = "Opik Filter"

        self.valves = self.Valves(
            **{
                "pipelines": ["*"],
                "api_key": os.getenv("OPIK_API_KEY", "set_me_for_opik_cloud"),
                "workspace": os.getenv("OPIK_WORKSPACE", "default"),
                "project_name": os.getenv("OPIK_PROJECT_NAME", "default"),
                "host": os.getenv(
                    "OPIK_URL_OVERRIDE", "https://www.comet.com/opik/api"
                ),
                "debug": os.getenv("DEBUG_MODE", "false").lower() == "true",
            }
        )

        self.opik = None
        # Keep track of the trace and the last-created span for each chat_id
        self.chat_traces = {}
        self.chat_spans = {}

        self.suppressed_logs = set()

    def log(self, message: str, suppress_repeats: bool = False):
        """Logs messages to the terminal if debugging is enabled."""
        if self.valves.debug:
            if suppress_repeats:
                if message in self.suppressed_logs:
                    return
                self.suppressed_logs.add(message)
            print(f"[DEBUG] {message}")

    async def on_startup(self):
        """Create the Opik client when the server starts."""
        self.log(f"on_startup triggered for {__name__}")
        self.set_opik()

    async def on_shutdown(self):
        """End the Opik client, if one exists, when the server stops."""
        self.log(f"on_shutdown triggered for {__name__}")
        if self.opik:
            self.opik.end()

    async def on_valves_updated(self):
        """End the current client (if any) and build a new one from the updated valves."""
        self.log("Valves updated, resetting Opik client.")
        if self.opik:
            self.opik.end()
        self.set_opik()

    def set_opik(self):
        """Build the Opik client from the valves and run its auth check.

        Failures are reported on stdout rather than raised, so a bad
        configuration does not stop the pipeline from loading.
        """
        try:
            self.opik = Opik(
                project_name=self.valves.project_name,
                workspace=self.valves.workspace,
                host=self.valves.host,
                api_key=self.valves.api_key,
            )
            self.opik.auth_check()
            self.log("Opik client initialized successfully.")
        except Exception as e:
            print(
                f"Opik error: {e} Please re-enter your Opik credentials in the pipeline settings."
            )

    async def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
        """
        Inlet handles the incoming request (usually a user message).

        - Requests whose metadata names a ``task`` are passed through unlogged.
        - A ``chat_id`` is generated and stored in ``body["metadata"]`` when missing.
        - A new trace and a new ``llm`` span are created for the request and
          remembered under the chat_id until ``outlet`` closes them.

        Raises:
            ValueError: If ``body`` lacks the ``model`` or ``messages`` key.
        """
        if self.valves.debug:
            print(f"[DEBUG] Received request: {json.dumps(body, indent=2)}")

        self.log(f"Inlet function called with body: {body} and user: {user}")

        metadata = body.get("metadata", {})
        task = metadata.get("task", "")

        # Skip logging tasks for now
        if task:
            self.log(f"Skipping {task} task.")
            return body

        if "chat_id" not in metadata:
            chat_id = str(uuid.uuid4())  # Regular chat messages
            self.log(f"Assigned normal chat_id: {chat_id}")

            metadata["chat_id"] = chat_id
            body["metadata"] = metadata
        else:
            chat_id = metadata["chat_id"]

        required_keys = ["model", "messages"]
        missing_keys = [key for key in required_keys if key not in body]
        if missing_keys:
            error_message = (
                f"Error: Missing keys in the request body: {', '.join(missing_keys)}"
            )
            self.log(error_message)
            raise ValueError(error_message)

        # Without a client there is nothing to log to; let the request through.
        if self.opik is None:
            self.log("[WARNING] Opik client is not initialized, chat won't be logged.")
            return body

        user_email = user.get("email") if user else None

        # A previous request for this chat may never have reached outlet. Drop the
        # stale bookkeeping instead of failing every later message of that chat.
        if chat_id in self.chat_traces or chat_id in self.chat_spans:
            self.log(
                f"[WARNING] Replacing unfinished trace for chat_id: {chat_id}"
            )
            self.chat_traces.pop(chat_id, None)
            self.chat_spans.pop(chat_id, None)

        # Create a new trace and span
        self.log(f"Creating new chat trace for chat_id: {chat_id}")

        # Body copy for traces and span
        trace_body = body.copy()
        span_body = body.copy()

        # Extract metadata from body. The copies above are shallow, so take a
        # separate dict: tracing-only fields must not leak into the request itself.
        metadata = dict(trace_body.pop("metadata", {}))
        metadata.update({"chat_id": chat_id, "user_id": user_email})

        # We don't need the model at the trace level
        trace_body.pop("model", None)

        trace_payload = {
            "name": f"{__name__}",
            "input": trace_body,
            "metadata": metadata,
            "thread_id": chat_id,
        }

        if self.valves.debug:
            print(f"[DEBUG] Opik trace request: {json.dumps(trace_payload, indent=2)}")

        trace = self.opik.trace(**trace_payload)

        span_metadata = metadata.copy()
        span_metadata.update({"interface": "open-webui"})

        # Extract the model from body
        span_body.pop("model", None)
        # We don't need the metadata in the input for the span
        span_body.pop("metadata", None)

        # Extract the model and provider from metadata; tolerate a non-dict value.
        model_info = span_metadata.get("model")
        if not isinstance(model_info, dict):
            model_info = {}
        model = model_info.get("id")
        provider = model_info.get("owned_by")

        span_payload = {
            "name": chat_id,
            "model": model,
            "provider": provider,
            "input": span_body,
            "metadata": span_metadata,
            "type": "llm",
        }

        if self.valves.debug:
            print(f"[DEBUG] Opik span request: {json.dumps(span_payload, indent=2)}")

        span = trace.span(**span_payload)

        self.chat_traces[chat_id] = trace
        self.chat_spans[chat_id] = span
        self.log(f"Trace and span objects successfully created for chat_id: {chat_id}")

        return body

    async def outlet(self, body: dict, user: Optional[dict] = None) -> dict:
        """
        Outlet handles the response body (usually the assistant message).

        It ends the span and the trace that ``inlet`` created for this chat_id and
        forgets them, so the next request of the chat starts a new trace. When no
        trace/span is registered for the chat_id the body is returned unlogged.
        """
        self.log(f"Outlet function called with body: {body}")

        chat_id = body.get("chat_id")

        # Nothing was registered for this chat (e.g. a skipped task): don't log.
        if chat_id not in self.chat_traces or chat_id not in self.chat_spans:
            self.log(
                f"[WARNING] No matching chat trace found for chat_id: {chat_id}, chat won't be logged."
            )
            return body

        trace = self.chat_traces[chat_id]
        span = self.chat_spans[chat_id]

        # Body copy for traces and span
        trace_body = body.copy()
        span_body = body.copy()

        # Get the last assistant message from the conversation
        assistant_message_obj = get_last_assistant_message_obj(
            body.get("messages", [])
        )

        # Extract usage if available
        usage = None
        self.log(f"Assistant message obj: {assistant_message_obj}")
        if assistant_message_obj:
            message_usage = assistant_message_obj.get("usage", {})
            if isinstance(message_usage, dict):
                # Two key spellings are accepted for each counter.
                input_tokens = message_usage.get(
                    "prompt_eval_count"
                ) or message_usage.get("prompt_tokens")
                output_tokens = message_usage.get("eval_count") or message_usage.get(
                    "completion_tokens"
                )
                if input_tokens is not None and output_tokens is not None:
                    usage = {
                        "prompt_tokens": input_tokens,
                        "completion_tokens": output_tokens,
                        "total_tokens": input_tokens + output_tokens,
                    }
                    self.log(f"Usage data extracted: {usage}")

        # Chat_id is already logged as trace thread
        span_body.pop("chat_id", None)

        # End the span with the final assistant message and updated conversation
        span_payload = {
            "output": span_body,  # include the entire conversation
            "usage": usage,
        }

        if self.valves.debug:
            print(
                f"[DEBUG] Opik span end request: {json.dumps(span_payload, indent=2)}"
            )

        span.end(**span_payload)
        self.log(f"span ended for chat_id: {chat_id}")

        # Chat_id is already logged as trace thread, so strip it from the trace
        # output as well (this is the copy that is sent with trace.end below).
        trace_body.pop("chat_id", None)

        # Optionally update the trace with the final assistant output
        trace.end(output=trace_body)

        # Force the creation of a new trace and span for the next chat even if they are part of the same thread
        self.chat_traces.pop(chat_id, None)
        self.chat_spans.pop(chat_id, None)

        return body
|
||||
83
examples/pipelines/events_pipeline.py
Normal file
83
examples/pipelines/events_pipeline.py
Normal file
@@ -0,0 +1,83 @@
|
||||
from typing import List, Union, Generator, Iterator, Optional
|
||||
from pprint import pprint
|
||||
import time
|
||||
|
||||
# Uncomment to disable SSL verification warnings if needed.
|
||||
# warnings.filterwarnings('ignore', message='Unverified HTTPS request')
|
||||
|
||||
|
||||
class Pipeline:
    """Example pipeline showing how to emit status events around a streamed reply."""

    def __init__(self):
        self.name = "Pipeline with Status Event"
        self.description = (
            "This is a pipeline that demonstrates how to use the status event."
        )
        self.debug = True
        self.version = "0.1.0"
        self.author = "Anthony Durussel"

    @staticmethod
    def _status_event(description: str, done: bool) -> dict:
        """Build the dict that ``pipe`` yields to report a status change."""
        return {
            "event": {
                "type": "status",
                "data": {
                    "description": description,
                    "done": done,
                },
            }
        }

    def _announce(self, hook: str, body: dict, user: Optional[dict]) -> None:
        """Print the hook name and, in debug mode, pretty-print body and user."""
        print(f"{hook}: {__name__}")
        if not self.debug:
            return
        print(f"{hook}: {__name__} - body:")
        pprint(body)
        print(f"{hook}: {__name__} - user:")
        pprint(user)

    async def on_startup(self):
        # This function is called when the server is started.
        print(f"on_startup: {__name__}")

    async def on_shutdown(self):
        # This function is called when the server is shutdown.
        print(f"on_shutdown: {__name__}")

    async def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
        # This function is called before the OpenAI API request is made. You can modify the form data before it is sent to the OpenAI API.
        self._announce("inlet", body, user)
        return body

    async def outlet(self, body: dict, user: Optional[dict] = None) -> dict:
        # This function is called after the OpenAI API response is completed. You can modify the messages after they are received from the OpenAI API.
        self._announce("outlet", body, user)
        return body

    def pipe(
        self,
        user_message: str,
        model_id: str,
        messages: List[dict],
        body: dict,
    ) -> Union[str, Generator, Iterator]:
        """Yield a pending status event, the echoed user message, then a finished status event."""
        print(f"pipe: {__name__}")

        if self.debug:
            print(f"pipe: {__name__} - received message from user: {user_message}")

        yield self._status_event("Fake Status", False)

        time.sleep(5)  # Sleep for 5 seconds

        yield f"user_message: {user_message}"

        yield self._status_event("", True)
|
||||
428
examples/pipelines/integrations/flowise_pipeline.py
Normal file
428
examples/pipelines/integrations/flowise_pipeline.py
Normal file
@@ -0,0 +1,428 @@
|
||||
"""
|
||||
title: FlowiseAI Integration
|
||||
author: Eric Zavesky
|
||||
author_url: https://github.com/ezavesky
|
||||
git_url: https://github.com/open-webui/pipelines/
|
||||
description: Access FlowiseAI endpoints via chat integration
|
||||
required_open_webui_version: 0.4.3
|
||||
requirements: requests,flowise>=1.0.4
|
||||
version: 0.4.3
|
||||
licence: MIT
|
||||
"""
|
||||
|
||||
from typing import List, Union, Generator, Iterator, Dict, Optional
|
||||
from pydantic import BaseModel, Field
|
||||
import requests
|
||||
import os
|
||||
import re
|
||||
import json
|
||||
from datetime import datetime
|
||||
import time
|
||||
from flowise import Flowise, PredictionData
|
||||
|
||||
from logging import getLogger
|
||||
logger = getLogger(__name__)
|
||||
logger.setLevel("DEBUG")
|
||||
|
||||
|
||||
class Pipeline:
|
||||
class Valves(BaseModel):
    # Connection settings. Pipeline.__init__ fills every field from the environment
    # variable of the same name, falling back to the default declared here.
    FLOWISE_API_KEY: str = Field(default="", description="FlowiseAI API key (from Bearer key, e.g. QMknVTFTB40Pk23n6KIVRgdB7va2o-Xlx73zEfpeOu0)")
    FLOWISE_BASE_URL: str = Field(default="", description="FlowiseAI base URL (e.g. http://localhost:3000 (URL before '/api/v1/prediction'))")
    RATE_LIMIT: int = Field(default=5, description="Rate limit for the pipeline (ops/minute)")

    # Ten flow slots (0-9), each an ENABLED / ID / NAME triple. update_flows()
    # registers a slot only when all three of its values are set.
    FLOW_0_ENABLED: Optional[bool] = Field(default=False, description="Flow 0 Enabled (make this flow available for use)")
    FLOW_0_ID: Optional[str] = Field(default=None, description="Flow 0 ID (the flow GUID, e.g. b06d97f5-da14-4d29-81bd-8533261b6c88)")
    FLOW_0_NAME: Optional[str] = Field(default=None, description="Flow 0 Name (human-readable flow name, no special characters, e.g. news or stock-reader)")

    FLOW_1_ENABLED: Optional[bool] = Field(default=False, description="Flow 1 Enabled (make this flow available for use)")
    FLOW_1_ID: Optional[str] = Field(default=None, description="Flow 1 ID (the flow GUID, e.g. b06d97f5-da14-4d29-81bd-8533261b6c88)")
    FLOW_1_NAME: Optional[str] = Field(default=None, description="Flow 1 Name (human-readable flow name, no special characters, e.g. news or stock-reader)")

    FLOW_2_ENABLED: Optional[bool] = Field(default=False, description="Flow 2 Enabled (make this flow available for use)")
    FLOW_2_ID: Optional[str] = Field(default=None, description="Flow 2 ID (the flow GUID, e.g. b06d97f5-da14-4d29-81bd-8533261b6c88)")
    FLOW_2_NAME: Optional[str] = Field(default=None, description="Flow 2 Name (human-readable flow name, no special characters, e.g. news or stock-reader)")

    FLOW_3_ENABLED: Optional[bool] = Field(default=False, description="Flow 3 Enabled (make this flow available for use)")
    FLOW_3_ID: Optional[str] = Field(default=None, description="Flow 3 ID (the flow GUID, e.g. b06d97f5-da14-4d29-81bd-8533261b6c88)")
    FLOW_3_NAME: Optional[str] = Field(default=None, description="Flow 3 Name (human-readable flow name, no special characters, e.g. news or stock-reader)")

    FLOW_4_ENABLED: Optional[bool] = Field(default=False, description="Flow 4 Enabled (make this flow available for use)")
    FLOW_4_ID: Optional[str] = Field(default=None, description="Flow 4 ID (the flow GUID, e.g. b06d97f5-da14-4d29-81bd-8533261b6c88)")
    FLOW_4_NAME: Optional[str] = Field(default=None, description="Flow 4 Name (human-readable flow name, no special characters, e.g. news or stock-reader)")

    FLOW_5_ENABLED: Optional[bool] = Field(default=False, description="Flow 5 Enabled (make this flow available for use)")
    FLOW_5_ID: Optional[str] = Field(default=None, description="Flow 5 ID (the flow GUID, e.g. b06d97f5-da14-4d29-81bd-8533261b6c88)")
    FLOW_5_NAME: Optional[str] = Field(default=None, description="Flow 5 Name (human-readable flow name, no special characters, e.g. news or stock-reader)")

    FLOW_6_ENABLED: Optional[bool] = Field(default=False, description="Flow 6 Enabled (make this flow available for use)")
    FLOW_6_ID: Optional[str] = Field(default=None, description="Flow 6 ID (the flow GUID, e.g. b06d97f5-da14-4d29-81bd-8533261b6c88)")
    FLOW_6_NAME: Optional[str] = Field(default=None, description="Flow 6 Name (human-readable flow name, no special characters, e.g. news or stock-reader)")

    FLOW_7_ENABLED: Optional[bool] = Field(default=False, description="Flow 7 Enabled (make this flow available for use)")
    FLOW_7_ID: Optional[str] = Field(default=None, description="Flow 7 ID (the flow GUID, e.g. b06d97f5-da14-4d29-81bd-8533261b6c88)")
    FLOW_7_NAME: Optional[str] = Field(default=None, description="Flow 7 Name (human-readable flow name, no special characters, e.g. news or stock-reader)")

    FLOW_8_ENABLED: Optional[bool] = Field(default=False, description="Flow 8 Enabled (make this flow available for use)")
    FLOW_8_ID: Optional[str] = Field(default=None, description="Flow 8 ID (the flow GUID, e.g. b06d97f5-da14-4d29-81bd-8533261b6c88)")
    FLOW_8_NAME: Optional[str] = Field(default=None, description="Flow 8 Name (human-readable flow name, no special characters, e.g. news or stock-reader)")

    FLOW_9_ENABLED: Optional[bool] = Field(default=False, description="Flow 9 Enabled (make this flow available for use)")
    FLOW_9_ID: Optional[str] = Field(default=None, description="Flow 9 ID (the flow GUID, e.g. b06d97f5-da14-4d29-81bd-8533261b6c88)")
    FLOW_9_NAME: Optional[str] = Field(default=None, description="Flow 9 Name (human-readable flow name, no special characters, e.g. news or stock-reader)")
|
||||
|
||||
|
||||
|
||||
def __init__(self):
    """Name the pipeline, load the valves from the environment and build the flow table."""
    self.name = "FlowiseAI Pipeline"

    # Initialize valve parameters from environment variables: each valve takes the
    # value of the environment variable with its own name, else its declared default.
    valve_values = {}
    for field_name, field_info in self.Valves.model_fields.items():
        valve_values[field_name] = os.getenv(field_name, field_info.default)
    self.valves = self.Valves(**valve_values)

    # Build flow mapping for faster lookup
    self.flows = {}
    self.update_flows()
|
||||
|
||||
def get_flow_details(self, flow_id: str) -> Optional[dict]:
    """
    Look up a single chatflow through the FlowiseAI REST API.

    Args:
        flow_id (str): The ID of the flow to fetch

    Returns:
        Optional[dict]: The decoded JSON body on HTTP 200; None on any other
        status code or when any exception is raised (both cases are logged)
    """
    try:
        base_url = self.valves.FLOWISE_BASE_URL.rstrip('/')
        response = requests.get(
            f"{base_url}/api/v1/chatflows/{flow_id}",
            headers={"Authorization": f"Bearer {self.valves.FLOWISE_API_KEY}"},
        )

        # Guard clause: anything but 200 is reported and treated as "no details".
        if response.status_code != 200:
            logger.error(f"Error fetching flow details: Status {response.status_code}")
            return None

        return response.json()

    except Exception as e:
        logger.error(f"Error fetching flow details: {str(e)}")
        return None
|
||||
|
||||
def update_flows(self):
    """Rebuild self.flows from the FLOW_<n>_ENABLED / _ID / _NAME valves."""
    self.flows = {}

    # Valves are numbered sequentially; stop at the first index that has no
    # FLOW_<n>_ENABLED attribute (at most 20 flows are examined).
    for idx in range(20):
        prefix = f"FLOW_{idx}_"
        if not hasattr(self.valves, prefix + "ENABLED"):
            break

        enabled = getattr(self.valves, prefix + "ENABLED", False)
        flow_id = getattr(self.valves, prefix + "ID", None)
        flow_name = getattr(self.valves, prefix + "NAME", None)

        # A flow is registered only when it is enabled and fully configured.
        if not (enabled and flow_id and flow_name):
            continue

        # Ask the API for the flow's own name; fall back to 'Unknown' when
        # the lookup fails or the response carries no 'name'.
        details = self.get_flow_details(flow_id)
        api_name = details.get('name', 'Unknown') if details else 'Unknown'

        # Keyed by the lower-cased valve name; keeps both names.
        self.flows[flow_name.lower()] = {
            'id': flow_id,
            'brief_name': flow_name,
            'api_name': api_name
        }

    logger.info(f"Updated flows: {[{k: v['api_name']} for k, v in self.flows.items()]}")
|
||||
|
||||
async def on_startup(self):
    """Server-start hook: log the event, then rebuild the flow map."""
    logger.debug(f"on_startup:{self.name}")
    self.update_flows()
|
||||
|
||||
async def on_shutdown(self):
    """Server-stop hook: only logs the event."""
    logger.debug(f"on_shutdown:{self.name}")
|
||||
|
||||
async def on_valves_updated(self) -> None:
    """Valve-change hook: log the event, then rebuild the flow map from the new valves."""
    logger.debug(f"on_valves_updated:{self.name}")
    self.update_flows()
|
||||
|
||||
def rate_check(self, dt_start: datetime) -> bool:
    """
    Sleep just long enough to keep calls at or below the RATE_LIMIT valve.

    The minimum spacing between calls is 1 / RATE_LIMIT seconds, measured
    from dt_start. A missing, zero or negative RATE_LIMIT disables
    throttling instead of raising ZeroDivisionError.

    Args:
        dt_start (datetime): Start time of the operation

    Returns:
        bool: True if sleep was done
    """
    rate_limit = self.valves.RATE_LIMIT
    if not rate_limit or rate_limit <= 0:
        # No usable rate configured: nothing to wait for.
        return False

    elapsed = (datetime.now() - dt_start).total_seconds()
    min_interval = 1 / rate_limit
    if elapsed >= min_interval:  # enough time has already passed
        return False

    time.sleep(min_interval - elapsed)
    return True
|
||||
|
||||
def parse_user_input(self, user_message: str) -> tuple[Optional[str], str]:
    """
    Parse the user message to extract flow name and query

    Accepted formats: "flow_name: query" and "@flow_name: query". The flow
    name is everything before the first colon, stripped, with an optional
    leading "@" removed, and lower-cased. The query may span several lines.

    Args:
        user_message (str): User's input message

    Returns:
        tuple[Optional[str], str]: (flow name, query with the current date
        and time appended), or (None, original message) when the message
        does not have the "name: query" shape
    """
    # DOTALL lets the query part contain newlines; without it a multi-line
    # message would never match and would be reported as "no flow specified".
    pattern = r"^([^:]+):\s*(.+)$"
    match = re.match(pattern, user_message.strip(), re.DOTALL)

    if not match:
        return None, user_message

    # Drop the optional "@" prefix so the documented "@flow_name" form
    # resolves to the same key as the bare flow name.
    flow_name = match.group(1).strip().lstrip("@").strip().lower()
    query = match.group(2).strip()

    # Take one timestamp so the date and time always belong to the same instant.
    now = datetime.now()
    date_now = now.strftime("%Y-%m-%d")
    time_now = now.strftime("%H:%M:%S")
    query = f"{query}; today's date is {date_now} and the current time is {time_now}"

    return flow_name, query
|
||||
|
||||
def pipe(
    self,
    user_message: str,
    model_id: str,
    messages: List[dict],
    body: dict
) -> Union[str, Generator, Iterator]:
    """
    Main pipeline function. Calls a specified FlowiseAI flow with the provided query.

    Format expected: <flow_name>: <your query>
    If no flow is specified, or the flow is unknown, a list of available
    flows is produced instead.

    Because this function contains `yield`, calling it always returns a
    generator, whether or not the request is streaming. Every message is
    therefore yielded; a value passed to `return` inside a generator never
    reaches a caller that iterates it, which is why the non-streaming
    messages used to be lost.

    Args:
        user_message (str): Latest user message; parsed for flow name and query
        model_id (str): Unused here
        messages (List[dict]): Unused here
        body (dict): Request body; only its "stream" flag is read

    Returns:
        Generator of text chunks. Streaming requests get the chunks of
        stream_retrieve; non-streaming requests get exactly one chunk.
    """
    logger.debug(f"pipe:{self.name}")

    dt_start = datetime.now()
    streaming = body.get("stream", False)
    logger.warning(f"Stream: {streaming}")

    # Check if we have valid API configuration
    if not self.valves.FLOWISE_API_KEY or not self.valves.FLOWISE_BASE_URL:
        yield "FlowiseAI configuration missing. Please set FLOWISE_API_KEY and FLOWISE_BASE_URL valves."
        return

    # Parse the user message to extract flow name and query
    flow_name, query = self.parse_user_input(user_message)

    # If no flow specified or invalid flow, list available flows
    if flow_name is None or flow_name not in self.flows:
        available_flows = list(self.flows.keys())
        if not available_flows:
            yield "No flows configured. Enable at least one FLOW_X_ENABLED valve and set its ID and NAME."
            return

        flows_list = "\n".join([f"- flow_name: {flow} (description:{self.flows[flow]['api_name']})" for flow in available_flows])
        help_msg = f"Please specify a flow using the format: <flow_name>: <your query>\n\nAvailable flows:\n{flows_list}"

        if flow_name is None:
            help_msg = "No flow specified. " + help_msg
        else:
            help_msg = f"Invalid flow '{flow_name}'. " + help_msg

        yield help_msg
        return

    # Get the flow ID from the map
    flow_id = self.flows[flow_name]['id']

    if streaming:
        yield from self.stream_retrieve(flow_id, flow_name, query, dt_start)
        return

    # Non-streaming: collect the whole answer and emit it as a single chunk.
    context = ""
    for chunk in self.static_retrieve(flow_id, flow_name, query, dt_start):
        context += chunk
    yield context if context else "No response from FlowiseAI"
|
||||
|
||||
def stream_retrieve(
    self, flow_id: str, flow_name: str, query: str, dt_start: datetime
) -> Generator:
    """
    Stream responses from FlowiseAI through the `Flowise` client.

    Args:
        flow_id (str): The ID of the flow to call
        flow_name (str): The name of the flow (for logging)
        query (str): The user's query
        dt_start (datetime): Start time for rate limiting

    Returns:
        Generator: Response chunks for streaming. If the prediction request
        cannot be created, a single error message is yielded and the
        generator ends.
    """
    if not query:
        yield "Query is empty. Please provide a question or prompt for the flow."
        return

    try:
        logger.info(f"Streaming from FlowiseAI flow '{flow_name}' with query: {query}")

        # Rate limiting check
        self.rate_check(dt_start)

        # Initialize Flowise client with API configuration
        client = Flowise(
            base_url=self.valves.FLOWISE_BASE_URL.rstrip('/'),
            api_key=self.valves.FLOWISE_API_KEY
        )

        # Create streaming prediction request
        completion = client.create_prediction(
            PredictionData(
                chatflowId=flow_id,
                question=query,
                streaming=True
            )
        )

    except Exception as e:
        error_msg = f"Error streaming from FlowiseAI: {str(e)}"
        logger.error(error_msg)
        yield error_msg
        # Stop here: `completion` was never assigned, so carrying on would
        # fail with an unbound-variable error in the loop below.
        return

    # Number of agent updates already emitted; used to slice off the ones
    # that were shown for earlier events.
    idx_last_update = 0
    yield f"Analysis started... {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"

    # Process each streamed chunk
    for chunk in completion:
        try:
            if isinstance(chunk, str):
                chunk = json.loads(chunk)
        except Exception:
            # The string is not valid JSON; keep it unchanged so it is
            # emitted verbatim by the non-dict branch below.
            pass

        try:
            if isinstance(chunk, dict):
                # Expected format: {event: "token", data: "content"}
                if "event" in chunk:
                    event = chunk["event"]
                    if ((event in ["start", "update", "agentReasoning"]) and
                            ("data" in chunk) and (isinstance(chunk["data"], list))):
                        for data_update in chunk["data"][idx_last_update:]:
                            # e.g. {"event":"start","data":[{"agentName":"Perspective Explorer","messages":["...
                            idx_last_update += 1
                            yield "\n---\n"
                            yield f"\n__Reasoning: {data_update['agentName']} ({datetime.now().strftime('%Y-%m-%d %H:%M:%S')})__\n\n"
                            for message in data_update["messages"]:
                                yield message  # yield message for each agent update
                    elif event == "end":
                        # {"event":"end","data":"[DONE]"}
                        yield "\n---\n"
                        yield f"\nAnalysis complete. ({datetime.now().strftime('%Y-%m-%d %H:%M:%S')})\n\n"
                    elif event == "token":
                        # do nothing, this is the flat output of the flow (final)
                        pass
                elif "error" in chunk:
                    error_msg = f"Error from FlowiseAI: {chunk['error']}"
                    logger.error(error_msg)
                    yield error_msg
            else:
                # If chunk format is unexpected, yield as is
                yield str(chunk)
        except Exception as e:
            # A malformed update (e.g. a missing key) is reported inline and
            # followed by the raw chunk; the stream keeps going.
            logger.error(f"Error processing chunk: {str(e)}")
            yield f"\nUnusual Response Chunk: ({datetime.now().strftime('%Y-%m-%d %H:%M:%S')})\n{str(e)}\n"
            yield "\n---\n"
            yield str(chunk)

    return
|
||||
|
||||
def static_retrieve(
    self, flow_id: str, flow_name: str, query: str, dt_start: datetime
) -> Generator:
    """
    Run one non-streaming prediction against a FlowiseAI flow over REST.

    Args:
        flow_id (str): The ID of the flow to call
        flow_name (str): The name of the flow (for logging)
        query (str): The user's query
        dt_start (datetime): Start time for rate limiting

    Returns:
        Generator: Yields a single item: the answer, the raw response text
        when the body is not JSON, or an error message
    """
    if not query:
        yield "Query is empty. Please provide a question or prompt for the flow."
        return

    base_url = self.valves.FLOWISE_BASE_URL.rstrip('/')
    api_url = f"{base_url}/api/v1/prediction/{flow_id}"
    headers = {"Authorization": f"Bearer {self.valves.FLOWISE_API_KEY}"}
    payload = {"question": query}

    try:
        logger.info(f"Calling FlowiseAI flow '{flow_name}' with query: {query}")

        # Rate limiting check
        self.rate_check(dt_start)

        response = requests.post(api_url, headers=headers, json=payload)

        if response.status_code != 200:
            error_msg = f"Error from FlowiseAI: Status {response.status_code}"
            logger.error(f"{error_msg} - {response.text}")
            yield error_msg
            return

        try:
            result = response.json()

            # The answer field depends on the flow configuration: take the
            # first of the common field names that is present.
            if isinstance(result, dict):
                for field in ("text", "answer", "response", "result"):
                    if field in result:
                        yield result[field]
                        break
                else:
                    # No standard field found: return the full JSON as a string
                    yield f"```json\n{json.dumps(result, indent=2)}\n```"
            elif isinstance(result, str):
                yield result
            else:
                yield f"```json\n{json.dumps(result, indent=2)}\n```"

        except json.JSONDecodeError:
            # Body is not JSON: return the raw text
            yield response.text

    except Exception as e:
        error_msg = f"Error calling FlowiseAI: {str(e)}"
        logger.error(error_msg)
        yield error_msg

    return
|
||||
28
examples/pipelines/integrations/langgraph_pipeline/README.md
Normal file
28
examples/pipelines/integrations/langgraph_pipeline/README.md
Normal file
@@ -0,0 +1,28 @@
|
||||
# Example of langgraph integration
|
||||
## Python version: 3.11
|
||||
## Feature
|
||||
1. Uses the LangGraph stream writer with the `custom` stream mode to integrate LangGraph with an Open WebUI pipeline.
|
||||
2. Supports displaying \<think\> blocks.
|
||||
## Prerequisites
|
||||
Install Open WebUI Pipelines.
|
||||
You can follow the quick-start guide: https://docs.openwebui.com/pipelines/#-quick-start-with-docker
|
||||
|
||||
## Usage
|
||||
### 1. Upload pipeline file
|
||||
Upload `langgraph_stream_pipeline.py` to the open webui pipeline.
|
||||
|
||||
### 2. Enable the uploaded pipeline
|
||||
Set the pipeline's `API_URL` valve to your LangGraph API URL.
|
||||
|
||||
And choose **"LangGraph stream"** as your model.
|
||||
|
||||
### 3. Install dependencies
|
||||
Under the folder `pipelines/examples/pipelines/integrations/langgraph_pipeline`, run the command below:
|
||||
```
|
||||
pip install -r requirements.txt
|
||||
```
|
||||
### 4. Start the LangGraph API server
|
||||
Run the command below:
|
||||
```
|
||||
uvicorn langgraph_example:app --reload
|
||||
```
|
||||
@@ -0,0 +1,166 @@
|
||||
"""
|
||||
title: Langgraph stream integration
|
||||
author: bartonzzx
|
||||
author_url: https://github.com/bartonzzx
|
||||
git_url:
|
||||
description: Integrate langgraph with open webui pipeline
|
||||
required_open_webui_version: 0.4.3
|
||||
requirements: none
|
||||
version: 0.4.3
|
||||
licence: MIT
|
||||
"""
|
||||
|
||||
|
||||
import os
|
||||
import json
|
||||
import getpass
|
||||
from typing import Annotated, Literal
|
||||
from typing_extensions import TypedDict
|
||||
|
||||
from fastapi import FastAPI
|
||||
from fastapi.responses import StreamingResponse
|
||||
|
||||
from langgraph.graph import StateGraph, START, END
|
||||
from langgraph.graph.message import add_messages
|
||||
from langchain_openai import ChatOpenAI
|
||||
from langgraph.config import get_stream_writer
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
# Define LLM API key
# ---------------------------------------------------------------------------
def _set_env(var: str):
    """Prompt on the terminal for `var` when it is unset or empty in the environment."""
    if os.environ.get(var):
        return
    os.environ[var] = getpass.getpass(f"{var}: ")


_set_env("OPENAI_API_KEY")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
# Define Langgraph
# ---------------------------------------------------------------------------
def generate_custom_stream(type: Literal["think","normal"], content: str):
    """Send `content`, wrapped in newlines, to the stream writer under the key `type`.

    Returns whatever the writer obtained from get_stream_writer() returns.
    """
    framed = "\n" + content + "\n"
    writer = get_stream_writer()
    return writer({type: framed})
|
||||
|
||||
# Graph state; also used as the request body type of the /stream endpoint.
class State(TypedDict):
    # Message list, annotated with langgraph's add_messages.
    messages: Annotated[list, add_messages]


# Chat model shared by every chatbot() call.
llm = ChatOpenAI(model="gpt-3.5-turbo")
|
||||
|
||||
def chatbot(state: State):
    """Graph node: query the model twice and publish both answers on the custom stream.

    The first call prefixes the history with "Please reasoning:" and is
    published under "think"; the second uses the history unchanged and is
    published under "normal". Only the second reply is returned as the
    node's state update.
    """
    history = state["messages"]
    think_response = llm.invoke(["Please reasoning:"] + history)
    normal_response = llm.invoke(history)

    for kind, reply in (("think", think_response), ("normal", normal_response)):
        generate_custom_stream(kind, reply.content)

    return {"messages": [normal_response]}
|
||||
|
||||
# Define graph: a single "chatbot" node wired START -> chatbot -> END
graph_builder = StateGraph(State)

# Define nodes
graph_builder.add_node("chatbot", chatbot)

# Define edges
graph_builder.add_edge(START, "chatbot")
graph_builder.add_edge("chatbot", END)

# Compile graph
graph = graph_builder.compile()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
# Define api processing
# ---------------------------------------------------------------------------
app = FastAPI(title="Langgraph API", description="Langgraph API")


@app.get("/test")
async def test():
    """Connectivity check: always returns the same greeting."""
    return {"message": "Hello World"}
|
||||
|
||||
|
||||
@app.post("/stream")
async def stream(inputs: State):
    """Run the graph on `inputs` and stream its custom events as server-sent events.

    Each SSE payload has the shape
    {"choices": [{"delta": {...}, "finish_reason": ...}]}. An empty delta
    opens the stream, "think" events become `reasoning_content` deltas,
    "normal" events become `content` deltas, and a final empty delta with
    finish_reason "stop" closes it.

    A delta is sent only for the key an event actually carries. Previously
    both deltas were sent for every event, so one of them was always null.
    """

    def sse_chunk(delta: dict, finish_reason=None) -> str:
        # Serialize one chunk as a single SSE "data:" record.
        message = {'choices': [{'delta': delta, 'finish_reason': finish_reason}]}
        return f"data: {json.dumps(message)}\n\n"

    async def event_stream():
        try:
            # Stream start
            yield sse_chunk({})

            # Processing langgraph stream response with <think> block support
            async for event in graph.astream(input=inputs, stream_mode="custom"):
                print(event)
                think_content = event.get("think", None)
                normal_content = event.get("normal", None)

                if think_content is not None:
                    yield sse_chunk({'reasoning_content': think_content})
                if normal_content is not None:
                    yield sse_chunk({'content': normal_content})

            # End of the stream
            yield sse_chunk({}, 'stop')

        except Exception as e:
            # Simply print the error information; the response then ends
            # without a closing "stop" chunk.
            print(f"An error occurred: {e}")

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        }
    )
|
||||
|
||||
if __name__ == "__main__":
    # Direct execution: serve the app on port 9000 of every interface.
    # uvicorn is imported here so that importing this module does not need it.
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=9000)
|
||||
@@ -0,0 +1,63 @@
|
||||
"""
|
||||
title: Langgraph stream integration
|
||||
author: bartonzzx
|
||||
author_url: https://github.com/bartonzzx
|
||||
git_url:
|
||||
description: Integrate langgraph with open webui pipeline
|
||||
required_open_webui_version: 0.4.3
|
||||
requirements: none
|
||||
version: 0.4.3
|
||||
licence: MIT
|
||||
"""
|
||||
|
||||
|
||||
import os
|
||||
import requests
|
||||
from pydantic import BaseModel, Field
|
||||
from typing import List, Union, Generator, Iterator
|
||||
|
||||
|
||||
class Pipeline:
    """Pipeline that posts the chat history to a LangGraph streaming endpoint."""

    class Valves(BaseModel):
        # Endpoint that receives the POST with the message history.
        API_URL: str = Field(default="http://127.0.0.1:9000/stream", description="Langgraph API URL")

    def __init__(self):
        self.id = "LangGraph stream"
        self.name = "LangGraph stream"
        # Initialize valve parameters: the environment variable with the same
        # name wins, otherwise the default declared on the Valves model is used.
        valve_values = {
            field_name: os.getenv(field_name, field.default)
            for field_name, field in self.Valves.model_fields.items()
        }
        self.valves = self.Valves(**valve_values)

    async def on_startup(self):
        """Called when the server is started; only prints a trace line."""
        print(f"on_startup: {__name__}")

    async def on_shutdown(self):
        """Called when the server is shut down; only prints a trace line."""
        print(f"on_shutdown: {__name__}")

    def pipe(
        self,
        user_message: str,
        model_id: str,
        messages: List[dict],
        body: dict
    ) -> Union[str, Generator, Iterator]:
        """POST the history as [role, content] pairs and return the response line iterator.

        Raises whatever `response.raise_for_status()` raises for an error
        status. `user_message`, `model_id` and `body` are not used.
        """
        role_content_pairs = [[msg['role'], msg['content']] for msg in messages]

        response = requests.post(
            self.valves.API_URL,
            json={"messages": role_content_pairs},
            headers={
                'accept': 'text/event-stream',
                'Content-Type': 'application/json',
            },
            stream=True,
        )
        response.raise_for_status()

        return response.iter_lines()
|
||||
@@ -0,0 +1,40 @@
|
||||
annotated-types==0.7.0
|
||||
anyio==4.8.0
|
||||
certifi==2025.1.31
|
||||
charset-normalizer==3.4.1
|
||||
click==8.1.8
|
||||
distro==1.9.0
|
||||
fastapi==0.115.11
|
||||
h11==0.14.0
|
||||
httpcore==1.0.7
|
||||
httpx==0.28.1
|
||||
idna==3.10
|
||||
jiter==0.9.0
|
||||
jsonpatch==1.33
|
||||
jsonpointer==3.0.0
|
||||
langchain-core==0.3.45
|
||||
langchain-openai==0.3.8
|
||||
langgraph==0.3.11
|
||||
langgraph-checkpoint==2.0.20
|
||||
langgraph-prebuilt==0.1.3
|
||||
langgraph-sdk==0.1.57
|
||||
langsmith==0.3.15
|
||||
msgpack==1.1.0
|
||||
openai==1.66.3
|
||||
orjson==3.10.15
|
||||
packaging==24.2
|
||||
pydantic==2.10.6
|
||||
pydantic_core==2.27.2
|
||||
PyYAML==6.0.2
|
||||
regex==2024.11.6
|
||||
requests==2.32.3
|
||||
requests-toolbelt==1.0.0
|
||||
sniffio==1.3.1
|
||||
starlette==0.46.1
|
||||
tenacity==9.0.0
|
||||
tiktoken==0.9.0
|
||||
tqdm==4.67.1
|
||||
typing_extensions==4.12.2
|
||||
urllib3==2.3.0
|
||||
uvicorn==0.34.0
|
||||
zstandard==0.23.0
|
||||
@@ -6,7 +6,7 @@ version: 1.4
|
||||
license: MIT
|
||||
description: A pipeline for generating text and processing images using the Anthropic API.
|
||||
requirements: requests, sseclient-py
|
||||
environment_variables: ANTHROPIC_API_KEY
|
||||
environment_variables: ANTHROPIC_API_KEY, ANTHROPIC_THINKING_BUDGET_TOKENS, ANTHROPIC_ENABLE_THINKING
|
||||
"""
|
||||
|
||||
import os
|
||||
@@ -18,6 +18,17 @@ import sseclient
|
||||
|
||||
from utils.pipelines.main import pop_system_message
|
||||
|
||||
REASONING_EFFORT_BUDGET_TOKEN_MAP = {
|
||||
"none": None,
|
||||
"low": 1024,
|
||||
"medium": 4096,
|
||||
"high": 16384,
|
||||
"max": 32768,
|
||||
}
|
||||
|
||||
# Maximum combined token limit for Claude 3.7
|
||||
MAX_COMBINED_TOKENS = 64000
|
||||
|
||||
|
||||
class Pipeline:
|
||||
class Valves(BaseModel):
|
||||
@@ -29,16 +40,20 @@ class Pipeline:
|
||||
self.name = "anthropic/"
|
||||
|
||||
self.valves = self.Valves(
|
||||
**{"ANTHROPIC_API_KEY": os.getenv("ANTHROPIC_API_KEY", "your-api-key-here")}
|
||||
**{
|
||||
"ANTHROPIC_API_KEY": os.getenv(
|
||||
"ANTHROPIC_API_KEY", "your-api-key-here"
|
||||
),
|
||||
}
|
||||
)
|
||||
self.url = 'https://api.anthropic.com/v1/messages'
|
||||
self.url = "https://api.anthropic.com/v1/messages"
|
||||
self.update_headers()
|
||||
|
||||
def update_headers(self):
|
||||
self.headers = {
|
||||
'anthropic-version': '2023-06-01',
|
||||
'content-type': 'application/json',
|
||||
'x-api-key': self.valves.ANTHROPIC_API_KEY
|
||||
"anthropic-version": "2023-06-01",
|
||||
"content-type": "application/json",
|
||||
"x-api-key": self.valves.ANTHROPIC_API_KEY,
|
||||
}
|
||||
|
||||
def get_anthropic_models(self):
|
||||
@@ -48,6 +63,7 @@ class Pipeline:
|
||||
{"id": "claude-3-sonnet-20240229", "name": "claude-3-sonnet"},
|
||||
{"id": "claude-3-5-haiku-20241022", "name": "claude-3.5-haiku"},
|
||||
{"id": "claude-3-5-sonnet-20241022", "name": "claude-3.5-sonnet"},
|
||||
{"id": "claude-3-7-sonnet-20250219", "name": "claude-3.7-sonnet"},
|
||||
]
|
||||
|
||||
async def on_startup(self):
|
||||
@@ -87,7 +103,7 @@ class Pipeline:
|
||||
) -> Union[str, Generator, Iterator]:
|
||||
try:
|
||||
# Remove unnecessary keys
|
||||
for key in ['user', 'chat_id', 'title']:
|
||||
for key in ["user", "chat_id", "title"]:
|
||||
body.pop(key, None)
|
||||
|
||||
system_message, messages = pop_system_message(messages)
|
||||
@@ -101,28 +117,40 @@ class Pipeline:
|
||||
if isinstance(message.get("content"), list):
|
||||
for item in message["content"]:
|
||||
if item["type"] == "text":
|
||||
processed_content.append({"type": "text", "text": item["text"]})
|
||||
processed_content.append(
|
||||
{"type": "text", "text": item["text"]}
|
||||
)
|
||||
elif item["type"] == "image_url":
|
||||
if image_count >= 5:
|
||||
raise ValueError("Maximum of 5 images per API call exceeded")
|
||||
raise ValueError(
|
||||
"Maximum of 5 images per API call exceeded"
|
||||
)
|
||||
|
||||
processed_image = self.process_image(item["image_url"])
|
||||
processed_content.append(processed_image)
|
||||
|
||||
if processed_image["source"]["type"] == "base64":
|
||||
image_size = len(processed_image["source"]["data"]) * 3 / 4
|
||||
image_size = (
|
||||
len(processed_image["source"]["data"]) * 3 / 4
|
||||
)
|
||||
else:
|
||||
image_size = 0
|
||||
|
||||
total_image_size += image_size
|
||||
if total_image_size > 100 * 1024 * 1024:
|
||||
raise ValueError("Total size of images exceeds 100 MB limit")
|
||||
raise ValueError(
|
||||
"Total size of images exceeds 100 MB limit"
|
||||
)
|
||||
|
||||
image_count += 1
|
||||
else:
|
||||
processed_content = [{"type": "text", "text": message.get("content", "")}]
|
||||
processed_content = [
|
||||
{"type": "text", "text": message.get("content", "")}
|
||||
]
|
||||
|
||||
processed_messages.append({"role": message["role"], "content": processed_content})
|
||||
processed_messages.append(
|
||||
{"role": message["role"], "content": processed_content}
|
||||
)
|
||||
|
||||
# Prepare the payload
|
||||
payload = {
|
||||
@@ -138,6 +166,43 @@ class Pipeline:
|
||||
}
|
||||
|
||||
if body.get("stream", False):
|
||||
supports_thinking = "claude-3-7" in model_id
|
||||
reasoning_effort = body.get("reasoning_effort", "none")
|
||||
budget_tokens = REASONING_EFFORT_BUDGET_TOKEN_MAP.get(reasoning_effort)
|
||||
|
||||
# Allow users to input an integer value representing budget tokens
|
||||
if (
|
||||
not budget_tokens
|
||||
and reasoning_effort is not None
|
||||
and reasoning_effort not in REASONING_EFFORT_BUDGET_TOKEN_MAP.keys()
|
||||
):
|
||||
try:
|
||||
budget_tokens = int(reasoning_effort)
|
||||
except ValueError as e:
|
||||
print("Failed to convert reasoning effort to int", e)
|
||||
budget_tokens = None
|
||||
|
||||
if supports_thinking and budget_tokens:
|
||||
# Check if the combined tokens (budget_tokens + max_tokens) exceeds the limit
|
||||
max_tokens = payload.get("max_tokens", 4096)
|
||||
combined_tokens = budget_tokens + max_tokens
|
||||
|
||||
if combined_tokens > MAX_COMBINED_TOKENS:
|
||||
error_message = f"Error: Combined tokens (budget_tokens {budget_tokens} + max_tokens {max_tokens} = {combined_tokens}) exceeds the maximum limit of {MAX_COMBINED_TOKENS}"
|
||||
print(error_message)
|
||||
return error_message
|
||||
|
||||
payload["max_tokens"] = combined_tokens
|
||||
payload["thinking"] = {
|
||||
"type": "enabled",
|
||||
"budget_tokens": budget_tokens,
|
||||
}
|
||||
# Thinking requires temperature 1.0 and does not support top_p, top_k
|
||||
payload["temperature"] = 1.0
|
||||
if "top_k" in payload:
|
||||
del payload["top_k"]
|
||||
if "top_p" in payload:
|
||||
del payload["top_p"]
|
||||
return self.stream_response(payload)
|
||||
else:
|
||||
return self.get_completion(payload)
|
||||
@@ -145,31 +210,64 @@ class Pipeline:
|
||||
return f"Error: {e}"
|
||||
|
||||
def stream_response(self, payload: dict) -> Generator:
|
||||
response = requests.post(self.url, headers=self.headers, json=payload, stream=True)
|
||||
"""Used for title and tag generation"""
|
||||
try:
|
||||
response = requests.post(
|
||||
self.url, headers=self.headers, json=payload, stream=True
|
||||
)
|
||||
print(f"{response} for {payload}")
|
||||
|
||||
if response.status_code == 200:
|
||||
client = sseclient.SSEClient(response)
|
||||
for event in client.events():
|
||||
try:
|
||||
data = json.loads(event.data)
|
||||
if data["type"] == "content_block_start":
|
||||
yield data["content_block"]["text"]
|
||||
elif data["type"] == "content_block_delta":
|
||||
yield data["delta"]["text"]
|
||||
elif data["type"] == "message_stop":
|
||||
break
|
||||
except json.JSONDecodeError:
|
||||
print(f"Failed to parse JSON: {event.data}")
|
||||
except KeyError as e:
|
||||
print(f"Unexpected data structure: {e}")
|
||||
print(f"Full data: {data}")
|
||||
else:
|
||||
raise Exception(f"Error: {response.status_code} - {response.text}")
|
||||
if response.status_code == 200:
|
||||
client = sseclient.SSEClient(response)
|
||||
for event in client.events():
|
||||
try:
|
||||
data = json.loads(event.data)
|
||||
if data["type"] == "content_block_start":
|
||||
if data["content_block"]["type"] == "thinking":
|
||||
yield "<think>"
|
||||
else:
|
||||
yield data["content_block"]["text"]
|
||||
elif data["type"] == "content_block_delta":
|
||||
if data["delta"]["type"] == "thinking_delta":
|
||||
yield data["delta"]["thinking"]
|
||||
elif data["delta"]["type"] == "signature_delta":
|
||||
yield "\n </think> \n\n"
|
||||
else:
|
||||
yield data["delta"]["text"]
|
||||
elif data["type"] == "message_stop":
|
||||
break
|
||||
except json.JSONDecodeError:
|
||||
print(f"Failed to parse JSON: {event.data}")
|
||||
yield f"Error: Failed to parse JSON response"
|
||||
except KeyError as e:
|
||||
print(f"Unexpected data structure: {e} for payload {payload}")
|
||||
print(f"Full data: {data}")
|
||||
yield f"Error: Unexpected data structure: {e}"
|
||||
else:
|
||||
error_message = f"Error: {response.status_code} - {response.text}"
|
||||
print(error_message)
|
||||
yield error_message
|
||||
except Exception as e:
|
||||
error_message = f"Error: {str(e)}"
|
||||
print(error_message)
|
||||
yield error_message
|
||||
|
||||
def get_completion(self, payload: dict) -> str:
|
||||
response = requests.post(self.url, headers=self.headers, json=payload)
|
||||
if response.status_code == 200:
|
||||
res = response.json()
|
||||
return res["content"][0]["text"] if "content" in res and res["content"] else ""
|
||||
else:
|
||||
raise Exception(f"Error: {response.status_code} - {response.text}")
|
||||
try:
|
||||
response = requests.post(self.url, headers=self.headers, json=payload)
|
||||
print(response, payload)
|
||||
if response.status_code == 200:
|
||||
res = response.json()
|
||||
for content in res["content"]:
|
||||
if not content.get("text"):
|
||||
continue
|
||||
return content["text"]
|
||||
return ""
|
||||
else:
|
||||
error_message = f"Error: {response.status_code} - {response.text}"
|
||||
print(error_message)
|
||||
return error_message
|
||||
except Exception as e:
|
||||
error_message = f"Error: {str(e)}"
|
||||
print(error_message)
|
||||
return error_message
|
||||
|
||||
@@ -12,7 +12,7 @@ import base64
|
||||
import json
|
||||
import logging
|
||||
from io import BytesIO
|
||||
from typing import List, Union, Generator, Iterator
|
||||
from typing import List, Union, Generator, Iterator, Optional, Any
|
||||
|
||||
import boto3
|
||||
|
||||
@@ -23,12 +23,23 @@ import requests
|
||||
|
||||
from utils.pipelines.main import pop_system_message
|
||||
|
||||
REASONING_EFFORT_BUDGET_TOKEN_MAP = {
|
||||
"none": None,
|
||||
"low": 1024,
|
||||
"medium": 4096,
|
||||
"high": 16384,
|
||||
"max": 32768,
|
||||
}
|
||||
|
||||
# Maximum combined token limit for Claude 3.7
|
||||
MAX_COMBINED_TOKENS = 64000
|
||||
|
||||
|
||||
class Pipeline:
|
||||
class Valves(BaseModel):
|
||||
AWS_ACCESS_KEY: str = ""
|
||||
AWS_SECRET_KEY: str = ""
|
||||
AWS_REGION_NAME: str = ""
|
||||
AWS_ACCESS_KEY: Optional[str] = None
|
||||
AWS_SECRET_KEY: Optional[str] = None
|
||||
AWS_REGION_NAME: Optional[str] = None
|
||||
|
||||
def __init__(self):
|
||||
self.type = "manifold"
|
||||
@@ -47,21 +58,25 @@ class Pipeline:
|
||||
}
|
||||
)
|
||||
|
||||
self.bedrock = boto3.client(aws_access_key_id=self.valves.AWS_ACCESS_KEY,
|
||||
aws_secret_access_key=self.valves.AWS_SECRET_KEY,
|
||||
service_name="bedrock",
|
||||
region_name=self.valves.AWS_REGION_NAME)
|
||||
self.bedrock_runtime = boto3.client(aws_access_key_id=self.valves.AWS_ACCESS_KEY,
|
||||
aws_secret_access_key=self.valves.AWS_SECRET_KEY,
|
||||
service_name="bedrock-runtime",
|
||||
region_name=self.valves.AWS_REGION_NAME)
|
||||
self.valves = self.Valves(
|
||||
**{
|
||||
"AWS_ACCESS_KEY": os.getenv("AWS_ACCESS_KEY", ""),
|
||||
"AWS_SECRET_KEY": os.getenv("AWS_SECRET_KEY", ""),
|
||||
"AWS_REGION_NAME": os.getenv(
|
||||
"AWS_REGION_NAME", os.getenv(
|
||||
"AWS_REGION", os.getenv("AWS_DEFAULT_REGION", "")
|
||||
)
|
||||
),
|
||||
}
|
||||
)
|
||||
|
||||
self.pipelines = self.get_models()
|
||||
self.update_pipelines()
|
||||
|
||||
|
||||
async def on_startup(self):
|
||||
# This function is called when the server is started.
|
||||
print(f"on_startup:{__name__}")
|
||||
self.update_pipelines()
|
||||
pass
|
||||
|
||||
async def on_shutdown(self):
|
||||
@@ -72,40 +87,58 @@ class Pipeline:
|
||||
async def on_valves_updated(self):
|
||||
# This function is called when the valves are updated.
|
||||
print(f"on_valves_updated:{__name__}")
|
||||
self.bedrock = boto3.client(aws_access_key_id=self.valves.AWS_ACCESS_KEY,
|
||||
aws_secret_access_key=self.valves.AWS_SECRET_KEY,
|
||||
service_name="bedrock",
|
||||
region_name=self.valves.AWS_REGION_NAME)
|
||||
self.bedrock_runtime = boto3.client(aws_access_key_id=self.valves.AWS_ACCESS_KEY,
|
||||
aws_secret_access_key=self.valves.AWS_SECRET_KEY,
|
||||
service_name="bedrock-runtime",
|
||||
region_name=self.valves.AWS_REGION_NAME)
|
||||
self.pipelines = self.get_models()
|
||||
self.update_pipelines()
|
||||
|
||||
def pipelines(self) -> List[dict]:
|
||||
return self.get_models()
|
||||
def update_pipelines(self) -> None:
|
||||
try:
|
||||
self.bedrock = boto3.client(service_name="bedrock",
|
||||
aws_access_key_id=self.valves.AWS_ACCESS_KEY,
|
||||
aws_secret_access_key=self.valves.AWS_SECRET_KEY,
|
||||
region_name=self.valves.AWS_REGION_NAME)
|
||||
self.bedrock_runtime = boto3.client(service_name="bedrock-runtime",
|
||||
aws_access_key_id=self.valves.AWS_ACCESS_KEY,
|
||||
aws_secret_access_key=self.valves.AWS_SECRET_KEY,
|
||||
region_name=self.valves.AWS_REGION_NAME)
|
||||
self.pipelines = self.get_models()
|
||||
except Exception as e:
|
||||
print(f"Error: {e}")
|
||||
self.pipelines = [
|
||||
{
|
||||
"id": "error",
|
||||
"name": "Could not fetch models from Bedrock, please set up AWS Key/Secret or Instance/Task Role.",
|
||||
},
|
||||
]
|
||||
|
||||
def get_models(self):
|
||||
if self.valves.AWS_ACCESS_KEY and self.valves.AWS_SECRET_KEY:
|
||||
try:
|
||||
response = self.bedrock.list_foundation_models(byProvider='Anthropic', byInferenceType='ON_DEMAND')
|
||||
return [
|
||||
{
|
||||
"id": model["modelId"],
|
||||
"name": model["modelName"],
|
||||
}
|
||||
for model in response["modelSummaries"]
|
||||
]
|
||||
except Exception as e:
|
||||
print(f"Error: {e}")
|
||||
return [
|
||||
{
|
||||
"id": "error",
|
||||
"name": "Could not fetch models from Bedrock, please update the Access/Secret Key in the valves.",
|
||||
},
|
||||
]
|
||||
else:
|
||||
return []
|
||||
try:
|
||||
res = []
|
||||
response = self.bedrock.list_foundation_models(byProvider='Anthropic')
|
||||
for model in response['modelSummaries']:
|
||||
inference_types = model.get('inferenceTypesSupported', [])
|
||||
if "ON_DEMAND" in inference_types:
|
||||
res.append({'id': model['modelId'], 'name': model['modelName']})
|
||||
elif "INFERENCE_PROFILE" in inference_types:
|
||||
inferenceProfileId = self.getInferenceProfileId(model['modelArn'])
|
||||
if inferenceProfileId:
|
||||
res.append({'id': inferenceProfileId, 'name': model['modelName']})
|
||||
|
||||
return res
|
||||
except Exception as e:
|
||||
print(f"Error: {e}")
|
||||
return [
|
||||
{
|
||||
"id": "error",
|
||||
"name": "Could not fetch models from Bedrock, please check permissoin.",
|
||||
},
|
||||
]
|
||||
|
||||
def getInferenceProfileId(self, modelArn: str) -> str:
|
||||
response = self.bedrock.list_inference_profiles()
|
||||
for profile in response.get('inferenceProfileSummaries', []):
|
||||
for model in profile.get('models', []):
|
||||
if model.get('modelArn') == modelArn:
|
||||
return profile['inferenceProfileId']
|
||||
return None
|
||||
|
||||
def pipe(
|
||||
self, user_message: str, model_id: str, messages: List[dict], body: dict
|
||||
@@ -139,11 +172,53 @@ class Pipeline:
|
||||
|
||||
payload = {"modelId": model_id,
|
||||
"messages": processed_messages,
|
||||
"system": [{'text': system_message if system_message else 'you are an intelligent ai assistant'}],
|
||||
"inferenceConfig": {"temperature": body.get("temperature", 0.5)},
|
||||
"additionalModelRequestFields": {"top_k": body.get("top_k", 200), "top_p": body.get("top_p", 0.9)}
|
||||
"system": [{'text': system_message["content"] if system_message else 'you are an intelligent ai assistant'}],
|
||||
"inferenceConfig": {
|
||||
"temperature": body.get("temperature", 0.5),
|
||||
"topP": body.get("top_p", 0.9),
|
||||
"maxTokens": body.get("max_tokens", 4096),
|
||||
"stopSequences": body.get("stop", []),
|
||||
},
|
||||
"additionalModelRequestFields": {"top_k": body.get("top_k", 200)}
|
||||
}
|
||||
|
||||
if body.get("stream", False):
|
||||
supports_thinking = "claude-3-7" in model_id
|
||||
reasoning_effort = body.get("reasoning_effort", "none")
|
||||
budget_tokens = REASONING_EFFORT_BUDGET_TOKEN_MAP.get(reasoning_effort)
|
||||
|
||||
# Allow users to input an integer value representing budget tokens
|
||||
if (
|
||||
not budget_tokens
|
||||
and reasoning_effort not in REASONING_EFFORT_BUDGET_TOKEN_MAP.keys()
|
||||
):
|
||||
try:
|
||||
budget_tokens = int(reasoning_effort)
|
||||
except ValueError as e:
|
||||
print("Failed to convert reasoning effort to int", e)
|
||||
budget_tokens = None
|
||||
|
||||
if supports_thinking and budget_tokens:
|
||||
# Check if the combined tokens (budget_tokens + max_tokens) exceeds the limit
|
||||
max_tokens = payload.get("max_tokens", 4096)
|
||||
combined_tokens = budget_tokens + max_tokens
|
||||
|
||||
if combined_tokens > MAX_COMBINED_TOKENS:
|
||||
error_message = f"Error: Combined tokens (budget_tokens {budget_tokens} + max_tokens {max_tokens} = {combined_tokens}) exceeds the maximum limit of {MAX_COMBINED_TOKENS}"
|
||||
print(error_message)
|
||||
return error_message
|
||||
|
||||
payload["inferenceConfig"]["maxTokens"] = combined_tokens
|
||||
payload["additionalModelRequestFields"]["thinking"] = {
|
||||
"type": "enabled",
|
||||
"budget_tokens": budget_tokens,
|
||||
}
|
||||
# Thinking requires temperature 1.0 and does not support top_p, top_k
|
||||
payload["inferenceConfig"]["temperature"] = 1.0
|
||||
if "top_k" in payload["additionalModelRequestFields"]:
|
||||
del payload["additionalModelRequestFields"]["top_k"]
|
||||
if "topP" in payload["inferenceConfig"]:
|
||||
del payload["inferenceConfig"]["topP"]
|
||||
return self.stream_response(model_id, payload)
|
||||
else:
|
||||
return self.get_completion(model_id, payload)
|
||||
@@ -152,30 +227,45 @@ class Pipeline:
|
||||
|
||||
def process_image(self, image: str):
|
||||
img_stream = None
|
||||
if image["url"].startswith("data:image"):
|
||||
if ',' in image["url"]:
|
||||
base64_string = image["url"].split(',')[1]
|
||||
image_data = base64.b64decode(base64_string)
|
||||
content_type = None
|
||||
|
||||
if image["url"].startswith("data:image"):
|
||||
mime_type, base64_string = image["url"].split(",", 1)
|
||||
content_type = mime_type.split(":")[1].split(";")[0]
|
||||
image_data = base64.b64decode(base64_string)
|
||||
img_stream = BytesIO(image_data)
|
||||
else:
|
||||
img_stream = requests.get(image["url"]).content
|
||||
response = requests.get(image["url"])
|
||||
img_stream = BytesIO(response.content)
|
||||
content_type = response.headers.get('Content-Type', 'image/jpeg')
|
||||
|
||||
media_type = content_type.split('/')[-1] if '/' in content_type else content_type
|
||||
return {
|
||||
"image": {"format": "png" if image["url"].endswith(".png") else "jpeg",
|
||||
"source": {"bytes": img_stream.read()}}
|
||||
"image": {
|
||||
"format": media_type,
|
||||
"source": {"bytes": img_stream.read()}
|
||||
}
|
||||
}
|
||||
|
||||
def stream_response(self, model_id: str, payload: dict) -> Generator:
|
||||
if "system" in payload:
|
||||
del payload["system"]
|
||||
if "additionalModelRequestFields" in payload:
|
||||
del payload["additionalModelRequestFields"]
|
||||
streaming_response = self.bedrock_runtime.converse_stream(**payload)
|
||||
|
||||
in_resasoning_context = False
|
||||
for chunk in streaming_response["stream"]:
|
||||
if "contentBlockDelta" in chunk:
|
||||
yield chunk["contentBlockDelta"]["delta"]["text"]
|
||||
if in_resasoning_context and "contentBlockStop" in chunk:
|
||||
in_resasoning_context = False
|
||||
yield "\n </think> \n\n"
|
||||
elif "contentBlockDelta" in chunk and "delta" in chunk["contentBlockDelta"]:
|
||||
if "reasoningContent" in chunk["contentBlockDelta"]["delta"]:
|
||||
if not in_resasoning_context:
|
||||
yield "<think>"
|
||||
|
||||
in_resasoning_context = True
|
||||
if "text" in chunk["contentBlockDelta"]["delta"]["reasoningContent"]:
|
||||
yield chunk["contentBlockDelta"]["delta"]["reasoningContent"]["text"]
|
||||
elif "text" in chunk["contentBlockDelta"]["delta"]:
|
||||
yield chunk["contentBlockDelta"]["delta"]["text"]
|
||||
|
||||
def get_completion(self, model_id: str, payload: dict) -> str:
|
||||
response = self.bedrock_runtime.converse(**payload)
|
||||
return response['output']['message']['content'][0]['text']
|
||||
|
||||
|
||||
187
examples/pipelines/providers/aws_bedrock_deepseek_pipeline.py
Normal file
187
examples/pipelines/providers/aws_bedrock_deepseek_pipeline.py
Normal file
@@ -0,0 +1,187 @@
|
||||
"""
|
||||
title: AWS Bedrock DeepSeek Pipeline
|
||||
author: kikumoto
|
||||
date: 2025-03-17
|
||||
version: 1.0
|
||||
license: MIT
|
||||
description: A pipeline for generating text using the AWS Bedrock API.
|
||||
requirements: boto3
|
||||
environment_variables: AWS_ACCESS_KEY, AWS_SECRET_KEY, AWS_REGION_NAME
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
|
||||
from typing import List, Union, Generator, Iterator, Dict, Optional, Any
|
||||
|
||||
import boto3
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
import os
|
||||
|
||||
from utils.pipelines.main import pop_system_message
|
||||
|
||||
class Pipeline:
    """Manifold pipeline exposing AWS Bedrock DeepSeek models via the Converse API."""

    class Valves(BaseModel):
        # Optional static credentials and region. Empty values are turned into
        # None in update_pipelines() before being handed to boto3.
        AWS_ACCESS_KEY: Optional[str] = None
        AWS_SECRET_KEY: Optional[str] = None
        AWS_REGION_NAME: Optional[str] = None

    def __init__(self):
        self.type = "manifold"
        self.name = "Bedrock DeepSeek: "

        self.valves = self.Valves(
            **{
                "AWS_ACCESS_KEY": os.getenv("AWS_ACCESS_KEY", ""),
                "AWS_SECRET_KEY": os.getenv("AWS_SECRET_KEY", ""),
                "AWS_REGION_NAME": os.getenv(
                    "AWS_REGION_NAME", os.getenv(
                        "AWS_REGION", os.getenv("AWS_DEFAULT_REGION", "")
                    )
                ),
            }
        )

        self.update_pipelines()

    async def on_startup(self):
        # This function is called when the server is started.
        print(f"on_startup:{__name__}")
        self.update_pipelines()

    async def on_shutdown(self):
        # This function is called when the server is stopped.
        print(f"on_shutdown:{__name__}")

    async def on_valves_updated(self):
        # This function is called when the valves are updated.
        print(f"on_valves_updated:{__name__}")
        self.update_pipelines()

    def update_pipelines(self) -> None:
        """(Re)create the Bedrock clients and refresh ``self.pipelines``.

        On any failure ``self.pipelines`` is set to a single "error" entry
        instead of raising.
        """
        try:
            # The valves default to "" when the environment variables are
            # unset. Pass None instead of "" so boto3 resolves credentials and
            # region itself rather than receiving empty explicit values.
            client_kwargs = {
                "aws_access_key_id": self.valves.AWS_ACCESS_KEY or None,
                "aws_secret_access_key": self.valves.AWS_SECRET_KEY or None,
                "region_name": self.valves.AWS_REGION_NAME or None,
            }
            self.bedrock = boto3.client(service_name="bedrock", **client_kwargs)
            self.bedrock_runtime = boto3.client(service_name="bedrock-runtime", **client_kwargs)
            self.pipelines = self.get_models()
        except Exception as e:
            print(f"Error: {e}")
            self.pipelines = [
                {
                    "id": "error",
                    "name": "Could not fetch models from Bedrock, please set up AWS Key/Secret or Instance/Task Role.",
                },
            ]

    # NOTE: update_pipelines() assigns an instance attribute named
    # ``pipelines``, which shadows this method on every instance.
    def pipelines(self) -> List[dict]:
        return self.get_models()

    def get_models(self):
        """Return ``{"id", "name"}`` entries for the usable DeepSeek models.

        Models supporting ON_DEMAND inference are listed by model id; models
        that only support INFERENCE_PROFILE are listed by the id of a matching
        inference profile, when one exists. Returns a single "error" entry if
        the Bedrock calls fail.
        """
        try:
            res = []
            response = self.bedrock.list_foundation_models(byProvider='DeepSeek')
            for model in response['modelSummaries']:
                inference_types = model.get('inferenceTypesSupported', [])
                if "ON_DEMAND" in inference_types:
                    res.append({'id': model['modelId'], 'name': model['modelName']})
                elif "INFERENCE_PROFILE" in inference_types:
                    inferenceProfileId = self.getInferenceProfileId(model['modelArn'])
                    if inferenceProfileId:
                        res.append({'id': inferenceProfileId, 'name': model['modelName']})

            return res
        except Exception as e:
            print(f"Error: {e}")
            return [
                {
                    "id": "error",
                    "name": "Could not fetch models from Bedrock, please check permissions.",
                },
            ]

    def getInferenceProfileId(self, modelArn: str) -> Optional[str]:
        """Return the id of the first inference profile containing *modelArn*.

        Follows ``nextToken`` so profiles beyond the first page are also
        searched. Returns None when no profile references the model.
        """
        request_kwargs = {}
        while True:
            response = self.bedrock.list_inference_profiles(**request_kwargs)
            for profile in response.get('inferenceProfileSummaries', []):
                for model in profile.get('models', []):
                    if model.get('modelArn') == modelArn:
                        return profile['inferenceProfileId']

            next_token = response.get('nextToken')
            if not next_token:
                return None
            request_kwargs = {'nextToken': next_token}

    def pipe(
        self, user_message: str, model_id: str, messages: List[dict], body: dict
    ) -> Union[str, Generator, Iterator]:
        """Build a Converse payload from *messages* and run it.

        Returns a generator of text chunks when ``body["stream"]`` is truthy,
        otherwise the completion text. Errors raised while building or sending
        the non-streaming request are returned as an ``"Error: ..."`` string.
        Note that *body* is mutated: 'user', 'chat_id' and 'title' are removed.
        """
        # This is where you can add your custom pipelines like RAG.
        print(f"pipe:{__name__}")

        try:
            # Remove unnecessary keys
            for key in ['user', 'chat_id', 'title']:
                body.pop(key, None)

            system_message, messages = pop_system_message(messages)

            logging.info(f"pop_system_message: {json.dumps(messages)}")

            processed_messages = []
            for message in messages:
                processed_content = []
                if isinstance(message.get("content"), list):
                    for item in message["content"]:
                        # DeepSeek currently doesn't support multi-modal inputs
                        if item.get("type") == "text":
                            processed_content.append({"text": item["text"]})
                else:
                    processed_content = [{"text": message.get("content", "")}]

                processed_messages.append({"role": message["role"], "content": processed_content})

            payload = {"modelId": model_id,
                       "system": [{'text': system_message["content"] if system_message else 'you are an intelligent ai assistant'}],
                       "messages": processed_messages,
                       "inferenceConfig": {
                           "temperature": body.get("temperature", 0.5),
                           "topP": body.get("top_p", 0.9),
                           "maxTokens": body.get("max_tokens", 8192),
                           "stopSequences": body.get("stop", []),
                       },
                       }

            if body.get("stream", False):
                return self.stream_response(model_id, payload)
            else:
                return self.get_completion(model_id, payload)

        except Exception as e:
            return f"Error: {e}"

    def stream_response(self, model_id: str, payload: dict) -> Generator:
        """Yield text from a ``converse_stream`` call, wrapping reasoning in <think> tags.

        Because this is a generator, nothing here runs until the caller starts
        iterating, so the try/except in pipe() cannot see failures from the
        request or the stream. They are caught here and yielded as an
        ``"Error: ..."`` chunk, matching the non-streaming path.
        """
        try:
            streaming_response = self.bedrock_runtime.converse_stream(**payload)

            in_reasoning_context = False
            for chunk in streaming_response["stream"]:
                if in_reasoning_context and "contentBlockStop" in chunk:
                    # The reasoning block ended; close the tag opened below.
                    in_reasoning_context = False
                    yield "\n </think> \n\n"
                elif "contentBlockDelta" in chunk and "delta" in chunk["contentBlockDelta"]:
                    delta = chunk["contentBlockDelta"]["delta"]
                    if "reasoningContent" in delta:
                        if not in_reasoning_context:
                            yield "<think>"

                        in_reasoning_context = True
                        if "text" in delta["reasoningContent"]:
                            yield delta["reasoningContent"]["text"]
                    elif "text" in delta:
                        yield delta["text"]
        except Exception as e:
            print(f"Error: {e}")
            yield f"Error: {e}"

    def get_completion(self, model_id: str, payload: dict) -> str:
        """Run a non-streaming ``converse`` call and return its text.

        Concatenates every content block that carries a ``text`` key instead
        of assuming the first block is text, so a non-text block (for example
        reasoning content) ahead of the answer no longer raises KeyError.
        """
        response = self.bedrock_runtime.converse(**payload)
        content_blocks = response['output']['message']['content']
        return "".join(block["text"] for block in content_blocks if "text" in block)
|
||||
@@ -0,0 +1,89 @@
|
||||
"""
|
||||
title: Azure - Dall-E Manifold Pipeline
|
||||
author: weisser-dev
|
||||
date: 2025-03-26
|
||||
version: 1.0
|
||||
license: MIT
|
||||
description: A pipeline for generating images using the Azure OpenAI API, with support for multiple DALL-E models.
|
||||
requirements: requests
|
||||
environment_variables: AZURE_OPENAI_API_KEY, AZURE_OPENAI_ENDPOINT, AZURE_OPENAI_API_VERSION, AZURE_OPENAI_MODELS, AZURE_OPENAI_MODEL_NAMES, IMAGE_SIZE, NUM_IMAGES
|
||||
"""
|
||||
from typing import List, Union, Generator, Iterator
|
||||
from pydantic import BaseModel
|
||||
import requests
|
||||
import os
|
||||
|
||||
class Pipeline:
    """Manifold pipeline that generates images through Azure OpenAI DALL·E deployments."""

    class Valves(BaseModel):
        AZURE_OPENAI_API_KEY: str
        AZURE_OPENAI_ENDPOINT: str
        AZURE_OPENAI_API_VERSION: str
        # Semicolon-separated deployment ids and their display names; the two
        # lists are paired positionally in set_pipelines().
        AZURE_OPENAI_MODELS: str
        AZURE_OPENAI_MODEL_NAMES: str
        IMAGE_SIZE: str = "1024x1024"
        NUM_IMAGES: int = 1

    def __init__(self):
        self.type = "manifold"
        self.name = "Azure DALL·E: "
        self.valves = self.Valves(
            **{
                "AZURE_OPENAI_API_KEY": os.getenv("AZURE_OPENAI_API_KEY", "your-azure-openai-api-key-here"),
                "AZURE_OPENAI_ENDPOINT": os.getenv("AZURE_OPENAI_ENDPOINT", "your-azure-openai-endpoint-here"),
                "AZURE_OPENAI_API_VERSION": os.getenv("AZURE_OPENAI_API_VERSION", "2024-02-01"),
                # Ensure each model id matches the deployment name in your
                # endpoint URL; the deployment is sometimes named e.g. "Dalle3".
                "AZURE_OPENAI_MODELS": os.getenv("AZURE_OPENAI_MODELS", "dall-e-2;dall-e-3"),
                "AZURE_OPENAI_MODEL_NAMES": os.getenv("AZURE_OPENAI_MODEL_NAMES", "DALL-E 2;DALL-E 3"),
                # The file header lists these as environment variables, so
                # honour them; the defaults match the Valves field defaults.
                "IMAGE_SIZE": os.getenv("IMAGE_SIZE", "1024x1024"),
                "NUM_IMAGES": os.getenv("NUM_IMAGES", "1"),
            }
        )
        self.set_pipelines()

    def set_pipelines(self):
        """Rebuild ``self.pipelines`` from the semicolon-separated valve values."""
        models = self.valves.AZURE_OPENAI_MODELS.split(";")
        model_names = self.valves.AZURE_OPENAI_MODEL_NAMES.split(";")
        # zip() pairs ids with names positionally and stops at the shorter list.
        self.pipelines = [
            {"id": model, "name": name} for model, name in zip(models, model_names)
        ]
        print(f"azure_dalle_pipeline - models: {self.pipelines}")

    async def on_startup(self) -> None:
        print(f"on_startup:{__name__}")

    async def on_shutdown(self):
        print(f"on_shutdown:{__name__}")

    async def on_valves_updated(self):
        print(f"on_valves_updated:{__name__}")
        self.set_pipelines()

    def pipe(
        self, user_message: str, model_id: str, messages: List[dict], body: dict
    ) -> Union[str, Generator, Iterator]:
        """Request image generation for *user_message* and yield the result.

        Yields one string containing a Markdown image link per returned image
        URL, or a single ``"Error: ..."`` string if the request fails.
        """
        print(f"pipe:{__name__}")

        headers = {
            "api-key": self.valves.AZURE_OPENAI_API_KEY,
            "Content-Type": "application/json",
        }

        # Tolerate an endpoint configured with a trailing slash.
        endpoint = self.valves.AZURE_OPENAI_ENDPOINT.rstrip("/")
        url = f"{endpoint}/openai/deployments/{model_id}/images/generations?api-version={self.valves.AZURE_OPENAI_API_VERSION}"

        payload = {
            "model": model_id,
            "prompt": user_message,
            "size": self.valves.IMAGE_SIZE,
            "n": self.valves.NUM_IMAGES,
        }

        # Bound before the try so the except branch can always refer to it,
        # even when requests.post() itself raises.
        response = None
        try:
            response = requests.post(url, json=payload, headers=headers)
            response.raise_for_status()
            data = response.json()

            message = ""
            for image in data.get("data", []):
                if "url" in image:
                    # Emit the URL as a Markdown image; previously only a
                    # newline was appended and the URL was never returned.
                    message += f"![image]({image['url']})\n"

            yield message
        except Exception as e:
            # Compare against None explicitly: a requests.Response is falsy
            # for 4xx/5xx statuses, which is exactly when its body is wanted.
            detail = response.text if response is not None else "No response"
            yield f"Error: {e} ({detail})"
|
||||
@@ -5,7 +5,7 @@ date: 2024-06-06
|
||||
version: 1.3
|
||||
license: MIT
|
||||
description: A pipeline for generating text using Google's GenAI models in Open-WebUI.
|
||||
requirements: google-generativeai
|
||||
requirements: google-genai
|
||||
environment_variables: GOOGLE_API_KEY
|
||||
"""
|
||||
|
||||
@@ -14,8 +14,11 @@ import os
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
import google.generativeai as genai
|
||||
from google.generativeai.types import GenerationConfig
|
||||
from google import genai
|
||||
from google.genai import types
|
||||
from PIL import Image
|
||||
from io import BytesIO
|
||||
import base64
|
||||
|
||||
|
||||
class Pipeline:
|
||||
@@ -24,8 +27,9 @@ class Pipeline:
|
||||
class Valves(BaseModel):
|
||||
"""Options to change from the WebUI"""
|
||||
|
||||
GOOGLE_API_KEY: str = ""
|
||||
USE_PERMISSIVE_SAFETY: bool = Field(default=False)
|
||||
GOOGLE_API_KEY: str = Field(default="",description="Google Generative AI API key")
|
||||
USE_PERMISSIVE_SAFETY: bool = Field(default=False,description="Use permissive safety settings")
|
||||
GENERATE_IMAGE: bool = Field(default=False,description="Allow image generation")
|
||||
|
||||
def __init__(self):
|
||||
self.type = "manifold"
|
||||
@@ -34,19 +38,20 @@ class Pipeline:
|
||||
|
||||
self.valves = self.Valves(**{
|
||||
"GOOGLE_API_KEY": os.getenv("GOOGLE_API_KEY", ""),
|
||||
"USE_PERMISSIVE_SAFETY": False
|
||||
"USE_PERMISSIVE_SAFETY": False,
|
||||
"GENERATE_IMAGE": False
|
||||
})
|
||||
self.pipelines = []
|
||||
|
||||
genai.configure(api_key=self.valves.GOOGLE_API_KEY)
|
||||
self.update_pipelines()
|
||||
if self.valves.GOOGLE_API_KEY:
|
||||
self.update_pipelines()
|
||||
|
||||
async def on_startup(self) -> None:
|
||||
"""This function is called when the server is started."""
|
||||
|
||||
print(f"on_startup:{__name__}")
|
||||
genai.configure(api_key=self.valves.GOOGLE_API_KEY)
|
||||
self.update_pipelines()
|
||||
if self.valves.GOOGLE_API_KEY:
|
||||
self.update_pipelines()
|
||||
|
||||
async def on_shutdown(self) -> None:
|
||||
"""This function is called when the server is stopped."""
|
||||
@@ -57,22 +62,23 @@ class Pipeline:
|
||||
"""This function is called when the valves are updated."""
|
||||
|
||||
print(f"on_valves_updated:{__name__}")
|
||||
genai.configure(api_key=self.valves.GOOGLE_API_KEY)
|
||||
self.update_pipelines()
|
||||
if self.valves.GOOGLE_API_KEY:
|
||||
self.update_pipelines()
|
||||
|
||||
def update_pipelines(self) -> None:
|
||||
"""Update the available models from Google GenAI"""
|
||||
|
||||
if self.valves.GOOGLE_API_KEY:
|
||||
client = genai.Client(api_key=self.valves.GOOGLE_API_KEY)
|
||||
try:
|
||||
models = genai.list_models()
|
||||
models = client.models.list()
|
||||
self.pipelines = [
|
||||
{
|
||||
"id": model.name[7:], # the "models/" part messeses up the URL
|
||||
"name": model.display_name,
|
||||
}
|
||||
for model in models
|
||||
if "generateContent" in model.supported_generation_methods
|
||||
if "generateContent" in model.supported_actions
|
||||
if model.name[:7] == "models/"
|
||||
]
|
||||
except Exception:
|
||||
@@ -92,13 +98,13 @@ class Pipeline:
|
||||
return "Error: GOOGLE_API_KEY is not set"
|
||||
|
||||
try:
|
||||
genai.configure(api_key=self.valves.GOOGLE_API_KEY)
|
||||
client = genai.Client(api_key=self.valves.GOOGLE_API_KEY)
|
||||
|
||||
if model_id.startswith("google_genai."):
|
||||
model_id = model_id[12:]
|
||||
model_id = model_id.lstrip(".")
|
||||
|
||||
if not model_id.startswith("gemini-"):
|
||||
if not (model_id.startswith("gemini-") or model_id.startswith("learnlm-") or model_id.startswith("gemma-")):
|
||||
return f"Error: Invalid model name format: {model_id}"
|
||||
|
||||
print(f"Pipe function called for model: {model_id}")
|
||||
@@ -127,50 +133,78 @@ class Pipeline:
|
||||
"role": "user" if message["role"] == "user" else "model",
|
||||
"parts": [{"text": message["content"]}]
|
||||
})
|
||||
|
||||
if "gemini-1.5" in model_id:
|
||||
model = genai.GenerativeModel(model_name=model_id, system_instruction=system_message)
|
||||
else:
|
||||
if system_message:
|
||||
contents.insert(0, {"role": "user", "parts": [{"text": f"System: {system_message}"}]})
|
||||
|
||||
model = genai.GenerativeModel(model_name=model_id)
|
||||
print(f"{contents}")
|
||||
|
||||
generation_config = GenerationConfig(
|
||||
temperature=body.get("temperature", 0.7),
|
||||
top_p=body.get("top_p", 0.9),
|
||||
top_k=body.get("top_k", 40),
|
||||
max_output_tokens=body.get("max_tokens", 8192),
|
||||
stop_sequences=body.get("stop", []),
|
||||
)
|
||||
generation_config = {
|
||||
"temperature": body.get("temperature", 0.7),
|
||||
"top_p": body.get("top_p", 0.9),
|
||||
"top_k": body.get("top_k", 40),
|
||||
"max_output_tokens": body.get("max_tokens", 8192),
|
||||
"stop_sequences": body.get("stop", []),
|
||||
"response_modalities": ['Text']
|
||||
}
|
||||
|
||||
if self.valves.GENERATE_IMAGE and model_id.startswith("gemini-2.0-flash-exp"):
|
||||
generation_config["response_modalities"].append("Image")
|
||||
|
||||
if self.valves.USE_PERMISSIVE_SAFETY:
|
||||
safety_settings = {
|
||||
genai.types.HarmCategory.HARM_CATEGORY_HARASSMENT: genai.types.HarmBlockThreshold.BLOCK_NONE,
|
||||
genai.types.HarmCategory.HARM_CATEGORY_HATE_SPEECH: genai.types.HarmBlockThreshold.BLOCK_NONE,
|
||||
genai.types.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: genai.types.HarmBlockThreshold.BLOCK_NONE,
|
||||
genai.types.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: genai.types.HarmBlockThreshold.BLOCK_NONE,
|
||||
}
|
||||
safety_settings = [
|
||||
types.SafetySetting(category='HARM_CATEGORY_HARASSMENT', threshold='OFF'),
|
||||
types.SafetySetting(category='HARM_CATEGORY_HATE_SPEECH', threshold='OFF'),
|
||||
types.SafetySetting(category='HARM_CATEGORY_SEXUALLY_EXPLICIT', threshold='OFF'),
|
||||
types.SafetySetting(category='HARM_CATEGORY_DANGEROUS_CONTENT', threshold='OFF'),
|
||||
types.SafetySetting(category='HARM_CATEGORY_CIVIC_INTEGRITY', threshold='OFF')
|
||||
]
|
||||
generation_config = types.GenerateContentConfig(**generation_config, safety_settings=safety_settings)
|
||||
else:
|
||||
safety_settings = body.get("safety_settings")
|
||||
generation_config = types.GenerateContentConfig(**generation_config)
|
||||
|
||||
response = model.generate_content(
|
||||
contents,
|
||||
generation_config=generation_config,
|
||||
safety_settings=safety_settings,
|
||||
stream=body.get("stream", False),
|
||||
)
|
||||
if system_message:
|
||||
contents.insert(0, {"role": "user", "parts": [{"text": f"System: {system_message}"}]})
|
||||
|
||||
if body.get("stream", False):
|
||||
response = client.models.generate_content_stream(
|
||||
model = model_id,
|
||||
contents = contents,
|
||||
config = generation_config,
|
||||
)
|
||||
return self.stream_response(response)
|
||||
else:
|
||||
return response.text
|
||||
response = client.models.generate_content(
|
||||
model = model_id,
|
||||
contents = contents,
|
||||
config = generation_config,
|
||||
)
|
||||
for part in response.candidates[0].content.parts:
|
||||
if part.text is not None:
|
||||
return part.text
|
||||
elif part.inline_data is not None:
|
||||
try:
|
||||
image_data = base64.b64decode(part.inline_data.data)
|
||||
image = Image.open(BytesIO((image_data)))
|
||||
content_type = part.inline_data.mime_type
|
||||
return "Image not supported yet."
|
||||
except Exception as e:
|
||||
print(f"Error processing image: {e}")
|
||||
return "Error processing image."
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error generating content: {e}")
|
||||
return f"An error occurred: {str(e)}"
|
||||
return f"{e}"
|
||||
|
||||
def stream_response(self, response):
    """Yield displayable strings for each part of a streamed response.

    Iterates every chunk, every candidate in the chunk and every part of the
    candidate's content. Text parts are yielded as-is; inline-data parts are
    decoded and opened as an image, then reported with a placeholder message
    (or an error message if decoding/opening fails).
    """
    for chunk in response:
        # Tolerate chunks that carry no candidates.
        for candidate in chunk.candidates or []:
            content = candidate.content
            if content is None or content.parts is None:
                continue
            for part in content.parts:
                if part.text is not None:
                    # Emit this part's own text. Yielding chunk.text here
                    # repeated the same chunk-level text once per text part.
                    yield part.text
                elif part.inline_data is not None:
                    try:
                        image_data = base64.b64decode(part.inline_data.data)
                        # Opened only to check the payload is a readable image.
                        Image.open(BytesIO(image_data))
                        yield "Image not supported yet."
                    except Exception as e:
                        print(f"Error processing image: {e}")
                        yield "Error processing image."
|
||||
|
||||
@@ -51,9 +51,11 @@ class Pipeline:
|
||||
)
|
||||
self.pipelines = [
|
||||
{"id": "gemini-1.5-flash-001", "name": "Gemini 1.5 Flash"},
|
||||
{"id": "gemini-2.0-flash", "name": "Gemini 2.0 Flash"},
|
||||
{"id": "gemini-2.0-flash-lite", "name": "Gemini 2.0 Flash-Lite"},
|
||||
{"id": "gemini-2.0-flash-thinking-exp-01-21", "name": "Gemini 2.0 Flash Thinking"},
|
||||
{"id": "gemini-1.5-pro-001", "name": "Gemini 1.5 Pro"},
|
||||
{"id": "gemini-flash-experimental", "name": "Gemini 1.5 Flash Experimental"},
|
||||
{"id": "gemini-pro-experimental", "name": "Gemini 1.5 Pro Experimental"},
|
||||
{"id": "gemini-2.0-pro-exp-02-05", "name": "Gemini 2.0 Pro"},
|
||||
]
|
||||
|
||||
async def on_startup(self) -> None:
|
||||
|
||||
@@ -30,26 +30,26 @@ class Pipeline:
|
||||
# List of models
|
||||
self.pipelines = [
|
||||
{
|
||||
"id": "llama-3.1-sonar-large-128k-online",
|
||||
"name": "Llama 3.1 Sonar Large 128k Online"
|
||||
"id": "sonar-pro",
|
||||
"name": "Sonar Pro"
|
||||
},
|
||||
{
|
||||
"id": "llama-3.1-sonar-small-128k-online",
|
||||
"name": "Llama 3.1 Sonar Small 128k Online"
|
||||
"id": "sonar",
|
||||
"name": "Sonar"
|
||||
},
|
||||
{
|
||||
"id": "llama-3.1-sonar-large-128k-chat",
|
||||
"name": "Llama 3.1 Sonar Large 128k Chat"
|
||||
"id": "sonar-deep-research",
|
||||
"name": "Sonar Deep Research"
|
||||
},
|
||||
{
|
||||
"id": "llama-3.1-sonar-small-128k-chat",
|
||||
"name": "Llama 3.1 Sonar Small 128k Chat"
|
||||
"id": "sonar-reasoning-pro",
|
||||
"name": "Sonar Reasoning Pro"
|
||||
},
|
||||
{
|
||||
"id": "llama-3.1-8b-instruct", "name": "Llama 3.1 8B Instruct"
|
||||
"id": "sonar-reasoning", "name": "Sonar Reasoning"
|
||||
},
|
||||
{
|
||||
"id": "llama-3.1-70b-instruct", "name": "Llama 3.1 70B Instruct"
|
||||
"id": "r1-1776", "name": "R1-1776"
|
||||
}
|
||||
]
|
||||
pass
|
||||
|
||||
45
examples/pipelines/rag/r2r_pipeline.py
Normal file
45
examples/pipelines/rag/r2r_pipeline.py
Normal file
@@ -0,0 +1,45 @@
|
||||
"""
|
||||
title: R2R Pipeline
|
||||
author: Nolan Tremelling
|
||||
date: 2025-03-21
|
||||
version: 1.0
|
||||
license: MIT
|
||||
description: A pipeline for retrieving relevant information from a knowledge base using R2R.
|
||||
requirements: r2r
|
||||
"""
|
||||
|
||||
from typing import List, Union, Generator, Iterator
|
||||
from schemas import OpenAIChatMessage
|
||||
import os
|
||||
import asyncio
|
||||
|
||||
|
||||
class Pipeline:
    """RAG pipeline that forwards the user's message to an R2R server."""

    def __init__(self):
        # Created in on_startup(); None before that and after on_shutdown().
        self.r2r_client = None

    async def on_startup(self):
        # Imported here rather than at module level, as in the original.
        from r2r import R2RClient

        # Connect to either SciPhi cloud or your self hosted R2R server
        self.r2r_client = R2RClient(os.getenv("R2R_SERVER_URL", "https://api.sciphi.ai"))
        self.r2r_client.set_api_key(os.getenv("R2R_API_KEY", ""))

    async def on_shutdown(self):
        # This function is called when the server is stopped.
        self.r2r_client = None

    def pipe(
        self, user_message: str, model_id: str, messages: List[dict], body: dict
    ) -> Union[str, Generator, Iterator]:
        """Run an R2R RAG query for *user_message* and return the completion.

        Returns an ``"Error: ..."`` string instead of raising AttributeError
        when the client has not been created yet or has been torn down.
        """
        print(messages)
        print(user_message)

        if self.r2r_client is None:
            return "Error: R2R client is not initialized (on_startup has not run or the pipeline was shut down)."

        response = self.r2r_client.retrieval.rag(
            query=user_message,
        )

        return response.results.completion
|
||||
Reference in New Issue
Block a user