open-webui/backend/open_webui/routers/pipelines.py

493 lines
13 KiB
Python
Raw Permalink Normal View History

2024-12-12 02:16:07 +00:00
from fastapi import (
Depends,
FastAPI,
File,
Form,
HTTPException,
Request,
UploadFile,
status,
APIRouter,
)
import os
import logging
import shutil
import requests
2024-12-10 08:00:01 +00:00
from pydantic import BaseModel
from starlette.responses import FileResponse
2024-12-12 02:16:07 +00:00
from typing import Optional
2024-12-10 08:00:01 +00:00
2024-12-12 02:16:07 +00:00
from open_webui.env import SRC_LOG_LEVELS
from open_webui.config import CACHE_DIR
2024-12-10 08:00:01 +00:00
from open_webui.constants import ERROR_MESSAGES
2024-12-12 02:16:07 +00:00
from open_webui.routers.openai import get_all_models_responses
2024-12-10 08:00:01 +00:00
from open_webui.utils.auth import get_admin_user
2024-12-12 02:16:07 +00:00
# Module-level logger; its level is read from the "MAIN" entry of SRC_LOG_LEVELS.
log = logging.getLogger(__name__)
log.setLevel(SRC_LOG_LEVELS["MAIN"])
2024-12-10 08:00:01 +00:00
2024-12-12 03:52:46 +00:00
##################################
#
# Pipeline Middleware
#
##################################
def get_sorted_filters(model_id, models):
    """Collect the filter pipelines that apply to ``model_id``, ordered by priority.

    An entry of ``models`` is selected when it has a ``"pipeline"`` mapping
    whose ``"type"`` is ``"filter"`` and whose ``"pipelines"`` target list is
    either the wildcard ``["*"]`` or contains ``model_id``.

    Args:
        model_id: Identifier of the model the filters must target.
        models: Mapping of model id to model dict.

    Returns:
        A new list of the matching model dicts, sorted ascending by
        ``model["pipeline"]["priority"]``.

    Raises:
        KeyError: If a filter entry has no ``"pipelines"`` key, or a selected
            entry has no ``"priority"`` key.
    """
    matching = []
    for candidate in models.values():
        if "pipeline" not in candidate:
            continue
        spec = candidate["pipeline"]
        if "type" not in spec or spec["type"] != "filter":
            continue
        targets = spec["pipelines"]
        if targets == ["*"] or any(model_id == target for target in targets):
            matching.append(candidate)

    # list.sort is stable, so entries with equal priority keep their order in `models`.
    matching.sort(key=lambda entry: entry["pipeline"]["priority"])
    return matching
def process_pipeline_inlet_filter(request, payload, user, models):
    """Pass a request payload through every inlet filter that targets its model.

    The filters returned by ``get_sorted_filters`` are applied in order; when
    the target model itself has a ``"pipeline"`` entry it is appended and so
    runs last. Each filter is called with a POST to
    ``{url}/{filter id}/filter/inlet`` and its JSON response replaces the
    payload passed to the next filter.

    Args:
        request: Incoming request; ``request.app.state.config`` supplies
            ``OPENAI_API_BASE_URLS`` and ``OPENAI_API_KEYS``.
        payload: Request body dict; must contain ``"model"``.
        user: Object exposing ``id``, ``email``, ``name`` and ``role``.
        models: Mapping of model id to model dict.

    Returns:
        The payload after all applicable filters have processed it.

    Raises:
        KeyError: If ``payload["model"]`` is not a key of ``models``.
        Exception: ``Exception(status_code, detail)`` when a filter responds
            and its JSON body carries a ``"detail"`` field.
    """
    user = {"id": user.id, "email": user.email, "name": user.name, "role": user.role}
    model_id = payload["model"]

    sorted_filters = get_sorted_filters(model_id, models)
    model = models[model_id]
    if "pipeline" in model:
        sorted_filters.append(model)

    for pipeline_filter in sorted_filters:
        r = None
        try:
            urlIdx = pipeline_filter["urlIdx"]
            url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx]
            key = request.app.state.config.OPENAI_API_KEYS[urlIdx]

            # A connection without an API key is skipped rather than called.
            if key == "":
                continue

            r = requests.post(
                f"{url}/{pipeline_filter['id']}/filter/inlet",
                headers={"Authorization": f"Bearer {key}"},
                json={
                    "user": user,
                    "body": payload,
                },
            )
            r.raise_for_status()
            payload = r.json()
        except Exception as e:
            # Best effort: a failing filter is logged and skipped unless the
            # server supplied an explicit "detail" to report to the caller.
            log.exception(f"Connection error: {e}")

            if r is not None:
                try:
                    res = r.json()
                except ValueError:
                    # Non-JSON error body: there is no detail to surface.
                    res = None
                if isinstance(res, dict) and "detail" in res:
                    raise Exception(r.status_code, res["detail"])

    return payload
def process_pipeline_outlet_filter(request, payload, user, models):
    """Pass a response payload through every outlet filter that targets its model.

    When the target model itself has a ``"pipeline"`` entry it runs first,
    followed by the filters returned by ``get_sorted_filters``. Each one is
    called with a POST to ``{url}/{filter id}/filter/outlet`` and its JSON
    response replaces the payload passed to the next one.

    Args:
        request: Incoming request; ``request.app.state.config`` supplies
            ``OPENAI_API_BASE_URLS`` and ``OPENAI_API_KEYS``.
        payload: Body dict; must contain ``"model"``.
        user: Object exposing ``id``, ``email``, ``name`` and ``role``.
        models: Mapping of model id to model dict.

    Returns:
        The payload after all applicable filters have processed it.

    Raises:
        KeyError: If ``payload["model"]`` is not a key of ``models``.
        Exception: ``Exception(status_code, body)`` when a filter responds
            and its JSON body carries a ``"detail"`` field. This error is
            raised, not returned, so a caller never receives an exception
            object in place of the payload.
    """
    user = {"id": user.id, "email": user.email, "name": user.name, "role": user.role}
    model_id = payload["model"]

    sorted_filters = get_sorted_filters(model_id, models)
    model = models[model_id]
    if "pipeline" in model:
        sorted_filters = [model] + sorted_filters

    for pipeline_filter in sorted_filters:
        r = None
        try:
            urlIdx = pipeline_filter["urlIdx"]
            url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx]
            key = request.app.state.config.OPENAI_API_KEYS[urlIdx]

            # A connection without an API key is skipped rather than called.
            if key == "":
                continue

            r = requests.post(
                f"{url}/{pipeline_filter['id']}/filter/outlet",
                headers={"Authorization": f"Bearer {key}"},
                json={
                    "user": user,
                    "body": payload,
                },
            )
            r.raise_for_status()
            payload = r.json()
        except Exception as e:
            # Best effort: a failing filter is logged and skipped unless the
            # server supplied an explicit "detail" to report to the caller.
            log.exception(f"Connection error: {e}")

            if r is not None:
                try:
                    res = r.json()
                except ValueError:
                    # Non-JSON error body: there is no detail to surface.
                    res = None
                # Raised outside the inner try so it cannot be swallowed.
                if isinstance(res, dict) and "detail" in res:
                    raise Exception(r.status_code, res)

    return payload
2024-12-10 08:54:13 +00:00
##################################
#
# Pipelines Endpoints
#
##################################
2024-12-12 02:16:07 +00:00
# Router on which the pipeline endpoints of this module are registered.
router = APIRouter()
2024-12-10 08:54:13 +00:00
2024-12-12 03:52:46 +00:00
@router.get("/list")
async def get_pipelines_list(request: Request, user=Depends(get_admin_user)):
    """List the configured connections whose models response has a "pipelines" key.

    Access is gated by the ``get_admin_user`` dependency.

    Returns:
        ``{"data": [{"url": ..., "idx": ...}, ...]}`` with one entry per
        matching response, where ``idx`` is the response's position and
        ``url`` is ``OPENAI_API_BASE_URLS[idx]``.
    """
    responses = await get_all_models_responses(request)
    log.debug(f"get_pipelines_list: get_openai_models_responses returned {responses}")

    # First pass: positions of the responses that advertise pipelines.
    pipeline_indexes = []
    for position, response in enumerate(responses):
        if response is None or "pipelines" not in response:
            continue
        pipeline_indexes.append(position)

    # Second pass: pair each position with its configured base URL.
    entries = []
    for position in pipeline_indexes:
        entries.append(
            {
                "url": request.app.state.config.OPENAI_API_BASE_URLS[position],
                "idx": position,
            }
        )

    return {"data": entries}
2024-12-12 03:52:46 +00:00
@router.post("/upload")
async def upload_pipeline(
    request: Request,
    urlIdx: int = Form(...),
    file: UploadFile = File(...),
    user=Depends(get_admin_user),
):
    """Forward an uploaded Python pipeline file to a pipelines server.

    The upload is staged under ``{CACHE_DIR}/pipelines``, posted to
    ``{url}/pipelines/upload`` of the connection selected by ``urlIdx``, and
    the staged copy is removed afterwards whether or not the upload worked.
    Access is gated by the ``get_admin_user`` dependency.

    Args:
        request: Incoming request; ``request.app.state.config`` supplies
            ``OPENAI_API_BASE_URLS`` and ``OPENAI_API_KEYS``.
        urlIdx: Index of the target connection in those lists.
        file: Uploaded file; its name must end in ``.py``.
        user: Resolved by ``get_admin_user``.

    Returns:
        The JSON object returned by the pipelines server.

    Raises:
        HTTPException: 400 when the file name is missing or does not end in
            ``.py``. For any failure while staging or forwarding, the
            upstream status code (404 when no response was obtained) with the
            upstream ``detail`` or ``"Pipeline not found"``.
    """
    # The client controls file.filename. Keep only its final path component
    # so a name such as "../../x.py" cannot write or delete outside the
    # upload folder.
    filename = os.path.basename(file.filename) if file.filename else ""
    log.info(f"upload_pipeline {urlIdx} {filename}")

    # Check if the uploaded file is a python file
    if not filename.endswith(".py"):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Only Python (.py) files are allowed.",
        )

    upload_folder = f"{CACHE_DIR}/pipelines"
    os.makedirs(upload_folder, exist_ok=True)
    file_path = os.path.join(upload_folder, filename)

    r = None
    try:
        # Save the uploaded file
        with open(file_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx]
        key = request.app.state.config.OPENAI_API_KEYS[urlIdx]

        with open(file_path, "rb") as f:
            files = {"file": f}
            r = requests.post(
                f"{url}/pipelines/upload",
                headers={"Authorization": f"Bearer {key}"},
                files=files,
            )

        r.raise_for_status()
        data = r.json()

        return {**data}
    except Exception as e:
        # Any failure (I/O, bad urlIdx, connection or HTTP error) ends up here.
        log.exception(f"Connection error: {e}")

        detail = None
        status_code = status.HTTP_404_NOT_FOUND
        if r is not None:
            status_code = r.status_code
            try:
                res = r.json()
                if "detail" in res:
                    detail = res["detail"]
            except Exception:
                # The error body is not usable JSON; fall back to the default detail.
                pass

        raise HTTPException(
            status_code=status_code,
            detail=detail if detail else "Pipeline not found",
        )
    finally:
        # Ensure the file is deleted after the upload is completed or on failure
        if os.path.exists(file_path):
            os.remove(file_path)
2024-12-10 08:00:01 +00:00
2024-12-10 08:54:13 +00:00
# Request body accepted by the "/add" endpoint.
class AddPipelineForm(BaseModel):
    # Sent as the "url" field of the request to "{base url}/pipelines/add".
    url: str
    # Index into OPENAI_API_BASE_URLS / OPENAI_API_KEYS selecting the target connection.
    urlIdx: int
2024-12-10 08:00:01 +00:00
2024-12-10 08:54:13 +00:00
2024-12-12 03:52:46 +00:00
@router.post("/add")
async def add_pipeline(
    request: Request, form_data: AddPipelineForm, user=Depends(get_admin_user)
):
    """Ask a pipelines server to add a pipeline from a URL.

    Posts ``{"url": form_data.url}`` to ``{base url}/pipelines/add`` on the
    connection selected by ``form_data.urlIdx``. Access is gated by the
    ``get_admin_user`` dependency.

    Returns:
        The JSON object returned by the pipelines server.

    Raises:
        HTTPException: On any failure, carrying the upstream status code
            (404 when no response was obtained) and the upstream ``detail``
            or ``"Pipeline not found"``.
    """
    response = None
    try:
        idx = form_data.urlIdx
        base_url = request.app.state.config.OPENAI_API_BASE_URLS[idx]
        api_key = request.app.state.config.OPENAI_API_KEYS[idx]

        response = requests.post(
            f"{base_url}/pipelines/add",
            headers={"Authorization": f"Bearer {api_key}"},
            json={"url": form_data.url},
        )
        response.raise_for_status()

        return {**response.json()}
    except Exception as exc:
        # Every failure in the block above is reported through this path.
        print(f"Connection error: {exc}")

        error_status = status.HTTP_404_NOT_FOUND
        error_detail = None
        if response is not None:
            error_status = response.status_code
            try:
                body = response.json()
                if "detail" in body:
                    error_detail = body["detail"]
            except Exception:
                # Unusable error body; the default detail is used instead.
                pass

        raise HTTPException(
            status_code=error_status,
            detail=error_detail or "Pipeline not found",
        )
2024-12-10 08:00:01 +00:00
2024-12-10 08:54:13 +00:00
# Request body accepted by the "/delete" endpoint.
class DeletePipelineForm(BaseModel):
    # Sent as the "id" field of the request to "{base url}/pipelines/delete".
    id: str
    # Index into OPENAI_API_BASE_URLS / OPENAI_API_KEYS selecting the target connection.
    urlIdx: int
2024-12-10 08:00:01 +00:00
2024-12-12 03:52:46 +00:00
@router.delete("/delete")
async def delete_pipeline(
    request: Request, form_data: DeletePipelineForm, user=Depends(get_admin_user)
):
    """Ask a pipelines server to delete a pipeline by id.

    Sends a DELETE with ``{"id": form_data.id}`` to
    ``{base url}/pipelines/delete`` on the connection selected by
    ``form_data.urlIdx``. Access is gated by the ``get_admin_user``
    dependency.

    Returns:
        The JSON object returned by the pipelines server.

    Raises:
        HTTPException: On any failure, carrying the upstream status code
            (404 when no response was obtained) and the upstream ``detail``
            or ``"Pipeline not found"``.
    """
    response = None
    try:
        idx = form_data.urlIdx
        base_url = request.app.state.config.OPENAI_API_BASE_URLS[idx]
        api_key = request.app.state.config.OPENAI_API_KEYS[idx]

        response = requests.delete(
            f"{base_url}/pipelines/delete",
            headers={"Authorization": f"Bearer {api_key}"},
            json={"id": form_data.id},
        )
        response.raise_for_status()

        return {**response.json()}
    except Exception as exc:
        # Every failure in the block above is reported through this path.
        print(f"Connection error: {exc}")

        error_status = status.HTTP_404_NOT_FOUND
        error_detail = None
        if response is not None:
            error_status = response.status_code
            try:
                body = response.json()
                if "detail" in body:
                    error_detail = body["detail"]
            except Exception:
                # Unusable error body; the default detail is used instead.
                pass

        raise HTTPException(
            status_code=error_status,
            detail=error_detail or "Pipeline not found",
        )
2024-12-12 03:52:46 +00:00
@router.get("/")
async def get_pipelines(
    request: Request, urlIdx: Optional[int] = None, user=Depends(get_admin_user)
):
    """Fetch the pipelines known to one pipelines server.

    Issues a GET to ``{base url}/pipelines`` on the connection selected by
    ``urlIdx``. Access is gated by the ``get_admin_user`` dependency.

    Returns:
        The JSON object returned by the pipelines server.

    Raises:
        HTTPException: On any failure, carrying the upstream status code
            (404 when no response was obtained) and the upstream ``detail``
            or ``"Pipeline not found"``.
    """
    response = None
    try:
        base_url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx]
        api_key = request.app.state.config.OPENAI_API_KEYS[urlIdx]

        auth_headers = {"Authorization": f"Bearer {api_key}"}
        response = requests.get(f"{base_url}/pipelines", headers=auth_headers)
        response.raise_for_status()

        return {**response.json()}
    except Exception as exc:
        # Every failure in the block above is reported through this path.
        print(f"Connection error: {exc}")

        error_status = status.HTTP_404_NOT_FOUND
        error_detail = None
        if response is not None:
            error_status = response.status_code
            try:
                body = response.json()
                if "detail" in body:
                    error_detail = body["detail"]
            except Exception:
                # Unusable error body; the default detail is used instead.
                pass

        raise HTTPException(
            status_code=error_status,
            detail=error_detail or "Pipeline not found",
        )
2024-12-10 08:00:01 +00:00
2024-12-10 08:54:13 +00:00
2024-12-12 03:52:46 +00:00
@router.get("/{pipeline_id}/valves")
async def get_pipeline_valves(
    request: Request,
    urlIdx: Optional[int],
    pipeline_id: str,
    user=Depends(get_admin_user),
):
    """Fetch the valves of one pipeline from a pipelines server.

    Issues a GET to ``{base url}/{pipeline_id}/valves`` on the connection
    selected by ``urlIdx``. Access is gated by the ``get_admin_user``
    dependency.

    Returns:
        The JSON object returned by the pipelines server.

    Raises:
        HTTPException: On any failure, carrying the upstream status code
            (404 when no response was obtained) and the upstream ``detail``
            or ``"Pipeline not found"``.
    """
    response = None
    try:
        base_url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx]
        api_key = request.app.state.config.OPENAI_API_KEYS[urlIdx]

        auth_headers = {"Authorization": f"Bearer {api_key}"}
        response = requests.get(
            f"{base_url}/{pipeline_id}/valves", headers=auth_headers
        )
        response.raise_for_status()

        return {**response.json()}
    except Exception as exc:
        # Every failure in the block above is reported through this path.
        print(f"Connection error: {exc}")

        error_status = status.HTTP_404_NOT_FOUND
        error_detail = None
        if response is not None:
            error_status = response.status_code
            try:
                body = response.json()
                if "detail" in body:
                    error_detail = body["detail"]
            except Exception:
                # Unusable error body; the default detail is used instead.
                pass

        raise HTTPException(
            status_code=error_status,
            detail=error_detail or "Pipeline not found",
        )
2024-12-10 08:54:13 +00:00
2024-12-12 03:52:46 +00:00
@router.get("/{pipeline_id}/valves/spec")
async def get_pipeline_valves_spec(
    request: Request,
    urlIdx: Optional[int],
    pipeline_id: str,
    user=Depends(get_admin_user),
):
    """Fetch the valves spec of one pipeline from a pipelines server.

    Issues a GET to ``{base url}/{pipeline_id}/valves/spec`` on the
    connection selected by ``urlIdx``. Access is gated by the
    ``get_admin_user`` dependency.

    Returns:
        The JSON object returned by the pipelines server.

    Raises:
        HTTPException: On any failure, carrying the upstream status code
            (404 when no response was obtained) and the upstream ``detail``
            or ``"Pipeline not found"``.
    """
    response = None
    try:
        base_url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx]
        api_key = request.app.state.config.OPENAI_API_KEYS[urlIdx]

        auth_headers = {"Authorization": f"Bearer {api_key}"}
        response = requests.get(
            f"{base_url}/{pipeline_id}/valves/spec",
            headers=auth_headers,
        )
        response.raise_for_status()

        return {**response.json()}
    except Exception as exc:
        # Every failure in the block above is reported through this path.
        print(f"Connection error: {exc}")

        error_status = status.HTTP_404_NOT_FOUND
        error_detail = None
        if response is not None:
            error_status = response.status_code
            try:
                body = response.json()
                if "detail" in body:
                    error_detail = body["detail"]
            except Exception:
                # Unusable error body; the default detail is used instead.
                pass

        raise HTTPException(
            status_code=error_status,
            detail=error_detail or "Pipeline not found",
        )
2024-12-12 03:52:46 +00:00
@router.post("/{pipeline_id}/valves/update")
async def update_pipeline_valves(
    request: Request,
    urlIdx: Optional[int],
    pipeline_id: str,
    form_data: dict,
    user=Depends(get_admin_user),
):
    """Send new valve values for one pipeline to a pipelines server.

    Posts a copy of ``form_data`` as JSON to
    ``{base url}/{pipeline_id}/valves/update`` on the connection selected by
    ``urlIdx``. Access is gated by the ``get_admin_user`` dependency.

    Returns:
        The JSON object returned by the pipelines server.

    Raises:
        HTTPException: On any failure, carrying the upstream status code
            (404 when no response was obtained) and the upstream ``detail``
            or ``"Pipeline not found"``.
    """
    response = None
    try:
        base_url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx]
        api_key = request.app.state.config.OPENAI_API_KEYS[urlIdx]

        response = requests.post(
            f"{base_url}/{pipeline_id}/valves/update",
            headers={"Authorization": f"Bearer {api_key}"},
            json={**form_data},
        )
        response.raise_for_status()

        return {**response.json()}
    except Exception as exc:
        # Every failure in the block above is reported through this path.
        print(f"Connection error: {exc}")

        error_status = status.HTTP_404_NOT_FOUND
        error_detail = None
        if response is not None:
            error_status = response.status_code
            try:
                body = response.json()
                if "detail" in body:
                    error_detail = body["detail"]
            except Exception:
                # Unusable error body; the default detail is used instead.
                pass

        raise HTTPException(
            status_code=error_status,
            detail=error_detail or "Pipeline not found",
        )