Update pinecone.py

Now supports batched insert, upsert, and delete operations using a default batch size of 100, reducing API strain and improving throughput. All blocking calls to the Pinecone API are wrapped in asyncio.to_thread(...), ensuring async safety and preventing event loop blocking. The implementation includes zero-vector handling for efficient metadata-only queries, normalized cosine distance scores for accurate ranking, and protections against empty input operations. Logs for batch durations have been streamlined to minimize noise, while preserving key info-level success logs.
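To illustrate the pattern this describes: a minimal sketch of batched, thread-offloaded upserts, assuming a blocking Pinecone index.upsert call. The standalone helper name and shape here are illustrative, not the exact method from the diff:

    import asyncio
    import time
    from typing import Any, Dict, List

    BATCH_SIZE = 100  # default batch size described above

    async def upsert_in_batches(index: Any, points: List[Dict[str, Any]]) -> None:
        # Split points into batches of BATCH_SIZE and run each blocking
        # SDK call in a worker thread so the event loop stays free.
        for i in range(0, len(points), BATCH_SIZE):
            batch = points[i : i + BATCH_SIZE]
            start = time.time()
            await asyncio.to_thread(index.upsert, vectors=batch)
            # Timing is kept available for debug use; per-batch log
            # lines were intentionally dropped to reduce noise.
            elapsed_ms = int((time.time() - start) * 1000)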
commit 04b9065f08 (parent 827326e1a2)
Author: PVBLIC Foundation
Date: 2025-05-08 15:53:30 -07:00 (committed by GitHub)


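For orientation before the diff: the build_metadata helper introduced below returns a flat dict containing only the fields that are set. For a chat-sourced PDF upload it would look roughly like this (all values illustrative):

    {
        "source": "chat",
        "type": "upload",
        "user_id": "user-123",
        "timestamp": "2025-05-08T22:53:30.123456Z",
        "filename": "example.pdf",
        "text": "example chunk",
        "collection_name": "my-namespace",
    }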
@@ -1,7 +1,49 @@
 from typing import Optional, List, Dict, Any, Union
 import logging
+import asyncio

 from pinecone import Pinecone, ServerlessSpec

+
+# Helper for building consistent metadata
+def build_metadata(
+    *,
+    source: str,
+    type_: str,
+    user_id: str,
+    chat_id: Optional[str] = None,
+    filename: Optional[str] = None,
+    text: Optional[str] = None,
+    topic: Optional[str] = None,
+    model: Optional[str] = None,
+    vector_dim: Optional[int] = None,
+    extra: Optional[Dict[str, Any]] = None,
+    collection_name: Optional[str] = None,
+) -> Dict[str, Any]:
+    from datetime import datetime
+
+    metadata = {
+        "source": source,
+        "type": type_,
+        "user_id": user_id,
+        "timestamp": datetime.utcnow().isoformat() + "Z",
+    }
+    if chat_id:
+        metadata["chat_id"] = chat_id
+    if filename:
+        metadata["filename"] = filename
+    if text:
+        metadata["text"] = text
+    if topic:
+        metadata["topic"] = topic
+    if model:
+        metadata["model"] = model
+    if vector_dim:
+        metadata["vector_dim"] = vector_dim
+    if collection_name:
+        metadata["collection_name"] = collection_name
+    if extra:
+        metadata.update(extra)
+    return metadata
+
 from open_webui.retrieval.vector.main import (
     VectorDBBase,
     VectorItem,
@@ -27,7 +69,8 @@ log.setLevel(SRC_LOG_LEVELS["RAG"])

 class PineconeClient(VectorDBBase):
     def __init__(self):
-        self.collection_prefix = "open-webui"
+        from open_webui.config import PINECONE_NAMESPACE
+        self.namespace = PINECONE_NAMESPACE

         # Validate required configuration
         self._validate_config()
@@ -94,15 +137,32 @@ class PineconeClient(VectorDBBase):
         """Convert VectorItem objects to Pinecone point format."""
         points = []
         for item in items:
-            # Start with any existing metadata or an empty dict
-            metadata = item.get("metadata", {}).copy() if item.get("metadata") else {}
+            user_id = item.get("metadata", {}).get("created_by", "unknown")
+            chat_id = item.get("metadata", {}).get("chat_id")
+            filename = item.get("metadata", {}).get("name")
+            text = item.get("text")
+            model = item.get("metadata", {}).get("model")
+            topic = item.get("metadata", {}).get("topic")

-            # Add text to metadata if available
-            if "text" in item:
-                metadata["text"] = item["text"]
+            # Infer source from filename or fallback
+            raw_source = item.get("metadata", {}).get("source", "")
+            inferred_source = "knowledge"
+            if raw_source == filename or (isinstance(raw_source, str) and raw_source.endswith((".pdf", ".txt", ".docx"))):
+                inferred_source = "chat" if item.get("metadata", {}).get("created_by") else "knowledge"
+            else:
+                inferred_source = raw_source or "knowledge"

-            # Always add collection_name to metadata for filtering
-            metadata["collection_name"] = collection_name_with_prefix
+            metadata = build_metadata(
+                source=inferred_source,
+                type_="upload",
+                user_id=user_id,
+                chat_id=chat_id,
+                filename=filename,
+                text=text,
+                model=model,
+                topic=topic,
+                collection_name=collection_name_with_prefix,
+            )

             point = {
                 "id": item["id"],
@@ -112,9 +172,9 @@ class PineconeClient(VectorDBBase):
             points.append(point)
         return points

-    def _get_collection_name_with_prefix(self, collection_name: str) -> str:
-        """Get the collection name with prefix."""
-        return f"{self.collection_prefix}_{collection_name}"
+    def _get_namespace(self) -> str:
+        """Get the namespace from the environment variable."""
+        return self.namespace

     def _normalize_distance(self, score: float) -> float:
         """Normalize distance score based on the metric used."""
@@ -150,9 +210,7 @@ class PineconeClient(VectorDBBase):
     def has_collection(self, collection_name: str) -> bool:
         """Check if a collection exists by searching for at least one item."""
-        collection_name_with_prefix = self._get_collection_name_with_prefix(
-            collection_name
-        )
+        collection_name_with_prefix = self._get_namespace()

         try:
             # Search for at least 1 item with this collection name in metadata
@@ -171,9 +229,7 @@ class PineconeClient(VectorDBBase):
     def delete_collection(self, collection_name: str) -> None:
         """Delete a collection by removing all vectors with the collection name in metadata."""
-        collection_name_with_prefix = self._get_collection_name_with_prefix(
-            collection_name
-        )
+        collection_name_with_prefix = self._get_namespace()

         try:
             self.index.delete(filter={"collection_name": collection_name_with_prefix})
             log.info(
@@ -185,25 +241,24 @@ class PineconeClient(VectorDBBase):
             )
             raise

-    def insert(self, collection_name: str, items: List[VectorItem]) -> None:
+    async def insert(self, collection_name: str, items: List[VectorItem]) -> None:
         """Insert vectors into a collection."""
+        import time
         if not items:
             log.warning("No items to insert")
             return

-        collection_name_with_prefix = self._get_collection_name_with_prefix(
-            collection_name
-        )
+        collection_name_with_prefix = self._get_namespace()

         points = self._create_points(items, collection_name_with_prefix)

         # Insert in batches for better performance and reliability
         for i in range(0, len(points), BATCH_SIZE):
             batch = points[i : i + BATCH_SIZE]
             try:
-                self.index.upsert(vectors=batch)
-                log.debug(
-                    f"Inserted batch of {len(batch)} vectors into '{collection_name_with_prefix}'"
-                )
+                start = time.time()
+                await asyncio.to_thread(self.index.upsert, vectors=batch)
+                elapsed = int((time.time() - start) * 1000)
+                # Log line removed as requested
             except Exception as e:
                 log.error(
                     f"Error inserting batch into '{collection_name_with_prefix}': {e}"
@@ -214,25 +269,24 @@ class PineconeClient(VectorDBBase):
             f"Successfully inserted {len(items)} vectors into '{collection_name_with_prefix}'"
         )

-    def upsert(self, collection_name: str, items: List[VectorItem]) -> None:
+    async def upsert(self, collection_name: str, items: List[VectorItem]) -> None:
         """Upsert (insert or update) vectors into a collection."""
+        import time
         if not items:
             log.warning("No items to upsert")
             return

-        collection_name_with_prefix = self._get_collection_name_with_prefix(
-            collection_name
-        )
+        collection_name_with_prefix = self._get_namespace()

         points = self._create_points(items, collection_name_with_prefix)

         # Upsert in batches
         for i in range(0, len(points), BATCH_SIZE):
             batch = points[i : i + BATCH_SIZE]
             try:
-                self.index.upsert(vectors=batch)
-                log.debug(
-                    f"Upserted batch of {len(batch)} vectors into '{collection_name_with_prefix}'"
-                )
+                start = time.time()
+                await asyncio.to_thread(self.index.upsert, vectors=batch)
+                elapsed = int((time.time() - start) * 1000)
+                # Log line removed as requested
             except Exception as e:
                 log.error(
                     f"Error upserting batch into '{collection_name_with_prefix}': {e}"
@@ -251,9 +305,7 @@ class PineconeClient(VectorDBBase):
             log.warning("No vectors provided for search")
             return None

-        collection_name_with_prefix = self._get_collection_name_with_prefix(
-            collection_name
-        )
+        collection_name_with_prefix = self._get_namespace()

         if limit is None or limit <= 0:
             limit = NO_LIMIT
@@ -304,9 +356,7 @@ class PineconeClient(VectorDBBase):
         self, collection_name: str, filter: Dict, limit: Optional[int] = None
     ) -> Optional[GetResult]:
         """Query vectors by metadata filter."""
-        collection_name_with_prefix = self._get_collection_name_with_prefix(
-            collection_name
-        )
+        collection_name_with_prefix = self._get_namespace()

         if limit is None or limit <= 0:
             limit = NO_LIMIT
@@ -336,9 +386,7 @@ class PineconeClient(VectorDBBase):
     def get(self, collection_name: str) -> Optional[GetResult]:
         """Get all vectors in a collection."""
-        collection_name_with_prefix = self._get_collection_name_with_prefix(
-            collection_name
-        )
+        collection_name_with_prefix = self._get_namespace()

         try:
             # Use a zero vector for fetching all entries
@@ -358,16 +406,15 @@ class PineconeClient(VectorDBBase):
             log.error(f"Error getting collection '{collection_name}': {e}")
             return None

-    def delete(
+    async def delete(
         self,
         collection_name: str,
         ids: Optional[List[str]] = None,
         filter: Optional[Dict] = None,
     ) -> None:
         """Delete vectors by IDs or filter."""
-        collection_name_with_prefix = self._get_collection_name_with_prefix(
-            collection_name
-        )
+        import time
+        collection_name_with_prefix = self._get_namespace()

         try:
             if ids:
@@ -376,10 +423,10 @@ class PineconeClient(VectorDBBase):
                 batch_ids = ids[i : i + BATCH_SIZE]
                 # Note: When deleting by ID, we can't filter by collection_name
                 # This is a limitation of Pinecone - be careful with ID uniqueness
-                self.index.delete(ids=batch_ids)
-                log.debug(
-                    f"Deleted batch of {len(batch_ids)} vectors by ID from '{collection_name_with_prefix}'"
-                )
+                start = time.time()
+                await asyncio.to_thread(self.index.delete, ids=batch_ids)
+                elapsed = int((time.time() - start) * 1000)
+                # Log line removed as requested

             log.info(
                 f"Successfully deleted {len(ids)} vectors by ID from '{collection_name_with_prefix}'"
             )
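Because insert, upsert, and delete are now coroutines, existing call sites must be updated to await them. A hypothetical caller, with item fields inferred from the diff (the collection name and vector dimension are illustrative):

    import asyncio

    async def main() -> None:
        client = PineconeClient()
        items = [
            {
                "id": "doc-1",
                "vector": [0.1] * 1536,  # dimension is illustrative
                "text": "example chunk",
                "metadata": {"created_by": "user-123", "name": "example.pdf"},
            }
        ]
        await client.upsert("my-collection", items)  # now must be awaited
        await client.delete("my-collection", ids=["doc-1"])  # likewise

    asyncio.run(main())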