flatten job()

This commit is contained in:
Michael Poluektov 2024-07-31 22:05:37 +01:00
parent b9b1fdd1a1
commit c89b34fd75

View File

@ -147,7 +147,7 @@ async def get_pipe_models():
function_module = get_function_module(pipe.id)
# Check if function is a manifold
if hasattr(function_module, "type") and function_module.type == "manifold":
if hasattr(function_module, "pipes"):
manifold_pipes = []
# Check if pipes is a function or a list
@ -343,70 +343,65 @@ async def generate_function_chat_completion(form_data, user):
form_data = add_model_params(params, form_data)
form_data = populate_system_message(params, form_data, user)
async def job():
pipe_id = get_pipe_id(form_data)
function_module = get_function_module(pipe_id)
pipe_id = get_pipe_id(form_data)
function_module = get_function_module(pipe_id)
pipe = function_module.pipe
params = get_params_dict(pipe, form_data, user, extra_params, function_module)
pipe = function_module.pipe
params = get_params_dict(pipe, form_data, user, extra_params, function_module)
if form_data["stream"]:
if form_data["stream"]:
async def stream_content():
try:
res = await execute_pipe(pipe, params)
# Directly return if the response is a StreamingResponse
if isinstance(res, StreamingResponse):
async for data in res.body_iterator:
yield data
return
if isinstance(res, dict):
yield f"data: {json.dumps(res)}\n\n"
return
except Exception as e:
print(f"Error: {e}")
yield f"data: {json.dumps({'error': {'detail':str(e)}})}\n\n"
return
if isinstance(res, str):
message = openai_chat_chunk_message_template(
form_data["model"], res
)
yield f"data: {json.dumps(message)}\n\n"
if isinstance(res, Iterator):
for line in res:
yield process_line(form_data, line)
if isinstance(res, AsyncGenerator):
async for line in res:
yield process_line(form_data, line)
if isinstance(res, str) or isinstance(res, Generator):
finish_message = openai_chat_chunk_message_template(
form_data["model"], ""
)
finish_message["choices"][0]["finish_reason"] = "stop"
yield f"data: {json.dumps(finish_message)}\n\n"
yield "data: [DONE]"
return StreamingResponse(stream_content(), media_type="text/event-stream")
else:
async def stream_content():
try:
res = await execute_pipe(pipe, params)
# Directly return if the response is a StreamingResponse
if isinstance(res, StreamingResponse):
async for data in res.body_iterator:
yield data
return
if isinstance(res, dict):
yield f"data: {json.dumps(res)}\n\n"
return
except Exception as e:
print(f"Error: {e}")
return {"error": {"detail": str(e)}}
yield f"data: {json.dumps({'error': {'detail':str(e)}})}\n\n"
return
if isinstance(res, StreamingResponse) or isinstance(res, dict):
return res
if isinstance(res, BaseModel):
return res.model_dump()
if isinstance(res, str):
message = openai_chat_chunk_message_template(form_data["model"], res)
yield f"data: {json.dumps(message)}\n\n"
message = await get_message_content(res)
return openai_chat_completion_message_template(form_data["model"], message)
if isinstance(res, Iterator):
for line in res:
yield process_line(form_data, line)
return await job()
if isinstance(res, AsyncGenerator):
async for line in res:
yield process_line(form_data, line)
if isinstance(res, str) or isinstance(res, Generator):
finish_message = openai_chat_chunk_message_template(
form_data["model"], ""
)
finish_message["choices"][0]["finish_reason"] = "stop"
yield f"data: {json.dumps(finish_message)}\n\n"
yield "data: [DONE]"
return StreamingResponse(stream_content(), media_type="text/event-stream")
else:
try:
res = await execute_pipe(pipe, params)
except Exception as e:
print(f"Error: {e}")
return {"error": {"detail": str(e)}}
if isinstance(res, StreamingResponse) or isinstance(res, dict):
return res
if isinstance(res, BaseModel):
return res.model_dump()
message = await get_message_content(res)
return openai_chat_completion_message_template(form_data["model"], message)