Update langfuse_filter_pipeline.py

Fixes issues with messages not being properly sent to Langfuse.
Also adds the system prompt (if any) and the cited sources (if any) to the trace.
This commit is contained in:
ther3zz 2025-02-20 15:13:15 -05:00 committed by GitHub
parent 10662e224c
commit e7efb32728
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -1,8 +1,8 @@
""" """
title: Langfuse Filter Pipeline title: Langfuse Filter Pipeline
author: open-webui author: open-webui
date: 2024-09-27 date: 2025-02-20
version: 1.4 version: 1.5
license: MIT license: MIT
description: A filter pipeline that uses Langfuse. description: A filter pipeline that uses Langfuse.
requirements: langfuse requirements: langfuse
@ -11,12 +11,14 @@ requirements: langfuse
from typing import List, Optional from typing import List, Optional
import os import os
import uuid import uuid
import json
from utils.pipelines.main import get_last_assistant_message from utils.pipelines.main import get_last_assistant_message
from pydantic import BaseModel from pydantic import BaseModel
from langfuse import Langfuse from langfuse import Langfuse
from langfuse.api.resources.commons.errors.unauthorized_error import UnauthorizedError from langfuse.api.resources.commons.errors.unauthorized_error import UnauthorizedError
def get_last_assistant_message_obj(messages: List[dict]) -> dict: def get_last_assistant_message_obj(messages: List[dict]) -> dict:
for message in reversed(messages): for message in reversed(messages):
if message["role"] == "assistant": if message["role"] == "assistant":
@ -31,31 +33,48 @@ class Pipeline:
secret_key: str secret_key: str
public_key: str public_key: str
host: str host: str
debug: bool = False
def __init__(self):
    """Configure the filter pipeline; valve defaults come from the environment."""
    self.type = "filter"
    self.name = "Langfuse Filter"
    self.valves = self.Valves(
        pipelines=["*"],
        secret_key=os.getenv("LANGFUSE_SECRET_KEY", "your-secret-key-here"),
        public_key=os.getenv("LANGFUSE_PUBLIC_KEY", "your-public-key-here"),
        host=os.getenv("LANGFUSE_HOST", "https://cloud.langfuse.com"),
        debug=os.getenv("DEBUG_MODE", "false").lower() == "true",
    )
    # Langfuse client is created lazily in set_langfuse().
    self.langfuse = None
    # Keep track of the trace and the last-created generation for each chat_id.
    self.chat_traces = {}
    self.chat_generations = {}
    # Messages already emitted once via log(..., suppress_repeats=True).
    self.suppressed_logs = set()
def log(self, message: str, suppress_repeats: bool = False):
    """Print *message* to the terminal when the debug valve is enabled.

    With ``suppress_repeats=True`` each distinct message is printed at most
    once per pipeline instance (tracked in ``self.suppressed_logs``).
    """
    if not self.valves.debug:
        return
    if suppress_repeats:
        if message in self.suppressed_logs:
            return
        self.suppressed_logs.add(message)
    print(f"[DEBUG] {message}")
async def on_startup(self):
    """Server-start hook: initialize the Langfuse client."""
    self.log(f"on_startup triggered for {__name__}")
    self.set_langfuse()
async def on_shutdown(self):
    """Server-stop hook: flush any buffered Langfuse events."""
    self.log(f"on_shutdown triggered for {__name__}")
    client = self.langfuse
    # Guard against shutdown before set_langfuse() ever succeeded.
    if client:
        client.flush()
async def on_valves_updated(self):
    """Valve-change hook: rebuild the Langfuse client with the new settings."""
    self.log("Valves updated, resetting Langfuse client.")
    self.set_langfuse()
def set_langfuse(self):
    """(Re)initialize the Langfuse client from the current valve settings.

    NOTE(review): the ``try:`` / ``self.langfuse = Langfuse(`` opening lines
    are truncated in this diff view; they are syntactically implied by the
    visible keyword arguments and except clauses — confirm against the full file.
    """
    try:
        self.langfuse = Langfuse(
            secret_key=self.valves.secret_key,
            public_key=self.valves.public_key,
            host=self.valves.host,
            debug=self.valves.debug,
        )
        # Fail fast on bad credentials rather than on the first trace call.
        self.langfuse.auth_check()
        self.log("Langfuse client initialized successfully.")
    except UnauthorizedError:
        print(
            "Langfuse credentials incorrect. Please re-enter your Langfuse credentials in the pipeline settings."
        )
    except Exception as e:
        print(
            f"Langfuse error: {e} Please re-enter your Langfuse credentials in the pipeline settings."
        )
async def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
    """Handle an incoming request (normally a user message).

    First request for a chat_id creates a Langfuse trace plus its first
    generation; later requests re-use the trace and open a fresh generation
    for the new user message.
    """
    if self.valves.debug:
        print(f"[DEBUG] Received request: {json.dumps(body, indent=2)}")

    self.log(f"Inlet function called with body: {body} and user: {user}")

    metadata = body.get("metadata", {})

    # Prepend the model's configured system prompt (if any) to the system
    # message so it is captured in the traced input.
    system_prompt = metadata.get("model", {}).get("params", {}).get("system", "")
    if system_prompt:
        for message in body["messages"]:
            if message.get("role") != "system":
                continue
            # Only prepend once.
            if not message["content"].startswith("System Prompt:"):
                message["content"] = f"System Prompt:\n{system_prompt}\n\n{message['content']}"
            break

    # The "SYSTEM MESSAGE" id prefix is reserved for task-generation requests.
    if "chat_id" in metadata:
        chat_id = metadata["chat_id"]
    else:
        if "task_generation" in metadata.get("type", "").lower():
            chat_id = f"SYSTEM MESSAGE {uuid.uuid4()}"
            self.log(f"Task Generation detected, assigned SYSTEM MESSAGE ID: {chat_id}")
        else:
            chat_id = str(uuid.uuid4())
            self.log(f"Assigned normal chat_id: {chat_id}")
        metadata["chat_id"] = chat_id
        body["metadata"] = metadata

    missing_keys = [key for key in ("model", "messages") if key not in body]
    if missing_keys:
        error_message = f"Error: Missing keys in the request body: {', '.join(missing_keys)}"
        self.log(error_message)
        raise ValueError(error_message)

    user_email = user.get("email") if user else None

    if chat_id not in self.chat_traces:
        # First message of this chat: open a new trace and its first generation.
        self.log(f"Creating new chat trace for chat_id: {chat_id}")

        trace_payload = {
            "name": f"filter:{__name__}",
            "input": body,
            "user_id": user_email,
            "metadata": {"chat_id": chat_id},
            "session_id": chat_id,
        }
        if self.valves.debug:
            print(f"[DEBUG] Langfuse trace request: {json.dumps(trace_payload, indent=2)}")
        trace = self.langfuse.trace(**trace_payload)

        generation_payload = {
            "name": chat_id,
            "model": body["model"],
            "input": body["messages"],
            "metadata": {"interface": "open-webui"},
        }
        if self.valves.debug:
            print(f"[DEBUG] Langfuse generation request: {json.dumps(generation_payload, indent=2)}")

        self.chat_traces[chat_id] = trace
        self.chat_generations[chat_id] = trace.generation(**generation_payload)
        self.log(f"Trace and generation objects successfully created for chat_id: {chat_id}")
    else:
        # Known chat: keep the existing trace, add one generation per message.
        self.log(f"Re-using existing chat trace for chat_id: {chat_id}")
        trace = self.chat_traces[chat_id]

        new_generation_payload = {
            "name": f"{chat_id}:{str(uuid.uuid4())}",
            "model": body["model"],
            "input": body["messages"],
            "metadata": {"interface": "open-webui"},
        }
        if self.valves.debug:
            print(f"[DEBUG] Langfuse new_generation request: {json.dumps(new_generation_payload, indent=2)}")
        self.chat_generations[chat_id] = trace.generation(**new_generation_payload)

    return body
async def outlet(self, body: dict, user: Optional[dict] = None) -> dict:
    """Handle the response body (normally the assistant message).

    Finalizes the generation opened by inlet() for this chat: appends any
    cited sources to the traced system message, extracts token usage when
    the model reports it, and ends the generation.
    """
    self.log(f"Outlet function called with body: {body}")

    chat_id = body.get("chat_id")

    # If no trace or generation exist (e.g. the server restarted
    # mid-conversation), attempt to register the chat again via inlet().
    if chat_id not in self.chat_traces or chat_id not in self.chat_generations:
        self.log(f"[WARNING] No matching chat trace found for chat_id: {chat_id}, attempting to re-register.")
        return await self.inlet(body, user)

    trace = self.chat_traces[chat_id]
    generation = self.chat_generations[chat_id]

    # Get the last assistant message from the conversation.
    assistant_message = get_last_assistant_message(body["messages"])
    assistant_message_obj = get_last_assistant_message_obj(body["messages"])

    # If the response carries a sources array, append it after the
    # "System Prompt:" section of the system message so it is traced too.
    if assistant_message_obj and "sources" in assistant_message_obj and assistant_message_obj["sources"]:
        for msg in body["messages"]:
            if msg.get("role") == "system":
                if msg["content"].startswith("System Prompt:"):
                    # Format the sources nicely.
                    sources_str = "\n\n".join(
                        json.dumps(src, indent=2) for src in assistant_message_obj["sources"]
                    )
                    msg["content"] += f"\n\nSources:\n{sources_str}"
                break

    # Extract usage if available. NOTE(review): the token-count lines below
    # are truncated in this diff view and reconstructed from context —
    # confirm the exact info keys against the full file.
    usage = None
    if assistant_message_obj:
        info = assistant_message_obj.get("info", {})
        if isinstance(info, dict):
            input_tokens = info.get("prompt_eval_count") or info.get("prompt_tokens")
            output_tokens = info.get("eval_count") or info.get("completion_tokens")
            if input_tokens is not None and output_tokens is not None:
                usage = {
                    "input": input_tokens,
                    "output": output_tokens,
                    "unit": "TOKENS",
                }
                self.log(f"Usage data extracted: {usage}")

    # Update the trace with the final assistant output.
    trace.update(output=assistant_message)

    # End the generation. FIX: also record the assistant message as the
    # generation's output — the payload previously sent only "input", so
    # generations were closed in Langfuse without their completion text.
    generation_payload = {
        "input": body["messages"],  # include the entire conversation
        "output": assistant_message,
        "metadata": {"interface": "open-webui"},
        "usage": usage,
    }
    if self.valves.debug:
        print(f"[DEBUG] Langfuse generation end request: {json.dumps(generation_payload, indent=2)}")
    generation.end(**generation_payload)
    self.log(f"Generation ended for chat_id: {chat_id}")

    return body