mirror of
				https://github.com/open-webui/openapi-servers
				synced 2025-06-26 18:17:04 +00:00 
			
		
		
		
	Fix rest of json issues for all routes, implement SuccessResponse so that read calls are not attempted after move file operations
This commit is contained in:
		
							parent
							
								
									c0bb3350fb
								
							
						
					
					
						commit
						e4f51122ec
					
				| @ -41,7 +41,7 @@ ALLOWED_DIRECTORIES = [ | |||||||
| def normalize_path(requested_path: str) -> pathlib.Path: | def normalize_path(requested_path: str) -> pathlib.Path: | ||||||
|     requested = pathlib.Path(os.path.expanduser(requested_path)).resolve() |     requested = pathlib.Path(os.path.expanduser(requested_path)).resolve() | ||||||
|     for allowed in ALLOWED_DIRECTORIES: |     for allowed in ALLOWED_DIRECTORIES: | ||||||
|         if str(requested).startswith(allowed): |         if str(requested).lower().startswith(allowed.lower()): # Case-insensitive check | ||||||
|             return requested |             return requested | ||||||
|     raise HTTPException( |     raise HTTPException( | ||||||
|         status_code=403, |         status_code=403, | ||||||
| @ -147,76 +147,118 @@ class GetMetadataRequest(BaseModel): | |||||||
| # ------------------------------------------------------------------------------ | # ------------------------------------------------------------------------------ | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| @app.post("/read_file", response_class=PlainTextResponse, summary="Read a file") | class SuccessResponse(BaseModel): | ||||||
|  |     message: str = Field(..., description="Success message indicating the operation was completed.") | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | class ReadFileResponse(BaseModel): | ||||||
|  |     content: str = Field(..., description="UTF-8 encoded text content of the file.") | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | class DiffResponse(BaseModel): | ||||||
|  |     diff: str = Field(..., description="Unified diff output comparing original and modified content.") | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | @app.post("/read_file", response_model=ReadFileResponse, summary="Read a file") # Changed response_class to response_model | ||||||
| async def read_file(data: ReadFileRequest = Body(...)): | async def read_file(data: ReadFileRequest = Body(...)): | ||||||
|     """ |     """ | ||||||
|     Read the entire contents of a file. |     Read the entire contents of a file and return as JSON. | ||||||
|     """ |     """ | ||||||
|     path = normalize_path(data.path) |     path = normalize_path(data.path) | ||||||
|     try: |     try: | ||||||
|         return path.read_text(encoding="utf-8") |         file_content = path.read_text(encoding="utf-8") | ||||||
|  |         return ReadFileResponse(content=file_content) # Return Pydantic model instance | ||||||
|  |     except FileNotFoundError: | ||||||
|  |         raise HTTPException(status_code=404, detail=f"File not found: {data.path}") | ||||||
|  |     except PermissionError: | ||||||
|  |          raise HTTPException(status_code=403, detail=f"Permission denied for file: {data.path}") | ||||||
|     except Exception as e: |     except Exception as e: | ||||||
|         raise HTTPException(status_code=400, detail=str(e)) |         # More specific error for generic read issues | ||||||
|  |         raise HTTPException(status_code=500, detail=f"Failed to read file {data.path}: {str(e)}") | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| @app.post("/write_file", response_class=PlainTextResponse, summary="Write to a file") | @app.post("/write_file", response_model=SuccessResponse, summary="Write to a file") | ||||||
| async def write_file(data: WriteFileRequest = Body(...)): | async def write_file(data: WriteFileRequest = Body(...)): | ||||||
|     """ |     """ | ||||||
|     Write content to a file, overwriting if it exists. |     Write content to a file, overwriting if it exists. Returns JSON success message. | ||||||
|     """ |     """ | ||||||
|     path = normalize_path(data.path) |     path = normalize_path(data.path) | ||||||
|     try: |     try: | ||||||
|         path.write_text(data.content, encoding="utf-8") |         path.write_text(data.content, encoding="utf-8") | ||||||
|         return f"Successfully wrote to {data.path}" |         return SuccessResponse(message=f"Successfully wrote to {data.path}") | ||||||
|  |     except PermissionError: | ||||||
|  |         raise HTTPException(status_code=403, detail=f"Permission denied to write to {data.path}") | ||||||
|     except Exception as e: |     except Exception as e: | ||||||
|         raise HTTPException(status_code=400, detail=str(e)) |         raise HTTPException(status_code=500, detail=f"Failed to write to {data.path}: {str(e)}") | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | from typing import Union # Add this import at the top with other typing imports | ||||||
|  | 
 | ||||||
| @app.post( | @app.post( | ||||||
|     "/edit_file", response_class=PlainTextResponse, summary="Edit a file with diff" |     "/edit_file", | ||||||
|  |     response_model=Union[SuccessResponse, DiffResponse], # Use Union for multiple response types | ||||||
|  |     summary="Edit a file with diff" | ||||||
| ) | ) | ||||||
| async def edit_file(data: EditFileRequest = Body(...)): | async def edit_file(data: EditFileRequest = Body(...)): | ||||||
|     """ |     """ | ||||||
|     Apply a list of edits to a text file. Support dry-run to get unified diff. |     Apply a list of edits to a text file. | ||||||
|  |     Returns JSON success message or JSON diff on dry-run. | ||||||
|     """ |     """ | ||||||
|     path = normalize_path(data.path) |     path = normalize_path(data.path) | ||||||
|     original = path.read_text(encoding="utf-8") |     try: | ||||||
|  |         original = path.read_text(encoding="utf-8") | ||||||
|  |     except FileNotFoundError: | ||||||
|  |         raise HTTPException(status_code=404, detail=f"File not found: {data.path}") | ||||||
|  |     except PermissionError: | ||||||
|  |         raise HTTPException(status_code=403, detail=f"Permission denied to read file: {data.path}") | ||||||
|  |     except Exception as e: | ||||||
|  |         raise HTTPException(status_code=500, detail=f"Failed to read file {data.path} for editing: {str(e)}") | ||||||
|  | 
 | ||||||
|     modified = original |     modified = original | ||||||
|  |     try: | ||||||
|  |         for edit in data.edits: | ||||||
|  |             if edit.oldText not in modified: | ||||||
|  |                 raise HTTPException( | ||||||
|  |                     status_code=400, | ||||||
|  |                     detail=f"Edit failed: oldText not found in content: '{edit.oldText[:50]}...'", | ||||||
|  |                 ) | ||||||
|  |             modified = modified.replace(edit.oldText, edit.newText, 1) | ||||||
| 
 | 
 | ||||||
|     for edit in data.edits: |         if data.dryRun: | ||||||
|         if edit.oldText not in modified: |             diff_output = difflib.unified_diff( | ||||||
|             raise HTTPException( |                 original.splitlines(keepends=True), | ||||||
|                 status_code=400, |                 modified.splitlines(keepends=True), | ||||||
|                 detail=f"oldText not found in content: {edit.oldText[:50]}", |                 fromfile=f"a/{data.path}", | ||||||
|  |                 tofile=f"b/{data.path}", | ||||||
|             ) |             ) | ||||||
|         modified = modified.replace(edit.oldText, edit.newText, 1) |             return DiffResponse(diff="".join(diff_output)) # Return JSON diff | ||||||
| 
 | 
 | ||||||
|     if data.dryRun: |         # Write changes if not dry run | ||||||
|         diff = difflib.unified_diff( |         path.write_text(modified, encoding="utf-8") | ||||||
|             original.splitlines(keepends=True), |         return SuccessResponse(message=f"Successfully edited file {data.path}") # Return JSON success | ||||||
|             modified.splitlines(keepends=True), |  | ||||||
|             fromfile="original", |  | ||||||
|             tofile="modified", |  | ||||||
|         ) |  | ||||||
|         return "".join(diff) |  | ||||||
| 
 | 
 | ||||||
|     path.write_text(modified, encoding="utf-8") |     except PermissionError: | ||||||
|     return f"Successfully edited file {data.path}" |         raise HTTPException(status_code=403, detail=f"Permission denied to write edited file: {data.path}") | ||||||
|  |     except Exception as e: | ||||||
|  |         # Catch errors during writing the modified file | ||||||
|  |         raise HTTPException(status_code=500, detail=f"Failed to write edited file {data.path}: {str(e)}") | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| @app.post( | @app.post( | ||||||
|     "/create_directory", response_class=PlainTextResponse, summary="Create a directory" |     "/create_directory", response_model=SuccessResponse, summary="Create a directory" | ||||||
| ) | ) | ||||||
| async def create_directory(data: CreateDirectoryRequest = Body(...)): | async def create_directory(data: CreateDirectoryRequest = Body(...)): | ||||||
|     """ |     """ | ||||||
|     Create a new directory recursively. |     Create a new directory recursively. Returns JSON success message. | ||||||
|     """ |     """ | ||||||
|     dir_path = normalize_path(data.path) |     dir_path = normalize_path(data.path) | ||||||
|     try: |     try: | ||||||
|         dir_path.mkdir(parents=True, exist_ok=True) |         dir_path.mkdir(parents=True, exist_ok=True) | ||||||
|         return f"Successfully created directory {data.path}" |         return SuccessResponse(message=f"Successfully created directory {data.path}") | ||||||
|  |     except PermissionError: | ||||||
|  |         raise HTTPException(status_code=403, detail=f"Permission denied to create directory {data.path}") | ||||||
|     except Exception as e: |     except Exception as e: | ||||||
|         raise HTTPException(status_code=400, detail=str(e)) |         raise HTTPException(status_code=500, detail=f"Failed to create directory {data.path}: {str(e)}") | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| @app.post( | @app.post( | ||||||
| @ -288,11 +330,12 @@ async def search_files(data: SearchFilesRequest = Body(...)): | |||||||
|     return {"matches": results or ["No matches found"]} |     return {"matches": results or ["No matches found"]} | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| @app.post("/delete_path", response_class=PlainTextResponse, summary="Delete a file or directory") | @app.post("/delete_path", response_model=SuccessResponse, summary="Delete a file or directory") | ||||||
| async def delete_path(data: DeletePathRequest = Body(...)): | async def delete_path(data: DeletePathRequest = Body(...)): | ||||||
|     """ |     """ | ||||||
|     Delete a specified file or directory. Requires explicit confirmation. |     Delete a specified file or directory. Requires explicit confirmation. | ||||||
|     Use 'recursive=True' to delete non-empty directories. |     Use 'recursive=True' to delete non-empty directories. | ||||||
|  |     Returns JSON success message. | ||||||
|     """ |     """ | ||||||
|     if not data.confirm_delete: |     if not data.confirm_delete: | ||||||
|         raise HTTPException( |         raise HTTPException( | ||||||
| @ -307,16 +350,16 @@ async def delete_path(data: DeletePathRequest = Body(...)): | |||||||
|             raise HTTPException(status_code=404, detail=f"Path not found: {data.path}") |             raise HTTPException(status_code=404, detail=f"Path not found: {data.path}") | ||||||
| 
 | 
 | ||||||
|         if path.is_file(): |         if path.is_file(): | ||||||
|             path.unlink() |             path.unlink() # Raises FileNotFoundError if not exists, PermissionError if no permission | ||||||
|             return f"Successfully deleted file: {data.path}" |             return SuccessResponse(message=f"Successfully deleted file: {data.path}") | ||||||
|         elif path.is_dir(): |         elif path.is_dir(): | ||||||
|             if data.recursive: |             if data.recursive: | ||||||
|                 shutil.rmtree(path) |                 shutil.rmtree(path) # Raises FileNotFoundError, PermissionError, NotADirectoryError | ||||||
|                 return f"Successfully deleted directory recursively: {data.path}" |                 return SuccessResponse(message=f"Successfully deleted directory recursively: {data.path}") | ||||||
|             else: |             else: | ||||||
|                 try: |                 try: | ||||||
|                     path.rmdir() # Only works for empty directories |                     path.rmdir() # Raises FileNotFoundError, OSError (e.g., dir not empty, permission denied) | ||||||
|                     return f"Successfully deleted empty directory: {data.path}" |                     return SuccessResponse(message=f"Successfully deleted empty directory: {data.path}") | ||||||
|                 except OSError as e: |                 except OSError as e: | ||||||
|                     # Catch error if directory is not empty and recursive is false |                     # Catch error if directory is not empty and recursive is false | ||||||
|                     raise HTTPException( |                     raise HTTPException( | ||||||
| @ -334,11 +377,12 @@ async def delete_path(data: DeletePathRequest = Body(...)): | |||||||
|         raise HTTPException(status_code=500, detail=f"Failed to delete {data.path}: {e}") |         raise HTTPException(status_code=500, detail=f"Failed to delete {data.path}: {e}") | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| @app.post("/move_path", response_class=PlainTextResponse, summary="Move or rename a file or directory") | @app.post("/move_path", response_model=SuccessResponse, summary="Move or rename a file or directory") | ||||||
| async def move_path(data: MovePathRequest = Body(...)): | async def move_path(data: MovePathRequest = Body(...)): | ||||||
|     """ |     """ | ||||||
|     Move or rename a file or directory from source_path to destination_path. |     Move or rename a file or directory from source_path to destination_path. | ||||||
|     Both paths must be within the allowed directories. |     Both paths must be within the allowed directories. | ||||||
|  |     Returns JSON success message. | ||||||
|     """ |     """ | ||||||
|     source = normalize_path(data.source_path) |     source = normalize_path(data.source_path) | ||||||
|     destination = normalize_path(data.destination_path) # Also normalize destination |     destination = normalize_path(data.destination_path) # Also normalize destination | ||||||
| @ -347,8 +391,8 @@ async def move_path(data: MovePathRequest = Body(...)): | |||||||
|         if not source.exists(): |         if not source.exists(): | ||||||
|             raise HTTPException(status_code=404, detail=f"Source path not found: {data.source_path}") |             raise HTTPException(status_code=404, detail=f"Source path not found: {data.source_path}") | ||||||
| 
 | 
 | ||||||
|         shutil.move(str(source), str(destination)) |         shutil.move(str(source), str(destination)) # Raises FileNotFoundError, PermissionError etc. | ||||||
|         return f"Successfully moved '{data.source_path}' to '{data.destination_path}'" |         return SuccessResponse(message=f"Successfully moved '{data.source_path}' to '{data.destination_path}'") | ||||||
| 
 | 
 | ||||||
|     except PermissionError: |     except PermissionError: | ||||||
|         raise HTTPException(status_code=403, detail=f"Permission denied for move operation involving '{data.source_path}' or '{data.destination_path}'") |         raise HTTPException(status_code=403, detail=f"Permission denied for move operation involving '{data.source_path}' or '{data.destination_path}'") | ||||||
|  | |||||||
		Loading…
	
		Reference in New Issue
	
	Block a user