mirror of
				https://github.com/open-webui/openapi-servers
				synced 2025-06-26 18:17:04 +00:00 
			
		
		
		
	Implement more reliable mechanism for file delete confirmation
This commit is contained in:
		
							parent
							
								
									2110943fde
								
							
						
					
					
						commit
						b0b1e6eb6d
					
				| @ -7,12 +7,11 @@ from pydantic import BaseModel, Field | ||||
| import os | ||||
| import pathlib | ||||
| import asyncio | ||||
| from typing import List, Optional, Union, Dict, Tuple | ||||
| from typing import List, Optional, Literal | ||||
| import difflib | ||||
| import shutil | ||||
| from datetime import datetime, timezone, timedelta | ||||
| import uuid | ||||
| from config import ALLOWED_DIRECTORIES | ||||
| from datetime import datetime, timezone | ||||
| from .config import ALLOWED_DIRECTORIES | ||||
| app = FastAPI( | ||||
|     title="Secure Filesystem API", | ||||
|     version="0.1.0", | ||||
| @ -29,10 +28,6 @@ app.add_middleware( | ||||
|     allow_headers=["*"], | ||||
| ) | ||||
| 
 | ||||
| # In-memory store for pending confirmations. Use Redis/DB in production. | ||||
| # Format: {token: (DeletePathRequest_data, expiry_datetime)} | ||||
| pending_confirmations: Dict[str, Tuple['DeletePathRequest', datetime]] = {} | ||||
| 
 | ||||
| # ------------------------------------------------------------------------------ | ||||
| # Utility functions | ||||
| # ------------------------------------------------------------------------------ | ||||
| @ -41,7 +36,7 @@ pending_confirmations: Dict[str, Tuple['DeletePathRequest', datetime]] = {} | ||||
| def normalize_path(requested_path: str) -> pathlib.Path: | ||||
|     requested = pathlib.Path(os.path.expanduser(requested_path)).resolve() | ||||
|     for allowed in ALLOWED_DIRECTORIES: | ||||
|         if str(requested).lower().startswith(allowed.lower()): # Case-insensitive path comparison | ||||
|         if str(requested).lower().startswith(allowed.lower()): # Case-insensitive check | ||||
|             return requested | ||||
|     raise HTTPException( | ||||
|         status_code=403, | ||||
| @ -128,9 +123,8 @@ class DeletePathRequest(BaseModel): | ||||
|     recursive: bool = Field( | ||||
|         default=False, description="If true and path is a directory, delete recursively. Required if directory is not empty." | ||||
|     ) | ||||
|     # confirmation_token replaces the old confirm_delete flag | ||||
|     confirmation_token: Optional[str] = Field( | ||||
|         default=None, description="A confirmation token obtained from a prior request, required to execute deletion." | ||||
|     confirm_delete: bool = Field( | ||||
|         ..., description="Must be explicitly set to true to confirm deletion." | ||||
|     ) | ||||
| 
 | ||||
| 
 | ||||
| @ -151,12 +145,6 @@ class GetMetadataRequest(BaseModel): | ||||
| class SuccessResponse(BaseModel): | ||||
|     message: str = Field(..., description="Success message indicating the operation was completed.") | ||||
| 
 | ||||
| # Response model for when confirmation is required for deletion | ||||
| class ConfirmationRequiredResponse(BaseModel): | ||||
|     message: str = Field(default="Confirmation required to proceed with deletion.") | ||||
|     confirmation_token: str = Field(..., description="Token needed for the confirmation step.") | ||||
|     expires_at: str = Field(..., description="UTC timestamp when the token expires (ISO format).") | ||||
| 
 | ||||
| 
 | ||||
| class ReadFileResponse(BaseModel): | ||||
|     content: str = Field(..., description="UTF-8 encoded text content of the file.") | ||||
| @ -166,7 +154,7 @@ class DiffResponse(BaseModel): | ||||
|     diff: str = Field(..., description="Unified diff output comparing original and modified content.") | ||||
| 
 | ||||
| 
 | ||||
| @app.post("/read_file", response_model=ReadFileResponse, summary="Read a file") | ||||
| @app.post("/read_file", response_model=ReadFileResponse, summary="Read a file") # Changed response_class to response_model | ||||
| async def read_file(data: ReadFileRequest = Body(...)): | ||||
|     """ | ||||
|     Read the entire contents of a file and return as JSON. | ||||
| @ -174,13 +162,13 @@ async def read_file(data: ReadFileRequest = Body(...)): | ||||
|     path = normalize_path(data.path) | ||||
|     try: | ||||
|         file_content = path.read_text(encoding="utf-8") | ||||
|         return ReadFileResponse(content=file_content) | ||||
|         return ReadFileResponse(content=file_content) # Return Pydantic model instance | ||||
|     except FileNotFoundError: | ||||
|         raise HTTPException(status_code=404, detail=f"File not found: {data.path}") | ||||
|     except PermissionError: | ||||
|          raise HTTPException(status_code=403, detail=f"Permission denied for file: {data.path}") | ||||
|     except Exception as e: | ||||
|         # Catch potential read errors | ||||
|         # More specific error for generic read issues | ||||
|         raise HTTPException(status_code=500, detail=f"Failed to read file {data.path}: {str(e)}") | ||||
| 
 | ||||
| 
 | ||||
| @ -199,11 +187,11 @@ async def write_file(data: WriteFileRequest = Body(...)): | ||||
|         raise HTTPException(status_code=500, detail=f"Failed to write to {data.path}: {str(e)}") | ||||
| 
 | ||||
| 
 | ||||
| # Note: Union was already imported earlier | ||||
| from typing import Union # Add this import at the top with other typing imports | ||||
| 
 | ||||
| @app.post( | ||||
|     "/edit_file", | ||||
|     response_model=Union[SuccessResponse, DiffResponse], | ||||
|     response_model=Union[SuccessResponse, DiffResponse], # Use Union for multiple response types | ||||
|     summary="Edit a file with diff" | ||||
| ) | ||||
| async def edit_file(data: EditFileRequest = Body(...)): | ||||
| @ -238,16 +226,16 @@ async def edit_file(data: EditFileRequest = Body(...)): | ||||
|                 fromfile=f"a/{data.path}", | ||||
|                 tofile=f"b/{data.path}", | ||||
|             ) | ||||
|             return DiffResponse(diff="".join(diff_output)) | ||||
|             return DiffResponse(diff="".join(diff_output)) # Return JSON diff | ||||
| 
 | ||||
|         # Apply changes if not a dry run | ||||
|         # Write changes if not dry run | ||||
|         path.write_text(modified, encoding="utf-8") | ||||
|         return SuccessResponse(message=f"Successfully edited file {data.path}") | ||||
|         return SuccessResponse(message=f"Successfully edited file {data.path}") # Return JSON success | ||||
| 
 | ||||
|     except PermissionError: | ||||
|         raise HTTPException(status_code=403, detail=f"Permission denied to write edited file: {data.path}") | ||||
|     except Exception as e: | ||||
|         # Catch potential write errors after modification | ||||
|         # Catch errors during writing the modified file | ||||
|         raise HTTPException(status_code=500, detail=f"Failed to write edited file {data.path}: {str(e)}") | ||||
| 
 | ||||
| 
 | ||||
| @ -284,7 +272,7 @@ async def list_directory(data: ListDirectoryRequest = Body(...)): | ||||
|         entry_type = "directory" if entry.is_dir() else "file" | ||||
|         listing.append({"name": entry.name, "type": entry_type}) | ||||
| 
 | ||||
|     # FastAPI automatically serializes the list to JSON | ||||
|     # Return the list directly, FastAPI will serialize it to JSON | ||||
|     return listing | ||||
| 
 | ||||
| 
 | ||||
| @ -320,7 +308,7 @@ async def search_files(data: SearchFilesRequest = Body(...)): | ||||
| 
 | ||||
|     for root, dirs, files in os.walk(base_path): | ||||
|         root_path = pathlib.Path(root) | ||||
|         # Check if the current root directory should be excluded | ||||
|         # Apply exclusion patterns | ||||
|         excluded = False | ||||
|         for pattern in data.excludePatterns: | ||||
|             if pathlib.Path(root).match(pattern): | ||||
| @ -337,100 +325,51 @@ async def search_files(data: SearchFilesRequest = Body(...)): | ||||
|     return {"matches": results or ["No matches found"]} | ||||
| 
 | ||||
| 
 | ||||
| @app.post( | ||||
|     "/delete_path", | ||||
|     response_model=Union[SuccessResponse, ConfirmationRequiredResponse], | ||||
|     summary="Delete a file or directory (two-step confirmation)" | ||||
| ) | ||||
| @app.post("/delete_path", response_model=SuccessResponse, summary="Delete a file or directory") | ||||
| async def delete_path(data: DeletePathRequest = Body(...)): | ||||
|     """ | ||||
|     Initiate or confirm deletion of a file or directory. | ||||
| 
 | ||||
|     1.  **First Request:** Call without `confirmation_token`. Returns a `confirmation_token`. | ||||
|     2.  **Second Request:** Call again with the *exact same* `path` and `recursive` parameters, | ||||
|         plus the `confirmation_token` received from the first request. | ||||
| 
 | ||||
|     Use `recursive=True` to delete non-empty directories (requires confirmation). | ||||
|     Returns `ConfirmationRequiredResponse` on first step, `SuccessResponse` on completion. | ||||
|     Delete a specified file or directory. Requires explicit confirmation. | ||||
|     Use 'recursive=True' to delete non-empty directories. | ||||
|     Returns JSON success message. | ||||
|     """ | ||||
|     path = normalize_path(data.path) | ||||
|     token = data.confirmation_token | ||||
| 
 | ||||
|     if token is None: | ||||
|         # Step 1: No token provided, generate one and request confirmation | ||||
|         if not path.exists(): | ||||
|              # Check existence before generating a token | ||||
|              raise HTTPException(status_code=404, detail=f"Path not found: {data.path}") | ||||
| 
 | ||||
|         confirmation_token = str(uuid.uuid4()) | ||||
|         expiry_time = datetime.now(timezone.utc) + timedelta(minutes=2) # Token valid for 2 minutes | ||||
| 
 | ||||
|         # Store the original request data and expiry time associated with the token | ||||
|         # Important: Store data *before* normalization for validation later | ||||
|         pending_confirmations[confirmation_token] = (data, expiry_time) | ||||
| 
 | ||||
|         return ConfirmationRequiredResponse( | ||||
|             confirmation_token=confirmation_token, | ||||
|             expires_at=expiry_time.isoformat() | ||||
|     if not data.confirm_delete: | ||||
|         raise HTTPException( | ||||
|             status_code=400, | ||||
|             detail="Deletion not confirmed. Set 'confirm_delete' to true to proceed." | ||||
|         ) | ||||
| 
 | ||||
|     else: | ||||
|         # Step 2: Token provided, validate and attempt deletion | ||||
|         if token not in pending_confirmations: | ||||
|             raise HTTPException(status_code=400, detail="Invalid or expired confirmation token.") | ||||
|     path = normalize_path(data.path) | ||||
| 
 | ||||
|         stored_request_data, expiry_time = pending_confirmations[token] | ||||
|     try: | ||||
|         if not path.exists(): | ||||
|             raise HTTPException(status_code=404, detail=f"Path not found: {data.path}") | ||||
| 
 | ||||
|         # Validate expiry | ||||
|         if datetime.now(timezone.utc) > expiry_time: | ||||
|             del pending_confirmations[token] # Remove expired token | ||||
|             raise HTTPException(status_code=400, detail="Confirmation token has expired.") | ||||
| 
 | ||||
|         # Validate consistency: path and recursive flag must match the original request | ||||
|         if stored_request_data.path != data.path or stored_request_data.recursive != data.recursive: | ||||
|              del pending_confirmations[token] # Remove token due to mismatch | ||||
|              raise HTTPException(status_code=400, detail="Request parameters do not match the original confirmation request.") | ||||
| 
 | ||||
|         # --- Proceed with Deletion --- | ||||
|         try: | ||||
|             if not path.exists(): | ||||
|                 # Re-check existence in case it was deleted between steps | ||||
|                 raise HTTPException(status_code=404, detail=f"Path not found (may have been deleted after confirmation): {data.path}") | ||||
| 
 | ||||
|             if path.is_file(): | ||||
|                 path.unlink() | ||||
|                 message = f"Successfully deleted file: {data.path}" | ||||
|             elif path.is_dir(): | ||||
|                 if data.recursive: | ||||
|                     shutil.rmtree(path) | ||||
|                     message = f"Successfully deleted directory recursively: {data.path}" | ||||
|                 else: | ||||
|                     try: | ||||
|                         path.rmdir() | ||||
|                         message = f"Successfully deleted empty directory: {data.path}" | ||||
|                     except OSError as e: | ||||
|                         raise HTTPException( | ||||
|                             status_code=400, | ||||
|                             detail=f"Directory not empty. Use 'recursive=True' to delete non-empty directories. Original error: {e}" | ||||
|                         ) | ||||
|         if path.is_file(): | ||||
|             path.unlink() # Raises FileNotFoundError if not exists, PermissionError if no permission | ||||
|             return SuccessResponse(message=f"Successfully deleted file: {data.path}") | ||||
|         elif path.is_dir(): | ||||
|             if data.recursive: | ||||
|                 shutil.rmtree(path) # Raises FileNotFoundError, PermissionError, NotADirectoryError | ||||
|                 return SuccessResponse(message=f"Successfully deleted directory recursively: {data.path}") | ||||
|             else: | ||||
|                  raise HTTPException(status_code=400, detail=f"Path is not a file or directory: {data.path}") | ||||
|                 try: | ||||
|                     path.rmdir() # Raises FileNotFoundError, OSError (e.g., dir not empty, permission denied) | ||||
|                     return SuccessResponse(message=f"Successfully deleted empty directory: {data.path}") | ||||
|                 except OSError as e: | ||||
|                     # Catch error if directory is not empty and recursive is false | ||||
|                     raise HTTPException( | ||||
|                         status_code=400, | ||||
|                         detail=f"Directory not empty. Use 'recursive=True' to delete non-empty directories. Original error: {e}" | ||||
|                     ) | ||||
|         else: | ||||
|              # Should not happen if exists() is true and it's not file/dir, but handle defensively | ||||
|              raise HTTPException(status_code=400, detail=f"Path is not a file or directory: {data.path}") | ||||
| 
 | ||||
|             # Deletion successful, remove the used token | ||||
|             del pending_confirmations[token] | ||||
|             return SuccessResponse(message=message) | ||||
| 
 | ||||
|         except PermissionError: | ||||
|             # Don't remove token on permission error; user might fix permissions and retry | ||||
|             raise HTTPException(status_code=403, detail=f"Permission denied to delete {data.path}") | ||||
|         except HTTPException as http_exc: | ||||
|              # If an expected HTTP error occurred during deletion (e.g., dir not empty), remove the token | ||||
|              del pending_confirmations[token] | ||||
|              raise http_exc # Re-raise the specific HTTP exception | ||||
|         except Exception as e: | ||||
|             # Catch unexpected errors during deletion, remove the token | ||||
|             del pending_confirmations[token] | ||||
|             raise HTTPException(status_code=500, detail=f"Failed to delete {data.path}: {e}") | ||||
|     except PermissionError: | ||||
|         raise HTTPException(status_code=403, detail=f"Permission denied to delete {data.path}") | ||||
|     except Exception as e: | ||||
|         # Catch other potential errors during deletion | ||||
|         raise HTTPException(status_code=500, detail=f"Failed to delete {data.path}: {e}") | ||||
| 
 | ||||
| 
 | ||||
| @app.post("/move_path", response_model=SuccessResponse, summary="Move or rename a file or directory") | ||||
| @ -441,13 +380,13 @@ async def move_path(data: MovePathRequest = Body(...)): | ||||
|     Returns JSON success message. | ||||
|     """ | ||||
|     source = normalize_path(data.source_path) | ||||
|     destination = normalize_path(data.destination_path) # Normalize destination path too | ||||
|     destination = normalize_path(data.destination_path) # Also normalize destination | ||||
| 
 | ||||
|     try: | ||||
|         if not source.exists(): | ||||
|             raise HTTPException(status_code=404, detail=f"Source path not found: {data.source_path}") | ||||
| 
 | ||||
|         shutil.move(str(source), str(destination)) # Handles both file and directory moves | ||||
|         shutil.move(str(source), str(destination)) # Raises FileNotFoundError, PermissionError etc. | ||||
|         return SuccessResponse(message=f"Successfully moved '{data.source_path}' to '{data.destination_path}'") | ||||
| 
 | ||||
|     except PermissionError: | ||||
| @ -469,17 +408,18 @@ async def get_metadata(data: GetMetadataRequest = Body(...)): | ||||
| 
 | ||||
|         stat_result = path.stat() | ||||
| 
 | ||||
|         # Determine if it's a file or directory | ||||
|         # Determine type | ||||
|         if path.is_file(): | ||||
|             file_type = "file" | ||||
|         elif path.is_dir(): | ||||
|             file_type = "directory" | ||||
|         else: | ||||
|             file_type = "other" # Should generally not happen for existing, normalized paths | ||||
|             file_type = "other" # Should generally not happen for existing paths normalized | ||||
| 
 | ||||
|         # Format timestamps to ISO 8601 UTC for consistency | ||||
|         # Format timestamps (use UTC for consistency) | ||||
|         mod_time = datetime.fromtimestamp(stat_result.st_mtime, tz=timezone.utc).isoformat() | ||||
|         # Get creation time (st_birthtime on macOS/BSD, fallback to st_ctime on Linux) | ||||
|         # Creation time (st_birthtime) is macOS/BSD specific, st_ctime is metadata change time on Linux | ||||
|         # Use st_ctime as a fallback if st_birthtime isn't available | ||||
|         try: | ||||
|             create_time = datetime.fromtimestamp(stat_result.st_birthtime, tz=timezone.utc).isoformat() | ||||
|         except AttributeError: | ||||
| @ -491,7 +431,7 @@ async def get_metadata(data: GetMetadataRequest = Body(...)): | ||||
|             "type": file_type, | ||||
|             "size_bytes": stat_result.st_size, | ||||
|             "modification_time_utc": mod_time, | ||||
|             "creation_time_utc": create_time, # Note: Definition varies by OS (birthtime vs. ctime) | ||||
|             "creation_time_utc": create_time, # Note platform differences in definition | ||||
|             "last_metadata_change_time_utc": datetime.fromtimestamp(stat_result.st_ctime, tz=timezone.utc).isoformat(), | ||||
|         } | ||||
|         return metadata | ||||
|  | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user