Merge pull request #11245 from tupe2009/kleqon.fix-es-bugs

fix: ElasticSearch indexes delete and upsert bugs (Post 0.5.19 release)
Timothy Jaeryang Baek 2025-03-05 13:36:55 -08:00 committed by GitHub
commit 73a77abd20
2 changed files with 137 additions and 95 deletions

View File

@@ -1553,7 +1553,7 @@
ELASTICSEARCH_USERNAME = os.environ.get("ELASTICSEARCH_USERNAME", None)
ELASTICSEARCH_PASSWORD = os.environ.get("ELASTICSEARCH_PASSWORD", None)
ELASTICSEARCH_CLOUD_ID = os.environ.get("ELASTICSEARCH_CLOUD_ID", None)
SSL_ASSERT_FINGERPRINT = os.environ.get("SSL_ASSERT_FINGERPRINT", None)
ELASTICSEARCH_INDEX_PREFIX = os.environ.get("ELASTICSEARCH_INDEX_PREFIX", "open_webui_collections")

# Pgvector
PGVECTOR_DB_URL = os.environ.get("PGVECTOR_DB_URL", DATABASE_URL)
if VECTOR_DB == "pgvector" and not PGVECTOR_DB_URL.startswith("postgres"):
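
The new ELASTICSEARCH_INDEX_PREFIX setting makes the index prefix configurable; previously the Elasticsearch client hard-coded "open_webui_collections". A minimal sketch of how the setting feeds the per-dimension index names used by the client in the second file (the 384 is an illustrative embedding size, not a value from this PR):

import os

# Defaults to "open_webui_collections"; override it in the environment, e.g.
# ELASTICSEARCH_INDEX_PREFIX=my_webui, before starting the backend.
prefix = os.environ.get("ELASTICSEARCH_INDEX_PREFIX", "open_webui_collections")

dimension = 384                        # illustrative embedding length
index_name = f"{prefix}_d{dimension}"  # -> "open_webui_collections_d384"
wildcard = f"{prefix}*"                # used for cross-index search and delete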

View File

@@ -1,47 +1,46 @@
from elasticsearch import Elasticsearch, BadRequestError
from typing import Optional
import ssl
from elasticsearch.helpers import bulk, scan

from open_webui.retrieval.vector.main import VectorItem, SearchResult, GetResult
from open_webui.config import (
    ELASTICSEARCH_URL,
    ELASTICSEARCH_CA_CERTS,
    ELASTICSEARCH_API_KEY,
    ELASTICSEARCH_USERNAME,
    ELASTICSEARCH_PASSWORD,
    ELASTICSEARCH_CLOUD_ID,
    ELASTICSEARCH_INDEX_PREFIX,
    SSL_ASSERT_FINGERPRINT,
)


class ElasticsearchClient:
    """
    Important:
    In order to reduce the number of indexes, and since the embedding vector length is
    fixed, we avoid creating an index per file. Documents are stored as text fields in
    shared indexes, separated into different indexes based on the embedding length.
    """

    def __init__(self):
        self.index_prefix = ELASTICSEARCH_INDEX_PREFIX
        self.client = Elasticsearch(
            hosts=[ELASTICSEARCH_URL],
            ca_certs=ELASTICSEARCH_CA_CERTS,
            api_key=ELASTICSEARCH_API_KEY,
            cloud_id=ELASTICSEARCH_CLOUD_ID,
            basic_auth=(
                (ELASTICSEARCH_USERNAME, ELASTICSEARCH_PASSWORD)
                if ELASTICSEARCH_USERNAME and ELASTICSEARCH_PASSWORD
                else None
            ),
            ssl_assert_fingerprint=SSL_ASSERT_FINGERPRINT,
        )

    # Status: works
    def _get_index_name(self, dimension: int) -> str:
        return f"{self.index_prefix}_d{str(dimension)}"

    # Status: works
    def _scan_result_to_get_result(self, result) -> GetResult:
        if not result:
            return None
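
As the docstring explains, the client keeps one shared index per embedding dimension rather than one index per file or collection, and every stored document is tagged with its collection name. For orientation, a document's _source has roughly this shape (values are illustrative; the real fields are written by insert/upsert further down):

# Illustrative _source of one document in an index such as open_webui_collections_d384:
doc_source = {
    "collection": "my-collection",   # keyword field used to scope search, query and delete
    "vector": [0.12, -0.03, 0.57],   # truncated here; the real vector fills the index's dimension
    "text": "a chunk of the original file",
    "metadata": {"file_id": "f1"},   # arbitrary metadata; string keys become keyword fields (see below)
}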
@@ -56,7 +55,7 @@ class ElasticsearchClient:
        return GetResult(ids=[ids], documents=[documents], metadatas=[metadatas])

    # Status: works
    def _result_to_get_result(self, result) -> GetResult:
        if not result["hits"]["hits"]:
            return None
@@ -71,7 +70,7 @@ class ElasticsearchClient:
        return GetResult(ids=[ids], documents=[documents], metadatas=[metadatas])

    # Status: works
    def _result_to_search_result(self, result) -> SearchResult:
        ids = []
        distances = []
@@ -85,16 +84,22 @@ class ElasticsearchClient:
            metadatas.append(hit["_source"].get("metadata"))

        return SearchResult(
            ids=[ids],
            distances=[distances],
            documents=[documents],
            metadatas=[metadatas],
        )

    # Status: works
    def _create_index(self, dimension: int):
        body = {
            "mappings": {
                "dynamic_templates": [
                    {
                        "strings": {
                            "match_mapping_type": "string",
                            "mapping": {"type": "keyword"},
                        }
                    }
                ],
                "properties": {
                    "collection": {"type": "keyword"},
                    "id": {"type": "keyword"},
@@ -110,51 +115,64 @@ class ElasticsearchClient:
            }
        }
        self.client.indices.create(index=self._get_index_name(dimension), body=body)

    # Status: works
    def _create_batches(self, items: list[VectorItem], batch_size=100):
        for i in range(0, len(items), batch_size):
            yield items[i : min(i + batch_size, len(items))]

    # Status: works
    def has_collection(self, collection_name) -> bool:
        query_body = {"query": {"bool": {"filter": []}}}
        query_body["query"]["bool"]["filter"].append(
            {"term": {"collection": collection_name}}
        )

        try:
            result = self.client.count(index=f"{self.index_prefix}*", body=query_body)
            return result.body["count"] > 0
        except Exception as e:
            return None

    def delete_collection(self, collection_name: str):
        query = {"query": {"term": {"collection": collection_name}}}
        self.client.delete_by_query(index=f"{self.index_prefix}*", body=query)

    # Status: works
    def search(
        self, collection_name: str, vectors: list[list[float]], limit: int
    ) -> Optional[SearchResult]:
        query = {
            "size": limit,
            "_source": ["text", "metadata"],
            "query": {
                "script_score": {
                    "query": {
                        "bool": {"filter": [{"term": {"collection": collection_name}}]}
                    },
                    "script": {
                        "source": "cosineSimilarity(params.vector, 'vector') + 1.0",
                        "params": {
                            "vector": vectors[0]
                        },  # Assuming a single query vector
                    },
                }
            },
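
This hunk carries the first headline fix: the old delete_colleciton (note the typo) dropped an entire index via self.client.indices.delete on a self._get_collection_name(...) value, whereas the renamed delete_collection issues a delete_by_query scoped to the collection term across every prefixed index, so other collections sharing the same per-dimension index are left intact. A minimal usage sketch with a hypothetical collection name:

client = ElasticsearchClient()

# Removes only the documents tagged with this collection; the shared per-dimension
# indexes (open_webui_collections_d*) remain and keep serving other collections.
client.delete_collection("my-collection")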
@@ -165,8 +183,7 @@ class ElasticsearchClient:
        )

        return self._result_to_search_result(result)

    # Status: only tested halfway
    def query(
        self, collection_name: str, filter: dict, limit: Optional[int] = None
    ) -> Optional[GetResult]:
@@ -180,9 +197,7 @@ class ElasticsearchClient:
        for field, value in filter.items():
            query_body["query"]["bool"]["filter"].append({"term": {field: value}})
        query_body["query"]["bool"]["filter"].append(
            {"term": {"collection": collection_name}}
        )

        size = limit if limit else 10

        try:
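
For reference, query() builds one term clause per filter field and always appends the collection term, so a hypothetical call query("docs", {"hash": "abc123"}) would send a body like the following (the field name and values are made up for illustration):

query_body = {
    "query": {
        "bool": {
            "filter": [
                {"term": {"hash": "abc123"}},      # one clause per filter key/value
                {"term": {"collection": "docs"}},  # always scoped to the collection
            ]
        }
    }
}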
@@ -191,82 +206,109 @@ class ElasticsearchClient:
                body=query_body,
                size=size,
            )

            return self._result_to_get_result(result)
        except Exception as e:
            return None

    # Status: works
    def _has_index(self, dimension: int):
        return self.client.indices.exists(
            index=self._get_index_name(dimension=dimension)
        )

    def get_or_create_index(self, dimension: int):
        if not self._has_index(dimension=dimension):
            self._create_index(dimension=dimension)

    # Status: works
    def get(self, collection_name: str) -> Optional[GetResult]:
        # Get all the items in the collection.
        query = {
            "query": {"bool": {"filter": [{"term": {"collection": collection_name}}]}},
            "_source": ["text", "metadata"],
        }
        results = list(scan(self.client, index=f"{self.index_prefix}*", query=query))

        return self._scan_result_to_get_result(results)

    # Status: works
    def insert(self, collection_name: str, items: list[VectorItem]):
        if not self._has_index(dimension=len(items[0]["vector"])):
            self._create_index(dimension=len(items[0]["vector"]))

        for batch in self._create_batches(items):
            actions = [
                {
                    "_index": self._get_index_name(dimension=len(items[0]["vector"])),
                    "_id": item["id"],
                    "_source": {
                        "collection": collection_name,
                        "vector": item["vector"],
                        "text": item["text"],
                        "metadata": item["metadata"],
                    },
                }
                for item in batch
            ]
            bulk(self.client, actions)

    # Upsert documents using the update API with doc_as_upsert=True.
    def upsert(self, collection_name: str, items: list[VectorItem]):
        if not self._has_index(dimension=len(items[0]["vector"])):
            self._create_index(dimension=len(items[0]["vector"]))

        for batch in self._create_batches(items):
            actions = [
                {
                    "_op_type": "update",
                    "_index": self._get_index_name(dimension=len(item["vector"])),
                    "_id": item["id"],
                    "doc": {
                        "collection": collection_name,
                        "vector": item["vector"],
                        "text": item["text"],
                        "metadata": item["metadata"],
                    },
                    "doc_as_upsert": True,
                }
                for item in batch
            ]
            bulk(self.client, actions)

    # Delete specific documents from a collection, filtering on the collection plus
    # either explicit document IDs or metadata fields.
    def delete(
        self,
        collection_name: str,
        ids: Optional[list[str]] = None,
        filter: Optional[dict] = None,
    ):
        query = {
            "query": {"bool": {"filter": [{"term": {"collection": collection_name}}]}}
        }
        # Logic based on ChromaDB
        if ids:
            query["query"]["bool"]["filter"].append({"terms": {"_id": ids}})
        elif filter:
            for field, value in filter.items():
                query["query"]["bool"]["filter"].append(
                    {"term": {f"metadata.{field}": value}}
                )

        self.client.delete_by_query(index=f"{self.index_prefix}*", body=query)

    def reset(self):
        indices = self.client.indices.get(index=f"{self.index_prefix}*")
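
The second headline fix is upsert: the old implementation re-indexed documents with plain bulk index actions (omitting the collection field and passing a stray collection_name argument to _create_index), while the new one sends bulk update actions with doc_as_upsert=True, so existing documents are patched in place and missing ones are created. delete likewise gains optional ids and metadata filter arguments scoped to the collection instead of issuing raw delete actions against a wildcard index. A minimal usage sketch with illustrative items:

client = ElasticsearchClient()

items = [  # illustrative VectorItem dicts; real vectors come from the embedding model
    {"id": "doc-1", "vector": [0.1, 0.2, 0.3], "text": "hello", "metadata": {"file_id": "f1"}},
    {"id": "doc-2", "vector": [0.4, 0.5, 0.6], "text": "world", "metadata": {"file_id": "f1"}},
]

client.upsert("my-collection", items)                     # update-or-create via doc_as_upsert
client.delete("my-collection", ids=["doc-1"])             # delete selected IDs within the collection
client.delete("my-collection", filter={"file_id": "f1"})  # or delete by a metadata term filter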