Update opik_filter_pipeline.py

This commit is contained in:
Assaf 2025-06-24 13:15:56 +03:00 committed by GitHub
parent b6cc9ae137
commit 6f4d86392d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -2,9 +2,9 @@
title: Opik Filter Pipeline
author: open-webui
date: 2025-03-12
version: 1.1
version: 1.3
license: MIT
description: A filter pipeline that uses Opik for LLM observability with improved error handling.
description: A comprehensive filter pipeline that uses Opik for LLM observability with improved error handling and universal usage tracking. Supports token counting and billing data for all major LLM providers including Anthropic (Claude), OpenAI (GPT), Google (Gemini), Meta (Llama), Mistral, Cohere, and others.
requirements: opik
"""
@ -134,6 +134,230 @@ class Pipeline:
self.chat_traces.pop(chat_id, None)
self.chat_spans.pop(chat_id, None)
def detect_provider_type(self, body: dict, metadata: dict) -> str:
"""Detect the LLM provider type based on model name and response structure"""
model_info = metadata.get("model", {})
model_id = model_info.get("id", "").lower()
model_name = body.get("model", "").lower()
# Check model names/IDs for provider detection
if any(x in model_id or x in model_name for x in ["claude", "anthropic"]):
return "anthropic"
elif any(x in model_id or x in model_name for x in ["gpt", "openai", "o1"]):
return "openai"
elif any(x in model_id or x in model_name for x in ["gemini", "palm", "bard", "google"]):
return "google"
elif any(x in model_id or x in model_name for x in ["llama", "meta"]):
return "meta"
elif any(x in model_id or x in model_name for x in ["mistral"]):
return "mistral"
elif any(x in model_id or x in model_name for x in ["cohere"]):
return "cohere"
# Check response structure for provider hints
if "usage" in body and "input_tokens" in body.get("usage", {}):
return "anthropic"
elif "usage" in body and "prompt_tokens" in body.get("usage", {}):
return "openai"
elif "usageMetadata" in body:
return "google"
return "unknown"
def extract_usage_data(self, body: dict, metadata: dict = None) -> Optional[dict]:
"""Extract token usage data with support for multiple API providers (Anthropic, OpenAI, Gemini, etc.)"""
if metadata is None:
metadata = {}
provider = self.detect_provider_type(body, metadata)
self.log(f"Detected provider: {provider}")
usage = None
# Method 1: Check for usage in response body (multiple provider formats)
if "usage" in body and isinstance(body["usage"], dict):
usage_data = body["usage"]
self.log(f"Found usage data in response body: {usage_data}")
# Anthropic API format: input_tokens, output_tokens
input_tokens = usage_data.get("input_tokens")
output_tokens = usage_data.get("output_tokens")
# OpenAI API format: prompt_tokens, completion_tokens
if input_tokens is None or output_tokens is None:
input_tokens = usage_data.get("prompt_tokens")
output_tokens = usage_data.get("completion_tokens")
# Some variations use different field names
if input_tokens is None or output_tokens is None:
input_tokens = usage_data.get("promptTokens") or usage_data.get("prompt_token_count")
output_tokens = usage_data.get("completionTokens") or usage_data.get("completion_token_count")
if input_tokens is not None and output_tokens is not None:
total_tokens = usage_data.get("total_tokens") or usage_data.get("totalTokens") or (input_tokens + output_tokens)
usage = {
"prompt_tokens": input_tokens,
"completion_tokens": output_tokens,
"total_tokens": total_tokens,
}
self.log(f"Extracted usage data from response body: {usage}")
return usage
# Method 2: Check for Gemini API format (usageMetadata)
if "usageMetadata" in body and isinstance(body["usageMetadata"], dict):
gemini_usage = body["usageMetadata"]
self.log(f"Found Gemini usage metadata: {gemini_usage}")
input_tokens = (
gemini_usage.get("promptTokenCount") or
gemini_usage.get("prompt_token_count") or
gemini_usage.get("inputTokens")
)
output_tokens = (
gemini_usage.get("candidatesTokenCount") or
gemini_usage.get("candidates_token_count") or
gemini_usage.get("outputTokens") or
gemini_usage.get("completionTokens")
)
total_tokens = (
gemini_usage.get("totalTokenCount") or
gemini_usage.get("total_token_count") or
gemini_usage.get("totalTokens")
)
if input_tokens is not None and output_tokens is not None:
if total_tokens is None:
total_tokens = input_tokens + output_tokens
usage = {
"prompt_tokens": input_tokens,
"completion_tokens": output_tokens,
"total_tokens": total_tokens,
}
self.log(f"Extracted Gemini usage data: {usage}")
return usage
# Method 3: Check assistant message for usage data (various formats)
assistant_message_obj = get_last_assistant_message_obj(body.get("messages", []))
if assistant_message_obj:
message_usage = assistant_message_obj.get("usage", {})
self.log(f"Assistant message usage: {message_usage}")
if isinstance(message_usage, dict):
# Try multiple field name variations for different providers
input_tokens = (
message_usage.get("input_tokens") or # Anthropic
message_usage.get("prompt_tokens") or # OpenAI
message_usage.get("prompt_eval_count") or # Some local models
message_usage.get("promptTokenCount") or # Gemini variants
message_usage.get("prompt_token_count") # Alternative naming
)
output_tokens = (
message_usage.get("output_tokens") or # Anthropic
message_usage.get("completion_tokens") or # OpenAI
message_usage.get("eval_count") or # Some local models
message_usage.get("candidatesTokenCount") or # Gemini variants
message_usage.get("completion_token_count") # Alternative naming
)
total_tokens = (
message_usage.get("total_tokens") or
message_usage.get("totalTokens") or
message_usage.get("totalTokenCount")
)
if input_tokens is not None and output_tokens is not None:
if total_tokens is None:
total_tokens = input_tokens + output_tokens
usage = {
"prompt_tokens": input_tokens,
"completion_tokens": output_tokens,
"total_tokens": total_tokens,
}
self.log(f"Extracted message-level usage data: {usage}")
return usage
# Method 4: Check for usage at individual message level (some APIs put it there)
if "messages" in body and isinstance(body["messages"], list):
for message in reversed(body["messages"]):
if message.get("role") == "assistant":
# Check multiple possible usage field locations
usage_sources = [
message.get("usage", {}),
message.get("usageMetadata", {}),
message.get("metadata", {}).get("usage", {}) if message.get("metadata") else {}
]
for msg_usage in usage_sources:
if isinstance(msg_usage, dict) and msg_usage:
self.log(f"Found message usage: {msg_usage}")
input_tokens = (
msg_usage.get("input_tokens") or
msg_usage.get("prompt_tokens") or
msg_usage.get("promptTokenCount") or
msg_usage.get("prompt_eval_count")
)
output_tokens = (
msg_usage.get("output_tokens") or
msg_usage.get("completion_tokens") or
msg_usage.get("candidatesTokenCount") or
msg_usage.get("eval_count")
)
total_tokens = (
msg_usage.get("total_tokens") or
msg_usage.get("totalTokens") or
msg_usage.get("totalTokenCount")
)
if input_tokens is not None and output_tokens is not None:
if total_tokens is None:
total_tokens = input_tokens + output_tokens
usage = {
"prompt_tokens": input_tokens,
"completion_tokens": output_tokens,
"total_tokens": total_tokens,
}
self.log(f"Extracted individual message usage: {usage}")
return usage
# Method 5: Check alternative response structures (some proxies/wrappers)
for alt_key in ["token_usage", "billing", "cost_info", "metrics"]:
if alt_key in body and isinstance(body[alt_key], dict):
alt_usage = body[alt_key]
self.log(f"Found alternative usage data in {alt_key}: {alt_usage}")
input_tokens = (
alt_usage.get("input_tokens") or
alt_usage.get("prompt_tokens") or
alt_usage.get("input") or
alt_usage.get("prompt")
)
output_tokens = (
alt_usage.get("output_tokens") or
alt_usage.get("completion_tokens") or
alt_usage.get("output") or
alt_usage.get("completion")
)
if input_tokens is not None and output_tokens is not None:
total_tokens = alt_usage.get("total_tokens") or alt_usage.get("total") or (input_tokens + output_tokens)
usage = {
"prompt_tokens": input_tokens,
"completion_tokens": output_tokens,
"total_tokens": total_tokens,
}
self.log(f"Extracted alternative usage data: {usage}")
return usage
self.log("No usage data found in any expected location")
if self.valves.debug:
# Log the full body structure to help debug
self.log(f"Full response body keys: {list(body.keys())}")
if "messages" in body and body["messages"]:
last_message = body["messages"][-1] if body["messages"] else {}
self.log(f"Last message keys: {list(last_message.keys()) if isinstance(last_message, dict) else 'Not a dict'}")
return None
async def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
"""
Inlet handles the incoming request (usually a user message).
@ -284,28 +508,27 @@ class Pipeline:
# Body copy for span
span_body = body.copy()
# Get the last assistant message from the conversation
assistant_message_obj = get_last_assistant_message_obj(body["messages"])
# Extract usage if available
usage = None
self.log(f"Assistant message obj: {assistant_message_obj}")
if assistant_message_obj:
message_usage = assistant_message_obj.get("usage", {})
if isinstance(message_usage, dict):
input_tokens = message_usage.get(
"prompt_eval_count"
) or message_usage.get("prompt_tokens")
output_tokens = message_usage.get("eval_count") or message_usage.get(
"completion_tokens"
)
if input_tokens is not None and output_tokens is not None:
usage = {
"prompt_tokens": input_tokens,
"completion_tokens": output_tokens,
"total_tokens": input_tokens + output_tokens,
}
self.log(f"Usage data extracted: {usage}")
# FIXED: Extract usage data using improved method that supports multiple providers
metadata = body.get("metadata", {})
usage = self.extract_usage_data(body, metadata)
# Add provider and model information to usage data for better analytics
if usage:
provider = self.detect_provider_type(body, metadata)
model_info = metadata.get("model", {})
# Enhance usage data with provider context
usage.update({
"provider": provider,
"model_id": model_info.get("id", "unknown"),
"model_name": model_info.get("name", "unknown"),
})
self.log(f"Enhanced usage data with provider info: {usage}")
if usage:
self.log(f"Successfully extracted usage data: {usage}")
else:
self.log("No usage data found - this might indicate an API integration issue")
# Chat_id is already logged as trace thread
span_body.pop("chat_id", None)