from fastapi import ( Depends, FastAPI, File, Form, HTTPException, Request, UploadFile, status, APIRouter, ) import os import logging import shutil import requests from pydantic import BaseModel from starlette.responses import FileResponse from typing import Optional from open_webui.env import SRC_LOG_LEVELS from open_webui.config import CACHE_DIR from open_webui.constants import ERROR_MESSAGES from open_webui.routers.openai import get_all_models_responses from open_webui.utils.auth import get_admin_user log = logging.getLogger(__name__) log.setLevel(SRC_LOG_LEVELS["MAIN"]) ################################## # # Pipeline Middleware # ################################## def get_sorted_filters(model_id, models): filters = [ model for model in models.values() if "pipeline" in model and "type" in model["pipeline"] and model["pipeline"]["type"] == "filter" and ( model["pipeline"]["pipelines"] == ["*"] or any( model_id == target_model_id for target_model_id in model["pipeline"]["pipelines"] ) ) ] sorted_filters = sorted(filters, key=lambda x: x["pipeline"]["priority"]) return sorted_filters def process_pipeline_inlet_filter(request, payload, user, models): user = {"id": user.id, "email": user.email, "name": user.name, "role": user.role} model_id = payload["model"] sorted_filters = get_sorted_filters(model_id, models) model = models[model_id] if "pipeline" in model: sorted_filters.append(model) for filter in sorted_filters: r = None try: urlIdx = filter["urlIdx"] url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx] key = request.app.state.config.OPENAI_API_KEYS[urlIdx] if key == "": continue headers = {"Authorization": f"Bearer {key}"} r = requests.post( f"{url}/{filter['id']}/filter/inlet", headers=headers, json={ "user": user, "body": payload, }, ) r.raise_for_status() payload = r.json() except Exception as e: # Handle connection error here print(f"Connection error: {e}") if r is not None: res = r.json() if "detail" in res: raise Exception(r.status_code, res["detail"]) return payload def process_pipeline_outlet_filter(request, payload, user, models): user = {"id": user.id, "email": user.email, "name": user.name, "role": user.role} model_id = payload["model"] sorted_filters = get_sorted_filters(model_id, models) model = models[model_id] if "pipeline" in model: sorted_filters = [model] + sorted_filters for filter in sorted_filters: r = None try: urlIdx = filter["urlIdx"] url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx] key = request.app.state.config.OPENAI_API_KEYS[urlIdx] if key != "": r = requests.post( f"{url}/{filter['id']}/filter/outlet", headers={"Authorization": f"Bearer {key}"}, json={ "user": { "id": user.id, "name": user.name, "email": user.email, "role": user.role, }, "body": data, }, ) r.raise_for_status() data = r.json() except Exception as e: # Handle connection error here print(f"Connection error: {e}") if r is not None: try: res = r.json() if "detail" in res: return Exception(r.status_code, res) except Exception: pass else: pass return payload ################################## # # Pipelines Endpoints # ################################## router = APIRouter() @router.get("/list") async def get_pipelines_list(request: Request, user=Depends(get_admin_user)): responses = await get_all_models_responses(request) log.debug(f"get_pipelines_list: get_openai_models_responses returned {responses}") urlIdxs = [ idx for idx, response in enumerate(responses) if response is not None and "pipelines" in response ] return { "data": [ { "url": request.app.state.config.OPENAI_API_BASE_URLS[urlIdx], "idx": urlIdx, } for urlIdx in urlIdxs ] } @router.post("/upload") async def upload_pipeline( request: Request, urlIdx: int = Form(...), file: UploadFile = File(...), user=Depends(get_admin_user), ): print("upload_pipeline", urlIdx, file.filename) # Check if the uploaded file is a python file if not (file.filename and file.filename.endswith(".py")): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Only Python (.py) files are allowed.", ) upload_folder = f"{CACHE_DIR}/pipelines" os.makedirs(upload_folder, exist_ok=True) file_path = os.path.join(upload_folder, file.filename) r = None try: # Save the uploaded file with open(file_path, "wb") as buffer: shutil.copyfileobj(file.file, buffer) url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx] key = request.app.state.config.OPENAI_API_KEYS[urlIdx] with open(file_path, "rb") as f: files = {"file": f} r = requests.post( f"{url}/pipelines/upload", headers={"Authorization": f"Bearer {key}"}, files=files, ) r.raise_for_status() data = r.json() return {**data} except Exception as e: # Handle connection error here print(f"Connection error: {e}") detail = None status_code = status.HTTP_404_NOT_FOUND if r is not None: status_code = r.status_code try: res = r.json() if "detail" in res: detail = res["detail"] except Exception: pass raise HTTPException( status_code=status_code, detail=detail if detail else "Pipeline not found", ) finally: # Ensure the file is deleted after the upload is completed or on failure if os.path.exists(file_path): os.remove(file_path) class AddPipelineForm(BaseModel): url: str urlIdx: int @router.post("/add") async def add_pipeline( request: Request, form_data: AddPipelineForm, user=Depends(get_admin_user) ): r = None try: urlIdx = form_data.urlIdx url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx] key = request.app.state.config.OPENAI_API_KEYS[urlIdx] r = requests.post( f"{url}/pipelines/add", headers={"Authorization": f"Bearer {key}"}, json={"url": form_data.url}, ) r.raise_for_status() data = r.json() return {**data} except Exception as e: # Handle connection error here print(f"Connection error: {e}") detail = None if r is not None: try: res = r.json() if "detail" in res: detail = res["detail"] except Exception: pass raise HTTPException( status_code=(r.status_code if r is not None else status.HTTP_404_NOT_FOUND), detail=detail if detail else "Pipeline not found", ) class DeletePipelineForm(BaseModel): id: str urlIdx: int @router.delete("/delete") async def delete_pipeline( request: Request, form_data: DeletePipelineForm, user=Depends(get_admin_user) ): r = None try: urlIdx = form_data.urlIdx url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx] key = request.app.state.config.OPENAI_API_KEYS[urlIdx] r = requests.delete( f"{url}/pipelines/delete", headers={"Authorization": f"Bearer {key}"}, json={"id": form_data.id}, ) r.raise_for_status() data = r.json() return {**data} except Exception as e: # Handle connection error here print(f"Connection error: {e}") detail = None if r is not None: try: res = r.json() if "detail" in res: detail = res["detail"] except Exception: pass raise HTTPException( status_code=(r.status_code if r is not None else status.HTTP_404_NOT_FOUND), detail=detail if detail else "Pipeline not found", ) @router.get("/") async def get_pipelines( request: Request, urlIdx: Optional[int] = None, user=Depends(get_admin_user) ): r = None try: url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx] key = request.app.state.config.OPENAI_API_KEYS[urlIdx] r = requests.get(f"{url}/pipelines", headers={"Authorization": f"Bearer {key}"}) r.raise_for_status() data = r.json() return {**data} except Exception as e: # Handle connection error here print(f"Connection error: {e}") detail = None if r is not None: try: res = r.json() if "detail" in res: detail = res["detail"] except Exception: pass raise HTTPException( status_code=(r.status_code if r is not None else status.HTTP_404_NOT_FOUND), detail=detail if detail else "Pipeline not found", ) @router.get("/{pipeline_id}/valves") async def get_pipeline_valves( request: Request, urlIdx: Optional[int], pipeline_id: str, user=Depends(get_admin_user), ): r = None try: url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx] key = request.app.state.config.OPENAI_API_KEYS[urlIdx] r = requests.get( f"{url}/{pipeline_id}/valves", headers={"Authorization": f"Bearer {key}"} ) r.raise_for_status() data = r.json() return {**data} except Exception as e: # Handle connection error here print(f"Connection error: {e}") detail = None if r is not None: try: res = r.json() if "detail" in res: detail = res["detail"] except Exception: pass raise HTTPException( status_code=(r.status_code if r is not None else status.HTTP_404_NOT_FOUND), detail=detail if detail else "Pipeline not found", ) @router.get("/{pipeline_id}/valves/spec") async def get_pipeline_valves_spec( request: Request, urlIdx: Optional[int], pipeline_id: str, user=Depends(get_admin_user), ): r = None try: url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx] key = request.app.state.config.OPENAI_API_KEYS[urlIdx] r = requests.get( f"{url}/{pipeline_id}/valves/spec", headers={"Authorization": f"Bearer {key}"}, ) r.raise_for_status() data = r.json() return {**data} except Exception as e: # Handle connection error here print(f"Connection error: {e}") detail = None if r is not None: try: res = r.json() if "detail" in res: detail = res["detail"] except Exception: pass raise HTTPException( status_code=(r.status_code if r is not None else status.HTTP_404_NOT_FOUND), detail=detail if detail else "Pipeline not found", ) @router.post("/{pipeline_id}/valves/update") async def update_pipeline_valves( request: Request, urlIdx: Optional[int], pipeline_id: str, form_data: dict, user=Depends(get_admin_user), ): r = None try: url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx] key = request.app.state.config.OPENAI_API_KEYS[urlIdx] r = requests.post( f"{url}/{pipeline_id}/valves/update", headers={"Authorization": f"Bearer {key}"}, json={**form_data}, ) r.raise_for_status() data = r.json() return {**data} except Exception as e: # Handle connection error here print(f"Connection error: {e}") detail = None if r is not None: try: res = r.json() if "detail" in res: detail = res["detail"] except Exception: pass raise HTTPException( status_code=(r.status_code if r is not None else status.HTTP_404_NOT_FOUND), detail=detail if detail else "Pipeline not found", )