From 07be65fcf7fb1b3c1f94bec40d5e237095e322f4 Mon Sep 17 00:00:00 2001 From: Taylor Wilsdon Date: Mon, 14 Apr 2025 20:46:53 -0400 Subject: [PATCH 1/9] Refactor allowed directories to config.py; implement two-step delete_path with token-based confirmation and pending state; update related models, validation, and error handling --- servers/filesystem/config.py | 7 ++ servers/filesystem/main.py | 185 +++++++++++++++++++++++------------ 2 files changed, 127 insertions(+), 65 deletions(-) create mode 100644 servers/filesystem/config.py diff --git a/servers/filesystem/config.py b/servers/filesystem/config.py new file mode 100644 index 0000000..b104feb --- /dev/null +++ b/servers/filesystem/config.py @@ -0,0 +1,7 @@ +import os +import pathlib + +# Constants +ALLOWED_DIRECTORIES = [ + str(pathlib.Path(os.path.expanduser("~/")).resolve()) +] # 👈 Replace with your paths \ No newline at end of file diff --git a/servers/filesystem/main.py b/servers/filesystem/main.py index d48a755..4170e7b 100644 --- a/servers/filesystem/main.py +++ b/servers/filesystem/main.py @@ -7,10 +7,12 @@ from pydantic import BaseModel, Field import os import pathlib import asyncio -from typing import List, Optional, Literal +from typing import List, Optional, Union, Dict, Tuple import difflib import shutil -from datetime import datetime, timezone +from datetime import datetime, timezone, timedelta +import uuid +from .config import ALLOWED_DIRECTORIES app = FastAPI( title="Secure Filesystem API", version="0.1.0", @@ -27,11 +29,9 @@ app.add_middleware( allow_headers=["*"], ) - -# Constants -ALLOWED_DIRECTORIES = [ - str(pathlib.Path(os.path.expanduser("~/")).resolve()) -] # 👈 Replace with your paths +# In-memory store for pending confirmations. Use Redis/DB in production. +# Format: {token: (DeletePathRequest_data, expiry_datetime)} +pending_confirmations: Dict[str, Tuple['DeletePathRequest', datetime]] = {} # ------------------------------------------------------------------------------ # Utility functions @@ -41,7 +41,7 @@ ALLOWED_DIRECTORIES = [ def normalize_path(requested_path: str) -> pathlib.Path: requested = pathlib.Path(os.path.expanduser(requested_path)).resolve() for allowed in ALLOWED_DIRECTORIES: - if str(requested).lower().startswith(allowed.lower()): # Case-insensitive check + if str(requested).lower().startswith(allowed.lower()): # Case-insensitive path comparison return requested raise HTTPException( status_code=403, @@ -128,8 +128,9 @@ class DeletePathRequest(BaseModel): recursive: bool = Field( default=False, description="If true and path is a directory, delete recursively. Required if directory is not empty." ) - confirm_delete: bool = Field( - ..., description="Must be explicitly set to true to confirm deletion." + # confirmation_token replaces the old confirm_delete flag + confirmation_token: Optional[str] = Field( + default=None, description="A confirmation token obtained from a prior request, required to execute deletion." ) @@ -150,6 +151,12 @@ class GetMetadataRequest(BaseModel): class SuccessResponse(BaseModel): message: str = Field(..., description="Success message indicating the operation was completed.") +# Response model for when confirmation is required for deletion +class ConfirmationRequiredResponse(BaseModel): + message: str = Field(default="Confirmation required to proceed with deletion.") + confirmation_token: str = Field(..., description="Token needed for the confirmation step.") + expires_at: str = Field(..., description="UTC timestamp when the token expires (ISO format).") + class ReadFileResponse(BaseModel): content: str = Field(..., description="UTF-8 encoded text content of the file.") @@ -159,7 +166,7 @@ class DiffResponse(BaseModel): diff: str = Field(..., description="Unified diff output comparing original and modified content.") -@app.post("/read_file", response_model=ReadFileResponse, summary="Read a file") # Changed response_class to response_model +@app.post("/read_file", response_model=ReadFileResponse, summary="Read a file") async def read_file(data: ReadFileRequest = Body(...)): """ Read the entire contents of a file and return as JSON. @@ -167,13 +174,13 @@ async def read_file(data: ReadFileRequest = Body(...)): path = normalize_path(data.path) try: file_content = path.read_text(encoding="utf-8") - return ReadFileResponse(content=file_content) # Return Pydantic model instance + return ReadFileResponse(content=file_content) except FileNotFoundError: raise HTTPException(status_code=404, detail=f"File not found: {data.path}") except PermissionError: raise HTTPException(status_code=403, detail=f"Permission denied for file: {data.path}") except Exception as e: - # More specific error for generic read issues + # Catch potential read errors raise HTTPException(status_code=500, detail=f"Failed to read file {data.path}: {str(e)}") @@ -192,11 +199,11 @@ async def write_file(data: WriteFileRequest = Body(...)): raise HTTPException(status_code=500, detail=f"Failed to write to {data.path}: {str(e)}") -from typing import Union # Add this import at the top with other typing imports +# Note: Union was already imported earlier @app.post( "/edit_file", - response_model=Union[SuccessResponse, DiffResponse], # Use Union for multiple response types + response_model=Union[SuccessResponse, DiffResponse], summary="Edit a file with diff" ) async def edit_file(data: EditFileRequest = Body(...)): @@ -231,16 +238,16 @@ async def edit_file(data: EditFileRequest = Body(...)): fromfile=f"a/{data.path}", tofile=f"b/{data.path}", ) - return DiffResponse(diff="".join(diff_output)) # Return JSON diff + return DiffResponse(diff="".join(diff_output)) - # Write changes if not dry run + # Apply changes if not a dry run path.write_text(modified, encoding="utf-8") - return SuccessResponse(message=f"Successfully edited file {data.path}") # Return JSON success + return SuccessResponse(message=f"Successfully edited file {data.path}") except PermissionError: raise HTTPException(status_code=403, detail=f"Permission denied to write edited file: {data.path}") except Exception as e: - # Catch errors during writing the modified file + # Catch potential write errors after modification raise HTTPException(status_code=500, detail=f"Failed to write edited file {data.path}: {str(e)}") @@ -277,7 +284,7 @@ async def list_directory(data: ListDirectoryRequest = Body(...)): entry_type = "directory" if entry.is_dir() else "file" listing.append({"name": entry.name, "type": entry_type}) - # Return the list directly, FastAPI will serialize it to JSON + # FastAPI automatically serializes the list to JSON return listing @@ -313,7 +320,7 @@ async def search_files(data: SearchFilesRequest = Body(...)): for root, dirs, files in os.walk(base_path): root_path = pathlib.Path(root) - # Apply exclusion patterns + # Check if the current root directory should be excluded excluded = False for pattern in data.excludePatterns: if pathlib.Path(root).match(pattern): @@ -330,51 +337,100 @@ async def search_files(data: SearchFilesRequest = Body(...)): return {"matches": results or ["No matches found"]} -@app.post("/delete_path", response_model=SuccessResponse, summary="Delete a file or directory") +@app.post( + "/delete_path", + response_model=Union[SuccessResponse, ConfirmationRequiredResponse], + summary="Delete a file or directory (two-step confirmation)" +) async def delete_path(data: DeletePathRequest = Body(...)): """ - Delete a specified file or directory. Requires explicit confirmation. - Use 'recursive=True' to delete non-empty directories. - Returns JSON success message. + Initiate or confirm deletion of a file or directory. + + 1. **First Request:** Call without `confirmation_token`. Returns a `confirmation_token`. + 2. **Second Request:** Call again with the *exact same* `path` and `recursive` parameters, + plus the `confirmation_token` received from the first request. + + Use `recursive=True` to delete non-empty directories (requires confirmation). + Returns `ConfirmationRequiredResponse` on first step, `SuccessResponse` on completion. """ - if not data.confirm_delete: - raise HTTPException( - status_code=400, - detail="Deletion not confirmed. Set 'confirm_delete' to true to proceed." + path = normalize_path(data.path) + token = data.confirmation_token + + if token is None: + # Step 1: No token provided, generate one and request confirmation + if not path.exists(): + # Check existence before generating a token + raise HTTPException(status_code=404, detail=f"Path not found: {data.path}") + + confirmation_token = str(uuid.uuid4()) + expiry_time = datetime.now(timezone.utc) + timedelta(minutes=2) # Token valid for 2 minutes + + # Store the original request data and expiry time associated with the token + # Important: Store data *before* normalization for validation later + pending_confirmations[confirmation_token] = (data, expiry_time) + + return ConfirmationRequiredResponse( + confirmation_token=confirmation_token, + expires_at=expiry_time.isoformat() ) - path = normalize_path(data.path) + else: + # Step 2: Token provided, validate and attempt deletion + if token not in pending_confirmations: + raise HTTPException(status_code=400, detail="Invalid or expired confirmation token.") - try: - if not path.exists(): - raise HTTPException(status_code=404, detail=f"Path not found: {data.path}") + stored_request_data, expiry_time = pending_confirmations[token] - if path.is_file(): - path.unlink() # Raises FileNotFoundError if not exists, PermissionError if no permission - return SuccessResponse(message=f"Successfully deleted file: {data.path}") - elif path.is_dir(): - if data.recursive: - shutil.rmtree(path) # Raises FileNotFoundError, PermissionError, NotADirectoryError - return SuccessResponse(message=f"Successfully deleted directory recursively: {data.path}") + # Validate expiry + if datetime.now(timezone.utc) > expiry_time: + del pending_confirmations[token] # Remove expired token + raise HTTPException(status_code=400, detail="Confirmation token has expired.") + + # Validate consistency: path and recursive flag must match the original request + if stored_request_data.path != data.path or stored_request_data.recursive != data.recursive: + del pending_confirmations[token] # Remove token due to mismatch + raise HTTPException(status_code=400, detail="Request parameters do not match the original confirmation request.") + + # --- Proceed with Deletion --- + try: + if not path.exists(): + # Re-check existence in case it was deleted between steps + raise HTTPException(status_code=404, detail=f"Path not found (may have been deleted after confirmation): {data.path}") + + if path.is_file(): + path.unlink() + message = f"Successfully deleted file: {data.path}" + elif path.is_dir(): + if data.recursive: + shutil.rmtree(path) + message = f"Successfully deleted directory recursively: {data.path}" + else: + try: + path.rmdir() + message = f"Successfully deleted empty directory: {data.path}" + except OSError as e: + raise HTTPException( + status_code=400, + detail=f"Directory not empty. Use 'recursive=True' to delete non-empty directories. Original error: {e}" + ) else: - try: - path.rmdir() # Raises FileNotFoundError, OSError (e.g., dir not empty, permission denied) - return SuccessResponse(message=f"Successfully deleted empty directory: {data.path}") - except OSError as e: - # Catch error if directory is not empty and recursive is false - raise HTTPException( - status_code=400, - detail=f"Directory not empty. Use 'recursive=True' to delete non-empty directories. Original error: {e}" - ) - else: - # Should not happen if exists() is true and it's not file/dir, but handle defensively - raise HTTPException(status_code=400, detail=f"Path is not a file or directory: {data.path}") + raise HTTPException(status_code=400, detail=f"Path is not a file or directory: {data.path}") - except PermissionError: - raise HTTPException(status_code=403, detail=f"Permission denied to delete {data.path}") - except Exception as e: - # Catch other potential errors during deletion - raise HTTPException(status_code=500, detail=f"Failed to delete {data.path}: {e}") + # Deletion successful, remove the used token + del pending_confirmations[token] + return SuccessResponse(message=message) + + except PermissionError: + # Don't remove token on permission error; user might fix permissions and retry + raise HTTPException(status_code=403, detail=f"Permission denied to delete {data.path}") + except HTTPException as http_exc: + # If an expected HTTP error occurred during deletion (e.g., dir not empty), remove the token + del pending_confirmations[token] + raise http_exc # Re-raise the specific HTTP exception + except Exception as e: + # Catch unexpected errors during deletion, remove the token + del pending_confirmations[token] + raise HTTPException(status_code=500, detail=f"Failed to delete {data.path}: {e}") @app.post("/move_path", response_model=SuccessResponse, summary="Move or rename a file or directory") @@ -385,13 +441,13 @@ async def move_path(data: MovePathRequest = Body(...)): Returns JSON success message. """ source = normalize_path(data.source_path) - destination = normalize_path(data.destination_path) # Also normalize destination + destination = normalize_path(data.destination_path) # Normalize destination path too try: if not source.exists(): raise HTTPException(status_code=404, detail=f"Source path not found: {data.source_path}") - shutil.move(str(source), str(destination)) # Raises FileNotFoundError, PermissionError etc. + shutil.move(str(source), str(destination)) # Handles both file and directory moves return SuccessResponse(message=f"Successfully moved '{data.source_path}' to '{data.destination_path}'") except PermissionError: @@ -413,18 +469,17 @@ async def get_metadata(data: GetMetadataRequest = Body(...)): stat_result = path.stat() - # Determine type + # Determine if it's a file or directory if path.is_file(): file_type = "file" elif path.is_dir(): file_type = "directory" else: - file_type = "other" # Should generally not happen for existing paths normalized + file_type = "other" # Should generally not happen for existing, normalized paths - # Format timestamps (use UTC for consistency) + # Format timestamps to ISO 8601 UTC for consistency mod_time = datetime.fromtimestamp(stat_result.st_mtime, tz=timezone.utc).isoformat() - # Creation time (st_birthtime) is macOS/BSD specific, st_ctime is metadata change time on Linux - # Use st_ctime as a fallback if st_birthtime isn't available + # Get creation time (st_birthtime on macOS/BSD, fallback to st_ctime on Linux) try: create_time = datetime.fromtimestamp(stat_result.st_birthtime, tz=timezone.utc).isoformat() except AttributeError: @@ -436,7 +491,7 @@ async def get_metadata(data: GetMetadataRequest = Body(...)): "type": file_type, "size_bytes": stat_result.st_size, "modification_time_utc": mod_time, - "creation_time_utc": create_time, # Note platform differences in definition + "creation_time_utc": create_time, # Note: Definition varies by OS (birthtime vs. ctime) "last_metadata_change_time_utc": datetime.fromtimestamp(stat_result.st_ctime, tz=timezone.utc).isoformat(), } return metadata From 2110943fde3e51ce47126bb7fedb34b0001cc844 Mon Sep 17 00:00:00 2001 From: Taylor Wilsdon Date: Mon, 14 Apr 2025 21:30:26 -0400 Subject: [PATCH 2/9] absolute import for config --- servers/filesystem/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/servers/filesystem/main.py b/servers/filesystem/main.py index 4170e7b..5d56297 100644 --- a/servers/filesystem/main.py +++ b/servers/filesystem/main.py @@ -12,7 +12,7 @@ import difflib import shutil from datetime import datetime, timezone, timedelta import uuid -from .config import ALLOWED_DIRECTORIES +from config import ALLOWED_DIRECTORIES app = FastAPI( title="Secure Filesystem API", version="0.1.0", From b0b1e6eb6d2363bbf520a9123a545aa52b4d79df Mon Sep 17 00:00:00 2001 From: Taylor Wilsdon Date: Thu, 17 Apr 2025 09:31:57 -0700 Subject: [PATCH 3/9] Implement more reliable mechanism for file delete confirmation --- servers/filesystem/main.py | 182 +++++++++++++------------------------ 1 file changed, 61 insertions(+), 121 deletions(-) diff --git a/servers/filesystem/main.py b/servers/filesystem/main.py index 5d56297..06134cf 100644 --- a/servers/filesystem/main.py +++ b/servers/filesystem/main.py @@ -7,12 +7,11 @@ from pydantic import BaseModel, Field import os import pathlib import asyncio -from typing import List, Optional, Union, Dict, Tuple +from typing import List, Optional, Literal import difflib import shutil -from datetime import datetime, timezone, timedelta -import uuid -from config import ALLOWED_DIRECTORIES +from datetime import datetime, timezone +from .config import ALLOWED_DIRECTORIES app = FastAPI( title="Secure Filesystem API", version="0.1.0", @@ -29,10 +28,6 @@ app.add_middleware( allow_headers=["*"], ) -# In-memory store for pending confirmations. Use Redis/DB in production. -# Format: {token: (DeletePathRequest_data, expiry_datetime)} -pending_confirmations: Dict[str, Tuple['DeletePathRequest', datetime]] = {} - # ------------------------------------------------------------------------------ # Utility functions # ------------------------------------------------------------------------------ @@ -41,7 +36,7 @@ pending_confirmations: Dict[str, Tuple['DeletePathRequest', datetime]] = {} def normalize_path(requested_path: str) -> pathlib.Path: requested = pathlib.Path(os.path.expanduser(requested_path)).resolve() for allowed in ALLOWED_DIRECTORIES: - if str(requested).lower().startswith(allowed.lower()): # Case-insensitive path comparison + if str(requested).lower().startswith(allowed.lower()): # Case-insensitive check return requested raise HTTPException( status_code=403, @@ -128,9 +123,8 @@ class DeletePathRequest(BaseModel): recursive: bool = Field( default=False, description="If true and path is a directory, delete recursively. Required if directory is not empty." ) - # confirmation_token replaces the old confirm_delete flag - confirmation_token: Optional[str] = Field( - default=None, description="A confirmation token obtained from a prior request, required to execute deletion." + confirm_delete: bool = Field( + ..., description="Must be explicitly set to true to confirm deletion." ) @@ -151,12 +145,6 @@ class GetMetadataRequest(BaseModel): class SuccessResponse(BaseModel): message: str = Field(..., description="Success message indicating the operation was completed.") -# Response model for when confirmation is required for deletion -class ConfirmationRequiredResponse(BaseModel): - message: str = Field(default="Confirmation required to proceed with deletion.") - confirmation_token: str = Field(..., description="Token needed for the confirmation step.") - expires_at: str = Field(..., description="UTC timestamp when the token expires (ISO format).") - class ReadFileResponse(BaseModel): content: str = Field(..., description="UTF-8 encoded text content of the file.") @@ -166,7 +154,7 @@ class DiffResponse(BaseModel): diff: str = Field(..., description="Unified diff output comparing original and modified content.") -@app.post("/read_file", response_model=ReadFileResponse, summary="Read a file") +@app.post("/read_file", response_model=ReadFileResponse, summary="Read a file") # Changed response_class to response_model async def read_file(data: ReadFileRequest = Body(...)): """ Read the entire contents of a file and return as JSON. @@ -174,13 +162,13 @@ async def read_file(data: ReadFileRequest = Body(...)): path = normalize_path(data.path) try: file_content = path.read_text(encoding="utf-8") - return ReadFileResponse(content=file_content) + return ReadFileResponse(content=file_content) # Return Pydantic model instance except FileNotFoundError: raise HTTPException(status_code=404, detail=f"File not found: {data.path}") except PermissionError: raise HTTPException(status_code=403, detail=f"Permission denied for file: {data.path}") except Exception as e: - # Catch potential read errors + # More specific error for generic read issues raise HTTPException(status_code=500, detail=f"Failed to read file {data.path}: {str(e)}") @@ -199,11 +187,11 @@ async def write_file(data: WriteFileRequest = Body(...)): raise HTTPException(status_code=500, detail=f"Failed to write to {data.path}: {str(e)}") -# Note: Union was already imported earlier +from typing import Union # Add this import at the top with other typing imports @app.post( "/edit_file", - response_model=Union[SuccessResponse, DiffResponse], + response_model=Union[SuccessResponse, DiffResponse], # Use Union for multiple response types summary="Edit a file with diff" ) async def edit_file(data: EditFileRequest = Body(...)): @@ -238,16 +226,16 @@ async def edit_file(data: EditFileRequest = Body(...)): fromfile=f"a/{data.path}", tofile=f"b/{data.path}", ) - return DiffResponse(diff="".join(diff_output)) + return DiffResponse(diff="".join(diff_output)) # Return JSON diff - # Apply changes if not a dry run + # Write changes if not dry run path.write_text(modified, encoding="utf-8") - return SuccessResponse(message=f"Successfully edited file {data.path}") + return SuccessResponse(message=f"Successfully edited file {data.path}") # Return JSON success except PermissionError: raise HTTPException(status_code=403, detail=f"Permission denied to write edited file: {data.path}") except Exception as e: - # Catch potential write errors after modification + # Catch errors during writing the modified file raise HTTPException(status_code=500, detail=f"Failed to write edited file {data.path}: {str(e)}") @@ -284,7 +272,7 @@ async def list_directory(data: ListDirectoryRequest = Body(...)): entry_type = "directory" if entry.is_dir() else "file" listing.append({"name": entry.name, "type": entry_type}) - # FastAPI automatically serializes the list to JSON + # Return the list directly, FastAPI will serialize it to JSON return listing @@ -320,7 +308,7 @@ async def search_files(data: SearchFilesRequest = Body(...)): for root, dirs, files in os.walk(base_path): root_path = pathlib.Path(root) - # Check if the current root directory should be excluded + # Apply exclusion patterns excluded = False for pattern in data.excludePatterns: if pathlib.Path(root).match(pattern): @@ -337,100 +325,51 @@ async def search_files(data: SearchFilesRequest = Body(...)): return {"matches": results or ["No matches found"]} -@app.post( - "/delete_path", - response_model=Union[SuccessResponse, ConfirmationRequiredResponse], - summary="Delete a file or directory (two-step confirmation)" -) +@app.post("/delete_path", response_model=SuccessResponse, summary="Delete a file or directory") async def delete_path(data: DeletePathRequest = Body(...)): """ - Initiate or confirm deletion of a file or directory. - - 1. **First Request:** Call without `confirmation_token`. Returns a `confirmation_token`. - 2. **Second Request:** Call again with the *exact same* `path` and `recursive` parameters, - plus the `confirmation_token` received from the first request. - - Use `recursive=True` to delete non-empty directories (requires confirmation). - Returns `ConfirmationRequiredResponse` on first step, `SuccessResponse` on completion. + Delete a specified file or directory. Requires explicit confirmation. + Use 'recursive=True' to delete non-empty directories. + Returns JSON success message. """ - path = normalize_path(data.path) - token = data.confirmation_token - - if token is None: - # Step 1: No token provided, generate one and request confirmation - if not path.exists(): - # Check existence before generating a token - raise HTTPException(status_code=404, detail=f"Path not found: {data.path}") - - confirmation_token = str(uuid.uuid4()) - expiry_time = datetime.now(timezone.utc) + timedelta(minutes=2) # Token valid for 2 minutes - - # Store the original request data and expiry time associated with the token - # Important: Store data *before* normalization for validation later - pending_confirmations[confirmation_token] = (data, expiry_time) - - return ConfirmationRequiredResponse( - confirmation_token=confirmation_token, - expires_at=expiry_time.isoformat() + if not data.confirm_delete: + raise HTTPException( + status_code=400, + detail="Deletion not confirmed. Set 'confirm_delete' to true to proceed." ) - else: - # Step 2: Token provided, validate and attempt deletion - if token not in pending_confirmations: - raise HTTPException(status_code=400, detail="Invalid or expired confirmation token.") + path = normalize_path(data.path) - stored_request_data, expiry_time = pending_confirmations[token] + try: + if not path.exists(): + raise HTTPException(status_code=404, detail=f"Path not found: {data.path}") - # Validate expiry - if datetime.now(timezone.utc) > expiry_time: - del pending_confirmations[token] # Remove expired token - raise HTTPException(status_code=400, detail="Confirmation token has expired.") - - # Validate consistency: path and recursive flag must match the original request - if stored_request_data.path != data.path or stored_request_data.recursive != data.recursive: - del pending_confirmations[token] # Remove token due to mismatch - raise HTTPException(status_code=400, detail="Request parameters do not match the original confirmation request.") - - # --- Proceed with Deletion --- - try: - if not path.exists(): - # Re-check existence in case it was deleted between steps - raise HTTPException(status_code=404, detail=f"Path not found (may have been deleted after confirmation): {data.path}") - - if path.is_file(): - path.unlink() - message = f"Successfully deleted file: {data.path}" - elif path.is_dir(): - if data.recursive: - shutil.rmtree(path) - message = f"Successfully deleted directory recursively: {data.path}" - else: - try: - path.rmdir() - message = f"Successfully deleted empty directory: {data.path}" - except OSError as e: - raise HTTPException( - status_code=400, - detail=f"Directory not empty. Use 'recursive=True' to delete non-empty directories. Original error: {e}" - ) + if path.is_file(): + path.unlink() # Raises FileNotFoundError if not exists, PermissionError if no permission + return SuccessResponse(message=f"Successfully deleted file: {data.path}") + elif path.is_dir(): + if data.recursive: + shutil.rmtree(path) # Raises FileNotFoundError, PermissionError, NotADirectoryError + return SuccessResponse(message=f"Successfully deleted directory recursively: {data.path}") else: - raise HTTPException(status_code=400, detail=f"Path is not a file or directory: {data.path}") + try: + path.rmdir() # Raises FileNotFoundError, OSError (e.g., dir not empty, permission denied) + return SuccessResponse(message=f"Successfully deleted empty directory: {data.path}") + except OSError as e: + # Catch error if directory is not empty and recursive is false + raise HTTPException( + status_code=400, + detail=f"Directory not empty. Use 'recursive=True' to delete non-empty directories. Original error: {e}" + ) + else: + # Should not happen if exists() is true and it's not file/dir, but handle defensively + raise HTTPException(status_code=400, detail=f"Path is not a file or directory: {data.path}") - # Deletion successful, remove the used token - del pending_confirmations[token] - return SuccessResponse(message=message) - - except PermissionError: - # Don't remove token on permission error; user might fix permissions and retry - raise HTTPException(status_code=403, detail=f"Permission denied to delete {data.path}") - except HTTPException as http_exc: - # If an expected HTTP error occurred during deletion (e.g., dir not empty), remove the token - del pending_confirmations[token] - raise http_exc # Re-raise the specific HTTP exception - except Exception as e: - # Catch unexpected errors during deletion, remove the token - del pending_confirmations[token] - raise HTTPException(status_code=500, detail=f"Failed to delete {data.path}: {e}") + except PermissionError: + raise HTTPException(status_code=403, detail=f"Permission denied to delete {data.path}") + except Exception as e: + # Catch other potential errors during deletion + raise HTTPException(status_code=500, detail=f"Failed to delete {data.path}: {e}") @app.post("/move_path", response_model=SuccessResponse, summary="Move or rename a file or directory") @@ -441,13 +380,13 @@ async def move_path(data: MovePathRequest = Body(...)): Returns JSON success message. """ source = normalize_path(data.source_path) - destination = normalize_path(data.destination_path) # Normalize destination path too + destination = normalize_path(data.destination_path) # Also normalize destination try: if not source.exists(): raise HTTPException(status_code=404, detail=f"Source path not found: {data.source_path}") - shutil.move(str(source), str(destination)) # Handles both file and directory moves + shutil.move(str(source), str(destination)) # Raises FileNotFoundError, PermissionError etc. return SuccessResponse(message=f"Successfully moved '{data.source_path}' to '{data.destination_path}'") except PermissionError: @@ -469,17 +408,18 @@ async def get_metadata(data: GetMetadataRequest = Body(...)): stat_result = path.stat() - # Determine if it's a file or directory + # Determine type if path.is_file(): file_type = "file" elif path.is_dir(): file_type = "directory" else: - file_type = "other" # Should generally not happen for existing, normalized paths + file_type = "other" # Should generally not happen for existing paths normalized - # Format timestamps to ISO 8601 UTC for consistency + # Format timestamps (use UTC for consistency) mod_time = datetime.fromtimestamp(stat_result.st_mtime, tz=timezone.utc).isoformat() - # Get creation time (st_birthtime on macOS/BSD, fallback to st_ctime on Linux) + # Creation time (st_birthtime) is macOS/BSD specific, st_ctime is metadata change time on Linux + # Use st_ctime as a fallback if st_birthtime isn't available try: create_time = datetime.fromtimestamp(stat_result.st_birthtime, tz=timezone.utc).isoformat() except AttributeError: @@ -491,7 +431,7 @@ async def get_metadata(data: GetMetadataRequest = Body(...)): "type": file_type, "size_bytes": stat_result.st_size, "modification_time_utc": mod_time, - "creation_time_utc": create_time, # Note: Definition varies by OS (birthtime vs. ctime) + "creation_time_utc": create_time, # Note platform differences in definition "last_metadata_change_time_utc": datetime.fromtimestamp(stat_result.st_ctime, tz=timezone.utc).isoformat(), } return metadata From 8bd8f1d712b0bb3a9d120051ebe006320917bd41 Mon Sep 17 00:00:00 2001 From: Taylor Wilsdon Date: Thu, 17 Apr 2025 09:32:34 -0700 Subject: [PATCH 4/9] fix relative import --- servers/filesystem/main.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/servers/filesystem/main.py b/servers/filesystem/main.py index 06134cf..5464123 100644 --- a/servers/filesystem/main.py +++ b/servers/filesystem/main.py @@ -11,10 +11,11 @@ from typing import List, Optional, Literal import difflib import shutil from datetime import datetime, timezone -from .config import ALLOWED_DIRECTORIES +from config import ALLOWED_DIRECTORIES + app = FastAPI( title="Secure Filesystem API", - version="0.1.0", + version="0.1.1", description="A secure file manipulation server for reading, editing, writing, listing, and searching files with access restrictions.", ) From 569d8905ef2b8adecd8debc8553820c7b1ce9926 Mon Sep 17 00:00:00 2001 From: Taylor Wilsdon Date: Thu, 17 Apr 2025 10:43:48 -0700 Subject: [PATCH 5/9] re-implement pending_confirmation --- servers/filesystem/config.py | 2 +- servers/filesystem/main.py | 145 +++++++++++++++++++++++++---------- 2 files changed, 105 insertions(+), 42 deletions(-) diff --git a/servers/filesystem/config.py b/servers/filesystem/config.py index b104feb..4d7f53e 100644 --- a/servers/filesystem/config.py +++ b/servers/filesystem/config.py @@ -3,5 +3,5 @@ import pathlib # Constants ALLOWED_DIRECTORIES = [ - str(pathlib.Path(os.path.expanduser("~/")).resolve()) + str(pathlib.Path(os.path.expanduser("~/tmp")).resolve()) ] # 👈 Replace with your paths \ No newline at end of file diff --git a/servers/filesystem/main.py b/servers/filesystem/main.py index 5464123..62facd2 100644 --- a/servers/filesystem/main.py +++ b/servers/filesystem/main.py @@ -7,10 +7,11 @@ from pydantic import BaseModel, Field import os import pathlib import asyncio -from typing import List, Optional, Literal +from typing import List, Optional, Literal, Dict, Union # Added Dict, Union import difflib import shutil -from datetime import datetime, timezone +from datetime import datetime, timezone, timedelta # Added timedelta +import uuid # Added uuid from config import ALLOWED_DIRECTORIES app = FastAPI( @@ -124,8 +125,8 @@ class DeletePathRequest(BaseModel): recursive: bool = Field( default=False, description="If true and path is a directory, delete recursively. Required if directory is not empty." ) - confirm_delete: bool = Field( - ..., description="Must be explicitly set to true to confirm deletion." + confirmation_token: Optional[str] = Field( + default=None, description="Token required for confirming deletion after initial request." ) @@ -138,6 +139,14 @@ class GetMetadataRequest(BaseModel): path: str = Field(..., description="Path to the file or directory to get metadata for.") +# ------------------------------------------------------------------------------ +# Global state for pending confirmations +# ------------------------------------------------------------------------------ + +# Store pending confirmations: {token: {"path": str, "recursive": bool, "expiry": datetime}} +pending_confirmations: Dict[str, Dict] = {} +CONFIRMATION_TTL_SECONDS = 60 # Token validity period + # ------------------------------------------------------------------------------ # Routes # ------------------------------------------------------------------------------ @@ -155,6 +164,12 @@ class DiffResponse(BaseModel): diff: str = Field(..., description="Unified diff output comparing original and modified content.") +class ConfirmationRequiredResponse(BaseModel): + message: str = Field(..., description="Message indicating confirmation is required.") + confirmation_token: str = Field(..., description="Token needed for the confirmation step.") + expires_at: datetime = Field(..., description="UTC timestamp when the token expires.") + + @app.post("/read_file", response_model=ReadFileResponse, summary="Read a file") # Changed response_class to response_model async def read_file(data: ReadFileRequest = Body(...)): """ @@ -188,7 +203,7 @@ async def write_file(data: WriteFileRequest = Body(...)): raise HTTPException(status_code=500, detail=f"Failed to write to {data.path}: {str(e)}") -from typing import Union # Add this import at the top with other typing imports +# Removed duplicate import - already added Union at the top @app.post( "/edit_file", @@ -326,51 +341,99 @@ async def search_files(data: SearchFilesRequest = Body(...)): return {"matches": results or ["No matches found"]} -@app.post("/delete_path", response_model=SuccessResponse, summary="Delete a file or directory") +@app.post( + "/delete_path", + response_model=Union[SuccessResponse, ConfirmationRequiredResponse], # Updated response model + summary="Delete a file or directory (two-step confirmation)" +) async def delete_path(data: DeletePathRequest = Body(...)): """ - Delete a specified file or directory. Requires explicit confirmation. + Delete a specified file or directory using a two-step confirmation process. + + 1. Initial request (without confirmation_token): Returns a confirmation token. + 2. Confirmation request (with token): Executes the deletion if the token is valid + and matches the original request parameters (path, recursive). + Use 'recursive=True' to delete non-empty directories. - Returns JSON success message. """ - if not data.confirm_delete: - raise HTTPException( - status_code=400, - detail="Deletion not confirmed. Set 'confirm_delete' to true to proceed." - ) - path = normalize_path(data.path) + now = datetime.now(timezone.utc) - try: - if not path.exists(): - raise HTTPException(status_code=404, detail=f"Path not found: {data.path}") + # --- Step 2: Confirmation Request --- + if data.confirmation_token: + if data.confirmation_token not in pending_confirmations: + raise HTTPException(status_code=400, detail="Invalid or expired confirmation token.") - if path.is_file(): - path.unlink() # Raises FileNotFoundError if not exists, PermissionError if no permission - return SuccessResponse(message=f"Successfully deleted file: {data.path}") - elif path.is_dir(): - if data.recursive: - shutil.rmtree(path) # Raises FileNotFoundError, PermissionError, NotADirectoryError - return SuccessResponse(message=f"Successfully deleted directory recursively: {data.path}") + confirmation_data = pending_confirmations[data.confirmation_token] + + # Validate token expiry + if now > confirmation_data["expiry"]: + del pending_confirmations[data.confirmation_token] # Clean up expired token + raise HTTPException(status_code=400, detail="Confirmation token has expired.") + + # Validate request parameters match + if confirmation_data["path"] != data.path or confirmation_data["recursive"] != data.recursive: + raise HTTPException( + status_code=400, + detail="Request parameters (path, recursive) do not match the original request for this token." + ) + + # --- Parameters match and token is valid: Proceed with deletion --- + del pending_confirmations[data.confirmation_token] # Consume the token + + try: + if not path.exists(): + # Path might have been deleted between requests, treat as success or specific error? + # For now, raise 404 as it doesn't exist *now*. + raise HTTPException(status_code=404, detail=f"Path not found: {data.path}") + + if path.is_file(): + path.unlink() + return SuccessResponse(message=f"Successfully deleted file: {data.path}") + elif path.is_dir(): + if data.recursive: + shutil.rmtree(path) + return SuccessResponse(message=f"Successfully deleted directory recursively: {data.path}") + else: + try: + path.rmdir() + return SuccessResponse(message=f"Successfully deleted empty directory: {data.path}") + except OSError as e: + raise HTTPException( + status_code=400, + detail=f"Directory not empty. Use 'recursive=True' to delete non-empty directories. Original error: {e}" + ) else: - try: - path.rmdir() # Raises FileNotFoundError, OSError (e.g., dir not empty, permission denied) - return SuccessResponse(message=f"Successfully deleted empty directory: {data.path}") - except OSError as e: - # Catch error if directory is not empty and recursive is false - raise HTTPException( - status_code=400, - detail=f"Directory not empty. Use 'recursive=True' to delete non-empty directories. Original error: {e}" - ) - else: - # Should not happen if exists() is true and it's not file/dir, but handle defensively - raise HTTPException(status_code=400, detail=f"Path is not a file or directory: {data.path}") + raise HTTPException(status_code=400, detail=f"Path is not a file or directory: {data.path}") - except PermissionError: - raise HTTPException(status_code=403, detail=f"Permission denied to delete {data.path}") - except Exception as e: - # Catch other potential errors during deletion - raise HTTPException(status_code=500, detail=f"Failed to delete {data.path}: {e}") + except PermissionError: + raise HTTPException(status_code=403, detail=f"Permission denied to delete {data.path}") + except Exception as e: + raise HTTPException(status_code=500, detail=f"Failed to delete {data.path}: {e}") + + # --- Step 1: Initial Request (No Token Provided) --- + else: + # Check if path exists before generating token + if not path.exists(): + raise HTTPException(status_code=404, detail=f"Path not found: {data.path}") + + # Generate token and expiry + token = uuid.uuid4().hex + expiry_time = now + timedelta(seconds=CONFIRMATION_TTL_SECONDS) + + # Store confirmation details + pending_confirmations[token] = { + "path": data.path, + "recursive": data.recursive, + "expiry": expiry_time, + } + + # Return confirmation required response + return ConfirmationRequiredResponse( + message="Confirmation required to delete path. Use the provided token in a subsequent request with the same parameters.", + confirmation_token=token, + expires_at=expiry_time, + ) @app.post("/move_path", response_model=SuccessResponse, summary="Move or rename a file or directory") From 5478ae007b3b276fc383027934a45698297e7b2d Mon Sep 17 00:00:00 2001 From: Taylor Wilsdon Date: Thu, 17 Apr 2025 11:09:44 -0700 Subject: [PATCH 6/9] migrate pending confirmation store to local filestore for persistence between sessions --- servers/filesystem/main.py | 66 ++++++++++++++++++++++++++++++++++---- 1 file changed, 59 insertions(+), 7 deletions(-) diff --git a/servers/filesystem/main.py b/servers/filesystem/main.py index 62facd2..b9a4987 100644 --- a/servers/filesystem/main.py +++ b/servers/filesystem/main.py @@ -12,6 +12,7 @@ import difflib import shutil from datetime import datetime, timezone, timedelta # Added timedelta import uuid # Added uuid +import json # Added json from config import ALLOWED_DIRECTORIES app = FastAPI( @@ -143,10 +144,58 @@ class GetMetadataRequest(BaseModel): # Global state for pending confirmations # ------------------------------------------------------------------------------ -# Store pending confirmations: {token: {"path": str, "recursive": bool, "expiry": datetime}} -pending_confirmations: Dict[str, Dict] = {} +# --- Confirmation Token State Management (using a file) --- +CONFIRMATION_FILE = pathlib.Path("./.pending_confirmations.json") CONFIRMATION_TTL_SECONDS = 60 # Token validity period +def load_confirmations() -> Dict[str, Dict]: + """Loads pending confirmations from the JSON file.""" + if not CONFIRMATION_FILE.exists(): + return {} + try: + with CONFIRMATION_FILE.open("r") as f: + data = json.load(f) + # Convert expiry string back to datetime object + now = datetime.now(timezone.utc) + valid_confirmations = {} + for token, details in data.items(): + try: + details["expiry"] = datetime.fromisoformat(details["expiry"]) + # Clean up expired tokens during load + if details["expiry"] > now: + valid_confirmations[token] = details + except (ValueError, TypeError, KeyError): + print(f"Warning: Skipping invalid confirmation data for token {token}") + continue # Skip invalid entries + return valid_confirmations + except (json.JSONDecodeError, IOError) as e: + print(f"Error loading confirmations file: {e}. Returning empty dict.") + return {} + +def save_confirmations(confirmations: Dict[str, Dict]): + """Saves pending confirmations to the JSON file.""" + try: + # Convert datetime objects to ISO strings for JSON serialization + serializable_confirmations = {} + for token, details in confirmations.items(): + serializable_details = details.copy() + serializable_details["expiry"] = details["expiry"].isoformat() + serializable_confirmations[token] = serializable_details + + with CONFIRMATION_FILE.open("w") as f: + json.dump(serializable_confirmations, f, indent=2) + except IOError as e: + print(f"Error saving confirmations file: {e}") + +# Clean up the file on startup if it exists from a previous run +if CONFIRMATION_FILE.exists(): + # print("Cleaning up stale confirmation file on startup.") + try: + CONFIRMATION_FILE.unlink() + except OSError as e: + # print(f"Warning: Could not delete stale confirmation file: {e}") # Removed print + pass # Silently ignore if cleanup fails, not critical + # ------------------------------------------------------------------------------ # Routes # ------------------------------------------------------------------------------ @@ -202,9 +251,6 @@ async def write_file(data: WriteFileRequest = Body(...)): except Exception as e: raise HTTPException(status_code=500, detail=f"Failed to write to {data.path}: {str(e)}") - -# Removed duplicate import - already added Union at the top - @app.post( "/edit_file", response_model=Union[SuccessResponse, DiffResponse], # Use Union for multiple response types @@ -356,12 +402,15 @@ async def delete_path(data: DeletePathRequest = Body(...)): Use 'recursive=True' to delete non-empty directories. """ + pending_confirmations = load_confirmations() # Load state from file path = normalize_path(data.path) now = datetime.now(timezone.utc) # --- Step 2: Confirmation Request --- if data.confirmation_token: + # print(f"Attempting confirmation with token: {data.confirmation_token}") # Removed print if data.confirmation_token not in pending_confirmations: + # print(f"Error: Token '{data.confirmation_token}' not found in pending_confirmations.") # Removed print raise HTTPException(status_code=400, detail="Invalid or expired confirmation token.") confirmation_data = pending_confirmations[data.confirmation_token] @@ -369,6 +418,7 @@ async def delete_path(data: DeletePathRequest = Body(...)): # Validate token expiry if now > confirmation_data["expiry"]: del pending_confirmations[data.confirmation_token] # Clean up expired token + save_confirmations(pending_confirmations) # Save updated state raise HTTPException(status_code=400, detail="Confirmation token has expired.") # Validate request parameters match @@ -380,6 +430,7 @@ async def delete_path(data: DeletePathRequest = Body(...)): # --- Parameters match and token is valid: Proceed with deletion --- del pending_confirmations[data.confirmation_token] # Consume the token + save_confirmations(pending_confirmations) # Save updated state try: if not path.exists(): @@ -427,6 +478,7 @@ async def delete_path(data: DeletePathRequest = Body(...)): "recursive": data.recursive, "expiry": expiry_time, } + save_confirmations(pending_confirmations) # Save updated state # Return confirmation required response return ConfirmationRequiredResponse( @@ -444,13 +496,13 @@ async def move_path(data: MovePathRequest = Body(...)): Returns JSON success message. """ source = normalize_path(data.source_path) - destination = normalize_path(data.destination_path) # Also normalize destination + destination = normalize_path(data.destination_path) try: if not source.exists(): raise HTTPException(status_code=404, detail=f"Source path not found: {data.source_path}") - shutil.move(str(source), str(destination)) # Raises FileNotFoundError, PermissionError etc. + shutil.move(str(source), str(destination)) return SuccessResponse(message=f"Successfully moved '{data.source_path}' to '{data.destination_path}'") except PermissionError: From 2b08d17ddcd9ee95c7815cd8210753313beb721c Mon Sep 17 00:00:00 2001 From: Taylor Wilsdon Date: Thu, 17 Apr 2025 11:12:51 -0700 Subject: [PATCH 7/9] Update main.py --- servers/filesystem/main.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/servers/filesystem/main.py b/servers/filesystem/main.py index b9a4987..98ad7d8 100644 --- a/servers/filesystem/main.py +++ b/servers/filesystem/main.py @@ -11,8 +11,8 @@ from typing import List, Optional, Literal, Dict, Union # Added Dict, Union import difflib import shutil from datetime import datetime, timezone, timedelta # Added timedelta -import uuid # Added uuid -import json # Added json +import uuid +import json from config import ALLOWED_DIRECTORIES app = FastAPI( From 4de55a5c950a68e9f8701fc9304fbcc39948f3d7 Mon Sep 17 00:00:00 2001 From: Taylor Wilsdon Date: Thu, 17 Apr 2025 11:13:05 -0700 Subject: [PATCH 8/9] Update main.py --- servers/filesystem/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/servers/filesystem/main.py b/servers/filesystem/main.py index 98ad7d8..0ac2bbf 100644 --- a/servers/filesystem/main.py +++ b/servers/filesystem/main.py @@ -7,7 +7,7 @@ from pydantic import BaseModel, Field import os import pathlib import asyncio -from typing import List, Optional, Literal, Dict, Union # Added Dict, Union +from typing import List, Optional, Literal, Dict, Union import difflib import shutil from datetime import datetime, timezone, timedelta # Added timedelta From 90ab5c0686b9097c31b8931407cf7e874940fb20 Mon Sep 17 00:00:00 2001 From: Taylor Wilsdon Date: Thu, 17 Apr 2025 11:44:58 -0700 Subject: [PATCH 9/9] improve deletion flow with shorter confirmations --- servers/filesystem/main.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/servers/filesystem/main.py b/servers/filesystem/main.py index b9a4987..3393b92 100644 --- a/servers/filesystem/main.py +++ b/servers/filesystem/main.py @@ -7,12 +7,12 @@ from pydantic import BaseModel, Field import os import pathlib import asyncio -from typing import List, Optional, Literal, Dict, Union # Added Dict, Union +from typing import List, Optional, Literal, Dict, Union import difflib import shutil -from datetime import datetime, timezone, timedelta # Added timedelta -import uuid # Added uuid -import json # Added json +from datetime import datetime, timezone, timedelta +import json +import secrets from config import ALLOWED_DIRECTORIES app = FastAPI( @@ -469,7 +469,7 @@ async def delete_path(data: DeletePathRequest = Body(...)): raise HTTPException(status_code=404, detail=f"Path not found: {data.path}") # Generate token and expiry - token = uuid.uuid4().hex + token = secrets.token_hex(3)[:5] # Generate 6 hex chars (3 bytes), take first 5 expiry_time = now + timedelta(seconds=CONFIRMATION_TTL_SECONDS) # Store confirmation details @@ -481,8 +481,10 @@ async def delete_path(data: DeletePathRequest = Body(...)): save_confirmations(pending_confirmations) # Save updated state # Return confirmation required response + # Construct the user-friendly message + confirmation_message = f"`Confirm deletion of file: {data.path} with token {token}`" return ConfirmationRequiredResponse( - message="Confirmation required to delete path. Use the provided token in a subsequent request with the same parameters.", + message=confirmation_message, confirmation_token=token, expires_at=expiry_time, )