import logging
import mimetypes
import os
import shutil
import uuid
from pathlib import Path
from typing import Optional

from fastapi import APIRouter, Depends, File, HTTPException, UploadFile, status
from fastapi.responses import FileResponse, StreamingResponse
from pydantic import BaseModel

from open_webui.apps.retrieval.main import process_file, ProcessFileForm
from open_webui.apps.webui.models.files import FileForm, FileModel, Files
from open_webui.config import UPLOAD_DIR
from open_webui.constants import ERROR_MESSAGES
from open_webui.env import SRC_LOG_LEVELS
from open_webui.utils.utils import get_admin_user, get_verified_user

log = logging.getLogger(__name__)
log.setLevel(SRC_LOG_LEVELS["MODELS"])


router = APIRouter()


def _get_accessible_file(id: str, user):
    """Return the file record for ``id`` if ``user`` may access it.

    Access is granted when the record's ``user_id`` matches ``user.id`` or
    when ``user.role`` is ``"admin"``.

    Raises:
        HTTPException: 404 when the record is missing or the user is not
            allowed to see it (both cases are reported identically).
    """
    file = Files.get_file_by_id(id)
    if file and (file.user_id == user.id or user.role == "admin"):
        return file
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=ERROR_MESSAGES.NOT_FOUND,
    )


############################
# Upload File
############################


@router.post("/")
def upload_file(file: UploadFile = File(...), user=Depends(get_verified_user)):
    """Store an uploaded file under ``UPLOAD_DIR`` and register it.

    The upload is written to ``{UPLOAD_DIR}/{uuid}_{basename}``, a record is
    inserted via ``Files.insert_new_file`` and ``process_file`` is then run
    for the new id. A failure inside ``process_file`` is logged but does not
    fail the upload; the inserted record is returned in that case.

    Raises:
        HTTPException: 400 when the upload is empty, when the record could
            not be inserted, or when any other error occurs while storing.
    """
    log.info(f"file.content_type: {file.content_type}")
    try:
        # basename drops any directory components sent by the client.
        name = os.path.basename(file.filename)

        # Prefix with a fresh uuid so stored names never collide.
        file_id = str(uuid.uuid4())
        filename = f"{file_id}_{name}"
        file_path = f"{UPLOAD_DIR}/{filename}"

        contents = file.file.read()
        if not contents:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=ERROR_MESSAGES.EMPTY_CONTENT,
            )

        with open(file_path, "wb") as f:
            f.write(contents)

        # Kept in a separate name so the UploadFile parameter is not rebound.
        file_item = Files.insert_new_file(
            user.id,
            FileForm(
                **{
                    "id": file_id,
                    "filename": filename,
                    "meta": {
                        "name": name,
                        "content_type": file.content_type,
                        "size": len(contents),
                        "path": file_path,
                    },
                }
            ),
        )

        try:
            process_file(ProcessFileForm(file_id=file_id))
            file_item = Files.get_file_by_id(id=file_id)
        except Exception as e:
            # Best effort: the upload itself succeeded, so only log.
            log.exception(e)
            log.error(f"Error processing file: {file_id}")

        if file_item:
            return file_item
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=ERROR_MESSAGES.DEFAULT("Error uploading file"),
        )
    except HTTPException:
        # Deliberate HTTP errors raised above must reach the client as-is
        # instead of being re-wrapped by the generic handler below.
        raise
    except Exception as e:
        log.exception(e)
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=ERROR_MESSAGES.DEFAULT(e),
        )


############################
# List Files
############################


@router.get("/", response_model=list[FileModel])
async def list_files(user=Depends(get_verified_user)):
    """Return every file for admins, otherwise only the caller's files."""
    if user.role == "admin":
        return Files.get_files()
    return Files.get_files_by_user_id(user.id)


############################
# Delete All Files
############################


@router.delete("/all")
async def delete_all_files(user=Depends(get_admin_user)):
    """Delete all file records, then empty ``UPLOAD_DIR``.

    Clearing the directory is best effort: a failure on an individual entry
    or on the directory as a whole is logged and does not change the
    response.

    Raises:
        HTTPException: 400 when ``Files.delete_all_files`` reports failure.
    """
    if not Files.delete_all_files():
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=ERROR_MESSAGES.DEFAULT("Error deleting files"),
        )

    folder = f"{UPLOAD_DIR}"
    try:
        if os.path.exists(folder):
            for entry in os.listdir(folder):
                file_path = os.path.join(folder, entry)
                try:
                    if os.path.isfile(file_path) or os.path.islink(file_path):
                        os.unlink(file_path)  # Remove the file or link
                    elif os.path.isdir(file_path):
                        shutil.rmtree(file_path)  # Remove the directory
                except Exception as e:
                    log.error("Failed to delete %s. Reason: %s", file_path, e)
        else:
            log.warning("The directory %s does not exist", folder)
    except Exception as e:
        log.error("Failed to process the directory %s. Reason: %s", folder, e)

    return {"message": "All files deleted successfully"}


############################
# Get File By Id
############################


@router.get("/{id}", response_model=Optional[FileModel])
async def get_file_by_id(id: str, user=Depends(get_verified_user)):
    """Return the file record for ``id``.

    Raises:
        HTTPException: 404 when the file is missing or not accessible.
    """
    return _get_accessible_file(id, user)


############################
# Get File Data Content By Id
############################


@router.get("/{id}/data/content")
async def get_file_data_content_by_id(id: str, user=Depends(get_verified_user)):
    """Return ``{"content": ...}`` taken from the file record's ``data``.

    An absent ``"content"`` key yields an empty string.

    Raises:
        HTTPException: 404 when the file is missing or not accessible.
    """
    file = _get_accessible_file(id, user)
    return {"content": file.data.get("content", "")}


############################
# Update File Data Content By Id
############################


class ContentForm(BaseModel):
    """Request body carrying the replacement content for a file."""

    content: str


@router.post("/{id}/data/content/update")
async def update_file_data_content_by_id(
    id: str, form_data: ContentForm, user=Depends(get_verified_user)
):
    """Run ``process_file`` with new content and return the stored content.

    If ``process_file`` raises, the error is logged and the content of the
    record as fetched before the attempt is returned.

    Raises:
        HTTPException: 404 when the file is missing or not accessible.
    """
    file = _get_accessible_file(id, user)

    try:
        process_file(ProcessFileForm(file_id=id, content=form_data.content))
        file = Files.get_file_by_id(id=id)
    except Exception as e:
        log.exception(e)
        log.error(f"Error processing file: {file.id}")

    return {"content": file.data.get("content", "")}


############################
# Get File Content By Id
############################


@router.get("/{id}/content", response_model=Optional[FileModel])
async def get_file_content_by_id(id: str, user=Depends(get_verified_user)):
    """Send the stored file as an attachment download.

    The download name is ``meta["name"]`` when present, otherwise the
    record's ``filename``.

    Raises:
        HTTPException: 404 when the file is missing, not accessible, has no
            recorded path, or the path is not a file on disk.
    """
    file = _get_accessible_file(id, user)

    # .get() so a record without a stored path is a 404 rather than a
    # KeyError, matching the other not-found branches.
    stored_path = file.meta.get("path")
    file_path = Path(stored_path) if stored_path else None
    if file_path is None or not file_path.is_file():
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    log.debug("file_path: %s", file_path)
    headers = {
        "Content-Disposition": f'attachment; filename="{file.meta.get("name", file.filename)}"'
    }
    return FileResponse(file_path, headers=headers)


# NOTE: this handler reuses the name of the one above. Both routes stay
# registered (the decorator runs at definition time), but the module-level
# name ends up bound to this second function. Kept for compatibility.
@router.get("/{id}/content/{file_name}", response_model=Optional[FileModel])
async def get_file_content_by_id(id: str, user=Depends(get_verified_user)):
    """Send the stored file, or its text content when no path is recorded.

    With a recorded path the file on disk is returned. Without one, the
    ``"content"`` value from the record's ``data`` is streamed as
    ``text/plain`` with the record's ``filename`` as the download name.

    Raises:
        HTTPException: 404 when the file is missing, not accessible, or the
            recorded path is not a file on disk.
    """
    file = _get_accessible_file(id, user)

    stored_path = file.meta.get("path")
    if stored_path:
        file_path = Path(stored_path)
        if not file_path.is_file():
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=ERROR_MESSAGES.NOT_FOUND,
            )
        log.debug("file_path: %s", file_path)
        return FileResponse(file_path)

    # No path recorded: fall back to the text content as a .txt download.
    # NOTE(review): this previously read `file.content`; the other handlers
    # in this module read the text from `file.data` -- confirm against
    # FileModel.
    file_content = file.data.get("content", "")
    file_name = file.filename

    def generator():
        # Single-chunk stream of the UTF-8 encoded text.
        yield file_content.encode("utf-8")

    return StreamingResponse(
        generator(),
        media_type="text/plain",
        headers={"Content-Disposition": f"attachment; filename={file_name}"},
    )


############################
# Delete File By Id
############################


@router.delete("/{id}")
async def delete_file_by_id(id: str, user=Depends(get_verified_user)):
    """Delete the file record for ``id``.

    Raises:
        HTTPException: 404 when the file is missing or not accessible;
            400 when ``Files.delete_file_by_id`` reports failure.
    """
    _get_accessible_file(id, user)

    if Files.delete_file_by_id(id):
        return {"message": "File deleted successfully"}
    raise HTTPException(
        status_code=status.HTTP_400_BAD_REQUEST,
        detail=ERROR_MESSAGES.DEFAULT("Error deleting file"),
    )