re-implement pending_confirmation

This commit is contained in:
Taylor Wilsdon 2025-04-17 10:43:48 -07:00
parent 8bd8f1d712
commit 569d8905ef
2 changed files with 105 additions and 42 deletions

View File

@ -3,5 +3,5 @@ import pathlib
# Constants # Constants
ALLOWED_DIRECTORIES = [ ALLOWED_DIRECTORIES = [
str(pathlib.Path(os.path.expanduser("~/")).resolve()) str(pathlib.Path(os.path.expanduser("~/tmp")).resolve())
] # 👈 Replace with your paths ] # 👈 Replace with your paths

View File

@ -7,10 +7,11 @@ from pydantic import BaseModel, Field
import os import os
import pathlib import pathlib
import asyncio import asyncio
from typing import List, Optional, Literal from typing import List, Optional, Literal, Dict, Union # Added Dict, Union
import difflib import difflib
import shutil import shutil
from datetime import datetime, timezone from datetime import datetime, timezone, timedelta # Added timedelta
import uuid # Added uuid
from config import ALLOWED_DIRECTORIES from config import ALLOWED_DIRECTORIES
app = FastAPI( app = FastAPI(
@ -124,8 +125,8 @@ class DeletePathRequest(BaseModel):
recursive: bool = Field( recursive: bool = Field(
default=False, description="If true and path is a directory, delete recursively. Required if directory is not empty." default=False, description="If true and path is a directory, delete recursively. Required if directory is not empty."
) )
confirm_delete: bool = Field( confirmation_token: Optional[str] = Field(
..., description="Must be explicitly set to true to confirm deletion." default=None, description="Token required for confirming deletion after initial request."
) )
@ -138,6 +139,14 @@ class GetMetadataRequest(BaseModel):
path: str = Field(..., description="Path to the file or directory to get metadata for.") path: str = Field(..., description="Path to the file or directory to get metadata for.")
# ------------------------------------------------------------------------------
# Global state for pending confirmations
# ------------------------------------------------------------------------------
# Store pending confirmations: {token: {"path": str, "recursive": bool, "expiry": datetime}}
pending_confirmations: Dict[str, Dict] = {}
CONFIRMATION_TTL_SECONDS = 60 # Token validity period
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
# Routes # Routes
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
@ -155,6 +164,12 @@ class DiffResponse(BaseModel):
diff: str = Field(..., description="Unified diff output comparing original and modified content.") diff: str = Field(..., description="Unified diff output comparing original and modified content.")
class ConfirmationRequiredResponse(BaseModel):
message: str = Field(..., description="Message indicating confirmation is required.")
confirmation_token: str = Field(..., description="Token needed for the confirmation step.")
expires_at: datetime = Field(..., description="UTC timestamp when the token expires.")
@app.post("/read_file", response_model=ReadFileResponse, summary="Read a file") # Changed response_class to response_model @app.post("/read_file", response_model=ReadFileResponse, summary="Read a file") # Changed response_class to response_model
async def read_file(data: ReadFileRequest = Body(...)): async def read_file(data: ReadFileRequest = Body(...)):
""" """
@ -188,7 +203,7 @@ async def write_file(data: WriteFileRequest = Body(...)):
raise HTTPException(status_code=500, detail=f"Failed to write to {data.path}: {str(e)}") raise HTTPException(status_code=500, detail=f"Failed to write to {data.path}: {str(e)}")
from typing import Union # Add this import at the top with other typing imports # Removed duplicate import - already added Union at the top
@app.post( @app.post(
"/edit_file", "/edit_file",
@ -326,52 +341,100 @@ async def search_files(data: SearchFilesRequest = Body(...)):
return {"matches": results or ["No matches found"]} return {"matches": results or ["No matches found"]}
@app.post("/delete_path", response_model=SuccessResponse, summary="Delete a file or directory") @app.post(
"/delete_path",
response_model=Union[SuccessResponse, ConfirmationRequiredResponse], # Updated response model
summary="Delete a file or directory (two-step confirmation)"
)
async def delete_path(data: DeletePathRequest = Body(...)): async def delete_path(data: DeletePathRequest = Body(...)):
""" """
Delete a specified file or directory. Requires explicit confirmation. Delete a specified file or directory using a two-step confirmation process.
1. Initial request (without confirmation_token): Returns a confirmation token.
2. Confirmation request (with token): Executes the deletion if the token is valid
and matches the original request parameters (path, recursive).
Use 'recursive=True' to delete non-empty directories. Use 'recursive=True' to delete non-empty directories.
Returns JSON success message.
""" """
if not data.confirm_delete: path = normalize_path(data.path)
now = datetime.now(timezone.utc)
# --- Step 2: Confirmation Request ---
if data.confirmation_token:
if data.confirmation_token not in pending_confirmations:
raise HTTPException(status_code=400, detail="Invalid or expired confirmation token.")
confirmation_data = pending_confirmations[data.confirmation_token]
# Validate token expiry
if now > confirmation_data["expiry"]:
del pending_confirmations[data.confirmation_token] # Clean up expired token
raise HTTPException(status_code=400, detail="Confirmation token has expired.")
# Validate request parameters match
if confirmation_data["path"] != data.path or confirmation_data["recursive"] != data.recursive:
raise HTTPException( raise HTTPException(
status_code=400, status_code=400,
detail="Deletion not confirmed. Set 'confirm_delete' to true to proceed." detail="Request parameters (path, recursive) do not match the original request for this token."
) )
path = normalize_path(data.path) # --- Parameters match and token is valid: Proceed with deletion ---
del pending_confirmations[data.confirmation_token] # Consume the token
try: try:
if not path.exists(): if not path.exists():
# Path might have been deleted between requests, treat as success or specific error?
# For now, raise 404 as it doesn't exist *now*.
raise HTTPException(status_code=404, detail=f"Path not found: {data.path}") raise HTTPException(status_code=404, detail=f"Path not found: {data.path}")
if path.is_file(): if path.is_file():
path.unlink() # Raises FileNotFoundError if not exists, PermissionError if no permission path.unlink()
return SuccessResponse(message=f"Successfully deleted file: {data.path}") return SuccessResponse(message=f"Successfully deleted file: {data.path}")
elif path.is_dir(): elif path.is_dir():
if data.recursive: if data.recursive:
shutil.rmtree(path) # Raises FileNotFoundError, PermissionError, NotADirectoryError shutil.rmtree(path)
return SuccessResponse(message=f"Successfully deleted directory recursively: {data.path}") return SuccessResponse(message=f"Successfully deleted directory recursively: {data.path}")
else: else:
try: try:
path.rmdir() # Raises FileNotFoundError, OSError (e.g., dir not empty, permission denied) path.rmdir()
return SuccessResponse(message=f"Successfully deleted empty directory: {data.path}") return SuccessResponse(message=f"Successfully deleted empty directory: {data.path}")
except OSError as e: except OSError as e:
# Catch error if directory is not empty and recursive is false
raise HTTPException( raise HTTPException(
status_code=400, status_code=400,
detail=f"Directory not empty. Use 'recursive=True' to delete non-empty directories. Original error: {e}" detail=f"Directory not empty. Use 'recursive=True' to delete non-empty directories. Original error: {e}"
) )
else: else:
# Should not happen if exists() is true and it's not file/dir, but handle defensively
raise HTTPException(status_code=400, detail=f"Path is not a file or directory: {data.path}") raise HTTPException(status_code=400, detail=f"Path is not a file or directory: {data.path}")
except PermissionError: except PermissionError:
raise HTTPException(status_code=403, detail=f"Permission denied to delete {data.path}") raise HTTPException(status_code=403, detail=f"Permission denied to delete {data.path}")
except Exception as e: except Exception as e:
# Catch other potential errors during deletion
raise HTTPException(status_code=500, detail=f"Failed to delete {data.path}: {e}") raise HTTPException(status_code=500, detail=f"Failed to delete {data.path}: {e}")
# --- Step 1: Initial Request (No Token Provided) ---
else:
# Check if path exists before generating token
if not path.exists():
raise HTTPException(status_code=404, detail=f"Path not found: {data.path}")
# Generate token and expiry
token = uuid.uuid4().hex
expiry_time = now + timedelta(seconds=CONFIRMATION_TTL_SECONDS)
# Store confirmation details
pending_confirmations[token] = {
"path": data.path,
"recursive": data.recursive,
"expiry": expiry_time,
}
# Return confirmation required response
return ConfirmationRequiredResponse(
message="Confirmation required to delete path. Use the provided token in a subsequent request with the same parameters.",
confirmation_token=token,
expires_at=expiry_time,
)
@app.post("/move_path", response_model=SuccessResponse, summary="Move or rename a file or directory") @app.post("/move_path", response_model=SuccessResponse, summary="Move or rename a file or directory")
async def move_path(data: MovePathRequest = Body(...)): async def move_path(data: MovePathRequest = Body(...)):