open-webui/backend/open_webui/routers/prompts.py
Timothy Jaeryang Baek bdc60e7850 chore: format backend
2025-01-19 11:59:07 -08:00

158 lines
4.5 KiB
Python

from typing import Optional
from open_webui.models.prompts import (
PromptForm,
PromptUserResponse,
PromptModel,
Prompts,
)
from open_webui.constants import ERROR_MESSAGES
from fastapi import APIRouter, Depends, HTTPException, status, Request
from open_webui.utils.auth import get_admin_user, get_verified_user
from open_webui.utils.access_control import has_access, has_permission
# Router instance on which the prompt endpoints below are registered.
router = APIRouter()
############################
# GetPrompts
############################
@router.get("/", response_model=list[PromptModel])
async def get_prompts(user=Depends(get_verified_user)):
    """List prompts for the requesting user.

    Admins get the result of ``Prompts.get_prompts()``. Every other verified
    user gets ``Prompts.get_prompts_by_user_id`` called with their id and the
    ``"read"`` permission.
    """
    # Non-admins are limited to the per-user lookup.
    if user.role != "admin":
        return Prompts.get_prompts_by_user_id(user.id, "read")
    return Prompts.get_prompts()
@router.get("/list", response_model=list[PromptUserResponse])
async def get_prompt_list(user=Depends(get_verified_user)):
    """List prompts for the requesting user, serialized as ``PromptUserResponse``.

    Admins get the result of ``Prompts.get_prompts()``. Every other verified
    user gets ``Prompts.get_prompts_by_user_id`` called with their id and the
    ``"write"`` permission.
    """
    return (
        Prompts.get_prompts()
        if user.role == "admin"
        else Prompts.get_prompts_by_user_id(user.id, "write")
    )
############################
# CreateNewPrompt
############################
@router.post("/create", response_model=Optional[PromptModel])
async def create_new_prompt(
    request: Request, form_data: PromptForm, user=Depends(get_verified_user)
):
    """Create a new prompt registered under ``form_data.command``.

    Args:
        request: Incoming request; used to reach
            ``request.app.state.config.USER_PERMISSIONS``.
        form_data: Prompt payload, including the command to register.
        user: The verified user making the request.

    Returns:
        The prompt returned by ``Prompts.insert_new_prompt``.

    Raises:
        HTTPException: 401 if a non-admin user lacks the
            ``workspace.prompts`` permission; 400 if the command is already
            in use or if the insert returns a falsy result.
    """
    # Admins skip the permission lookup entirely.
    if user.role != "admin":
        allowed = has_permission(
            user.id, "workspace.prompts", request.app.state.config.USER_PERMISSIONS
        )
        if not allowed:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail=ERROR_MESSAGES.UNAUTHORIZED,
            )

    # A command may only be registered once.
    if Prompts.get_prompt_by_command(form_data.command) is not None:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=ERROR_MESSAGES.COMMAND_TAKEN,
        )

    created = Prompts.insert_new_prompt(user.id, form_data)
    if not created:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=ERROR_MESSAGES.DEFAULT(),
        )
    return created
############################
# GetPromptByCommand
############################
@router.get("/command/{command}", response_model=Optional[PromptModel])
async def get_prompt_by_command(command: str, user=Depends(get_verified_user)):
    """Fetch a single prompt by its command.

    Args:
        command: Command taken from the URL path; a leading ``/`` is
            prepended before the lookup.
        user: The verified user making the request.

    Returns:
        The matching prompt when the user is an admin, the prompt's owner,
        or has ``"read"`` access through the prompt's access control.

    Raises:
        HTTPException: 401 with ``NOT_FOUND`` when no prompt matches the
            command; 401 with ``ACCESS_PROHIBITED`` when the prompt exists
            but the user may not read it.
    """
    prompt = Prompts.get_prompt_by_command(f"/{command}")
    if not prompt:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    if (
        user.role == "admin"
        or prompt.user_id == user.id
        or has_access(user.id, "read", prompt.access_control)
    ):
        return prompt

    # Without this raise the handler fell off the end and answered an
    # unauthorized caller with a successful `null` body. Reject explicitly,
    # as the update and delete endpoints do.
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
    )
############################
# UpdatePromptByCommand
############################
@router.post("/command/{command}/update", response_model=Optional[PromptModel])
async def update_prompt_by_command(
    command: str,
    form_data: PromptForm,
    user=Depends(get_verified_user),
):
    """Update the prompt registered under ``/{command}``.

    Args:
        command: Command taken from the URL path; a leading ``/`` is
            prepended before the lookup and the update.
        form_data: New prompt payload.
        user: The verified user making the request.

    Returns:
        The prompt returned by ``Prompts.update_prompt_by_command``.

    Raises:
        HTTPException: 401 with ``NOT_FOUND`` when no prompt matches; 401
            with ``ACCESS_PROHIBITED`` when the user may not modify the
            prompt or when the update returns a falsy result.
    """
    target = f"/{command}"

    existing = Prompts.get_prompt_by_command(target)
    if not existing:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    # Editing is open to the original creator, anyone granted write access,
    # and admins.
    may_edit = (
        existing.user_id == user.id
        or has_access(user.id, "write", existing.access_control)
        or user.role == "admin"
    )
    if not may_edit:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
        )

    updated = Prompts.update_prompt_by_command(target, form_data)
    if not updated:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
        )
    return updated
############################
# DeletePromptByCommand
############################
@router.delete("/command/{command}/delete", response_model=bool)
async def delete_prompt_by_command(command: str, user=Depends(get_verified_user)):
    """Delete the prompt registered under ``/{command}``.

    Args:
        command: Command taken from the URL path; a leading ``/`` is
            prepended before the lookup and the delete.
        user: The verified user making the request.

    Returns:
        Whatever ``Prompts.delete_prompt_by_command`` returns.

    Raises:
        HTTPException: 401 with ``NOT_FOUND`` when no prompt matches; 401
            with ``ACCESS_PROHIBITED`` when the user is neither the prompt's
            owner nor an admin.
    """
    target = f"/{command}"

    existing = Prompts.get_prompt_by_command(target)
    if not existing:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    # Only the owner or an admin may delete.
    if not (existing.user_id == user.id or user.role == "admin"):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
        )

    return Prompts.delete_prompt_by_command(target)