mirror of
https://github.com/open-webui/pipelines
synced 2025-05-12 00:20:48 +00:00
48 lines
1.5 KiB
Python
48 lines
1.5 KiB
Python
from typing import List, Union, Generator, Iterator
|
|
from schemas import OpenAIChatMessage
|
|
import subprocess
|
|
|
|
|
|
class Pipeline:
|
|
def __init__(self):
|
|
# Optionally, you can set the id and name of the pipeline.
|
|
self.id = "python_code_pipeline"
|
|
self.name = "Python Code Pipeline"
|
|
pass
|
|
|
|
async def on_startup(self):
|
|
# This function is called when the server is started.
|
|
print(f"on_startup:{__name__}")
|
|
pass
|
|
|
|
async def on_shutdown(self):
|
|
# This function is called when the server is stopped.
|
|
print(f"on_shutdown:{__name__}")
|
|
pass
|
|
|
|
def execute_python_code(self, code):
|
|
try:
|
|
result = subprocess.run(
|
|
["python", "-c", code], capture_output=True, text=True, check=True
|
|
)
|
|
stdout = result.stdout.strip()
|
|
return stdout, result.returncode
|
|
except subprocess.CalledProcessError as e:
|
|
return e.output.strip(), e.returncode
|
|
|
|
def pipe(
|
|
self, user_message: str, model_id: str, messages: List[dict], body: dict
|
|
) -> Union[str, Generator, Iterator]:
|
|
# This is where you can add your custom pipelines like RAG.
|
|
print(f"pipe:{__name__}")
|
|
|
|
print(messages)
|
|
print(user_message)
|
|
|
|
if body.get("title", False):
|
|
print("Title Generation")
|
|
return "Python Code Pipeline"
|
|
else:
|
|
stdout, return_code = self.execute_python_code(user_message)
|
|
return stdout
|