From c89b34fd75ed4a76d8d491061a80a1e8a36d2dd3 Mon Sep 17 00:00:00 2001 From: Michael Poluektov Date: Wed, 31 Jul 2024 22:05:37 +0100 Subject: [PATCH] flatten job() --- backend/apps/webui/main.py | 109 ++++++++++++++++++------------------- 1 file changed, 52 insertions(+), 57 deletions(-) diff --git a/backend/apps/webui/main.py b/backend/apps/webui/main.py index 69493f5cc..96adb5080 100644 --- a/backend/apps/webui/main.py +++ b/backend/apps/webui/main.py @@ -147,7 +147,7 @@ async def get_pipe_models(): function_module = get_function_module(pipe.id) # Check if function is a manifold - if hasattr(function_module, "type") and function_module.type == "manifold": + if hasattr(function_module, "pipes"): manifold_pipes = [] # Check if pipes is a function or a list @@ -343,70 +343,65 @@ async def generate_function_chat_completion(form_data, user): form_data = add_model_params(params, form_data) form_data = populate_system_message(params, form_data, user) - async def job(): - pipe_id = get_pipe_id(form_data) - function_module = get_function_module(pipe_id) + pipe_id = get_pipe_id(form_data) + function_module = get_function_module(pipe_id) - pipe = function_module.pipe - params = get_params_dict(pipe, form_data, user, extra_params, function_module) + pipe = function_module.pipe + params = get_params_dict(pipe, form_data, user, extra_params, function_module) - if form_data["stream"]: + if form_data["stream"]: - async def stream_content(): - try: - res = await execute_pipe(pipe, params) - - # Directly return if the response is a StreamingResponse - if isinstance(res, StreamingResponse): - async for data in res.body_iterator: - yield data - return - if isinstance(res, dict): - yield f"data: {json.dumps(res)}\n\n" - return - - except Exception as e: - print(f"Error: {e}") - yield f"data: {json.dumps({'error': {'detail':str(e)}})}\n\n" - return - - if isinstance(res, str): - message = openai_chat_chunk_message_template( - form_data["model"], res - ) - yield f"data: {json.dumps(message)}\n\n" - - if isinstance(res, Iterator): - for line in res: - yield process_line(form_data, line) - - if isinstance(res, AsyncGenerator): - async for line in res: - yield process_line(form_data, line) - - if isinstance(res, str) or isinstance(res, Generator): - finish_message = openai_chat_chunk_message_template( - form_data["model"], "" - ) - finish_message["choices"][0]["finish_reason"] = "stop" - yield f"data: {json.dumps(finish_message)}\n\n" - yield "data: [DONE]" - - return StreamingResponse(stream_content(), media_type="text/event-stream") - else: + async def stream_content(): try: res = await execute_pipe(pipe, params) + # Directly return if the response is a StreamingResponse + if isinstance(res, StreamingResponse): + async for data in res.body_iterator: + yield data + return + if isinstance(res, dict): + yield f"data: {json.dumps(res)}\n\n" + return + except Exception as e: print(f"Error: {e}") - return {"error": {"detail": str(e)}} + yield f"data: {json.dumps({'error': {'detail':str(e)}})}\n\n" + return - if isinstance(res, StreamingResponse) or isinstance(res, dict): - return res - if isinstance(res, BaseModel): - return res.model_dump() + if isinstance(res, str): + message = openai_chat_chunk_message_template(form_data["model"], res) + yield f"data: {json.dumps(message)}\n\n" - message = await get_message_content(res) - return openai_chat_completion_message_template(form_data["model"], message) + if isinstance(res, Iterator): + for line in res: + yield process_line(form_data, line) - return await job() + if isinstance(res, AsyncGenerator): + async for line in res: + yield process_line(form_data, line) + + if isinstance(res, str) or isinstance(res, Generator): + finish_message = openai_chat_chunk_message_template( + form_data["model"], "" + ) + finish_message["choices"][0]["finish_reason"] = "stop" + yield f"data: {json.dumps(finish_message)}\n\n" + yield "data: [DONE]" + + return StreamingResponse(stream_content(), media_type="text/event-stream") + else: + try: + res = await execute_pipe(pipe, params) + + except Exception as e: + print(f"Error: {e}") + return {"error": {"detail": str(e)}} + + if isinstance(res, StreamingResponse) or isinstance(res, dict): + return res + if isinstance(res, BaseModel): + return res.model_dump() + + message = await get_message_content(res) + return openai_chat_completion_message_template(form_data["model"], message)