diff --git a/backend/open_webui/routers/pipelines.py b/backend/open_webui/routers/pipelines.py index 9450d520b..f1cdae140 100644 --- a/backend/open_webui/routers/pipelines.py +++ b/backend/open_webui/routers/pipelines.py @@ -1,17 +1,33 @@ -from fastapi import APIRouter, Depends, HTTPException, Response, status +from fastapi import ( + Depends, + FastAPI, + File, + Form, + HTTPException, + Request, + UploadFile, + status, + APIRouter, +) +import os +import logging +import shutil +import requests from pydantic import BaseModel from starlette.responses import FileResponse +from typing import Optional - -from open_webui.models.chats import ChatTitleMessagesForm -from open_webui.config import DATA_DIR, ENABLE_ADMIN_EXPORT +from open_webui.env import SRC_LOG_LEVELS +from open_webui.config import CACHE_DIR from open_webui.constants import ERROR_MESSAGES -from open_webui.utils.misc import get_gravatar_url -from open_webui.utils.pdf_generator import PDFGenerator + +from open_webui.routers.openai import get_all_models_responses + from open_webui.utils.auth import get_admin_user -router = APIRouter() +log = logging.getLogger(__name__) +log.setLevel(SRC_LOG_LEVELS["MAIN"]) ################################## @@ -20,15 +36,14 @@ router = APIRouter() # ################################## - -# TODO: Refactor pipelines API endpoints below into a separate file +router = APIRouter() -@app.get("/api/pipelines/list") -async def get_pipelines_list(user=Depends(get_admin_user)): - responses = await get_openai_models_responses() - +@router.get("/api/pipelines/list") +async def get_pipelines_list(request: Request, user=Depends(get_admin_user)): + responses = await get_all_models_responses(request) log.debug(f"get_pipelines_list: get_openai_models_responses returned {responses}") + urlIdxs = [ idx for idx, response in enumerate(responses) @@ -38,7 +53,7 @@ async def get_pipelines_list(user=Depends(get_admin_user)): return { "data": [ { - "url": openai_app.state.config.OPENAI_API_BASE_URLS[urlIdx], + "url": request.app.state.config.OPENAI_API_BASE_URLS[urlIdx], "idx": urlIdx, } for urlIdx in urlIdxs @@ -46,9 +61,12 @@ async def get_pipelines_list(user=Depends(get_admin_user)): } -@app.post("/api/pipelines/upload") +@router.post("/api/pipelines/upload") async def upload_pipeline( - urlIdx: int = Form(...), file: UploadFile = File(...), user=Depends(get_admin_user) + request: Request, + urlIdx: int = Form(...), + file: UploadFile = File(...), + user=Depends(get_admin_user), ): print("upload_pipeline", urlIdx, file.filename) # Check if the uploaded file is a python file @@ -68,14 +86,16 @@ async def upload_pipeline( with open(file_path, "wb") as buffer: shutil.copyfileobj(file.file, buffer) - url = openai_app.state.config.OPENAI_API_BASE_URLS[urlIdx] - key = openai_app.state.config.OPENAI_API_KEYS[urlIdx] - - headers = {"Authorization": f"Bearer {key}"} + url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx] + key = request.app.state.config.OPENAI_API_KEYS[urlIdx] with open(file_path, "rb") as f: files = {"file": f} - r = requests.post(f"{url}/pipelines/upload", headers=headers, files=files) + r = requests.post( + f"{url}/pipelines/upload", + headers={"Authorization": f"Bearer {key}"}, + files=files, + ) r.raise_for_status() data = r.json() @@ -85,7 +105,7 @@ async def upload_pipeline( # Handle connection error here print(f"Connection error: {e}") - detail = "Pipeline not found" + detail = None status_code = status.HTTP_404_NOT_FOUND if r is not None: status_code = r.status_code @@ -98,7 +118,7 @@ async def upload_pipeline( raise HTTPException( status_code=status_code, - detail=detail, + detail=detail if detail else "Pipeline not found", ) finally: # Ensure the file is deleted after the upload is completed or on failure @@ -111,18 +131,21 @@ class AddPipelineForm(BaseModel): urlIdx: int -@app.post("/api/pipelines/add") -async def add_pipeline(form_data: AddPipelineForm, user=Depends(get_admin_user)): +@router.post("/api/pipelines/add") +async def add_pipeline( + request: Request, form_data: AddPipelineForm, user=Depends(get_admin_user) +): r = None try: urlIdx = form_data.urlIdx - url = openai_app.state.config.OPENAI_API_BASE_URLS[urlIdx] - key = openai_app.state.config.OPENAI_API_KEYS[urlIdx] + url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx] + key = request.app.state.config.OPENAI_API_KEYS[urlIdx] - headers = {"Authorization": f"Bearer {key}"} r = requests.post( - f"{url}/pipelines/add", headers=headers, json={"url": form_data.url} + f"{url}/pipelines/add", + headers={"Authorization": f"Bearer {key}"}, + json={"url": form_data.url}, ) r.raise_for_status() @@ -133,7 +156,7 @@ async def add_pipeline(form_data: AddPipelineForm, user=Depends(get_admin_user)) # Handle connection error here print(f"Connection error: {e}") - detail = "Pipeline not found" + detail = None if r is not None: try: res = r.json() @@ -144,7 +167,7 @@ async def add_pipeline(form_data: AddPipelineForm, user=Depends(get_admin_user)) raise HTTPException( status_code=(r.status_code if r is not None else status.HTTP_404_NOT_FOUND), - detail=detail, + detail=detail if detail else "Pipeline not found", ) @@ -153,18 +176,21 @@ class DeletePipelineForm(BaseModel): urlIdx: int -@app.delete("/api/pipelines/delete") -async def delete_pipeline(form_data: DeletePipelineForm, user=Depends(get_admin_user)): +@router.delete("/api/pipelines/delete") +async def delete_pipeline( + request: Request, form_data: DeletePipelineForm, user=Depends(get_admin_user) +): r = None try: urlIdx = form_data.urlIdx - url = openai_app.state.config.OPENAI_API_BASE_URLS[urlIdx] - key = openai_app.state.config.OPENAI_API_KEYS[urlIdx] + url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx] + key = request.app.state.config.OPENAI_API_KEYS[urlIdx] - headers = {"Authorization": f"Bearer {key}"} r = requests.delete( - f"{url}/pipelines/delete", headers=headers, json={"id": form_data.id} + f"{url}/pipelines/delete", + headers={"Authorization": f"Bearer {key}"}, + json={"id": form_data.id}, ) r.raise_for_status() @@ -175,7 +201,7 @@ async def delete_pipeline(form_data: DeletePipelineForm, user=Depends(get_admin_ # Handle connection error here print(f"Connection error: {e}") - detail = "Pipeline not found" + detail = None if r is not None: try: res = r.json() @@ -186,19 +212,20 @@ async def delete_pipeline(form_data: DeletePipelineForm, user=Depends(get_admin_ raise HTTPException( status_code=(r.status_code if r is not None else status.HTTP_404_NOT_FOUND), - detail=detail, + detail=detail if detail else "Pipeline not found", ) -@app.get("/api/pipelines") -async def get_pipelines(urlIdx: Optional[int] = None, user=Depends(get_admin_user)): +@router.get("/api/pipelines") +async def get_pipelines( + request: Request, urlIdx: Optional[int] = None, user=Depends(get_admin_user) +): r = None try: - url = openai_app.state.config.OPENAI_API_BASE_URLS[urlIdx] - key = openai_app.state.config.OPENAI_API_KEYS[urlIdx] + url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx] + key = request.app.state.config.OPENAI_API_KEYS[urlIdx] - headers = {"Authorization": f"Bearer {key}"} - r = requests.get(f"{url}/pipelines", headers=headers) + r = requests.get(f"{url}/pipelines", headers={"Authorization": f"Bearer {key}"}) r.raise_for_status() data = r.json() @@ -208,7 +235,7 @@ async def get_pipelines(urlIdx: Optional[int] = None, user=Depends(get_admin_use # Handle connection error here print(f"Connection error: {e}") - detail = "Pipeline not found" + detail = None if r is not None: try: res = r.json() @@ -219,23 +246,25 @@ async def get_pipelines(urlIdx: Optional[int] = None, user=Depends(get_admin_use raise HTTPException( status_code=(r.status_code if r is not None else status.HTTP_404_NOT_FOUND), - detail=detail, + detail=detail if detail else "Pipeline not found", ) -@app.get("/api/pipelines/{pipeline_id}/valves") +@router.get("/api/pipelines/{pipeline_id}/valves") async def get_pipeline_valves( + request: Request, urlIdx: Optional[int], pipeline_id: str, user=Depends(get_admin_user), ): r = None try: - url = openai_app.state.config.OPENAI_API_BASE_URLS[urlIdx] - key = openai_app.state.config.OPENAI_API_KEYS[urlIdx] + url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx] + key = request.app.state.config.OPENAI_API_KEYS[urlIdx] - headers = {"Authorization": f"Bearer {key}"} - r = requests.get(f"{url}/{pipeline_id}/valves", headers=headers) + r = requests.get( + f"{url}/{pipeline_id}/valves", headers={"Authorization": f"Bearer {key}"} + ) r.raise_for_status() data = r.json() @@ -245,8 +274,7 @@ async def get_pipeline_valves( # Handle connection error here print(f"Connection error: {e}") - detail = "Pipeline not found" - + detail = None if r is not None: try: res = r.json() @@ -257,23 +285,26 @@ async def get_pipeline_valves( raise HTTPException( status_code=(r.status_code if r is not None else status.HTTP_404_NOT_FOUND), - detail=detail, + detail=detail if detail else "Pipeline not found", ) -@app.get("/api/pipelines/{pipeline_id}/valves/spec") +@router.get("/api/pipelines/{pipeline_id}/valves/spec") async def get_pipeline_valves_spec( + request: Request, urlIdx: Optional[int], pipeline_id: str, user=Depends(get_admin_user), ): r = None try: - url = openai_app.state.config.OPENAI_API_BASE_URLS[urlIdx] - key = openai_app.state.config.OPENAI_API_KEYS[urlIdx] + url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx] + key = request.app.state.config.OPENAI_API_KEYS[urlIdx] - headers = {"Authorization": f"Bearer {key}"} - r = requests.get(f"{url}/{pipeline_id}/valves/spec", headers=headers) + r = requests.get( + f"{url}/{pipeline_id}/valves/spec", + headers={"Authorization": f"Bearer {key}"}, + ) r.raise_for_status() data = r.json() @@ -283,7 +314,7 @@ async def get_pipeline_valves_spec( # Handle connection error here print(f"Connection error: {e}") - detail = "Pipeline not found" + detail = None if r is not None: try: res = r.json() @@ -294,12 +325,13 @@ async def get_pipeline_valves_spec( raise HTTPException( status_code=(r.status_code if r is not None else status.HTTP_404_NOT_FOUND), - detail=detail, + detail=detail if detail else "Pipeline not found", ) -@app.post("/api/pipelines/{pipeline_id}/valves/update") +@router.post("/api/pipelines/{pipeline_id}/valves/update") async def update_pipeline_valves( + request: Request, urlIdx: Optional[int], pipeline_id: str, form_data: dict, @@ -307,13 +339,12 @@ async def update_pipeline_valves( ): r = None try: - url = openai_app.state.config.OPENAI_API_BASE_URLS[urlIdx] - key = openai_app.state.config.OPENAI_API_KEYS[urlIdx] + url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx] + key = request.app.state.config.OPENAI_API_KEYS[urlIdx] - headers = {"Authorization": f"Bearer {key}"} r = requests.post( f"{url}/{pipeline_id}/valves/update", - headers=headers, + headers={"Authorization": f"Bearer {key}"}, json={**form_data}, ) @@ -325,7 +356,7 @@ async def update_pipeline_valves( # Handle connection error here print(f"Connection error: {e}") - detail = "Pipeline not found" + detail = None if r is not None: try: @@ -337,5 +368,5 @@ async def update_pipeline_valves( raise HTTPException( status_code=(r.status_code if r is not None else status.HTTP_404_NOT_FOUND), - detail=detail, + detail=detail if detail else "Pipeline not found", )