open-webui/backend/open_webui/apps/retrieval/vector/dbs/milvus.py

243 lines
8.0 KiB
Python
Raw Normal View History

2024-09-12 05:52:19 +00:00
from pymilvus import MilvusClient as Client
from pymilvus import FieldSchema, DataType
import json
2024-09-10 03:37:06 +00:00
from typing import Optional
2024-09-27 23:28:45 +00:00
from open_webui.apps.retrieval.vector.main import VectorItem, SearchResult, GetResult
2024-09-12 05:52:19 +00:00
from open_webui.config import (
MILVUS_URI,
)
2024-09-10 03:37:06 +00:00
class MilvusClient:
    """Vector-store adapter that keeps Open WebUI collections in Milvus.

    Every collection handled by this class is stored under the name
    ``f"{collection_prefix}_{collection_name}"``; callers always pass the
    un-prefixed name.
    """

    def __init__(self):
        self.collection_prefix = "open_webui"
        self.client = Client(uri=MILVUS_URI)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _full_name(self, collection_name: str) -> str:
        """Return the prefixed name under which the collection is stored."""
        # NOTE(review): Milvus is understood to restrict collection names to
        # letters, digits and underscores -- confirm that callers never pass
        # names containing other characters (e.g. hyphens).
        return f"{self.collection_prefix}_{collection_name}"

    @staticmethod
    def _filter_to_expr(filter: dict) -> str:
        """Translate ``{key: value, ...}`` into a Milvus boolean expression.

        Each pair becomes an equality test on the ``metadata`` JSON field and
        the tests are combined with ``&&``.  Keys and values are rendered with
        ``json.dumps`` so that strings are double-quoted and escaped instead
        of being interpolated raw into the expression.
        """
        return " && ".join(
            f"metadata[{json.dumps(str(key), ensure_ascii=False)}]"
            f" == {json.dumps(value, ensure_ascii=False)}"
            for key, value in filter.items()
        )

    @staticmethod
    def _items_to_rows(items: list[VectorItem]) -> list[dict]:
        """Convert vector items into the row layout of the collection schema."""
        return [
            {
                "id": item["id"],
                "vector": item["vector"],
                "data": {"text": item["text"]},
                "metadata": item["metadata"],
            }
            for item in items
        ]

    def _ensure_collection(self, collection_name: str, items: list[VectorItem]):
        """Create the collection if it is missing.

        The vector dimension is taken from the first item, so ``items`` must
        not be empty when the collection does not exist yet.
        """
        if not self.has_collection(collection_name):
            self._create_collection(
                collection_name=collection_name,
                dimension=len(items[0]["vector"]),
            )

    def _result_to_get_result(self, result) -> GetResult:
        """Convert groups of flat rows (as returned by ``client.query``).

        ``result`` is an iterable of row groups; each row exposes ``id``,
        ``data`` (holding ``text``) and ``metadata`` at the top level.
        """
        ids = []
        documents = []
        metadatas = []
        for rows in result:
            ids.append([row.get("id") for row in rows])
            documents.append([row.get("data", {}).get("text") for row in rows])
            metadatas.append([row.get("metadata") for row in rows])
        return GetResult(ids=ids, documents=documents, metadatas=metadatas)

    def _result_to_search_result(self, result) -> SearchResult:
        """Convert groups of search hits (as returned by ``client.search``).

        Each hit exposes ``id`` and ``distance`` at the top level and the
        requested output fields nested under ``entity``.
        """
        ids = []
        distances = []
        documents = []
        metadatas = []
        for hits in result:
            ids.append([hit.get("id") for hit in hits])
            distances.append([hit.get("distance") for hit in hits])
            documents.append(
                [hit.get("entity", {}).get("data", {}).get("text") for hit in hits]
            )
            metadatas.append([hit.get("entity", {}).get("metadata") for hit in hits])
        return SearchResult(
            ids=ids,
            distances=distances,
            documents=documents,
            metadatas=metadatas,
        )

    def _create_collection(self, collection_name: str, dimension: int):
        """Create the prefixed collection with its schema and vector index.

        Schema: ``id`` (VARCHAR primary key, not auto-generated), ``vector``
        (FLOAT_VECTOR of ``dimension``), ``data`` (JSON) and ``metadata``
        (JSON), with dynamic fields enabled.  The vector field gets an HNSW
        index using the COSINE metric.
        """
        schema = self.client.create_schema(
            auto_id=False,
            enable_dynamic_field=True,
        )
        schema.add_field(
            field_name="id",
            datatype=DataType.VARCHAR,
            is_primary=True,
            max_length=65535,
        )
        schema.add_field(
            field_name="vector",
            datatype=DataType.FLOAT_VECTOR,
            dim=dimension,
            description="vector",
        )
        schema.add_field(field_name="data", datatype=DataType.JSON, description="data")
        schema.add_field(
            field_name="metadata", datatype=DataType.JSON, description="metadata"
        )

        index_params = self.client.prepare_index_params()
        index_params.add_index(
            field_name="vector",
            index_type="HNSW",
            metric_type="COSINE",
            params={"M": 16, "efConstruction": 100},
        )

        self.client.create_collection(
            collection_name=self._full_name(collection_name),
            schema=schema,
            index_params=index_params,
        )

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def has_collection(self, collection_name: str) -> bool:
        """Return whether the (prefixed) collection exists."""
        return self.client.has_collection(
            collection_name=self._full_name(collection_name)
        )

    def delete_collection(self, collection_name: str):
        """Drop the (prefixed) collection."""
        return self.client.drop_collection(
            collection_name=self._full_name(collection_name)
        )

    def search(
        self, collection_name: str, vectors: list[list[float | int]], limit: int
    ) -> Optional[SearchResult]:
        """Return up to ``limit`` nearest neighbours for each query vector."""
        result = self.client.search(
            collection_name=self._full_name(collection_name),
            data=vectors,
            limit=limit,
            output_fields=["data", "metadata"],
        )
        return self._result_to_search_result(result)

    def query(
        self, collection_name: str, filter: dict, limit: int = 1
    ) -> Optional[SearchResult]:
        """Return up to ``limit`` items whose metadata equals every filter pair.

        The result holds a single group.  A query carries no similarity
        score, so every entry of ``distances`` is ``None``.
        """
        rows = self.client.query(
            collection_name=self._full_name(collection_name),
            filter=self._filter_to_expr(filter),
            limit=limit,
        )
        # ``client.query`` yields flat rows (the same shape ``get`` consumes),
        # whereas ``_result_to_search_result`` reads fields from under
        # ``entity``; wrap each row so documents and metadata are picked up.
        hits = [{"id": row.get("id"), "entity": row} for row in rows]
        return self._result_to_search_result([hits])

    def get(self, collection_name: str) -> Optional[GetResult]:
        """Return all items in the collection as a single group."""
        result = self.client.query(
            collection_name=self._full_name(collection_name),
            # Matches every row whose primary key is a non-empty string.
            filter='id != ""',
        )
        return self._result_to_get_result([result])

    def insert(self, collection_name: str, items: list[VectorItem]):
        """Insert ``items``, creating the collection first if it is missing."""
        self._ensure_collection(collection_name, items)
        return self.client.insert(
            collection_name=self._full_name(collection_name),
            data=self._items_to_rows(items),
        )

    def upsert(self, collection_name: str, items: list[VectorItem]):
        """Insert or update ``items``, creating the collection if it is missing."""
        self._ensure_collection(collection_name, items)
        return self.client.upsert(
            collection_name=self._full_name(collection_name),
            data=self._items_to_rows(items),
        )

    def delete(
        self,
        collection_name: str,
        ids: Optional[list[str]] = None,
        filter: Optional[dict] = None,
    ):
        """Delete items by primary key or by metadata filter.

        ``ids`` takes precedence when both are given.  When neither is
        provided (or both are empty) nothing is deleted and ``None`` is
        returned.
        """
        if ids:
            return self.client.delete(
                collection_name=self._full_name(collection_name),
                ids=ids,
            )
        elif filter:
            return self.client.delete(
                collection_name=self._full_name(collection_name),
                filter=self._filter_to_expr(filter),
            )

    def reset(self):
        """Drop every collection whose name starts with the collection prefix."""
        for collection_name in self.client.list_collections():
            if collection_name.startswith(self.collection_prefix):
                self.client.drop_collection(collection_name=collection_name)