diff --git a/backend/open_webui/retrieval/vector/dbs/pinecone.py b/backend/open_webui/retrieval/vector/dbs/pinecone.py index 9f8abf460..8291332c0 100644 --- a/backend/open_webui/retrieval/vector/dbs/pinecone.py +++ b/backend/open_webui/retrieval/vector/dbs/pinecone.py @@ -3,10 +3,19 @@ import logging import time # for measuring elapsed time from pinecone import Pinecone, ServerlessSpec +# Add gRPC support for better performance (Pinecone best practice) +try: + from pinecone.grpc import PineconeGRPC + + GRPC_AVAILABLE = True +except ImportError: + GRPC_AVAILABLE = False + import asyncio # for async upserts import functools # for partial binding in async tasks import concurrent.futures # for parallel batch upserts +import random # for jitter in retry backoff from open_webui.retrieval.vector.main import ( VectorDBBase, @@ -47,7 +56,24 @@ class PineconeClient(VectorDBBase): self.cloud = PINECONE_CLOUD # Initialize Pinecone client for improved performance - self.client = Pinecone(api_key=self.api_key) + if GRPC_AVAILABLE: + # Use gRPC client for better performance (Pinecone recommendation) + self.client = PineconeGRPC( + api_key=self.api_key, + pool_threads=20, # Improved connection pool size + timeout=30, # Reasonable timeout for operations + ) + self.using_grpc = True + log.info("Using Pinecone gRPC client for optimal performance") + else: + # Fallback to HTTP client with enhanced connection pooling + self.client = Pinecone( + api_key=self.api_key, + pool_threads=20, # Improved connection pool size + timeout=30, # Reasonable timeout for operations + ) + self.using_grpc = False + log.info("Using Pinecone HTTP client (gRPC not available)") # Persistent executor for batch operations self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=5) @@ -91,12 +117,53 @@ class PineconeClient(VectorDBBase): log.info(f"Using existing Pinecone index '{self.index_name}'") # Connect to the index - self.index = self.client.Index(self.index_name) + self.index = self.client.Index( + self.index_name, + pool_threads=20, # Enhanced connection pool for index operations + ) except Exception as e: log.error(f"Failed to initialize Pinecone index: {e}") raise RuntimeError(f"Failed to initialize Pinecone index: {e}") + def _retry_pinecone_operation(self, operation_func, max_retries=3): + """Retry Pinecone operations with exponential backoff for rate limits and network issues.""" + for attempt in range(max_retries): + try: + return operation_func() + except Exception as e: + error_str = str(e).lower() + # Check if it's a retryable error (rate limits, network issues, timeouts) + is_retryable = any( + keyword in error_str + for keyword in [ + "rate limit", + "quota", + "timeout", + "network", + "connection", + "unavailable", + "internal error", + "429", + "500", + "502", + "503", + "504", + ] + ) + + if not is_retryable or attempt == max_retries - 1: + # Don't retry for non-retryable errors or on final attempt + raise + + # Exponential backoff with jitter + delay = (2**attempt) + random.uniform(0, 1) + log.warning( + f"Pinecone operation failed (attempt {attempt + 1}/{max_retries}), " + f"retrying in {delay:.2f}s: {e}" + ) + time.sleep(delay) + def _create_points( self, items: List[VectorItem], collection_name_with_prefix: str ) -> List[Dict[str, Any]]: @@ -223,7 +290,8 @@ class PineconeClient(VectorDBBase): elapsed = time.time() - start_time log.debug(f"Insert of {len(points)} vectors took {elapsed:.2f} seconds") log.info( - f"Successfully inserted {len(points)} vectors in parallel batches into '{collection_name_with_prefix}'" + f"Successfully inserted {len(points)} vectors in parallel batches " + f"into '{collection_name_with_prefix}'" ) def upsert(self, collection_name: str, items: List[VectorItem]) -> None: @@ -254,7 +322,8 @@ class PineconeClient(VectorDBBase): elapsed = time.time() - start_time log.debug(f"Upsert of {len(points)} vectors took {elapsed:.2f} seconds") log.info( - f"Successfully upserted {len(points)} vectors in parallel batches into '{collection_name_with_prefix}'" + f"Successfully upserted {len(points)} vectors in parallel batches " + f"into '{collection_name_with_prefix}'" ) async def insert_async(self, collection_name: str, items: List[VectorItem]) -> None: @@ -285,7 +354,8 @@ class PineconeClient(VectorDBBase): log.error(f"Error in async insert batch: {result}") raise result log.info( - f"Successfully async inserted {len(points)} vectors in batches into '{collection_name_with_prefix}'" + f"Successfully async inserted {len(points)} vectors in batches " + f"into '{collection_name_with_prefix}'" ) async def upsert_async(self, collection_name: str, items: List[VectorItem]) -> None: @@ -316,7 +386,8 @@ class PineconeClient(VectorDBBase): log.error(f"Error in async upsert batch: {result}") raise result log.info( - f"Successfully async upserted {len(points)} vectors in batches into '{collection_name_with_prefix}'" + f"Successfully async upserted {len(points)} vectors in batches " + f"into '{collection_name_with_prefix}'" ) def search( @@ -457,10 +528,12 @@ class PineconeClient(VectorDBBase): # This is a limitation of Pinecone - be careful with ID uniqueness self.index.delete(ids=batch_ids) log.debug( - f"Deleted batch of {len(batch_ids)} vectors by ID from '{collection_name_with_prefix}'" + f"Deleted batch of {len(batch_ids)} vectors by ID " + f"from '{collection_name_with_prefix}'" ) log.info( - f"Successfully deleted {len(ids)} vectors by ID from '{collection_name_with_prefix}'" + f"Successfully deleted {len(ids)} vectors by ID " + f"from '{collection_name_with_prefix}'" ) elif filter: