diff --git a/backend/open_webui/routers/knowledge.py b/backend/open_webui/routers/knowledge.py index 87e8599ed..ad67cc31f 100644 --- a/backend/open_webui/routers/knowledge.py +++ b/backend/open_webui/routers/knowledge.py @@ -520,6 +520,7 @@ async def reset_knowledge_by_id(id: str, user=Depends(get_verified_user)): @router.post("/{id}/files/batch/add", response_model=Optional[KnowledgeFilesResponse]) def add_files_to_knowledge_batch( + request: Request, id: str, form_data: list[KnowledgeFileIdForm], user=Depends(get_verified_user), @@ -555,7 +556,9 @@ def add_files_to_knowledge_batch( # Process files try: result = process_files_batch( - BatchProcessFilesForm(files=files, collection_name=id) + request=request, + form_data=BatchProcessFilesForm(files=files, collection_name=id), + user=user, ) except Exception as e: log.error( diff --git a/backend/open_webui/routers/retrieval.py b/backend/open_webui/routers/retrieval.py index d6ff463a9..c791bde84 100644 --- a/backend/open_webui/routers/retrieval.py +++ b/backend/open_webui/routers/retrieval.py @@ -1458,6 +1458,7 @@ class BatchProcessFilesResponse(BaseModel): @router.post("/process/files/batch") def process_files_batch( + request: Request, form_data: BatchProcessFilesForm, user=Depends(get_verified_user), ) -> BatchProcessFilesResponse: @@ -1504,7 +1505,10 @@ def process_files_batch( if all_docs: try: save_docs_to_vector_db( - docs=all_docs, collection_name=collection_name, add=True + request=request, + docs=all_docs, + collection_name=collection_name, + add=True, ) # Update all files with collection name diff --git a/backend/open_webui/utils/middleware.py b/backend/open_webui/utils/middleware.py index 4741623c4..0e32bf626 100644 --- a/backend/open_webui/utils/middleware.py +++ b/backend/open_webui/utils/middleware.py @@ -23,7 +23,7 @@ from open_webui.models.users import Users from open_webui.socket.main import ( get_event_call, get_event_emitter, - get_user_id_from_session_pool, + get_active_status_by_user_id, ) from open_webui.routers.tasks import ( generate_queries, @@ -750,7 +750,7 @@ async def process_chat_response( ): async def background_tasks_handler(): message_map = Chats.get_messages_by_chat_id(metadata["chat_id"]) - message = message_map.get(metadata["message_id"]) + message = message_map.get(metadata["message_id"]) if message_map else None if message: messages = get_message_list(message_map, message.get("id")) @@ -896,7 +896,7 @@ async def process_chat_response( ) # Send a webhook notification if the user is not active - if get_user_id_from_session_pool(metadata["session_id"]) is None: + if get_active_status_by_user_id(user.id) is None: webhook_url = Users.get_user_webhook_url_by_id(user.id) if webhook_url: post_webhook( @@ -1002,51 +1002,56 @@ async def process_chat_response( "content": content, } + await event_emitter( + { + "type": "chat:completion", + "data": data, + } + ) + except Exception as e: done = "data: [DONE]" in line - title = Chats.get_chat_title_by_id(metadata["chat_id"]) if done: - data = {"done": True, "content": content, "title": title} - - if not ENABLE_REALTIME_CHAT_SAVE: - # Save message in the database - Chats.upsert_message_to_chat_by_id_and_message_id( - metadata["chat_id"], - metadata["message_id"], - { - "content": content, - }, - ) - - # Send a webhook notification if the user is not active - if ( - get_user_id_from_session_pool(metadata["session_id"]) - is None - ): - webhook_url = Users.get_user_webhook_url_by_id(user.id) - if webhook_url: - post_webhook( - webhook_url, - f"{title} - {request.app.state.config.WEBUI_URL}/c/{metadata['chat_id']}\n\n{content}", - { - "action": "chat", - "message": content, - "title": title, - "url": f"{request.app.state.config.WEBUI_URL}/c/{metadata['chat_id']}", - }, - ) - + pass else: continue - await event_emitter( + title = Chats.get_chat_title_by_id(metadata["chat_id"]) + data = {"done": True, "content": content, "title": title} + + if not ENABLE_REALTIME_CHAT_SAVE: + # Save message in the database + Chats.upsert_message_to_chat_by_id_and_message_id( + metadata["chat_id"], + metadata["message_id"], { - "type": "chat:completion", - "data": data, - } + "content": content, + }, ) + # Send a webhook notification if the user is not active + if get_active_status_by_user_id(user.id) is None: + webhook_url = Users.get_user_webhook_url_by_id(user.id) + if webhook_url: + post_webhook( + webhook_url, + f"{title} - {request.app.state.config.WEBUI_URL}/c/{metadata['chat_id']}\n\n{content}", + { + "action": "chat", + "message": content, + "title": title, + "url": f"{request.app.state.config.WEBUI_URL}/c/{metadata['chat_id']}", + }, + ) + + await event_emitter( + { + "type": "chat:completion", + "data": data, + } + ) + await background_tasks_handler() except asyncio.CancelledError: print("Task was cancelled!") diff --git a/backend/open_webui/utils/response.py b/backend/open_webui/utils/response.py index d429db8aa..d6f7b0ac6 100644 --- a/backend/open_webui/utils/response.py +++ b/backend/open_webui/utils/response.py @@ -29,7 +29,7 @@ async def convert_streaming_response_ollama_to_openai(ollama_streaming_response) ( ( data.get("eval_count", 0) - / ((data.get("eval_duration", 0) / 1_000_000)) + / ((data.get("eval_duration", 0) / 10_000_000)) ) * 100 ), @@ -43,7 +43,7 @@ async def convert_streaming_response_ollama_to_openai(ollama_streaming_response) ( ( data.get("prompt_eval_count", 0) - / ((data.get("prompt_eval_duration", 0) / 1_000_000)) + / ((data.get("prompt_eval_duration", 0) / 10_000_000)) ) * 100 ),