Update pinecone.py

PVBLIC Foundation authored 2025-05-30 18:47:23 -07:00, committed by GitHub
parent 4ecf2a8685
commit 66bde32623


@@ -6,6 +6,7 @@ from pinecone import Pinecone, ServerlessSpec
 # Add gRPC support for better performance (Pinecone best practice)
 try:
     from pinecone.grpc import PineconeGRPC
+
     GRPC_AVAILABLE = True
 except ImportError:
     GRPC_AVAILABLE = False
@@ -60,7 +61,7 @@ class PineconeClient(VectorDBBase):
             self.client = PineconeGRPC(
                 api_key=self.api_key,
                 pool_threads=20,  # Improved connection pool size
-                timeout=30  # Reasonable timeout for operations
+                timeout=30,  # Reasonable timeout for operations
             )
             self.using_grpc = True
             log.info("Using Pinecone gRPC client for optimal performance")
@@ -69,7 +70,7 @@ class PineconeClient(VectorDBBase):
             self.client = Pinecone(
                 api_key=self.api_key,
                 pool_threads=20,  # Improved connection pool size
-                timeout=30  # Reasonable timeout for operations
+                timeout=30,  # Reasonable timeout for operations
             )
             self.using_grpc = False
             log.info("Using Pinecone HTTP client (gRPC not available)")
@@ -133,18 +134,34 @@ class PineconeClient(VectorDBBase):
             except Exception as e:
                 error_str = str(e).lower()
                 # Check if it's a retryable error (rate limits, network issues, timeouts)
-                is_retryable = any(keyword in error_str for keyword in [
-                    'rate limit', 'quota', 'timeout', 'network', 'connection',
-                    'unavailable', 'internal error', '429', '500', '502', '503', '504'
-                ])
+                is_retryable = any(
+                    keyword in error_str
+                    for keyword in [
+                        "rate limit",
+                        "quota",
+                        "timeout",
+                        "network",
+                        "connection",
+                        "unavailable",
+                        "internal error",
+                        "429",
+                        "500",
+                        "502",
+                        "503",
+                        "504",
+                    ]
+                )

                 if not is_retryable or attempt == max_retries - 1:
                     # Don't retry for non-retryable errors or on final attempt
                     raise

                 # Exponential backoff with jitter
-                delay = (2 ** attempt) + random.uniform(0, 1)
-                log.warning(f"Pinecone operation failed (attempt {attempt + 1}/{max_retries}), retrying in {delay:.2f}s: {e}")
+                delay = (2**attempt) + random.uniform(0, 1)
+                log.warning(
+                    f"Pinecone operation failed (attempt {attempt + 1}/{max_retries}), "
+                    f"retrying in {delay:.2f}s: {e}"
+                )
                 time.sleep(delay)

     def _create_points(
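The reformatted block is a retry loop with exponential backoff and jitter: attempt n sleeps 2**n seconds plus up to one second of random jitter, so retries land at roughly 1-2s, 2-3s, then 4-5s. A self-contained sketch of the same schedule follows; the helper name and the max_retries default are assumptions, not taken from the diff:

    import random
    import time

    RETRYABLE_KEYWORDS = [
        "rate limit", "quota", "timeout", "network", "connection",
        "unavailable", "internal error", "429", "500", "502", "503", "504",
    ]


    def retry_with_backoff(operation, max_retries: int = 3):
        # Retry transient failures with exponential backoff plus jitter,
        # re-raising immediately for non-retryable errors.
        for attempt in range(max_retries):
            try:
                return operation()
            except Exception as e:
                error_str = str(e).lower()
                is_retryable = any(k in error_str for k in RETRYABLE_KEYWORDS)
                if not is_retryable or attempt == max_retries - 1:
                    raise
                delay = (2**attempt) + random.uniform(0, 1)  # 1-2s, 2-3s, 4-5s, ...
                time.sleep(delay)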
@@ -273,7 +290,8 @@ class PineconeClient(VectorDBBase):
         elapsed = time.time() - start_time
         log.debug(f"Insert of {len(points)} vectors took {elapsed:.2f} seconds")
         log.info(
-            f"Successfully inserted {len(points)} vectors in parallel batches into '{collection_name_with_prefix}'"
+            f"Successfully inserted {len(points)} vectors in parallel batches "
+            f"into '{collection_name_with_prefix}'"
         )

     def upsert(self, collection_name: str, items: List[VectorItem]) -> None:
@@ -304,7 +322,8 @@ class PineconeClient(VectorDBBase):
         elapsed = time.time() - start_time
         log.debug(f"Upsert of {len(points)} vectors took {elapsed:.2f} seconds")
         log.info(
-            f"Successfully upserted {len(points)} vectors in parallel batches into '{collection_name_with_prefix}'"
+            f"Successfully upserted {len(points)} vectors in parallel batches "
+            f"into '{collection_name_with_prefix}'"
         )

     async def insert_async(self, collection_name: str, items: List[VectorItem]) -> None:
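The log messages refer to "parallel batches", but the batching code itself falls outside these hunks. A plausible sketch of parallel batched upserts with a thread pool is shown below; the batch size, worker count, and helper name are all assumptions, not visible in the diff:

    from concurrent.futures import ThreadPoolExecutor

    BATCH_SIZE = 100  # assumed; the actual batch size is not shown in this diff


    def upsert_in_parallel(index, points: list, namespace: str) -> None:
        # Split points into fixed-size batches and upsert them concurrently.
        batches = [points[i : i + BATCH_SIZE] for i in range(0, len(points), BATCH_SIZE)]
        with ThreadPoolExecutor(max_workers=8) as pool:
            futures = [
                pool.submit(index.upsert, vectors=batch, namespace=namespace)
                for batch in batches
            ]
            for future in futures:
                future.result()  # propagate the first failure, if any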
@@ -335,7 +354,8 @@ class PineconeClient(VectorDBBase):
                 log.error(f"Error in async insert batch: {result}")
                 raise result
         log.info(
-            f"Successfully async inserted {len(points)} vectors in batches into '{collection_name_with_prefix}'"
+            f"Successfully async inserted {len(points)} vectors in batches "
+            f"into '{collection_name_with_prefix}'"
         )

     async def upsert_async(self, collection_name: str, items: List[VectorItem]) -> None:
@@ -366,7 +386,8 @@ class PineconeClient(VectorDBBase):
                 log.error(f"Error in async upsert batch: {result}")
                 raise result
         log.info(
-            f"Successfully async upserted {len(points)} vectors in batches into '{collection_name_with_prefix}'"
+            f"Successfully async upserted {len(points)} vectors in batches "
+            f"into '{collection_name_with_prefix}'"
         )

     def search(
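The "raise result" pattern in the two async hunks indicates the batch coroutines are awaited with asyncio.gather(..., return_exceptions=True): a failed batch does not cancel the others mid-flight, and failures come back as exception objects that are logged and then re-raised. A minimal sketch of that pattern, with an illustrative function name:

    import asyncio


    async def run_batches(batch_coros) -> None:
        # Gather all batch coroutines; exceptions are returned rather than
        # raised, so every batch gets a chance to finish before we fail.
        results = await asyncio.gather(*batch_coros, return_exceptions=True)
        for result in results:
            if isinstance(result, Exception):
                raise result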
@@ -507,10 +528,12 @@ class PineconeClient(VectorDBBase):
                 # This is a limitation of Pinecone - be careful with ID uniqueness
                 self.index.delete(ids=batch_ids)
                 log.debug(
-                    f"Deleted batch of {len(batch_ids)} vectors by ID from '{collection_name_with_prefix}'"
+                    f"Deleted batch of {len(batch_ids)} vectors by ID "
+                    f"from '{collection_name_with_prefix}'"
                 )

             log.info(
-                f"Successfully deleted {len(ids)} vectors by ID from '{collection_name_with_prefix}'"
+                f"Successfully deleted {len(ids)} vectors by ID "
+                f"from '{collection_name_with_prefix}'"
             )
         elif filter:
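Deletion by ID is batched the same way, and the comment carried through this hunk warns about ID uniqueness, since a Pinecone delete removes every vector carrying a given ID. A short sketch of batched ID deletion; the batch size of 1000 is an assumption (Pinecone limits how many IDs a single delete call may carry) and the helper name is illustrative:

    DELETE_BATCH_SIZE = 1000  # assumed cap on IDs per delete request


    def delete_by_ids(index, ids: list, namespace: str) -> None:
        # Delete vectors in fixed-size ID batches to stay under request limits.
        for i in range(0, len(ids), DELETE_BATCH_SIZE):
            batch_ids = ids[i : i + DELETE_BATCH_SIZE]
            index.delete(ids=batch_ids, namespace=namespace)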