Update opik_filter_pipeline.py

This commit is contained in:
Assaf 2025-06-24 12:55:06 +03:00 committed by GitHub
parent adec65727e
commit 1e6bd15ad9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -2,9 +2,9 @@
title: Opik Filter Pipeline title: Opik Filter Pipeline
author: open-webui author: open-webui
date: 2025-03-12 date: 2025-03-12
version: 1.0 version: 1.1
license: MIT license: MIT
description: A filter pipeline that uses Opik for LLM observability. description: A filter pipeline that uses Opik for LLM observability with improved error handling.
requirements: opik requirements: opik
""" """
@ -12,6 +12,7 @@ from typing import List, Optional
import os import os
import uuid import uuid
import json import json
import time
from pydantic import BaseModel from pydantic import BaseModel
from opik import Opik from opik import Opik
@ -97,11 +98,50 @@ class Pipeline:
f"Opik error: {e} Please re-enter your Opik credentials in the pipeline settings." f"Opik error: {e} Please re-enter your Opik credentials in the pipeline settings."
) )
def cleanup_existing_trace(self, chat_id: str):
    """Safely end and discard any tracked Opik trace/span for *chat_id*.

    Any open span/trace is ended with an ``"interrupted"`` status so the
    backend records why it was closed early. Failures while ending are
    logged and swallowed; the bookkeeping dictionaries are purged in a
    ``finally`` block so a broken span/trace object can never leak an
    entry in ``self.chat_traces`` / ``self.chat_spans``.

    Args:
        chat_id: Key identifying the conversation whose trace/span
            should be torn down. A missing key is a no-op apart from
            the final log line.
    """
    try:
        # EAFP-ish: single .get() lookup instead of `in` + indexing.
        existing_span = self.chat_spans.get(chat_id)
        if existing_span is not None:
            try:
                existing_span.end(output={"status": "interrupted", "reason": "new_request_received"})
                self.log(f"Ended existing span for chat_id: {chat_id}")
            except Exception as e:
                # Best-effort: an un-endable span must not block cleanup.
                self.log(f"Warning: Could not end existing span: {e}")
        existing_trace = self.chat_traces.get(chat_id)
        if existing_trace is not None:
            try:
                existing_trace.end(output={"status": "interrupted", "reason": "new_request_received"})
                self.log(f"Ended existing trace for chat_id: {chat_id}")
            except Exception as e:
                self.log(f"Warning: Could not end existing trace: {e}")
        self.log(f"Cleaned up existing trace/span for chat_id: {chat_id}")
    except Exception as e:
        self.log(f"Error during cleanup for chat_id {chat_id}: {e}")
    finally:
        # Always drop the references — previously this pop logic was
        # duplicated in both the success and the error path.
        self.chat_traces.pop(chat_id, None)
        self.chat_spans.pop(chat_id, None)
def cleanup_stale_traces(self, max_count: int = 100):
    """Trim the active-trace map once it grows past *max_count* entries.

    Removal is FIFO: dict insertion order is the age order, so the
    oldest chat_ids are cleaned up first via ``cleanup_existing_trace``.
    Removes enough entries to drop 10 below the limit, avoiding a
    cleanup on every subsequent call.
    """
    active = len(self.chat_traces)
    if active <= max_count:
        return
    self.log(f"Too many active traces ({active}), cleaning up oldest ones")
    # Snapshot the keys before mutating the dict during cleanup.
    excess = active - max_count + 10
    for stale_id in list(self.chat_traces)[:excess]:
        self.cleanup_existing_trace(stale_id)
async def inlet(self, body: dict, user: Optional[dict] = None) -> dict: async def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
""" """
Inlet handles the incoming request (usually a user message). Inlet handles the incoming request (usually a user message).
- If no trace exists yet for this chat_id, we create a new trace. - If no trace exists yet for this chat_id, we create a new trace.
- If a trace does exist, we simply create a new span for the new user message. - If a trace does exist, we clean it up and create a new one.
""" """
if self.valves.debug: if self.valves.debug:
print(f"[DEBUG] Received request: {json.dumps(body, indent=2)}") print(f"[DEBUG] Received request: {json.dumps(body, indent=2)}")
@ -117,7 +157,8 @@ class Pipeline:
return body return body
if "chat_id" not in metadata: if "chat_id" not in metadata:
chat_id = str(uuid.uuid4()) # Regular chat messages # Generate unique chat_id with timestamp for extra uniqueness
chat_id = f"{uuid.uuid4()}-{int(time.time() * 1000)}"
self.log(f"Assigned normal chat_id: {chat_id}") self.log(f"Assigned normal chat_id: {chat_id}")
metadata["chat_id"] = chat_id metadata["chat_id"] = chat_id
@ -136,65 +177,78 @@ class Pipeline:
user_email = user.get("email") if user else None user_email = user.get("email") if user else None
assert chat_id not in self.chat_traces, ( # FIXED: Instead of asserting, clean up any existing trace/span
f"There shouldn't be a trace already exists for chat_id {chat_id}" if chat_id in self.chat_traces:
) self.log(f"Found existing trace for chat_id {chat_id}, cleaning up...")
self.cleanup_existing_trace(chat_id)
# Periodic cleanup to prevent memory leaks
self.cleanup_stale_traces()
# Create a new trace and span # Create a new trace and span
self.log(f"Creating new chat trace for chat_id: {chat_id}") self.log(f"Creating new chat trace for chat_id: {chat_id}")
# Body copy for traces and span try:
trace_body = body.copy() # Body copy for traces and span
span_body = body.copy() trace_body = body.copy()
span_body = body.copy()
# Extract metadata from body # Extract metadata from body
metadata = trace_body.pop("metadata", {}) metadata = trace_body.pop("metadata", {})
metadata.update({"chat_id": chat_id, "user_id": user_email}) metadata.update({"chat_id": chat_id, "user_id": user_email})
# We don't need the model at the trace level # We don't need the model at the trace level
trace_body.pop("model", None) trace_body.pop("model", None)
trace_payload = { trace_payload = {
"name": f"{__name__}", "name": f"{__name__}",
"input": trace_body, "input": trace_body,
"metadata": metadata, "metadata": metadata,
"thread_id": chat_id, "thread_id": chat_id,
} }
if self.valves.debug: if self.valves.debug:
print(f"[DEBUG] Opik trace request: {json.dumps(trace_payload, indent=2)}") print(f"[DEBUG] Opik trace request: {json.dumps(trace_payload, indent=2)}")
trace = self.opik.trace(**trace_payload) trace = self.opik.trace(**trace_payload)
span_metadata = metadata.copy() span_metadata = metadata.copy()
span_metadata.update({"interface": "open-webui"}) span_metadata.update({"interface": "open-webui"})
# Extract the model from body # Extract the model from body
span_body.pop("model", None) span_body.pop("model", None)
# We don't need the metadata in the input for the span # We don't need the metadata in the input for the span
span_body.pop("metadata", None) span_body.pop("metadata", None)
# Extract the model and provider from metadata # Extract the model and provider from metadata
model = span_metadata.get("model", {}).get("id", None) model = span_metadata.get("model", {}).get("id", None)
provider = span_metadata.get("model", {}).get("owned_by", None) provider = span_metadata.get("model", {}).get("owned_by", None)
span_payload = { span_payload = {
"name": chat_id, "name": chat_id,
"model": model, "model": model,
"provider": provider, "provider": provider,
"input": span_body, "input": span_body,
"metadata": span_metadata, "metadata": span_metadata,
"type": "llm", "type": "llm",
} }
if self.valves.debug: if self.valves.debug:
print(f"[DEBUG] Opik span request: {json.dumps(span_payload, indent=2)}") print(f"[DEBUG] Opik span request: {json.dumps(span_payload, indent=2)}")
span = trace.span(**span_payload) span = trace.span(**span_payload)
self.chat_traces[chat_id] = trace self.chat_traces[chat_id] = trace
self.chat_spans[chat_id] = span self.chat_spans[chat_id] = span
self.log(f"Trace and span objects successfully created for chat_id: {chat_id}") self.log(f"Trace and span objects successfully created for chat_id: {chat_id}")
except Exception as e:
self.log(f"Error creating Opik trace/span for chat_id {chat_id}: {e}")
# Clean up on error
self.chat_traces.pop(chat_id, None)
self.chat_spans.pop(chat_id, None)
# Don't fail the request, just log the error
self.log(f"Continuing without Opik logging for this request")
return body return body
@ -217,58 +271,74 @@ class Pipeline:
trace = self.chat_traces[chat_id] trace = self.chat_traces[chat_id]
span = self.chat_spans[chat_id] span = self.chat_spans[chat_id]
# Body copy for traces and span try:
trace_body = body.copy() # Body copy for traces and span
span_body = body.copy() trace_body = body.copy()
span_body = body.copy()
# Get the last assistant message from the conversation # Get the last assistant message from the conversation
assistant_message_obj = get_last_assistant_message_obj(body["messages"]) assistant_message_obj = get_last_assistant_message_obj(body["messages"])
# Extract usage if available # Extract usage if available
usage = None usage = None
self.log(f"Assistant message obj: {assistant_message_obj}") self.log(f"Assistant message obj: {assistant_message_obj}")
if assistant_message_obj: if assistant_message_obj:
message_usage = assistant_message_obj.get("usage", {}) message_usage = assistant_message_obj.get("usage", {})
if isinstance(message_usage, dict): if isinstance(message_usage, dict):
input_tokens = message_usage.get( input_tokens = message_usage.get(
"prompt_eval_count" "prompt_eval_count"
) or message_usage.get("prompt_tokens") ) or message_usage.get("prompt_tokens")
output_tokens = message_usage.get("eval_count") or message_usage.get( output_tokens = message_usage.get("eval_count") or message_usage.get(
"completion_tokens" "completion_tokens"
)
if input_tokens is not None and output_tokens is not None:
usage = {
"prompt_tokens": input_tokens,
"completion_tokens": output_tokens,
"total_tokens": input_tokens + output_tokens,
}
self.log(f"Usage data extracted: {usage}")
# Chat_id is already logged as trace thread
span_body.pop("chat_id", None)
# End the span with the final assistant message and updated conversation
span_payload = {
"output": span_body, # include the entire conversation
"usage": usage,
}
if self.valves.debug:
print(
f"[DEBUG] Opik span end request: {json.dumps(span_payload, indent=2)}"
) )
if input_tokens is not None and output_tokens is not None:
usage = {
"prompt_tokens": input_tokens,
"completion_tokens": output_tokens,
"total_tokens": input_tokens + output_tokens,
}
self.log(f"Usage data extracted: {usage}")
# Chat_id is already logged as trace thread span.end(**span_payload)
span_body.pop("chat_id", None) self.log(f"span ended for chat_id: {chat_id}")
# End the span with the final assistant message and updated conversation # Chat_id is already logged as trace thread
span_payload = { trace_body.pop("chat_id", None)
"output": span_body, # include the entire conversation
"usage": usage,
}
if self.valves.debug: # Optionally update the trace with the final assistant output
print( trace.end(output=trace_body)
f"[DEBUG] Opik span end request: {json.dumps(span_payload, indent=2)}" self.log(f"trace ended for chat_id: {chat_id}")
)
span.end(**span_payload) except Exception as e:
self.log(f"span ended for chat_id: {chat_id}") self.log(f"Error ending Opik trace/span for chat_id {chat_id}: {e}")
# Try to end gracefully even if there are errors
try:
span.end(output={"status": "error", "error": str(e)})
except:
pass
try:
trace.end(output={"status": "error", "error": str(e)})
except:
pass
# Chat_id is already logged as trace thread finally:
span_body.pop("chat_id", None) # Always clean up the dictionaries, even if ending the trace/span failed
self.chat_traces.pop(chat_id, None)
# Optionally update the trace with the final assistant output self.chat_spans.pop(chat_id, None)
trace.end(output=trace_body) self.log(f"Cleaned up trace/span references for chat_id: {chat_id}")
# Force the creation of a new trace and span for the next chat even if they are part of the same thread
del self.chat_traces[chat_id]
del self.chat_spans[chat_id]
return body return body