diff --git a/backend/open_webui/retrieval/vector/dbs/pinecone.py b/backend/open_webui/retrieval/vector/dbs/pinecone.py index 9f8abf460..982258880 100644 --- a/backend/open_webui/retrieval/vector/dbs/pinecone.py +++ b/backend/open_webui/retrieval/vector/dbs/pinecone.py @@ -3,10 +3,18 @@ import logging import time # for measuring elapsed time from pinecone import Pinecone, ServerlessSpec +# Add gRPC support for better performance (Pinecone best practice) +try: + from pinecone.grpc import PineconeGRPC + GRPC_AVAILABLE = True +except ImportError: + GRPC_AVAILABLE = False + import asyncio # for async upserts import functools # for partial binding in async tasks import concurrent.futures # for parallel batch upserts +import random # for jitter in retry backoff from open_webui.retrieval.vector.main import ( VectorDBBase, @@ -47,7 +55,24 @@ class PineconeClient(VectorDBBase): self.cloud = PINECONE_CLOUD # Initialize Pinecone client for improved performance - self.client = Pinecone(api_key=self.api_key) + if GRPC_AVAILABLE: + # Use gRPC client for better performance (Pinecone recommendation) + self.client = PineconeGRPC( + api_key=self.api_key, + pool_threads=20, # Improved connection pool size + timeout=30 # Reasonable timeout for operations + ) + self.using_grpc = True + log.info("Using Pinecone gRPC client for optimal performance") + else: + # Fallback to HTTP client with enhanced connection pooling + self.client = Pinecone( + api_key=self.api_key, + pool_threads=20, # Improved connection pool size + timeout=30 # Reasonable timeout for operations + ) + self.using_grpc = False + log.info("Using Pinecone HTTP client (gRPC not available)") # Persistent executor for batch operations self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=5) @@ -91,12 +116,37 @@ class PineconeClient(VectorDBBase): log.info(f"Using existing Pinecone index '{self.index_name}'") # Connect to the index - self.index = self.client.Index(self.index_name) + self.index = self.client.Index( + self.index_name, + pool_threads=20, # Enhanced connection pool for index operations + ) except Exception as e: log.error(f"Failed to initialize Pinecone index: {e}") raise RuntimeError(f"Failed to initialize Pinecone index: {e}") + def _retry_pinecone_operation(self, operation_func, max_retries=3): + """Retry Pinecone operations with exponential backoff for rate limits and network issues.""" + for attempt in range(max_retries): + try: + return operation_func() + except Exception as e: + error_str = str(e).lower() + # Check if it's a retryable error (rate limits, network issues, timeouts) + is_retryable = any(keyword in error_str for keyword in [ + 'rate limit', 'quota', 'timeout', 'network', 'connection', + 'unavailable', 'internal error', '429', '500', '502', '503', '504' + ]) + + if not is_retryable or attempt == max_retries - 1: + # Don't retry for non-retryable errors or on final attempt + raise + + # Exponential backoff with jitter + delay = (2 ** attempt) + random.uniform(0, 1) + log.warning(f"Pinecone operation failed (attempt {attempt + 1}/{max_retries}), retrying in {delay:.2f}s: {e}") + time.sleep(delay) + def _create_points( self, items: List[VectorItem], collection_name_with_prefix: str ) -> List[Dict[str, Any]]: