chore: format

This commit is contained in:
Timothy Jaeryang Baek
2025-05-10 19:00:01 +04:00
parent 3a79840a87
commit c61790b355
57 changed files with 344 additions and 172 deletions

View File

@@ -24,6 +24,7 @@ from open_webui.env import SRC_LOG_LEVELS
log = logging.getLogger(__name__)
log.setLevel(SRC_LOG_LEVELS["RAG"])
class MilvusClient(VectorDBBase):
def __init__(self):
self.collection_prefix = "open_webui"
@@ -113,12 +114,15 @@ class MilvusClient(VectorDBBase):
# Use configurations from config.py
index_type = MILVUS_INDEX_TYPE.upper()
metric_type = MILVUS_METRIC_TYPE.upper()
log.info(f"Using Milvus index type: {index_type}, metric type: {metric_type}")
index_creation_params = {}
if index_type == "HNSW":
index_creation_params = {"M": MILVUS_HNSW_M, "efConstruction": MILVUS_HNSW_EFCONSTRUCTION}
index_creation_params = {
"M": MILVUS_HNSW_M,
"efConstruction": MILVUS_HNSW_EFCONSTRUCTION,
}
log.info(f"HNSW params: {index_creation_params}")
elif index_type == "IVF_FLAT":
index_creation_params = {"nlist": MILVUS_IVF_FLAT_NLIST}
@@ -146,8 +150,9 @@ class MilvusClient(VectorDBBase):
schema=schema,
index_params=index_params,
)
log.info(f"Successfully created collection '{self.collection_prefix}_{collection_name}' with index type '{index_type}' and metric '{metric_type}'.")
log.info(
f"Successfully created collection '{self.collection_prefix}_{collection_name}' with index type '{index_type}' and metric '{metric_type}'."
)
def has_collection(self, collection_name: str) -> bool:
# Check if the collection exists based on the collection name.
@@ -184,7 +189,9 @@ class MilvusClient(VectorDBBase):
# Construct the filter string for querying
collection_name = collection_name.replace("-", "_")
if not self.has_collection(collection_name):
log.warning(f"Query attempted on non-existent collection: {self.collection_prefix}_{collection_name}")
log.warning(
f"Query attempted on non-existent collection: {self.collection_prefix}_{collection_name}"
)
return None
filter_string = " && ".join(
[
@@ -200,47 +207,62 @@ class MilvusClient(VectorDBBase):
# For now, if limit is None, we'll fetch in batches up to a very large number.
# This part could be refined based on expected use cases for "get all".
# For this function signature, None implies "as many as possible" up to Milvus limits.
limit = 16384 * 10 # A large number to signify fetching many, will be capped by actual data or max_limit per call.
log.info(f"Limit not specified for query, fetching up to {limit} results in batches.")
limit = (
16384 * 10
) # A large number to signify fetching many, will be capped by actual data or max_limit per call.
log.info(
f"Limit not specified for query, fetching up to {limit} results in batches."
)
# Initialize offset and remaining to handle pagination
offset = 0
remaining = limit
try:
log.info(f"Querying collection {self.collection_prefix}_{collection_name} with filter: '{filter_string}', limit: {limit}")
log.info(
f"Querying collection {self.collection_prefix}_{collection_name} with filter: '{filter_string}', limit: {limit}"
)
# Loop until there are no more items to fetch or the desired limit is reached
while remaining > 0:
current_fetch = min(max_limit, remaining if isinstance(remaining, int) else max_limit)
log.debug(f"Querying with offset: {offset}, current_fetch: {current_fetch}")
current_fetch = min(
max_limit, remaining if isinstance(remaining, int) else max_limit
)
log.debug(
f"Querying with offset: {offset}, current_fetch: {current_fetch}"
)
results = self.client.query(
collection_name=f"{self.collection_prefix}_{collection_name}",
filter=filter_string,
output_fields=["id", "data", "metadata"], # Explicitly list needed fields. Vector not usually needed in query.
output_fields=[
"id",
"data",
"metadata",
], # Explicitly list needed fields. Vector not usually needed in query.
limit=current_fetch,
offset=offset,
)
if not results:
log.debug("No more results from query.")
break
all_results.extend(results)
results_count = len(results)
log.debug(f"Fetched {results_count} results in this batch.")
if isinstance(remaining, int):
remaining -= results_count
offset += results_count
# Break the loop if the results returned are less than the requested fetch count (means end of data)
if results_count < current_fetch:
log.debug("Fetched less than requested, assuming end of results for this query.")
log.debug(
"Fetched less than requested, assuming end of results for this query."
)
break
log.info(f"Total results from query: {len(all_results)}")
return self._result_to_get_result([all_results])
except Exception as e:
@@ -252,27 +274,36 @@ class MilvusClient(VectorDBBase):
def get(self, collection_name: str) -> Optional[GetResult]:
# Get all the items in the collection. This can be very resource-intensive for large collections.
collection_name = collection_name.replace("-", "_")
log.warning(f"Fetching ALL items from collection '{self.collection_prefix}_{collection_name}'. This might be slow for large collections.")
log.warning(
f"Fetching ALL items from collection '{self.collection_prefix}_{collection_name}'. This might be slow for large collections."
)
# Using query with a trivial filter to get all items.
# This will use the paginated query logic.
return self.query(collection_name=collection_name, filter={}, limit=None)
def insert(self, collection_name: str, items: list[VectorItem]):
# Insert the items into the collection, if the collection does not exist, it will be created.
collection_name = collection_name.replace("-", "_")
if not self.client.has_collection(
collection_name=f"{self.collection_prefix}_{collection_name}"
):
log.info(f"Collection {self.collection_prefix}_{collection_name} does not exist. Creating now.")
log.info(
f"Collection {self.collection_prefix}_{collection_name} does not exist. Creating now."
)
if not items:
log.error(f"Cannot create collection {self.collection_prefix}_{collection_name} without items to determine dimension.")
raise ValueError("Cannot create Milvus collection without items to determine vector dimension.")
log.error(
f"Cannot create collection {self.collection_prefix}_{collection_name} without items to determine dimension."
)
raise ValueError(
"Cannot create Milvus collection without items to determine vector dimension."
)
self._create_collection(
collection_name=collection_name, dimension=len(items[0]["vector"])
)
log.info(f"Inserting {len(items)} items into collection {self.collection_prefix}_{collection_name}.")
log.info(
f"Inserting {len(items)} items into collection {self.collection_prefix}_{collection_name}."
)
return self.client.insert(
collection_name=f"{self.collection_prefix}_{collection_name}",
data=[
@@ -292,15 +323,23 @@ class MilvusClient(VectorDBBase):
if not self.client.has_collection(
collection_name=f"{self.collection_prefix}_{collection_name}"
):
log.info(f"Collection {self.collection_prefix}_{collection_name} does not exist for upsert. Creating now.")
log.info(
f"Collection {self.collection_prefix}_{collection_name} does not exist for upsert. Creating now."
)
if not items:
log.error(f"Cannot create collection {self.collection_prefix}_{collection_name} for upsert without items to determine dimension.")
raise ValueError("Cannot create Milvus collection for upsert without items to determine vector dimension.")
log.error(
f"Cannot create collection {self.collection_prefix}_{collection_name} for upsert without items to determine dimension."
)
raise ValueError(
"Cannot create Milvus collection for upsert without items to determine vector dimension."
)
self._create_collection(
collection_name=collection_name, dimension=len(items[0]["vector"])
)
log.info(f"Upserting {len(items)} items into collection {self.collection_prefix}_{collection_name}.")
log.info(
f"Upserting {len(items)} items into collection {self.collection_prefix}_{collection_name}."
)
return self.client.upsert(
collection_name=f"{self.collection_prefix}_{collection_name}",
data=[
@@ -323,11 +362,15 @@ class MilvusClient(VectorDBBase):
# Delete the items from the collection based on the ids or filter.
collection_name = collection_name.replace("-", "_")
if not self.has_collection(collection_name):
log.warning(f"Delete attempted on non-existent collection: {self.collection_prefix}_{collection_name}")
log.warning(
f"Delete attempted on non-existent collection: {self.collection_prefix}_{collection_name}"
)
return None
if ids:
log.info(f"Deleting items by IDs from {self.collection_prefix}_{collection_name}. IDs: {ids}")
log.info(
f"Deleting items by IDs from {self.collection_prefix}_{collection_name}. IDs: {ids}"
)
return self.client.delete(
collection_name=f"{self.collection_prefix}_{collection_name}",
ids=ids,
@@ -339,19 +382,24 @@ class MilvusClient(VectorDBBase):
for key, value in filter.items()
]
)
log.info(f"Deleting items by filter from {self.collection_prefix}_{collection_name}. Filter: {filter_string}")
log.info(
f"Deleting items by filter from {self.collection_prefix}_{collection_name}. Filter: {filter_string}"
)
return self.client.delete(
collection_name=f"{self.collection_prefix}_{collection_name}",
filter=filter_string,
)
else:
log.warning(f"Delete operation on {self.collection_prefix}_{collection_name} called without IDs or filter. No action taken.")
log.warning(
f"Delete operation on {self.collection_prefix}_{collection_name} called without IDs or filter. No action taken."
)
return None
def reset(self):
# Resets the database. This will delete all collections and item entries that match the prefix.
log.warning(f"Resetting Milvus: Deleting all collections with prefix '{self.collection_prefix}'.")
log.warning(
f"Resetting Milvus: Deleting all collections with prefix '{self.collection_prefix}'."
)
collection_names = self.client.list_collections()
deleted_collections = []
for collection_name_full in collection_names:

View File

@@ -48,7 +48,9 @@ class PineconeClient(VectorDBBase):
self.cloud = PINECONE_CLOUD
# Initialize Pinecone gRPC client for improved performance
self.client = PineconeGRPC(api_key=self.api_key, environment=self.environment, cloud=self.cloud)
self.client = PineconeGRPC(
api_key=self.api_key, environment=self.environment, cloud=self.cloud
)
# Persistent executor for batch operations
self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
@@ -222,7 +224,9 @@ class PineconeClient(VectorDBBase):
raise
elapsed = time.time() - start_time
log.debug(f"Insert of {len(points)} vectors took {elapsed:.2f} seconds")
log.info(f"Successfully inserted {len(points)} vectors in parallel batches into '{collection_name_with_prefix}'")
log.info(
f"Successfully inserted {len(points)} vectors in parallel batches into '{collection_name_with_prefix}'"
)
def upsert(self, collection_name: str, items: List[VectorItem]) -> None:
"""Upsert (insert or update) vectors into a collection."""
@@ -251,7 +255,9 @@ class PineconeClient(VectorDBBase):
raise
elapsed = time.time() - start_time
log.debug(f"Upsert of {len(points)} vectors took {elapsed:.2f} seconds")
log.info(f"Successfully upserted {len(points)} vectors in parallel batches into '{collection_name_with_prefix}'")
log.info(
f"Successfully upserted {len(points)} vectors in parallel batches into '{collection_name_with_prefix}'"
)
async def insert_async(self, collection_name: str, items: List[VectorItem]) -> None:
"""Async version of insert using asyncio and run_in_executor for improved performance."""
@@ -259,16 +265,19 @@ class PineconeClient(VectorDBBase):
log.warning("No items to insert")
return
collection_name_with_prefix = self._get_collection_name_with_prefix(collection_name)
collection_name_with_prefix = self._get_collection_name_with_prefix(
collection_name
)
points = self._create_points(items, collection_name_with_prefix)
# Create batches
batches = [points[i : i + BATCH_SIZE] for i in range(0, len(points), BATCH_SIZE)]
batches = [
points[i : i + BATCH_SIZE] for i in range(0, len(points), BATCH_SIZE)
]
loop = asyncio.get_event_loop()
tasks = [
loop.run_in_executor(
None,
functools.partial(self.index.upsert, vectors=batch)
None, functools.partial(self.index.upsert, vectors=batch)
)
for batch in batches
]
@@ -277,7 +286,9 @@ class PineconeClient(VectorDBBase):
if isinstance(result, Exception):
log.error(f"Error in async insert batch: {result}")
raise result
log.info(f"Successfully async inserted {len(points)} vectors in batches into '{collection_name_with_prefix}'")
log.info(
f"Successfully async inserted {len(points)} vectors in batches into '{collection_name_with_prefix}'"
)
async def upsert_async(self, collection_name: str, items: List[VectorItem]) -> None:
"""Async version of upsert using asyncio and run_in_executor for improved performance."""
@@ -285,16 +296,19 @@ class PineconeClient(VectorDBBase):
log.warning("No items to upsert")
return
collection_name_with_prefix = self._get_collection_name_with_prefix(collection_name)
collection_name_with_prefix = self._get_collection_name_with_prefix(
collection_name
)
points = self._create_points(items, collection_name_with_prefix)
# Create batches
batches = [points[i : i + BATCH_SIZE] for i in range(0, len(points), BATCH_SIZE)]
batches = [
points[i : i + BATCH_SIZE] for i in range(0, len(points), BATCH_SIZE)
]
loop = asyncio.get_event_loop()
tasks = [
loop.run_in_executor(
None,
functools.partial(self.index.upsert, vectors=batch)
None, functools.partial(self.index.upsert, vectors=batch)
)
for batch in batches
]
@@ -303,7 +317,9 @@ class PineconeClient(VectorDBBase):
if isinstance(result, Exception):
log.error(f"Error in async upsert batch: {result}")
raise result
log.info(f"Successfully async upserted {len(points)} vectors in batches into '{collection_name_with_prefix}'")
log.info(
f"Successfully async upserted {len(points)} vectors in batches into '{collection_name_with_prefix}'"
)
def streaming_upsert(self, collection_name: str, items: List[VectorItem]) -> None:
"""Perform a streaming upsert over gRPC for performance testing."""
@@ -311,7 +327,9 @@ class PineconeClient(VectorDBBase):
log.warning("No items to upsert via streaming")
return
collection_name_with_prefix = self._get_collection_name_with_prefix(collection_name)
collection_name_with_prefix = self._get_collection_name_with_prefix(
collection_name
)
points = self._create_points(items, collection_name_with_prefix)
# Open a streaming upsert channel
@@ -322,7 +340,9 @@ class PineconeClient(VectorDBBase):
stream.send(point)
# close the stream to finalize
stream.close()
log.info(f"Successfully streamed upsert of {len(points)} vectors into '{collection_name_with_prefix}'")
log.info(
f"Successfully streamed upsert of {len(points)} vectors into '{collection_name_with_prefix}'"
)
except Exception as e:
log.error(f"Error during streaming upsert: {e}")
raise

View File

@@ -50,8 +50,8 @@ class JupyterCodeExecuter:
self.password = password
self.timeout = timeout
self.kernel_id = ""
if self.base_url[-1] != '/':
self.base_url += '/'
if self.base_url[-1] != "/":
self.base_url += "/"
self.session = aiohttp.ClientSession(trust_env=True, base_url=self.base_url)
self.params = {}
self.result = ResultModel()
@@ -103,9 +103,7 @@ class JupyterCodeExecuter:
self.params.update({"token": self.token})
async def init_kernel(self) -> None:
async with self.session.post(
url="api/kernels", params=self.params
) as response:
async with self.session.post(url="api/kernels", params=self.params) as response:
response.raise_for_status()
kernel_data = await response.json()
self.kernel_id = kernel_data["id"]