mirror of
https://github.com/open-webui/pipelines
synced 2025-05-15 09:55:45 +00:00
feat: manifold
This commit is contained in:
parent
da2aa97297
commit
eff0a968c7
67
main.py
67
main.py
@ -25,28 +25,46 @@ from concurrent.futures import ThreadPoolExecutor
|
|||||||
PIPELINES = {}
|
PIPELINES = {}
|
||||||
|
|
||||||
|
|
||||||
def load_modules_from_directory(directory):
|
def on_startup():
|
||||||
for filename in os.listdir(directory):
|
def load_modules_from_directory(directory):
|
||||||
if filename.endswith(".py"):
|
for filename in os.listdir(directory):
|
||||||
module_name = filename[:-3] # Remove the .py extension
|
if filename.endswith(".py"):
|
||||||
module_path = os.path.join(directory, filename)
|
module_name = filename[:-3] # Remove the .py extension
|
||||||
spec = importlib.util.spec_from_file_location(module_name, module_path)
|
module_path = os.path.join(directory, filename)
|
||||||
module = importlib.util.module_from_spec(spec)
|
spec = importlib.util.spec_from_file_location(module_name, module_path)
|
||||||
spec.loader.exec_module(module)
|
module = importlib.util.module_from_spec(spec)
|
||||||
yield module
|
spec.loader.exec_module(module)
|
||||||
|
yield module
|
||||||
|
|
||||||
|
for loaded_module in load_modules_from_directory("./pipelines"):
|
||||||
|
# Do something with the loaded module
|
||||||
|
logging.info("Loaded:", loaded_module.__name__)
|
||||||
|
|
||||||
|
pipeline = loaded_module.Pipeline()
|
||||||
|
|
||||||
|
pipeline_id = pipeline.id if hasattr(pipeline, "id") else loaded_module.__name__
|
||||||
|
|
||||||
|
if hasattr(pipeline, "manifold") and pipeline.manifold:
|
||||||
|
for p in pipeline.pipelines:
|
||||||
|
manifold_pipeline_id = f'{pipeline_id}.{p["id"]}'
|
||||||
|
PIPELINES[manifold_pipeline_id] = {
|
||||||
|
"module": pipeline,
|
||||||
|
"id": manifold_pipeline_id,
|
||||||
|
"name": p["name"],
|
||||||
|
}
|
||||||
|
else:
|
||||||
|
PIPELINES[loaded_module.__name__] = {
|
||||||
|
"module": pipeline,
|
||||||
|
"id": pipeline_id,
|
||||||
|
"name": (
|
||||||
|
pipeline.name
|
||||||
|
if hasattr(pipeline, "name")
|
||||||
|
else loaded_module.__name__
|
||||||
|
),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
for loaded_module in load_modules_from_directory("./pipelines"):
|
on_startup()
|
||||||
# Do something with the loaded module
|
|
||||||
logging.info("Loaded:", loaded_module.__name__)
|
|
||||||
|
|
||||||
pipeline = loaded_module.Pipeline()
|
|
||||||
|
|
||||||
PIPELINES[loaded_module.__name__] = {
|
|
||||||
"module": pipeline,
|
|
||||||
"id": pipeline.id if hasattr(pipeline, "id") else loaded_module.__name__,
|
|
||||||
"name": pipeline.name if hasattr(pipeline, "name") else loaded_module.__name__,
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
@ -60,7 +78,6 @@ async def lifespan(app: FastAPI):
|
|||||||
yield
|
yield
|
||||||
|
|
||||||
for pipeline in PIPELINES.values():
|
for pipeline in PIPELINES.values():
|
||||||
|
|
||||||
if hasattr(pipeline["module"], "on_shutdown"):
|
if hasattr(pipeline["module"], "on_shutdown"):
|
||||||
await pipeline["module"].on_shutdown()
|
await pipeline["module"].on_shutdown()
|
||||||
|
|
||||||
@ -126,14 +143,15 @@ async def generate_openai_chat_completion(form_data: OpenAIChatCompletionForm):
|
|||||||
)
|
)
|
||||||
|
|
||||||
def job():
|
def job():
|
||||||
logging.info(form_data.model)
|
print(form_data.model)
|
||||||
get_response = app.state.PIPELINES[form_data.model]["module"].get_response
|
get_response = app.state.PIPELINES[form_data.model]["module"].get_response
|
||||||
|
|
||||||
if form_data.stream:
|
if form_data.stream:
|
||||||
|
|
||||||
def stream_content():
|
def stream_content():
|
||||||
res = get_response(
|
res = get_response(
|
||||||
user_message,
|
user_message=user_message,
|
||||||
|
model_id=form_data.model,
|
||||||
messages=messages,
|
messages=messages,
|
||||||
body=form_data.model_dump(),
|
body=form_data.model_dump(),
|
||||||
)
|
)
|
||||||
@ -186,7 +204,8 @@ async def generate_openai_chat_completion(form_data: OpenAIChatCompletionForm):
|
|||||||
return StreamingResponse(stream_content(), media_type="text/event-stream")
|
return StreamingResponse(stream_content(), media_type="text/event-stream")
|
||||||
else:
|
else:
|
||||||
res = get_response(
|
res = get_response(
|
||||||
user_message,
|
user_message=user_message,
|
||||||
|
model_id=form_data.model,
|
||||||
messages=messages,
|
messages=messages,
|
||||||
body=form_data.model_dump(),
|
body=form_data.model_dump(),
|
||||||
)
|
)
|
||||||
|
@ -25,7 +25,7 @@ class Pipeline:
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
def get_response(
|
def get_response(
|
||||||
self, user_message: str, messages: List[dict], body: dict
|
self, user_message: str, model_id: str, messages: List[dict], body: dict
|
||||||
) -> Union[str, Generator, Iterator]:
|
) -> Union[str, Generator, Iterator]:
|
||||||
# This is where you can add your custom pipelines like RAG.'
|
# This is where you can add your custom pipelines like RAG.'
|
||||||
print(f"get_response:{__name__}")
|
print(f"get_response:{__name__}")
|
||||||
|
@ -21,7 +21,7 @@ class Pipeline:
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
def get_response(
|
def get_response(
|
||||||
self, user_message: str, messages: List[dict], body: dict
|
self, user_message: str, model_id: str, messages: List[dict], body: dict
|
||||||
) -> Union[str, Generator, Iterator]:
|
) -> Union[str, Generator, Iterator]:
|
||||||
# This is where you can add your custom pipelines like RAG.'
|
# This is where you can add your custom pipelines like RAG.'
|
||||||
print(f"get_response:{__name__}")
|
print(f"get_response:{__name__}")
|
||||||
|
@ -79,7 +79,7 @@ class Pipeline:
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
def get_response(
|
def get_response(
|
||||||
self, user_message: str, messages: List[dict], body: dict
|
self, user_message: str, model_id: str, messages: List[dict], body: dict
|
||||||
) -> Union[str, Generator, Iterator]:
|
) -> Union[str, Generator, Iterator]:
|
||||||
# This is where you can add your custom RAG pipeline.
|
# This is where you can add your custom RAG pipeline.
|
||||||
# Typically, you would retrieve relevant information from your knowledge base and synthesize it to generate a response.
|
# Typically, you would retrieve relevant information from your knowledge base and synthesize it to generate a response.
|
||||||
|
@ -30,7 +30,7 @@ class Pipeline:
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
def get_response(
|
def get_response(
|
||||||
self, user_message: str, messages: List[dict], body: dict
|
self, user_message: str, model_id: str, messages: List[dict], body: dict
|
||||||
) -> Union[str, Generator, Iterator]:
|
) -> Union[str, Generator, Iterator]:
|
||||||
# This is where you can add your custom pipelines like RAG.'
|
# This is where you can add your custom pipelines like RAG.'
|
||||||
print(f"get_response:{__name__}")
|
print(f"get_response:{__name__}")
|
||||||
|
@ -70,7 +70,7 @@ class Pipeline:
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
def get_response(
|
def get_response(
|
||||||
self, user_message: str, messages: List[dict], body: dict
|
self, user_message: str, model_id: str, messages: List[dict], body: dict
|
||||||
) -> Union[str, Generator, Iterator]:
|
) -> Union[str, Generator, Iterator]:
|
||||||
# This is where you can add your custom RAG pipeline.
|
# This is where you can add your custom RAG pipeline.
|
||||||
# Typically, you would retrieve relevant information from your knowledge base and synthesize it to generate a response.
|
# Typically, you would retrieve relevant information from your knowledge base and synthesize it to generate a response.
|
||||||
|
@ -30,7 +30,7 @@ class Pipeline:
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
def get_response(
|
def get_response(
|
||||||
self, user_message: str, messages: List[dict], body: dict
|
self, user_message: str, model_id: str, messages: List[dict], body: dict
|
||||||
) -> Union[str, Generator, Iterator]:
|
) -> Union[str, Generator, Iterator]:
|
||||||
# This is where you can add your custom RAG pipeline.
|
# This is where you can add your custom RAG pipeline.
|
||||||
# Typically, you would retrieve relevant information from your knowledge base and synthesize it to generate a response.
|
# Typically, you would retrieve relevant information from your knowledge base and synthesize it to generate a response.
|
||||||
|
@ -25,7 +25,7 @@ class Pipeline:
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
def get_response(
|
def get_response(
|
||||||
self, user_message: str, messages: List[dict], body: dict
|
self, user_message: str, model_id: str, messages: List[dict], body: dict
|
||||||
) -> Union[str, Generator, Iterator]:
|
) -> Union[str, Generator, Iterator]:
|
||||||
# This is where you can add your custom RAG pipeline.
|
# This is where you can add your custom RAG pipeline.
|
||||||
# Typically, you would retrieve relevant information from your knowledge base and synthesize it to generate a response.
|
# Typically, you would retrieve relevant information from your knowledge base and synthesize it to generate a response.
|
||||||
|
43
pipelines/examples/manifold_pipeline.py
Normal file
43
pipelines/examples/manifold_pipeline.py
Normal file
@ -0,0 +1,43 @@
|
|||||||
|
from typing import List, Union, Generator, Iterator
|
||||||
|
from schemas import OpenAIChatMessage
|
||||||
|
|
||||||
|
|
||||||
|
class Pipeline:
|
||||||
|
def __init__(self):
|
||||||
|
# Optionally, you can set the id and name of the pipeline.
|
||||||
|
self.id = "pipeline_example"
|
||||||
|
self.name = "Pipeline Example"
|
||||||
|
# You can also set the pipelines that are available in this pipeline.
|
||||||
|
self.pipelines = [
|
||||||
|
{
|
||||||
|
"id": "pipeline-1",
|
||||||
|
"name": "Pipeline 1",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"id": "pipeline-2",
|
||||||
|
"name": "Pipeline 2",
|
||||||
|
},
|
||||||
|
]
|
||||||
|
pass
|
||||||
|
|
||||||
|
async def on_startup(self):
|
||||||
|
# This function is called when the server is started.
|
||||||
|
print(f"on_startup:{__name__}")
|
||||||
|
pass
|
||||||
|
|
||||||
|
async def on_shutdown(self):
|
||||||
|
# This function is called when the server is stopped.
|
||||||
|
print(f"on_shutdown:{__name__}")
|
||||||
|
pass
|
||||||
|
|
||||||
|
def get_response(
|
||||||
|
self, user_message: str, model_id: str, messages: List[dict], body: dict
|
||||||
|
) -> Union[str, Generator, Iterator]:
|
||||||
|
# This is where you can add your custom pipelines like RAG.'
|
||||||
|
print(f"get_response:{__name__}")
|
||||||
|
|
||||||
|
print(messages)
|
||||||
|
print(user_message)
|
||||||
|
print(body)
|
||||||
|
|
||||||
|
return f"{__name__} response to: {user_message}"
|
@ -73,7 +73,7 @@ class Pipeline:
|
|||||||
print(f"Failed to terminate subprocess: {e}")
|
print(f"Failed to terminate subprocess: {e}")
|
||||||
|
|
||||||
def get_response(
|
def get_response(
|
||||||
self, user_message: str, messages: List[dict], body: dict
|
self, user_message: str, model_id: str, messages: List[dict], body: dict
|
||||||
) -> Union[str, Generator, Iterator]:
|
) -> Union[str, Generator, Iterator]:
|
||||||
# This is where you can add your custom pipelines like RAG.'
|
# This is where you can add your custom pipelines like RAG.'
|
||||||
print(f"get_response:{__name__}")
|
print(f"get_response:{__name__}")
|
||||||
|
@ -21,7 +21,7 @@ class Pipeline:
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
def get_response(
|
def get_response(
|
||||||
self, user_message: str, messages: List[dict], body: dict
|
self, user_message: str, model_id: str, messages: List[dict], body: dict
|
||||||
) -> Union[str, Generator, Iterator]:
|
) -> Union[str, Generator, Iterator]:
|
||||||
# This is where you can add your custom pipelines like RAG.'
|
# This is where you can add your custom pipelines like RAG.'
|
||||||
print(f"get_response:{__name__}")
|
print(f"get_response:{__name__}")
|
||||||
|
@ -21,7 +21,7 @@ class Pipeline:
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
def get_response(
|
def get_response(
|
||||||
self, user_message: str, messages: List[dict], body: dict
|
self, user_message: str, model_id: str, messages: List[dict], body: dict
|
||||||
) -> Union[str, Generator, Iterator]:
|
) -> Union[str, Generator, Iterator]:
|
||||||
# This is where you can add your custom pipelines like RAG.'
|
# This is where you can add your custom pipelines like RAG.'
|
||||||
print(f"get_response:{__name__}")
|
print(f"get_response:{__name__}")
|
||||||
|
@ -20,7 +20,7 @@ class Pipeline:
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
def get_response(
|
def get_response(
|
||||||
self, user_message: str, messages: List[dict], body: dict
|
self, user_message: str, model_id: str, messages: List[dict], body: dict
|
||||||
) -> Union[str, Generator, Iterator]:
|
) -> Union[str, Generator, Iterator]:
|
||||||
# This is where you can add your custom pipelines like RAG.'
|
# This is where you can add your custom pipelines like RAG.'
|
||||||
print(f"get_response:{__name__}")
|
print(f"get_response:{__name__}")
|
||||||
|
@ -31,7 +31,7 @@ class Pipeline:
|
|||||||
return e.output.strip(), e.returncode
|
return e.output.strip(), e.returncode
|
||||||
|
|
||||||
def get_response(
|
def get_response(
|
||||||
self, user_message: str, messages: List[dict], body: dict
|
self, user_message: str, model_id: str, messages: List[dict], body: dict
|
||||||
) -> Union[str, Generator, Iterator]:
|
) -> Union[str, Generator, Iterator]:
|
||||||
# This is where you can add your custom pipelines like RAG.'
|
# This is where you can add your custom pipelines like RAG.'
|
||||||
print(f"get_response:{__name__}")
|
print(f"get_response:{__name__}")
|
||||||
|
45
pipelines/manifold_pipeline.py
Normal file
45
pipelines/manifold_pipeline.py
Normal file
@ -0,0 +1,45 @@
|
|||||||
|
from typing import List, Union, Generator, Iterator
|
||||||
|
from schemas import OpenAIChatMessage
|
||||||
|
|
||||||
|
|
||||||
|
class Pipeline:
|
||||||
|
def __init__(self):
|
||||||
|
self.id = "manifold_pipeline"
|
||||||
|
self.name = "Manifold Pipeline"
|
||||||
|
# You can also set the pipelines that are available in this pipeline.
|
||||||
|
# Set manifold to True if you want to use this pipeline as a manifold.
|
||||||
|
# Manifold pipelines can have multiple pipelines.
|
||||||
|
self.manifold = True
|
||||||
|
self.pipelines = [
|
||||||
|
{
|
||||||
|
"id": "pipeline-1", # This will turn into `manifold_pipeline.pipeline-1`
|
||||||
|
"name": "Manifold: Pipeline 1",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"id": "pipeline-2",
|
||||||
|
"name": "Manifold: Pipeline 2",
|
||||||
|
},
|
||||||
|
]
|
||||||
|
pass
|
||||||
|
|
||||||
|
async def on_startup(self):
|
||||||
|
# This function is called when the server is started.
|
||||||
|
print(f"on_startup:{__name__}")
|
||||||
|
pass
|
||||||
|
|
||||||
|
async def on_shutdown(self):
|
||||||
|
# This function is called when the server is stopped.
|
||||||
|
print(f"on_shutdown:{__name__}")
|
||||||
|
pass
|
||||||
|
|
||||||
|
def get_response(
|
||||||
|
self, user_message: str, model_id: str, messages: List[dict], body: dict
|
||||||
|
) -> Union[str, Generator, Iterator]:
|
||||||
|
# This is where you can add your custom pipelines like RAG.'
|
||||||
|
print(f"get_response:{__name__}")
|
||||||
|
|
||||||
|
print(messages)
|
||||||
|
print(user_message)
|
||||||
|
print(body)
|
||||||
|
|
||||||
|
return f"{model_id} response to: {user_message}"
|
@ -21,7 +21,7 @@ class Pipeline:
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
def get_response(
|
def get_response(
|
||||||
self, user_message: str, messages: List[dict], body: dict
|
self, user_message: str, model_id: str, messages: List[dict], body: dict
|
||||||
) -> Union[str, Generator, Iterator]:
|
) -> Union[str, Generator, Iterator]:
|
||||||
# This is where you can add your custom pipelines like RAG.'
|
# This is where you can add your custom pipelines like RAG.'
|
||||||
print(f"get_response:{__name__}")
|
print(f"get_response:{__name__}")
|
||||||
|
Loading…
Reference in New Issue
Block a user