Mirror of https://github.com/open-webui/pipelines (synced 2025-06-04 03:47:04 +00:00)

Merge pull request #438 from ther3zz/patch-3
langfuse pipeline: chatID + clearer observation names + usage + trace tagging

Commit 5ccdecc4e3
@@ -1,8 +1,8 @@
 """
 title: Langfuse Filter Pipeline
 author: open-webui
-date: 2025-02-20
-version: 1.5
+date: 2025-03-28
+version: 1.7
 license: MIT
 description: A filter pipeline that uses Langfuse.
 requirements: langfuse
@@ -20,6 +20,7 @@ from langfuse.api.resources.commons.errors.unauthorized_error import Unauthorize


 def get_last_assistant_message_obj(messages: List[dict]) -> dict:
+    """Retrieve the last assistant message from the message list."""
     for message in reversed(messages):
         if message["role"] == "assistant":
             return message
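Note: the touched helper is small enough to check by eye. A standalone sketch of its behavior (the sample messages and the empty-dict fallback are illustrative; the hunk cuts off after the loop):

from typing import List

def get_last_assistant_message_obj(messages: List[dict]) -> dict:
    """Retrieve the last assistant message from the message list."""
    for message in reversed(messages):
        if message["role"] == "assistant":
            return message
    return {}  # assumed fallback; not visible in the hunk

messages = [
    {"role": "user", "content": "Hi"},
    {"role": "assistant", "content": "Hello!"},
    {"role": "user", "content": "Thanks"},
]
assert get_last_assistant_message_obj(messages)["content"] == "Hello!"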
@@ -33,6 +34,10 @@ class Pipeline:
         secret_key: str
         public_key: str
         host: str
+        # New valve that controls whether task names are added as tags:
+        insert_tags: bool = True
+        # New valve that controls whether to use model name instead of model ID for generation
+        use_model_name_instead_of_id_for_generation: bool = False
         debug: bool = False

     def __init__(self):
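Note: the two new valves are plain boolean fields with defaults. A minimal sketch, assuming the Valves model is a Pydantic BaseModel as the typed defaults suggest; only the field names and defaults come from this hunk:

from pydantic import BaseModel

class Valves(BaseModel):
    insert_tags: bool = True
    use_model_name_instead_of_id_for_generation: bool = False
    debug: bool = False

# Defaults apply unless overridden, e.g. by the os.getenv() wiring
# added to __init__ in the next hunk:
v = Valves(use_model_name_instead_of_id_for_generation=True)
assert v.insert_tags and v.use_model_name_instead_of_id_for_generation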
@@ -45,18 +50,21 @@ class Pipeline:
                 "secret_key": os.getenv("LANGFUSE_SECRET_KEY", "your-secret-key-here"),
                 "public_key": os.getenv("LANGFUSE_PUBLIC_KEY", "your-public-key-here"),
                 "host": os.getenv("LANGFUSE_HOST", "https://cloud.langfuse.com"),
+                "use_model_name_instead_of_id_for_generation": os.getenv("USE_MODEL_NAME", "false").lower() == "true",
                 "debug": os.getenv("DEBUG_MODE", "false").lower() == "true",
             }
         )

         self.langfuse = None
-        # Keep track of the trace and the last-created generation for each chat_id
         self.chat_traces = {}
-        self.chat_generations = {}
         self.suppressed_logs = set()
+        # Dictionary to store model names for each chat
+        self.model_names = {}

+        # Only these tasks will be treated as LLM "generations":
+        self.GENERATION_TASKS = {"llm_response"}

     def log(self, message: str, suppress_repeats: bool = False):
         """Logs messages to the terminal if debugging is enabled."""
         if self.valves.debug:
             if suppress_repeats:
                 if message in self.suppressed_logs:
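Note: the USE_MODEL_NAME wiring follows the same env-flag pattern as DEBUG_MODE. A standalone sketch of that pattern; the environment values set here are illustrative:

import os

# Only the literal string "true" (any casing) enables the flag;
# unset or anything else stays False.
os.environ["USE_MODEL_NAME"] = "TRUE"
assert os.getenv("USE_MODEL_NAME", "false").lower() == "true"

os.environ.pop("USE_MODEL_NAME", None)
assert not (os.getenv("USE_MODEL_NAME", "false").lower() == "true")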
@@ -96,47 +104,44 @@ class Pipeline:
                 f"Langfuse error: {e} Please re-enter your Langfuse credentials in the pipeline settings."
            )

+    def _build_tags(self, task_name: str) -> list:
+        """
+        Builds a list of tags based on valve settings, ensuring we always add
+        'open-webui' and skip user_response / llm_response from becoming tags themselves.
+        """
+        tags_list = []
+        if self.valves.insert_tags:
+            # Always add 'open-webui'
+            tags_list.append("open-webui")
+            # Add the task_name if it's not one of the excluded defaults
+            if task_name not in ["user_response", "llm_response"]:
+                tags_list.append(task_name)
+        return tags_list
+
     async def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
-        """
-        Inlet handles the incoming request (usually a user message).
-        - If no trace exists yet for this chat_id, we create a new trace.
-        - If a trace does exist, we simply create a new generation for the new user message.
-        """
         if self.valves.debug:
             print(f"[DEBUG] Received request: {json.dumps(body, indent=2)}")

         self.log(f"Inlet function called with body: {body} and user: {user}")

         metadata = body.get("metadata", {})
+        chat_id = metadata.get("chat_id", str(uuid.uuid4()))
+        metadata["chat_id"] = chat_id
+        body["metadata"] = metadata

-        # ---------------------------------------------------------
-        # Prepend the system prompt from metadata to the system message:
+        # Extract and store both model name and ID if available
         model_info = metadata.get("model", {})
-        params_info = model_info.get("params", {})
-        system_prompt = params_info.get("system", "")
+        model_id = body.get("model")

-        if system_prompt:
-            for msg in body["messages"]:
-                if msg.get("role") == "system":
-                    # Only prepend if it hasn't already been prepended:
-                    if not msg["content"].startswith("System Prompt:"):
-                        msg["content"] = f"System Prompt:\n{system_prompt}\n\n{msg['content']}"
-                    break
-        # ---------------------------------------------------------
-
-        # Fix SYSTEM MESSAGE prefix issue: Only apply for "task_generation"
-        if "chat_id" not in metadata:
-            if "task_generation" in metadata.get("type", "").lower():
-                chat_id = f"SYSTEM MESSAGE {uuid.uuid4()}"
-                self.log(f"Task Generation detected, assigned SYSTEM MESSAGE ID: {chat_id}")
-            else:
-                chat_id = str(uuid.uuid4())  # Regular chat messages
-                self.log(f"Assigned normal chat_id: {chat_id}")
-
-            metadata["chat_id"] = chat_id
-            body["metadata"] = metadata
+        # Store model information for this chat
+        if chat_id not in self.model_names:
+            self.model_names[chat_id] = {"id": model_id}
         else:
-            chat_id = metadata["chat_id"]
+            self.model_names[chat_id]["id"] = model_id
+
+        if isinstance(model_info, dict) and "name" in model_info:
+            self.model_names[chat_id]["name"] = model_info["name"]
+            self.log(f"Stored model info - name: '{model_info['name']}', id: '{model_id}' for chat_id: {chat_id}")

         required_keys = ["model", "messages"]
         missing_keys = [key for key in required_keys if key not in body]
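Note: the new _build_tags helper and the chat_id bookkeeping above are the core of this change. A standalone sketch of the tag logic, with self and the valve folded into a parameter; task names other than the two defaults are illustrative:

def build_tags(task_name: str, insert_tags: bool = True) -> list:
    # Mirrors Pipeline._build_tags: always tag 'open-webui', and add the
    # task name unless it is one of the two default task labels.
    tags_list = []
    if insert_tags:
        tags_list.append("open-webui")
        if task_name not in ["user_response", "llm_response"]:
            tags_list.append(task_name)
    return tags_list

assert build_tags("llm_response") == ["open-webui"]
assert build_tags("title_generation") == ["open-webui", "title_generation"]
assert build_tags("anything", insert_tags=False) == []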
@@ -146,100 +151,108 @@ class Pipeline:
             raise ValueError(error_message)

         user_email = user.get("email") if user else None
+        # Defaulting to 'user_response' if no task is provided
+        task_name = metadata.get("task", "user_response")
+
+        # Build tags
+        tags_list = self._build_tags(task_name)

-        # Check if we already have a trace for this chat
         if chat_id not in self.chat_traces:
-            # Create a new trace and generation
-            self.log(f"Creating new chat trace for chat_id: {chat_id}")
+            self.log(f"Creating new trace for chat_id: {chat_id}")

             trace_payload = {
-                "name": f"filter:{__name__}",
+                "name": f"chat:{chat_id}",
                 "input": body,
                 "user_id": user_email,
-                "metadata": {"chat_id": chat_id},
+                "metadata": metadata,
                 "session_id": chat_id,
             }

+            if tags_list:
+                trace_payload["tags"] = tags_list
+
             if self.valves.debug:
                 print(f"[DEBUG] Langfuse trace request: {json.dumps(trace_payload, indent=2)}")

             trace = self.langfuse.trace(**trace_payload)
+            self.chat_traces[chat_id] = trace
+        else:
+            trace = self.chat_traces[chat_id]
+            self.log(f"Reusing existing trace for chat_id: {chat_id}")
+            if tags_list:
+                trace.update(tags=tags_list)
+
+        # Update metadata with type
+        metadata["type"] = task_name
+        metadata["interface"] = "open-webui"
+
+        # If it's a task that is considered an LLM generation
+        if task_name in self.GENERATION_TASKS:
+            # Determine which model value to use based on the use_model_name valve
+            model_id = self.model_names.get(chat_id, {}).get("id", body["model"])
+            model_name = self.model_names.get(chat_id, {}).get("name", "unknown")
+
+            # Pick primary model identifier based on valve setting
+            model_value = model_name if self.valves.use_model_name_instead_of_id_for_generation else model_id
+
+            # Add both values to metadata regardless of valve setting
+            metadata["model_id"] = model_id
+            metadata["model_name"] = model_name

             generation_payload = {
-                "name": chat_id,
-                "model": body["model"],
+                "name": f"{task_name}:{str(uuid.uuid4())}",
+                "model": model_value,
                 "input": body["messages"],
-                "metadata": {"interface": "open-webui"},
+                "metadata": metadata,
             }
+            if tags_list:
+                generation_payload["tags"] = tags_list

             if self.valves.debug:
                 print(f"[DEBUG] Langfuse generation request: {json.dumps(generation_payload, indent=2)}")

-            generation = trace.generation(**generation_payload)
+            trace.generation(**generation_payload)

-            self.chat_traces[chat_id] = trace
-            self.chat_generations[chat_id] = generation
-            self.log(f"Trace and generation objects successfully created for chat_id: {chat_id}")

         else:
-            # Re-use existing trace but create a new generation for each new message
-            self.log(f"Re-using existing chat trace for chat_id: {chat_id}")
-            trace = self.chat_traces[chat_id]
-
-            new_generation_payload = {
-                "name": f"{chat_id}:{str(uuid.uuid4())}",
-                "model": body["model"],
+            # Otherwise, log it as an event
+            event_payload = {
+                "name": f"{task_name}:{str(uuid.uuid4())}",
+                "metadata": metadata,
                 "input": body["messages"],
-                "metadata": {"interface": "open-webui"},
             }
-            if self.valves.debug:
-                print(f"[DEBUG] Langfuse new_generation request: {json.dumps(new_generation_payload, indent=2)}")
-
-            new_generation = trace.generation(**new_generation_payload)
-            self.chat_generations[chat_id] = new_generation
+            if tags_list:
+                event_payload["tags"] = tags_list
+
+            if self.valves.debug:
+                print(f"[DEBUG] Langfuse event request: {json.dumps(event_payload, indent=2)}")
+
+            trace.event(**event_payload)

         return body

     async def outlet(self, body: dict, user: Optional[dict] = None) -> dict:
-        """
-        Outlet handles the response body (usually the assistant message).
-        It will finalize/end the generation created for the user request.
-        """
         self.log(f"Outlet function called with body: {body}")

         chat_id = body.get("chat_id")
+        metadata = body.get("metadata", {})
+        # Defaulting to 'llm_response' if no task is provided
+        task_name = metadata.get("task", "llm_response")

-        # If no trace or generation exist, attempt to register again
-        if chat_id not in self.chat_traces or chat_id not in self.chat_generations:
-            self.log(f"[WARNING] No matching chat trace found for chat_id: {chat_id}, attempting to re-register.")
+        # Build tags
+        tags_list = self._build_tags(task_name)
+
+        if chat_id not in self.chat_traces:
+            self.log(f"[WARNING] No matching trace found for chat_id: {chat_id}, attempting to re-register.")
+            # Re-run inlet to register if somehow missing
             return await self.inlet(body, user)

         trace = self.chat_traces[chat_id]
-        generation = self.chat_generations[chat_id]

-        # Get the last assistant message from the conversation
         assistant_message = get_last_assistant_message(body["messages"])
         assistant_message_obj = get_last_assistant_message_obj(body["messages"])

-        # ---------------------------------------------------------
-        # If the outlet contains a sources array, append it after the "System Prompt:"
-        # section in the system message:
-        if assistant_message_obj and "sources" in assistant_message_obj and assistant_message_obj["sources"]:
-            for msg in body["messages"]:
-                if msg.get("role") == "system":
-                    if msg["content"].startswith("System Prompt:"):
-                        # Format the sources nicely
-                        sources_str = "\n\n".join(
-                            json.dumps(src, indent=2) for src in assistant_message_obj["sources"]
-                        )
-                        msg["content"] += f"\n\nSources:\n{sources_str}"
-                    break
-        # ---------------------------------------------------------
-
-        # Extract usage if available
         usage = None
         if assistant_message_obj:
-            info = assistant_message_obj.get("info", {})
+            info = assistant_message_obj.get("usage", {})
             if isinstance(info, dict):
                 input_tokens = info.get("prompt_eval_count") or info.get("prompt_tokens")
                 output_tokens = info.get("eval_count") or info.get("completion_tokens")
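Note: the usage lookup now reads the assistant message's "usage" field instead of "info", with Ollama-style counters taking precedence over OpenAI-style ones. A standalone sketch; the shape of the final usage dict is cut off between hunks, so the return value here is an assumption:

def extract_usage(info: dict):
    # Ollama-style counters (prompt_eval_count/eval_count) win over
    # OpenAI-style ones (prompt_tokens/completion_tokens).
    input_tokens = info.get("prompt_eval_count") or info.get("prompt_tokens")
    output_tokens = info.get("eval_count") or info.get("completion_tokens")
    if input_tokens and output_tokens:
        return {"input": input_tokens, "output": output_tokens}  # assumed shape
    return None

assert extract_usage({"prompt_eval_count": 12, "eval_count": 5}) == {"input": 12, "output": 5}
assert extract_usage({"prompt_tokens": 12, "completion_tokens": 5}) == {"input": 12, "output": 5}
assert extract_usage({}) is None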
@@ -251,20 +264,58 @@ class Pipeline:
                 }
                 self.log(f"Usage data extracted: {usage}")

-        # Optionally update the trace with the final assistant output
+        # Update the trace output with the last assistant message
         trace.update(output=assistant_message)

-        # End the generation with the final assistant message and updated conversation
-        generation_payload = {
-            "input": body["messages"],  # include the entire conversation
-            "metadata": {"interface": "open-webui"},
-            "usage": usage,
-        }
+        metadata["type"] = task_name
+        metadata["interface"] = "open-webui"

-        if self.valves.debug:
-            print(f"[DEBUG] Langfuse generation end request: {json.dumps(generation_payload, indent=2)}")
+        if task_name in self.GENERATION_TASKS:
+            # Determine which model value to use based on the use_model_name valve
+            model_id = self.model_names.get(chat_id, {}).get("id", body.get("model"))
+            model_name = self.model_names.get(chat_id, {}).get("name", "unknown")
+
+            # Pick primary model identifier based on valve setting
+            model_value = model_name if self.valves.use_model_name_instead_of_id_for_generation else model_id
+
+            # Add both values to metadata regardless of valve setting
+            metadata["model_id"] = model_id
+            metadata["model_name"] = model_name
+
+            # If it's an LLM generation
+            generation_payload = {
+                "name": f"{task_name}:{str(uuid.uuid4())}",
+                "model": model_value,  # <-- Use model name or ID based on valve setting
+                "input": body["messages"],
+                "metadata": metadata,
+                "usage": usage,
+            }
+            if tags_list:
+                generation_payload["tags"] = tags_list

-        generation.end(**generation_payload)
-        self.log(f"Generation ended for chat_id: {chat_id}")
+            if self.valves.debug:
+                print(f"[DEBUG] Langfuse generation end request: {json.dumps(generation_payload, indent=2)}")
+
+            trace.generation().end(**generation_payload)
+            self.log(f"Generation ended for chat_id: {chat_id}")
+        else:
+            # Otherwise log as an event
+            event_payload = {
+                "name": f"{task_name}:{str(uuid.uuid4())}",
+                "metadata": metadata,
+                "input": body["messages"],
+            }
+            if usage:
+                # If you want usage on event as well
+                event_payload["metadata"]["usage"] = usage
+
+            if tags_list:
+                event_payload["tags"] = tags_list
+
+            if self.valves.debug:
+                print(f"[DEBUG] Langfuse event end request: {json.dumps(event_payload, indent=2)}")
+
+            trace.event(**event_payload)
+            self.log(f"Event logged for chat_id: {chat_id}")

         return body
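Note: the net effect of the outlet rewrite is a dispatch on task type. A runnable sketch with the Langfuse client stubbed out; only the branch condition and the two call shapes (trace.generation().end(...) versus trace.event(...)) come from the diff:

GENERATION_TASKS = {"llm_response"}

class StubTrace:
    # Stand-in for a langfuse trace object, just to make the flow runnable.
    def generation(self, **kwargs):
        return self
    def end(self, **payload):
        print("generation ended:", payload["name"])
    def event(self, **payload):
        print("event logged:", payload["name"])

def finish_observation(trace, task_name: str, payload: dict, usage=None):
    # llm_response ends a Langfuse generation (model and usage attached);
    # any other task (e.g. title_generation) is recorded as a plain event
    # on the same per-chat trace.
    if task_name in GENERATION_TASKS:
        payload["usage"] = usage
        trace.generation().end(**payload)
    else:
        if usage:
            payload.setdefault("metadata", {})["usage"] = usage
        trace.event(**payload)

finish_observation(StubTrace(), "llm_response", {"name": "llm_response:1"}, {"input": 3, "output": 2})
finish_observation(StubTrace(), "title_generation", {"name": "title_generation:2"})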