diff --git a/examples/filters/langfuse_filter_pipeline.py b/examples/filters/langfuse_filter_pipeline.py
index 8c05f46..cd2c0ab 100644
--- a/examples/filters/langfuse_filter_pipeline.py
+++ b/examples/filters/langfuse_filter_pipeline.py
@@ -1,8 +1,8 @@
 """
 title: Langfuse Filter Pipeline
 author: open-webui
-date: 2025-02-20
-version: 1.5
+date: 2025-03-28
+version: 1.7
 license: MIT
 description: A filter pipeline that uses Langfuse.
 requirements: langfuse
@@ -20,6 +20,7 @@ from langfuse.api.resources.commons.errors.unauthorized_error import Unauthorize
 
 
 def get_last_assistant_message_obj(messages: List[dict]) -> dict:
+    """Retrieve the last assistant message from the message list."""
     for message in reversed(messages):
         if message["role"] == "assistant":
             return message
@@ -33,6 +34,10 @@ class Pipeline:
         secret_key: str
         public_key: str
         host: str
+        # New valve that controls whether task names are added as tags
+        insert_tags: bool = True
+        # New valve that controls whether to use the model name instead of the model ID for generations
+        use_model_name_instead_of_id_for_generation: bool = False
         debug: bool = False
 
     def __init__(self):
@@ -45,18 +50,21 @@ class Pipeline:
                 "secret_key": os.getenv("LANGFUSE_SECRET_KEY", "your-secret-key-here"),
                 "public_key": os.getenv("LANGFUSE_PUBLIC_KEY", "your-public-key-here"),
                 "host": os.getenv("LANGFUSE_HOST", "https://cloud.langfuse.com"),
+                "use_model_name_instead_of_id_for_generation": os.getenv("USE_MODEL_NAME", "false").lower() == "true",
                 "debug": os.getenv("DEBUG_MODE", "false").lower() == "true",
             }
         )
 
         self.langfuse = None
-        # Keep track of the trace and the last-created generation for each chat_id
         self.chat_traces = {}
-        self.chat_generations = {}
         self.suppressed_logs = set()
+        # Dictionary to store the model name and ID for each chat
+        self.model_names = {}
+
+        # Only these tasks will be treated as LLM "generations":
+        self.GENERATION_TASKS = {"llm_response"}
 
     def log(self, message: str, suppress_repeats: bool = False):
-        """Logs messages to the terminal if debugging is enabled."""
         if self.valves.debug:
             if suppress_repeats:
                 if message in self.suppressed_logs:
@@ -96,47 +104,44 @@ class Pipeline:
                 f"Langfuse error: {e} Please re-enter your Langfuse credentials in the pipeline settings."
             )
 
+    def _build_tags(self, task_name: str) -> list:
+        """
+        Build the list of tags based on valve settings: always add 'open-webui',
+        and add the task name unless it is a default user_response / llm_response task.
+        """
+        tags_list = []
+        if self.valves.insert_tags:
+            # Always add 'open-webui'
+            tags_list.append("open-webui")
+            # Add the task_name if it's not one of the excluded defaults
+            if task_name not in ["user_response", "llm_response"]:
+                tags_list.append(task_name)
+        return tags_list
+
     async def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
-        """
-        Inlet handles the incoming request (usually a user message).
-        - If no trace exists yet for this chat_id, we create a new trace.
-        - If a trace does exist, we simply create a new generation for the new user message.
-        """
         if self.valves.debug:
             print(f"[DEBUG] Received request: {json.dumps(body, indent=2)}")
 
         self.log(f"Inlet function called with body: {body} and user: {user}")
 
         metadata = body.get("metadata", {})
+        chat_id = metadata.get("chat_id", str(uuid.uuid4()))
+        metadata["chat_id"] = chat_id
+        body["metadata"] = metadata
 
-        # ---------------------------------------------------------
-        # Prepend the system prompt from metadata to the system message:
+        # Extract and store both model name and ID if available
         model_info = metadata.get("model", {})
-        params_info = model_info.get("params", {})
-        system_prompt = params_info.get("system", "")
-
-        if system_prompt:
-            for msg in body["messages"]:
-                if msg.get("role") == "system":
-                    # Only prepend if it hasn't already been prepended:
-                    if not msg["content"].startswith("System Prompt:"):
-                        msg["content"] = f"System Prompt:\n{system_prompt}\n\n{msg['content']}"
-                    break
-        # ---------------------------------------------------------
-
-        # Fix SYSTEM MESSAGE prefix issue: Only apply for "task_generation"
-        if "chat_id" not in metadata:
-            if "task_generation" in metadata.get("type", "").lower():
-                chat_id = f"SYSTEM MESSAGE {uuid.uuid4()}"
-                self.log(f"Task Generation detected, assigned SYSTEM MESSAGE ID: {chat_id}")
-            else:
-                chat_id = str(uuid.uuid4())  # Regular chat messages
-                self.log(f"Assigned normal chat_id: {chat_id}")
-
-            metadata["chat_id"] = chat_id
-            body["metadata"] = metadata
+        model_id = body.get("model")
+
+        # Store model information for this chat
+        if chat_id not in self.model_names:
+            self.model_names[chat_id] = {"id": model_id}
         else:
-            chat_id = metadata["chat_id"]
+            self.model_names[chat_id]["id"] = model_id
+
+        if isinstance(model_info, dict) and "name" in model_info:
+            self.model_names[chat_id]["name"] = model_info["name"]
+            self.log(f"Stored model info - name: '{model_info['name']}', id: '{model_id}' for chat_id: {chat_id}")
 
         required_keys = ["model", "messages"]
         missing_keys = [key for key in required_keys if key not in body]
@@ -146,100 +151,108 @@ class Pipeline:
             raise ValueError(error_message)
 
         user_email = user.get("email") if user else None
+        # Defaulting to 'user_response' if no task is provided
+        task_name = metadata.get("task", "user_response")
+
+        # Build tags
+        tags_list = self._build_tags(task_name)
 
-        # Check if we already have a trace for this chat
         if chat_id not in self.chat_traces:
-            # Create a new trace and generation
-            self.log(f"Creating new chat trace for chat_id: {chat_id}")
+            self.log(f"Creating new trace for chat_id: {chat_id}")
 
             trace_payload = {
-                "name": f"filter:{__name__}",
+                "name": f"chat:{chat_id}",
                 "input": body,
                 "user_id": user_email,
-                "metadata": {"chat_id": chat_id},
+                "metadata": metadata,
                 "session_id": chat_id,
             }
+            if tags_list:
+                trace_payload["tags"] = tags_list
+
             if self.valves.debug:
                 print(f"[DEBUG] Langfuse trace request: {json.dumps(trace_payload, indent=2)}")
 
             trace = self.langfuse.trace(**trace_payload)
+            self.chat_traces[chat_id] = trace
+        else:
+            trace = self.chat_traces[chat_id]
+            self.log(f"Reusing existing trace for chat_id: {chat_id}")
+            if tags_list:
+                trace.update(tags=tags_list)
 
+        # Update metadata with type
+        metadata["type"] = task_name
+        metadata["interface"] = "open-webui"
+
+        # If it's a task that is considered an LLM generation
+        if task_name in self.GENERATION_TASKS:
+            # Determine which model value to use based on the use_model_name valve
+            model_id = self.model_names.get(chat_id, {}).get("id", body["model"])
+            model_name = self.model_names.get(chat_id, {}).get("name", "unknown")
+
+            # Pick primary model identifier based on valve setting
+            model_value = model_name if self.valves.use_model_name_instead_of_id_for_generation else model_id
+
+            # Add both values to metadata regardless of valve setting
+            metadata["model_id"] = model_id
+            metadata["model_name"] = model_name
+
             generation_payload = {
-                "name": chat_id,
-                "model": body["model"],
+                "name": f"{task_name}:{str(uuid.uuid4())}",
+                "model": model_value,
                 "input": body["messages"],
-                "metadata": {"interface": "open-webui"},
+                "metadata": metadata,
             }
+            if tags_list:
+                generation_payload["tags"] = tags_list
 
             if self.valves.debug:
                 print(f"[DEBUG] Langfuse generation request: {json.dumps(generation_payload, indent=2)}")
 
-            generation = trace.generation(**generation_payload)
-
-            self.chat_traces[chat_id] = trace
-            self.chat_generations[chat_id] = generation
-            self.log(f"Trace and generation objects successfully created for chat_id: {chat_id}")
-
+            trace.generation(**generation_payload)
         else:
-            # Re-use existing trace but create a new generation for each new message
-            self.log(f"Re-using existing chat trace for chat_id: {chat_id}")
-            trace = self.chat_traces[chat_id]
-
-            new_generation_payload = {
-                "name": f"{chat_id}:{str(uuid.uuid4())}",
-                "model": body["model"],
+            # Otherwise, log it as an event
+            event_payload = {
+                "name": f"{task_name}:{str(uuid.uuid4())}",
+                "metadata": metadata,
                 "input": body["messages"],
-                "metadata": {"interface": "open-webui"},
             }
-            if self.valves.debug:
-                print(f"[DEBUG] Langfuse new_generation request: {json.dumps(new_generation_payload, indent=2)}")
+            if tags_list:
+                event_payload["tags"] = tags_list
 
-            new_generation = trace.generation(**new_generation_payload)
-            self.chat_generations[chat_id] = new_generation
+            if self.valves.debug:
+                print(f"[DEBUG] Langfuse event request: {json.dumps(event_payload, indent=2)}")
+
+            trace.event(**event_payload)
 
         return body
 
     async def outlet(self, body: dict, user: Optional[dict] = None) -> dict:
-        """
-        Outlet handles the response body (usually the assistant message).
-        It will finalize/end the generation created for the user request.
-        """
         self.log(f"Outlet function called with body: {body}")
 
         chat_id = body.get("chat_id")
+        metadata = body.get("metadata", {})
+        # Defaulting to 'llm_response' if no task is provided
+        task_name = metadata.get("task", "llm_response")
 
-        # If no trace or generation exist, attempt to register again
-        if chat_id not in self.chat_traces or chat_id not in self.chat_generations:
-            self.log(f"[WARNING] No matching chat trace found for chat_id: {chat_id}, attempting to re-register.")
+        # Build tags
+        tags_list = self._build_tags(task_name)
+
+        if chat_id not in self.chat_traces:
+            self.log(f"[WARNING] No matching trace found for chat_id: {chat_id}, attempting to re-register.")
+            # Re-run inlet to register if somehow missing
             return await self.inlet(body, user)
 
         trace = self.chat_traces[chat_id]
-        generation = self.chat_generations[chat_id]
 
-        # Get the last assistant message from the conversation
         assistant_message = get_last_assistant_message(body["messages"])
         assistant_message_obj = get_last_assistant_message_obj(body["messages"])
 
-        # ---------------------------------------------------------
-        # If the outlet contains a sources array, append it after the "System Prompt:"
-        # section in the system message:
-        if assistant_message_obj and "sources" in assistant_message_obj and assistant_message_obj["sources"]:
-            for msg in body["messages"]:
-                if msg.get("role") == "system":
-                    if msg["content"].startswith("System Prompt:"):
-                        # Format the sources nicely
-                        sources_str = "\n\n".join(
-                            json.dumps(src, indent=2) for src in assistant_message_obj["sources"]
-                        )
-                        msg["content"] += f"\n\nSources:\n{sources_str}"
-                    break
-        # ---------------------------------------------------------
-
-        # Extract usage if available
         usage = None
         if assistant_message_obj:
-            info = assistant_message_obj.get("info", {})
+            info = assistant_message_obj.get("usage", {})
             if isinstance(info, dict):
                 input_tokens = info.get("prompt_eval_count") or info.get("prompt_tokens")
                 output_tokens = info.get("eval_count") or info.get("completion_tokens")
@@ -251,20 +264,58 @@ class Pipeline:
                 }
                 self.log(f"Usage data extracted: {usage}")
 
-        # Optionally update the trace with the final assistant output
+        # Update the trace output with the last assistant message
         trace.update(output=assistant_message)
 
-        # End the generation with the final assistant message and updated conversation
-        generation_payload = {
-            "input": body["messages"],  # include the entire conversation
-            "metadata": {"interface": "open-webui"},
-            "usage": usage,
-        }
+        metadata["type"] = task_name
+        metadata["interface"] = "open-webui"
 
-        if self.valves.debug:
-            print(f"[DEBUG] Langfuse generation end request: {json.dumps(generation_payload, indent=2)}")
+        if task_name in self.GENERATION_TASKS:
+            # Determine which model value to use based on the use_model_name valve
+            model_id = self.model_names.get(chat_id, {}).get("id", body.get("model"))
+            model_name = self.model_names.get(chat_id, {}).get("name", "unknown")
+
+            # Pick primary model identifier based on valve setting
+            model_value = model_name if self.valves.use_model_name_instead_of_id_for_generation else model_id
+
+            # Add both values to metadata regardless of valve setting
+            metadata["model_id"] = model_id
+            metadata["model_name"] = model_name
+
+            # Build the generation payload
+            generation_payload = {
+                "name": f"{task_name}:{str(uuid.uuid4())}",
+                "model": model_value,  # model name or ID, depending on the valve setting
+                "input": body["messages"],
+                "metadata": metadata,
+                "usage": usage,
+            }
+            if tags_list:
+                generation_payload["tags"] = tags_list
 
-        generation.end(**generation_payload)
-        self.log(f"Generation ended for chat_id: {chat_id}")
+            if self.valves.debug:
+                print(f"[DEBUG] Langfuse generation end request: {json.dumps(generation_payload, indent=2)}")
+
+            trace.generation().end(**generation_payload)
+            self.log(f"Generation ended for chat_id: {chat_id}")
+        else:
+            # Otherwise log as an event
+            event_payload = {
+                "name": f"{task_name}:{str(uuid.uuid4())}",
+                "metadata": metadata,
+                "input": body["messages"],
+            }
+            if usage:
+                # Attach usage to the event metadata as well, when available
+                event_payload["metadata"]["usage"] = usage
+
+            if tags_list:
+                event_payload["tags"] = tags_list
+
+            if self.valves.debug:
+                print(f"[DEBUG] Langfuse event end request: {json.dumps(event_payload, indent=2)}")
+
+            trace.event(**event_payload)
+            self.log(f"Event logged for chat_id: {chat_id}")
 
         return body
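
With this change, every task name flows through _build_tags before it is
attached to the trace, generation, or event, and the rule is small enough to
pin down in a few lines. A minimal sketch, assuming the valve defaults from
this patch (insert_tags=True) and that the pipeline's dependencies are
importable; "title_generation" is just an illustrative task name:

    # Default chat tasks are excluded from tagging, so only the base tag remains.
    pipeline = Pipeline()
    assert pipeline._build_tags("user_response") == ["open-webui"]
    assert pipeline._build_tags("llm_response") == ["open-webui"]

    # Any other task name is appended after the base tag.
    assert pipeline._build_tags("title_generation") == ["open-webui", "title_generation"]

    # Disabling the valve suppresses tagging entirely.
    pipeline.valves.insert_tags = False
    assert pipeline._build_tags("title_generation") == []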
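
The use_model_name_instead_of_id_for_generation valve only switches which
identifier is reported as the generation's "model"; both values are always
recorded in the metadata. A minimal sketch of that selection, using the same
USE_MODEL_NAME parsing as __init__ above (the model values are illustrative):

    import os

    model_id = "gpt-4o"                # what Open WebUI sends in body["model"]
    model_name = "My Tuned Assistant"  # display name from metadata["model"]["name"]

    use_model_name = os.getenv("USE_MODEL_NAME", "false").lower() == "true"
    model_value = model_name if use_model_name else model_id

    # Both identifiers are kept in metadata regardless of the valve setting.
    metadata = {"model_id": model_id, "model_name": model_name}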