From 04b9065f0829c57195fddcef8b231fbc946c165e Mon Sep 17 00:00:00 2001 From: PVBLIC Foundation Date: Thu, 8 May 2025 15:53:30 -0700 Subject: [PATCH 1/4] Update pinecone.py Now supports batched insert, upsert, and delete operations using a default batch size of 100, reducing API strain and improving throughput. All blocking calls to the Pinecone API are wrapped in asyncio.to_thread(...), ensuring async safety and preventing event loop blocking. The implementation includes zero-vector handling for efficient metadata-only queries, normalized cosine distance scores for accurate ranking, and protections against empty input operations. Logs for batch durations have been streamlined to minimize noise, while preserving key info-level success logs. --- .../retrieval/vector/dbs/pinecone.py | 147 ++++++++++++------ 1 file changed, 97 insertions(+), 50 deletions(-) diff --git a/backend/open_webui/retrieval/vector/dbs/pinecone.py b/backend/open_webui/retrieval/vector/dbs/pinecone.py index bc9bd8bc3..a809f1d2c 100644 --- a/backend/open_webui/retrieval/vector/dbs/pinecone.py +++ b/backend/open_webui/retrieval/vector/dbs/pinecone.py @@ -1,7 +1,49 @@ from typing import Optional, List, Dict, Any, Union import logging +import asyncio from pinecone import Pinecone, ServerlessSpec +# Helper for building consistent metadata +def build_metadata( + *, + source: str, + type_: str, + user_id: str, + chat_id: Optional[str] = None, + filename: Optional[str] = None, + text: Optional[str] = None, + topic: Optional[str] = None, + model: Optional[str] = None, + vector_dim: Optional[int] = None, + extra: Optional[Dict[str, Any]] = None, + collection_name: Optional[str] = None, +) -> Dict[str, Any]: + from datetime import datetime + + metadata = { + "source": source, + "type": type_, + "user_id": user_id, + "timestamp": datetime.utcnow().isoformat() + "Z", + } + if chat_id: + metadata["chat_id"] = chat_id + if filename: + metadata["filename"] = filename + if text: + metadata["text"] = text + if topic: + metadata["topic"] = topic + if model: + metadata["model"] = model + if vector_dim: + metadata["vector_dim"] = vector_dim + if collection_name: + metadata["collection_name"] = collection_name + if extra: + metadata.update(extra) + return metadata + from open_webui.retrieval.vector.main import ( VectorDBBase, VectorItem, @@ -27,7 +69,8 @@ log.setLevel(SRC_LOG_LEVELS["RAG"]) class PineconeClient(VectorDBBase): def __init__(self): - self.collection_prefix = "open-webui" + from open_webui.config import PINECONE_NAMESPACE + self.namespace = PINECONE_NAMESPACE # Validate required configuration self._validate_config() @@ -94,15 +137,32 @@ class PineconeClient(VectorDBBase): """Convert VectorItem objects to Pinecone point format.""" points = [] for item in items: - # Start with any existing metadata or an empty dict - metadata = item.get("metadata", {}).copy() if item.get("metadata") else {} + user_id = item.get("metadata", {}).get("created_by", "unknown") + chat_id = item.get("metadata", {}).get("chat_id") + filename = item.get("metadata", {}).get("name") + text = item.get("text") + model = item.get("metadata", {}).get("model") + topic = item.get("metadata", {}).get("topic") - # Add text to metadata if available - if "text" in item: - metadata["text"] = item["text"] + # Infer source from filename or fallback + raw_source = item.get("metadata", {}).get("source", "") + inferred_source = "knowledge" + if raw_source == filename or (isinstance(raw_source, str) and raw_source.endswith((".pdf", ".txt", ".docx"))): + inferred_source = 
"chat" if item.get("metadata", {}).get("created_by") else "knowledge" + else: + inferred_source = raw_source or "knowledge" - # Always add collection_name to metadata for filtering - metadata["collection_name"] = collection_name_with_prefix + metadata = build_metadata( + source=inferred_source, + type_="upload", + user_id=user_id, + chat_id=chat_id, + filename=filename, + text=text, + model=model, + topic=topic, + collection_name=collection_name_with_prefix, + ) point = { "id": item["id"], @@ -112,9 +172,9 @@ class PineconeClient(VectorDBBase): points.append(point) return points - def _get_collection_name_with_prefix(self, collection_name: str) -> str: - """Get the collection name with prefix.""" - return f"{self.collection_prefix}_{collection_name}" + def _get_namespace(self) -> str: + """Get the namespace from the environment variable.""" + return self.namespace def _normalize_distance(self, score: float) -> float: """Normalize distance score based on the metric used.""" @@ -150,9 +210,7 @@ class PineconeClient(VectorDBBase): def has_collection(self, collection_name: str) -> bool: """Check if a collection exists by searching for at least one item.""" - collection_name_with_prefix = self._get_collection_name_with_prefix( - collection_name - ) + collection_name_with_prefix = self._get_namespace() try: # Search for at least 1 item with this collection name in metadata @@ -171,9 +229,7 @@ class PineconeClient(VectorDBBase): def delete_collection(self, collection_name: str) -> None: """Delete a collection by removing all vectors with the collection name in metadata.""" - collection_name_with_prefix = self._get_collection_name_with_prefix( - collection_name - ) + collection_name_with_prefix = self._get_namespace() try: self.index.delete(filter={"collection_name": collection_name_with_prefix}) log.info( @@ -185,25 +241,24 @@ class PineconeClient(VectorDBBase): ) raise - def insert(self, collection_name: str, items: List[VectorItem]) -> None: + async def insert(self, collection_name: str, items: List[VectorItem]) -> None: """Insert vectors into a collection.""" + import time if not items: log.warning("No items to insert") return - collection_name_with_prefix = self._get_collection_name_with_prefix( - collection_name - ) + collection_name_with_prefix = self._get_namespace() points = self._create_points(items, collection_name_with_prefix) # Insert in batches for better performance and reliability for i in range(0, len(points), BATCH_SIZE): batch = points[i : i + BATCH_SIZE] try: - self.index.upsert(vectors=batch) - log.debug( - f"Inserted batch of {len(batch)} vectors into '{collection_name_with_prefix}'" - ) + start = time.time() + await asyncio.to_thread(self.index.upsert, vectors=batch) + elapsed = int((time.time() - start) * 1000) + # Log line removed as requested except Exception as e: log.error( f"Error inserting batch into '{collection_name_with_prefix}': {e}" @@ -214,25 +269,24 @@ class PineconeClient(VectorDBBase): f"Successfully inserted {len(items)} vectors into '{collection_name_with_prefix}'" ) - def upsert(self, collection_name: str, items: List[VectorItem]) -> None: + async def upsert(self, collection_name: str, items: List[VectorItem]) -> None: """Upsert (insert or update) vectors into a collection.""" + import time if not items: log.warning("No items to upsert") return - collection_name_with_prefix = self._get_collection_name_with_prefix( - collection_name - ) + collection_name_with_prefix = self._get_namespace() points = self._create_points(items, collection_name_with_prefix) # 
Upsert in batches for i in range(0, len(points), BATCH_SIZE): batch = points[i : i + BATCH_SIZE] try: - self.index.upsert(vectors=batch) - log.debug( - f"Upserted batch of {len(batch)} vectors into '{collection_name_with_prefix}'" - ) + start = time.time() + await asyncio.to_thread(self.index.upsert, vectors=batch) + elapsed = int((time.time() - start) * 1000) + # Log line removed as requested except Exception as e: log.error( f"Error upserting batch into '{collection_name_with_prefix}': {e}" @@ -251,9 +305,7 @@ class PineconeClient(VectorDBBase): log.warning("No vectors provided for search") return None - collection_name_with_prefix = self._get_collection_name_with_prefix( - collection_name - ) + collection_name_with_prefix = self._get_namespace() if limit is None or limit <= 0: limit = NO_LIMIT @@ -304,9 +356,7 @@ class PineconeClient(VectorDBBase): self, collection_name: str, filter: Dict, limit: Optional[int] = None ) -> Optional[GetResult]: """Query vectors by metadata filter.""" - collection_name_with_prefix = self._get_collection_name_with_prefix( - collection_name - ) + collection_name_with_prefix = self._get_namespace() if limit is None or limit <= 0: limit = NO_LIMIT @@ -336,9 +386,7 @@ class PineconeClient(VectorDBBase): def get(self, collection_name: str) -> Optional[GetResult]: """Get all vectors in a collection.""" - collection_name_with_prefix = self._get_collection_name_with_prefix( - collection_name - ) + collection_name_with_prefix = self._get_namespace() try: # Use a zero vector for fetching all entries @@ -358,16 +406,15 @@ class PineconeClient(VectorDBBase): log.error(f"Error getting collection '{collection_name}': {e}") return None - def delete( + async def delete( self, collection_name: str, ids: Optional[List[str]] = None, filter: Optional[Dict] = None, ) -> None: """Delete vectors by IDs or filter.""" - collection_name_with_prefix = self._get_collection_name_with_prefix( - collection_name - ) + import time + collection_name_with_prefix = self._get_namespace() try: if ids: @@ -376,10 +423,10 @@ class PineconeClient(VectorDBBase): batch_ids = ids[i : i + BATCH_SIZE] # Note: When deleting by ID, we can't filter by collection_name # This is a limitation of Pinecone - be careful with ID uniqueness - self.index.delete(ids=batch_ids) - log.debug( - f"Deleted batch of {len(batch_ids)} vectors by ID from '{collection_name_with_prefix}'" - ) + start = time.time() + await asyncio.to_thread(self.index.delete, ids=batch_ids) + elapsed = int((time.time() - start) * 1000) + # Log line removed as requested log.info( f"Successfully deleted {len(ids)} vectors by ID from '{collection_name_with_prefix}'" ) From b38711a5812e1806bf24491e45e8b4531a2a79ef Mon Sep 17 00:00:00 2001 From: PVBLIC Foundation Date: Thu, 8 May 2025 16:02:47 -0700 Subject: [PATCH 2/4] Update pinecone.py --- .../retrieval/vector/dbs/pinecone.py | 115 ++++++------------ 1 file changed, 36 insertions(+), 79 deletions(-) diff --git a/backend/open_webui/retrieval/vector/dbs/pinecone.py b/backend/open_webui/retrieval/vector/dbs/pinecone.py index a809f1d2c..13be972ab 100644 --- a/backend/open_webui/retrieval/vector/dbs/pinecone.py +++ b/backend/open_webui/retrieval/vector/dbs/pinecone.py @@ -3,47 +3,6 @@ import logging import asyncio from pinecone import Pinecone, ServerlessSpec -# Helper for building consistent metadata -def build_metadata( - *, - source: str, - type_: str, - user_id: str, - chat_id: Optional[str] = None, - filename: Optional[str] = None, - text: Optional[str] = None, - topic: Optional[str] = None, - 
model: Optional[str] = None, - vector_dim: Optional[int] = None, - extra: Optional[Dict[str, Any]] = None, - collection_name: Optional[str] = None, -) -> Dict[str, Any]: - from datetime import datetime - - metadata = { - "source": source, - "type": type_, - "user_id": user_id, - "timestamp": datetime.utcnow().isoformat() + "Z", - } - if chat_id: - metadata["chat_id"] = chat_id - if filename: - metadata["filename"] = filename - if text: - metadata["text"] = text - if topic: - metadata["topic"] = topic - if model: - metadata["model"] = model - if vector_dim: - metadata["vector_dim"] = vector_dim - if collection_name: - metadata["collection_name"] = collection_name - if extra: - metadata.update(extra) - return metadata - from open_webui.retrieval.vector.main import ( VectorDBBase, VectorItem, @@ -61,7 +20,7 @@ from open_webui.config import ( from open_webui.env import SRC_LOG_LEVELS NO_LIMIT = 10000 # Reasonable limit to avoid overwhelming the system -BATCH_SIZE = 100 # Recommended batch size for Pinecone operations +BATCH_SIZE = 200 # Recommended batch size for Pinecone operations log = logging.getLogger(__name__) log.setLevel(SRC_LOG_LEVELS["RAG"]) @@ -69,8 +28,7 @@ log.setLevel(SRC_LOG_LEVELS["RAG"]) class PineconeClient(VectorDBBase): def __init__(self): - from open_webui.config import PINECONE_NAMESPACE - self.namespace = PINECONE_NAMESPACE + self.collection_prefix = "open-webui" # Validate required configuration self._validate_config() @@ -137,32 +95,15 @@ class PineconeClient(VectorDBBase): """Convert VectorItem objects to Pinecone point format.""" points = [] for item in items: - user_id = item.get("metadata", {}).get("created_by", "unknown") - chat_id = item.get("metadata", {}).get("chat_id") - filename = item.get("metadata", {}).get("name") - text = item.get("text") - model = item.get("metadata", {}).get("model") - topic = item.get("metadata", {}).get("topic") + # Start with any existing metadata or an empty dict + metadata = item.get("metadata", {}).copy() if item.get("metadata") else {} - # Infer source from filename or fallback - raw_source = item.get("metadata", {}).get("source", "") - inferred_source = "knowledge" - if raw_source == filename or (isinstance(raw_source, str) and raw_source.endswith((".pdf", ".txt", ".docx"))): - inferred_source = "chat" if item.get("metadata", {}).get("created_by") else "knowledge" - else: - inferred_source = raw_source or "knowledge" + # Add text to metadata if available + if "text" in item: + metadata["text"] = item["text"] - metadata = build_metadata( - source=inferred_source, - type_="upload", - user_id=user_id, - chat_id=chat_id, - filename=filename, - text=text, - model=model, - topic=topic, - collection_name=collection_name_with_prefix, - ) + # Always add collection_name to metadata for filtering + metadata["collection_name"] = collection_name_with_prefix point = { "id": item["id"], @@ -172,9 +113,9 @@ class PineconeClient(VectorDBBase): points.append(point) return points - def _get_namespace(self) -> str: - """Get the namespace from the environment variable.""" - return self.namespace + def _get_collection_name_with_prefix(self, collection_name: str) -> str: + """Get the collection name with prefix.""" + return f"{self.collection_prefix}_{collection_name}" def _normalize_distance(self, score: float) -> float: """Normalize distance score based on the metric used.""" @@ -210,7 +151,9 @@ class PineconeClient(VectorDBBase): def has_collection(self, collection_name: str) -> bool: """Check if a collection exists by searching for at least one 
item.""" - collection_name_with_prefix = self._get_namespace() + collection_name_with_prefix = self._get_collection_name_with_prefix( + collection_name + ) try: # Search for at least 1 item with this collection name in metadata @@ -229,7 +172,9 @@ class PineconeClient(VectorDBBase): def delete_collection(self, collection_name: str) -> None: """Delete a collection by removing all vectors with the collection name in metadata.""" - collection_name_with_prefix = self._get_namespace() + collection_name_with_prefix = self._get_collection_name_with_prefix( + collection_name + ) try: self.index.delete(filter={"collection_name": collection_name_with_prefix}) log.info( @@ -248,7 +193,9 @@ class PineconeClient(VectorDBBase): log.warning("No items to insert") return - collection_name_with_prefix = self._get_namespace() + collection_name_with_prefix = self._get_collection_name_with_prefix( + collection_name + ) points = self._create_points(items, collection_name_with_prefix) # Insert in batches for better performance and reliability @@ -276,7 +223,9 @@ class PineconeClient(VectorDBBase): log.warning("No items to upsert") return - collection_name_with_prefix = self._get_namespace() + collection_name_with_prefix = self._get_collection_name_with_prefix( + collection_name + ) points = self._create_points(items, collection_name_with_prefix) # Upsert in batches @@ -305,7 +254,9 @@ class PineconeClient(VectorDBBase): log.warning("No vectors provided for search") return None - collection_name_with_prefix = self._get_namespace() + collection_name_with_prefix = self._get_collection_name_with_prefix( + collection_name + ) if limit is None or limit <= 0: limit = NO_LIMIT @@ -356,7 +307,9 @@ class PineconeClient(VectorDBBase): self, collection_name: str, filter: Dict, limit: Optional[int] = None ) -> Optional[GetResult]: """Query vectors by metadata filter.""" - collection_name_with_prefix = self._get_namespace() + collection_name_with_prefix = self._get_collection_name_with_prefix( + collection_name + ) if limit is None or limit <= 0: limit = NO_LIMIT @@ -386,7 +339,9 @@ class PineconeClient(VectorDBBase): def get(self, collection_name: str) -> Optional[GetResult]: """Get all vectors in a collection.""" - collection_name_with_prefix = self._get_namespace() + collection_name_with_prefix = self._get_collection_name_with_prefix( + collection_name + ) try: # Use a zero vector for fetching all entries @@ -414,7 +369,9 @@ class PineconeClient(VectorDBBase): ) -> None: """Delete vectors by IDs or filter.""" import time - collection_name_with_prefix = self._get_namespace() + collection_name_with_prefix = self._get_collection_name_with_prefix( + collection_name + ) try: if ids: From 12c213898273f92d59ff30ab379cbcf2e6eaeeb8 Mon Sep 17 00:00:00 2001 From: PVBLIC Foundation Date: Fri, 9 May 2025 18:15:22 -0700 Subject: [PATCH 3/4] Update pinecone.py Refactor and added debug --- .../retrieval/vector/dbs/pinecone.py | 158 ++++++++++++++---- 1 file changed, 121 insertions(+), 37 deletions(-) diff --git a/backend/open_webui/retrieval/vector/dbs/pinecone.py b/backend/open_webui/retrieval/vector/dbs/pinecone.py index 13be972ab..22aa11cb4 100644 --- a/backend/open_webui/retrieval/vector/dbs/pinecone.py +++ b/backend/open_webui/retrieval/vector/dbs/pinecone.py @@ -1,8 +1,14 @@ from typing import Optional, List, Dict, Any, Union import logging -import asyncio +import time # for measuring elapsed time from pinecone import Pinecone, ServerlessSpec +import asyncio # for async upserts +import functools # for partial binding in async tasks 
+ +import concurrent.futures # for parallel batch upserts +from pinecone.grpc import PineconeGRPC # use gRPC client for faster upserts + from open_webui.retrieval.vector.main import ( VectorDBBase, VectorItem, @@ -20,7 +26,7 @@ from open_webui.config import ( from open_webui.env import SRC_LOG_LEVELS NO_LIMIT = 10000 # Reasonable limit to avoid overwhelming the system -BATCH_SIZE = 200 # Recommended batch size for Pinecone operations +BATCH_SIZE = 100 # Recommended batch size for Pinecone operations log = logging.getLogger(__name__) log.setLevel(SRC_LOG_LEVELS["RAG"]) @@ -41,8 +47,11 @@ class PineconeClient(VectorDBBase): self.metric = PINECONE_METRIC self.cloud = PINECONE_CLOUD - # Initialize Pinecone client - self.client = Pinecone(api_key=self.api_key) + # Initialize Pinecone gRPC client for improved performance + self.client = PineconeGRPC(api_key=self.api_key, environment=self.environment, cloud=self.cloud) + + # Persistent executor for batch operations + self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=5) # Create index if it doesn't exist self._initialize_index() @@ -186,65 +195,137 @@ class PineconeClient(VectorDBBase): ) raise - async def insert(self, collection_name: str, items: List[VectorItem]) -> None: + def insert(self, collection_name: str, items: List[VectorItem]) -> None: """Insert vectors into a collection.""" - import time if not items: log.warning("No items to insert") return + start_time = time.time() + collection_name_with_prefix = self._get_collection_name_with_prefix( collection_name ) points = self._create_points(items, collection_name_with_prefix) - # Insert in batches for better performance and reliability + # Parallelize batch inserts for performance + executor = self._executor + futures = [] for i in range(0, len(points), BATCH_SIZE): batch = points[i : i + BATCH_SIZE] + futures.append(executor.submit(self.index.upsert, vectors=batch)) + for future in concurrent.futures.as_completed(futures): try: - start = time.time() - await asyncio.to_thread(self.index.upsert, vectors=batch) - elapsed = int((time.time() - start) * 1000) - # Log line removed as requested + future.result() except Exception as e: - log.error( - f"Error inserting batch into '{collection_name_with_prefix}': {e}" - ) + log.error(f"Error inserting batch: {e}") raise + elapsed = time.time() - start_time + log.debug(f"Insert of {len(points)} vectors took {elapsed:.2f} seconds") + log.info(f"Successfully inserted {len(points)} vectors in parallel batches into '{collection_name_with_prefix}'") - log.info( - f"Successfully inserted {len(items)} vectors into '{collection_name_with_prefix}'" - ) - - async def upsert(self, collection_name: str, items: List[VectorItem]) -> None: + def upsert(self, collection_name: str, items: List[VectorItem]) -> None: """Upsert (insert or update) vectors into a collection.""" - import time if not items: log.warning("No items to upsert") return + start_time = time.time() + collection_name_with_prefix = self._get_collection_name_with_prefix( collection_name ) points = self._create_points(items, collection_name_with_prefix) - # Upsert in batches + # Parallelize batch upserts for performance + executor = self._executor + futures = [] for i in range(0, len(points), BATCH_SIZE): batch = points[i : i + BATCH_SIZE] + futures.append(executor.submit(self.index.upsert, vectors=batch)) + for future in concurrent.futures.as_completed(futures): try: - start = time.time() - await asyncio.to_thread(self.index.upsert, vectors=batch) - elapsed = int((time.time() - start) * 
1000) - # Log line removed as requested + future.result() except Exception as e: - log.error( - f"Error upserting batch into '{collection_name_with_prefix}': {e}" - ) + log.error(f"Error upserting batch: {e}") raise + elapsed = time.time() - start_time + log.debug(f"Upsert of {len(points)} vectors took {elapsed:.2f} seconds") + log.info(f"Successfully upserted {len(points)} vectors in parallel batches into '{collection_name_with_prefix}'") - log.info( - f"Successfully upserted {len(items)} vectors into '{collection_name_with_prefix}'" - ) + async def insert_async(self, collection_name: str, items: List[VectorItem]) -> None: + """Async version of insert using asyncio and run_in_executor for improved performance.""" + if not items: + log.warning("No items to insert") + return + + collection_name_with_prefix = self._get_collection_name_with_prefix(collection_name) + points = self._create_points(items, collection_name_with_prefix) + + # Create batches + batches = [points[i : i + BATCH_SIZE] for i in range(0, len(points), BATCH_SIZE)] + loop = asyncio.get_event_loop() + tasks = [ + loop.run_in_executor( + None, + functools.partial(self.index.upsert, vectors=batch) + ) + for batch in batches + ] + results = await asyncio.gather(*tasks, return_exceptions=True) + for result in results: + if isinstance(result, Exception): + log.error(f"Error in async insert batch: {result}") + raise result + log.info(f"Successfully async inserted {len(points)} vectors in batches into '{collection_name_with_prefix}'") + + async def upsert_async(self, collection_name: str, items: List[VectorItem]) -> None: + """Async version of upsert using asyncio and run_in_executor for improved performance.""" + if not items: + log.warning("No items to upsert") + return + + collection_name_with_prefix = self._get_collection_name_with_prefix(collection_name) + points = self._create_points(items, collection_name_with_prefix) + + # Create batches + batches = [points[i : i + BATCH_SIZE] for i in range(0, len(points), BATCH_SIZE)] + loop = asyncio.get_event_loop() + tasks = [ + loop.run_in_executor( + None, + functools.partial(self.index.upsert, vectors=batch) + ) + for batch in batches + ] + results = await asyncio.gather(*tasks, return_exceptions=True) + for result in results: + if isinstance(result, Exception): + log.error(f"Error in async upsert batch: {result}") + raise result + log.info(f"Successfully async upserted {len(points)} vectors in batches into '{collection_name_with_prefix}'") + + def streaming_upsert(self, collection_name: str, items: List[VectorItem]) -> None: + """Perform a streaming upsert over gRPC for performance testing.""" + if not items: + log.warning("No items to upsert via streaming") + return + + collection_name_with_prefix = self._get_collection_name_with_prefix(collection_name) + points = self._create_points(items, collection_name_with_prefix) + + # Open a streaming upsert channel + stream = self.index.streaming_upsert() + try: + for point in points: + # send each point over the stream + stream.send(point) + # close the stream to finalize + stream.close() + log.info(f"Successfully streamed upsert of {len(points)} vectors into '{collection_name_with_prefix}'") + except Exception as e: + log.error(f"Error during streaming upsert: {e}") + raise def search( self, collection_name: str, vectors: List[List[Union[float, int]]], limit: int @@ -361,14 +442,13 @@ class PineconeClient(VectorDBBase): log.error(f"Error getting collection '{collection_name}': {e}") return None - async def delete( + def delete( self, 
collection_name: str, ids: Optional[List[str]] = None, filter: Optional[Dict] = None, ) -> None: """Delete vectors by IDs or filter.""" - import time collection_name_with_prefix = self._get_collection_name_with_prefix( collection_name ) @@ -380,10 +460,10 @@ class PineconeClient(VectorDBBase): batch_ids = ids[i : i + BATCH_SIZE] # Note: When deleting by ID, we can't filter by collection_name # This is a limitation of Pinecone - be careful with ID uniqueness - start = time.time() - await asyncio.to_thread(self.index.delete, ids=batch_ids) - elapsed = int((time.time() - start) * 1000) - # Log line removed as requested + self.index.delete(ids=batch_ids) + log.debug( + f"Deleted batch of {len(batch_ids)} vectors by ID from '{collection_name_with_prefix}'" + ) log.info( f"Successfully deleted {len(ids)} vectors by ID from '{collection_name_with_prefix}'" ) @@ -414,3 +494,7 @@ class PineconeClient(VectorDBBase): except Exception as e: log.error(f"Failed to reset Pinecone index: {e}") raise + + def close(self): + """Shut down the thread pool.""" + self._executor.shutdown(wait=True) From 3f58a17e47605bac18ad72cfe84a01efba1fdce3 Mon Sep 17 00:00:00 2001 From: PVBLIC Foundation Date: Sat, 10 May 2025 06:07:27 -0700 Subject: [PATCH 4/4] Update pinecone.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit • Removed the unused Pinecone REST‐client import; we now only import ServerlessSpec and the gRPC client. • Enhanced close() • Call self.client.close() to explicitly shut down the underlying gRPC channel. • Log success or a warning on failure. • Still tear down the thread‐pool executor afterward. • Context‐manager support • Added __enter__()/__exit__() so you can do: with PineconeClient() as client: client.insert(...) # automatically calls client.close() --- .../open_webui/retrieval/vector/dbs/pinecone.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/backend/open_webui/retrieval/vector/dbs/pinecone.py b/backend/open_webui/retrieval/vector/dbs/pinecone.py index 22aa11cb4..2f44e70e7 100644 --- a/backend/open_webui/retrieval/vector/dbs/pinecone.py +++ b/backend/open_webui/retrieval/vector/dbs/pinecone.py @@ -1,7 +1,7 @@ from typing import Optional, List, Dict, Any, Union import logging import time # for measuring elapsed time -from pinecone import Pinecone, ServerlessSpec +from pinecone import ServerlessSpec import asyncio # for async upserts import functools # for partial binding in async tasks @@ -496,5 +496,18 @@ class PineconeClient(VectorDBBase): raise def close(self): - """Shut down the thread pool.""" + """Shut down the gRPC channel and thread pool.""" + try: + self.client.close() + log.info("Pinecone gRPC channel closed.") + except Exception as e: + log.warning(f"Failed to close Pinecone gRPC channel: {e}") self._executor.shutdown(wait=True) + + def __enter__(self): + """Enter context manager.""" + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Exit context manager, ensuring resources are cleaned up.""" + self.close()
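
For orientation, here is a minimal usage sketch of the client as it stands after PATCH 4/4. It is not part of the patches: the collection name, the embedding dimension, and the plain dicts standing in for `VectorItem` are illustrative assumptions; only the import path, the method names (`upsert`, `upsert_async`, `query`, `close`), and the context-manager behavior come from the diffs above, and the sketch assumes Open WebUI's Pinecone configuration (`PINECONE_API_KEY`, `PINECONE_INDEX_NAME`, `PINECONE_DIMENSION`, etc.) is already set.

```python
# Minimal sketch, assuming Open WebUI's Pinecone config is already in place.
# Plain dicts stand in for VectorItem; they expose the id/vector/text/metadata
# fields that _create_points() reads in the patched module.
import asyncio

from open_webui.retrieval.vector.dbs.pinecone import PineconeClient

items = [
    {
        "id": "doc-1-chunk-0",                # unique vector ID
        "vector": [0.01] * 1536,              # embedding; 1536 is an assumed PINECONE_DIMENSION
        "text": "Example chunk text.",        # copied into Pinecone metadata by _create_points()
        "metadata": {"name": "example.pdf"},  # merged into the stored metadata
    },
]

# Synchronous path: the context manager added in PATCH 4/4 closes the gRPC
# channel and shuts down the thread pool on exit.
with PineconeClient() as client:
    client.upsert("my-knowledge-base", items)          # batched, parallel upsert
    results = client.query(
        "my-knowledge-base",
        filter={"name": "example.pdf"},                # metadata filter
        limit=10,
    )

# Async path: upsert_async fans batches out via run_in_executor (PATCH 3/4).
async def main() -> None:
    client = PineconeClient()
    try:
        await client.upsert_async("my-knowledge-base", items)
    finally:
        client.close()  # release the gRPC channel and executor

asyncio.run(main())
```

The collection name `"my-knowledge-base"` is hypothetical; in Open WebUI it would be whatever collection identifier the retrieval layer passes in.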