wip: pipelines

This commit is contained in:
Timothy Jaeryang Baek 2024-12-11 18:16:07 -08:00
parent b3987ad41e
commit 9e85ed861d

View File

@ -1,17 +1,33 @@
from fastapi import APIRouter, Depends, HTTPException, Response, status from fastapi import (
Depends,
FastAPI,
File,
Form,
HTTPException,
Request,
UploadFile,
status,
APIRouter,
)
import os
import logging
import shutil
import requests
from pydantic import BaseModel from pydantic import BaseModel
from starlette.responses import FileResponse from starlette.responses import FileResponse
from typing import Optional
from open_webui.env import SRC_LOG_LEVELS
from open_webui.models.chats import ChatTitleMessagesForm from open_webui.config import CACHE_DIR
from open_webui.config import DATA_DIR, ENABLE_ADMIN_EXPORT
from open_webui.constants import ERROR_MESSAGES from open_webui.constants import ERROR_MESSAGES
from open_webui.utils.misc import get_gravatar_url
from open_webui.utils.pdf_generator import PDFGenerator from open_webui.routers.openai import get_all_models_responses
from open_webui.utils.auth import get_admin_user from open_webui.utils.auth import get_admin_user
router = APIRouter() log = logging.getLogger(__name__)
log.setLevel(SRC_LOG_LEVELS["MAIN"])
################################## ##################################
@ -20,15 +36,14 @@ router = APIRouter()
# #
################################## ##################################
router = APIRouter()
# TODO: Refactor pipelines API endpoints below into a separate file
@app.get("/api/pipelines/list") @router.get("/api/pipelines/list")
async def get_pipelines_list(user=Depends(get_admin_user)): async def get_pipelines_list(request: Request, user=Depends(get_admin_user)):
responses = await get_openai_models_responses() responses = await get_all_models_responses(request)
log.debug(f"get_pipelines_list: get_openai_models_responses returned {responses}") log.debug(f"get_pipelines_list: get_openai_models_responses returned {responses}")
urlIdxs = [ urlIdxs = [
idx idx
for idx, response in enumerate(responses) for idx, response in enumerate(responses)
@ -38,7 +53,7 @@ async def get_pipelines_list(user=Depends(get_admin_user)):
return { return {
"data": [ "data": [
{ {
"url": openai_app.state.config.OPENAI_API_BASE_URLS[urlIdx], "url": request.app.state.config.OPENAI_API_BASE_URLS[urlIdx],
"idx": urlIdx, "idx": urlIdx,
} }
for urlIdx in urlIdxs for urlIdx in urlIdxs
@ -46,9 +61,12 @@ async def get_pipelines_list(user=Depends(get_admin_user)):
} }
@app.post("/api/pipelines/upload") @router.post("/api/pipelines/upload")
async def upload_pipeline( async def upload_pipeline(
urlIdx: int = Form(...), file: UploadFile = File(...), user=Depends(get_admin_user) request: Request,
urlIdx: int = Form(...),
file: UploadFile = File(...),
user=Depends(get_admin_user),
): ):
print("upload_pipeline", urlIdx, file.filename) print("upload_pipeline", urlIdx, file.filename)
# Check if the uploaded file is a python file # Check if the uploaded file is a python file
@ -68,14 +86,16 @@ async def upload_pipeline(
with open(file_path, "wb") as buffer: with open(file_path, "wb") as buffer:
shutil.copyfileobj(file.file, buffer) shutil.copyfileobj(file.file, buffer)
url = openai_app.state.config.OPENAI_API_BASE_URLS[urlIdx] url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx]
key = openai_app.state.config.OPENAI_API_KEYS[urlIdx] key = request.app.state.config.OPENAI_API_KEYS[urlIdx]
headers = {"Authorization": f"Bearer {key}"}
with open(file_path, "rb") as f: with open(file_path, "rb") as f:
files = {"file": f} files = {"file": f}
r = requests.post(f"{url}/pipelines/upload", headers=headers, files=files) r = requests.post(
f"{url}/pipelines/upload",
headers={"Authorization": f"Bearer {key}"},
files=files,
)
r.raise_for_status() r.raise_for_status()
data = r.json() data = r.json()
@ -85,7 +105,7 @@ async def upload_pipeline(
# Handle connection error here # Handle connection error here
print(f"Connection error: {e}") print(f"Connection error: {e}")
detail = "Pipeline not found" detail = None
status_code = status.HTTP_404_NOT_FOUND status_code = status.HTTP_404_NOT_FOUND
if r is not None: if r is not None:
status_code = r.status_code status_code = r.status_code
@ -98,7 +118,7 @@ async def upload_pipeline(
raise HTTPException( raise HTTPException(
status_code=status_code, status_code=status_code,
detail=detail, detail=detail if detail else "Pipeline not found",
) )
finally: finally:
# Ensure the file is deleted after the upload is completed or on failure # Ensure the file is deleted after the upload is completed or on failure
@ -111,18 +131,21 @@ class AddPipelineForm(BaseModel):
urlIdx: int urlIdx: int
@app.post("/api/pipelines/add") @router.post("/api/pipelines/add")
async def add_pipeline(form_data: AddPipelineForm, user=Depends(get_admin_user)): async def add_pipeline(
request: Request, form_data: AddPipelineForm, user=Depends(get_admin_user)
):
r = None r = None
try: try:
urlIdx = form_data.urlIdx urlIdx = form_data.urlIdx
url = openai_app.state.config.OPENAI_API_BASE_URLS[urlIdx] url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx]
key = openai_app.state.config.OPENAI_API_KEYS[urlIdx] key = request.app.state.config.OPENAI_API_KEYS[urlIdx]
headers = {"Authorization": f"Bearer {key}"}
r = requests.post( r = requests.post(
f"{url}/pipelines/add", headers=headers, json={"url": form_data.url} f"{url}/pipelines/add",
headers={"Authorization": f"Bearer {key}"},
json={"url": form_data.url},
) )
r.raise_for_status() r.raise_for_status()
@ -133,7 +156,7 @@ async def add_pipeline(form_data: AddPipelineForm, user=Depends(get_admin_user))
# Handle connection error here # Handle connection error here
print(f"Connection error: {e}") print(f"Connection error: {e}")
detail = "Pipeline not found" detail = None
if r is not None: if r is not None:
try: try:
res = r.json() res = r.json()
@ -144,7 +167,7 @@ async def add_pipeline(form_data: AddPipelineForm, user=Depends(get_admin_user))
raise HTTPException( raise HTTPException(
status_code=(r.status_code if r is not None else status.HTTP_404_NOT_FOUND), status_code=(r.status_code if r is not None else status.HTTP_404_NOT_FOUND),
detail=detail, detail=detail if detail else "Pipeline not found",
) )
@ -153,18 +176,21 @@ class DeletePipelineForm(BaseModel):
urlIdx: int urlIdx: int
@app.delete("/api/pipelines/delete") @router.delete("/api/pipelines/delete")
async def delete_pipeline(form_data: DeletePipelineForm, user=Depends(get_admin_user)): async def delete_pipeline(
request: Request, form_data: DeletePipelineForm, user=Depends(get_admin_user)
):
r = None r = None
try: try:
urlIdx = form_data.urlIdx urlIdx = form_data.urlIdx
url = openai_app.state.config.OPENAI_API_BASE_URLS[urlIdx] url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx]
key = openai_app.state.config.OPENAI_API_KEYS[urlIdx] key = request.app.state.config.OPENAI_API_KEYS[urlIdx]
headers = {"Authorization": f"Bearer {key}"}
r = requests.delete( r = requests.delete(
f"{url}/pipelines/delete", headers=headers, json={"id": form_data.id} f"{url}/pipelines/delete",
headers={"Authorization": f"Bearer {key}"},
json={"id": form_data.id},
) )
r.raise_for_status() r.raise_for_status()
@ -175,7 +201,7 @@ async def delete_pipeline(form_data: DeletePipelineForm, user=Depends(get_admin_
# Handle connection error here # Handle connection error here
print(f"Connection error: {e}") print(f"Connection error: {e}")
detail = "Pipeline not found" detail = None
if r is not None: if r is not None:
try: try:
res = r.json() res = r.json()
@ -186,19 +212,20 @@ async def delete_pipeline(form_data: DeletePipelineForm, user=Depends(get_admin_
raise HTTPException( raise HTTPException(
status_code=(r.status_code if r is not None else status.HTTP_404_NOT_FOUND), status_code=(r.status_code if r is not None else status.HTTP_404_NOT_FOUND),
detail=detail, detail=detail if detail else "Pipeline not found",
) )
@app.get("/api/pipelines") @router.get("/api/pipelines")
async def get_pipelines(urlIdx: Optional[int] = None, user=Depends(get_admin_user)): async def get_pipelines(
request: Request, urlIdx: Optional[int] = None, user=Depends(get_admin_user)
):
r = None r = None
try: try:
url = openai_app.state.config.OPENAI_API_BASE_URLS[urlIdx] url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx]
key = openai_app.state.config.OPENAI_API_KEYS[urlIdx] key = request.app.state.config.OPENAI_API_KEYS[urlIdx]
headers = {"Authorization": f"Bearer {key}"} r = requests.get(f"{url}/pipelines", headers={"Authorization": f"Bearer {key}"})
r = requests.get(f"{url}/pipelines", headers=headers)
r.raise_for_status() r.raise_for_status()
data = r.json() data = r.json()
@ -208,7 +235,7 @@ async def get_pipelines(urlIdx: Optional[int] = None, user=Depends(get_admin_use
# Handle connection error here # Handle connection error here
print(f"Connection error: {e}") print(f"Connection error: {e}")
detail = "Pipeline not found" detail = None
if r is not None: if r is not None:
try: try:
res = r.json() res = r.json()
@ -219,23 +246,25 @@ async def get_pipelines(urlIdx: Optional[int] = None, user=Depends(get_admin_use
raise HTTPException( raise HTTPException(
status_code=(r.status_code if r is not None else status.HTTP_404_NOT_FOUND), status_code=(r.status_code if r is not None else status.HTTP_404_NOT_FOUND),
detail=detail, detail=detail if detail else "Pipeline not found",
) )
@app.get("/api/pipelines/{pipeline_id}/valves") @router.get("/api/pipelines/{pipeline_id}/valves")
async def get_pipeline_valves( async def get_pipeline_valves(
request: Request,
urlIdx: Optional[int], urlIdx: Optional[int],
pipeline_id: str, pipeline_id: str,
user=Depends(get_admin_user), user=Depends(get_admin_user),
): ):
r = None r = None
try: try:
url = openai_app.state.config.OPENAI_API_BASE_URLS[urlIdx] url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx]
key = openai_app.state.config.OPENAI_API_KEYS[urlIdx] key = request.app.state.config.OPENAI_API_KEYS[urlIdx]
headers = {"Authorization": f"Bearer {key}"} r = requests.get(
r = requests.get(f"{url}/{pipeline_id}/valves", headers=headers) f"{url}/{pipeline_id}/valves", headers={"Authorization": f"Bearer {key}"}
)
r.raise_for_status() r.raise_for_status()
data = r.json() data = r.json()
@ -245,8 +274,7 @@ async def get_pipeline_valves(
# Handle connection error here # Handle connection error here
print(f"Connection error: {e}") print(f"Connection error: {e}")
detail = "Pipeline not found" detail = None
if r is not None: if r is not None:
try: try:
res = r.json() res = r.json()
@ -257,23 +285,26 @@ async def get_pipeline_valves(
raise HTTPException( raise HTTPException(
status_code=(r.status_code if r is not None else status.HTTP_404_NOT_FOUND), status_code=(r.status_code if r is not None else status.HTTP_404_NOT_FOUND),
detail=detail, detail=detail if detail else "Pipeline not found",
) )
@app.get("/api/pipelines/{pipeline_id}/valves/spec") @router.get("/api/pipelines/{pipeline_id}/valves/spec")
async def get_pipeline_valves_spec( async def get_pipeline_valves_spec(
request: Request,
urlIdx: Optional[int], urlIdx: Optional[int],
pipeline_id: str, pipeline_id: str,
user=Depends(get_admin_user), user=Depends(get_admin_user),
): ):
r = None r = None
try: try:
url = openai_app.state.config.OPENAI_API_BASE_URLS[urlIdx] url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx]
key = openai_app.state.config.OPENAI_API_KEYS[urlIdx] key = request.app.state.config.OPENAI_API_KEYS[urlIdx]
headers = {"Authorization": f"Bearer {key}"} r = requests.get(
r = requests.get(f"{url}/{pipeline_id}/valves/spec", headers=headers) f"{url}/{pipeline_id}/valves/spec",
headers={"Authorization": f"Bearer {key}"},
)
r.raise_for_status() r.raise_for_status()
data = r.json() data = r.json()
@ -283,7 +314,7 @@ async def get_pipeline_valves_spec(
# Handle connection error here # Handle connection error here
print(f"Connection error: {e}") print(f"Connection error: {e}")
detail = "Pipeline not found" detail = None
if r is not None: if r is not None:
try: try:
res = r.json() res = r.json()
@ -294,12 +325,13 @@ async def get_pipeline_valves_spec(
raise HTTPException( raise HTTPException(
status_code=(r.status_code if r is not None else status.HTTP_404_NOT_FOUND), status_code=(r.status_code if r is not None else status.HTTP_404_NOT_FOUND),
detail=detail, detail=detail if detail else "Pipeline not found",
) )
@app.post("/api/pipelines/{pipeline_id}/valves/update") @router.post("/api/pipelines/{pipeline_id}/valves/update")
async def update_pipeline_valves( async def update_pipeline_valves(
request: Request,
urlIdx: Optional[int], urlIdx: Optional[int],
pipeline_id: str, pipeline_id: str,
form_data: dict, form_data: dict,
@ -307,13 +339,12 @@ async def update_pipeline_valves(
): ):
r = None r = None
try: try:
url = openai_app.state.config.OPENAI_API_BASE_URLS[urlIdx] url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx]
key = openai_app.state.config.OPENAI_API_KEYS[urlIdx] key = request.app.state.config.OPENAI_API_KEYS[urlIdx]
headers = {"Authorization": f"Bearer {key}"}
r = requests.post( r = requests.post(
f"{url}/{pipeline_id}/valves/update", f"{url}/{pipeline_id}/valves/update",
headers=headers, headers={"Authorization": f"Bearer {key}"},
json={**form_data}, json={**form_data},
) )
@ -325,7 +356,7 @@ async def update_pipeline_valves(
# Handle connection error here # Handle connection error here
print(f"Connection error: {e}") print(f"Connection error: {e}")
detail = "Pipeline not found" detail = None
if r is not None: if r is not None:
try: try:
@ -337,5 +368,5 @@ async def update_pipeline_valves(
raise HTTPException( raise HTTPException(
status_code=(r.status_code if r is not None else status.HTTP_404_NOT_FOUND), status_code=(r.status_code if r is not None else status.HTTP_404_NOT_FOUND),
detail=detail, detail=detail if detail else "Pipeline not found",
) )