mirror of
https://github.com/open-webui/pipelines
synced 2025-06-26 18:15:58 +00:00
Merge branch 'open-webui:main' into mlx-lm
This commit is contained in:
commit
61802b0d95
23
README.md
Normal file
23
README.md
Normal file
@ -0,0 +1,23 @@
|
||||
<p align="center">
|
||||
<a href="#"><img src="./header.png" alt="Pipelines Logo"></a>
|
||||
</p>
|
||||
|
||||
# Pipelines: UI-Agnostic OpenAI API Plugin Framework
|
||||
|
||||
Welcome to **Pipelines**, [Open WebUI](https://github.com/open-webui) initiative that brings modular, customizable workflows to any UI client supporting OpenAI API specs – and much more! Dive into a world where you can effortlessly extend functionalities, integrate unique logic, and create dynamic agentic workflows, all with a few lines of code.
|
||||
|
||||
## 🚀 Why Pipelines?
|
||||
|
||||
- **Seamless Integration:** Compatible with any UI/client that supports OpenAI API specs.
|
||||
- **Endless Possibilities:** Got a specific need? Pipelines make it easy to add your custom logic and functionalities. Integrate any Python library, from AI agents via libraries like CrewAI to API calls for home automation – the sky's the limit!
|
||||
- **Custom Hooks:** Build and integrate custom RAG pipelines and more.
|
||||
|
||||
## 🔧 How It Works
|
||||
|
||||
Integrating Pipelines with any OpenAI API-compatible UI client is a breeze. Simply launch your Pipelines instance and set the OpenAI URL on your client to the Pipelines URL. That's it! You're now ready to leverage any Python library, whether you want an agent to manage your home or need a custom pipeline for your enterprise workflow.
|
||||
|
||||
## 🎉 Work in Progress
|
||||
|
||||
We’re continuously evolving! We'd love to hear your feedback and understand which hooks and features would best suit your use case. Feel free to reach out and become a part of our Open WebUI community!
|
||||
|
||||
Our vision is to push **Pipelines** to become the ultimate plugin framework for our AI interface, **Open WebUI**. Imagine **Open WebUI** as the WordPress of AI interfaces, with **Pipelines** being its diverse range of plugins. Join us on this exciting journey! 🌍
|
BIN
header.png
Normal file
BIN
header.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 28 KiB |
104
main.py
104
main.py
@ -23,30 +23,56 @@ import logging
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
|
||||
PIPELINES = {}
|
||||
PIPELINE_MODULES = {}
|
||||
|
||||
|
||||
def load_modules_from_directory(directory):
|
||||
for filename in os.listdir(directory):
|
||||
if filename.endswith(".py"):
|
||||
module_name = filename[:-3] # Remove the .py extension
|
||||
module_path = os.path.join(directory, filename)
|
||||
spec = importlib.util.spec_from_file_location(module_name, module_path)
|
||||
module = importlib.util.module_from_spec(spec)
|
||||
spec.loader.exec_module(module)
|
||||
yield module
|
||||
def on_startup():
|
||||
def load_modules_from_directory(directory):
|
||||
for filename in os.listdir(directory):
|
||||
if filename.endswith(".py"):
|
||||
module_name = filename[:-3] # Remove the .py extension
|
||||
module_path = os.path.join(directory, filename)
|
||||
spec = importlib.util.spec_from_file_location(module_name, module_path)
|
||||
module = importlib.util.module_from_spec(spec)
|
||||
spec.loader.exec_module(module)
|
||||
yield module
|
||||
|
||||
for loaded_module in load_modules_from_directory("./pipelines"):
|
||||
# Do something with the loaded module
|
||||
logging.info("Loaded:", loaded_module.__name__)
|
||||
|
||||
pipeline = loaded_module.Pipeline()
|
||||
pipeline_id = pipeline.id if hasattr(pipeline, "id") else loaded_module.__name__
|
||||
|
||||
PIPELINE_MODULES[pipeline_id] = pipeline
|
||||
|
||||
if hasattr(pipeline, "manifold") and pipeline.manifold:
|
||||
for p in pipeline.pipelines:
|
||||
manifold_pipeline_id = f'{pipeline_id}.{p["id"]}'
|
||||
|
||||
manifold_pipeline_name = p["name"]
|
||||
if hasattr(pipeline, "name"):
|
||||
manifold_pipeline_name = f"{pipeline.name}{manifold_pipeline_name}"
|
||||
|
||||
PIPELINES[manifold_pipeline_id] = {
|
||||
"module": pipeline_id,
|
||||
"id": manifold_pipeline_id,
|
||||
"name": manifold_pipeline_name,
|
||||
"manifold": True,
|
||||
}
|
||||
else:
|
||||
PIPELINES[loaded_module.__name__] = {
|
||||
"module": pipeline_id,
|
||||
"id": pipeline_id,
|
||||
"name": (
|
||||
pipeline.name
|
||||
if hasattr(pipeline, "name")
|
||||
else loaded_module.__name__
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
for loaded_module in load_modules_from_directory("./pipelines"):
|
||||
# Do something with the loaded module
|
||||
logging.info("Loaded:", loaded_module.__name__)
|
||||
|
||||
pipeline = loaded_module.Pipeline()
|
||||
|
||||
PIPELINES[loaded_module.__name__] = {
|
||||
"module": pipeline,
|
||||
"id": pipeline.id if hasattr(pipeline, "id") else loaded_module.__name__,
|
||||
"name": pipeline.name if hasattr(pipeline, "name") else loaded_module.__name__,
|
||||
}
|
||||
on_startup()
|
||||
|
||||
|
||||
from contextlib import asynccontextmanager
|
||||
@ -54,15 +80,14 @@ from contextlib import asynccontextmanager
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifespan(app: FastAPI):
|
||||
for pipeline in PIPELINES.values():
|
||||
if hasattr(pipeline["module"], "on_startup"):
|
||||
await pipeline["module"].on_startup()
|
||||
for module in PIPELINE_MODULES.values():
|
||||
if hasattr(module, "on_startup"):
|
||||
await module.on_startup()
|
||||
yield
|
||||
|
||||
for pipeline in PIPELINES.values():
|
||||
|
||||
if hasattr(pipeline["module"], "on_shutdown"):
|
||||
await pipeline["module"].on_shutdown()
|
||||
for module in PIPELINE_MODULES.values():
|
||||
if hasattr(module, "on_shutdown"):
|
||||
await module.on_shutdown()
|
||||
|
||||
|
||||
app = FastAPI(docs_url="/docs", redoc_url=None, lifespan=lifespan)
|
||||
@ -117,6 +142,7 @@ async def get_models():
|
||||
@app.post("/v1/chat/completions")
|
||||
async def generate_openai_chat_completion(form_data: OpenAIChatCompletionForm):
|
||||
user_message = get_last_user_message(form_data.messages)
|
||||
messages = [message.model_dump() for message in form_data.messages]
|
||||
|
||||
if form_data.model not in app.state.PIPELINES:
|
||||
return HTTPException(
|
||||
@ -125,15 +151,26 @@ async def generate_openai_chat_completion(form_data: OpenAIChatCompletionForm):
|
||||
)
|
||||
|
||||
def job():
|
||||
logging.info(form_data.model)
|
||||
get_response = app.state.PIPELINES[form_data.model]["module"].get_response
|
||||
print(form_data.model)
|
||||
|
||||
pipeline = app.state.PIPELINES[form_data.model]
|
||||
pipeline_id = form_data.model
|
||||
|
||||
print(pipeline_id)
|
||||
|
||||
if pipeline.get("manifold", False):
|
||||
manifold_id, pipeline_id = pipeline_id.split(".", 1)
|
||||
get_response = PIPELINE_MODULES[manifold_id].get_response
|
||||
else:
|
||||
get_response = PIPELINE_MODULES[pipeline_id].get_response
|
||||
|
||||
if form_data.stream:
|
||||
|
||||
def stream_content():
|
||||
res = get_response(
|
||||
user_message,
|
||||
messages=form_data.messages,
|
||||
user_message=user_message,
|
||||
model_id=pipeline_id,
|
||||
messages=messages,
|
||||
body=form_data.model_dump(),
|
||||
)
|
||||
|
||||
@ -185,8 +222,9 @@ async def generate_openai_chat_completion(form_data: OpenAIChatCompletionForm):
|
||||
return StreamingResponse(stream_content(), media_type="text/event-stream")
|
||||
else:
|
||||
res = get_response(
|
||||
user_message,
|
||||
messages=form_data.messages,
|
||||
user_message=user_message,
|
||||
model_id=pipeline_id,
|
||||
messages=messages,
|
||||
body=form_data.model_dump(),
|
||||
)
|
||||
logging.info(f"stream:false:{res}")
|
||||
|
@ -25,7 +25,7 @@ class Pipeline:
|
||||
pass
|
||||
|
||||
def get_response(
|
||||
self, user_message: str, messages: List[OpenAIChatMessage], body: dict
|
||||
self, user_message: str, model_id: str, messages: List[dict], body: dict
|
||||
) -> Union[str, Generator, Iterator]:
|
||||
# This is where you can add your custom pipelines like RAG.'
|
||||
print(f"get_response:{__name__}")
|
||||
|
@ -21,7 +21,7 @@ class Pipeline:
|
||||
pass
|
||||
|
||||
def get_response(
|
||||
self, user_message: str, messages: List[OpenAIChatMessage], body: dict
|
||||
self, user_message: str, model_id: str, messages: List[dict], body: dict
|
||||
) -> Union[str, Generator, Iterator]:
|
||||
# This is where you can add your custom pipelines like RAG.'
|
||||
print(f"get_response:{__name__}")
|
||||
|
@ -79,7 +79,7 @@ class Pipeline:
|
||||
pass
|
||||
|
||||
def get_response(
|
||||
self, user_message: str, messages: List[OpenAIChatMessage], body: dict
|
||||
self, user_message: str, model_id: str, messages: List[dict], body: dict
|
||||
) -> Union[str, Generator, Iterator]:
|
||||
# This is where you can add your custom RAG pipeline.
|
||||
# Typically, you would retrieve relevant information from your knowledge base and synthesize it to generate a response.
|
||||
|
@ -30,7 +30,7 @@ class Pipeline:
|
||||
pass
|
||||
|
||||
def get_response(
|
||||
self, user_message: str, messages: List[OpenAIChatMessage], body: dict
|
||||
self, user_message: str, model_id: str, messages: List[dict], body: dict
|
||||
) -> Union[str, Generator, Iterator]:
|
||||
# This is where you can add your custom pipelines like RAG.'
|
||||
print(f"get_response:{__name__}")
|
||||
@ -40,7 +40,7 @@ class Pipeline:
|
||||
print(body)
|
||||
|
||||
response = self.llm.create_chat_completion_openai_v1(
|
||||
messages=[message.model_dump() for message in messages],
|
||||
messages=messages,
|
||||
stream=body["stream"],
|
||||
)
|
||||
|
||||
|
@ -70,7 +70,7 @@ class Pipeline:
|
||||
pass
|
||||
|
||||
def get_response(
|
||||
self, user_message: str, messages: List[OpenAIChatMessage], body: dict
|
||||
self, user_message: str, model_id: str, messages: List[dict], body: dict
|
||||
) -> Union[str, Generator, Iterator]:
|
||||
# This is where you can add your custom RAG pipeline.
|
||||
# Typically, you would retrieve relevant information from your knowledge base and synthesize it to generate a response.
|
||||
|
@ -30,7 +30,7 @@ class Pipeline:
|
||||
pass
|
||||
|
||||
def get_response(
|
||||
self, user_message: str, messages: List[OpenAIChatMessage], body: dict
|
||||
self, user_message: str, model_id: str, messages: List[dict], body: dict
|
||||
) -> Union[str, Generator, Iterator]:
|
||||
# This is where you can add your custom RAG pipeline.
|
||||
# Typically, you would retrieve relevant information from your knowledge base and synthesize it to generate a response.
|
||||
|
@ -25,7 +25,7 @@ class Pipeline:
|
||||
pass
|
||||
|
||||
def get_response(
|
||||
self, user_message: str, messages: List[OpenAIChatMessage], body: dict
|
||||
self, user_message: str, model_id: str, messages: List[dict], body: dict
|
||||
) -> Union[str, Generator, Iterator]:
|
||||
# This is where you can add your custom RAG pipeline.
|
||||
# Typically, you would retrieve relevant information from your knowledge base and synthesize it to generate a response.
|
||||
|
47
pipelines/examples/manifold_pipeline.py
Normal file
47
pipelines/examples/manifold_pipeline.py
Normal file
@ -0,0 +1,47 @@
|
||||
from typing import List, Union, Generator, Iterator
|
||||
from schemas import OpenAIChatMessage
|
||||
|
||||
|
||||
class Pipeline:
|
||||
def __init__(self):
|
||||
# You can also set the pipelines that are available in this pipeline.
|
||||
# Set manifold to True if you want to use this pipeline as a manifold.
|
||||
# Manifold pipelines can have multiple pipelines.
|
||||
self.manifold = True
|
||||
|
||||
self.id = "manifold_pipeline"
|
||||
# Optionally, you can set the name of the manifold pipeline.
|
||||
self.name = "Manifold: "
|
||||
self.pipelines = [
|
||||
{
|
||||
"id": "pipeline-1", # This will turn into `manifold_pipeline.pipeline-1`
|
||||
"name": "Pipeline 1", # This will turn into `Manifold: Pipeline 1`
|
||||
},
|
||||
{
|
||||
"id": "pipeline-2",
|
||||
"name": "Pipeline 2",
|
||||
},
|
||||
]
|
||||
pass
|
||||
|
||||
async def on_startup(self):
|
||||
# This function is called when the server is started.
|
||||
print(f"on_startup:{__name__}")
|
||||
pass
|
||||
|
||||
async def on_shutdown(self):
|
||||
# This function is called when the server is stopped.
|
||||
print(f"on_shutdown:{__name__}")
|
||||
pass
|
||||
|
||||
def get_response(
|
||||
self, user_message: str, model_id: str, messages: List[dict], body: dict
|
||||
) -> Union[str, Generator, Iterator]:
|
||||
# This is where you can add your custom pipelines like RAG.'
|
||||
print(f"get_response:{__name__}")
|
||||
|
||||
print(messages)
|
||||
print(user_message)
|
||||
print(body)
|
||||
|
||||
return f"{model_id} response to: {user_message}"
|
@ -64,7 +64,7 @@ class Pipeline:
|
||||
print(f"Failed to terminate subprocess: {e}")
|
||||
|
||||
def get_response(
|
||||
self, user_message: str, messages: List[OpenAIChatMessage], body: dict
|
||||
self, user_message: str, model_id: str, messages: List[dict], body: dict
|
||||
) -> Union[str, Generator, Iterator]:
|
||||
# This is where you can add your custom pipelines like RAG.'
|
||||
print(f"get_response:{__name__}")
|
||||
|
63
pipelines/examples/ollama_manifold_pipeline.py
Normal file
63
pipelines/examples/ollama_manifold_pipeline.py
Normal file
@ -0,0 +1,63 @@
|
||||
from typing import List, Union, Generator, Iterator
|
||||
from schemas import OpenAIChatMessage
|
||||
import requests
|
||||
|
||||
|
||||
class Pipeline:
|
||||
def __init__(self):
|
||||
# You can also set the pipelines that are available in this pipeline.
|
||||
# Set manifold to True if you want to use this pipeline as a manifold.
|
||||
# Manifold pipelines can have multiple pipelines.
|
||||
self.manifold = True
|
||||
self.id = "ollama_manifold"
|
||||
# Optionally, you can set the name of the manifold pipeline.
|
||||
self.name = "Ollama: "
|
||||
|
||||
self.OLLAMA_BASE_URL = "http://localhost:11434"
|
||||
self.pipelines = self.get_ollama_models()
|
||||
pass
|
||||
|
||||
def get_ollama_models(self):
|
||||
r = requests.get(f"{self.OLLAMA_BASE_URL}/api/tags")
|
||||
models = r.json()
|
||||
|
||||
return [
|
||||
{"id": model["model"], "name": model["name"]} for model in models["models"]
|
||||
]
|
||||
|
||||
async def on_startup(self):
|
||||
# This function is called when the server is started.
|
||||
print(f"on_startup:{__name__}")
|
||||
pass
|
||||
|
||||
async def on_shutdown(self):
|
||||
# This function is called when the server is stopped.
|
||||
print(f"on_shutdown:{__name__}")
|
||||
pass
|
||||
|
||||
def get_response(
|
||||
self, user_message: str, model_id: str, messages: List[dict], body: dict
|
||||
) -> Union[str, Generator, Iterator]:
|
||||
# This is where you can add your custom pipelines like RAG.'
|
||||
|
||||
if "user" in body:
|
||||
print("######################################")
|
||||
print(f'# User: {body["user"]["name"]} ({body["user"]["id"]})')
|
||||
print(f"# Message: {user_message}")
|
||||
print("######################################")
|
||||
|
||||
try:
|
||||
r = requests.post(
|
||||
url=f"{self.OLLAMA_BASE_URL}/v1/chat/completions",
|
||||
json={**body, "model": model_id},
|
||||
stream=True,
|
||||
)
|
||||
|
||||
r.raise_for_status()
|
||||
|
||||
if body["stream"]:
|
||||
return r.iter_lines()
|
||||
else:
|
||||
return r.json()
|
||||
except Exception as e:
|
||||
return f"Error: {e}"
|
@ -21,7 +21,7 @@ class Pipeline:
|
||||
pass
|
||||
|
||||
def get_response(
|
||||
self, user_message: str, messages: List[OpenAIChatMessage], body: dict
|
||||
self, user_message: str, model_id: str, messages: List[dict], body: dict
|
||||
) -> Union[str, Generator, Iterator]:
|
||||
# This is where you can add your custom pipelines like RAG.'
|
||||
print(f"get_response:{__name__}")
|
||||
|
@ -21,7 +21,7 @@ class Pipeline:
|
||||
pass
|
||||
|
||||
def get_response(
|
||||
self, user_message: str, messages: List[OpenAIChatMessage], body: dict
|
||||
self, user_message: str, model_id: str, messages: List[dict], body: dict
|
||||
) -> Union[str, Generator, Iterator]:
|
||||
# This is where you can add your custom pipelines like RAG.'
|
||||
print(f"get_response:{__name__}")
|
||||
|
@ -20,7 +20,7 @@ class Pipeline:
|
||||
pass
|
||||
|
||||
def get_response(
|
||||
self, user_message: str, messages: List[OpenAIChatMessage], body: dict
|
||||
self, user_message: str, model_id: str, messages: List[dict], body: dict
|
||||
) -> Union[str, Generator, Iterator]:
|
||||
# This is where you can add your custom pipelines like RAG.'
|
||||
print(f"get_response:{__name__}")
|
||||
|
@ -31,7 +31,7 @@ class Pipeline:
|
||||
return e.output.strip(), e.returncode
|
||||
|
||||
def get_response(
|
||||
self, user_message: str, messages: List[OpenAIChatMessage], body: dict
|
||||
self, user_message: str, model_id: str, messages: List[dict], body: dict
|
||||
) -> Union[str, Generator, Iterator]:
|
||||
# This is where you can add your custom pipelines like RAG.'
|
||||
print(f"get_response:{__name__}")
|
||||
|
@ -21,7 +21,7 @@ class Pipeline:
|
||||
pass
|
||||
|
||||
def get_response(
|
||||
self, user_message: str, messages: List[OpenAIChatMessage], body: dict
|
||||
self, user_message: str, model_id: str, messages: List[dict], body: dict
|
||||
) -> Union[str, Generator, Iterator]:
|
||||
# This is where you can add your custom pipelines like RAG.'
|
||||
print(f"get_response:{__name__}")
|
||||
|
5
start.bat
Normal file
5
start.bat
Normal file
@ -0,0 +1,5 @@
|
||||
@echo off
|
||||
set PORT=9099
|
||||
set HOST=0.0.0.0
|
||||
|
||||
uvicorn main:app --host %HOST% --port %PORT% --forwarded-allow-ips '*'
|
2
utils.py
2
utils.py
@ -22,7 +22,7 @@ def stream_message_template(model: str, message: str):
|
||||
}
|
||||
|
||||
|
||||
def get_last_user_message(messages: List[OpenAIChatMessage]) -> str:
|
||||
def get_last_user_message(messages: List[dict]) -> str:
|
||||
for message in reversed(messages):
|
||||
if message.role == "user":
|
||||
return message.content
|
||||
|
Loading…
Reference in New Issue
Block a user