From 9c64310db530d92062dacbfdf4d5409162c8ad5d Mon Sep 17 00:00:00 2001
From: Phlogi
Date: Mon, 31 Mar 2025 16:43:37 +0200
Subject: [PATCH] Run hybrid_search in parallel

---
 backend/open_webui/retrieval/utils.py | 52 ++++++++++++++++-----------
 1 file changed, 31 insertions(+), 21 deletions(-)

diff --git a/backend/open_webui/retrieval/utils.py b/backend/open_webui/retrieval/utils.py
index bcffbc139..bb003ddfe 100644
--- a/backend/open_webui/retrieval/utils.py
+++ b/backend/open_webui/retrieval/utils.py
@@ -4,6 +4,7 @@ from typing import Optional, Union
 
 import requests
 import hashlib
+from concurrent.futures import ThreadPoolExecutor
 
 from huggingface_hub import snapshot_download
 from langchain.retrievers import ContextualCompressionRetriever, EnsembleRetriever
@@ -298,30 +299,39 @@ def query_collection_with_hybrid_search(
             log.exception(f"Failed to fetch collection {collection_name}: {e}")
             collection_results[collection_name] = None
 
-    for collection_name in collection_names:
+    log.info(f"Starting hybrid search for {len(queries)} queries in {len(collection_names)} collections...")
+    def process_query(collection_name, query):
         try:
-            for query in queries:
-                result = query_doc_with_hybrid_search(
-                    collection_name=collection_name,
-                    collection_result=collection_results[collection_name],
-                    query=query,
-                    embedding_function=embedding_function,
-                    k=k,
-                    reranking_function=reranking_function,
-                    k_reranker=k_reranker,
-                    r=r,
-                )
-                results.append(result)
-        except Exception as e:
-            log.exception(
-                "Error when querying the collection with " f"hybrid_search: {e}"
+            result = query_doc_with_hybrid_search(
+                collection_name=collection_name,
+                collection_result=collection_results[collection_name],
+                query=query,
+                embedding_function=embedding_function,
+                k=k,
+                reranking_function=reranking_function,
+                k_reranker=k_reranker,
+                r=r,
             )
-            error = True
+            return result, None
+        except Exception as e:
+            log.exception(f"Error when querying the collection with hybrid_search: {e}")
+            return None, e
 
-    if error:
-        raise Exception(
-            "Hybrid search failed for all collections. Using Non hybrid search as fallback."
-        )
+    tasks = [(collection_name, query) for collection_name in collection_names for query in queries]
+
+    with ThreadPoolExecutor() as executor:
+        future_results = [executor.submit(process_query, cn, q) for cn, q in tasks]
+        task_results = [future.result() for future in future_results]
+
+    for result, err in task_results:
+        if err is not None:
+            error = True
+        elif result is not None:
+            results.append(result)
+
+    if error and not results:
+        raise Exception("Hybrid search failed for all collections. Using Non-hybrid search as fallback.")
+
 
     return merge_and_sort_query_results(results, k=k)
 