open-webui/backend/open_webui/routers/files.py
Timothy Jaeryang Baek 021e25ade1
Merge pull request #11087 from tarmst/fix-files-access-control
fix: Add access control usage to files APIs
2025-03-31 01:05:08 -07:00

506 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
import os
import uuid
from pathlib import Path
from typing import Optional
from urllib.parse import quote
from fastapi import (
APIRouter,
Depends,
File,
HTTPException,
Request,
UploadFile,
status,
Query,
)
from fastapi.responses import FileResponse, StreamingResponse
from open_webui.constants import ERROR_MESSAGES
from open_webui.env import SRC_LOG_LEVELS
from open_webui.models.files import (
FileForm,
FileModel,
FileModelResponse,
Files,
)
from open_webui.routers.knowledge import get_knowledge, get_knowledge_list
from open_webui.routers.retrieval import ProcessFileForm, process_file
from open_webui.routers.audio import transcribe
from open_webui.storage.provider import Storage
from open_webui.utils.auth import get_admin_user, get_verified_user
from pydantic import BaseModel
log = logging.getLogger(__name__)
log.setLevel(SRC_LOG_LEVELS["MODELS"])
router = APIRouter()
############################
# Check if the current user has access to a file through any knowledge bases the user may be in.
############################
async def check_user_has_access_to_file_via_any_knowledge_base(
    file_id: Optional[str],
    access_type: str,
    user=Depends(get_verified_user),
) -> bool:
    """Report whether `user` can reach a file through one of their knowledge bases.

    The file's knowledge base is taken from the ``collection_name`` entry of
    its ``meta`` dict. That id is compared against the knowledge bases
    returned for the user for the requested ``access_type``.

    Args:
        file_id: Id of the file to check.
        access_type: ``"read"`` or ``"write"``; any other value yields no
            candidate knowledge bases and therefore ``False``.
        user: The user whose access is being checked.

    Returns:
        ``True`` if a knowledge base the user has the requested access to
        matches the file's knowledge base, otherwise ``False``.

    Raises:
        HTTPException: 404 if no file exists for ``file_id``.
    """
    file = Files.get_file_by_id(file_id)
    log.debug(f"Checking if user has {access_type} access to file")
    if not file:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    # A file without metadata cannot belong to any knowledge base.
    knowledge_base_id = None
    if file.meta:
        knowledge_base_id = file.meta.get("collection_name")
    log.debug(f"Knowledge base associated with file: {knowledge_base_id}")

    has_access = False
    if knowledge_base_id:
        if access_type == "read":
            # get_knowledge checks for read access
            candidates = await get_knowledge(user=user)
        elif access_type == "write":
            # get_knowledge_list checks for write access
            candidates = await get_knowledge_list(user=user)
        else:
            candidates = []

        # First knowledge base whose id matches the file's, if any.
        knowledge_base = next(
            (entry for entry in candidates if entry.id == knowledge_base_id),
            None,
        )
        if knowledge_base is not None:
            log.debug(f"User knowledge base with {access_type} access {knowledge_base.id} == File knowledge base {knowledge_base_id}")
            has_access = True

    log.debug(f"Does user have {access_type} access to file: {has_access}")
    return has_access
############################
# Upload File
############################
@router.post("/", response_model=FileModelResponse)
def upload_file(
    request: Request,
    file: UploadFile = File(...),
    user=Depends(get_verified_user),
    file_metadata: dict = {},
    process: bool = Query(True),
):
    """Store an uploaded file, record it, and optionally process its content.

    The upload is written to storage under ``<uuid>_<basename>`` and a file
    record is inserted for ``user``. When ``process`` is true, the listed audio
    types are transcribed and the transcript is passed to ``process_file``.
    PNG/JPEG/GIF images are skipped, and everything else goes to
    ``process_file`` directly. A processing failure does not fail the upload:
    the file record is returned with an extra ``error`` field instead.

    Args:
        request: The incoming request, forwarded to the processing helpers.
        file: The uploaded file.
        user: The verified user performing the upload.
        file_metadata: Extra metadata stored under ``meta["data"]``.
            NOTE: the mutable ``{}`` default is kept for interface
            compatibility; it is never mutated in this function.
        process: Whether to run content processing after the upload.

    Returns:
        The stored file record, with an ``error`` field if processing failed.

    Raises:
        HTTPException: 400 if the upload or record insertion fails.
    """
    log.info(f"file.content_type: {file.content_type}")
    try:
        unsanitized_filename = file.filename
        # Strip any directory components supplied by the client.
        filename = os.path.basename(unsanitized_filename)

        # Prefix the stored filename with a uuid; keep the plain name for display.
        file_id = str(uuid.uuid4())
        name = filename
        filename = f"{file_id}_{filename}"
        contents, file_path = Storage.upload_file(file.file, filename)

        file_item = Files.insert_new_file(
            user.id,
            FileForm(
                **{
                    "id": file_id,
                    "filename": name,
                    "path": file_path,
                    "meta": {
                        "name": name,
                        "content_type": file.content_type,
                        "size": len(contents),
                        "data": file_metadata,
                    },
                }
            ),
        )
        if not file_item:
            # Fail early: the processing error handler below dereferences
            # file_item and would otherwise mask this with an AttributeError.
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=ERROR_MESSAGES.DEFAULT("Error uploading file"),
            )

        if process:
            try:
                if file.content_type in [
                    "audio/mpeg",
                    "audio/wav",
                    "audio/ogg",
                    "audio/x-m4a",
                ]:
                    file_path = Storage.get_file(file_path)
                    result = transcribe(request, file_path)
                    process_file(
                        request,
                        ProcessFileForm(
                            file_id=file_id, content=result.get("text", "")
                        ),
                        user=user,
                    )
                elif file.content_type not in ["image/png", "image/jpeg", "image/gif"]:
                    process_file(
                        request, ProcessFileForm(file_id=file_id), user=user
                    )

                # Re-read the record after processing has run.
                file_item = Files.get_file_by_id(id=file_id)
            except Exception as e:
                # Best effort: keep the upload and report the processing error.
                log.exception(e)
                log.error(f"Error processing file: {file_item.id}")
                file_item = FileModelResponse(
                    **{
                        **file_item.model_dump(),
                        "error": str(e.detail) if hasattr(e, "detail") else str(e),
                    }
                )

        if not file_item:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=ERROR_MESSAGES.DEFAULT("Error uploading file"),
            )
        return file_item
    except HTTPException:
        # Already a well-formed API error; do not re-wrap it below.
        raise
    except Exception as e:
        log.exception(e)
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=ERROR_MESSAGES.DEFAULT(e),
        )
############################
# List Files
############################
@router.get("/", response_model=list[FileModelResponse])
async def list_files(user=Depends(get_verified_user)):
    """List files: all of them for an admin, otherwise only the caller's own."""
    if user.role != "admin":
        return Files.get_files_by_user_id(user.id)
    return Files.get_files()
############################
# Delete All Files
############################
@router.delete("/all")
async def delete_all_files(user=Depends(get_admin_user)):
    """Delete every file record, then every stored file (admin only).

    Returns:
        A confirmation message dict on success.

    Raises:
        HTTPException: 400 if the records cannot be deleted or if clearing
            storage fails afterwards.
    """
    records_deleted = Files.delete_all_files()
    if not records_deleted:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=ERROR_MESSAGES.DEFAULT("Error deleting files"),
        )

    # Records are gone; now clear the backing storage.
    try:
        Storage.delete_all_files()
    except Exception as e:
        log.exception(e)
        log.error("Error deleting files")
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=ERROR_MESSAGES.DEFAULT("Error deleting files"),
        )

    return {"message": "All files deleted successfully"}
############################
# Get File By Id
############################
@router.get("/{id}", response_model=Optional[FileModel])
async def get_file_by_id(id: str, user=Depends(get_verified_user)):
    """Return the file record for `id` if the caller may read it.

    Access is granted to the file's owner, to admins, and to users with read
    access through a knowledge base. Unauthorized callers get the same 404 as
    for a missing file.

    Raises:
        HTTPException: 404 if the file is missing or the caller lacks access.
    """
    file = Files.get_file_by_id(id)
    if not file:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    has_read_access: bool = await check_user_has_access_to_file_via_any_knowledge_base(id, "read", user)
    may_read = file.user_id == user.id or user.role == "admin" or has_read_access
    if not may_read:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    return file
############################
# Get File Data Content By Id
############################
@router.get("/{id}/data/content")
async def get_file_data_content_by_id(id: str, user=Depends(get_verified_user)):
    """Return the text stored under the file's ``data["content"]``.

    Access is granted to the file's owner, to admins, and to users with read
    access through a knowledge base.

    Returns:
        ``{"content": <text>}``, with an empty string when the file has no
        stored content.

    Raises:
        HTTPException: 404 if the file is missing or the caller lacks access.
    """
    file = Files.get_file_by_id(id)
    if not file:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    has_read_access: bool = await check_user_has_access_to_file_via_any_knowledge_base(id, "read", user)
    if not (file.user_id == user.id or user.role == "admin" or has_read_access):
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    # file.data is presumably optional (this module already guards file.meta
    # against None), so fall back to an empty dict rather than crash.
    return {"content": (file.data or {}).get("content", "")}
############################
# Update File Data Content By Id
############################
class ContentForm(BaseModel):
    """Request body carrying replacement text content for a file."""

    # New text content; the update endpoint passes it on to process_file.
    content: str
@router.post("/{id}/data/content/update")
async def update_file_data_content_by_id(
    request: Request, id: str, form_data: ContentForm, user=Depends(get_verified_user)
):
    """Re-process a file with new text content and return the stored content.

    Access is granted to the file's owner, to admins, and to users with write
    access through a knowledge base. Processing is best effort: if it fails,
    the error is logged and the previously loaded record's content is
    returned.

    Returns:
        ``{"content": <text>}``, with an empty string when the file has no
        stored content.

    Raises:
        HTTPException: 404 if the file is missing or the caller lacks access.
    """
    file = Files.get_file_by_id(id)
    if not file:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    has_write_access: bool = await check_user_has_access_to_file_via_any_knowledge_base(id, "write", user)
    if not (file.user_id == user.id or user.role == "admin" or has_write_access):
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    try:
        process_file(
            request,
            ProcessFileForm(file_id=id, content=form_data.content),
            user=user,
        )
        # Reload so the response reflects what processing stored.
        file = Files.get_file_by_id(id=id)
    except Exception as e:
        log.exception(e)
        log.error(f"Error processing file: {file.id}")

    # file.data is presumably optional (this module already guards file.meta
    # against None), so fall back to an empty dict rather than crash.
    return {"content": (file.data or {}).get("content", "")}
############################
# Get File Content By Id
############################
@router.get("/{id}/content")
async def get_file_content_by_id(
    id: str, user=Depends(get_verified_user), attachment: bool = Query(False)
):
    """Serve the stored file for `id`, choosing a Content-Disposition header.

    Access is granted to the file's owner, to admins, and to users with read
    access through a knowledge base.

    Disposition rules:
      * ``attachment=True``: always ``attachment``.
      * PDF (by content type or ``.pdf`` suffix): ``inline``, and the media
        type is forced to ``application/pdf``.
      * ``text/plain``: no Content-Disposition header.
      * anything else: ``attachment``.

    Raises:
        HTTPException: 404 if the file record or the stored file is missing,
            or the caller lacks access; 400 on any other failure while
            preparing the response.
    """
    file = Files.get_file_by_id(id)
    if not file:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    has_read_access: bool = await check_user_has_access_to_file_via_any_knowledge_base(id, "read", user)
    if not (file.user_id == user.id or user.role == "admin" or has_read_access):
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    try:
        file_path = Path(Storage.get_file(file.path))

        # Only serve a path that resolves to a regular file.
        if not file_path.is_file():
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=ERROR_MESSAGES.NOT_FOUND,
            )

        # meta may be None (guarded the same way elsewhere in this module).
        meta = file.meta or {}
        filename = meta.get("name", file.filename)
        # Percent-encode for the RFC 5987 `filename*=UTF-8''...` form so
        # non-ASCII names survive in the header.
        encoded_filename = quote(filename)
        content_type = meta.get("content_type")

        headers = {}
        if attachment:
            headers["Content-Disposition"] = (
                f"attachment; filename*=UTF-8''{encoded_filename}"
            )
        elif content_type == "application/pdf" or filename.lower().endswith(".pdf"):
            headers["Content-Disposition"] = (
                f"inline; filename*=UTF-8''{encoded_filename}"
            )
            content_type = "application/pdf"
        elif content_type != "text/plain":
            headers["Content-Disposition"] = (
                f"attachment; filename*=UTF-8''{encoded_filename}"
            )

        return FileResponse(file_path, headers=headers, media_type=content_type)
    except HTTPException:
        # Keep the 404 raised above instead of converting it to a 400.
        raise
    except Exception as e:
        log.exception(e)
        log.error("Error getting file content")
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=ERROR_MESSAGES.DEFAULT("Error getting file content"),
        )
@router.get("/{id}/content/html")
async def get_html_file_content_by_id(id: str, user=Depends(get_verified_user)):
    """Serve the stored file for `id` with no extra response headers.

    Access is granted to the file's owner, to admins, and to users with read
    access through a knowledge base.

    Raises:
        HTTPException: 404 if the file record or the stored file is missing,
            or the caller lacks access; 400 on any other failure while
            preparing the response.
    """
    file = Files.get_file_by_id(id)
    if not file:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    has_read_access: bool = await check_user_has_access_to_file_via_any_knowledge_base(id, "read", user)
    if not (file.user_id == user.id or user.role == "admin" or has_read_access):
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    try:
        file_path = Path(Storage.get_file(file.path))

        # Only serve a path that resolves to a regular file.
        if not file_path.is_file():
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=ERROR_MESSAGES.NOT_FOUND,
            )

        log.info(f"file_path: {file_path}")
        return FileResponse(file_path)
    except HTTPException:
        # Keep the 404 raised above instead of converting it to a 400.
        raise
    except Exception as e:
        log.exception(e)
        log.error("Error getting file content")
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=ERROR_MESSAGES.DEFAULT("Error getting file content"),
        )
@router.get("/{id}/content/{file_name}")
async def get_file_content_by_id(id: str, user=Depends(get_verified_user)):
    """Download the file for `id` as an attachment.

    Access is granted to the file's owner, to admins, and to users with read
    access through a knowledge base. If the record has a storage path, the
    stored file is served. Otherwise the text under ``data["content"]`` is
    streamed as ``text/plain``.

    NOTE(review): this reuses the function name of the ``/{id}/content``
    handler defined earlier in this module. Both routes should remain
    registered, since each decorator runs on its own function object, but
    the module-level name ends up bound to this one. Confirm no caller
    relies on the name.

    Raises:
        HTTPException: 404 if the file record or the stored file is missing,
            or the caller lacks access.
    """
    file = Files.get_file_by_id(id)
    if not file:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    has_read_access: bool = await check_user_has_access_to_file_via_any_knowledge_base(id, "read", user)
    if not (file.user_id == user.id or user.role == "admin" or has_read_access):
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    file_path = file.path

    # meta may be None (guarded the same way elsewhere in this module).
    filename = (file.meta or {}).get("name", file.filename)
    # Percent-encode for the RFC 5987 `filename*=UTF-8''...` form so
    # non-ASCII names survive in the header.
    encoded_filename = quote(filename)
    headers = {
        "Content-Disposition": f"attachment; filename*=UTF-8''{encoded_filename}"
    }

    if file_path:
        file_path = Path(Storage.get_file(file_path))

        # Only serve a path that resolves to a regular file.
        if file_path.is_file():
            return FileResponse(file_path, headers=headers)
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    # No stored path: return the extracted text instead. The sibling handlers
    # in this module read that text from file.data, so use the same field
    # here rather than file.content.
    file_content = (file.data or {}).get("content", "")

    def generator():
        # Single-chunk stream of the UTF-8 encoded text.
        yield file_content.encode("utf-8")

    return StreamingResponse(
        generator(),
        media_type="text/plain",
        headers=headers,
    )
############################
# Delete File By Id
############################
@router.delete("/{id}")
async def delete_file_by_id(id: str, user=Depends(get_verified_user)):
    """Delete the file record for `id` and then its stored file.

    Access is granted to the file's owner, to admins, and to users with write
    access through a knowledge base.

    Returns:
        A confirmation message dict on success.

    Raises:
        HTTPException: 404 if the file is missing or the caller lacks access;
            400 if the record or the stored file cannot be deleted.
    """
    file = Files.get_file_by_id(id)
    if not file:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    has_write_access: bool = await check_user_has_access_to_file_via_any_knowledge_base(id, "write", user)
    may_delete = file.user_id == user.id or user.role == "admin" or has_write_access
    if not may_delete:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    # We should add Chroma cleanup here
    record_deleted = Files.delete_file_by_id(id)
    if not record_deleted:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=ERROR_MESSAGES.DEFAULT("Error deleting file"),
        )

    # The record is gone; now remove the stored file.
    try:
        Storage.delete_file(file.path)
    except Exception as e:
        log.exception(e)
        log.error("Error deleting files")
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=ERROR_MESSAGES.DEFAULT("Error deleting files"),
        )

    return {"message": "File deleted successfully"}