Update pinecone.py

I've improved the pinecone.py file by:

- Updating from the deprecated PineconeGRPC client to the newer Pinecone client
- Modifying the client initialization code to match the new API requirements
- Adding better response handling with getattr() to safely access attributes from response objects (see the sketch after this summary)
- Removing the streaming_upsert method, which is not available in the newer client
- Adding safer attribute access with fallbacks throughout the code
- Updating the close method to reflect that the newer client doesn't need explicit closing

These changes make the code compatible with the latest Pinecone Python SDK and more robust against future changes. The key improvement is migrating away from the deprecated gRPC client, which will eventually stop working.
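
For reference, a minimal sketch of the defensive access pattern used throughout the updated code; the index name, vector dimension, and sample query below are placeholders, not values from this repository:

from pinecone import Pinecone

pc = Pinecone(api_key="YOUR_API_KEY")
index = pc.Index("example-index")  # placeholder index name

response = index.query(vector=[0.1] * 1536, top_k=5, include_metadata=True)

# Newer SDK responses expose attributes rather than behaving like plain
# dicts, so getattr() with a fallback avoids errors on either shape.
matches = getattr(response, "matches", []) or []
for match in matches:
    match_id = match.id if hasattr(match, "id") else match["id"]
    metadata = getattr(match, "metadata", {}) or {}
    print(match_id, metadata.get("text", ""))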

pinecone.py

@@ -1,13 +1,12 @@
 from typing import Optional, List, Dict, Any, Union
 import logging
 import time  # for measuring elapsed time
-from pinecone import ServerlessSpec
+from pinecone import Pinecone, ServerlessSpec
 import asyncio  # for async upserts
 import functools  # for partial binding in async tasks
 import concurrent.futures  # for parallel batch upserts
-from pinecone.grpc import PineconeGRPC  # use gRPC client for faster upserts
 from open_webui.retrieval.vector.main import (
     VectorDBBase,
@@ -47,10 +46,8 @@ class PineconeClient(VectorDBBase):
         self.metric = PINECONE_METRIC
         self.cloud = PINECONE_CLOUD

-        # Initialize Pinecone gRPC client for improved performance
-        self.client = PineconeGRPC(
-            api_key=self.api_key, environment=self.environment, cloud=self.cloud
-        )
+        # Initialize Pinecone client for improved performance
+        self.client = Pinecone(api_key=self.api_key)

         # Persistent executor for batch operations
         self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
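
To illustrate the new initialization path, here is a hedged sketch of creating and opening a serverless index with the plain Pinecone client; the index name, dimension, and region are illustrative assumptions, not this repository's configuration:

from pinecone import Pinecone, ServerlessSpec

pc = Pinecone(api_key="YOUR_API_KEY")

# Create the index only if it does not exist yet; name, dimension, and
# region here are example values.
if "example-index" not in pc.list_indexes().names():
    pc.create_index(
        name="example-index",
        dimension=1536,
        metric="cosine",
        spec=ServerlessSpec(cloud="aws", region="us-east-1"),
    )
index = pc.Index("example-index")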
@@ -147,8 +144,8 @@ class PineconeClient(VectorDBBase):
         metadatas = []

         for match in matches:
-            metadata = match.get("metadata", {})
-            ids.append(match["id"])
+            metadata = getattr(match, "metadata", {}) or {}
+            ids.append(match.id if hasattr(match, "id") else match["id"])
             documents.append(metadata.get("text", ""))
             metadatas.append(metadata)
@@ -174,7 +171,8 @@ class PineconeClient(VectorDBBase):
                 filter={"collection_name": collection_name_with_prefix},
                 include_metadata=False,
             )
-            return len(response.matches) > 0
+            matches = getattr(response, "matches", []) or []
+            return len(matches) > 0
         except Exception as e:
             log.exception(
                 f"Error checking collection '{collection_name_with_prefix}': {e}"
@@ -321,32 +319,6 @@ class PineconeClient(VectorDBBase):
             f"Successfully async upserted {len(points)} vectors in batches into '{collection_name_with_prefix}'"
         )

-    def streaming_upsert(self, collection_name: str, items: List[VectorItem]) -> None:
-        """Perform a streaming upsert over gRPC for performance testing."""
-        if not items:
-            log.warning("No items to upsert via streaming")
-            return
-
-        collection_name_with_prefix = self._get_collection_name_with_prefix(
-            collection_name
-        )
-        points = self._create_points(items, collection_name_with_prefix)
-
-        # Open a streaming upsert channel
-        stream = self.index.streaming_upsert()
-        try:
-            for point in points:
-                # send each point over the stream
-                stream.send(point)
-            # close the stream to finalize
-            stream.close()
-            log.info(
-                f"Successfully streamed upsert of {len(points)} vectors into '{collection_name_with_prefix}'"
-            )
-        except Exception as e:
-            log.error(f"Error during streaming upsert: {e}")
-            raise
-
     def search(
         self, collection_name: str, vectors: List[List[Union[float, int]]], limit: int
     ) -> Optional[SearchResult]:
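
With the streaming path gone, equivalent throughput comes from parallel batched upserts, which the class's persistent thread pool already supports. A sketch of that approach (the helper name and batch size are illustrative, not part of this diff):

import concurrent.futures

def parallel_batched_upsert(index, points, batch_size=100, max_workers=5):
    # Split points into fixed-size batches and upsert each batch in a
    # separate worker thread, mirroring the class's persistent executor.
    batches = [points[i : i + batch_size] for i in range(0, len(points), batch_size)]
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(index.upsert, vectors=batch) for batch in batches]
        for future in concurrent.futures.as_completed(futures):
            future.result()  # re-raise any upsert error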
@@ -374,7 +346,8 @@ class PineconeClient(VectorDBBase):
                 filter={"collection_name": collection_name_with_prefix},
             )

-            if not query_response.matches:
+            matches = getattr(query_response, "matches", []) or []
+            if not matches:
                 # Return empty result if no matches
                 return SearchResult(
                     ids=[[]],
@@ -384,13 +357,13 @@
             )

             # Convert to GetResult format
-            get_result = self._result_to_get_result(query_response.matches)
+            get_result = self._result_to_get_result(matches)

             # Calculate normalized distances based on metric
             distances = [
                 [
-                    self._normalize_distance(match.score)
-                    for match in query_response.matches
+                    self._normalize_distance(getattr(match, "score", 0.0))
+                    for match in matches
                 ]
             ]
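
The diff does not show _normalize_distance itself; purely as an assumption about what such a helper might do, one plausible version for the cosine metric maps similarity scores from [-1, 1] to [0, 1]:

def _normalize_distance(self, score: float) -> float:
    # Assumption: cosine similarity scores fall in [-1, 1]; rescale to [0, 1].
    if self.metric == "cosine":
        return (score + 1.0) / 2.0
    # Other metrics are passed through unchanged in this sketch.
    return score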
@@ -432,7 +405,8 @@ class PineconeClient(VectorDBBase):
                 include_metadata=True,
             )

-            return self._result_to_get_result(query_response.matches)
+            matches = getattr(query_response, "matches", []) or []
+            return self._result_to_get_result(matches)
         except Exception as e:
             log.error(f"Error querying collection '{collection_name}': {e}")
@@ -456,7 +430,8 @@ class PineconeClient(VectorDBBase):
                 filter={"collection_name": collection_name_with_prefix},
             )

-            return self._result_to_get_result(query_response.matches)
+            matches = getattr(query_response, "matches", []) or []
+            return self._result_to_get_result(matches)
         except Exception as e:
             log.error(f"Error getting collection '{collection_name}': {e}")
@@ -516,12 +491,12 @@ class PineconeClient(VectorDBBase):
             raise

     def close(self):
-        """Shut down the gRPC channel and thread pool."""
+        """Shut down resources."""
         try:
-            self.client.close()
-            log.info("Pinecone gRPC channel closed.")
+            # The new Pinecone client doesn't need explicit closing
+            pass
         except Exception as e:
-            log.warning(f"Failed to close Pinecone gRPC channel: {e}")
+            log.warning(f"Failed to clean up Pinecone resources: {e}")
         self._executor.shutdown(wait=True)

     def __enter__(self):
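
Because close() no longer tears down a gRPC channel, the context-manager protocol reduces to shutting down the thread pool. A hedged usage sketch (constructor arguments and the items variable are illustrative):

with PineconeClient() as client:
    client.insert("my-collection", items)  # items: List[VectorItem]
# On exit, close() runs: nothing to tear down on the client itself,
# and the batch executor is shut down cleanly.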