mirror of
https://github.com/open-webui/pipelines
synced 2025-05-12 16:40:45 +00:00
modify non-llm responses as events
instead of inserting non-llm responses as generation, they are now inserted as events into langfuse
This commit is contained in:
parent
689d986f81
commit
3703976ac2
@ -1,7 +1,7 @@
|
|||||||
"""
|
"""
|
||||||
title: Langfuse Filter Pipeline
|
title: Langfuse Filter Pipeline
|
||||||
author: open-webui
|
author: open-webui
|
||||||
date: 2024-09-27
|
date: 2025-03-04
|
||||||
version: 1.6
|
version: 1.6
|
||||||
license: MIT
|
license: MIT
|
||||||
description: A filter pipeline that uses Langfuse.
|
description: A filter pipeline that uses Langfuse.
|
||||||
@ -56,6 +56,9 @@ class Pipeline:
|
|||||||
self.chat_traces = {}
|
self.chat_traces = {}
|
||||||
self.suppressed_logs = set()
|
self.suppressed_logs = set()
|
||||||
|
|
||||||
|
# Only these tasks will be treated as LLM "generations":
|
||||||
|
self.GENERATION_TASKS = {"llm_response"}
|
||||||
|
|
||||||
def log(self, message: str, suppress_repeats: bool = False):
|
def log(self, message: str, suppress_repeats: bool = False):
|
||||||
if self.valves.debug:
|
if self.valves.debug:
|
||||||
if suppress_repeats:
|
if suppress_repeats:
|
||||||
@ -99,7 +102,7 @@ class Pipeline:
|
|||||||
def _build_tags(self, task_name: str) -> list:
|
def _build_tags(self, task_name: str) -> list:
|
||||||
"""
|
"""
|
||||||
Builds a list of tags based on valve settings, ensuring we always add
|
Builds a list of tags based on valve settings, ensuring we always add
|
||||||
'open-webui' and skip user_response / llm_response.
|
'open-webui' and skip user_response / llm_response from becoming tags themselves.
|
||||||
"""
|
"""
|
||||||
tags_list = []
|
tags_list = []
|
||||||
if self.valves.insert_tags:
|
if self.valves.insert_tags:
|
||||||
@ -164,20 +167,35 @@ class Pipeline:
|
|||||||
metadata["type"] = task_name
|
metadata["type"] = task_name
|
||||||
metadata["interface"] = "open-webui"
|
metadata["interface"] = "open-webui"
|
||||||
|
|
||||||
generation_payload = {
|
# If it's a task that is considered an LLM generation
|
||||||
"name": f"{task_name}:{str(uuid.uuid4())}",
|
if task_name in self.GENERATION_TASKS:
|
||||||
"model": body["model"],
|
generation_payload = {
|
||||||
"input": body["messages"],
|
"name": f"{task_name}:{str(uuid.uuid4())}",
|
||||||
"metadata": metadata,
|
"model": body["model"],
|
||||||
}
|
"input": body["messages"],
|
||||||
|
"metadata": metadata,
|
||||||
|
}
|
||||||
|
if tags_list:
|
||||||
|
generation_payload["tags"] = tags_list
|
||||||
|
|
||||||
if tags_list:
|
if self.valves.debug:
|
||||||
generation_payload["tags"] = tags_list
|
print(f"[DEBUG] Langfuse generation request: {json.dumps(generation_payload, indent=2)}")
|
||||||
|
|
||||||
if self.valves.debug:
|
trace.generation(**generation_payload)
|
||||||
print(f"[DEBUG] Langfuse generation request: {json.dumps(generation_payload, indent=2)}")
|
else:
|
||||||
|
# Otherwise, log it as an event
|
||||||
|
event_payload = {
|
||||||
|
"name": f"{task_name}:{str(uuid.uuid4())}",
|
||||||
|
"metadata": metadata,
|
||||||
|
"input": body["messages"],
|
||||||
|
}
|
||||||
|
if tags_list:
|
||||||
|
event_payload["tags"] = tags_list
|
||||||
|
|
||||||
trace.generation(**generation_payload)
|
if self.valves.debug:
|
||||||
|
print(f"[DEBUG] Langfuse event request: {json.dumps(event_payload, indent=2)}")
|
||||||
|
|
||||||
|
trace.event(**event_payload)
|
||||||
|
|
||||||
return body
|
return body
|
||||||
|
|
||||||
@ -222,20 +240,40 @@ class Pipeline:
|
|||||||
metadata["type"] = task_name
|
metadata["type"] = task_name
|
||||||
metadata["interface"] = "open-webui"
|
metadata["interface"] = "open-webui"
|
||||||
|
|
||||||
generation_payload = {
|
if task_name in self.GENERATION_TASKS:
|
||||||
"name": f"{task_name}:{str(uuid.uuid4())}",
|
# If it's an LLM generation
|
||||||
"input": body["messages"],
|
generation_payload = {
|
||||||
"metadata": metadata,
|
"name": f"{task_name}:{str(uuid.uuid4())}",
|
||||||
"usage": usage,
|
"input": body["messages"],
|
||||||
}
|
"metadata": metadata,
|
||||||
|
"usage": usage,
|
||||||
|
}
|
||||||
|
if tags_list:
|
||||||
|
generation_payload["tags"] = tags_list
|
||||||
|
|
||||||
if tags_list:
|
if self.valves.debug:
|
||||||
generation_payload["tags"] = tags_list
|
print(f"[DEBUG] Langfuse generation end request: {json.dumps(generation_payload, indent=2)}")
|
||||||
|
|
||||||
if self.valves.debug:
|
trace.generation().end(**generation_payload)
|
||||||
print(f"[DEBUG] Langfuse generation end request: {json.dumps(generation_payload, indent=2)}")
|
self.log(f"Generation ended for chat_id: {chat_id}")
|
||||||
|
else:
|
||||||
|
# Otherwise log as an event
|
||||||
|
event_payload = {
|
||||||
|
"name": f"{task_name}:{str(uuid.uuid4())}",
|
||||||
|
"metadata": metadata,
|
||||||
|
"input": body["messages"],
|
||||||
|
}
|
||||||
|
if usage:
|
||||||
|
# If you want usage on event as well
|
||||||
|
event_payload["metadata"]["usage"] = usage
|
||||||
|
|
||||||
trace.generation().end(**generation_payload)
|
if tags_list:
|
||||||
self.log(f"Generation ended for chat_id: {chat_id}")
|
event_payload["tags"] = tags_list
|
||||||
|
|
||||||
|
if self.valves.debug:
|
||||||
|
print(f"[DEBUG] Langfuse event end request: {json.dumps(event_payload, indent=2)}")
|
||||||
|
|
||||||
|
trace.event(**event_payload)
|
||||||
|
self.log(f"Event logged for chat_id: {chat_id}")
|
||||||
|
|
||||||
return body
|
return body
|
||||||
|
Loading…
Reference in New Issue
Block a user