2024-08-27 22:10:27 +00:00
|
|
|
from typing import Optional
|
2024-01-03 04:51:19 +00:00
|
|
|
|
2024-11-18 14:19:34 +00:00
|
|
|
from open_webui.apps.webui.models.prompts import (
|
|
|
|
PromptForm,
|
|
|
|
PromptUserResponse,
|
|
|
|
PromptModel,
|
|
|
|
Prompts,
|
|
|
|
)
|
2024-09-04 14:54:48 +00:00
|
|
|
from open_webui.constants import ERROR_MESSAGES
|
2024-11-17 11:04:31 +00:00
|
|
|
from fastapi import APIRouter, Depends, HTTPException, status, Request
|
2024-12-09 00:01:56 +00:00
|
|
|
from open_webui.utils.auth import get_admin_user, get_verified_user
|
2024-11-17 11:04:31 +00:00
|
|
|
from open_webui.utils.access_control import has_access, has_permission
|
2024-01-03 04:51:19 +00:00
|
|
|
|
|
|
|
# Router instance that the prompt endpoints in this module register themselves on.
router = APIRouter()
|
|
|
|
|
|
|
|
############################
|
|
|
|
# GetPrompts
|
|
|
|
############################
|
|
|
|
|
|
|
|
|
2024-08-14 12:46:31 +00:00
|
|
|
@router.get("/", response_model=list[PromptModel])
async def get_prompts(user=Depends(get_verified_user)):
    """Return the prompts the requesting user may read.

    Admins get the result of ``Prompts.get_prompts()``; every other verified
    user gets ``Prompts.get_prompts_by_user_id`` called with the ``"read"``
    permission.
    """
    if user.role != "admin":
        return Prompts.get_prompts_by_user_id(user.id, "read")
    return Prompts.get_prompts()
|
|
|
|
|
|
|
|
|
2024-11-18 14:19:34 +00:00
|
|
|
@router.get("/list", response_model=list[PromptUserResponse])
async def get_prompt_list(user=Depends(get_verified_user)):
    """Return the prompt listing for the requesting user.

    Admins get the result of ``Prompts.get_prompts()``; every other verified
    user gets ``Prompts.get_prompts_by_user_id`` called with the ``"write"``
    permission.
    """
    return (
        Prompts.get_prompts()
        if user.role == "admin"
        else Prompts.get_prompts_by_user_id(user.id, "write")
    )
|
2024-01-03 04:51:19 +00:00
|
|
|
|
|
|
|
|
|
|
|
############################
|
|
|
|
# CreateNewPrompt
|
|
|
|
############################
|
|
|
|
|
|
|
|
|
|
|
|
@router.post("/create", response_model=Optional[PromptModel])
async def create_new_prompt(
    request: Request, form_data: PromptForm, user=Depends(get_verified_user)
):
    """Create a prompt for the requesting user.

    Raises:
        HTTPException: 401 (``ERROR_MESSAGES.UNAUTHORIZED``) when the caller is
            not an admin and ``has_permission`` rejects ``"workspace.prompts"``;
            400 (``ERROR_MESSAGES.COMMAND_TAKEN``) when a prompt with the same
            command already exists; 400 (``ERROR_MESSAGES.DEFAULT()``) when the
            insert yields a falsy result.
    """
    # Admins bypass the permission lookup entirely (short-circuit on role).
    may_create = user.role == "admin" or has_permission(
        user.id, "workspace.prompts", request.app.state.config.USER_PERMISSIONS
    )
    if not may_create:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.UNAUTHORIZED,
        )

    # Refuse to overwrite an existing command.
    if Prompts.get_prompt_by_command(form_data.command) is not None:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=ERROR_MESSAGES.COMMAND_TAKEN,
        )

    created = Prompts.insert_new_prompt(user.id, form_data)
    if not created:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=ERROR_MESSAGES.DEFAULT(),
        )
    return created
|
2024-01-03 04:51:19 +00:00
|
|
|
|
|
|
|
|
|
|
|
############################
|
|
|
|
# GetPromptByCommand
|
|
|
|
############################
|
|
|
|
|
|
|
|
|
2024-01-07 01:55:41 +00:00
|
|
|
@router.get("/command/{command}", response_model=Optional[PromptModel])
async def get_prompt_by_command(command: str, user=Depends(get_verified_user)):
    """Fetch the prompt stored under ``/{command}``.

    The prompt is returned when the caller is an admin, owns the prompt, or
    ``has_access`` grants them ``"read"`` on its access control.

    Raises:
        HTTPException: 401 with ``ERROR_MESSAGES.NOT_FOUND`` when no prompt
            matches the command; 401 with ``ERROR_MESSAGES.ACCESS_PROHIBITED``
            when the prompt exists but the caller may not read it.
    """
    prompt = Prompts.get_prompt_by_command(f"/{command}")
    if not prompt:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    if (
        user.role == "admin"
        or prompt.user_id == user.id
        or has_access(user.id, "read", prompt.access_control)
    ):
        return prompt

    # Answer explicitly: falling off the end would return None, which is
    # served as a 200 response with a null body instead of an error.
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
    )
|
|
|
|
|
|
|
|
|
|
|
|
############################
|
|
|
|
# UpdatePromptByCommand
|
|
|
|
############################
|
|
|
|
|
|
|
|
|
2024-01-07 01:55:41 +00:00
|
|
|
@router.post("/command/{command}/update", response_model=Optional[PromptModel])
async def update_prompt_by_command(
    command: str,
    form_data: PromptForm,
    user=Depends(get_verified_user),
):
    """Update the prompt stored under ``/{command}`` with ``form_data``.

    Only the prompt's owner or an admin may update it.

    Raises:
        HTTPException: 401 with ``ERROR_MESSAGES.NOT_FOUND`` when no prompt
            matches the command; 401 with ``ERROR_MESSAGES.ACCESS_PROHIBITED``
            when the caller is neither owner nor admin, or when the update
            yields a falsy result.
    """
    existing = Prompts.get_prompt_by_command(f"/{command}")
    if not existing:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    may_modify = existing.user_id == user.id or user.role == "admin"
    if not may_modify:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
        )

    updated = Prompts.update_prompt_by_command(f"/{command}", form_data)
    if not updated:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
        )
    return updated
|
|
|
|
|
|
|
|
|
|
|
|
############################
|
|
|
|
# DeletePromptByCommand
|
|
|
|
############################
|
|
|
|
|
|
|
|
|
2024-01-07 01:55:41 +00:00
|
|
|
@router.delete("/command/{command}/delete", response_model=bool)
async def delete_prompt_by_command(command: str, user=Depends(get_verified_user)):
    """Delete the prompt stored under ``/{command}``.

    Only the prompt's owner or an admin may delete it. Returns whatever
    ``Prompts.delete_prompt_by_command`` returns.

    Raises:
        HTTPException: 401 with ``ERROR_MESSAGES.NOT_FOUND`` when no prompt
            matches the command; 401 with ``ERROR_MESSAGES.ACCESS_PROHIBITED``
            when the caller is neither owner nor admin.
    """
    existing = Prompts.get_prompt_by_command(f"/{command}")
    if not existing:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    may_modify = existing.user_id == user.id or user.role == "admin"
    if not may_modify:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
        )

    return Prompts.delete_prompt_by_command(f"/{command}")
|