From eff0a968c785779bb75296925cdb4b6e5fbfe898 Mon Sep 17 00:00:00 2001 From: "Timothy J. Baek" Date: Sun, 26 May 2024 16:04:44 -0700 Subject: [PATCH] feat: manifold --- main.py | 67 ++++++++++++------- pipelines/examples/applescript_pipeline.py | 2 +- pipelines/examples/azure_openai_pipeline.py | 2 +- pipelines/examples/haystack_pipeline.py | 2 +- pipelines/examples/llama_cpp_pipeline.py | 2 +- .../llamaindex_ollama_github_pipeline.py | 2 +- .../examples/llamaindex_ollama_pipeline.py | 2 +- pipelines/examples/llamaindex_pipeline.py | 2 +- pipelines/examples/manifold_pipeline.py | 43 ++++++++++++ pipelines/examples/mlx_pipeline.py | 2 +- pipelines/examples/ollama_pipeline.py | 2 +- pipelines/examples/openai_pipeline.py | 2 +- pipelines/examples/pipeline_example.py | 2 +- pipelines/examples/python_code_pipeline.py | 2 +- pipelines/manifold_pipeline.py | 45 +++++++++++++ pipelines/ollama_pipeline.py | 2 +- 16 files changed, 144 insertions(+), 37 deletions(-) create mode 100644 pipelines/examples/manifold_pipeline.py create mode 100644 pipelines/manifold_pipeline.py diff --git a/main.py b/main.py index aef7888..47f61b5 100644 --- a/main.py +++ b/main.py @@ -25,28 +25,46 @@ from concurrent.futures import ThreadPoolExecutor PIPELINES = {} -def load_modules_from_directory(directory): - for filename in os.listdir(directory): - if filename.endswith(".py"): - module_name = filename[:-3] # Remove the .py extension - module_path = os.path.join(directory, filename) - spec = importlib.util.spec_from_file_location(module_name, module_path) - module = importlib.util.module_from_spec(spec) - spec.loader.exec_module(module) - yield module +def on_startup(): + def load_modules_from_directory(directory): + for filename in os.listdir(directory): + if filename.endswith(".py"): + module_name = filename[:-3] # Remove the .py extension + module_path = os.path.join(directory, filename) + spec = importlib.util.spec_from_file_location(module_name, module_path) + module = 
importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + yield module + + for loaded_module in load_modules_from_directory("./pipelines"): + # Do something with the loaded module + logging.info("Loaded: %s", loaded_module.__name__) + + pipeline = loaded_module.Pipeline() + + pipeline_id = pipeline.id if hasattr(pipeline, "id") else loaded_module.__name__ + + if hasattr(pipeline, "manifold") and pipeline.manifold: + for p in pipeline.pipelines: + manifold_pipeline_id = f'{pipeline_id}.{p["id"]}' + PIPELINES[manifold_pipeline_id] = { + "module": pipeline, + "id": manifold_pipeline_id, + "name": p["name"], + } + else: + PIPELINES[pipeline_id] = { + "module": pipeline, + "id": pipeline_id, + "name": ( + pipeline.name + if hasattr(pipeline, "name") + else loaded_module.__name__ + ), + } -for loaded_module in load_modules_from_directory("./pipelines"): - # Do something with the loaded module - logging.info("Loaded:", loaded_module.__name__) - - pipeline = loaded_module.Pipeline() - - PIPELINES[loaded_module.__name__] = { - "module": pipeline, - "id": pipeline.id if hasattr(pipeline, "id") else loaded_module.__name__, - "name": pipeline.name if hasattr(pipeline, "name") else loaded_module.__name__, - } +on_startup() from contextlib import asynccontextmanager @@ -60,7 +78,6 @@ async def lifespan(app: FastAPI): yield for pipeline in PIPELINES.values(): - if hasattr(pipeline["module"], "on_shutdown"): await pipeline["module"].on_shutdown() @@ -126,14 +143,15 @@ async def generate_openai_chat_completion(form_data: OpenAIChatCompletionForm): ) def job(): - logging.info(form_data.model) + logging.info(form_data.model) get_response = app.state.PIPELINES[form_data.model]["module"].get_response if form_data.stream: def stream_content(): res = get_response( - user_message, + user_message=user_message, + model_id=form_data.model, messages=messages, body=form_data.model_dump(), ) @@ -186,7 +204,8 @@ async def generate_openai_chat_completion(form_data: 
OpenAIChatCompletionForm): return StreamingResponse(stream_content(), media_type="text/event-stream") else: res = get_response( - user_message, + user_message=user_message, + model_id=form_data.model, messages=messages, body=form_data.model_dump(), ) diff --git a/pipelines/examples/applescript_pipeline.py b/pipelines/examples/applescript_pipeline.py index 3453ddc..c61bd6c 100644 --- a/pipelines/examples/applescript_pipeline.py +++ b/pipelines/examples/applescript_pipeline.py @@ -25,7 +25,7 @@ class Pipeline: pass def get_response( - self, user_message: str, messages: List[dict], body: dict + self, user_message: str, model_id: str, messages: List[dict], body: dict ) -> Union[str, Generator, Iterator]: # This is where you can add your custom pipelines like RAG.' print(f"get_response:{__name__}") diff --git a/pipelines/examples/azure_openai_pipeline.py b/pipelines/examples/azure_openai_pipeline.py index 6b746d0..ed6b298 100644 --- a/pipelines/examples/azure_openai_pipeline.py +++ b/pipelines/examples/azure_openai_pipeline.py @@ -21,7 +21,7 @@ class Pipeline: pass def get_response( - self, user_message: str, messages: List[dict], body: dict + self, user_message: str, model_id: str, messages: List[dict], body: dict ) -> Union[str, Generator, Iterator]: # This is where you can add your custom pipelines like RAG.' print(f"get_response:{__name__}") diff --git a/pipelines/examples/haystack_pipeline.py b/pipelines/examples/haystack_pipeline.py index 650a771..91c62ad 100644 --- a/pipelines/examples/haystack_pipeline.py +++ b/pipelines/examples/haystack_pipeline.py @@ -79,7 +79,7 @@ class Pipeline: pass def get_response( - self, user_message: str, messages: List[dict], body: dict + self, user_message: str, model_id: str, messages: List[dict], body: dict ) -> Union[str, Generator, Iterator]: # This is where you can add your custom RAG pipeline. # Typically, you would retrieve relevant information from your knowledge base and synthesize it to generate a response. 
diff --git a/pipelines/examples/llama_cpp_pipeline.py b/pipelines/examples/llama_cpp_pipeline.py index 47bf630..a35623c 100644 --- a/pipelines/examples/llama_cpp_pipeline.py +++ b/pipelines/examples/llama_cpp_pipeline.py @@ -30,7 +30,7 @@ class Pipeline: pass def get_response( - self, user_message: str, messages: List[dict], body: dict + self, user_message: str, model_id: str, messages: List[dict], body: dict ) -> Union[str, Generator, Iterator]: # This is where you can add your custom pipelines like RAG.' print(f"get_response:{__name__}") diff --git a/pipelines/examples/llamaindex_ollama_github_pipeline.py b/pipelines/examples/llamaindex_ollama_github_pipeline.py index 2f091d6..992bb0f 100644 --- a/pipelines/examples/llamaindex_ollama_github_pipeline.py +++ b/pipelines/examples/llamaindex_ollama_github_pipeline.py @@ -70,7 +70,7 @@ class Pipeline: pass def get_response( - self, user_message: str, messages: List[dict], body: dict + self, user_message: str, model_id: str, messages: List[dict], body: dict ) -> Union[str, Generator, Iterator]: # This is where you can add your custom RAG pipeline. # Typically, you would retrieve relevant information from your knowledge base and synthesize it to generate a response. diff --git a/pipelines/examples/llamaindex_ollama_pipeline.py b/pipelines/examples/llamaindex_ollama_pipeline.py index 19ed721..2ea1638 100644 --- a/pipelines/examples/llamaindex_ollama_pipeline.py +++ b/pipelines/examples/llamaindex_ollama_pipeline.py @@ -30,7 +30,7 @@ class Pipeline: pass def get_response( - self, user_message: str, messages: List[dict], body: dict + self, user_message: str, model_id: str, messages: List[dict], body: dict ) -> Union[str, Generator, Iterator]: # This is where you can add your custom RAG pipeline. # Typically, you would retrieve relevant information from your knowledge base and synthesize it to generate a response. 
diff --git a/pipelines/examples/llamaindex_pipeline.py b/pipelines/examples/llamaindex_pipeline.py index 195e3f8..d6fccf9 100644 --- a/pipelines/examples/llamaindex_pipeline.py +++ b/pipelines/examples/llamaindex_pipeline.py @@ -25,7 +25,7 @@ class Pipeline: pass def get_response( - self, user_message: str, messages: List[dict], body: dict + self, user_message: str, model_id: str, messages: List[dict], body: dict ) -> Union[str, Generator, Iterator]: # This is where you can add your custom RAG pipeline. # Typically, you would retrieve relevant information from your knowledge base and synthesize it to generate a response. diff --git a/pipelines/examples/manifold_pipeline.py b/pipelines/examples/manifold_pipeline.py new file mode 100644 index 0000000..c0eda65 --- /dev/null +++ b/pipelines/examples/manifold_pipeline.py @@ -0,0 +1,43 @@ +from typing import List, Union, Generator, Iterator +from schemas import OpenAIChatMessage + + +class Pipeline: + def __init__(self): + # Optionally, you can set the id and name of the pipeline. + self.id = "manifold_pipeline_example" + self.name = "Manifold Pipeline Example" + # You can also set the pipelines that are available in this pipeline. + self.pipelines = [ + { + "id": "pipeline-1", + "name": "Pipeline 1", + }, + { + "id": "pipeline-2", + "name": "Pipeline 2", + }, + ] + self.manifold = True + + async def on_startup(self): + # This function is called when the server is started. + print(f"on_startup:{__name__}") + pass + + async def on_shutdown(self): + # This function is called when the server is stopped. + print(f"on_shutdown:{__name__}") + pass + + def get_response( + self, user_message: str, model_id: str, messages: List[dict], body: dict + ) -> Union[str, Generator, Iterator]: + # This is where you can add your custom pipelines like RAG.' 
+ print(f"get_response:{__name__}") + + print(messages) + print(user_message) + print(body) + + return f"{__name__} response to: {user_message}" diff --git a/pipelines/examples/mlx_pipeline.py b/pipelines/examples/mlx_pipeline.py index 8487d8e..a79dcd6 100644 --- a/pipelines/examples/mlx_pipeline.py +++ b/pipelines/examples/mlx_pipeline.py @@ -73,7 +73,7 @@ class Pipeline: print(f"Failed to terminate subprocess: {e}") def get_response( - self, user_message: str, messages: List[dict], body: dict + self, user_message: str, model_id: str, messages: List[dict], body: dict ) -> Union[str, Generator, Iterator]: # This is where you can add your custom pipelines like RAG.' print(f"get_response:{__name__}") diff --git a/pipelines/examples/ollama_pipeline.py b/pipelines/examples/ollama_pipeline.py index 876380a..53da4c8 100644 --- a/pipelines/examples/ollama_pipeline.py +++ b/pipelines/examples/ollama_pipeline.py @@ -21,7 +21,7 @@ class Pipeline: pass def get_response( - self, user_message: str, messages: List[dict], body: dict + self, user_message: str, model_id: str, messages: List[dict], body: dict ) -> Union[str, Generator, Iterator]: # This is where you can add your custom pipelines like RAG.' print(f"get_response:{__name__}") diff --git a/pipelines/examples/openai_pipeline.py b/pipelines/examples/openai_pipeline.py index f4273e7..98a42d5 100644 --- a/pipelines/examples/openai_pipeline.py +++ b/pipelines/examples/openai_pipeline.py @@ -21,7 +21,7 @@ class Pipeline: pass def get_response( - self, user_message: str, messages: List[dict], body: dict + self, user_message: str, model_id: str, messages: List[dict], body: dict ) -> Union[str, Generator, Iterator]: # This is where you can add your custom pipelines like RAG.' 
print(f"get_response:{__name__}") diff --git a/pipelines/examples/pipeline_example.py b/pipelines/examples/pipeline_example.py index 1015ba8..0fba805 100644 --- a/pipelines/examples/pipeline_example.py +++ b/pipelines/examples/pipeline_example.py @@ -20,7 +20,7 @@ class Pipeline: pass def get_response( - self, user_message: str, messages: List[dict], body: dict + self, user_message: str, model_id: str, messages: List[dict], body: dict ) -> Union[str, Generator, Iterator]: # This is where you can add your custom pipelines like RAG.' print(f"get_response:{__name__}") diff --git a/pipelines/examples/python_code_pipeline.py b/pipelines/examples/python_code_pipeline.py index 8d3aa20..7e35655 100644 --- a/pipelines/examples/python_code_pipeline.py +++ b/pipelines/examples/python_code_pipeline.py @@ -31,7 +31,7 @@ class Pipeline: return e.output.strip(), e.returncode def get_response( - self, user_message: str, messages: List[dict], body: dict + self, user_message: str, model_id: str, messages: List[dict], body: dict ) -> Union[str, Generator, Iterator]: # This is where you can add your custom pipelines like RAG.' print(f"get_response:{__name__}") diff --git a/pipelines/manifold_pipeline.py b/pipelines/manifold_pipeline.py new file mode 100644 index 0000000..b5d141c --- /dev/null +++ b/pipelines/manifold_pipeline.py @@ -0,0 +1,45 @@ +from typing import List, Union, Generator, Iterator +from schemas import OpenAIChatMessage + + +class Pipeline: + def __init__(self): + self.id = "manifold_pipeline" + self.name = "Manifold Pipeline" + # You can also set the pipelines that are available in this pipeline. + # Set manifold to True if you want to use this pipeline as a manifold. + # Manifold pipelines can have multiple pipelines. 
+ self.manifold = True + self.pipelines = [ + { + "id": "pipeline-1", # This will turn into `manifold_pipeline.pipeline-1` + "name": "Manifold: Pipeline 1", + }, + { + "id": "pipeline-2", + "name": "Manifold: Pipeline 2", + }, + ] + pass + + async def on_startup(self): + # This function is called when the server is started. + print(f"on_startup:{__name__}") + pass + + async def on_shutdown(self): + # This function is called when the server is stopped. + print(f"on_shutdown:{__name__}") + pass + + def get_response( + self, user_message: str, model_id: str, messages: List[dict], body: dict + ) -> Union[str, Generator, Iterator]: + # This is where you can add your custom pipelines like RAG.' + print(f"get_response:{__name__}") + + print(messages) + print(user_message) + print(body) + + return f"{model_id} response to: {user_message}" diff --git a/pipelines/ollama_pipeline.py b/pipelines/ollama_pipeline.py index 876380a..53da4c8 100644 --- a/pipelines/ollama_pipeline.py +++ b/pipelines/ollama_pipeline.py @@ -21,7 +21,7 @@ class Pipeline: pass def get_response( - self, user_message: str, messages: List[dict], body: dict + self, user_message: str, model_id: str, messages: List[dict], body: dict ) -> Union[str, Generator, Iterator]: # This is where you can add your custom pipelines like RAG.' print(f"get_response:{__name__}")