mirror of
				https://github.com/deepseek-ai/3FS
				synced 2025-06-26 18:16:45 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			234 lines
		
	
	
		
			8.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			234 lines
		
	
	
		
			8.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import os
 | |
| import fcntl
 | |
| import errno
 | |
| import struct
 | |
| import stat
 | |
| import sys
 | |
| import pathlib
 | |
| from typing import Tuple
 | |
| 
 | |
| 
 | |
| def is_relative_to(path1, path2) -> bool:
 | |
|     try:
 | |
|         pathlib.PurePath(path1).relative_to(path2)
 | |
|         return True
 | |
|     except:
 | |
|         return False
 | |
| 
 | |
| 
 | |
| class FileSystem:
 | |
|     HF3FS_IOCTL_MAGIC_CMD = 0x80046802
 | |
|     HF3FS_IOCTL_MAGIC_NUM = 0x8F3F5FFF
 | |
| 
 | |
|     HF3FS_IOCTL_VERSION_CMD = 0x80046803
 | |
| 
 | |
|     HF3FS_IOCTL_RENAME_CMD = 0x4218680E
 | |
|     HF3FS_IOCTL_RENAME_BUFFER_SIZE = 536
 | |
| 
 | |
|     HF3FS_IOCTL_REMOVE_CMD = 0x4110680F
 | |
|     HF3FS_IOCTL_REMOVE_BUFFER_SIZE = 272
 | |
| 
 | |
|     def __init__(self, mountpoint: str) -> None:
 | |
|         self.mountpoint = os.path.realpath(mountpoint)
 | |
|         self.virt_path = os.path.join(self.mountpoint, "3fs-virt")
 | |
| 
 | |
|         # Check if the mount point is a directory
 | |
|         if not os.path.exists(self.mountpoint):
 | |
|             raise FileNotFoundError(
 | |
|                 errno.ENOENT, os.strerror(errno.ENOENT), self.mountpoint
 | |
|             )
 | |
|         if not os.path.isdir(self.mountpoint):
 | |
|             raise NotADirectoryError(
 | |
|                 errno.ENOTDIR, os.strerror(errno.ENOTDIR), self.mountpoint
 | |
|             )
 | |
| 
 | |
|         virt_fd = None
 | |
|         try:
 | |
|             # Check the 3fs-virt directory
 | |
|             virt_fd = os.open(self.virt_path, os.O_RDONLY | os.O_DIRECTORY)
 | |
|             virt_st = os.fstat(virt_fd)
 | |
|             if not stat.S_ISDIR(virt_st.st_mode):
 | |
|                 raise NotADirectoryError(
 | |
|                     errno.ENOTDIR, os.strerror(errno.ENOTDIR), self.virt_path
 | |
|                 )
 | |
|             self.st_dev = virt_st.st_dev
 | |
|             # Check the magic number
 | |
|             buffer = bytearray(4)
 | |
|             try:
 | |
|                 self._ioctl(virt_fd, FileSystem.HF3FS_IOCTL_MAGIC_CMD, buffer)
 | |
|             except OSError:
 | |
|                 raise RuntimeError(f"{self.mountpoint} is not a 3FS mount point")
 | |
|             magic_number = struct.unpack("I", buffer)[0]
 | |
|             expected_magic_number = FileSystem.HF3FS_IOCTL_MAGIC_NUM
 | |
|             if magic_number != expected_magic_number:
 | |
|                 raise RuntimeError(
 | |
|                     f"{self.mountpoint} is not a 3FS mount point, "
 | |
|                     f"magic number {magic_number:x} != {expected_magic_number:x}"
 | |
|                 )
 | |
|             # Check if the required ioctl is supported
 | |
|             ioctl_version = -1
 | |
|             try:
 | |
|                 buffer = bytearray(4)
 | |
|                 self._ioctl(virt_fd, FileSystem.HF3FS_IOCTL_VERSION_CMD, buffer)
 | |
|                 ioctl_version = int.from_bytes(buffer, sys.byteorder, signed=False)
 | |
|             except OSError:
 | |
|                 pass
 | |
|             assert ioctl_version >= 1
 | |
|         finally:
 | |
|             if virt_fd is not None:
 | |
|                 os.close(virt_fd)
 | |
| 
 | |
|     def _check_user(self):
 | |
|         if os.geteuid() == 0 or os.getegid() == 0:
 | |
|             raise RuntimeError(f"root user not allowed")
 | |
| 
 | |
|     def _encode_filename(self, name: str) -> bytes:
 | |
|         assert name and os.sep not in name, name
 | |
|         name_bytes = name.encode("utf8")
 | |
|         if len(name_bytes) > 255:
 | |
|             raise OSError(errno.ENAMETOOLONG, os.strerror(errno.ENAMETOOLONG), name)
 | |
|         return name_bytes
 | |
| 
 | |
|     def opendir(self, dir_path: str) -> Tuple[int, os.stat_result]:
 | |
|         dir_fd = os.open(dir_path, os.O_DIRECTORY | os.O_RDONLY)
 | |
|         try:
 | |
|             try:
 | |
|                 dir_st = os.fstat(dir_fd)
 | |
|             except OSError as ex:
 | |
|                 ex.filename = dir_path
 | |
|                 raise
 | |
| 
 | |
|             if not stat.S_ISDIR(dir_st.st_mode):
 | |
|                 raise NotADirectoryError(
 | |
|                     errno.ENOTDIR, os.strerror(errno.ENOTDIR), dir_path
 | |
|                 )
 | |
|             if dir_st.st_dev != self.st_dev:
 | |
|                 raise RuntimeError(f"{dir_path} is not under the 3FS mount point {self.mountpoint}")
 | |
|             if dir_st.st_ino & 0xF000000000000000:
 | |
|                 raise RuntimeError(f"{dir_path} is a virtual path")
 | |
| 
 | |
|             return dir_fd, dir_st
 | |
|         except:
 | |
|             os.close(dir_fd)
 | |
|             raise
 | |
| 
 | |
|     def split_path(self, path: str) -> Tuple[int, os.stat_result, str]:
 | |
|         filename = os.path.basename(path)
 | |
|         if not filename:
 | |
|             raise RuntimeError(f"{path} has no filename")
 | |
|         if filename in [".", ".."]:
 | |
|             raise RuntimeError(f"{path} filename is {filename}")
 | |
|         if len(filename.encode("utf8")) > 255:
 | |
|             raise OSError(errno.ENAMETOOLONG, os.strerror(errno.ENAMETOOLONG), path)
 | |
| 
 | |
|         dir = os.path.dirname(path) or "."
 | |
|         dir_fd, dir_st = self.opendir(dir)
 | |
|         return dir_fd, dir_st, filename
 | |
| 
 | |
|     def rename(self, old_path: str, new_path: str) -> None:
 | |
|         self._check_user()
 | |
|         if is_relative_to(
 | |
|             os.path.realpath(new_path), os.path.join(self.mountpoint, "trash")
 | |
|         ):
 | |
|             raise RuntimeError(f"{new_path} is in the trash")
 | |
| 
 | |
|         old_dir_fd = None
 | |
|         new_dir_fd = None
 | |
|         try:
 | |
|             old_dir_fd, old_dir_st, old_filename = self.split_path(old_path)
 | |
|             new_dir_fd, new_dir_st, new_filename = self.split_path(new_path)
 | |
| 
 | |
|             try:
 | |
|                 old_st = os.stat(old_filename, dir_fd=old_dir_fd, follow_symlinks=False)
 | |
|                 if stat.S_ISLNK(old_st.st_mode):
 | |
|                     raise RuntimeError(f"{old_path} is symlink")
 | |
|             except OSError as ex:
 | |
|                 ex.filename = old_path
 | |
|                 raise
 | |
| 
 | |
|             try:
 | |
|                 os.stat(new_filename, dir_fd=new_dir_fd, follow_symlinks=False)
 | |
|                 raise FileExistsError(errno.EEXIST, os.strerror(errno.EEXIST), new_path)
 | |
|             except FileNotFoundError:
 | |
|                 pass
 | |
|             except OSError as ex:
 | |
|                 ex.filename = new_path
 | |
|                 raise
 | |
| 
 | |
|             try:
 | |
|                 self._rename_ioctl(
 | |
|                     old_dir_fd,
 | |
|                     old_dir_st.st_ino,
 | |
|                     old_filename,
 | |
|                     new_dir_st.st_ino,
 | |
|                     new_filename,
 | |
|                     False,
 | |
|                 )
 | |
|             except OSError as ex:
 | |
|                 ex.filename = old_path
 | |
|                 ex.filename2 = new_path
 | |
|                 raise
 | |
|         finally:
 | |
|             if old_dir_fd is not None:
 | |
|                 os.close(old_dir_fd)
 | |
|             if new_dir_fd is not None:
 | |
|                 os.close(new_dir_fd)
 | |
| 
 | |
|     def _rename_ioctl(
 | |
|         self,
 | |
|         old_dir_fd: int,
 | |
|         old_dir_ino: int,
 | |
|         old_filename: str,
 | |
|         new_dir_ino: int,
 | |
|         new_filename: str,
 | |
|         move_to_trash: bool,
 | |
|     ) -> None:
 | |
|         assert old_filename and not os.path.sep in old_filename, old_filename
 | |
|         assert new_filename and not os.path.sep in new_filename, new_filename
 | |
| 
 | |
|         cmd = FileSystem.HF3FS_IOCTL_RENAME_CMD
 | |
|         buffer = struct.pack(
 | |
|             "N256sN256s?",
 | |
|             old_dir_ino,
 | |
|             self._encode_filename(old_filename),
 | |
|             new_dir_ino,
 | |
|             self._encode_filename(new_filename),
 | |
|             move_to_trash,
 | |
|         ).ljust(FileSystem.HF3FS_IOCTL_RENAME_BUFFER_SIZE)
 | |
|         self._ioctl(old_dir_fd, cmd, buffer)
 | |
| 
 | |
|     def remove(self, path: str, recursive: bool) -> None:
 | |
|         dir_fd = None
 | |
|         try:
 | |
|             dir_fd, dir_st, filename = self.split_path(path)
 | |
|             st = os.stat(filename, dir_fd=dir_fd, follow_symlinks=False)
 | |
|             if stat.S_ISLNK(st.st_mode):
 | |
|                 raise RuntimeError(f"{path} is symlink")
 | |
|             if stat.S_ISDIR(st.st_mode) and recursive:
 | |
|                 # The user must be the owner of the directory and have rwx permissions
 | |
|                 imode = stat.S_IMODE(st.st_mode)
 | |
|                 if st.st_uid != os.geteuid() or (imode & 0o700) != 0o700:
 | |
|                     raise PermissionError(errno.EPERM, os.strerror(errno.EPERM), path)
 | |
| 
 | |
|             try:
 | |
|                 self._remove_ioctl(dir_fd, dir_st.st_ino, filename, recursive)
 | |
|             except OSError as ex:
 | |
|                 ex.filename = path
 | |
|                 raise ex
 | |
|         finally:
 | |
|             if dir_fd is not None:
 | |
|                 os.close(dir_fd)
 | |
| 
 | |
|     def _remove_ioctl(
 | |
|         self, parent_fd: int, parent_ino: int, filename: str, recursive: bool
 | |
|     ) -> None:
 | |
|         assert filename and os.sep not in filename, filename
 | |
|         cmd = FileSystem.HF3FS_IOCTL_REMOVE_CMD
 | |
|         buffer = struct.pack(
 | |
|             "N256s?", parent_ino, self._encode_filename(filename), recursive
 | |
|         ).ljust(FileSystem.HF3FS_IOCTL_REMOVE_BUFFER_SIZE)
 | |
|         self._ioctl(parent_fd, cmd, buffer)
 | |
| 
 | |
|     def _ioctl(self, fd: int, cmd: int, buffer):
 | |
|         self._check_user()
 | |
|         return fcntl.ioctl(fd, cmd, buffer) |