mirror of
https://github.com/open-webui/pipelines
synced 2025-06-26 18:15:58 +00:00
Update opik_filter_pipeline.py
This commit is contained in:
parent
1e6bd15ad9
commit
b6cc9ae137
@ -98,50 +98,47 @@ class Pipeline:
|
|||||||
f"Opik error: {e} Please re-enter your Opik credentials in the pipeline settings."
|
f"Opik error: {e} Please re-enter your Opik credentials in the pipeline settings."
|
||||||
)
|
)
|
||||||
|
|
||||||
def cleanup_existing_trace(self, chat_id: str):
|
def cleanup_stale_spans(self, chat_id: str):
|
||||||
"""Safely cleanup existing trace and span for a chat_id"""
|
"""Clean up any existing span for a chat_id to prepare for a new one"""
|
||||||
try:
|
if chat_id in self.chat_spans:
|
||||||
if chat_id in self.chat_spans:
|
try:
|
||||||
existing_span = self.chat_spans[chat_id]
|
existing_span = self.chat_spans[chat_id]
|
||||||
try:
|
# End the previous span before creating a new one
|
||||||
existing_span.end(output={"status": "interrupted", "reason": "new_request_received"})
|
existing_span.end(output={"status": "interrupted", "reason": "new_message_received"})
|
||||||
self.log(f"Ended existing span for chat_id: {chat_id}")
|
self.log(f"Ended previous span for chat_id: {chat_id}")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.log(f"Warning: Could not end existing span: {e}")
|
self.log(f"Warning: Could not end existing span for {chat_id}: {e}")
|
||||||
|
finally:
|
||||||
if chat_id in self.chat_traces:
|
# Always remove from tracking
|
||||||
existing_trace = self.chat_traces[chat_id]
|
self.chat_spans.pop(chat_id, None)
|
||||||
try:
|
|
||||||
existing_trace.end(output={"status": "interrupted", "reason": "new_request_received"})
|
|
||||||
self.log(f"Ended existing trace for chat_id: {chat_id}")
|
|
||||||
except Exception as e:
|
|
||||||
self.log(f"Warning: Could not end existing trace: {e}")
|
|
||||||
|
|
||||||
# Clean up the dictionaries
|
|
||||||
self.chat_traces.pop(chat_id, None)
|
|
||||||
self.chat_spans.pop(chat_id, None)
|
|
||||||
self.log(f"Cleaned up existing trace/span for chat_id: {chat_id}")
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
self.log(f"Error during cleanup for chat_id {chat_id}: {e}")
|
|
||||||
# Force cleanup even if there are errors
|
|
||||||
self.chat_traces.pop(chat_id, None)
|
|
||||||
self.chat_spans.pop(chat_id, None)
|
|
||||||
|
|
||||||
def cleanup_stale_traces(self, max_count: int = 100):
|
def cleanup_orphaned_traces(self, max_count: int = 100):
|
||||||
"""Clean up traces if we have too many active ones"""
|
"""Clean up traces if we have too many active ones"""
|
||||||
if len(self.chat_traces) > max_count:
|
if len(self.chat_traces) > max_count:
|
||||||
self.log(f"Too many active traces ({len(self.chat_traces)}), cleaning up oldest ones")
|
self.log(f"Too many active traces ({len(self.chat_traces)}), cleaning up oldest ones")
|
||||||
# Clean up oldest traces (simple FIFO approach)
|
# Clean up oldest traces (simple FIFO approach)
|
||||||
chat_ids_to_remove = list(self.chat_traces.keys())[:len(self.chat_traces) - max_count + 10]
|
chat_ids_to_remove = list(self.chat_traces.keys())[:len(self.chat_traces) - max_count + 10]
|
||||||
for chat_id in chat_ids_to_remove:
|
for chat_id in chat_ids_to_remove:
|
||||||
self.cleanup_existing_trace(chat_id)
|
try:
|
||||||
|
if chat_id in self.chat_spans:
|
||||||
|
span = self.chat_spans[chat_id]
|
||||||
|
span.end(output={"status": "cleanup", "reason": "too_many_active_traces"})
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
try:
|
||||||
|
if chat_id in self.chat_traces:
|
||||||
|
trace = self.chat_traces[chat_id]
|
||||||
|
trace.end(output={"status": "cleanup", "reason": "too_many_active_traces"})
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
self.chat_traces.pop(chat_id, None)
|
||||||
|
self.chat_spans.pop(chat_id, None)
|
||||||
|
|
||||||
async def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
|
async def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
|
||||||
"""
|
"""
|
||||||
Inlet handles the incoming request (usually a user message).
|
Inlet handles the incoming request (usually a user message).
|
||||||
- If no trace exists yet for this chat_id, we create a new trace.
|
- If no trace exists yet for this chat_id, we create a new trace.
|
||||||
- If a trace does exist, we clean it up and create a new one.
|
- If a trace does exist, we reuse it and create a new span for the new user message.
|
||||||
"""
|
"""
|
||||||
if self.valves.debug:
|
if self.valves.debug:
|
||||||
print(f"[DEBUG] Received request: {json.dumps(body, indent=2)}")
|
print(f"[DEBUG] Received request: {json.dumps(body, indent=2)}")
|
||||||
@ -177,43 +174,59 @@ class Pipeline:
|
|||||||
|
|
||||||
user_email = user.get("email") if user else None
|
user_email = user.get("email") if user else None
|
||||||
|
|
||||||
# FIXED: Instead of asserting, clean up any existing trace/span
|
|
||||||
if chat_id in self.chat_traces:
|
|
||||||
self.log(f"Found existing trace for chat_id {chat_id}, cleaning up...")
|
|
||||||
self.cleanup_existing_trace(chat_id)
|
|
||||||
|
|
||||||
# Periodic cleanup to prevent memory leaks
|
# Periodic cleanup to prevent memory leaks
|
||||||
self.cleanup_stale_traces()
|
self.cleanup_orphaned_traces()
|
||||||
|
|
||||||
# Create a new trace and span
|
# FIXED: Check if trace already exists
|
||||||
self.log(f"Creating new chat trace for chat_id: {chat_id}")
|
trace = None
|
||||||
|
if chat_id in self.chat_traces:
|
||||||
|
# Reuse existing trace for continuing conversation
|
||||||
|
trace = self.chat_traces[chat_id]
|
||||||
|
self.log(f"Reusing existing trace for chat_id: {chat_id}")
|
||||||
|
|
||||||
|
# Clean up any existing span to prepare for new one
|
||||||
|
self.cleanup_stale_spans(chat_id)
|
||||||
|
else:
|
||||||
|
# Create a new trace for new conversation
|
||||||
|
self.log(f"Creating new chat trace for chat_id: {chat_id}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Body copy for trace
|
||||||
|
trace_body = body.copy()
|
||||||
|
|
||||||
|
# Extract metadata from body
|
||||||
|
trace_metadata = trace_body.pop("metadata", {})
|
||||||
|
trace_metadata.update({"chat_id": chat_id, "user_id": user_email})
|
||||||
|
|
||||||
|
# We don't need the model at the trace level
|
||||||
|
trace_body.pop("model", None)
|
||||||
|
|
||||||
|
trace_payload = {
|
||||||
|
"name": f"{__name__}",
|
||||||
|
"input": trace_body,
|
||||||
|
"metadata": trace_metadata,
|
||||||
|
"thread_id": chat_id,
|
||||||
|
}
|
||||||
|
|
||||||
|
if self.valves.debug:
|
||||||
|
print(f"[DEBUG] Opik trace request: {json.dumps(trace_payload, indent=2)}")
|
||||||
|
|
||||||
|
trace = self.opik.trace(**trace_payload)
|
||||||
|
self.chat_traces[chat_id] = trace
|
||||||
|
self.log(f"New trace created for chat_id: {chat_id}")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
self.log(f"Error creating Opik trace for chat_id {chat_id}: {e}")
|
||||||
|
# Continue without Opik logging for this request
|
||||||
|
return body
|
||||||
|
|
||||||
|
# Create a new span (whether trace is new or existing)
|
||||||
try:
|
try:
|
||||||
# Body copy for traces and span
|
# Body copy for span
|
||||||
trace_body = body.copy()
|
|
||||||
span_body = body.copy()
|
span_body = body.copy()
|
||||||
|
|
||||||
# Extract metadata from body
|
|
||||||
metadata = trace_body.pop("metadata", {})
|
|
||||||
metadata.update({"chat_id": chat_id, "user_id": user_email})
|
|
||||||
|
|
||||||
# We don't need the model at the trace level
|
|
||||||
trace_body.pop("model", None)
|
|
||||||
|
|
||||||
trace_payload = {
|
|
||||||
"name": f"{__name__}",
|
|
||||||
"input": trace_body,
|
|
||||||
"metadata": metadata,
|
|
||||||
"thread_id": chat_id,
|
|
||||||
}
|
|
||||||
|
|
||||||
if self.valves.debug:
|
|
||||||
print(f"[DEBUG] Opik trace request: {json.dumps(trace_payload, indent=2)}")
|
|
||||||
|
|
||||||
trace = self.opik.trace(**trace_payload)
|
|
||||||
|
|
||||||
span_metadata = metadata.copy()
|
span_metadata = metadata.copy()
|
||||||
span_metadata.update({"interface": "open-webui"})
|
span_metadata.update({"interface": "open-webui", "user_id": user_email})
|
||||||
|
|
||||||
# Extract the model from body
|
# Extract the model from body
|
||||||
span_body.pop("model", None)
|
span_body.pop("model", None)
|
||||||
@ -224,8 +237,11 @@ class Pipeline:
|
|||||||
model = span_metadata.get("model", {}).get("id", None)
|
model = span_metadata.get("model", {}).get("id", None)
|
||||||
provider = span_metadata.get("model", {}).get("owned_by", None)
|
provider = span_metadata.get("model", {}).get("owned_by", None)
|
||||||
|
|
||||||
|
# Generate unique span name with timestamp
|
||||||
|
span_name = f"{chat_id}-{int(time.time() * 1000)}"
|
||||||
|
|
||||||
span_payload = {
|
span_payload = {
|
||||||
"name": chat_id,
|
"name": span_name,
|
||||||
"model": model,
|
"model": model,
|
||||||
"provider": provider,
|
"provider": provider,
|
||||||
"input": span_body,
|
"input": span_body,
|
||||||
@ -237,18 +253,12 @@ class Pipeline:
|
|||||||
print(f"[DEBUG] Opik span request: {json.dumps(span_payload, indent=2)}")
|
print(f"[DEBUG] Opik span request: {json.dumps(span_payload, indent=2)}")
|
||||||
|
|
||||||
span = trace.span(**span_payload)
|
span = trace.span(**span_payload)
|
||||||
|
|
||||||
self.chat_traces[chat_id] = trace
|
|
||||||
self.chat_spans[chat_id] = span
|
self.chat_spans[chat_id] = span
|
||||||
self.log(f"Trace and span objects successfully created for chat_id: {chat_id}")
|
self.log(f"New span created for chat_id: {chat_id}")
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.log(f"Error creating Opik trace/span for chat_id {chat_id}: {e}")
|
self.log(f"Error creating Opik span for chat_id {chat_id}: {e}")
|
||||||
# Clean up on error
|
|
||||||
self.chat_traces.pop(chat_id, None)
|
|
||||||
self.chat_spans.pop(chat_id, None)
|
|
||||||
# Don't fail the request, just log the error
|
# Don't fail the request, just log the error
|
||||||
self.log(f"Continuing without Opik logging for this request")
|
|
||||||
|
|
||||||
return body
|
return body
|
||||||
|
|
||||||
@ -261,19 +271,17 @@ class Pipeline:
|
|||||||
|
|
||||||
chat_id = body.get("chat_id")
|
chat_id = body.get("chat_id")
|
||||||
|
|
||||||
# If no trace or span exist, attempt to register again
|
# If no span exists, we can't log this response
|
||||||
if chat_id not in self.chat_traces or chat_id not in self.chat_spans:
|
if chat_id not in self.chat_spans:
|
||||||
self.log(
|
self.log(
|
||||||
f"[WARNING] No matching chat trace found for chat_id: {chat_id}, chat won't be logged."
|
f"[WARNING] No active span found for chat_id: {chat_id}, response won't be logged."
|
||||||
)
|
)
|
||||||
return body
|
return body
|
||||||
|
|
||||||
trace = self.chat_traces[chat_id]
|
|
||||||
span = self.chat_spans[chat_id]
|
span = self.chat_spans[chat_id]
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Body copy for traces and span
|
# Body copy for span
|
||||||
trace_body = body.copy()
|
|
||||||
span_body = body.copy()
|
span_body = body.copy()
|
||||||
|
|
||||||
# Get the last assistant message from the conversation
|
# Get the last assistant message from the conversation
|
||||||
@ -314,31 +322,25 @@ class Pipeline:
|
|||||||
)
|
)
|
||||||
|
|
||||||
span.end(**span_payload)
|
span.end(**span_payload)
|
||||||
self.log(f"span ended for chat_id: {chat_id}")
|
self.log(f"Span ended for chat_id: {chat_id}")
|
||||||
|
|
||||||
# Chat_id is already logged as trace thread
|
|
||||||
trace_body.pop("chat_id", None)
|
|
||||||
|
|
||||||
# Optionally update the trace with the final assistant output
|
|
||||||
trace.end(output=trace_body)
|
|
||||||
self.log(f"trace ended for chat_id: {chat_id}")
|
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.log(f"Error ending Opik trace/span for chat_id {chat_id}: {e}")
|
self.log(f"Error ending Opik span for chat_id {chat_id}: {e}")
|
||||||
# Try to end gracefully even if there are errors
|
# Try to end gracefully even if there are errors
|
||||||
try:
|
try:
|
||||||
span.end(output={"status": "error", "error": str(e)})
|
span.end(output={"status": "error", "error": str(e)})
|
||||||
except:
|
except:
|
||||||
pass
|
pass
|
||||||
try:
|
|
||||||
trace.end(output={"status": "error", "error": str(e)})
|
|
||||||
except:
|
|
||||||
pass
|
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
# Always clean up the dictionaries, even if ending the trace/span failed
|
# Clean up the span reference (but keep the trace for potential future messages)
|
||||||
self.chat_traces.pop(chat_id, None)
|
|
||||||
self.chat_spans.pop(chat_id, None)
|
self.chat_spans.pop(chat_id, None)
|
||||||
self.log(f"Cleaned up trace/span references for chat_id: {chat_id}")
|
self.log(f"Cleaned up span reference for chat_id: {chat_id}")
|
||||||
|
|
||||||
|
# NOTE: We deliberately DON'T clean up the trace here, as it should persist
|
||||||
|
# for the duration of the conversation. Traces will be cleaned up by:
|
||||||
|
# 1. The cleanup_orphaned_traces method when there are too many
|
||||||
|
# 2. Server restart/shutdown
|
||||||
|
# 3. Manual cleanup if needed
|
||||||
|
|
||||||
return body
|
return body
|
||||||
|
Loading…
Reference in New Issue
Block a user