diff --git a/servers/filesystem/main.py b/servers/filesystem/main.py index 5d56297..06134cf 100644 --- a/servers/filesystem/main.py +++ b/servers/filesystem/main.py @@ -7,12 +7,11 @@ from pydantic import BaseModel, Field import os import pathlib import asyncio -from typing import List, Optional, Union, Dict, Tuple +from typing import List, Optional, Literal import difflib import shutil -from datetime import datetime, timezone, timedelta -import uuid -from config import ALLOWED_DIRECTORIES +from datetime import datetime, timezone +from .config import ALLOWED_DIRECTORIES app = FastAPI( title="Secure Filesystem API", version="0.1.0", @@ -29,10 +28,6 @@ app.add_middleware( allow_headers=["*"], ) -# In-memory store for pending confirmations. Use Redis/DB in production. -# Format: {token: (DeletePathRequest_data, expiry_datetime)} -pending_confirmations: Dict[str, Tuple['DeletePathRequest', datetime]] = {} - # ------------------------------------------------------------------------------ # Utility functions # ------------------------------------------------------------------------------ @@ -41,7 +36,7 @@ pending_confirmations: Dict[str, Tuple['DeletePathRequest', datetime]] = {} def normalize_path(requested_path: str) -> pathlib.Path: requested = pathlib.Path(os.path.expanduser(requested_path)).resolve() for allowed in ALLOWED_DIRECTORIES: - if str(requested).lower().startswith(allowed.lower()): # Case-insensitive path comparison + if str(requested).lower().startswith(allowed.lower()): # Case-insensitive check return requested raise HTTPException( status_code=403, @@ -128,9 +123,8 @@ class DeletePathRequest(BaseModel): recursive: bool = Field( default=False, description="If true and path is a directory, delete recursively. Required if directory is not empty." ) - # confirmation_token replaces the old confirm_delete flag - confirmation_token: Optional[str] = Field( - default=None, description="A confirmation token obtained from a prior request, required to execute deletion." + confirm_delete: bool = Field( + ..., description="Must be explicitly set to true to confirm deletion." ) @@ -151,12 +145,6 @@ class GetMetadataRequest(BaseModel): class SuccessResponse(BaseModel): message: str = Field(..., description="Success message indicating the operation was completed.") -# Response model for when confirmation is required for deletion -class ConfirmationRequiredResponse(BaseModel): - message: str = Field(default="Confirmation required to proceed with deletion.") - confirmation_token: str = Field(..., description="Token needed for the confirmation step.") - expires_at: str = Field(..., description="UTC timestamp when the token expires (ISO format).") - class ReadFileResponse(BaseModel): content: str = Field(..., description="UTF-8 encoded text content of the file.") @@ -166,7 +154,7 @@ class DiffResponse(BaseModel): diff: str = Field(..., description="Unified diff output comparing original and modified content.") -@app.post("/read_file", response_model=ReadFileResponse, summary="Read a file") +@app.post("/read_file", response_model=ReadFileResponse, summary="Read a file") # Changed response_class to response_model async def read_file(data: ReadFileRequest = Body(...)): """ Read the entire contents of a file and return as JSON. @@ -174,13 +162,13 @@ async def read_file(data: ReadFileRequest = Body(...)): path = normalize_path(data.path) try: file_content = path.read_text(encoding="utf-8") - return ReadFileResponse(content=file_content) + return ReadFileResponse(content=file_content) # Return Pydantic model instance except FileNotFoundError: raise HTTPException(status_code=404, detail=f"File not found: {data.path}") except PermissionError: raise HTTPException(status_code=403, detail=f"Permission denied for file: {data.path}") except Exception as e: - # Catch potential read errors + # More specific error for generic read issues raise HTTPException(status_code=500, detail=f"Failed to read file {data.path}: {str(e)}") @@ -199,11 +187,11 @@ async def write_file(data: WriteFileRequest = Body(...)): raise HTTPException(status_code=500, detail=f"Failed to write to {data.path}: {str(e)}") -# Note: Union was already imported earlier +from typing import Union # Add this import at the top with other typing imports @app.post( "/edit_file", - response_model=Union[SuccessResponse, DiffResponse], + response_model=Union[SuccessResponse, DiffResponse], # Use Union for multiple response types summary="Edit a file with diff" ) async def edit_file(data: EditFileRequest = Body(...)): @@ -238,16 +226,16 @@ async def edit_file(data: EditFileRequest = Body(...)): fromfile=f"a/{data.path}", tofile=f"b/{data.path}", ) - return DiffResponse(diff="".join(diff_output)) + return DiffResponse(diff="".join(diff_output)) # Return JSON diff - # Apply changes if not a dry run + # Write changes if not dry run path.write_text(modified, encoding="utf-8") - return SuccessResponse(message=f"Successfully edited file {data.path}") + return SuccessResponse(message=f"Successfully edited file {data.path}") # Return JSON success except PermissionError: raise HTTPException(status_code=403, detail=f"Permission denied to write edited file: {data.path}") except Exception as e: - # Catch potential write errors after modification + # Catch errors during writing the modified file raise HTTPException(status_code=500, detail=f"Failed to write edited file {data.path}: {str(e)}") @@ -284,7 +272,7 @@ async def list_directory(data: ListDirectoryRequest = Body(...)): entry_type = "directory" if entry.is_dir() else "file" listing.append({"name": entry.name, "type": entry_type}) - # FastAPI automatically serializes the list to JSON + # Return the list directly, FastAPI will serialize it to JSON return listing @@ -320,7 +308,7 @@ async def search_files(data: SearchFilesRequest = Body(...)): for root, dirs, files in os.walk(base_path): root_path = pathlib.Path(root) - # Check if the current root directory should be excluded + # Apply exclusion patterns excluded = False for pattern in data.excludePatterns: if pathlib.Path(root).match(pattern): @@ -337,100 +325,51 @@ async def search_files(data: SearchFilesRequest = Body(...)): return {"matches": results or ["No matches found"]} -@app.post( - "/delete_path", - response_model=Union[SuccessResponse, ConfirmationRequiredResponse], - summary="Delete a file or directory (two-step confirmation)" -) +@app.post("/delete_path", response_model=SuccessResponse, summary="Delete a file or directory") async def delete_path(data: DeletePathRequest = Body(...)): """ - Initiate or confirm deletion of a file or directory. - - 1. **First Request:** Call without `confirmation_token`. Returns a `confirmation_token`. - 2. **Second Request:** Call again with the *exact same* `path` and `recursive` parameters, - plus the `confirmation_token` received from the first request. - - Use `recursive=True` to delete non-empty directories (requires confirmation). - Returns `ConfirmationRequiredResponse` on first step, `SuccessResponse` on completion. + Delete a specified file or directory. Requires explicit confirmation. + Use 'recursive=True' to delete non-empty directories. + Returns JSON success message. """ - path = normalize_path(data.path) - token = data.confirmation_token - - if token is None: - # Step 1: No token provided, generate one and request confirmation - if not path.exists(): - # Check existence before generating a token - raise HTTPException(status_code=404, detail=f"Path not found: {data.path}") - - confirmation_token = str(uuid.uuid4()) - expiry_time = datetime.now(timezone.utc) + timedelta(minutes=2) # Token valid for 2 minutes - - # Store the original request data and expiry time associated with the token - # Important: Store data *before* normalization for validation later - pending_confirmations[confirmation_token] = (data, expiry_time) - - return ConfirmationRequiredResponse( - confirmation_token=confirmation_token, - expires_at=expiry_time.isoformat() + if not data.confirm_delete: + raise HTTPException( + status_code=400, + detail="Deletion not confirmed. Set 'confirm_delete' to true to proceed." ) - else: - # Step 2: Token provided, validate and attempt deletion - if token not in pending_confirmations: - raise HTTPException(status_code=400, detail="Invalid or expired confirmation token.") + path = normalize_path(data.path) - stored_request_data, expiry_time = pending_confirmations[token] + try: + if not path.exists(): + raise HTTPException(status_code=404, detail=f"Path not found: {data.path}") - # Validate expiry - if datetime.now(timezone.utc) > expiry_time: - del pending_confirmations[token] # Remove expired token - raise HTTPException(status_code=400, detail="Confirmation token has expired.") - - # Validate consistency: path and recursive flag must match the original request - if stored_request_data.path != data.path or stored_request_data.recursive != data.recursive: - del pending_confirmations[token] # Remove token due to mismatch - raise HTTPException(status_code=400, detail="Request parameters do not match the original confirmation request.") - - # --- Proceed with Deletion --- - try: - if not path.exists(): - # Re-check existence in case it was deleted between steps - raise HTTPException(status_code=404, detail=f"Path not found (may have been deleted after confirmation): {data.path}") - - if path.is_file(): - path.unlink() - message = f"Successfully deleted file: {data.path}" - elif path.is_dir(): - if data.recursive: - shutil.rmtree(path) - message = f"Successfully deleted directory recursively: {data.path}" - else: - try: - path.rmdir() - message = f"Successfully deleted empty directory: {data.path}" - except OSError as e: - raise HTTPException( - status_code=400, - detail=f"Directory not empty. Use 'recursive=True' to delete non-empty directories. Original error: {e}" - ) + if path.is_file(): + path.unlink() # Raises FileNotFoundError if not exists, PermissionError if no permission + return SuccessResponse(message=f"Successfully deleted file: {data.path}") + elif path.is_dir(): + if data.recursive: + shutil.rmtree(path) # Raises FileNotFoundError, PermissionError, NotADirectoryError + return SuccessResponse(message=f"Successfully deleted directory recursively: {data.path}") else: - raise HTTPException(status_code=400, detail=f"Path is not a file or directory: {data.path}") + try: + path.rmdir() # Raises FileNotFoundError, OSError (e.g., dir not empty, permission denied) + return SuccessResponse(message=f"Successfully deleted empty directory: {data.path}") + except OSError as e: + # Catch error if directory is not empty and recursive is false + raise HTTPException( + status_code=400, + detail=f"Directory not empty. Use 'recursive=True' to delete non-empty directories. Original error: {e}" + ) + else: + # Should not happen if exists() is true and it's not file/dir, but handle defensively + raise HTTPException(status_code=400, detail=f"Path is not a file or directory: {data.path}") - # Deletion successful, remove the used token - del pending_confirmations[token] - return SuccessResponse(message=message) - - except PermissionError: - # Don't remove token on permission error; user might fix permissions and retry - raise HTTPException(status_code=403, detail=f"Permission denied to delete {data.path}") - except HTTPException as http_exc: - # If an expected HTTP error occurred during deletion (e.g., dir not empty), remove the token - del pending_confirmations[token] - raise http_exc # Re-raise the specific HTTP exception - except Exception as e: - # Catch unexpected errors during deletion, remove the token - del pending_confirmations[token] - raise HTTPException(status_code=500, detail=f"Failed to delete {data.path}: {e}") + except PermissionError: + raise HTTPException(status_code=403, detail=f"Permission denied to delete {data.path}") + except Exception as e: + # Catch other potential errors during deletion + raise HTTPException(status_code=500, detail=f"Failed to delete {data.path}: {e}") @app.post("/move_path", response_model=SuccessResponse, summary="Move or rename a file or directory") @@ -441,13 +380,13 @@ async def move_path(data: MovePathRequest = Body(...)): Returns JSON success message. """ source = normalize_path(data.source_path) - destination = normalize_path(data.destination_path) # Normalize destination path too + destination = normalize_path(data.destination_path) # Also normalize destination try: if not source.exists(): raise HTTPException(status_code=404, detail=f"Source path not found: {data.source_path}") - shutil.move(str(source), str(destination)) # Handles both file and directory moves + shutil.move(str(source), str(destination)) # Raises FileNotFoundError, PermissionError etc. return SuccessResponse(message=f"Successfully moved '{data.source_path}' to '{data.destination_path}'") except PermissionError: @@ -469,17 +408,18 @@ async def get_metadata(data: GetMetadataRequest = Body(...)): stat_result = path.stat() - # Determine if it's a file or directory + # Determine type if path.is_file(): file_type = "file" elif path.is_dir(): file_type = "directory" else: - file_type = "other" # Should generally not happen for existing, normalized paths + file_type = "other" # Should generally not happen for existing paths normalized - # Format timestamps to ISO 8601 UTC for consistency + # Format timestamps (use UTC for consistency) mod_time = datetime.fromtimestamp(stat_result.st_mtime, tz=timezone.utc).isoformat() - # Get creation time (st_birthtime on macOS/BSD, fallback to st_ctime on Linux) + # Creation time (st_birthtime) is macOS/BSD specific, st_ctime is metadata change time on Linux + # Use st_ctime as a fallback if st_birthtime isn't available try: create_time = datetime.fromtimestamp(stat_result.st_birthtime, tz=timezone.utc).isoformat() except AttributeError: @@ -491,7 +431,7 @@ async def get_metadata(data: GetMetadataRequest = Body(...)): "type": file_type, "size_bytes": stat_result.st_size, "modification_time_utc": mod_time, - "creation_time_utc": create_time, # Note: Definition varies by OS (birthtime vs. ctime) + "creation_time_utc": create_time, # Note platform differences in definition "last_metadata_change_time_utc": datetime.fromtimestamp(stat_result.st_ctime, tz=timezone.utc).isoformat(), } return metadata