From 3703976ac2eec1950041a87529efe85f824cbc70 Mon Sep 17 00:00:00 2001 From: ther3zz <40278044+ther3zz@users.noreply.github.com> Date: Tue, 4 Mar 2025 10:28:20 -0500 Subject: [PATCH] modify non-llm responses as events instead of inserting non-llm responses as generation, they are now inserted as events into langfuse --- examples/filters/langfuse_filter_pipeline.py | 88 ++++++++++++++------ 1 file changed, 63 insertions(+), 25 deletions(-) diff --git a/examples/filters/langfuse_filter_pipeline.py b/examples/filters/langfuse_filter_pipeline.py index 1b6b8c7..f06e945 100644 --- a/examples/filters/langfuse_filter_pipeline.py +++ b/examples/filters/langfuse_filter_pipeline.py @@ -1,7 +1,7 @@ """ title: Langfuse Filter Pipeline author: open-webui -date: 2024-09-27 +date: 2025-03-04 version: 1.6 license: MIT description: A filter pipeline that uses Langfuse. @@ -56,6 +56,9 @@ class Pipeline: self.chat_traces = {} self.suppressed_logs = set() + # Only these tasks will be treated as LLM "generations": + self.GENERATION_TASKS = {"llm_response"} + def log(self, message: str, suppress_repeats: bool = False): if self.valves.debug: if suppress_repeats: @@ -99,7 +102,7 @@ class Pipeline: def _build_tags(self, task_name: str) -> list: """ Builds a list of tags based on valve settings, ensuring we always add - 'open-webui' and skip user_response / llm_response. + 'open-webui' and skip user_response / llm_response from becoming tags themselves. """ tags_list = [] if self.valves.insert_tags: @@ -164,20 +167,35 @@ class Pipeline: metadata["type"] = task_name metadata["interface"] = "open-webui" - generation_payload = { - "name": f"{task_name}:{str(uuid.uuid4())}", - "model": body["model"], - "input": body["messages"], - "metadata": metadata, - } + # If it's a task that is considered an LLM generation + if task_name in self.GENERATION_TASKS: + generation_payload = { + "name": f"{task_name}:{str(uuid.uuid4())}", + "model": body["model"], + "input": body["messages"], + "metadata": metadata, + } + if tags_list: + generation_payload["tags"] = tags_list - if tags_list: - generation_payload["tags"] = tags_list + if self.valves.debug: + print(f"[DEBUG] Langfuse generation request: {json.dumps(generation_payload, indent=2)}") - if self.valves.debug: - print(f"[DEBUG] Langfuse generation request: {json.dumps(generation_payload, indent=2)}") + trace.generation(**generation_payload) + else: + # Otherwise, log it as an event + event_payload = { + "name": f"{task_name}:{str(uuid.uuid4())}", + "metadata": metadata, + "input": body["messages"], + } + if tags_list: + event_payload["tags"] = tags_list - trace.generation(**generation_payload) + if self.valves.debug: + print(f"[DEBUG] Langfuse event request: {json.dumps(event_payload, indent=2)}") + + trace.event(**event_payload) return body @@ -222,20 +240,40 @@ class Pipeline: metadata["type"] = task_name metadata["interface"] = "open-webui" - generation_payload = { - "name": f"{task_name}:{str(uuid.uuid4())}", - "input": body["messages"], - "metadata": metadata, - "usage": usage, - } + if task_name in self.GENERATION_TASKS: + # If it's an LLM generation + generation_payload = { + "name": f"{task_name}:{str(uuid.uuid4())}", + "input": body["messages"], + "metadata": metadata, + "usage": usage, + } + if tags_list: + generation_payload["tags"] = tags_list - if tags_list: - generation_payload["tags"] = tags_list + if self.valves.debug: + print(f"[DEBUG] Langfuse generation end request: {json.dumps(generation_payload, indent=2)}") - if self.valves.debug: - print(f"[DEBUG] Langfuse generation end request: {json.dumps(generation_payload, indent=2)}") + trace.generation().end(**generation_payload) + self.log(f"Generation ended for chat_id: {chat_id}") + else: + # Otherwise log as an event + event_payload = { + "name": f"{task_name}:{str(uuid.uuid4())}", + "metadata": metadata, + "input": body["messages"], + } + if usage: + # If you want usage on event as well + event_payload["metadata"]["usage"] = usage - trace.generation().end(**generation_payload) - self.log(f"Generation ended for chat_id: {chat_id}") + if tags_list: + event_payload["tags"] = tags_list + + if self.valves.debug: + print(f"[DEBUG] Langfuse event end request: {json.dumps(event_payload, indent=2)}") + + trace.event(**event_payload) + self.log(f"Event logged for chat_id: {chat_id}") return body