diff --git a/servers/filesystem/main.py b/servers/filesystem/main.py index cef0f8c..d1053c3 100644 --- a/servers/filesystem/main.py +++ b/servers/filesystem/main.py @@ -9,7 +9,8 @@ import pathlib import asyncio from typing import List, Optional, Literal import difflib - +import shutil +from datetime import datetime, timezone app = FastAPI( title="Secure Filesystem API", version="0.1.0", @@ -44,7 +45,12 @@ def normalize_path(requested_path: str) -> pathlib.Path: return requested raise HTTPException( status_code=403, - detail=f"Access denied: {requested} is outside allowed directories.", + detail={ + "error": "Access Denied", + "requested_path": str(requested), + "message": "Requested path is outside allowed directories.", + "allowed_directories": ALLOWED_DIRECTORIES, + }, ) @@ -106,6 +112,36 @@ class SearchFilesRequest(BaseModel): ) +class SearchContentRequest(BaseModel): + path: str = Field(..., description="Base directory to search within.") + search_query: str = Field(..., description="Text content to search for (case-insensitive).") + recursive: bool = Field( + default=True, description="Whether to search recursively in subdirectories." + ) + file_pattern: Optional[str] = Field( + default="*", description="Glob pattern to filter files to search within (e.g., '*.py')." + ) + + +class DeletePathRequest(BaseModel): + path: str = Field(..., description="Path to the file or directory to delete.") + recursive: bool = Field( + default=False, description="If true and path is a directory, delete recursively. Required if directory is not empty." + ) + confirm_delete: bool = Field( + ..., description="Must be explicitly set to true to confirm deletion." + ) + + +class MovePathRequest(BaseModel): + source_path: str = Field(..., description="The current path of the file or directory.") + destination_path: str = Field(..., description="The new path for the file or directory.") + + +class GetMetadataRequest(BaseModel): + path: str = Field(..., description="Path to the file or directory to get metadata for.") + + # ------------------------------------------------------------------------------ # Routes # ------------------------------------------------------------------------------ @@ -252,6 +288,157 @@ async def search_files(data: SearchFilesRequest = Body(...)): return {"matches": results or ["No matches found"]} +@app.post("/delete_path", response_class=PlainTextResponse, summary="Delete a file or directory") +async def delete_path(data: DeletePathRequest = Body(...)): + """ + Delete a specified file or directory. Requires explicit confirmation. + Use 'recursive=True' to delete non-empty directories. + """ + if not data.confirm_delete: + raise HTTPException( + status_code=400, + detail="Deletion not confirmed. Set 'confirm_delete' to true to proceed." + ) + + path = normalize_path(data.path) + + try: + if not path.exists(): + raise HTTPException(status_code=404, detail=f"Path not found: {data.path}") + + if path.is_file(): + path.unlink() + return f"Successfully deleted file: {data.path}" + elif path.is_dir(): + if data.recursive: + shutil.rmtree(path) + return f"Successfully deleted directory recursively: {data.path}" + else: + try: + path.rmdir() # Only works for empty directories + return f"Successfully deleted empty directory: {data.path}" + except OSError as e: + # Catch error if directory is not empty and recursive is false + raise HTTPException( + status_code=400, + detail=f"Directory not empty. Use 'recursive=True' to delete non-empty directories. Original error: {e}" + ) + else: + # Should not happen if exists() is true and it's not file/dir, but handle defensively + raise HTTPException(status_code=400, detail=f"Path is not a file or directory: {data.path}") + + except PermissionError: + raise HTTPException(status_code=403, detail=f"Permission denied to delete {data.path}") + except Exception as e: + # Catch other potential errors during deletion + raise HTTPException(status_code=500, detail=f"Failed to delete {data.path}: {e}") + + +@app.post("/move_path", response_class=PlainTextResponse, summary="Move or rename a file or directory") +async def move_path(data: MovePathRequest = Body(...)): + """ + Move or rename a file or directory from source_path to destination_path. + Both paths must be within the allowed directories. + """ + source = normalize_path(data.source_path) + destination = normalize_path(data.destination_path) # Also normalize destination + + try: + if not source.exists(): + raise HTTPException(status_code=404, detail=f"Source path not found: {data.source_path}") + + shutil.move(str(source), str(destination)) + return f"Successfully moved '{data.source_path}' to '{data.destination_path}'" + + except PermissionError: + raise HTTPException(status_code=403, detail=f"Permission denied for move operation involving '{data.source_path}' or '{data.destination_path}'") + except Exception as e: + raise HTTPException(status_code=500, detail=f"Failed to move '{data.source_path}' to '{data.destination_path}': {e}") + + +@app.post("/get_metadata", summary="Get file or directory metadata") +async def get_metadata(data: GetMetadataRequest = Body(...)): + """ + Retrieve metadata for a specified file or directory path. + """ + path = normalize_path(data.path) + + try: + if not path.exists(): + raise HTTPException(status_code=404, detail=f"Path not found: {data.path}") + + stat_result = path.stat() + + # Determine type + if path.is_file(): + file_type = "file" + elif path.is_dir(): + file_type = "directory" + else: + file_type = "other" # Should generally not happen for existing paths normalized + + # Format timestamps (use UTC for consistency) + mod_time = datetime.fromtimestamp(stat_result.st_mtime, tz=timezone.utc).isoformat() + # Creation time (st_birthtime) is macOS/BSD specific, st_ctime is metadata change time on Linux + # Use st_ctime as a fallback if st_birthtime isn't available + try: + create_time = datetime.fromtimestamp(stat_result.st_birthtime, tz=timezone.utc).isoformat() + except AttributeError: + create_time = datetime.fromtimestamp(stat_result.st_ctime, tz=timezone.utc).isoformat() + + + metadata = { + "path": str(path), + "type": file_type, + "size_bytes": stat_result.st_size, + "modification_time_utc": mod_time, + "creation_time_utc": create_time, # Note platform differences in definition + "last_metadata_change_time_utc": datetime.fromtimestamp(stat_result.st_ctime, tz=timezone.utc).isoformat(), + } + return metadata + + except PermissionError: + raise HTTPException(status_code=403, detail=f"Permission denied to access metadata for {data.path}") + except Exception as e: + raise HTTPException(status_code=500, detail=f"Failed to get metadata for {data.path}: {e}") + + +@app.post("/search_content", summary="Search for content within files") +async def search_content(data: SearchContentRequest = Body(...)): + """ + Search for text content within files in a specified directory. + """ + base_path = normalize_path(data.path) + results = [] + search_query_lower = data.search_query.lower() + + if not base_path.is_dir(): + raise HTTPException(status_code=400, detail="Provided path is not a directory") + + iterator = base_path.rglob(data.file_pattern) if data.recursive else base_path.glob(data.file_pattern) + + for item_path in iterator: + if item_path.is_file(): + try: + # Read file line by line to handle potentially large files and different encodings + with item_path.open("r", encoding="utf-8", errors="ignore") as f: + for line_num, line in enumerate(f, 1): + if search_query_lower in line.lower(): + results.append( + { + "file_path": str(item_path), + "line_number": line_num, + "line_content": line.strip(), + } + ) + except Exception as e: + # Log or handle files that cannot be read (e.g., permission errors, binary files) + print(f"Could not read or search file {item_path}: {e}") + continue + + return {"matches": results or ["No matches found"]} + + @app.get("/list_allowed_directories", summary="List access-permitted directories") async def list_allowed_directories(): """