Merge pull request #34 from taylorwilsdon/filesystem_enh

feat: Implement delete confirmation system, general filesystem operation improvements
This commit is contained in:
Tim Jaeryang Baek 2025-04-17 12:41:04 -07:00 committed by GitHub
commit 134d4e6778
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 172 additions and 52 deletions

View File

@ -0,0 +1,7 @@
import os
import pathlib
# Constants
ALLOWED_DIRECTORIES = [
str(pathlib.Path(os.path.expanduser("~/tmp")).resolve())
] # 👈 Replace with your paths

View File

@ -7,13 +7,17 @@ from pydantic import BaseModel, Field
import os
import pathlib
import asyncio
from typing import List, Optional, Literal
from typing import List, Optional, Literal, Dict, Union
import difflib
import shutil
from datetime import datetime, timezone
from datetime import datetime, timezone, timedelta
import json
import secrets
from config import ALLOWED_DIRECTORIES
app = FastAPI(
title="Secure Filesystem API",
version="0.1.0",
version="0.1.1",
description="A secure file manipulation server for reading, editing, writing, listing, and searching files with access restrictions.",
)
@ -27,12 +31,6 @@ app.add_middleware(
allow_headers=["*"],
)
# Constants
ALLOWED_DIRECTORIES = [
str(pathlib.Path(os.path.expanduser("~/")).resolve())
] # 👈 Replace with your paths
# ------------------------------------------------------------------------------
# Utility functions
# ------------------------------------------------------------------------------
@ -128,8 +126,8 @@ class DeletePathRequest(BaseModel):
recursive: bool = Field(
default=False, description="If true and path is a directory, delete recursively. Required if directory is not empty."
)
confirm_delete: bool = Field(
..., description="Must be explicitly set to true to confirm deletion."
confirmation_token: Optional[str] = Field(
default=None, description="Token required for confirming deletion after initial request."
)
@ -142,6 +140,62 @@ class GetMetadataRequest(BaseModel):
path: str = Field(..., description="Path to the file or directory to get metadata for.")
# ------------------------------------------------------------------------------
# Global state for pending confirmations
# ------------------------------------------------------------------------------
# --- Confirmation Token State Management (using a file) ---
CONFIRMATION_FILE = pathlib.Path("./.pending_confirmations.json")
CONFIRMATION_TTL_SECONDS = 60 # Token validity period
def load_confirmations() -> Dict[str, Dict]:
"""Loads pending confirmations from the JSON file."""
if not CONFIRMATION_FILE.exists():
return {}
try:
with CONFIRMATION_FILE.open("r") as f:
data = json.load(f)
# Convert expiry string back to datetime object
now = datetime.now(timezone.utc)
valid_confirmations = {}
for token, details in data.items():
try:
details["expiry"] = datetime.fromisoformat(details["expiry"])
# Clean up expired tokens during load
if details["expiry"] > now:
valid_confirmations[token] = details
except (ValueError, TypeError, KeyError):
print(f"Warning: Skipping invalid confirmation data for token {token}")
continue # Skip invalid entries
return valid_confirmations
except (json.JSONDecodeError, IOError) as e:
print(f"Error loading confirmations file: {e}. Returning empty dict.")
return {}
def save_confirmations(confirmations: Dict[str, Dict]):
"""Saves pending confirmations to the JSON file."""
try:
# Convert datetime objects to ISO strings for JSON serialization
serializable_confirmations = {}
for token, details in confirmations.items():
serializable_details = details.copy()
serializable_details["expiry"] = details["expiry"].isoformat()
serializable_confirmations[token] = serializable_details
with CONFIRMATION_FILE.open("w") as f:
json.dump(serializable_confirmations, f, indent=2)
except IOError as e:
print(f"Error saving confirmations file: {e}")
# Clean up the file on startup if it exists from a previous run
if CONFIRMATION_FILE.exists():
# print("Cleaning up stale confirmation file on startup.")
try:
CONFIRMATION_FILE.unlink()
except OSError as e:
# print(f"Warning: Could not delete stale confirmation file: {e}") # Removed print
pass # Silently ignore if cleanup fails, not critical
# ------------------------------------------------------------------------------
# Routes
# ------------------------------------------------------------------------------
@ -159,6 +213,12 @@ class DiffResponse(BaseModel):
diff: str = Field(..., description="Unified diff output comparing original and modified content.")
class ConfirmationRequiredResponse(BaseModel):
message: str = Field(..., description="Message indicating confirmation is required.")
confirmation_token: str = Field(..., description="Token needed for the confirmation step.")
expires_at: datetime = Field(..., description="UTC timestamp when the token expires.")
@app.post("/read_file", response_model=ReadFileResponse, summary="Read a file") # Changed response_class to response_model
async def read_file(data: ReadFileRequest = Body(...)):
"""
@ -191,9 +251,6 @@ async def write_file(data: WriteFileRequest = Body(...)):
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to write to {data.path}: {str(e)}")
from typing import Union # Add this import at the top with other typing imports
@app.post(
"/edit_file",
response_model=Union[SuccessResponse, DiffResponse], # Use Union for multiple response types
@ -330,52 +387,108 @@ async def search_files(data: SearchFilesRequest = Body(...)):
return {"matches": results or ["No matches found"]}
@app.post("/delete_path", response_model=SuccessResponse, summary="Delete a file or directory")
@app.post(
"/delete_path",
response_model=Union[SuccessResponse, ConfirmationRequiredResponse], # Updated response model
summary="Delete a file or directory (two-step confirmation)"
)
async def delete_path(data: DeletePathRequest = Body(...)):
"""
Delete a specified file or directory. Requires explicit confirmation.
Delete a specified file or directory using a two-step confirmation process.
1. Initial request (without confirmation_token): Returns a confirmation token.
2. Confirmation request (with token): Executes the deletion if the token is valid
and matches the original request parameters (path, recursive).
Use 'recursive=True' to delete non-empty directories.
Returns JSON success message.
"""
if not data.confirm_delete:
pending_confirmations = load_confirmations() # Load state from file
path = normalize_path(data.path)
now = datetime.now(timezone.utc)
# --- Step 2: Confirmation Request ---
if data.confirmation_token:
# print(f"Attempting confirmation with token: {data.confirmation_token}") # Removed print
if data.confirmation_token not in pending_confirmations:
# print(f"Error: Token '{data.confirmation_token}' not found in pending_confirmations.") # Removed print
raise HTTPException(status_code=400, detail="Invalid or expired confirmation token.")
confirmation_data = pending_confirmations[data.confirmation_token]
# Validate token expiry
if now > confirmation_data["expiry"]:
del pending_confirmations[data.confirmation_token] # Clean up expired token
save_confirmations(pending_confirmations) # Save updated state
raise HTTPException(status_code=400, detail="Confirmation token has expired.")
# Validate request parameters match
if confirmation_data["path"] != data.path or confirmation_data["recursive"] != data.recursive:
raise HTTPException(
status_code=400,
detail="Deletion not confirmed. Set 'confirm_delete' to true to proceed."
detail="Request parameters (path, recursive) do not match the original request for this token."
)
path = normalize_path(data.path)
# --- Parameters match and token is valid: Proceed with deletion ---
del pending_confirmations[data.confirmation_token] # Consume the token
save_confirmations(pending_confirmations) # Save updated state
try:
if not path.exists():
# Path might have been deleted between requests, treat as success or specific error?
# For now, raise 404 as it doesn't exist *now*.
raise HTTPException(status_code=404, detail=f"Path not found: {data.path}")
if path.is_file():
path.unlink() # Raises FileNotFoundError if not exists, PermissionError if no permission
path.unlink()
return SuccessResponse(message=f"Successfully deleted file: {data.path}")
elif path.is_dir():
if data.recursive:
shutil.rmtree(path) # Raises FileNotFoundError, PermissionError, NotADirectoryError
shutil.rmtree(path)
return SuccessResponse(message=f"Successfully deleted directory recursively: {data.path}")
else:
try:
path.rmdir() # Raises FileNotFoundError, OSError (e.g., dir not empty, permission denied)
path.rmdir()
return SuccessResponse(message=f"Successfully deleted empty directory: {data.path}")
except OSError as e:
# Catch error if directory is not empty and recursive is false
raise HTTPException(
status_code=400,
detail=f"Directory not empty. Use 'recursive=True' to delete non-empty directories. Original error: {e}"
)
else:
# Should not happen if exists() is true and it's not file/dir, but handle defensively
raise HTTPException(status_code=400, detail=f"Path is not a file or directory: {data.path}")
except PermissionError:
raise HTTPException(status_code=403, detail=f"Permission denied to delete {data.path}")
except Exception as e:
# Catch other potential errors during deletion
raise HTTPException(status_code=500, detail=f"Failed to delete {data.path}: {e}")
# --- Step 1: Initial Request (No Token Provided) ---
else:
# Check if path exists before generating token
if not path.exists():
raise HTTPException(status_code=404, detail=f"Path not found: {data.path}")
# Generate token and expiry
token = secrets.token_hex(3)[:5] # Generate 6 hex chars (3 bytes), take first 5
expiry_time = now + timedelta(seconds=CONFIRMATION_TTL_SECONDS)
# Store confirmation details
pending_confirmations[token] = {
"path": data.path,
"recursive": data.recursive,
"expiry": expiry_time,
}
save_confirmations(pending_confirmations) # Save updated state
# Return confirmation required response
# Construct the user-friendly message
confirmation_message = f"`Confirm deletion of file: {data.path} with token {token}`"
return ConfirmationRequiredResponse(
message=confirmation_message,
confirmation_token=token,
expires_at=expiry_time,
)
@app.post("/move_path", response_model=SuccessResponse, summary="Move or rename a file or directory")
async def move_path(data: MovePathRequest = Body(...)):
@ -385,13 +498,13 @@ async def move_path(data: MovePathRequest = Body(...)):
Returns JSON success message.
"""
source = normalize_path(data.source_path)
destination = normalize_path(data.destination_path) # Also normalize destination
destination = normalize_path(data.destination_path)
try:
if not source.exists():
raise HTTPException(status_code=404, detail=f"Source path not found: {data.source_path}")
shutil.move(str(source), str(destination)) # Raises FileNotFoundError, PermissionError etc.
shutil.move(str(source), str(destination))
return SuccessResponse(message=f"Successfully moved '{data.source_path}' to '{data.destination_path}'")
except PermissionError: