diff --git a/servers/filesystem/config.py b/servers/filesystem/config.py new file mode 100644 index 0000000..4d7f53e --- /dev/null +++ b/servers/filesystem/config.py @@ -0,0 +1,7 @@ +import os +import pathlib + +# Constants +ALLOWED_DIRECTORIES = [ + str(pathlib.Path(os.path.expanduser("~/tmp")).resolve()) +] # 👈 Replace with your paths \ No newline at end of file diff --git a/servers/filesystem/main.py b/servers/filesystem/main.py index d48a755..3393b92 100644 --- a/servers/filesystem/main.py +++ b/servers/filesystem/main.py @@ -7,13 +7,17 @@ from pydantic import BaseModel, Field import os import pathlib import asyncio -from typing import List, Optional, Literal +from typing import List, Optional, Literal, Dict, Union import difflib import shutil -from datetime import datetime, timezone +from datetime import datetime, timezone, timedelta +import json +import secrets +from config import ALLOWED_DIRECTORIES + app = FastAPI( title="Secure Filesystem API", - version="0.1.0", + version="0.1.1", description="A secure file manipulation server for reading, editing, writing, listing, and searching files with access restrictions.", ) @@ -27,12 +31,6 @@ app.add_middleware( allow_headers=["*"], ) - -# Constants -ALLOWED_DIRECTORIES = [ - str(pathlib.Path(os.path.expanduser("~/")).resolve()) -] # 👈 Replace with your paths - # ------------------------------------------------------------------------------ # Utility functions # ------------------------------------------------------------------------------ @@ -128,8 +126,8 @@ class DeletePathRequest(BaseModel): recursive: bool = Field( default=False, description="If true and path is a directory, delete recursively. Required if directory is not empty." ) - confirm_delete: bool = Field( - ..., description="Must be explicitly set to true to confirm deletion." + confirmation_token: Optional[str] = Field( + default=None, description="Token required for confirming deletion after initial request." ) @@ -142,6 +140,62 @@ class GetMetadataRequest(BaseModel): path: str = Field(..., description="Path to the file or directory to get metadata for.") +# ------------------------------------------------------------------------------ +# Global state for pending confirmations +# ------------------------------------------------------------------------------ + +# --- Confirmation Token State Management (using a file) --- +CONFIRMATION_FILE = pathlib.Path("./.pending_confirmations.json") +CONFIRMATION_TTL_SECONDS = 60 # Token validity period + +def load_confirmations() -> Dict[str, Dict]: + """Loads pending confirmations from the JSON file.""" + if not CONFIRMATION_FILE.exists(): + return {} + try: + with CONFIRMATION_FILE.open("r") as f: + data = json.load(f) + # Convert expiry string back to datetime object + now = datetime.now(timezone.utc) + valid_confirmations = {} + for token, details in data.items(): + try: + details["expiry"] = datetime.fromisoformat(details["expiry"]) + # Clean up expired tokens during load + if details["expiry"] > now: + valid_confirmations[token] = details + except (ValueError, TypeError, KeyError): + print(f"Warning: Skipping invalid confirmation data for token {token}") + continue # Skip invalid entries + return valid_confirmations + except (json.JSONDecodeError, IOError) as e: + print(f"Error loading confirmations file: {e}. Returning empty dict.") + return {} + +def save_confirmations(confirmations: Dict[str, Dict]): + """Saves pending confirmations to the JSON file.""" + try: + # Convert datetime objects to ISO strings for JSON serialization + serializable_confirmations = {} + for token, details in confirmations.items(): + serializable_details = details.copy() + serializable_details["expiry"] = details["expiry"].isoformat() + serializable_confirmations[token] = serializable_details + + with CONFIRMATION_FILE.open("w") as f: + json.dump(serializable_confirmations, f, indent=2) + except IOError as e: + print(f"Error saving confirmations file: {e}") + +# Clean up the file on startup if it exists from a previous run +if CONFIRMATION_FILE.exists(): + # print("Cleaning up stale confirmation file on startup.") + try: + CONFIRMATION_FILE.unlink() + except OSError as e: + # print(f"Warning: Could not delete stale confirmation file: {e}") # Removed print + pass # Silently ignore if cleanup fails, not critical + # ------------------------------------------------------------------------------ # Routes # ------------------------------------------------------------------------------ @@ -159,6 +213,12 @@ class DiffResponse(BaseModel): diff: str = Field(..., description="Unified diff output comparing original and modified content.") +class ConfirmationRequiredResponse(BaseModel): + message: str = Field(..., description="Message indicating confirmation is required.") + confirmation_token: str = Field(..., description="Token needed for the confirmation step.") + expires_at: datetime = Field(..., description="UTC timestamp when the token expires.") + + @app.post("/read_file", response_model=ReadFileResponse, summary="Read a file") # Changed response_class to response_model async def read_file(data: ReadFileRequest = Body(...)): """ @@ -191,9 +251,6 @@ async def write_file(data: WriteFileRequest = Body(...)): except Exception as e: raise HTTPException(status_code=500, detail=f"Failed to write to {data.path}: {str(e)}") - -from typing import Union # Add this import at the top with other typing imports - @app.post( "/edit_file", response_model=Union[SuccessResponse, DiffResponse], # Use Union for multiple response types @@ -330,51 +387,107 @@ async def search_files(data: SearchFilesRequest = Body(...)): return {"matches": results or ["No matches found"]} -@app.post("/delete_path", response_model=SuccessResponse, summary="Delete a file or directory") +@app.post( + "/delete_path", + response_model=Union[SuccessResponse, ConfirmationRequiredResponse], # Updated response model + summary="Delete a file or directory (two-step confirmation)" +) async def delete_path(data: DeletePathRequest = Body(...)): """ - Delete a specified file or directory. Requires explicit confirmation. + Delete a specified file or directory using a two-step confirmation process. + + 1. Initial request (without confirmation_token): Returns a confirmation token. + 2. Confirmation request (with token): Executes the deletion if the token is valid + and matches the original request parameters (path, recursive). + Use 'recursive=True' to delete non-empty directories. - Returns JSON success message. """ - if not data.confirm_delete: - raise HTTPException( - status_code=400, - detail="Deletion not confirmed. Set 'confirm_delete' to true to proceed." - ) - + pending_confirmations = load_confirmations() # Load state from file path = normalize_path(data.path) + now = datetime.now(timezone.utc) - try: - if not path.exists(): - raise HTTPException(status_code=404, detail=f"Path not found: {data.path}") + # --- Step 2: Confirmation Request --- + if data.confirmation_token: + # print(f"Attempting confirmation with token: {data.confirmation_token}") # Removed print + if data.confirmation_token not in pending_confirmations: + # print(f"Error: Token '{data.confirmation_token}' not found in pending_confirmations.") # Removed print + raise HTTPException(status_code=400, detail="Invalid or expired confirmation token.") - if path.is_file(): - path.unlink() # Raises FileNotFoundError if not exists, PermissionError if no permission - return SuccessResponse(message=f"Successfully deleted file: {data.path}") - elif path.is_dir(): - if data.recursive: - shutil.rmtree(path) # Raises FileNotFoundError, PermissionError, NotADirectoryError - return SuccessResponse(message=f"Successfully deleted directory recursively: {data.path}") + confirmation_data = pending_confirmations[data.confirmation_token] + + # Validate token expiry + if now > confirmation_data["expiry"]: + del pending_confirmations[data.confirmation_token] # Clean up expired token + save_confirmations(pending_confirmations) # Save updated state + raise HTTPException(status_code=400, detail="Confirmation token has expired.") + + # Validate request parameters match + if confirmation_data["path"] != data.path or confirmation_data["recursive"] != data.recursive: + raise HTTPException( + status_code=400, + detail="Request parameters (path, recursive) do not match the original request for this token." + ) + + # --- Parameters match and token is valid: Proceed with deletion --- + del pending_confirmations[data.confirmation_token] # Consume the token + save_confirmations(pending_confirmations) # Save updated state + + try: + if not path.exists(): + # Path might have been deleted between requests, treat as success or specific error? + # For now, raise 404 as it doesn't exist *now*. + raise HTTPException(status_code=404, detail=f"Path not found: {data.path}") + + if path.is_file(): + path.unlink() + return SuccessResponse(message=f"Successfully deleted file: {data.path}") + elif path.is_dir(): + if data.recursive: + shutil.rmtree(path) + return SuccessResponse(message=f"Successfully deleted directory recursively: {data.path}") + else: + try: + path.rmdir() + return SuccessResponse(message=f"Successfully deleted empty directory: {data.path}") + except OSError as e: + raise HTTPException( + status_code=400, + detail=f"Directory not empty. Use 'recursive=True' to delete non-empty directories. Original error: {e}" + ) else: - try: - path.rmdir() # Raises FileNotFoundError, OSError (e.g., dir not empty, permission denied) - return SuccessResponse(message=f"Successfully deleted empty directory: {data.path}") - except OSError as e: - # Catch error if directory is not empty and recursive is false - raise HTTPException( - status_code=400, - detail=f"Directory not empty. Use 'recursive=True' to delete non-empty directories. Original error: {e}" - ) - else: - # Should not happen if exists() is true and it's not file/dir, but handle defensively - raise HTTPException(status_code=400, detail=f"Path is not a file or directory: {data.path}") + raise HTTPException(status_code=400, detail=f"Path is not a file or directory: {data.path}") - except PermissionError: - raise HTTPException(status_code=403, detail=f"Permission denied to delete {data.path}") - except Exception as e: - # Catch other potential errors during deletion - raise HTTPException(status_code=500, detail=f"Failed to delete {data.path}: {e}") + except PermissionError: + raise HTTPException(status_code=403, detail=f"Permission denied to delete {data.path}") + except Exception as e: + raise HTTPException(status_code=500, detail=f"Failed to delete {data.path}: {e}") + + # --- Step 1: Initial Request (No Token Provided) --- + else: + # Check if path exists before generating token + if not path.exists(): + raise HTTPException(status_code=404, detail=f"Path not found: {data.path}") + + # Generate token and expiry + token = secrets.token_hex(3)[:5] # Generate 6 hex chars (3 bytes), take first 5 + expiry_time = now + timedelta(seconds=CONFIRMATION_TTL_SECONDS) + + # Store confirmation details + pending_confirmations[token] = { + "path": data.path, + "recursive": data.recursive, + "expiry": expiry_time, + } + save_confirmations(pending_confirmations) # Save updated state + + # Return confirmation required response + # Construct the user-friendly message + confirmation_message = f"`Confirm deletion of file: {data.path} with token {token}`" + return ConfirmationRequiredResponse( + message=confirmation_message, + confirmation_token=token, + expires_at=expiry_time, + ) @app.post("/move_path", response_model=SuccessResponse, summary="Move or rename a file or directory") @@ -385,13 +498,13 @@ async def move_path(data: MovePathRequest = Body(...)): Returns JSON success message. """ source = normalize_path(data.source_path) - destination = normalize_path(data.destination_path) # Also normalize destination + destination = normalize_path(data.destination_path) try: if not source.exists(): raise HTTPException(status_code=404, detail=f"Source path not found: {data.source_path}") - shutil.move(str(source), str(destination)) # Raises FileNotFoundError, PermissionError etc. + shutil.move(str(source), str(destination)) return SuccessResponse(message=f"Successfully moved '{data.source_path}' to '{data.destination_path}'") except PermissionError: