diff --git a/README.md b/README.md
new file mode 100644
index 0000000..26e7282
--- /dev/null
+++ b/README.md
@@ -0,0 +1,23 @@
+<p align="center">
+  <img src="./header.png" alt="Pipelines Logo">
+</p>
+
+# Pipelines: UI-Agnostic OpenAI API Plugin Framework
+
+Welcome to **Pipelines**, an [Open WebUI](https://github.com/open-webui) initiative that brings modular, customizable workflows to any UI client supporting the OpenAI API spec – and much more! Dive into a world where you can effortlessly extend functionality, integrate unique logic, and create dynamic agentic workflows, all with a few lines of code.
+
+## πŸš€ Why Pipelines?
+
+- **Seamless Integration:** Compatible with any UI/client that supports the OpenAI API spec.
+- **Endless Possibilities:** Got a specific need? Pipelines make it easy to add your custom logic and functionality. Integrate any Python library, from AI agents via frameworks like CrewAI to API calls for home automation – the sky's the limit!
+- **Custom Hooks:** Build and integrate custom RAG pipelines and more.
+
+## πŸ”§ How It Works
+
+Integrating Pipelines with any OpenAI API-compatible UI client is a breeze. Simply launch your Pipelines instance and set the OpenAI URL on your client to the Pipelines URL. That's it! You're now ready to leverage any Python library, whether you want an agent to manage your home or need a custom pipeline for your enterprise workflow.
+
+## πŸŽ‰ Work in Progress
+
+We're continuously evolving! We'd love to hear your feedback and understand which hooks and features would best suit your use case. Feel free to reach out and become part of our Open WebUI community!
+
+Our vision is to push **Pipelines** to become the ultimate plugin framework for our AI interface, **Open WebUI**. Imagine **Open WebUI** as the WordPress of AI interfaces, with **Pipelines** as its diverse range of plugins. Join us on this exciting journey! 🌍
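The "How It Works" section above boils down to pointing an OpenAI-compatible client at the Pipelines server. Below is a minimal sketch using the official `openai` Python client, assuming a local instance on port 9099 (the port used by `start.bat` later in this patch); the API key and the pipeline id are placeholders, not values taken from the repository.

```python
from openai import OpenAI

client = OpenAI(
    base_url="http://localhost:9099/v1",  # your Pipelines URL
    api_key="pipelines",                  # placeholder; use whatever your deployment expects
)

response = client.chat.completions.create(
    model="ollama_pipeline",  # placeholder id of a loaded pipeline
    messages=[{"role": "user", "content": "Hello from Pipelines!"}],
)
print(response.choices[0].message.content)
```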
diff --git a/header.png b/header.png
new file mode 100644
index 0000000..2f2e9b1
Binary files /dev/null and b/header.png differ
diff --git a/main.py b/main.py
index 63234e7..7796aa2 100644
--- a/main.py
+++ b/main.py
@@ -23,30 +23,56 @@ import logging
 from concurrent.futures import ThreadPoolExecutor
 
 PIPELINES = {}
+PIPELINE_MODULES = {}
 
 
-def load_modules_from_directory(directory):
-    for filename in os.listdir(directory):
-        if filename.endswith(".py"):
-            module_name = filename[:-3]  # Remove the .py extension
-            module_path = os.path.join(directory, filename)
-            spec = importlib.util.spec_from_file_location(module_name, module_path)
-            module = importlib.util.module_from_spec(spec)
-            spec.loader.exec_module(module)
-            yield module
+def on_startup():
+    def load_modules_from_directory(directory):
+        for filename in os.listdir(directory):
+            if filename.endswith(".py"):
+                module_name = filename[:-3]  # Remove the .py extension
+                module_path = os.path.join(directory, filename)
+                spec = importlib.util.spec_from_file_location(module_name, module_path)
+                module = importlib.util.module_from_spec(spec)
+                spec.loader.exec_module(module)
+                yield module
+
+    for loaded_module in load_modules_from_directory("./pipelines"):
+        # Do something with the loaded module
+        logging.info("Loaded:", loaded_module.__name__)
+
+        pipeline = loaded_module.Pipeline()
+        pipeline_id = pipeline.id if hasattr(pipeline, "id") else loaded_module.__name__
+
+        PIPELINE_MODULES[pipeline_id] = pipeline
+
+        if hasattr(pipeline, "manifold") and pipeline.manifold:
+            for p in pipeline.pipelines:
+                manifold_pipeline_id = f'{pipeline_id}.{p["id"]}'
+
+                manifold_pipeline_name = p["name"]
+                if hasattr(pipeline, "name"):
+                    manifold_pipeline_name = f"{pipeline.name}{manifold_pipeline_name}"
+
+                PIPELINES[manifold_pipeline_id] = {
+                    "module": pipeline_id,
+                    "id": manifold_pipeline_id,
+                    "name": manifold_pipeline_name,
+                    "manifold": True,
+                }
+        else:
+            PIPELINES[loaded_module.__name__] = {
+                "module": pipeline_id,
+                "id": pipeline_id,
+                "name": (
+                    pipeline.name
+                    if hasattr(pipeline, "name")
+                    else loaded_module.__name__
+                ),
+            }
 
 
-for loaded_module in load_modules_from_directory("./pipelines"):
-    # Do something with the loaded module
-    logging.info("Loaded:", loaded_module.__name__)
-
-    pipeline = loaded_module.Pipeline()
-
-    PIPELINES[loaded_module.__name__] = {
-        "module": pipeline,
-        "id": pipeline.id if hasattr(pipeline, "id") else loaded_module.__name__,
-        "name": pipeline.name if hasattr(pipeline, "name") else loaded_module.__name__,
-    }
+on_startup()
 
 
 from contextlib import asynccontextmanager
@@ -54,15 +80,14 @@ from contextlib import asynccontextmanager
 
 @asynccontextmanager
 async def lifespan(app: FastAPI):
-    for pipeline in PIPELINES.values():
-        if hasattr(pipeline["module"], "on_startup"):
-            await pipeline["module"].on_startup()
+    for module in PIPELINE_MODULES.values():
+        if hasattr(module, "on_startup"):
+            await module.on_startup()
     yield
 
-    for pipeline in PIPELINES.values():
-
-        if hasattr(pipeline["module"], "on_shutdown"):
-            await pipeline["module"].on_shutdown()
+    for module in PIPELINE_MODULES.values():
+        if hasattr(module, "on_shutdown"):
+            await module.on_shutdown()
 
 
 app = FastAPI(docs_url="/docs", redoc_url=None, lifespan=lifespan)
@@ -117,6 +142,7 @@ async def get_models():
 @app.post("/v1/chat/completions")
 async def generate_openai_chat_completion(form_data: OpenAIChatCompletionForm):
     user_message = get_last_user_message(form_data.messages)
+    messages = [message.model_dump() for message in form_data.messages]
 
     if form_data.model not in app.state.PIPELINES:
         return HTTPException(
@@ -125,15 +151,26 @@ async def generate_openai_chat_completion(form_data: OpenAIChatCompletionForm):
         )
 
     def job():
-        logging.info(form_data.model)
-        get_response = app.state.PIPELINES[form_data.model]["module"].get_response
+        print(form_data.model)
+
+        pipeline = app.state.PIPELINES[form_data.model]
+        pipeline_id = form_data.model
+
+        print(pipeline_id)
+
+        if pipeline.get("manifold", False):
+            manifold_id, pipeline_id = pipeline_id.split(".", 1)
+            get_response = PIPELINE_MODULES[manifold_id].get_response
+        else:
+            get_response = PIPELINE_MODULES[pipeline_id].get_response
 
         if form_data.stream:
 
             def stream_content():
                 res = get_response(
-                    user_message,
-                    messages=form_data.messages,
+                    user_message=user_message,
+                    model_id=pipeline_id,
+                    messages=messages,
                     body=form_data.model_dump(),
                 )
 
@@ -185,8 +222,9 @@ async def generate_openai_chat_completion(form_data: OpenAIChatCompletionForm):
             return StreamingResponse(stream_content(), media_type="text/event-stream")
         else:
             res = get_response(
-                user_message,
-                messages=form_data.messages,
+                user_message=user_message,
+                model_id=pipeline_id,
+                messages=messages,
                 body=form_data.model_dump(),
            )
             logging.info(f"stream:false:{res}")
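The routing introduced in `job()` above can be illustrated in isolation: manifold sub-pipelines are registered under `"<manifold id>.<sub-pipeline id>"`, so the handler splits the incoming model id once on `"."` to find the owning module and passes the remainder on as `model_id`. The sketch below is a standalone illustration with made-up ids, not code from the repository.

```python
# Registered entries, as built by on_startup() in main.py (values illustrative).
PIPELINES = {
    "ollama_manifold.llama3:latest": {"module": "ollama_manifold", "manifold": True},
    "python_code_pipeline": {"module": "python_code_pipeline"},
}


def resolve(model):
    # Mirrors the branch added in job(): manifold entries split on the first ".",
    # plain pipelines are served by their own module under their own id.
    entry = PIPELINES[model]
    if entry.get("manifold", False):
        module_id, model_id = model.split(".", 1)
    else:
        module_id, model_id = entry["module"], model
    return module_id, model_id


print(resolve("ollama_manifold.llama3:latest"))  # ('ollama_manifold', 'llama3:latest')
print(resolve("python_code_pipeline"))           # ('python_code_pipeline', 'python_code_pipeline')
```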
diff --git a/pipelines/examples/applescript_pipeline.py b/pipelines/examples/applescript_pipeline.py
index d2c28f6..c61bd6c 100644
--- a/pipelines/examples/applescript_pipeline.py
+++ b/pipelines/examples/applescript_pipeline.py
@@ -25,7 +25,7 @@ class Pipeline:
         pass
 
     def get_response(
-        self, user_message: str, messages: List[OpenAIChatMessage], body: dict
+        self, user_message: str, model_id: str, messages: List[dict], body: dict
     ) -> Union[str, Generator, Iterator]:
         # This is where you can add your custom pipelines like RAG.'
         print(f"get_response:{__name__}")
diff --git a/pipelines/examples/azure_openai_pipeline.py b/pipelines/examples/azure_openai_pipeline.py
index 0052760..ed6b298 100644
--- a/pipelines/examples/azure_openai_pipeline.py
+++ b/pipelines/examples/azure_openai_pipeline.py
@@ -21,7 +21,7 @@ class Pipeline:
         pass
 
     def get_response(
-        self, user_message: str, messages: List[OpenAIChatMessage], body: dict
+        self, user_message: str, model_id: str, messages: List[dict], body: dict
     ) -> Union[str, Generator, Iterator]:
         # This is where you can add your custom pipelines like RAG.'
         print(f"get_response:{__name__}")
diff --git a/pipelines/examples/haystack_pipeline.py b/pipelines/examples/haystack_pipeline.py
index cea62ff..91c62ad 100644
--- a/pipelines/examples/haystack_pipeline.py
+++ b/pipelines/examples/haystack_pipeline.py
@@ -79,7 +79,7 @@ class Pipeline:
         pass
 
     def get_response(
-        self, user_message: str, messages: List[OpenAIChatMessage], body: dict
+        self, user_message: str, model_id: str, messages: List[dict], body: dict
     ) -> Union[str, Generator, Iterator]:
         # This is where you can add your custom RAG pipeline.
         # Typically, you would retrieve relevant information from your knowledge base and synthesize it to generate a response.
diff --git a/pipelines/examples/llama_cpp_pipeline.py b/pipelines/examples/llama_cpp_pipeline.py
index 2032558..a35623c 100644
--- a/pipelines/examples/llama_cpp_pipeline.py
+++ b/pipelines/examples/llama_cpp_pipeline.py
@@ -30,7 +30,7 @@ class Pipeline:
         pass
 
     def get_response(
-        self, user_message: str, messages: List[OpenAIChatMessage], body: dict
+        self, user_message: str, model_id: str, messages: List[dict], body: dict
     ) -> Union[str, Generator, Iterator]:
         # This is where you can add your custom pipelines like RAG.'
         print(f"get_response:{__name__}")
@@ -40,7 +40,7 @@ class Pipeline:
         print(body)
 
         response = self.llm.create_chat_completion_openai_v1(
-            messages=[message.model_dump() for message in messages],
+            messages=messages,
             stream=body["stream"],
         )
 
diff --git a/pipelines/examples/llamaindex_ollama_github_pipeline.py b/pipelines/examples/llamaindex_ollama_github_pipeline.py
index 17d1461..992bb0f 100644
--- a/pipelines/examples/llamaindex_ollama_github_pipeline.py
+++ b/pipelines/examples/llamaindex_ollama_github_pipeline.py
@@ -70,7 +70,7 @@ class Pipeline:
         pass
 
     def get_response(
-        self, user_message: str, messages: List[OpenAIChatMessage], body: dict
+        self, user_message: str, model_id: str, messages: List[dict], body: dict
     ) -> Union[str, Generator, Iterator]:
         # This is where you can add your custom RAG pipeline.
         # Typically, you would retrieve relevant information from your knowledge base and synthesize it to generate a response.
diff --git a/pipelines/examples/llamaindex_ollama_pipeline.py b/pipelines/examples/llamaindex_ollama_pipeline.py
index 089c499..2ea1638 100644
--- a/pipelines/examples/llamaindex_ollama_pipeline.py
+++ b/pipelines/examples/llamaindex_ollama_pipeline.py
@@ -30,7 +30,7 @@ class Pipeline:
         pass
 
     def get_response(
-        self, user_message: str, messages: List[OpenAIChatMessage], body: dict
+        self, user_message: str, model_id: str, messages: List[dict], body: dict
     ) -> Union[str, Generator, Iterator]:
         # This is where you can add your custom RAG pipeline.
         # Typically, you would retrieve relevant information from your knowledge base and synthesize it to generate a response.
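The same signature change recurs across every example pipeline in this patch: `messages` now arrives as a list of plain dicts (already `model_dump()`-ed in `main.py`) and the new `model_id` parameter carries the resolved pipeline or model id. A minimal skeleton of the updated interface, with hypothetical `id` and `name` values rather than one of the shipped examples:

```python
from typing import List, Union, Generator, Iterator


class Pipeline:
    def __init__(self):
        self.id = "my_pipeline"    # hypothetical id
        self.name = "My Pipeline"  # hypothetical display name

    async def on_startup(self):
        # Called once when the Pipelines server starts.
        pass

    async def on_shutdown(self):
        # Called once when the Pipelines server stops.
        pass

    def get_response(
        self, user_message: str, model_id: str, messages: List[dict], body: dict
    ) -> Union[str, Generator, Iterator]:
        # messages is a list of {"role": ..., "content": ...} dicts;
        # model_id is the id resolved by main.py (the sub-pipeline id for manifolds).
        return f"{model_id}: you said '{user_message}'"
```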
diff --git a/pipelines/examples/llamaindex_pipeline.py b/pipelines/examples/llamaindex_pipeline.py
index 8dbce35..d6fccf9 100644
--- a/pipelines/examples/llamaindex_pipeline.py
+++ b/pipelines/examples/llamaindex_pipeline.py
@@ -25,7 +25,7 @@ class Pipeline:
         pass
 
     def get_response(
-        self, user_message: str, messages: List[OpenAIChatMessage], body: dict
+        self, user_message: str, model_id: str, messages: List[dict], body: dict
     ) -> Union[str, Generator, Iterator]:
         # This is where you can add your custom RAG pipeline.
         # Typically, you would retrieve relevant information from your knowledge base and synthesize it to generate a response.
diff --git a/pipelines/examples/manifold_pipeline.py b/pipelines/examples/manifold_pipeline.py
new file mode 100644
index 0000000..99e71e2
--- /dev/null
+++ b/pipelines/examples/manifold_pipeline.py
@@ -0,0 +1,47 @@
+from typing import List, Union, Generator, Iterator
+from schemas import OpenAIChatMessage
+
+
+class Pipeline:
+    def __init__(self):
+        # You can also set the pipelines that are available in this pipeline.
+        # Set manifold to True if you want to use this pipeline as a manifold.
+        # Manifold pipelines can have multiple pipelines.
+        self.manifold = True
+
+        self.id = "manifold_pipeline"
+        # Optionally, you can set the name of the manifold pipeline.
+        self.name = "Manifold: "
+        self.pipelines = [
+            {
+                "id": "pipeline-1",  # This will turn into `manifold_pipeline.pipeline-1`
+                "name": "Pipeline 1",  # This will turn into `Manifold: Pipeline 1`
+            },
+            {
+                "id": "pipeline-2",
+                "name": "Pipeline 2",
+            },
+        ]
+        pass
+
+    async def on_startup(self):
+        # This function is called when the server is started.
+        print(f"on_startup:{__name__}")
+        pass
+
+    async def on_shutdown(self):
+        # This function is called when the server is stopped.
+        print(f"on_shutdown:{__name__}")
+        pass
+
+    def get_response(
+        self, user_message: str, model_id: str, messages: List[dict], body: dict
+    ) -> Union[str, Generator, Iterator]:
+        # This is where you can add your custom pipelines like RAG.'
+        print(f"get_response:{__name__}")
+
+        print(messages)
+        print(user_message)
+        print(body)
+
+        return f"{model_id} response to: {user_message}"
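As the inline comments in the manifold example above indicate, the loader in `main.py` expands a manifold's `pipelines` list into prefixed model entries. A small self-contained sketch of that expansion, reusing the ids and names from the example:

```python
# Values copied from the manifold_pipeline.py example above.
manifold_id = "manifold_pipeline"
manifold_name = "Manifold: "
sub_pipelines = [
    {"id": "pipeline-1", "name": "Pipeline 1"},
    {"id": "pipeline-2", "name": "Pipeline 2"},
]

# Mirrors the registration loop in on_startup(): id gets the manifold prefix,
# name gets the manifold display-name prefix, and the entry is marked as manifold.
entries = {
    f'{manifold_id}.{p["id"]}': {
        "module": manifold_id,
        "id": f'{manifold_id}.{p["id"]}',
        "name": f'{manifold_name}{p["name"]}',
        "manifold": True,
    }
    for p in sub_pipelines
}

for pipeline_id, entry in entries.items():
    print(pipeline_id, "->", entry["name"])
# manifold_pipeline.pipeline-1 -> Manifold: Pipeline 1
# manifold_pipeline.pipeline-2 -> Manifold: Pipeline 2
```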
diff --git a/pipelines/examples/mlx_pipeline.py b/pipelines/examples/mlx_pipeline.py
index 03403a9..38a8f79 100644
--- a/pipelines/examples/mlx_pipeline.py
+++ b/pipelines/examples/mlx_pipeline.py
@@ -64,7 +64,7 @@ class Pipeline:
             print(f"Failed to terminate subprocess: {e}")
 
     def get_response(
-        self, user_message: str, messages: List[OpenAIChatMessage], body: dict
+        self, user_message: str, model_id: str, messages: List[dict], body: dict
     ) -> Union[str, Generator, Iterator]:
         # This is where you can add your custom pipelines like RAG.'
         print(f"get_response:{__name__}")
diff --git a/pipelines/examples/ollama_manifold_pipeline.py b/pipelines/examples/ollama_manifold_pipeline.py
new file mode 100644
index 0000000..752fb6e
--- /dev/null
+++ b/pipelines/examples/ollama_manifold_pipeline.py
@@ -0,0 +1,63 @@
+from typing import List, Union, Generator, Iterator
+from schemas import OpenAIChatMessage
+import requests
+
+
+class Pipeline:
+    def __init__(self):
+        # You can also set the pipelines that are available in this pipeline.
+        # Set manifold to True if you want to use this pipeline as a manifold.
+        # Manifold pipelines can have multiple pipelines.
+        self.manifold = True
+        self.id = "ollama_manifold"
+        # Optionally, you can set the name of the manifold pipeline.
+        self.name = "Ollama: "
+
+        self.OLLAMA_BASE_URL = "http://localhost:11434"
+        self.pipelines = self.get_ollama_models()
+        pass
+
+    def get_ollama_models(self):
+        r = requests.get(f"{self.OLLAMA_BASE_URL}/api/tags")
+        models = r.json()
+
+        return [
+            {"id": model["model"], "name": model["name"]} for model in models["models"]
+        ]
+
+    async def on_startup(self):
+        # This function is called when the server is started.
+        print(f"on_startup:{__name__}")
+        pass
+
+    async def on_shutdown(self):
+        # This function is called when the server is stopped.
+        print(f"on_shutdown:{__name__}")
+        pass
+
+    def get_response(
+        self, user_message: str, model_id: str, messages: List[dict], body: dict
+    ) -> Union[str, Generator, Iterator]:
+        # This is where you can add your custom pipelines like RAG.'
+
+        if "user" in body:
+            print("######################################")
+            print(f'# User: {body["user"]["name"]} ({body["user"]["id"]})')
+            print(f"# Message: {user_message}")
+            print("######################################")
+
+        try:
+            r = requests.post(
+                url=f"{self.OLLAMA_BASE_URL}/v1/chat/completions",
+                json={**body, "model": model_id},
+                stream=True,
+            )
+
+            r.raise_for_status()
+
+            if body["stream"]:
+                return r.iter_lines()
+            else:
+                return r.json()
+        except Exception as e:
+            return f"Error: {e}"
diff --git a/pipelines/examples/ollama_pipeline.py b/pipelines/examples/ollama_pipeline.py
index 437461d..53da4c8 100644
--- a/pipelines/examples/ollama_pipeline.py
+++ b/pipelines/examples/ollama_pipeline.py
@@ -21,7 +21,7 @@ class Pipeline:
         pass
 
     def get_response(
-        self, user_message: str, messages: List[OpenAIChatMessage], body: dict
+        self, user_message: str, model_id: str, messages: List[dict], body: dict
     ) -> Union[str, Generator, Iterator]:
         # This is where you can add your custom pipelines like RAG.'
         print(f"get_response:{__name__}")
diff --git a/pipelines/examples/openai_pipeline.py b/pipelines/examples/openai_pipeline.py
index 1c712e4..98a42d5 100644
--- a/pipelines/examples/openai_pipeline.py
+++ b/pipelines/examples/openai_pipeline.py
@@ -21,7 +21,7 @@ class Pipeline:
         pass
 
    def get_response(
-        self, user_message: str, messages: List[OpenAIChatMessage], body: dict
+        self, user_message: str, model_id: str, messages: List[dict], body: dict
     ) -> Union[str, Generator, Iterator]:
         # This is where you can add your custom pipelines like RAG.'
         print(f"get_response:{__name__}")
diff --git a/pipelines/examples/pipeline_example.py b/pipelines/examples/pipeline_example.py
index a0d1f95..0fba805 100644
--- a/pipelines/examples/pipeline_example.py
+++ b/pipelines/examples/pipeline_example.py
@@ -20,7 +20,7 @@ class Pipeline:
         pass
 
     def get_response(
-        self, user_message: str, messages: List[OpenAIChatMessage], body: dict
+        self, user_message: str, model_id: str, messages: List[dict], body: dict
     ) -> Union[str, Generator, Iterator]:
         # This is where you can add your custom pipelines like RAG.'
         print(f"get_response:{__name__}")
diff --git a/pipelines/examples/python_code_pipeline.py b/pipelines/examples/python_code_pipeline.py
index cfe1408..7e35655 100644
--- a/pipelines/examples/python_code_pipeline.py
+++ b/pipelines/examples/python_code_pipeline.py
@@ -31,7 +31,7 @@ class Pipeline:
             return e.output.strip(), e.returncode
 
     def get_response(
-        self, user_message: str, messages: List[OpenAIChatMessage], body: dict
+        self, user_message: str, model_id: str, messages: List[dict], body: dict
     ) -> Union[str, Generator, Iterator]:
         # This is where you can add your custom pipelines like RAG.'
         print(f"get_response:{__name__}")
diff --git a/pipelines/ollama_pipeline.py b/pipelines/ollama_pipeline.py
index 437461d..53da4c8 100644
--- a/pipelines/ollama_pipeline.py
+++ b/pipelines/ollama_pipeline.py
@@ -21,7 +21,7 @@ class Pipeline:
         pass
 
     def get_response(
-        self, user_message: str, messages: List[OpenAIChatMessage], body: dict
+        self, user_message: str, model_id: str, messages: List[dict], body: dict
     ) -> Union[str, Generator, Iterator]:
         # This is where you can add your custom pipelines like RAG.'
         print(f"get_response:{__name__}")
diff --git a/readme.md b/readme.md
deleted file mode 100644
index 80d2f6e..0000000
--- a/readme.md
+++ /dev/null
@@ -1 +0,0 @@
-# OpenAI API Compatible Plugin Server
diff --git a/start.bat b/start.bat
new file mode 100644
index 0000000..05c5795
--- /dev/null
+++ b/start.bat
@@ -0,0 +1,5 @@
+@echo off
+set PORT=9099
+set HOST=0.0.0.0
+
+uvicorn main:app --host %HOST% --port %PORT% --forwarded-allow-ips '*'
\ No newline at end of file
diff --git a/utils.py b/utils.py
index 97b22fc..a50cddf 100644
--- a/utils.py
+++ b/utils.py
@@ -22,7 +22,7 @@ def stream_message_template(model: str, message: str):
     }
 
 
-def get_last_user_message(messages: List[OpenAIChatMessage]) -> str:
+def get_last_user_message(messages: List[dict]) -> str:
     for message in reversed(messages):
         if message.role == "user":
             return message.content
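With `start.bat` above, the server listens on `0.0.0.0:9099`, and streamed responses come back from `/v1/chat/completions` as a `text/event-stream` (presumably built from `stream_message_template` in `utils.py`). A rough smoke test using `requests`; the pipeline id is a placeholder, and additional request fields may be required depending on `OpenAIChatCompletionForm`.

```python
import requests

# Stream a completion from a locally running Pipelines instance.
with requests.post(
    "http://localhost:9099/v1/chat/completions",
    json={
        "model": "ollama_pipeline",  # placeholder pipeline id
        "messages": [{"role": "user", "content": "ping"}],
        "stream": True,
    },
    stream=True,
) as resp:
    resp.raise_for_status()
    for line in resp.iter_lines():
        if line:
            print(line.decode("utf-8"))  # raw server-sent event lines, e.g. "data: {...}"
```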