From b6cc9ae1375460221994109b572e198e56b31bb4 Mon Sep 17 00:00:00 2001 From: Assaf Date: Tue, 24 Jun 2025 12:59:39 +0300 Subject: [PATCH] Update opik_filter_pipeline.py --- examples/filters/opik_filter_pipeline.py | 186 ++++++++++++----------- 1 file changed, 94 insertions(+), 92 deletions(-) diff --git a/examples/filters/opik_filter_pipeline.py b/examples/filters/opik_filter_pipeline.py index 5494488..3098e14 100644 --- a/examples/filters/opik_filter_pipeline.py +++ b/examples/filters/opik_filter_pipeline.py @@ -98,50 +98,47 @@ class Pipeline: f"Opik error: {e} Please re-enter your Opik credentials in the pipeline settings." ) - def cleanup_existing_trace(self, chat_id: str): - """Safely cleanup existing trace and span for a chat_id""" - try: - if chat_id in self.chat_spans: + def cleanup_stale_spans(self, chat_id: str): + """Clean up any existing span for a chat_id to prepare for a new one""" + if chat_id in self.chat_spans: + try: existing_span = self.chat_spans[chat_id] - try: - existing_span.end(output={"status": "interrupted", "reason": "new_request_received"}) - self.log(f"Ended existing span for chat_id: {chat_id}") - except Exception as e: - self.log(f"Warning: Could not end existing span: {e}") - - if chat_id in self.chat_traces: - existing_trace = self.chat_traces[chat_id] - try: - existing_trace.end(output={"status": "interrupted", "reason": "new_request_received"}) - self.log(f"Ended existing trace for chat_id: {chat_id}") - except Exception as e: - self.log(f"Warning: Could not end existing trace: {e}") - - # Clean up the dictionaries - self.chat_traces.pop(chat_id, None) - self.chat_spans.pop(chat_id, None) - self.log(f"Cleaned up existing trace/span for chat_id: {chat_id}") - - except Exception as e: - self.log(f"Error during cleanup for chat_id {chat_id}: {e}") - # Force cleanup even if there are errors - self.chat_traces.pop(chat_id, None) - self.chat_spans.pop(chat_id, None) + # End the previous span before creating a new one + existing_span.end(output={"status": "interrupted", "reason": "new_message_received"}) + self.log(f"Ended previous span for chat_id: {chat_id}") + except Exception as e: + self.log(f"Warning: Could not end existing span for {chat_id}: {e}") + finally: + # Always remove from tracking + self.chat_spans.pop(chat_id, None) - def cleanup_stale_traces(self, max_count: int = 100): + def cleanup_orphaned_traces(self, max_count: int = 100): """Clean up traces if we have too many active ones""" if len(self.chat_traces) > max_count: self.log(f"Too many active traces ({len(self.chat_traces)}), cleaning up oldest ones") # Clean up oldest traces (simple FIFO approach) chat_ids_to_remove = list(self.chat_traces.keys())[:len(self.chat_traces) - max_count + 10] for chat_id in chat_ids_to_remove: - self.cleanup_existing_trace(chat_id) + try: + if chat_id in self.chat_spans: + span = self.chat_spans[chat_id] + span.end(output={"status": "cleanup", "reason": "too_many_active_traces"}) + except: + pass + try: + if chat_id in self.chat_traces: + trace = self.chat_traces[chat_id] + trace.end(output={"status": "cleanup", "reason": "too_many_active_traces"}) + except: + pass + self.chat_traces.pop(chat_id, None) + self.chat_spans.pop(chat_id, None) async def inlet(self, body: dict, user: Optional[dict] = None) -> dict: """ Inlet handles the incoming request (usually a user message). - If no trace exists yet for this chat_id, we create a new trace. - - If a trace does exist, we clean it up and create a new one. + - If a trace does exist, we reuse it and create a new span for the new user message. """ if self.valves.debug: print(f"[DEBUG] Received request: {json.dumps(body, indent=2)}") @@ -177,43 +174,59 @@ class Pipeline: user_email = user.get("email") if user else None - # FIXED: Instead of asserting, clean up any existing trace/span - if chat_id in self.chat_traces: - self.log(f"Found existing trace for chat_id {chat_id}, cleaning up...") - self.cleanup_existing_trace(chat_id) - # Periodic cleanup to prevent memory leaks - self.cleanup_stale_traces() + self.cleanup_orphaned_traces() - # Create a new trace and span - self.log(f"Creating new chat trace for chat_id: {chat_id}") + # FIXED: Check if trace already exists + trace = None + if chat_id in self.chat_traces: + # Reuse existing trace for continuing conversation + trace = self.chat_traces[chat_id] + self.log(f"Reusing existing trace for chat_id: {chat_id}") + + # Clean up any existing span to prepare for new one + self.cleanup_stale_spans(chat_id) + else: + # Create a new trace for new conversation + self.log(f"Creating new chat trace for chat_id: {chat_id}") + + try: + # Body copy for trace + trace_body = body.copy() + # Extract metadata from body + trace_metadata = trace_body.pop("metadata", {}) + trace_metadata.update({"chat_id": chat_id, "user_id": user_email}) + + # We don't need the model at the trace level + trace_body.pop("model", None) + + trace_payload = { + "name": f"{__name__}", + "input": trace_body, + "metadata": trace_metadata, + "thread_id": chat_id, + } + + if self.valves.debug: + print(f"[DEBUG] Opik trace request: {json.dumps(trace_payload, indent=2)}") + + trace = self.opik.trace(**trace_payload) + self.chat_traces[chat_id] = trace + self.log(f"New trace created for chat_id: {chat_id}") + + except Exception as e: + self.log(f"Error creating Opik trace for chat_id {chat_id}: {e}") + # Continue without Opik logging for this request + return body + + # Create a new span (whether trace is new or existing) try: - # Body copy for traces and span - trace_body = body.copy() + # Body copy for span span_body = body.copy() - # Extract metadata from body - metadata = trace_body.pop("metadata", {}) - metadata.update({"chat_id": chat_id, "user_id": user_email}) - - # We don't need the model at the trace level - trace_body.pop("model", None) - - trace_payload = { - "name": f"{__name__}", - "input": trace_body, - "metadata": metadata, - "thread_id": chat_id, - } - - if self.valves.debug: - print(f"[DEBUG] Opik trace request: {json.dumps(trace_payload, indent=2)}") - - trace = self.opik.trace(**trace_payload) - span_metadata = metadata.copy() - span_metadata.update({"interface": "open-webui"}) + span_metadata.update({"interface": "open-webui", "user_id": user_email}) # Extract the model from body span_body.pop("model", None) @@ -224,8 +237,11 @@ class Pipeline: model = span_metadata.get("model", {}).get("id", None) provider = span_metadata.get("model", {}).get("owned_by", None) + # Generate unique span name with timestamp + span_name = f"{chat_id}-{int(time.time() * 1000)}" + span_payload = { - "name": chat_id, + "name": span_name, "model": model, "provider": provider, "input": span_body, @@ -237,18 +253,12 @@ class Pipeline: print(f"[DEBUG] Opik span request: {json.dumps(span_payload, indent=2)}") span = trace.span(**span_payload) - - self.chat_traces[chat_id] = trace self.chat_spans[chat_id] = span - self.log(f"Trace and span objects successfully created for chat_id: {chat_id}") + self.log(f"New span created for chat_id: {chat_id}") except Exception as e: - self.log(f"Error creating Opik trace/span for chat_id {chat_id}: {e}") - # Clean up on error - self.chat_traces.pop(chat_id, None) - self.chat_spans.pop(chat_id, None) + self.log(f"Error creating Opik span for chat_id {chat_id}: {e}") # Don't fail the request, just log the error - self.log(f"Continuing without Opik logging for this request") return body @@ -261,19 +271,17 @@ class Pipeline: chat_id = body.get("chat_id") - # If no trace or span exist, attempt to register again - if chat_id not in self.chat_traces or chat_id not in self.chat_spans: + # If no span exists, we can't log this response + if chat_id not in self.chat_spans: self.log( - f"[WARNING] No matching chat trace found for chat_id: {chat_id}, chat won't be logged." + f"[WARNING] No active span found for chat_id: {chat_id}, response won't be logged." ) return body - trace = self.chat_traces[chat_id] span = self.chat_spans[chat_id] try: - # Body copy for traces and span - trace_body = body.copy() + # Body copy for span span_body = body.copy() # Get the last assistant message from the conversation @@ -314,31 +322,25 @@ class Pipeline: ) span.end(**span_payload) - self.log(f"span ended for chat_id: {chat_id}") - - # Chat_id is already logged as trace thread - trace_body.pop("chat_id", None) - - # Optionally update the trace with the final assistant output - trace.end(output=trace_body) - self.log(f"trace ended for chat_id: {chat_id}") + self.log(f"Span ended for chat_id: {chat_id}") except Exception as e: - self.log(f"Error ending Opik trace/span for chat_id {chat_id}: {e}") + self.log(f"Error ending Opik span for chat_id {chat_id}: {e}") # Try to end gracefully even if there are errors try: span.end(output={"status": "error", "error": str(e)}) except: pass - try: - trace.end(output={"status": "error", "error": str(e)}) - except: - pass finally: - # Always clean up the dictionaries, even if ending the trace/span failed - self.chat_traces.pop(chat_id, None) + # Clean up the span reference (but keep the trace for potential future messages) self.chat_spans.pop(chat_id, None) - self.log(f"Cleaned up trace/span references for chat_id: {chat_id}") + self.log(f"Cleaned up span reference for chat_id: {chat_id}") + + # NOTE: We deliberately DON'T clean up the trace here, as it should persist + # for the duration of the conversation. Traces will be cleaned up by: + # 1. The cleanup_orphaned_traces method when there are too many + # 2. Server restart/shutdown + # 3. Manual cleanup if needed return body