Add search_content, delete_path, move_path and get_metadata routes, return available directories on 403s

This commit is contained in:
Taylor Wilsdon 2025-04-06 13:42:59 -04:00
parent d837001b5d
commit c0bb3350fb

View File

@ -9,7 +9,8 @@ import pathlib
import asyncio
from typing import List, Optional, Literal
import difflib
import shutil
from datetime import datetime, timezone
app = FastAPI(
title="Secure Filesystem API",
version="0.1.0",
@ -44,7 +45,12 @@ def normalize_path(requested_path: str) -> pathlib.Path:
return requested
raise HTTPException(
status_code=403,
detail=f"Access denied: {requested} is outside allowed directories.",
detail={
"error": "Access Denied",
"requested_path": str(requested),
"message": "Requested path is outside allowed directories.",
"allowed_directories": ALLOWED_DIRECTORIES,
},
)
@ -106,6 +112,36 @@ class SearchFilesRequest(BaseModel):
)
class SearchContentRequest(BaseModel):
path: str = Field(..., description="Base directory to search within.")
search_query: str = Field(..., description="Text content to search for (case-insensitive).")
recursive: bool = Field(
default=True, description="Whether to search recursively in subdirectories."
)
file_pattern: Optional[str] = Field(
default="*", description="Glob pattern to filter files to search within (e.g., '*.py')."
)
class DeletePathRequest(BaseModel):
path: str = Field(..., description="Path to the file or directory to delete.")
recursive: bool = Field(
default=False, description="If true and path is a directory, delete recursively. Required if directory is not empty."
)
confirm_delete: bool = Field(
..., description="Must be explicitly set to true to confirm deletion."
)
class MovePathRequest(BaseModel):
source_path: str = Field(..., description="The current path of the file or directory.")
destination_path: str = Field(..., description="The new path for the file or directory.")
class GetMetadataRequest(BaseModel):
path: str = Field(..., description="Path to the file or directory to get metadata for.")
# ------------------------------------------------------------------------------
# Routes
# ------------------------------------------------------------------------------
@ -252,6 +288,157 @@ async def search_files(data: SearchFilesRequest = Body(...)):
return {"matches": results or ["No matches found"]}
@app.post("/delete_path", response_class=PlainTextResponse, summary="Delete a file or directory")
async def delete_path(data: DeletePathRequest = Body(...)):
"""
Delete a specified file or directory. Requires explicit confirmation.
Use 'recursive=True' to delete non-empty directories.
"""
if not data.confirm_delete:
raise HTTPException(
status_code=400,
detail="Deletion not confirmed. Set 'confirm_delete' to true to proceed."
)
path = normalize_path(data.path)
try:
if not path.exists():
raise HTTPException(status_code=404, detail=f"Path not found: {data.path}")
if path.is_file():
path.unlink()
return f"Successfully deleted file: {data.path}"
elif path.is_dir():
if data.recursive:
shutil.rmtree(path)
return f"Successfully deleted directory recursively: {data.path}"
else:
try:
path.rmdir() # Only works for empty directories
return f"Successfully deleted empty directory: {data.path}"
except OSError as e:
# Catch error if directory is not empty and recursive is false
raise HTTPException(
status_code=400,
detail=f"Directory not empty. Use 'recursive=True' to delete non-empty directories. Original error: {e}"
)
else:
# Should not happen if exists() is true and it's not file/dir, but handle defensively
raise HTTPException(status_code=400, detail=f"Path is not a file or directory: {data.path}")
except PermissionError:
raise HTTPException(status_code=403, detail=f"Permission denied to delete {data.path}")
except Exception as e:
# Catch other potential errors during deletion
raise HTTPException(status_code=500, detail=f"Failed to delete {data.path}: {e}")
@app.post("/move_path", response_class=PlainTextResponse, summary="Move or rename a file or directory")
async def move_path(data: MovePathRequest = Body(...)):
"""
Move or rename a file or directory from source_path to destination_path.
Both paths must be within the allowed directories.
"""
source = normalize_path(data.source_path)
destination = normalize_path(data.destination_path) # Also normalize destination
try:
if not source.exists():
raise HTTPException(status_code=404, detail=f"Source path not found: {data.source_path}")
shutil.move(str(source), str(destination))
return f"Successfully moved '{data.source_path}' to '{data.destination_path}'"
except PermissionError:
raise HTTPException(status_code=403, detail=f"Permission denied for move operation involving '{data.source_path}' or '{data.destination_path}'")
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to move '{data.source_path}' to '{data.destination_path}': {e}")
@app.post("/get_metadata", summary="Get file or directory metadata")
async def get_metadata(data: GetMetadataRequest = Body(...)):
"""
Retrieve metadata for a specified file or directory path.
"""
path = normalize_path(data.path)
try:
if not path.exists():
raise HTTPException(status_code=404, detail=f"Path not found: {data.path}")
stat_result = path.stat()
# Determine type
if path.is_file():
file_type = "file"
elif path.is_dir():
file_type = "directory"
else:
file_type = "other" # Should generally not happen for existing paths normalized
# Format timestamps (use UTC for consistency)
mod_time = datetime.fromtimestamp(stat_result.st_mtime, tz=timezone.utc).isoformat()
# Creation time (st_birthtime) is macOS/BSD specific, st_ctime is metadata change time on Linux
# Use st_ctime as a fallback if st_birthtime isn't available
try:
create_time = datetime.fromtimestamp(stat_result.st_birthtime, tz=timezone.utc).isoformat()
except AttributeError:
create_time = datetime.fromtimestamp(stat_result.st_ctime, tz=timezone.utc).isoformat()
metadata = {
"path": str(path),
"type": file_type,
"size_bytes": stat_result.st_size,
"modification_time_utc": mod_time,
"creation_time_utc": create_time, # Note platform differences in definition
"last_metadata_change_time_utc": datetime.fromtimestamp(stat_result.st_ctime, tz=timezone.utc).isoformat(),
}
return metadata
except PermissionError:
raise HTTPException(status_code=403, detail=f"Permission denied to access metadata for {data.path}")
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to get metadata for {data.path}: {e}")
@app.post("/search_content", summary="Search for content within files")
async def search_content(data: SearchContentRequest = Body(...)):
"""
Search for text content within files in a specified directory.
"""
base_path = normalize_path(data.path)
results = []
search_query_lower = data.search_query.lower()
if not base_path.is_dir():
raise HTTPException(status_code=400, detail="Provided path is not a directory")
iterator = base_path.rglob(data.file_pattern) if data.recursive else base_path.glob(data.file_pattern)
for item_path in iterator:
if item_path.is_file():
try:
# Read file line by line to handle potentially large files and different encodings
with item_path.open("r", encoding="utf-8", errors="ignore") as f:
for line_num, line in enumerate(f, 1):
if search_query_lower in line.lower():
results.append(
{
"file_path": str(item_path),
"line_number": line_num,
"line_content": line.strip(),
}
)
except Exception as e:
# Log or handle files that cannot be read (e.g., permission errors, binary files)
print(f"Could not read or search file {item_path}: {e}")
continue
return {"matches": results or ["No matches found"]}
@app.get("/list_allowed_directories", summary="List access-permitted directories")
async def list_allowed_directories():
"""