commit 56a2024e4a
Assaf, 2025-06-24 13:18:01 +03:00 (committed by GitHub)


@@ -2,9 +2,9 @@
title: Opik Filter Pipeline
author: open-webui
date: 2025-03-12
version: 1.0
version: 1.3
license: MIT
description: A filter pipeline that uses Opik for LLM observability.
description: A comprehensive filter pipeline that uses Opik for LLM observability with improved error handling and universal usage tracking. Supports token counting and billing data for all major LLM providers including Anthropic (Claude), OpenAI (GPT), Google (Gemini), Meta (Llama), Mistral, Cohere, and others.
requirements: opik
"""
@@ -12,6 +12,7 @@ from typing import List, Optional
import os
import uuid
import json
import time
from pydantic import BaseModel
from opik import Opik
@@ -97,11 +98,271 @@ class Pipeline:
f"Opik error: {e} Please re-enter your Opik credentials in the pipeline settings."
)
def cleanup_stale_spans(self, chat_id: str):
"""Clean up any existing span for a chat_id to prepare for a new one"""
if chat_id in self.chat_spans:
try:
existing_span = self.chat_spans[chat_id]
# End the previous span before creating a new one
existing_span.end(output={"status": "interrupted", "reason": "new_message_received"})
self.log(f"Ended previous span for chat_id: {chat_id}")
except Exception as e:
self.log(f"Warning: Could not end existing span for {chat_id}: {e}")
finally:
# Always remove from tracking
self.chat_spans.pop(chat_id, None)
def cleanup_orphaned_traces(self, max_count: int = 100):
"""Clean up traces if we have too many active ones"""
if len(self.chat_traces) > max_count:
self.log(f"Too many active traces ({len(self.chat_traces)}), cleaning up oldest ones")
# Clean up oldest traces (simple FIFO approach)
chat_ids_to_remove = list(self.chat_traces.keys())[:len(self.chat_traces) - max_count + 10]
for chat_id in chat_ids_to_remove:
try:
if chat_id in self.chat_spans:
span = self.chat_spans[chat_id]
span.end(output={"status": "cleanup", "reason": "too_many_active_traces"})
except Exception:
# Best-effort: a failed span.end() must not abort cleanup
pass
try:
if chat_id in self.chat_traces:
trace = self.chat_traces[chat_id]
trace.end(output={"status": "cleanup", "reason": "too_many_active_traces"})
except Exception:
# Best-effort: a failed trace.end() must not abort cleanup
pass
self.chat_traces.pop(chat_id, None)
self.chat_spans.pop(chat_id, None)
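# Illustrative sketch (not part of this commit): how the FIFO eviction above
# behaves, assuming dict insertion order reflects trace age (true on Python 3.7+)
# and that `pipeline` and `trace` stand for a Pipeline instance and a trace object.
# With max_count=100 and 105 active traces, the slice removes
# 105 - 100 + 10 = 15 of the oldest chat_ids, leaving 90:
#
#     >>> pipeline.chat_traces = {f"chat-{i}": trace for i in range(105)}
#     >>> pipeline.cleanup_orphaned_traces(max_count=100)
#     >>> len(pipeline.chat_traces)
#     90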
def detect_provider_type(self, body: dict, metadata: dict) -> str:
"""Detect the LLM provider type based on model name and response structure"""
model_info = metadata.get("model", {})
model_id = model_info.get("id", "").lower()
model_name = body.get("model", "").lower()
# Check model names/IDs for provider detection
if any(x in model_id or x in model_name for x in ["claude", "anthropic"]):
return "anthropic"
elif any(x in model_id or x in model_name for x in ["gpt", "openai", "o1"]):
return "openai"
elif any(x in model_id or x in model_name for x in ["gemini", "palm", "bard", "google"]):
return "google"
elif any(x in model_id or x in model_name for x in ["llama", "meta"]):
return "meta"
elif any(x in model_id or x in model_name for x in ["mistral"]):
return "mistral"
elif any(x in model_id or x in model_name for x in ["cohere"]):
return "cohere"
# Check response structure for provider hints
if "usage" in body and "input_tokens" in body.get("usage", {}):
return "anthropic"
elif "usage" in body and "prompt_tokens" in body.get("usage", {}):
return "openai"
elif "usageMetadata" in body:
return "google"
return "unknown"
def extract_usage_data(self, body: dict, metadata: dict = None) -> Optional[dict]:
"""Extract token usage data with support for multiple API providers (Anthropic, OpenAI, Gemini, etc.)"""
if metadata is None:
metadata = {}
provider = self.detect_provider_type(body, metadata)
self.log(f"Detected provider: {provider}")
usage = None
# Method 1: Check for usage in response body (multiple provider formats)
if "usage" in body and isinstance(body["usage"], dict):
usage_data = body["usage"]
self.log(f"Found usage data in response body: {usage_data}")
# Anthropic API format: input_tokens, output_tokens
input_tokens = usage_data.get("input_tokens")
output_tokens = usage_data.get("output_tokens")
# OpenAI API format: prompt_tokens, completion_tokens
if input_tokens is None or output_tokens is None:
input_tokens = usage_data.get("prompt_tokens")
output_tokens = usage_data.get("completion_tokens")
# Some variations use different field names
if input_tokens is None or output_tokens is None:
input_tokens = usage_data.get("promptTokens") or usage_data.get("prompt_token_count")
output_tokens = usage_data.get("completionTokens") or usage_data.get("completion_token_count")
if input_tokens is not None and output_tokens is not None:
total_tokens = usage_data.get("total_tokens") or usage_data.get("totalTokens") or (input_tokens + output_tokens)
usage = {
"prompt_tokens": input_tokens,
"completion_tokens": output_tokens,
"total_tokens": total_tokens,
}
self.log(f"Extracted usage data from response body: {usage}")
return usage
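# Illustrative sketch (assumed payloads): both provider formats above normalize
# to the same shape, e.g.
#
#     {"usage": {"input_tokens": 10, "output_tokens": 20}}       # Anthropic
#     {"usage": {"prompt_tokens": 10, "completion_tokens": 20}}  # OpenAI
#
# each yields:
#
#     {"prompt_tokens": 10, "completion_tokens": 20, "total_tokens": 30}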
# Method 2: Check for Gemini API format (usageMetadata)
if "usageMetadata" in body and isinstance(body["usageMetadata"], dict):
gemini_usage = body["usageMetadata"]
self.log(f"Found Gemini usage metadata: {gemini_usage}")
input_tokens = (
gemini_usage.get("promptTokenCount") or
gemini_usage.get("prompt_token_count") or
gemini_usage.get("inputTokens")
)
output_tokens = (
gemini_usage.get("candidatesTokenCount") or
gemini_usage.get("candidates_token_count") or
gemini_usage.get("outputTokens") or
gemini_usage.get("completionTokens")
)
total_tokens = (
gemini_usage.get("totalTokenCount") or
gemini_usage.get("total_token_count") or
gemini_usage.get("totalTokens")
)
if input_tokens is not None and output_tokens is not None:
if total_tokens is None:
total_tokens = input_tokens + output_tokens
usage = {
"prompt_tokens": input_tokens,
"completion_tokens": output_tokens,
"total_tokens": total_tokens,
}
self.log(f"Extracted Gemini usage data: {usage}")
return usage
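# Illustrative sketch (assumed payload): a Gemini-style response such as
#
#     {"usageMetadata": {"promptTokenCount": 12,
#                        "candidatesTokenCount": 34,
#                        "totalTokenCount": 46}}
#
# is normalized by the branch above to:
#
#     {"prompt_tokens": 12, "completion_tokens": 34, "total_tokens": 46}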
# Method 3: Check assistant message for usage data (various formats)
assistant_message_obj = get_last_assistant_message_obj(body.get("messages", []))
if assistant_message_obj:
message_usage = assistant_message_obj.get("usage", {})
self.log(f"Assistant message usage: {message_usage}")
if isinstance(message_usage, dict):
# Try multiple field name variations for different providers
input_tokens = (
message_usage.get("input_tokens") or # Anthropic
message_usage.get("prompt_tokens") or # OpenAI
message_usage.get("prompt_eval_count") or # Some local models
message_usage.get("promptTokenCount") or # Gemini variants
message_usage.get("prompt_token_count") # Alternative naming
)
output_tokens = (
message_usage.get("output_tokens") or # Anthropic
message_usage.get("completion_tokens") or # OpenAI
message_usage.get("eval_count") or # Some local models
message_usage.get("candidatesTokenCount") or # Gemini variants
message_usage.get("completion_token_count") # Alternative naming
)
total_tokens = (
message_usage.get("total_tokens") or
message_usage.get("totalTokens") or
message_usage.get("totalTokenCount")
)
if input_tokens is not None and output_tokens is not None:
if total_tokens is None:
total_tokens = input_tokens + output_tokens
usage = {
"prompt_tokens": input_tokens,
"completion_tokens": output_tokens,
"total_tokens": total_tokens,
}
self.log(f"Extracted message-level usage data: {usage}")
return usage
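# Illustrative sketch (assumed payload): Ollama-style local models attach usage
# to the assistant message itself, e.g.
#
#     {"role": "assistant", "content": "...",
#      "usage": {"prompt_eval_count": 8, "eval_count": 15}}
#
# which the lookup above normalizes to:
#
#     {"prompt_tokens": 8, "completion_tokens": 15, "total_tokens": 23}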
# Method 4: Check for usage at individual message level (some APIs put it there)
if "messages" in body and isinstance(body["messages"], list):
for message in reversed(body["messages"]):
if message.get("role") == "assistant":
# Check multiple possible usage field locations
usage_sources = [
message.get("usage", {}),
message.get("usageMetadata", {}),
message.get("metadata", {}).get("usage", {}) if message.get("metadata") else {}
]
for msg_usage in usage_sources:
if isinstance(msg_usage, dict) and msg_usage:
self.log(f"Found message usage: {msg_usage}")
input_tokens = (
msg_usage.get("input_tokens") or
msg_usage.get("prompt_tokens") or
msg_usage.get("promptTokenCount") or
msg_usage.get("prompt_eval_count")
)
output_tokens = (
msg_usage.get("output_tokens") or
msg_usage.get("completion_tokens") or
msg_usage.get("candidatesTokenCount") or
msg_usage.get("eval_count")
)
total_tokens = (
msg_usage.get("total_tokens") or
msg_usage.get("totalTokens") or
msg_usage.get("totalTokenCount")
)
if input_tokens is not None and output_tokens is not None:
if total_tokens is None:
total_tokens = input_tokens + output_tokens
usage = {
"prompt_tokens": input_tokens,
"completion_tokens": output_tokens,
"total_tokens": total_tokens,
}
self.log(f"Extracted individual message usage: {usage}")
return usage
# Method 5: Check alternative response structures (some proxies/wrappers)
for alt_key in ["token_usage", "billing", "cost_info", "metrics"]:
if alt_key in body and isinstance(body[alt_key], dict):
alt_usage = body[alt_key]
self.log(f"Found alternative usage data in {alt_key}: {alt_usage}")
input_tokens = (
alt_usage.get("input_tokens") or
alt_usage.get("prompt_tokens") or
alt_usage.get("input") or
alt_usage.get("prompt")
)
output_tokens = (
alt_usage.get("output_tokens") or
alt_usage.get("completion_tokens") or
alt_usage.get("output") or
alt_usage.get("completion")
)
if input_tokens is not None and output_tokens is not None:
total_tokens = alt_usage.get("total_tokens") or alt_usage.get("total") or (input_tokens + output_tokens)
usage = {
"prompt_tokens": input_tokens,
"completion_tokens": output_tokens,
"total_tokens": total_tokens,
}
self.log(f"Extracted alternative usage data: {usage}")
return usage
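# Illustrative sketch (assumed payload): some proxies report usage only under a
# wrapper key, e.g.
#
#     {"token_usage": {"input": 5, "output": 7}}
#
# which the fallback above normalizes to:
#
#     {"prompt_tokens": 5, "completion_tokens": 7, "total_tokens": 12}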
self.log("No usage data found in any expected location")
if self.valves.debug:
# Log the full body structure to help debug
self.log(f"Full response body keys: {list(body.keys())}")
if "messages" in body and body["messages"]:
last_message = body["messages"][-1] if body["messages"] else {}
self.log(f"Last message keys: {list(last_message.keys()) if isinstance(last_message, dict) else 'Not a dict'}")
return None
async def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
"""
Inlet handles the incoming request (usually a user message).
- If no trace exists yet for this chat_id, we create a new trace.
- If a trace does exist, we simply create a new span for the new user message.
- If a trace does exist, we reuse it and create a new span for the new user message.
"""
if self.valves.debug:
print(f"[DEBUG] Received request: {json.dumps(body, indent=2)}")
@@ -117,7 +378,8 @@ class Pipeline:
return body
if "chat_id" not in metadata:
chat_id = str(uuid.uuid4()) # Regular chat messages
# Generate unique chat_id with timestamp for extra uniqueness
chat_id = f"{uuid.uuid4()}-{int(time.time() * 1000)}"
self.log(f"Assigned normal chat_id: {chat_id}")
metadata["chat_id"] = chat_id
@@ -136,65 +398,91 @@ class Pipeline:
user_email = user.get("email") if user else None
assert chat_id not in self.chat_traces, (
f"A trace should not already exist for chat_id {chat_id}"
)
# Periodic cleanup to prevent memory leaks
self.cleanup_orphaned_traces()
# Create a new trace and span
self.log(f"Creating new chat trace for chat_id: {chat_id}")
# FIXED: Check if trace already exists
trace = None
if chat_id in self.chat_traces:
# Reuse existing trace for continuing conversation
trace = self.chat_traces[chat_id]
self.log(f"Reusing existing trace for chat_id: {chat_id}")
# Clean up any existing span to prepare for new one
self.cleanup_stale_spans(chat_id)
else:
# Create a new trace for new conversation
self.log(f"Creating new chat trace for chat_id: {chat_id}")
try:
# Body copy for trace
trace_body = body.copy()
# Body copy for traces and span
trace_body = body.copy()
span_body = body.copy()
# Extract metadata from body
trace_metadata = trace_body.pop("metadata", {})
trace_metadata.update({"chat_id": chat_id, "user_id": user_email})
# Extract metadata from body
metadata = trace_body.pop("metadata", {})
metadata.update({"chat_id": chat_id, "user_id": user_email})
# We don't need the model at the trace level
trace_body.pop("model", None)
# We don't need the model at the trace level
trace_body.pop("model", None)
trace_payload = {
"name": f"{__name__}",
"input": trace_body,
"metadata": trace_metadata,
"thread_id": chat_id,
}
trace_payload = {
"name": f"{__name__}",
"input": trace_body,
"metadata": metadata,
"thread_id": chat_id,
}
if self.valves.debug:
print(f"[DEBUG] Opik trace request: {json.dumps(trace_payload, indent=2)}")
if self.valves.debug:
print(f"[DEBUG] Opik trace request: {json.dumps(trace_payload, indent=2)}")
trace = self.opik.trace(**trace_payload)
self.chat_traces[chat_id] = trace
self.log(f"New trace created for chat_id: {chat_id}")
trace = self.opik.trace(**trace_payload)
except Exception as e:
self.log(f"Error creating Opik trace for chat_id {chat_id}: {e}")
# Continue without Opik logging for this request
return body
span_metadata = metadata.copy()
span_metadata.update({"interface": "open-webui"})
# Create a new span (whether trace is new or existing)
try:
# Body copy for span
span_body = body.copy()
# Extract the model from body
span_body.pop("model", None)
# We don't need the metadata in the input for the span
span_body.pop("metadata", None)
span_metadata = metadata.copy()
span_metadata.update({"interface": "open-webui", "user_id": user_email})
# Extract the model and provider from metadata
model = span_metadata.get("model", {}).get("id", None)
provider = span_metadata.get("model", {}).get("owned_by", None)
# Extract the model from body
span_body.pop("model", None)
# We don't need the metadata in the input for the span
span_body.pop("metadata", None)
span_payload = {
"name": chat_id,
"model": model,
"provider": provider,
"input": span_body,
"metadata": span_metadata,
"type": "llm",
}
# Extract the model and provider from metadata
model = span_metadata.get("model", {}).get("id", None)
provider = span_metadata.get("model", {}).get("owned_by", None)
if self.valves.debug:
print(f"[DEBUG] Opik span request: {json.dumps(span_payload, indent=2)}")
# Generate unique span name with timestamp
span_name = f"{chat_id}-{int(time.time() * 1000)}"
span = trace.span(**span_payload)
span_payload = {
"name": span_name,
"model": model,
"provider": provider,
"input": span_body,
"metadata": span_metadata,
"type": "llm",
}
self.chat_traces[chat_id] = trace
self.chat_spans[chat_id] = span
self.log(f"Trace and span objects successfully created for chat_id: {chat_id}")
if self.valves.debug:
print(f"[DEBUG] Opik span request: {json.dumps(span_payload, indent=2)}")
span = trace.span(**span_payload)
self.chat_spans[chat_id] = span
self.log(f"New span created for chat_id: {chat_id}")
except Exception as e:
self.log(f"Error creating Opik span for chat_id {chat_id}: {e}")
# Don't fail the request, just log the error
return body
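# Illustrative sketch (assumed request dicts, not part of this commit) of the
# inlet lifecycle above, where body_turn_2 carries the same metadata chat_id:
#
#     >>> await pipeline.inlet(body_turn_1, user)  # creates trace + span
#     >>> await pipeline.inlet(body_turn_2, user)  # reuses trace, opens new span
#     >>> len(pipeline.chat_traces)
#     1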
@@ -207,68 +495,75 @@ class Pipeline:
chat_id = body.get("chat_id")
# If no trace or span exist, attempt to register again
if chat_id not in self.chat_traces or chat_id not in self.chat_spans:
# If no span exists, we can't log this response
if chat_id not in self.chat_spans:
self.log(
f"[WARNING] No matching chat trace found for chat_id: {chat_id}, chat won't be logged."
f"[WARNING] No active span found for chat_id: {chat_id}, response won't be logged."
)
return body
trace = self.chat_traces[chat_id]
span = self.chat_spans[chat_id]
# Body copy for traces and span
trace_body = body.copy()
span_body = body.copy()
try:
# Body copy for span
span_body = body.copy()
# Get the last assistant message from the conversation
assistant_message_obj = get_last_assistant_message_obj(body["messages"])
# FIXED: Extract usage data using improved method that supports multiple providers
metadata = body.get("metadata", {})
usage = self.extract_usage_data(body, metadata)
# Add provider and model information to usage data for better analytics
if usage:
provider = self.detect_provider_type(body, metadata)
model_info = metadata.get("model", {})
# Enhance usage data with provider context
usage.update({
"provider": provider,
"model_id": model_info.get("id", "unknown"),
"model_name": model_info.get("name", "unknown"),
})
self.log(f"Enhanced usage data with provider info: {usage}")
if usage:
self.log(f"Successfully extracted usage data: {usage}")
else:
self.log("No usage data found - this might indicate an API integration issue")
# Extract usage if available
usage = None
self.log(f"Assistant message obj: {assistant_message_obj}")
if assistant_message_obj:
message_usage = assistant_message_obj.get("usage", {})
if isinstance(message_usage, dict):
input_tokens = message_usage.get(
"prompt_eval_count"
) or message_usage.get("prompt_tokens")
output_tokens = message_usage.get("eval_count") or message_usage.get(
"completion_tokens"
# Chat_id is already logged as trace thread
span_body.pop("chat_id", None)
# End the span with the final assistant message and updated conversation
span_payload = {
"output": span_body, # include the entire conversation
"usage": usage,
}
if self.valves.debug:
print(
f"[DEBUG] Opik span end request: {json.dumps(span_payload, indent=2)}"
)
if input_tokens is not None and output_tokens is not None:
usage = {
"prompt_tokens": input_tokens,
"completion_tokens": output_tokens,
"total_tokens": input_tokens + output_tokens,
}
self.log(f"Usage data extracted: {usage}")
# Chat_id is already logged as trace thread
span_body.pop("chat_id", None)
span.end(**span_payload)
self.log(f"Span ended for chat_id: {chat_id}")
# End the span with the final assistant message and updated conversation
span_payload = {
"output": span_body, # include the entire conversation
"usage": usage,
}
except Exception as e:
self.log(f"Error ending Opik span for chat_id {chat_id}: {e}")
# Try to end gracefully even if there are errors
try:
span.end(output={"status": "error", "error": str(e)})
except Exception:
# The original error was already logged; nothing more we can do here
pass
if self.valves.debug:
print(
f"[DEBUG] Opik span end request: {json.dumps(span_payload, indent=2)}"
)
finally:
# Clean up the span reference (but keep the trace for potential future messages)
self.chat_spans.pop(chat_id, None)
self.log(f"Cleaned up span reference for chat_id: {chat_id}")
span.end(**span_payload)
self.log(f"span ended for chat_id: {chat_id}")
# Chat_id is already logged as trace thread
span_body.pop("chat_id", None)
# Optionally update the trace with the final assistant output
trace.end(output=trace_body)
# Force the creation of a new trace and span for the next chat even if they are part of the same thread
del self.chat_traces[chat_id]
del self.chat_spans[chat_id]
# NOTE: We deliberately DON'T clean up the trace here, as it should persist
# for the duration of the conversation. Traces will be cleaned up by:
# 1. The cleanup_orphaned_traces method when there are too many
# 2. Server restart/shutdown
# 3. Manual cleanup if needed
return body
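# Illustrative sketch (assumed flow, not part of this commit): after outlet
# returns, the span reference is gone but the trace persists for the thread:
#
#     >>> await pipeline.outlet(body, user)
#     >>> chat_id in pipeline.chat_spans   # span ended and released
#     False
#     >>> chat_id in pipeline.chat_traces  # trace kept for the next turn
#     True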