From 569d8905ef2b8adecd8debc8553820c7b1ce9926 Mon Sep 17 00:00:00 2001 From: Taylor Wilsdon Date: Thu, 17 Apr 2025 10:43:48 -0700 Subject: [PATCH] re-implement pending_confirmation --- servers/filesystem/config.py | 2 +- servers/filesystem/main.py | 145 +++++++++++++++++++++++++---------- 2 files changed, 105 insertions(+), 42 deletions(-) diff --git a/servers/filesystem/config.py b/servers/filesystem/config.py index b104feb..4d7f53e 100644 --- a/servers/filesystem/config.py +++ b/servers/filesystem/config.py @@ -3,5 +3,5 @@ import pathlib # Constants ALLOWED_DIRECTORIES = [ - str(pathlib.Path(os.path.expanduser("~/")).resolve()) + str(pathlib.Path(os.path.expanduser("~/tmp")).resolve()) ] # 👈 Replace with your paths \ No newline at end of file diff --git a/servers/filesystem/main.py b/servers/filesystem/main.py index 5464123..62facd2 100644 --- a/servers/filesystem/main.py +++ b/servers/filesystem/main.py @@ -7,10 +7,11 @@ from pydantic import BaseModel, Field import os import pathlib import asyncio -from typing import List, Optional, Literal +from typing import List, Optional, Literal, Dict, Union # Added Dict, Union import difflib import shutil -from datetime import datetime, timezone +from datetime import datetime, timezone, timedelta # Added timedelta +import uuid # Added uuid from config import ALLOWED_DIRECTORIES app = FastAPI( @@ -124,8 +125,8 @@ class DeletePathRequest(BaseModel): recursive: bool = Field( default=False, description="If true and path is a directory, delete recursively. Required if directory is not empty." ) - confirm_delete: bool = Field( - ..., description="Must be explicitly set to true to confirm deletion." + confirmation_token: Optional[str] = Field( + default=None, description="Token required for confirming deletion after initial request." ) @@ -138,6 +139,14 @@ class GetMetadataRequest(BaseModel): path: str = Field(..., description="Path to the file or directory to get metadata for.") +# ------------------------------------------------------------------------------ +# Global state for pending confirmations +# ------------------------------------------------------------------------------ + +# Store pending confirmations: {token: {"path": str, "recursive": bool, "expiry": datetime}} +pending_confirmations: Dict[str, Dict] = {} +CONFIRMATION_TTL_SECONDS = 60 # Token validity period + # ------------------------------------------------------------------------------ # Routes # ------------------------------------------------------------------------------ @@ -155,6 +164,12 @@ class DiffResponse(BaseModel): diff: str = Field(..., description="Unified diff output comparing original and modified content.") +class ConfirmationRequiredResponse(BaseModel): + message: str = Field(..., description="Message indicating confirmation is required.") + confirmation_token: str = Field(..., description="Token needed for the confirmation step.") + expires_at: datetime = Field(..., description="UTC timestamp when the token expires.") + + @app.post("/read_file", response_model=ReadFileResponse, summary="Read a file") # Changed response_class to response_model async def read_file(data: ReadFileRequest = Body(...)): """ @@ -188,7 +203,7 @@ async def write_file(data: WriteFileRequest = Body(...)): raise HTTPException(status_code=500, detail=f"Failed to write to {data.path}: {str(e)}") -from typing import Union # Add this import at the top with other typing imports +# Removed duplicate import - already added Union at the top @app.post( "/edit_file", @@ -326,51 +341,99 @@ async def search_files(data: SearchFilesRequest = Body(...)): return {"matches": results or ["No matches found"]} -@app.post("/delete_path", response_model=SuccessResponse, summary="Delete a file or directory") +@app.post( + "/delete_path", + response_model=Union[SuccessResponse, ConfirmationRequiredResponse], # Updated response model + summary="Delete a file or directory (two-step confirmation)" +) async def delete_path(data: DeletePathRequest = Body(...)): """ - Delete a specified file or directory. Requires explicit confirmation. + Delete a specified file or directory using a two-step confirmation process. + + 1. Initial request (without confirmation_token): Returns a confirmation token. + 2. Confirmation request (with token): Executes the deletion if the token is valid + and matches the original request parameters (path, recursive). + Use 'recursive=True' to delete non-empty directories. - Returns JSON success message. """ - if not data.confirm_delete: - raise HTTPException( - status_code=400, - detail="Deletion not confirmed. Set 'confirm_delete' to true to proceed." - ) - path = normalize_path(data.path) + now = datetime.now(timezone.utc) - try: - if not path.exists(): - raise HTTPException(status_code=404, detail=f"Path not found: {data.path}") + # --- Step 2: Confirmation Request --- + if data.confirmation_token: + if data.confirmation_token not in pending_confirmations: + raise HTTPException(status_code=400, detail="Invalid or expired confirmation token.") - if path.is_file(): - path.unlink() # Raises FileNotFoundError if not exists, PermissionError if no permission - return SuccessResponse(message=f"Successfully deleted file: {data.path}") - elif path.is_dir(): - if data.recursive: - shutil.rmtree(path) # Raises FileNotFoundError, PermissionError, NotADirectoryError - return SuccessResponse(message=f"Successfully deleted directory recursively: {data.path}") + confirmation_data = pending_confirmations[data.confirmation_token] + + # Validate token expiry + if now > confirmation_data["expiry"]: + del pending_confirmations[data.confirmation_token] # Clean up expired token + raise HTTPException(status_code=400, detail="Confirmation token has expired.") + + # Validate request parameters match + if confirmation_data["path"] != data.path or confirmation_data["recursive"] != data.recursive: + raise HTTPException( + status_code=400, + detail="Request parameters (path, recursive) do not match the original request for this token." + ) + + # --- Parameters match and token is valid: Proceed with deletion --- + del pending_confirmations[data.confirmation_token] # Consume the token + + try: + if not path.exists(): + # Path might have been deleted between requests, treat as success or specific error? + # For now, raise 404 as it doesn't exist *now*. + raise HTTPException(status_code=404, detail=f"Path not found: {data.path}") + + if path.is_file(): + path.unlink() + return SuccessResponse(message=f"Successfully deleted file: {data.path}") + elif path.is_dir(): + if data.recursive: + shutil.rmtree(path) + return SuccessResponse(message=f"Successfully deleted directory recursively: {data.path}") + else: + try: + path.rmdir() + return SuccessResponse(message=f"Successfully deleted empty directory: {data.path}") + except OSError as e: + raise HTTPException( + status_code=400, + detail=f"Directory not empty. Use 'recursive=True' to delete non-empty directories. Original error: {e}" + ) else: - try: - path.rmdir() # Raises FileNotFoundError, OSError (e.g., dir not empty, permission denied) - return SuccessResponse(message=f"Successfully deleted empty directory: {data.path}") - except OSError as e: - # Catch error if directory is not empty and recursive is false - raise HTTPException( - status_code=400, - detail=f"Directory not empty. Use 'recursive=True' to delete non-empty directories. Original error: {e}" - ) - else: - # Should not happen if exists() is true and it's not file/dir, but handle defensively - raise HTTPException(status_code=400, detail=f"Path is not a file or directory: {data.path}") + raise HTTPException(status_code=400, detail=f"Path is not a file or directory: {data.path}") - except PermissionError: - raise HTTPException(status_code=403, detail=f"Permission denied to delete {data.path}") - except Exception as e: - # Catch other potential errors during deletion - raise HTTPException(status_code=500, detail=f"Failed to delete {data.path}: {e}") + except PermissionError: + raise HTTPException(status_code=403, detail=f"Permission denied to delete {data.path}") + except Exception as e: + raise HTTPException(status_code=500, detail=f"Failed to delete {data.path}: {e}") + + # --- Step 1: Initial Request (No Token Provided) --- + else: + # Check if path exists before generating token + if not path.exists(): + raise HTTPException(status_code=404, detail=f"Path not found: {data.path}") + + # Generate token and expiry + token = uuid.uuid4().hex + expiry_time = now + timedelta(seconds=CONFIRMATION_TTL_SECONDS) + + # Store confirmation details + pending_confirmations[token] = { + "path": data.path, + "recursive": data.recursive, + "expiry": expiry_time, + } + + # Return confirmation required response + return ConfirmationRequiredResponse( + message="Confirmation required to delete path. Use the provided token in a subsequent request with the same parameters.", + confirmation_token=token, + expires_at=expiry_time, + ) @app.post("/move_path", response_model=SuccessResponse, summary="Move or rename a file or directory")