From ee5255144f5a8f8efa5041b8e2084f27056f0c17 Mon Sep 17 00:00:00 2001 From: resqnet Date: Wed, 19 Feb 2025 14:57:43 +0900 Subject: [PATCH 1/2] add langfuse pipline outlet --- .../langfuse_filter_pipeline_outlet.py | 153 ++++++++++++++++++ 1 file changed, 153 insertions(+) create mode 100644 examples/filters/langfuse_filter_pipeline_outlet.py diff --git a/examples/filters/langfuse_filter_pipeline_outlet.py b/examples/filters/langfuse_filter_pipeline_outlet.py new file mode 100644 index 0000000..608cfda --- /dev/null +++ b/examples/filters/langfuse_filter_pipeline_outlet.py @@ -0,0 +1,153 @@ +""" +title: Langfuse Filter Pipeline Outlet +author: open-webui +date: 2025-02-19 +version: 1.4 +license: MIT +description: A filter pipeline that uses Langfuse. +requirements: langfuse +""" + +from typing import List, Optional +import os +import uuid + +from utils.pipelines.main import get_last_assistant_message +from pydantic import BaseModel +from langfuse import Langfuse +from langfuse.api.resources.commons.errors.unauthorized_error import UnauthorizedError + + +def get_last_assistant_message_obj(messages: List[dict]) -> dict: + for message in reversed(messages): + if message["role"] == "assistant": + return message + return {} + + +class Pipeline: + class Valves(BaseModel): + pipelines: List[str] = [] + priority: int = 0 + secret_key: str + public_key: str + host: str + + def __init__(self): + self.type = "filter" + self.name = "Langfuse Filter Outlet" + self.valves = self.Valves( + **{ + "pipelines": ["*"], + "secret_key": os.getenv("LANGFUSE_SECRET_KEY", "your-secret-key-here"), + "public_key": os.getenv("LANGFUSE_PUBLIC_KEY", "your-public-key-here"), + "host": os.getenv("LANGFUSE_HOST", "https://cloud.langfuse.com"), + } + ) + self.langfuse = None + self.chat_traces = {} + self.chat_generations = {} + + async def on_startup(self): + print(f"on_startup:{__name__}") + self.set_langfuse() + + async def on_shutdown(self): + print(f"on_shutdown:{__name__}") + self.langfuse.flush() + + async def on_valves_updated(self): + self.set_langfuse() + + def set_langfuse(self): + try: + self.langfuse = Langfuse( + secret_key=self.valves.secret_key, + public_key=self.valves.public_key, + host=self.valves.host, + debug=False, + ) + self.langfuse.auth_check() + except UnauthorizedError: + print( + "Langfuse credentials incorrect. Please re-enter your Langfuse credentials in the pipeline settings." + ) + except Exception as e: + print( + f"Langfuse error: {e} Please re-enter your Langfuse credentials in the pipeline settings." + ) + + async def outlet(self, body: dict, user: Optional[dict] = None) -> dict: + print(f"outlet:{__name__}") + print(f"Received body: {body}") + + if "chat_id" not in body: + print("chat_id not in body") + return body + + if ( + body["chat_id"] not in self.chat_generations + or body["chat_id"] not in self.chat_traces + ): + user_id = user.get("id") if user else None + user_name = user.get("name") if user else None + user_email = user.get("email") if user else None + + trace = self.langfuse.trace( + name=f"filter:{__name__}", + input=body, + user_id=user_email, + metadata={ + "user_name": user_name, + "user_id": user_id, + "chat_id": body["chat_id"], + }, + session_id=body["chat_id"], + ) + + generation = trace.generation( + name=body["chat_id"], + model=body["model"], + input=body["messages"], + metadata={"interface": "open-webui"}, + ) + + self.chat_traces[body["chat_id"]] = trace + self.chat_generations[body["chat_id"]] = generation + + trace = self.chat_traces[body["chat_id"]] + generation = self.chat_generations[body["chat_id"]] + assistant_message = get_last_assistant_message(body["messages"]) + + # Extract usage information for models that support it + usage = None + assistant_message_obj = get_last_assistant_message_obj(body["messages"]) + if assistant_message_obj: + info = assistant_message_obj.get("info", {}) + if isinstance(info, dict): + input_tokens = info.get("prompt_eval_count") or info.get( + "prompt_tokens" + ) + output_tokens = info.get("eval_count") or info.get("completion_tokens") + if input_tokens is not None and output_tokens is not None: + usage = { + "input": input_tokens, + "output": output_tokens, + "unit": "TOKENS", + } + + # Update generation + trace.update( + output=assistant_message, + ) + generation.end( + output=assistant_message, + metadata={"interface": "open-webui"}, + usage=usage, + ) + + # Clean up the chat_generations dictionary + del self.chat_traces[body["chat_id"]] + del self.chat_generations[body["chat_id"]] + + return body From eff7d5bc3aec9e6d402c266155d6e38901404a49 Mon Sep 17 00:00:00 2001 From: resqnet Date: Thu, 20 Feb 2025 13:01:18 +0900 Subject: [PATCH 2/2] Refactor: Remove unnecessary cache and exclude last assistant message from input to maintain context integrity --- .../langfuse_filter_pipeline_outlet.py | 62 ++++++++----------- 1 file changed, 27 insertions(+), 35 deletions(-) diff --git a/examples/filters/langfuse_filter_pipeline_outlet.py b/examples/filters/langfuse_filter_pipeline_outlet.py index 608cfda..b4da0c9 100644 --- a/examples/filters/langfuse_filter_pipeline_outlet.py +++ b/examples/filters/langfuse_filter_pipeline_outlet.py @@ -25,6 +25,12 @@ def get_last_assistant_message_obj(messages: List[dict]) -> dict: return {} +def remove_last_assistant_message(messages: List[dict]) -> List[dict]: + if messages and messages[-1]["role"] == "assistant": + return messages[:-1] + return messages + + class Pipeline: class Valves(BaseModel): pipelines: List[str] = [] @@ -45,8 +51,6 @@ class Pipeline: } ) self.langfuse = None - self.chat_traces = {} - self.chat_generations = {} async def on_startup(self): print(f"on_startup:{__name__}") @@ -85,38 +89,30 @@ class Pipeline: print("chat_id not in body") return body - if ( - body["chat_id"] not in self.chat_generations - or body["chat_id"] not in self.chat_traces - ): - user_id = user.get("id") if user else None - user_name = user.get("name") if user else None - user_email = user.get("email") if user else None + user_id = user.get("id") if user else None + user_name = user.get("name") if user else None + user_email = user.get("email") if user else None - trace = self.langfuse.trace( - name=f"filter:{__name__}", - input=body, - user_id=user_email, - metadata={ - "user_name": user_name, - "user_id": user_id, - "chat_id": body["chat_id"], - }, - session_id=body["chat_id"], - ) + input_messages = remove_last_assistant_message(body["messages"]) + trace = self.langfuse.trace( + name=f"filter:{__name__}", + input=input_messages, + user_id=user_email, + metadata={ + "user_name": user_name, + "user_id": user_id, + "chat_id": body["chat_id"], + }, + session_id=body["chat_id"], + ) - generation = trace.generation( - name=body["chat_id"], - model=body["model"], - input=body["messages"], - metadata={"interface": "open-webui"}, - ) + generation = trace.generation( + name=body["chat_id"], + model=body["model"], + input=input_messages, + metadata={"interface": "open-webui"}, + ) - self.chat_traces[body["chat_id"]] = trace - self.chat_generations[body["chat_id"]] = generation - - trace = self.chat_traces[body["chat_id"]] - generation = self.chat_generations[body["chat_id"]] assistant_message = get_last_assistant_message(body["messages"]) # Extract usage information for models that support it @@ -146,8 +142,4 @@ class Pipeline: usage=usage, ) - # Clean up the chat_generations dictionary - del self.chat_traces[body["chat_id"]] - del self.chat_generations[body["chat_id"]] - return body