From 9af40624c5f0f8f7f640a11356e167543b07b2bb Mon Sep 17 00:00:00 2001 From: Timothy Jaeryang Baek Date: Thu, 22 Jan 2026 18:58:00 +0400 Subject: [PATCH] refac --- backend/open_webui/routers/retrieval.py | 80 +++++++++++++++---------- 1 file changed, 47 insertions(+), 33 deletions(-) diff --git a/backend/open_webui/routers/retrieval.py b/backend/open_webui/routers/retrieval.py index ce9153c8d..a37fc50b0 100644 --- a/backend/open_webui/routers/retrieval.py +++ b/backend/open_webui/routers/retrieval.py @@ -39,7 +39,7 @@ from langchain_core.documents import Document from open_webui.models.files import FileModel, FileUpdateForm, Files from open_webui.models.knowledge import Knowledges from open_webui.storage.provider import Storage -from open_webui.internal.db import get_session +from open_webui.internal.db import get_session, get_db from sqlalchemy.orm import Session @@ -1431,7 +1431,7 @@ def save_docs_to_vector_db( existing_file_id = None if result.metadatas and result.metadatas[0]: existing_file_id = result.metadatas[0][0].get("file_id") - + if existing_file_id != metadata.get("file_id"): log.info(f"Document with hash {metadata['hash']} already exists") raise ValueError(ERROR_MESSAGES.DUPLICATE_CONTENT) @@ -1602,6 +1602,9 @@ def process_file( ): """ Process a file and save its content to the vector database. + Process a file and save its content to the vector database. + Note: granular session management is used to prevent connection pool exhaustion. + The session is committed before external API calls, and updates use a fresh session. """ if user.role == "admin": file = Files.get_file_by_id(form_data.file_id, db=db) @@ -1763,6 +1766,13 @@ def process_file( } else: try: + # Release the database connection relative to the 'file' object + # to prevent holding the connection during the slow embedding step. + db.expunge(file) + db.commit() + + # External embedding API takes time (5-60s+). + # No DB connection is held here. result = save_docs_to_vector_db( request, docs=docs, @@ -1778,27 +1788,29 @@ def process_file( log.info(f"added {len(docs)} items to collection {collection_name}") if result: - Files.update_file_metadata_by_id( - file.id, - { + # Fresh session for the final update. + with get_db() as session: + Files.update_file_metadata_by_id( + file.id, + { + "collection_name": collection_name, + }, + db=session, + ) + + Files.update_file_data_by_id( + file.id, + {"status": "completed"}, + db=session, + ) + Files.update_file_hash_by_id(file.id, hash, db=session) + + return { + "status": True, "collection_name": collection_name, - }, - db=db, - ) - - Files.update_file_data_by_id( - file.id, - {"status": "completed"}, - db=db, - ) - Files.update_file_hash_by_id(file.id, hash, db=db) - - return { - "status": True, - "collection_name": collection_name, - "filename": file.filename, - "content": text_content, - } + "filename": file.filename, + "content": text_content, + } else: raise Exception("Error saving document to vector database") except Exception as e: @@ -1806,13 +1818,15 @@ def process_file( except Exception as e: log.exception(e) - Files.update_file_data_by_id( - file.id, - {"status": "failed"}, - db=db, - ) - # Clear the hash so the file can be re-uploaded after fixing the issue - Files.update_file_hash_by_id(file.id, None, db=db) + # Fresh session for error status update. + with get_db() as session: + Files.update_file_data_by_id( + file.id, + {"status": "failed"}, + db=session, + ) + # Clear the hash so the file can be re-uploaded after fixing the issue + Files.update_file_hash_by_id(file.id, None, db=session) if "No pandoc was found" in str(e): raise HTTPException( @@ -2709,9 +2723,7 @@ async def process_files_batch( # Update all files with collection name for file_update, file_result in zip(file_updates, file_results): - Files.update_file_by_id( - id=file_result.file_id, form_data=file_update - ) + Files.update_file_by_id(id=file_result.file_id, form_data=file_update) file_result.status = "completed" except Exception as e: @@ -2721,7 +2733,9 @@ async def process_files_batch( for file_result in file_results: file_result.status = "failed" file_errors.append( - BatchProcessFilesResult(file_id=file_result.file_id, status="failed", error=str(e)) + BatchProcessFilesResult( + file_id=file_result.file_id, status="failed", error=str(e) + ) ) return BatchProcessFilesResponse(results=file_results, errors=file_errors)