add tag valve and tag insert

adds a valve to enable/disable inserting tags into the langfuse trace.
This commit is contained in:
ther3zz 2025-03-03 13:26:03 -05:00 committed by GitHub
parent 78315eb930
commit 689d986f81
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -35,6 +35,8 @@ class Pipeline:
public_key: str
host: str
debug: bool = False
# New valve that controls whether task names are added as tags:
insert_tags: bool = True
def __init__(self):
self.type = "filter"
@ -94,6 +96,20 @@ class Pipeline:
f"Langfuse error: {e} Please re-enter your Langfuse credentials in the pipeline settings."
)
def _build_tags(self, task_name: str) -> list:
"""
Builds a list of tags based on valve settings, ensuring we always add
'open-webui' and skip user_response / llm_response.
"""
tags_list = []
if self.valves.insert_tags:
# Always add 'open-webui'
tags_list.append("open-webui")
# Add the task_name if it's not one of the excluded defaults
if task_name not in ["user_response", "llm_response"]:
tags_list.append(task_name)
return tags_list
async def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
if self.valves.debug:
print(f"[DEBUG] Received request: {json.dumps(body, indent=2)}")
@ -113,7 +129,11 @@ class Pipeline:
raise ValueError(error_message)
user_email = user.get("email") if user else None
task_name = metadata.get("task", "user_response")
# Defaulting to 'user_response' if no task is provided
task_name = metadata.get("task", "user_response")
# Build tags
tags_list = self._build_tags(task_name)
if chat_id not in self.chat_traces:
self.log(f"Creating new trace for chat_id: {chat_id}")
@ -126,6 +146,9 @@ class Pipeline:
"session_id": chat_id,
}
if tags_list:
trace_payload["tags"] = tags_list
if self.valves.debug:
print(f"[DEBUG] Langfuse trace request: {json.dumps(trace_payload, indent=2)}")
@ -134,7 +157,10 @@ class Pipeline:
else:
trace = self.chat_traces[chat_id]
self.log(f"Reusing existing trace for chat_id: {chat_id}")
if tags_list:
trace.update(tags=tags_list)
# Update metadata with type
metadata["type"] = task_name
metadata["interface"] = "open-webui"
@ -145,6 +171,9 @@ class Pipeline:
"metadata": metadata,
}
if tags_list:
generation_payload["tags"] = tags_list
if self.valves.debug:
print(f"[DEBUG] Langfuse generation request: {json.dumps(generation_payload, indent=2)}")
@ -157,18 +186,23 @@ class Pipeline:
chat_id = body.get("chat_id")
metadata = body.get("metadata", {})
task_name = metadata.get("task", "llm_response")
# Defaulting to 'llm_response' if no task is provided
task_name = metadata.get("task", "llm_response")
# Build tags
tags_list = self._build_tags(task_name)
if chat_id not in self.chat_traces:
self.log(f"[WARNING] No matching trace found for chat_id: {chat_id}, attempting to re-register.")
# Re-run inlet to register if somehow missing
return await self.inlet(body, user)
trace = self.chat_traces[chat_id]
assistant_message = get_last_assistant_message(body["messages"])
assistant_message_obj = get_last_assistant_message_obj(body["messages"])
usage = None
assistant_message_obj = get_last_assistant_message_obj(body["messages"])
if assistant_message_obj:
info = assistant_message_obj.get("usage", {})
if isinstance(info, dict):
@ -182,6 +216,7 @@ class Pipeline:
}
self.log(f"Usage data extracted: {usage}")
# Update the trace output with the last assistant message
trace.update(output=assistant_message)
metadata["type"] = task_name
@ -191,13 +226,16 @@ class Pipeline:
"name": f"{task_name}:{str(uuid.uuid4())}",
"input": body["messages"],
"metadata": metadata,
"usage": usage,
"usage": usage,
}
if tags_list:
generation_payload["tags"] = tags_list
if self.valves.debug:
print(f"[DEBUG] Langfuse generation end request: {json.dumps(generation_payload, indent=2)}")
trace.generation().end(**generation_payload)
trace.generation().end(**generation_payload)
self.log(f"Generation ended for chat_id: {chat_id}")
return body