mirror of
https://github.com/open-webui/open-webui
synced 2024-11-30 07:47:34 +00:00
220 lines
5.7 KiB
Python
220 lines
5.7 KiB
Python
from fastapi import (
|
|
Depends,
|
|
FastAPI,
|
|
HTTPException,
|
|
status,
|
|
Request,
|
|
UploadFile,
|
|
File,
|
|
Form,
|
|
)
|
|
|
|
|
|
from datetime import datetime, timedelta
|
|
from typing import List, Union, Optional
|
|
from pathlib import Path
|
|
|
|
from fastapi import APIRouter
|
|
from fastapi.responses import StreamingResponse, JSONResponse, FileResponse
|
|
|
|
from pydantic import BaseModel
|
|
import json
|
|
|
|
from apps.webui.models.files import (
|
|
Files,
|
|
FileForm,
|
|
FileModel,
|
|
FileModelResponse,
|
|
)
|
|
from utils.utils import get_verified_user, get_admin_user
|
|
from constants import ERROR_MESSAGES
|
|
|
|
from importlib import util
|
|
import os
|
|
import uuid
|
|
import os, shutil, logging, re
|
|
|
|
|
|
from config import SRC_LOG_LEVELS, UPLOAD_DIR
|
|
|
|
|
|
log = logging.getLogger(__name__)
|
|
log.setLevel(SRC_LOG_LEVELS["MODELS"])
|
|
|
|
|
|
router = APIRouter()
|
|
|
|
############################
|
|
# Upload File
|
|
############################
|
|
|
|
|
|
@router.post("/")
|
|
def upload_file(
|
|
file: UploadFile = File(...),
|
|
user=Depends(get_verified_user),
|
|
):
|
|
log.info(f"file.content_type: {file.content_type}")
|
|
try:
|
|
unsanitized_filename = file.filename
|
|
filename = os.path.basename(unsanitized_filename)
|
|
|
|
# replace filename with uuid
|
|
id = str(uuid.uuid4())
|
|
filename = f"{id}_{filename}"
|
|
file_path = f"{UPLOAD_DIR}/{filename}"
|
|
|
|
contents = file.file.read()
|
|
with open(file_path, "wb") as f:
|
|
f.write(contents)
|
|
f.close()
|
|
|
|
file = Files.insert_new_file(
|
|
user.id,
|
|
FileForm(
|
|
**{
|
|
"id": id,
|
|
"filename": filename,
|
|
"meta": {
|
|
"content_type": file.content_type,
|
|
"size": len(contents),
|
|
"path": file_path,
|
|
},
|
|
}
|
|
),
|
|
)
|
|
|
|
if file:
|
|
return file
|
|
else:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=ERROR_MESSAGES.DEFAULT("Error uploading file"),
|
|
)
|
|
|
|
except Exception as e:
|
|
log.exception(e)
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=ERROR_MESSAGES.DEFAULT(e),
|
|
)
|
|
|
|
|
|
############################
|
|
# List Files
|
|
############################
|
|
|
|
|
|
@router.get("/", response_model=List[FileModel])
|
|
async def list_files(user=Depends(get_verified_user)):
|
|
files = Files.get_files()
|
|
return files
|
|
|
|
|
|
############################
|
|
# Delete All Files
|
|
############################
|
|
|
|
|
|
@router.delete("/all")
|
|
async def delete_all_files(user=Depends(get_admin_user)):
|
|
result = Files.delete_all_files()
|
|
|
|
if result:
|
|
folder = f"{UPLOAD_DIR}"
|
|
try:
|
|
# Check if the directory exists
|
|
if os.path.exists(folder):
|
|
# Iterate over all the files and directories in the specified directory
|
|
for filename in os.listdir(folder):
|
|
file_path = os.path.join(folder, filename)
|
|
try:
|
|
if os.path.isfile(file_path) or os.path.islink(file_path):
|
|
os.unlink(file_path) # Remove the file or link
|
|
elif os.path.isdir(file_path):
|
|
shutil.rmtree(file_path) # Remove the directory
|
|
except Exception as e:
|
|
print(f"Failed to delete {file_path}. Reason: {e}")
|
|
else:
|
|
print(f"The directory {folder} does not exist")
|
|
except Exception as e:
|
|
print(f"Failed to process the directory {folder}. Reason: {e}")
|
|
|
|
return {"message": "All files deleted successfully"}
|
|
else:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=ERROR_MESSAGES.DEFAULT("Error deleting files"),
|
|
)
|
|
|
|
|
|
############################
|
|
# Get File By Id
|
|
############################
|
|
|
|
|
|
@router.get("/{id}", response_model=Optional[FileModel])
|
|
async def get_file_by_id(id: str, user=Depends(get_verified_user)):
|
|
file = Files.get_file_by_id(id)
|
|
|
|
if file:
|
|
return file
|
|
else:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=ERROR_MESSAGES.NOT_FOUND,
|
|
)
|
|
|
|
|
|
############################
|
|
# Get File Content By Id
|
|
############################
|
|
|
|
|
|
@router.get("/{id}/content", response_model=Optional[FileModel])
|
|
async def get_file_content_by_id(id: str, user=Depends(get_verified_user)):
|
|
file = Files.get_file_by_id(id)
|
|
|
|
if file:
|
|
file_path = Path(file.meta["path"])
|
|
|
|
# Check if the file already exists in the cache
|
|
if file_path.is_file():
|
|
print(f"file_path: {file_path}")
|
|
return FileResponse(file_path)
|
|
else:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=ERROR_MESSAGES.NOT_FOUND,
|
|
)
|
|
else:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=ERROR_MESSAGES.NOT_FOUND,
|
|
)
|
|
|
|
|
|
############################
|
|
# Delete File By Id
|
|
############################
|
|
|
|
|
|
@router.delete("/{id}")
|
|
async def delete_file_by_id(id: str, user=Depends(get_verified_user)):
|
|
file = Files.get_file_by_id(id)
|
|
|
|
if file:
|
|
result = Files.delete_file_by_id(id)
|
|
if result:
|
|
return {"message": "File deleted successfully"}
|
|
else:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=ERROR_MESSAGES.DEFAULT("Error deleting file"),
|
|
)
|
|
else:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=ERROR_MESSAGES.NOT_FOUND,
|
|
)
|