mirror of
				https://github.com/open-webui/openapi-servers
				synced 2025-06-26 18:17:04 +00:00 
			
		
		
		
	Refactor allowed directories to config.py; implement two-step delete_path with token-based confirmation and pending state; update related models, validation, and error handling
This commit is contained in:
		
							parent
							
								
									455993c7e1
								
							
						
					
					
						commit
						07be65fcf7
					
				
							
								
								
									
										7
									
								
								servers/filesystem/config.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										7
									
								
								servers/filesystem/config.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,7 @@ | |||||||
|  | import os | ||||||
|  | import pathlib | ||||||
|  | 
 | ||||||
|  | # Constants | ||||||
|  | ALLOWED_DIRECTORIES = [ | ||||||
|  |     str(pathlib.Path(os.path.expanduser("~/")).resolve()) | ||||||
|  | ]  # 👈 Replace with your paths | ||||||
| @ -7,10 +7,12 @@ from pydantic import BaseModel, Field | |||||||
| import os | import os | ||||||
| import pathlib | import pathlib | ||||||
| import asyncio | import asyncio | ||||||
| from typing import List, Optional, Literal | from typing import List, Optional, Union, Dict, Tuple | ||||||
| import difflib | import difflib | ||||||
| import shutil | import shutil | ||||||
| from datetime import datetime, timezone | from datetime import datetime, timezone, timedelta | ||||||
|  | import uuid | ||||||
|  | from .config import ALLOWED_DIRECTORIES | ||||||
| app = FastAPI( | app = FastAPI( | ||||||
|     title="Secure Filesystem API", |     title="Secure Filesystem API", | ||||||
|     version="0.1.0", |     version="0.1.0", | ||||||
| @ -27,11 +29,9 @@ app.add_middleware( | |||||||
|     allow_headers=["*"], |     allow_headers=["*"], | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| 
 | # In-memory store for pending confirmations. Use Redis/DB in production. | ||||||
| # Constants | # Format: {token: (DeletePathRequest_data, expiry_datetime)} | ||||||
| ALLOWED_DIRECTORIES = [ | pending_confirmations: Dict[str, Tuple['DeletePathRequest', datetime]] = {} | ||||||
|     str(pathlib.Path(os.path.expanduser("~/")).resolve()) |  | ||||||
| ]  # 👈 Replace with your paths |  | ||||||
| 
 | 
 | ||||||
| # ------------------------------------------------------------------------------ | # ------------------------------------------------------------------------------ | ||||||
| # Utility functions | # Utility functions | ||||||
| @ -41,7 +41,7 @@ ALLOWED_DIRECTORIES = [ | |||||||
| def normalize_path(requested_path: str) -> pathlib.Path: | def normalize_path(requested_path: str) -> pathlib.Path: | ||||||
|     requested = pathlib.Path(os.path.expanduser(requested_path)).resolve() |     requested = pathlib.Path(os.path.expanduser(requested_path)).resolve() | ||||||
|     for allowed in ALLOWED_DIRECTORIES: |     for allowed in ALLOWED_DIRECTORIES: | ||||||
|         if str(requested).lower().startswith(allowed.lower()): # Case-insensitive check |         if str(requested).lower().startswith(allowed.lower()): # Case-insensitive path comparison | ||||||
|             return requested |             return requested | ||||||
|     raise HTTPException( |     raise HTTPException( | ||||||
|         status_code=403, |         status_code=403, | ||||||
| @ -128,8 +128,9 @@ class DeletePathRequest(BaseModel): | |||||||
|     recursive: bool = Field( |     recursive: bool = Field( | ||||||
|         default=False, description="If true and path is a directory, delete recursively. Required if directory is not empty." |         default=False, description="If true and path is a directory, delete recursively. Required if directory is not empty." | ||||||
|     ) |     ) | ||||||
|     confirm_delete: bool = Field( |     # confirmation_token replaces the old confirm_delete flag | ||||||
|         ..., description="Must be explicitly set to true to confirm deletion." |     confirmation_token: Optional[str] = Field( | ||||||
|  |         default=None, description="A confirmation token obtained from a prior request, required to execute deletion." | ||||||
|     ) |     ) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| @ -150,6 +151,12 @@ class GetMetadataRequest(BaseModel): | |||||||
| class SuccessResponse(BaseModel): | class SuccessResponse(BaseModel): | ||||||
|     message: str = Field(..., description="Success message indicating the operation was completed.") |     message: str = Field(..., description="Success message indicating the operation was completed.") | ||||||
| 
 | 
 | ||||||
|  | # Response model for when confirmation is required for deletion | ||||||
|  | class ConfirmationRequiredResponse(BaseModel): | ||||||
|  |     message: str = Field(default="Confirmation required to proceed with deletion.") | ||||||
|  |     confirmation_token: str = Field(..., description="Token needed for the confirmation step.") | ||||||
|  |     expires_at: str = Field(..., description="UTC timestamp when the token expires (ISO format).") | ||||||
|  | 
 | ||||||
| 
 | 
 | ||||||
| class ReadFileResponse(BaseModel): | class ReadFileResponse(BaseModel): | ||||||
|     content: str = Field(..., description="UTF-8 encoded text content of the file.") |     content: str = Field(..., description="UTF-8 encoded text content of the file.") | ||||||
| @ -159,7 +166,7 @@ class DiffResponse(BaseModel): | |||||||
|     diff: str = Field(..., description="Unified diff output comparing original and modified content.") |     diff: str = Field(..., description="Unified diff output comparing original and modified content.") | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| @app.post("/read_file", response_model=ReadFileResponse, summary="Read a file") # Changed response_class to response_model | @app.post("/read_file", response_model=ReadFileResponse, summary="Read a file") | ||||||
| async def read_file(data: ReadFileRequest = Body(...)): | async def read_file(data: ReadFileRequest = Body(...)): | ||||||
|     """ |     """ | ||||||
|     Read the entire contents of a file and return as JSON. |     Read the entire contents of a file and return as JSON. | ||||||
| @ -167,13 +174,13 @@ async def read_file(data: ReadFileRequest = Body(...)): | |||||||
|     path = normalize_path(data.path) |     path = normalize_path(data.path) | ||||||
|     try: |     try: | ||||||
|         file_content = path.read_text(encoding="utf-8") |         file_content = path.read_text(encoding="utf-8") | ||||||
|         return ReadFileResponse(content=file_content) # Return Pydantic model instance |         return ReadFileResponse(content=file_content) | ||||||
|     except FileNotFoundError: |     except FileNotFoundError: | ||||||
|         raise HTTPException(status_code=404, detail=f"File not found: {data.path}") |         raise HTTPException(status_code=404, detail=f"File not found: {data.path}") | ||||||
|     except PermissionError: |     except PermissionError: | ||||||
|          raise HTTPException(status_code=403, detail=f"Permission denied for file: {data.path}") |          raise HTTPException(status_code=403, detail=f"Permission denied for file: {data.path}") | ||||||
|     except Exception as e: |     except Exception as e: | ||||||
|         # More specific error for generic read issues |         # Catch potential read errors | ||||||
|         raise HTTPException(status_code=500, detail=f"Failed to read file {data.path}: {str(e)}") |         raise HTTPException(status_code=500, detail=f"Failed to read file {data.path}: {str(e)}") | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| @ -192,11 +199,11 @@ async def write_file(data: WriteFileRequest = Body(...)): | |||||||
|         raise HTTPException(status_code=500, detail=f"Failed to write to {data.path}: {str(e)}") |         raise HTTPException(status_code=500, detail=f"Failed to write to {data.path}: {str(e)}") | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| from typing import Union # Add this import at the top with other typing imports | # Note: Union was already imported earlier | ||||||
| 
 | 
 | ||||||
| @app.post( | @app.post( | ||||||
|     "/edit_file", |     "/edit_file", | ||||||
|     response_model=Union[SuccessResponse, DiffResponse], # Use Union for multiple response types |     response_model=Union[SuccessResponse, DiffResponse], | ||||||
|     summary="Edit a file with diff" |     summary="Edit a file with diff" | ||||||
| ) | ) | ||||||
| async def edit_file(data: EditFileRequest = Body(...)): | async def edit_file(data: EditFileRequest = Body(...)): | ||||||
| @ -231,16 +238,16 @@ async def edit_file(data: EditFileRequest = Body(...)): | |||||||
|                 fromfile=f"a/{data.path}", |                 fromfile=f"a/{data.path}", | ||||||
|                 tofile=f"b/{data.path}", |                 tofile=f"b/{data.path}", | ||||||
|             ) |             ) | ||||||
|             return DiffResponse(diff="".join(diff_output)) # Return JSON diff |             return DiffResponse(diff="".join(diff_output)) | ||||||
| 
 | 
 | ||||||
|         # Write changes if not dry run |         # Apply changes if not a dry run | ||||||
|         path.write_text(modified, encoding="utf-8") |         path.write_text(modified, encoding="utf-8") | ||||||
|         return SuccessResponse(message=f"Successfully edited file {data.path}") # Return JSON success |         return SuccessResponse(message=f"Successfully edited file {data.path}") | ||||||
| 
 | 
 | ||||||
|     except PermissionError: |     except PermissionError: | ||||||
|         raise HTTPException(status_code=403, detail=f"Permission denied to write edited file: {data.path}") |         raise HTTPException(status_code=403, detail=f"Permission denied to write edited file: {data.path}") | ||||||
|     except Exception as e: |     except Exception as e: | ||||||
|         # Catch errors during writing the modified file |         # Catch potential write errors after modification | ||||||
|         raise HTTPException(status_code=500, detail=f"Failed to write edited file {data.path}: {str(e)}") |         raise HTTPException(status_code=500, detail=f"Failed to write edited file {data.path}: {str(e)}") | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| @ -277,7 +284,7 @@ async def list_directory(data: ListDirectoryRequest = Body(...)): | |||||||
|         entry_type = "directory" if entry.is_dir() else "file" |         entry_type = "directory" if entry.is_dir() else "file" | ||||||
|         listing.append({"name": entry.name, "type": entry_type}) |         listing.append({"name": entry.name, "type": entry_type}) | ||||||
| 
 | 
 | ||||||
|     # Return the list directly, FastAPI will serialize it to JSON |     # FastAPI automatically serializes the list to JSON | ||||||
|     return listing |     return listing | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| @ -313,7 +320,7 @@ async def search_files(data: SearchFilesRequest = Body(...)): | |||||||
| 
 | 
 | ||||||
|     for root, dirs, files in os.walk(base_path): |     for root, dirs, files in os.walk(base_path): | ||||||
|         root_path = pathlib.Path(root) |         root_path = pathlib.Path(root) | ||||||
|         # Apply exclusion patterns |         # Check if the current root directory should be excluded | ||||||
|         excluded = False |         excluded = False | ||||||
|         for pattern in data.excludePatterns: |         for pattern in data.excludePatterns: | ||||||
|             if pathlib.Path(root).match(pattern): |             if pathlib.Path(root).match(pattern): | ||||||
| @ -330,51 +337,100 @@ async def search_files(data: SearchFilesRequest = Body(...)): | |||||||
|     return {"matches": results or ["No matches found"]} |     return {"matches": results or ["No matches found"]} | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| @app.post("/delete_path", response_model=SuccessResponse, summary="Delete a file or directory") | @app.post( | ||||||
|  |     "/delete_path", | ||||||
|  |     response_model=Union[SuccessResponse, ConfirmationRequiredResponse], | ||||||
|  |     summary="Delete a file or directory (two-step confirmation)" | ||||||
|  | ) | ||||||
| async def delete_path(data: DeletePathRequest = Body(...)): | async def delete_path(data: DeletePathRequest = Body(...)): | ||||||
|     """ |     """ | ||||||
|     Delete a specified file or directory. Requires explicit confirmation. |     Initiate or confirm deletion of a file or directory. | ||||||
|     Use 'recursive=True' to delete non-empty directories. | 
 | ||||||
|     Returns JSON success message. |     1.  **First Request:** Call without `confirmation_token`. Returns a `confirmation_token`. | ||||||
|  |     2.  **Second Request:** Call again with the *exact same* `path` and `recursive` parameters, | ||||||
|  |         plus the `confirmation_token` received from the first request. | ||||||
|  | 
 | ||||||
|  |     Use `recursive=True` to delete non-empty directories (requires confirmation). | ||||||
|  |     Returns `ConfirmationRequiredResponse` on first step, `SuccessResponse` on completion. | ||||||
|     """ |     """ | ||||||
|     if not data.confirm_delete: |     path = normalize_path(data.path) | ||||||
|         raise HTTPException( |     token = data.confirmation_token | ||||||
|             status_code=400, | 
 | ||||||
|             detail="Deletion not confirmed. Set 'confirm_delete' to true to proceed." |     if token is None: | ||||||
|  |         # Step 1: No token provided, generate one and request confirmation | ||||||
|  |         if not path.exists(): | ||||||
|  |              # Check existence before generating a token | ||||||
|  |              raise HTTPException(status_code=404, detail=f"Path not found: {data.path}") | ||||||
|  | 
 | ||||||
|  |         confirmation_token = str(uuid.uuid4()) | ||||||
|  |         expiry_time = datetime.now(timezone.utc) + timedelta(minutes=2) # Token valid for 2 minutes | ||||||
|  | 
 | ||||||
|  |         # Store the original request data and expiry time associated with the token | ||||||
|  |         # Important: Store data *before* normalization for validation later | ||||||
|  |         pending_confirmations[confirmation_token] = (data, expiry_time) | ||||||
|  | 
 | ||||||
|  |         return ConfirmationRequiredResponse( | ||||||
|  |             confirmation_token=confirmation_token, | ||||||
|  |             expires_at=expiry_time.isoformat() | ||||||
|         ) |         ) | ||||||
| 
 | 
 | ||||||
|     path = normalize_path(data.path) |     else: | ||||||
|  |         # Step 2: Token provided, validate and attempt deletion | ||||||
|  |         if token not in pending_confirmations: | ||||||
|  |             raise HTTPException(status_code=400, detail="Invalid or expired confirmation token.") | ||||||
| 
 | 
 | ||||||
|     try: |         stored_request_data, expiry_time = pending_confirmations[token] | ||||||
|         if not path.exists(): |  | ||||||
|             raise HTTPException(status_code=404, detail=f"Path not found: {data.path}") |  | ||||||
| 
 | 
 | ||||||
|         if path.is_file(): |         # Validate expiry | ||||||
|             path.unlink() # Raises FileNotFoundError if not exists, PermissionError if no permission |         if datetime.now(timezone.utc) > expiry_time: | ||||||
|             return SuccessResponse(message=f"Successfully deleted file: {data.path}") |             del pending_confirmations[token] # Remove expired token | ||||||
|         elif path.is_dir(): |             raise HTTPException(status_code=400, detail="Confirmation token has expired.") | ||||||
|             if data.recursive: | 
 | ||||||
|                 shutil.rmtree(path) # Raises FileNotFoundError, PermissionError, NotADirectoryError |         # Validate consistency: path and recursive flag must match the original request | ||||||
|                 return SuccessResponse(message=f"Successfully deleted directory recursively: {data.path}") |         if stored_request_data.path != data.path or stored_request_data.recursive != data.recursive: | ||||||
|  |              del pending_confirmations[token] # Remove token due to mismatch | ||||||
|  |              raise HTTPException(status_code=400, detail="Request parameters do not match the original confirmation request.") | ||||||
|  | 
 | ||||||
|  |         # --- Proceed with Deletion --- | ||||||
|  |         try: | ||||||
|  |             if not path.exists(): | ||||||
|  |                 # Re-check existence in case it was deleted between steps | ||||||
|  |                 raise HTTPException(status_code=404, detail=f"Path not found (may have been deleted after confirmation): {data.path}") | ||||||
|  | 
 | ||||||
|  |             if path.is_file(): | ||||||
|  |                 path.unlink() | ||||||
|  |                 message = f"Successfully deleted file: {data.path}" | ||||||
|  |             elif path.is_dir(): | ||||||
|  |                 if data.recursive: | ||||||
|  |                     shutil.rmtree(path) | ||||||
|  |                     message = f"Successfully deleted directory recursively: {data.path}" | ||||||
|  |                 else: | ||||||
|  |                     try: | ||||||
|  |                         path.rmdir() | ||||||
|  |                         message = f"Successfully deleted empty directory: {data.path}" | ||||||
|  |                     except OSError as e: | ||||||
|  |                         raise HTTPException( | ||||||
|  |                             status_code=400, | ||||||
|  |                             detail=f"Directory not empty. Use 'recursive=True' to delete non-empty directories. Original error: {e}" | ||||||
|  |                         ) | ||||||
|             else: |             else: | ||||||
|                 try: |                  raise HTTPException(status_code=400, detail=f"Path is not a file or directory: {data.path}") | ||||||
|                     path.rmdir() # Raises FileNotFoundError, OSError (e.g., dir not empty, permission denied) |  | ||||||
|                     return SuccessResponse(message=f"Successfully deleted empty directory: {data.path}") |  | ||||||
|                 except OSError as e: |  | ||||||
|                     # Catch error if directory is not empty and recursive is false |  | ||||||
|                     raise HTTPException( |  | ||||||
|                         status_code=400, |  | ||||||
|                         detail=f"Directory not empty. Use 'recursive=True' to delete non-empty directories. Original error: {e}" |  | ||||||
|                     ) |  | ||||||
|         else: |  | ||||||
|              # Should not happen if exists() is true and it's not file/dir, but handle defensively |  | ||||||
|              raise HTTPException(status_code=400, detail=f"Path is not a file or directory: {data.path}") |  | ||||||
| 
 | 
 | ||||||
|     except PermissionError: |             # Deletion successful, remove the used token | ||||||
|         raise HTTPException(status_code=403, detail=f"Permission denied to delete {data.path}") |             del pending_confirmations[token] | ||||||
|     except Exception as e: |             return SuccessResponse(message=message) | ||||||
|         # Catch other potential errors during deletion | 
 | ||||||
|         raise HTTPException(status_code=500, detail=f"Failed to delete {data.path}: {e}") |         except PermissionError: | ||||||
|  |             # Don't remove token on permission error; user might fix permissions and retry | ||||||
|  |             raise HTTPException(status_code=403, detail=f"Permission denied to delete {data.path}") | ||||||
|  |         except HTTPException as http_exc: | ||||||
|  |              # If an expected HTTP error occurred during deletion (e.g., dir not empty), remove the token | ||||||
|  |              del pending_confirmations[token] | ||||||
|  |              raise http_exc # Re-raise the specific HTTP exception | ||||||
|  |         except Exception as e: | ||||||
|  |             # Catch unexpected errors during deletion, remove the token | ||||||
|  |             del pending_confirmations[token] | ||||||
|  |             raise HTTPException(status_code=500, detail=f"Failed to delete {data.path}: {e}") | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| @app.post("/move_path", response_model=SuccessResponse, summary="Move or rename a file or directory") | @app.post("/move_path", response_model=SuccessResponse, summary="Move or rename a file or directory") | ||||||
| @ -385,13 +441,13 @@ async def move_path(data: MovePathRequest = Body(...)): | |||||||
|     Returns JSON success message. |     Returns JSON success message. | ||||||
|     """ |     """ | ||||||
|     source = normalize_path(data.source_path) |     source = normalize_path(data.source_path) | ||||||
|     destination = normalize_path(data.destination_path) # Also normalize destination |     destination = normalize_path(data.destination_path) # Normalize destination path too | ||||||
| 
 | 
 | ||||||
|     try: |     try: | ||||||
|         if not source.exists(): |         if not source.exists(): | ||||||
|             raise HTTPException(status_code=404, detail=f"Source path not found: {data.source_path}") |             raise HTTPException(status_code=404, detail=f"Source path not found: {data.source_path}") | ||||||
| 
 | 
 | ||||||
|         shutil.move(str(source), str(destination)) # Raises FileNotFoundError, PermissionError etc. |         shutil.move(str(source), str(destination)) # Handles both file and directory moves | ||||||
|         return SuccessResponse(message=f"Successfully moved '{data.source_path}' to '{data.destination_path}'") |         return SuccessResponse(message=f"Successfully moved '{data.source_path}' to '{data.destination_path}'") | ||||||
| 
 | 
 | ||||||
|     except PermissionError: |     except PermissionError: | ||||||
| @ -413,18 +469,17 @@ async def get_metadata(data: GetMetadataRequest = Body(...)): | |||||||
| 
 | 
 | ||||||
|         stat_result = path.stat() |         stat_result = path.stat() | ||||||
| 
 | 
 | ||||||
|         # Determine type |         # Determine if it's a file or directory | ||||||
|         if path.is_file(): |         if path.is_file(): | ||||||
|             file_type = "file" |             file_type = "file" | ||||||
|         elif path.is_dir(): |         elif path.is_dir(): | ||||||
|             file_type = "directory" |             file_type = "directory" | ||||||
|         else: |         else: | ||||||
|             file_type = "other" # Should generally not happen for existing paths normalized |             file_type = "other" # Should generally not happen for existing, normalized paths | ||||||
| 
 | 
 | ||||||
|         # Format timestamps (use UTC for consistency) |         # Format timestamps to ISO 8601 UTC for consistency | ||||||
|         mod_time = datetime.fromtimestamp(stat_result.st_mtime, tz=timezone.utc).isoformat() |         mod_time = datetime.fromtimestamp(stat_result.st_mtime, tz=timezone.utc).isoformat() | ||||||
|         # Creation time (st_birthtime) is macOS/BSD specific, st_ctime is metadata change time on Linux |         # Get creation time (st_birthtime on macOS/BSD, fallback to st_ctime on Linux) | ||||||
|         # Use st_ctime as a fallback if st_birthtime isn't available |  | ||||||
|         try: |         try: | ||||||
|             create_time = datetime.fromtimestamp(stat_result.st_birthtime, tz=timezone.utc).isoformat() |             create_time = datetime.fromtimestamp(stat_result.st_birthtime, tz=timezone.utc).isoformat() | ||||||
|         except AttributeError: |         except AttributeError: | ||||||
| @ -436,7 +491,7 @@ async def get_metadata(data: GetMetadataRequest = Body(...)): | |||||||
|             "type": file_type, |             "type": file_type, | ||||||
|             "size_bytes": stat_result.st_size, |             "size_bytes": stat_result.st_size, | ||||||
|             "modification_time_utc": mod_time, |             "modification_time_utc": mod_time, | ||||||
|             "creation_time_utc": create_time, # Note platform differences in definition |             "creation_time_utc": create_time, # Note: Definition varies by OS (birthtime vs. ctime) | ||||||
|             "last_metadata_change_time_utc": datetime.fromtimestamp(stat_result.st_ctime, tz=timezone.utc).isoformat(), |             "last_metadata_change_time_utc": datetime.fromtimestamp(stat_result.st_ctime, tz=timezone.utc).isoformat(), | ||||||
|         } |         } | ||||||
|         return metadata |         return metadata | ||||||
|  | |||||||
		Loading…
	
		Reference in New Issue
	
	Block a user