diff --git a/main.py b/main.py
index 45db6e8..f3bcfbf 100644
--- a/main.py
+++ b/main.py
@@ -5,7 +5,7 @@
 from fastapi.concurrency import run_in_threadpool
 from starlette.responses import StreamingResponse, Response
 from pydantic import BaseModel, ConfigDict
-from typing import List, Union, Generator
+from typing import List, Union, Generator, Iterator
 
 import time
 
@@ -132,77 +132,88 @@ async def generate_openai_chat_completion(form_data: OpenAIChatCompletionForm):
                 res = get_response(
                     user_message,
                     messages=form_data.messages,
-                    body=form_data.model_dump_json(),
+                    body=form_data.model_dump(),
                 )
 
                 print(f"stream:true:{res}")
 
-                if isinstance(res, str):
-                    message = stream_message_template(form_data.model, res)
-                    print(f"stream_content:str:{message}")
-                    yield f"data: {json.dumps(message)}\n\n"
-
-                elif isinstance(res, Generator):
-                    for message in res:
-                        print(f"stream_content:Generator:{message}")
-                        message = stream_message_template(form_data.model, message)
+                if isinstance(res, Iterator):
+                    for line in res:
+                        if line:
+                            # Decode the JSON data
+                            decoded_line = line.decode("utf-8")
+                            print(f"stream_content:Iterator:{decoded_line}")
+                            yield f"{decoded_line}\n\n"
+                else:
+                    if isinstance(res, str):
+                        message = stream_message_template(form_data.model, res)
+                        print(f"stream_content:str:{message}")
                         yield f"data: {json.dumps(message)}\n\n"
 
-                finish_message = {
-                    "id": f"{form_data.model}-{str(uuid.uuid4())}",
-                    "object": "chat.completion.chunk",
-                    "created": int(time.time()),
-                    "model": form_data.model,
-                    "choices": [
-                        {
-                            "index": 0,
-                            "delta": {},
-                            "logprobs": None,
-                            "finish_reason": "stop",
-                        }
-                    ],
-                }
+                    elif isinstance(res, Generator):
+                        for message in res:
+                            print(f"stream_content:Generator:{message}")
+                            message = stream_message_template(form_data.model, message)
+                            yield f"data: {json.dumps(message)}\n\n"
 
-                yield f"data: {json.dumps(finish_message)}\n\n"
-                yield f"data: [DONE]"
+                    finish_message = {
+                        "id": f"{form_data.model}-{str(uuid.uuid4())}",
+                        "object": "chat.completion.chunk",
+                        "created": int(time.time()),
+                        "model": form_data.model,
+                        "choices": [
+                            {
+                                "index": 0,
+                                "delta": {},
+                                "logprobs": None,
+                                "finish_reason": "stop",
+                            }
+                        ],
+                    }
+
+                    yield f"data: {json.dumps(finish_message)}\n\n"
+                    yield f"data: [DONE]"
 
             return StreamingResponse(stream_content(), media_type="text/event-stream")
         else:
             res = get_response(
                 user_message,
                 messages=form_data.messages,
-                body=form_data.model_dump_json(),
+                body=form_data.model_dump(),
             )
             print(f"stream:false:{res}")
 
-            message = ""
+            if isinstance(res, dict):
+                return res
+            else:
+                message = ""
 
-            if isinstance(res, str):
-                message = res
+                if isinstance(res, str):
+                    message = res
 
-            elif isinstance(res, Generator):
-                for stream in res:
-                    message = f"{message}{stream}"
+                elif isinstance(res, Generator):
+                    for stream in res:
+                        message = f"{message}{stream}"
 
-            print(f"stream:false:{message}")
+                print(f"stream:false:{message}")
 
-            return {
-                "id": f"{form_data.model}-{str(uuid.uuid4())}",
-                "object": "chat.completion",
-                "created": int(time.time()),
-                "model": form_data.model,
-                "choices": [
-                    {
-                        "index": 0,
-                        "message": {
-                            "role": "assistant",
-                            "content": message,
-                        },
-                        "logprobs": None,
-                        "finish_reason": "stop",
-                    }
-                ],
-            }
+                return {
+                    "id": f"{form_data.model}-{str(uuid.uuid4())}",
+                    "object": "chat.completion",
+                    "created": int(time.time()),
+                    "model": form_data.model,
+                    "choices": [
+                        {
+                            "index": 0,
+                            "message": {
+                                "role": "assistant",
+                                "content": message,
+                            },
+                            "logprobs": None,
+                            "finish_reason": "stop",
+                        }
+                    ],
+                }
 
     return await run_in_threadpool(job)
 
diff --git a/pipelines/examples/haystack_pipeline.py b/pipelines/examples/haystack_pipeline.py
index 309e5f3..8917ebc 100644
--- a/pipelines/examples/haystack_pipeline.py
+++ b/pipelines/examples/haystack_pipeline.py
@@ -80,7 +80,7 @@ class Pipeline:
 
     def get_response(
         self, user_message: str, messages: List[OpenAIChatMessage], body: dict
-    ) -> Union[str, Generator]:
+    ) -> Union[str, Generator, Iterator]:
         # This is where you can add your custom RAG pipeline.
         # Typically, you would retrieve relevant information from your knowledge base and synthesize it to generate a response.
 
diff --git a/pipelines/examples/llamaindex_ollama_github_pipeline.py b/pipelines/examples/llamaindex_ollama_github_pipeline.py
index 09c7e9c..b000b74 100644
--- a/pipelines/examples/llamaindex_ollama_github_pipeline.py
+++ b/pipelines/examples/llamaindex_ollama_github_pipeline.py
@@ -71,7 +71,7 @@ class Pipeline:
 
     def get_response(
         self, user_message: str, messages: List[OpenAIChatMessage], body: dict
-    ) -> Union[str, Generator]:
+    ) -> Union[str, Generator, Iterator]:
         # This is where you can add your custom RAG pipeline.
         # Typically, you would retrieve relevant information from your knowledge base and synthesize it to generate a response.
 
diff --git a/pipelines/examples/llamaindex_ollama_pipeline.py b/pipelines/examples/llamaindex_ollama_pipeline.py
index c846c0d..5f4f614 100644
--- a/pipelines/examples/llamaindex_ollama_pipeline.py
+++ b/pipelines/examples/llamaindex_ollama_pipeline.py
@@ -31,7 +31,7 @@ class Pipeline:
 
     def get_response(
         self, user_message: str, messages: List[OpenAIChatMessage], body: dict
-    ) -> Union[str, Generator]:
+    ) -> Union[str, Generator, Iterator]:
         # This is where you can add your custom RAG pipeline.
         # Typically, you would retrieve relevant information from your knowledge base and synthesize it to generate a response.
 
diff --git a/pipelines/examples/llamaindex_pipeline.py b/pipelines/examples/llamaindex_pipeline.py
index d61777b..355c15d 100644
--- a/pipelines/examples/llamaindex_pipeline.py
+++ b/pipelines/examples/llamaindex_pipeline.py
@@ -26,7 +26,7 @@ class Pipeline:
 
     def get_response(
         self, user_message: str, messages: List[OpenAIChatMessage], body: dict
-    ) -> Union[str, Generator]:
+    ) -> Union[str, Generator, Iterator]:
         # This is where you can add your custom RAG pipeline.
         # Typically, you would retrieve relevant information from your knowledge base and synthesize it to generate a response.
 
diff --git a/pipelines/examples/openai_pipeline.py b/pipelines/examples/openai_pipeline.py
index 16e9db5..416fdcd 100644
--- a/pipelines/examples/openai_pipeline.py
+++ b/pipelines/examples/openai_pipeline.py
@@ -1,4 +1,4 @@
-from typing import List, Union, Generator
+from typing import List, Union, Generator, Iterator
 from schemas import OpenAIChatMessage
 import requests
 
@@ -19,31 +19,35 @@ class Pipeline:
     def get_response(
         self, user_message: str, messages: List[OpenAIChatMessage], body: dict
-    ) -> Union[str, Generator]:
+    ) -> Union[str, Generator, Iterator]:
         # This is where you can add your custom pipelines like RAG.'
         print(f"get_response:{__name__}")
 
         print(messages)
         print(user_message)
 
-        OPENAI_API_KEY = "your-api-key-here"
+        OPENAI_API_KEY = "your-openai-api-key-here"
 
         headers = {}
         headers["Authorization"] = f"Bearer {OPENAI_API_KEY}"
         headers["Content-Type"] = "application/json"
 
-        r = requests.request(
-            method="POST",
-            url="https://api.openai.com/v1",
-            data=body,
-            headers=headers,
-            stream=True,
-        )
+        data = {**body, "model": "gpt-3.5-turbo"}
 
-        r.raise_for_status()
+        print(data)
 
-        # Check if response is SSE
-        if "text/event-stream" in r.headers.get("Content-Type", ""):
-            return r.iter_content(chunk_size=8192)
-        else:
-            response_data = r.json()
-            return f"{response_data['choices'][0]['text']}"
+        try:
+            r = requests.post(
+                url="https://api.openai.com/v1/chat/completions",
+                json={**body, "model": "gpt-3.5-turbo"},
+                headers=headers,
+                stream=True,
+            )
+
+            r.raise_for_status()
+
+            if data["stream"]:
+                return r.iter_lines()
+            else:
+                return r.json()
+        except Exception as e:
+            return f"Error: {e}"
diff --git a/pipelines/examples/pipeline_example.py b/pipelines/examples/pipeline_example.py
index ec9edfd..1684fbc 100644
--- a/pipelines/examples/pipeline_example.py
+++ b/pipelines/examples/pipeline_example.py
@@ -18,7 +18,7 @@ class Pipeline:
 
     def get_response(
         self, user_message: str, messages: List[OpenAIChatMessage], body: dict
-    ) -> Union[str, Generator]:
+    ) -> Union[str, Generator, Iterator]:
         # This is where you can add your custom pipelines like RAG.'
         print(f"get_response:{__name__}")