This commit is contained in:
Timothy Jaeryang Baek 2025-02-12 23:21:16 -08:00
parent c83e68282d
commit 2b7f9d14d0
2 changed files with 67 additions and 66 deletions

View File

@ -10,8 +10,8 @@ import inspect
import uuid
import asyncio
from fastapi import Request
from starlette.responses import Response, StreamingResponse
from fastapi import Request, status
from starlette.responses import Response, StreamingResponse, JSONResponse
from open_webui.models.users import UserModel
@ -82,51 +82,16 @@ async def generate_direct_chat_completion(
if form_data.get("stream"):
q = asyncio.Queue()
# Define a generator to stream responses
async def event_generator():
nonlocal q
async def message_listener(sid, data):
"""
Handle received socket messages and push them into the queue.
"""
await q.put(data)
async def message_listener(sid, data):
"""
Handle received socket messages and push them into the queue.
"""
await q.put(data)
# Register the listener
sio.on(channel, message_listener)
# Register the listener
sio.on(channel, message_listener)
# Start processing chat completion in background
await event_emitter(
{
"type": "request:chat:completion",
"data": {
"form_data": form_data,
"model": models[form_data["model"]],
"channel": channel,
"session_id": session_id,
},
}
)
try:
while True:
data = await q.get() # Wait for new messages
if isinstance(data, dict):
if "error" in data:
raise Exception(data["error"])
if "done" in data and data["done"]:
break # Stop streaming when 'done' is received
yield f"data: {json.dumps(data)}\n\n"
elif isinstance(data, str):
yield data
finally:
del sio.handlers["/"][channel] # Remove the listener
# Return the streaming response
return StreamingResponse(event_generator(), media_type="text/event-stream")
else:
# Start processing chat completion in background
res = await event_caller(
{
"type": "request:chat:completion",
@ -139,7 +104,51 @@ async def generate_direct_chat_completion(
}
)
print(res)
print("res", res)
if res.get("status", False):
# Define a generator to stream responses
async def event_generator():
nonlocal q
try:
while True:
data = await q.get() # Wait for new messages
if isinstance(data, dict):
if "done" in data and data["done"]:
break # Stop streaming when 'done' is received
yield f"data: {json.dumps(data)}\n\n"
elif isinstance(data, str):
yield data
except Exception as e:
log.debug(f"Error in event generator: {e}")
pass
# Define a background task to run the event generator
async def background():
try:
del sio.handlers["/"][channel]
except Exception as e:
pass
# Return the streaming response
return StreamingResponse(
event_generator(), media_type="text/event-stream", background=background
)
else:
raise Exception(str(res))
else:
res = await event_caller(
{
"type": "request:chat:completion",
"data": {
"form_data": form_data,
"model": models[form_data["model"]],
"channel": channel,
"session_id": session_id,
},
}
)
if "error" in res:
raise Exception(res["error"])

View File

@ -279,8 +279,17 @@
OPENAI_API_URL
);
if (res && res.ok) {
if (res) {
// raise if the response is not ok
if (!res.ok) {
throw await res.json();
}
if (form_data?.stream ?? false) {
cb({
status: true
});
// res will either be SSE or JSON
const reader = res.body.getReader();
const decoder = new TextDecoder();
@ -316,29 +325,12 @@
}
} catch (error) {
console.error('chatCompletion', error);
if (form_data?.stream ?? false) {
$socket.emit(channel, {
error: error
});
} else {
cb({
error: error
});
}
cb(error);
}
}
} catch (error) {
console.error('chatCompletion', error);
if (form_data?.stream ?? false) {
$socket.emit(channel, {
error: error
});
} else {
cb({
error: error
});
}
cb(error);
} finally {
$socket.emit(channel, {
done: true