Mirror of https://github.com/open-webui/pipelines
(synced 2025-05-21 12:36:17 +00:00)
Commit: "Adding status event in pipeline"
This commit is contained in:
parent
275655fd2e
commit
6e4b210f14
73
examples/pipelines/pipeline_with_status_event.py
Normal file
73
examples/pipelines/pipeline_with_status_event.py
Normal file
@ -0,0 +1,73 @@
|
|||||||
|
from typing import List, Union, Generator, Iterator, Optional
|
||||||
|
from pprint import pprint
|
||||||
|
import time
|
||||||
|
|
||||||
|
# Uncomment to disable SSL verification warnings if needed.
|
||||||
|
# warnings.filterwarnings('ignore', message='Unverified HTTPS request')
|
||||||
|
|
||||||
|
class Pipeline:
    """Example pipeline demonstrating how to emit "status" events.

    When the server passes an ``__event_emitter__`` callable to ``pipe``,
    this pipeline emits a status event before and after producing its
    (fake) response, so the UI can show progress while the pipe works.
    """

    def __init__(self):
        # Metadata displayed by the pipelines server / UI.
        self.name = "Pipeline with Status Event"
        self.description = "This is a pipeline that demonstrates how to use the status event."
        self.debug = True  # when True, inlet/outlet/pipe pretty-print their inputs
        self.version = "0.1.0"
        self.author = "Anthony Durussel"

    async def on_startup(self):
        # This function is called when the server is started.
        print(f"on_startup: {__name__}")

    async def on_shutdown(self):
        # This function is called when the server is shutdown.
        print(f"on_shutdown: {__name__}")

    async def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
        # This function is called before the OpenAI API request is made. You can
        # modify the form data before it is sent to the OpenAI API.
        print(f"inlet: {__name__}")
        if self.debug:
            print(f"inlet: {__name__} - body:")
            pprint(body)
            print(f"inlet: {__name__} - user:")
            pprint(user)
        return body

    async def outlet(self, body: dict, user: Optional[dict] = None) -> dict:
        # This function is called after the OpenAI API response is completed. You can
        # modify the messages after they are received from the OpenAI API.
        print(f"outlet: {__name__}")
        if self.debug:
            print(f"outlet: {__name__} - body:")
            pprint(body)
            print(f"outlet: {__name__} - user:")
            pprint(user)
        return body

    def pipe(
        self,
        user_message: str,
        model_id: str,
        messages: List[dict],
        body: dict,
        __event_emitter__=None,
    ) -> Union[str, Generator, Iterator]:
        """Generate the pipeline response, yielding status events around it.

        Yields, in order: a "start" status event (if an emitter is provided),
        the echoed user message, and a "done" status event. Each event is
        produced by passing a status dict to ``__event_emitter__`` and
        yielding its return value into the stream.
        """
        print(f"pipe: {__name__}")

        if self.debug:
            print(f"pipe: {__name__} - received message from user: {user_message}")

        if __event_emitter__:
            yield __event_emitter__({
                "type": "status",
                "data": {
                    "description": "Fake Status",
                    "done": False,
                },
            })

        # Simulate slow work so the in-progress status is visible in the UI.
        time.sleep(5)  # Sleep for 5 seconds

        yield f"user_message: {user_message}"

        if __event_emitter__:
            yield __event_emitter__({
                "type": "status",
                "data": {
                    "description": "",
                    "done": True,
                },
            })
|
17
main.py
17
main.py
@ -682,13 +682,18 @@ async def generate_openai_chat_completion(form_data: OpenAIChatCompletionForm):
|
|||||||
pipe = PIPELINE_MODULES[pipeline_id].pipe
|
pipe = PIPELINE_MODULES[pipeline_id].pipe
|
||||||
|
|
||||||
if form_data.stream:
|
if form_data.stream:
|
||||||
|
|
||||||
def stream_content():
|
def stream_content():
|
||||||
|
|
||||||
|
def __event_emitter__(event):
|
||||||
|
logging.error(f"stream_event:{event}")
|
||||||
|
return f"event: {json.dumps(event)}\n\n"
|
||||||
|
|
||||||
res = pipe(
|
res = pipe(
|
||||||
user_message=user_message,
|
user_message=user_message,
|
||||||
model_id=pipeline_id,
|
model_id=pipeline_id,
|
||||||
messages=messages,
|
messages=messages,
|
||||||
body=form_data.model_dump(),
|
body=form_data.model_dump(),
|
||||||
|
__event_emitter__=__event_emitter__,
|
||||||
)
|
)
|
||||||
|
|
||||||
logging.info(f"stream:true:{res}")
|
logging.info(f"stream:true:{res}")
|
||||||
@ -711,7 +716,7 @@ async def generate_openai_chat_completion(form_data: OpenAIChatCompletionForm):
|
|||||||
|
|
||||||
logging.info(f"stream_content:Generator:{line}")
|
logging.info(f"stream_content:Generator:{line}")
|
||||||
|
|
||||||
if line.startswith("data:"):
|
if line.startswith("data:") or line.startswith("event:"):
|
||||||
yield f"{line}\n\n"
|
yield f"{line}\n\n"
|
||||||
else:
|
else:
|
||||||
line = stream_message_template(form_data.model, line)
|
line = stream_message_template(form_data.model, line)
|
||||||
@ -732,7 +737,13 @@ async def generate_openai_chat_completion(form_data: OpenAIChatCompletionForm):
|
|||||||
}
|
}
|
||||||
],
|
],
|
||||||
}
|
}
|
||||||
|
event = {
|
||||||
|
"type" : 'status',
|
||||||
|
"data" : {
|
||||||
|
"description" : "test",
|
||||||
|
"done" : True
|
||||||
|
}
|
||||||
|
}
|
||||||
yield f"data: {json.dumps(finish_message)}\n\n"
|
yield f"data: {json.dumps(finish_message)}\n\n"
|
||||||
yield f"data: [DONE]"
|
yield f"data: [DONE]"
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user