From 2b7f9d14d0290cd897d1791c35aa7f6a7054a60f Mon Sep 17 00:00:00 2001 From: Timothy Jaeryang Baek Date: Wed, 12 Feb 2025 23:21:16 -0800 Subject: [PATCH] refac --- backend/open_webui/utils/chat.py | 101 +++++++++++++++++-------------- src/routes/+layout.svelte | 32 ++++------ 2 files changed, 67 insertions(+), 66 deletions(-) diff --git a/backend/open_webui/utils/chat.py b/backend/open_webui/utils/chat.py index 97e1aae74..72f8eafe3 100644 --- a/backend/open_webui/utils/chat.py +++ b/backend/open_webui/utils/chat.py @@ -10,8 +10,8 @@ import inspect import uuid import asyncio -from fastapi import Request -from starlette.responses import Response, StreamingResponse +from fastapi import Request, status +from starlette.responses import Response, StreamingResponse, JSONResponse from open_webui.models.users import UserModel @@ -82,51 +82,16 @@ async def generate_direct_chat_completion( if form_data.get("stream"): q = asyncio.Queue() - # Define a generator to stream responses - async def event_generator(): - nonlocal q + async def message_listener(sid, data): + """ + Handle received socket messages and push them into the queue. + """ + await q.put(data) - async def message_listener(sid, data): - """ - Handle received socket messages and push them into the queue. - """ - await q.put(data) + # Register the listener + sio.on(channel, message_listener) - # Register the listener - sio.on(channel, message_listener) - - # Start processing chat completion in background - await event_emitter( - { - "type": "request:chat:completion", - "data": { - "form_data": form_data, - "model": models[form_data["model"]], - "channel": channel, - "session_id": session_id, - }, - } - ) - - try: - while True: - data = await q.get() # Wait for new messages - if isinstance(data, dict): - if "error" in data: - raise Exception(data["error"]) - - if "done" in data and data["done"]: - break # Stop streaming when 'done' is received - - yield f"data: {json.dumps(data)}\n\n" - elif isinstance(data, str): - yield data - finally: - del sio.handlers["/"][channel] # Remove the listener - - # Return the streaming response - return StreamingResponse(event_generator(), media_type="text/event-stream") - else: + # Start processing chat completion in background res = await event_caller( { "type": "request:chat:completion", @@ -139,7 +104,51 @@ async def generate_direct_chat_completion( } ) - print(res) + print("res", res) + + if res.get("status", False): + # Define a generator to stream responses + async def event_generator(): + nonlocal q + try: + while True: + data = await q.get() # Wait for new messages + if isinstance(data, dict): + if "done" in data and data["done"]: + break # Stop streaming when 'done' is received + + yield f"data: {json.dumps(data)}\n\n" + elif isinstance(data, str): + yield data + except Exception as e: + log.debug(f"Error in event generator: {e}") + pass + + # Define a background task to run the event generator + async def background(): + try: + del sio.handlers["/"][channel] + except Exception as e: + pass + + # Return the streaming response + return StreamingResponse( + event_generator(), media_type="text/event-stream", background=background + ) + else: + raise Exception(str(res)) + else: + res = await event_caller( + { + "type": "request:chat:completion", + "data": { + "form_data": form_data, + "model": models[form_data["model"]], + "channel": channel, + "session_id": session_id, + }, + } + ) if "error" in res: raise Exception(res["error"]) diff --git a/src/routes/+layout.svelte b/src/routes/+layout.svelte index 4d08b3e3d..bef581e74 100644 --- a/src/routes/+layout.svelte +++ b/src/routes/+layout.svelte @@ -279,8 +279,17 @@ OPENAI_API_URL ); - if (res && res.ok) { + if (res) { + // raise if the response is not ok + if (!res.ok) { + throw await res.json(); + } + if (form_data?.stream ?? false) { + cb({ + status: true + }); + // res will either be SSE or JSON const reader = res.body.getReader(); const decoder = new TextDecoder(); @@ -316,29 +325,12 @@ } } catch (error) { console.error('chatCompletion', error); - - if (form_data?.stream ?? false) { - $socket.emit(channel, { - error: error - }); - } else { - cb({ - error: error - }); - } + cb(error); } } } catch (error) { console.error('chatCompletion', error); - if (form_data?.stream ?? false) { - $socket.emit(channel, { - error: error - }); - } else { - cb({ - error: error - }); - } + cb(error); } finally { $socket.emit(channel, { done: true