refac
This commit is contained in:
@@ -39,7 +39,7 @@ from langchain_core.documents import Document
|
||||
from open_webui.models.files import FileModel, FileUpdateForm, Files
|
||||
from open_webui.models.knowledge import Knowledges
|
||||
from open_webui.storage.provider import Storage
|
||||
from open_webui.internal.db import get_session
|
||||
from open_webui.internal.db import get_session, get_db
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
|
||||
@@ -1431,7 +1431,7 @@ def save_docs_to_vector_db(
|
||||
existing_file_id = None
|
||||
if result.metadatas and result.metadatas[0]:
|
||||
existing_file_id = result.metadatas[0][0].get("file_id")
|
||||
|
||||
|
||||
if existing_file_id != metadata.get("file_id"):
|
||||
log.info(f"Document with hash {metadata['hash']} already exists")
|
||||
raise ValueError(ERROR_MESSAGES.DUPLICATE_CONTENT)
|
||||
@@ -1602,6 +1602,9 @@ def process_file(
|
||||
):
|
||||
"""
|
||||
Process a file and save its content to the vector database.
|
||||
Process a file and save its content to the vector database.
|
||||
Note: granular session management is used to prevent connection pool exhaustion.
|
||||
The session is committed before external API calls, and updates use a fresh session.
|
||||
"""
|
||||
if user.role == "admin":
|
||||
file = Files.get_file_by_id(form_data.file_id, db=db)
|
||||
@@ -1763,6 +1766,13 @@ def process_file(
|
||||
}
|
||||
else:
|
||||
try:
|
||||
# Release the database connection relative to the 'file' object
|
||||
# to prevent holding the connection during the slow embedding step.
|
||||
db.expunge(file)
|
||||
db.commit()
|
||||
|
||||
# External embedding API takes time (5-60s+).
|
||||
# No DB connection is held here.
|
||||
result = save_docs_to_vector_db(
|
||||
request,
|
||||
docs=docs,
|
||||
@@ -1778,27 +1788,29 @@ def process_file(
|
||||
log.info(f"added {len(docs)} items to collection {collection_name}")
|
||||
|
||||
if result:
|
||||
Files.update_file_metadata_by_id(
|
||||
file.id,
|
||||
{
|
||||
# Fresh session for the final update.
|
||||
with get_db() as session:
|
||||
Files.update_file_metadata_by_id(
|
||||
file.id,
|
||||
{
|
||||
"collection_name": collection_name,
|
||||
},
|
||||
db=session,
|
||||
)
|
||||
|
||||
Files.update_file_data_by_id(
|
||||
file.id,
|
||||
{"status": "completed"},
|
||||
db=session,
|
||||
)
|
||||
Files.update_file_hash_by_id(file.id, hash, db=session)
|
||||
|
||||
return {
|
||||
"status": True,
|
||||
"collection_name": collection_name,
|
||||
},
|
||||
db=db,
|
||||
)
|
||||
|
||||
Files.update_file_data_by_id(
|
||||
file.id,
|
||||
{"status": "completed"},
|
||||
db=db,
|
||||
)
|
||||
Files.update_file_hash_by_id(file.id, hash, db=db)
|
||||
|
||||
return {
|
||||
"status": True,
|
||||
"collection_name": collection_name,
|
||||
"filename": file.filename,
|
||||
"content": text_content,
|
||||
}
|
||||
"filename": file.filename,
|
||||
"content": text_content,
|
||||
}
|
||||
else:
|
||||
raise Exception("Error saving document to vector database")
|
||||
except Exception as e:
|
||||
@@ -1806,13 +1818,15 @@ def process_file(
|
||||
|
||||
except Exception as e:
|
||||
log.exception(e)
|
||||
Files.update_file_data_by_id(
|
||||
file.id,
|
||||
{"status": "failed"},
|
||||
db=db,
|
||||
)
|
||||
# Clear the hash so the file can be re-uploaded after fixing the issue
|
||||
Files.update_file_hash_by_id(file.id, None, db=db)
|
||||
# Fresh session for error status update.
|
||||
with get_db() as session:
|
||||
Files.update_file_data_by_id(
|
||||
file.id,
|
||||
{"status": "failed"},
|
||||
db=session,
|
||||
)
|
||||
# Clear the hash so the file can be re-uploaded after fixing the issue
|
||||
Files.update_file_hash_by_id(file.id, None, db=session)
|
||||
|
||||
if "No pandoc was found" in str(e):
|
||||
raise HTTPException(
|
||||
@@ -2709,9 +2723,7 @@ async def process_files_batch(
|
||||
|
||||
# Update all files with collection name
|
||||
for file_update, file_result in zip(file_updates, file_results):
|
||||
Files.update_file_by_id(
|
||||
id=file_result.file_id, form_data=file_update
|
||||
)
|
||||
Files.update_file_by_id(id=file_result.file_id, form_data=file_update)
|
||||
file_result.status = "completed"
|
||||
|
||||
except Exception as e:
|
||||
@@ -2721,7 +2733,9 @@ async def process_files_batch(
|
||||
for file_result in file_results:
|
||||
file_result.status = "failed"
|
||||
file_errors.append(
|
||||
BatchProcessFilesResult(file_id=file_result.file_id, status="failed", error=str(e))
|
||||
BatchProcessFilesResult(
|
||||
file_id=file_result.file_id, status="failed", error=str(e)
|
||||
)
|
||||
)
|
||||
|
||||
return BatchProcessFilesResponse(results=file_results, errors=file_errors)
|
||||
|
||||
Reference in New Issue
Block a user