Fix OpenSearch race condition; use keyword (term) search instead of full-text (match) search in filter queries

This commit is contained in:
Doris Lam 2025-06-23 18:43:33 -07:00
parent 610680ac14
commit 74ae9ab897

View File

@ -157,10 +157,10 @@ class OpenSearchClient(VectorDBBase):
for field, value in filter.items(): for field, value in filter.items():
query_body["query"]["bool"]["filter"].append( query_body["query"]["bool"]["filter"].append(
{"match": {"metadata." + str(field): value}} {"term": {"metadata." + str(field) + ".keyword": value}}
) )
size = limit if limit else 10 size = limit if limit else 10000
try: try:
result = self.client.search( result = self.client.search(
@ -206,6 +206,7 @@ class OpenSearchClient(VectorDBBase):
for item in batch for item in batch
] ]
bulk(self.client, actions) bulk(self.client, actions)
self.client.indices.refresh(self._get_index_name(collection_name))
def upsert(self, collection_name: str, items: list[VectorItem]): def upsert(self, collection_name: str, items: list[VectorItem]):
self._create_index_if_not_exists( self._create_index_if_not_exists(
@ -228,6 +229,7 @@ class OpenSearchClient(VectorDBBase):
for item in batch for item in batch
] ]
bulk(self.client, actions) bulk(self.client, actions)
self.client.indices.refresh(self._get_index_name(collection_name))
def delete( def delete(
self, self,
@ -251,11 +253,12 @@ class OpenSearchClient(VectorDBBase):
} }
for field, value in filter.items(): for field, value in filter.items():
query_body["query"]["bool"]["filter"].append( query_body["query"]["bool"]["filter"].append(
{"match": {"metadata." + str(field): value}} {"term": {"metadata." + str(field) + ".keyword": value}}
) )
self.client.delete_by_query( self.client.delete_by_query(
index=self._get_index_name(collection_name), body=query_body index=self._get_index_name(collection_name), body=query_body
) )
self.client.indices.refresh(self._get_index_name(collection_name))
def reset(self): def reset(self):
indices = self.client.indices.get(index=f"{self.index_prefix}_*") indices = self.client.indices.get(index=f"{self.index_prefix}_*")