Mirror of https://github.com/open-webui/pipelines (synced 2025-05-11 16:10:45 +00:00)
Update langfuse_filter_pipeline.py
Reworked to make observations easier to understand, and fixed an issue where title generations were being overwritten by messages.
This commit is contained in:
parent dc18bdd5e2
commit 4011977ae1
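The diff below shows the rework: instead of creating one generation per chat and ending that same stored object on every response, the pipeline now names each generation after the Open WebUI task (user_response, title_generation, and so on) plus a fresh UUID, so background tasks such as title generation get their own observations rather than overwriting the chat's. A minimal sketch of that pattern, assuming the v2-style langfuse Python SDK the pipeline imports; the keys, host, model id, and chat id below are placeholders, not values from the commit:

# Sketch of the per-task generation pattern (langfuse v2 SDK).
# public_key/secret_key/host/model are placeholders, not real values.
import uuid
from langfuse import Langfuse

langfuse = Langfuse(
    public_key="pk-lf-...",  # placeholder
    secret_key="sk-lf-...",  # placeholder
    host="https://cloud.langfuse.com",
)

chat_id = "example-chat-id"  # hypothetical chat identifier

# One trace per chat, reused across requests (mirrors self.chat_traces).
trace = langfuse.trace(name=f"chat:{chat_id}", session_id=chat_id)

# Each task becomes its own uniquely named generation, so a later
# "title_generation" can no longer overwrite the "user_response".
for task_name in ("user_response", "title_generation"):
    generation = trace.generation(
        name=f"{task_name}:{uuid.uuid4()}",
        model="gpt-4o",  # placeholder model id
        input=[{"role": "user", "content": "Hi"}],
        metadata={"type": task_name, "interface": "open-webui"},
    )
    generation.end(output="...")  # finalize with the assistant output

langfuse.flush()  # make sure events are delivered before exit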
@@ -1,8 +1,8 @@
 """
 title: Langfuse Filter Pipeline
 author: open-webui
-date: 2025-02-20
-version: 1.5
+date: 2024-09-27
+version: 1.6
 license: MIT
 description: A filter pipeline that uses Langfuse.
 requirements: langfuse
@@ -20,6 +20,7 @@ from langfuse.api.resources.commons.errors.unauthorized_error import UnauthorizedError
 
+
 def get_last_assistant_message_obj(messages: List[dict]) -> dict:
     """Retrieve the last assistant message from the message list."""
     for message in reversed(messages):
         if message["role"] == "assistant":
             return message
@@ -50,13 +51,10 @@ class Pipeline:
         )
 
         self.langfuse = None
-        # Keep track of the trace and the last-created generation for each chat_id
         self.chat_traces = {}
-        self.chat_generations = {}
         self.suppressed_logs = set()
 
     def log(self, message: str, suppress_repeats: bool = False):
         """Logs messages to the terminal if debugging is enabled."""
         if self.valves.debug:
             if suppress_repeats:
                 if message in self.suppressed_logs:
@@ -97,46 +95,15 @@ class Pipeline:
         )
 
     async def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
         """
         Inlet handles the incoming request (usually a user message).
-        - If no trace exists yet for this chat_id, we create a new trace.
-        - If a trace does exist, we simply create a new generation for the new user message.
         """
-        if self.valves.debug:
-            print(f"[DEBUG] Received request: {json.dumps(body, indent=2)}")
-
         self.log(f"Inlet function called with body: {body} and user: {user}")
 
         metadata = body.get("metadata", {})
 
-        # ---------------------------------------------------------
-        # Prepend the system prompt from metadata to the system message:
-        model_info = metadata.get("model", {})
-        params_info = model_info.get("params", {})
-        system_prompt = params_info.get("system", "")
-
-        if system_prompt:
-            for msg in body["messages"]:
-                if msg.get("role") == "system":
-                    # Only prepend if it hasn't already been prepended:
-                    if not msg["content"].startswith("System Prompt:"):
-                        msg["content"] = f"System Prompt:\n{system_prompt}\n\n{msg['content']}"
-                    break
-        # ---------------------------------------------------------
-
-        # Fix SYSTEM MESSAGE prefix issue: Only apply for "task_generation"
-        if "chat_id" not in metadata:
-            if "task_generation" in metadata.get("type", "").lower():
-                chat_id = f"SYSTEM MESSAGE {uuid.uuid4()}"
-                self.log(f"Task Generation detected, assigned SYSTEM MESSAGE ID: {chat_id}")
-            else:
-                chat_id = str(uuid.uuid4())  # Regular chat messages
-                self.log(f"Assigned normal chat_id: {chat_id}")
-
-            metadata["chat_id"] = chat_id
-            body["metadata"] = metadata
-        else:
-            chat_id = metadata["chat_id"]
+        chat_id = metadata.get("chat_id", str(uuid.uuid4()))
+        metadata["chat_id"] = chat_id
+        body["metadata"] = metadata
 
         required_keys = ["model", "messages"]
         missing_keys = [key for key in required_keys if key not in body]
@@ -146,17 +113,31 @@ class Pipeline:
             raise ValueError(error_message)
 
         user_email = user.get("email") if user else None
+        task_name = metadata.get("task", "user_response")  # Default to user_response if task is missing
 
-        # Check if we already have a trace for this chat
+        # **Extract system message from metadata and prepend**
+        system_message = ""
+        if "model" in metadata and "params" in metadata["model"]:
+            system_message = metadata["model"]["params"].get("system", "")
+
+        for message in body["messages"]:
+            if message["role"] == "system":
+                message["content"] = system_message + "\n\n" + message["content"]
+                break
+        else:
+            # If no system message was found, add one
+            if system_message:
+                body["messages"].insert(0, {"role": "system", "content": system_message})
+
+        # Ensure unique tracking per task
         if chat_id not in self.chat_traces:
-            # Create a new trace and generation
-            self.log(f"Creating new chat trace for chat_id: {chat_id}")
+            self.log(f"Creating new trace for chat_id: {chat_id}")
 
             trace_payload = {
-                "name": f"filter:{__name__}",
+                "name": f"chat:{chat_id}",
                 "input": body,
                 "user_id": user_email,
-                "metadata": {"chat_id": chat_id},
+                "metadata": metadata,  # Preserve all metadata
                 "session_id": chat_id,
             }
 
@@ -164,80 +145,46 @@ class Pipeline:
                 print(f"[DEBUG] Langfuse trace request: {json.dumps(trace_payload, indent=2)}")
 
             trace = self.langfuse.trace(**trace_payload)
 
-            generation_payload = {
-                "name": chat_id,
-                "model": body["model"],
-                "input": body["messages"],
-                "metadata": {"interface": "open-webui"},
-            }
-
-            if self.valves.debug:
-                print(f"[DEBUG] Langfuse generation request: {json.dumps(generation_payload, indent=2)}")
-
-            generation = trace.generation(**generation_payload)
-
             self.chat_traces[chat_id] = trace
-            self.chat_generations[chat_id] = generation
-            self.log(f"Trace and generation objects successfully created for chat_id: {chat_id}")
-
         else:
-            # Re-use existing trace but create a new generation for each new message
-            self.log(f"Re-using existing chat trace for chat_id: {chat_id}")
             trace = self.chat_traces[chat_id]
+            self.log(f"Reusing existing trace for chat_id: {chat_id}")
 
-            new_generation_payload = {
-                "name": f"{chat_id}:{str(uuid.uuid4())}",
-                "model": body["model"],
-                "input": body["messages"],
-                "metadata": {"interface": "open-webui"},
-            }
-            if self.valves.debug:
-                print(f"[DEBUG] Langfuse new_generation request: {json.dumps(new_generation_payload, indent=2)}")
+        # Ensure all metadata fields are passed through
+        metadata["type"] = task_name
+        metadata["interface"] = "open-webui"
 
-            new_generation = trace.generation(**new_generation_payload)
-            self.chat_generations[chat_id] = new_generation
+        generation_payload = {
+            "name": f"{task_name}:{str(uuid.uuid4())}",
+            "model": body["model"],
+            "input": body["messages"],
+            "metadata": metadata,  # Preserve all metadata
+        }
+
+        if self.valves.debug:
+            print(f"[DEBUG] Langfuse generation request: {json.dumps(generation_payload, indent=2)}")
+
+        trace.generation(**generation_payload)
 
         return body
 
     async def outlet(self, body: dict, user: Optional[dict] = None) -> dict:
         """
         Outlet handles the response body (usually the assistant message).
         It will finalize/end the generation created for the user request.
         """
         self.log(f"Outlet function called with body: {body}")
 
         chat_id = body.get("chat_id")
+        metadata = body.get("metadata", {})
+        task_name = metadata.get("task", "llm_response")  # Default to llm_response if missing
 
-        # If no trace or generation exist, attempt to register again
-        if chat_id not in self.chat_traces or chat_id not in self.chat_generations:
-            self.log(f"[WARNING] No matching chat trace found for chat_id: {chat_id}, attempting to re-register.")
+        if chat_id not in self.chat_traces:
+            self.log(f"[WARNING] No matching trace found for chat_id: {chat_id}, attempting to re-register.")
             return await self.inlet(body, user)
 
         trace = self.chat_traces[chat_id]
-        generation = self.chat_generations[chat_id]
 
         # Get the last assistant message from the conversation
         assistant_message = get_last_assistant_message(body["messages"])
-        assistant_message_obj = get_last_assistant_message_obj(body["messages"])
-
-        # ---------------------------------------------------------
-        # If the outlet contains a sources array, append it after the "System Prompt:"
-        # section in the system message:
-        if assistant_message_obj and "sources" in assistant_message_obj and assistant_message_obj["sources"]:
-            for msg in body["messages"]:
-                if msg.get("role") == "system":
-                    if msg["content"].startswith("System Prompt:"):
-                        # Format the sources nicely
-                        sources_str = "\n\n".join(
-                            json.dumps(src, indent=2) for src in assistant_message_obj["sources"]
-                        )
-                        msg["content"] += f"\n\nSources:\n{sources_str}"
-                    break
-        # ---------------------------------------------------------
 
         # Extract usage if available
         usage = None
+        assistant_message_obj = get_last_assistant_message_obj(body["messages"])
         if assistant_message_obj:
             info = assistant_message_obj.get("info", {})
             if isinstance(info, dict):
@@ -251,20 +198,22 @@ class Pipeline:
                 }
                 self.log(f"Usage data extracted: {usage}")
 
-        # Optionally update the trace with the final assistant output
         trace.update(output=assistant_message)
 
-        # End the generation with the final assistant message and updated conversation
+        metadata["type"] = task_name
+        metadata["interface"] = "open-webui"
+
         generation_payload = {
-            "input": body["messages"],  # include the entire conversation
-            "metadata": {"interface": "open-webui"},
+            "name": f"{task_name}:{str(uuid.uuid4())}",
+            "input": body["messages"],
+            "metadata": metadata,  # Preserve all metadata
             "usage": usage,
         }
 
         if self.valves.debug:
             print(f"[DEBUG] Langfuse generation end request: {json.dumps(generation_payload, indent=2)}")
 
-        generation.end(**generation_payload)
+        trace.generation().end(**generation_payload)
        self.log(f"Generation ended for chat_id: {chat_id}")
 
         return body
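For context on the usage object the outlet attaches to the final generation payload above: it is assembled from the "info" dict that Open WebUI stores on the last assistant message, in Langfuse's input/output/unit usage format. A self-contained sketch of that extraction step; the prompt_tokens/completion_tokens key names on info are assumptions for illustration, since the diff cuts off before the key lookups:

# Sketch of the usage-extraction step in outlet(), under the assumption
# that Open WebUI attaches token counts to the last assistant message's
# "info" dict. The key names below are illustrative.
from typing import List, Optional


def extract_usage(messages: List[dict]) -> Optional[dict]:
    """Return a Langfuse-style usage dict from the last assistant message, if any."""
    for message in reversed(messages):
        if message.get("role") == "assistant":
            info = message.get("info", {})
            if isinstance(info, dict):
                input_tokens = info.get("prompt_tokens")
                output_tokens = info.get("completion_tokens")
                if input_tokens is not None and output_tokens is not None:
                    return {
                        "input": input_tokens,
                        "output": output_tokens,
                        "unit": "TOKENS",
                    }
            return None
    return None


# Example: a message list whose assistant turn carries token counts.
messages = [
    {"role": "user", "content": "Hi"},
    {"role": "assistant", "content": "Hello!",
     "info": {"prompt_tokens": 12, "completion_tokens": 5}},
]
print(extract_usage(messages))  # {'input': 12, 'output': 5, 'unit': 'TOKENS'}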