diff --git a/examples/pipelines/pipeline_with_status_event.py b/examples/pipelines/pipeline_with_status_event.py
new file mode 100644
index 0000000..8ccf2cf
--- /dev/null
+++ b/examples/pipelines/pipeline_with_status_event.py
@@ -0,0 +1,74 @@
+from typing import List, Union, Generator, Iterator, Optional
+from pprint import pprint
+import time
+
+# Uncomment to disable SSL verification warnings if needed.
+# import warnings
+# warnings.filterwarnings('ignore', message='Unverified HTTPS request')
+
+
+class Pipeline:
+    def __init__(self):
+        self.name = "Pipeline with Status Event"
+        self.description = "This is a pipeline that demonstrates how to use the status event."
+        self.debug = True
+        self.version = "0.1.0"
+        self.author = "Anthony Durussel"
+
+    async def on_startup(self):
+        # This function is called when the server is started.
+        print(f"on_startup: {__name__}")
+
+    async def on_shutdown(self):
+        # This function is called when the server is shutdown.
+        print(f"on_shutdown: {__name__}")
+
+    async def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
+        # Called before the OpenAI API request is made; may modify the form data.
+        print(f"inlet: {__name__}")
+        if self.debug:
+            print(f"inlet: {__name__} - body:")
+            pprint(body)
+            print(f"inlet: {__name__} - user:")
+            pprint(user)
+        return body
+
+    async def outlet(self, body: dict, user: Optional[dict] = None) -> dict:
+        # Called after the OpenAI API response completes; may modify the messages.
+        print(f"outlet: {__name__}")
+        if self.debug:
+            print(f"outlet: {__name__} - body:")
+            pprint(body)
+            print(f"outlet: {__name__} - user:")
+            pprint(user)
+        return body
+
+    def pipe(self, user_message: str, model_id: str, messages: List[dict], body: dict, __event_emitter__=None) -> Union[str, Generator, Iterator]:
+        print(f"pipe: {__name__}")
+
+        if self.debug:
+            print(f"pipe: {__name__} - received message from user: {user_message}")
+
+        if __event_emitter__:
+            # Show an in-progress status while the slow work runs.
+            yield __event_emitter__({
+                "type": "status",
+                "data": {
+                    "description": "Fake Status",
+                    "done": False,
+                },
+            })
+
+        time.sleep(5)  # Simulate a slow operation so the status stays visible.
+
+        yield f"user_message: {user_message}"
+
+        if __event_emitter__:
+            # Clear the status once the work is complete.
+            yield __event_emitter__({
+                "type": "status",
+                "data": {
+                    "description": "",
+                    "done": True,
+                },
+            })
diff --git a/main.py b/main.py
index cff3335..017b10a 100644
--- a/main.py
+++ b/main.py
@@ -682,12 +682,20 @@ async def generate_openai_chat_completion(form_data: OpenAIChatCompletionForm):
     pipe = PIPELINE_MODULES[pipeline_id].pipe
 
     if form_data.stream:
+
+        def __event_emitter__(event):
+            # Serialize pipeline status events as SSE `event:` frames; the
+            # streaming loop below forwards them to the client verbatim.
+            logging.info(f"stream_event:{event}")
+            return f"event: {json.dumps(event)}\n\n"
+
         def stream_content():
             res = pipe(
                 user_message=user_message,
                 model_id=pipeline_id,
                 messages=messages,
                 body=form_data.model_dump(),
+                __event_emitter__=__event_emitter__,
             )
 
             logging.info(f"stream:true:{res}")
@@ -711,7 +719,8 @@ async def generate_openai_chat_completion(form_data: OpenAIChatCompletionForm):
 
                     logging.info(f"stream_content:Generator:{line}")
 
-                    if line.startswith("data:"):
+                    # Forward SSE payload lines and status `event:` frames as-is.
+                    if line.startswith(("data:", "event:")):
                         yield f"{line}\n\n"
                     else:
                         line = stream_message_template(form_data.model, line)