# 3FS/tests/fuse/fuse_test_ci.py
# Snapshot dated 2025-02-27 21:53:53 +08:00 (587 lines, 18 KiB, Python).
#
# Note: the code-hosting page flagged this file as containing ambiguous
# Unicode characters, i.e. characters that might be confused with other
# characters. If that is intentional, the warning can be ignored.

import struct
import os
import mmap
import array
import time
import threading
try:
import numpy as np
except ImportError:
np = None
# Mode strings for the built-in open(), grouped by read / write / append.
file_modes = [
    'r', 'rb', 'r+', 'rb+',
    'w', 'wb', 'w+',
    'a', 'ab', 'a+', 'ab+',
]
# Path prefixes used by the tests. All three start out empty and are
# assigned in the ``__main__`` section from the command-line argument.
path = text_path = binary_path = ""
def check_read(fd, path, val, desc):
    """Read ``len(val)`` items from a file and verify they equal ``val``.

    Args:
        fd: An open file object to read from, or ``None`` to open ``path``
            in text read mode (``'r'``) first.
        path: File to open when ``fd`` is ``None``; unused otherwise.
        val: Expected content; its length decides how much is read.
        desc: Label of the scenario, carried in the raised error.

    Raises:
        RuntimeError: If the data read differs from ``val``. The arguments
            are ``(actual, expected, 'Check Error: ', desc)``.

    The file object is closed before returning, also when the check fails.
    """
    if fd is None:
        fd = open(path, 'r')
    try:
        v = fd.read(len(val))
        if v != val:
            raise RuntimeError(v, val, 'Check Error: ', desc)
    finally:
        # Close on the failure path too, so a failed check leaks no handle.
        fd.close()
def write_file(fd, val, cls = False):
    """Write ``val`` to ``fd`` and flush it; close ``fd`` when ``cls`` is true.

    Args:
        fd: Open, writable file object.
        val: Data passed unchanged to ``fd.write``.
        cls: When true, ``fd`` is closed after the flush.
    """
    fd.write(val)
    fd.flush()
    if not cls:
        return
    fd.close()
def print_file_data(fd, path, num):
    """Debug helper: read ``num`` items twice and print the second read.

    Args:
        fd: Open file object, or ``None`` to open ``path`` in text read mode.
        path: File to open when ``fd`` is ``None``; unused otherwise.
        num: Size passed to each of the two ``read`` calls.

    The file object is closed before returning, also if a read raises.
    """
    if fd is None:
        fd = open(path, 'r')
    try:
        # NOTE(review): the result of this first read is discarded, so the
        # printed data starts ``num`` items into the file - confirm that
        # skipping is intended.
        fd.read(num)
        print('print_file_data', num, fd.read(num))
    finally:
        fd.close()
def open_test():
    """Exercise several ``open()`` modes on files under the test prefixes.

    Files are named from the module-level ``text_path`` / ``binary_path``
    prefixes; writing and verification go through ``write_file`` and
    ``check_read``. Handles written with ``write_file(..., cls=False)`` are
    not closed explicitly before the read-back check.

    Raises:
        RuntimeError: From ``check_read`` on a content mismatch, or when
            opening with the invalid mode ``'aa'`` does not raise
            ``ValueError``.
    """
    first = text_path+'1'
    scratch_text = text_path+'2_w'
    scratch_binary = binary_path+'2_w'

    handle = open(first, 'w+')
    write_file(handle, 'abcdefghigk')
    check_read(None, first, 'abc', 'open w+ mode')

    empty = open(text_path+'2', 'w+')
    empty.close()

    # Re-open the first file in each readable mode and verify its prefix.
    for mode, prefix, label in (
        ('r', 'abc', 'open r mode'),
        ('rb', b'abc', 'open rb mode'),
        ('r+', 'abc', 'open r+ mode'),
        ('rb+', b'abc', 'open rb+ mode'),
    ):
        handle = open(first, mode)
        check_read(handle, '', prefix, label)

    # Write a text file normally.
    handle = open(scratch_text, 'w')
    write_file(handle, 'cba')
    check_read(None, scratch_text, 'cba', 'open w mode')

    # Write to a file opened in binary mode, then read it back as text.
    handle = open(scratch_binary, 'wb')
    #print_file_data(None, binary_path+'2', 6)
    write_file(handle, b'efg')
    check_read(None, scratch_binary, 'efg', 'open wb mode')

    # Open for read/write, write, then verify (original note: verify reading
    # and writing through the same fd).
    handle = open(scratch_text, 'w+')
    write_file(handle, 'etc', True)
    check_read(None, scratch_text, 'etc', 'open w+ mode')

    # Append to the text file.
    handle = open(scratch_text, 'a')
    write_file(handle, 'zzz', True)
    check_read(None, scratch_text, 'etczzz', 'open a mode')

    # Append to the binary file.
    handle = open(scratch_binary, 'ab')
    write_file(handle, b'zzz', True)
    check_read(None, scratch_binary, 'efgzzz', 'open ab mode')

    # Read/write text file: append, then read back.
    handle = open(scratch_text, 'a+')
    write_file(handle, 'yyy', True)
    check_read(None, scratch_text, 'etczzzyyy', 'open a+ mode')

    # Read/write binary file: append, then read back (disabled).
    # fd = open(binary_path+'389', 'ab+')
    # write_file(fd, b'xxx', True)
    # check_read(None, binary_path+'389', 'xxx', 'open ab+ mode')
    # fd.close()

    # An invalid mode string must be rejected with ValueError.
    try:
        handle = open(binary_path+'389', 'aa')
    except ValueError as err:
        print('open_test 1: {0}'.format(err))
    else:
        raise RuntimeError('open_test 1 Error')

    os.unlink(scratch_text)
    print('open test over')
def seek_test():
    """Check ``seek`` behaviour on the text file ``text_path + '1'``.

    Seeks to the end to obtain the length, seeks two positions past the end
    and reads, then requires that a non-zero end-relative seek raises
    ``ValueError``.

    Raises:
        RuntimeError: If ``fd.seek(1, 2)`` does not raise ``ValueError``.
    """
    with open(text_path+'1', 'r+') as fd:
        #fd.seek(-1)
        # Get the file length.
        fd.seek(0, 2)
        file_len = fd.tell()
        print(text_path+'1', ' file_len = ', file_len)
        # Seeking past the end of the file is expected to succeed.
        fd.seek(file_len+2, 0)
        fd.read(2)
        # Seeking to end+1 relative to the end is expected to raise.
        try:
            fd.seek(1, 2)
        except ValueError as err:
            print('seek_test 2: {0}'.format(err))
        else:
            raise RuntimeError('seek_test 2 Error')
    print('seek test over')
def flush_test():
    """Call ``flush()`` on a file opened for reading and one opened ``'wb'``.

    Opens ``text_path + '2'`` in ``'r'`` mode and ``text_path + '2_w'`` in
    ``'wb'`` mode. Each file is closed as soon as its flush has been issued.
    """
    with open(text_path+'2', 'r') as fd:
        fd.flush()
    with open(text_path+'2_w', 'wb') as fd:
        fd.flush()
    print('flush test over')
def next_test():
    """Check ``next()`` line iteration on ``text_path + '2_w'``.

    The file is opened ``'w+'`` (so it starts empty), ``next()`` must raise
    ``StopIteration``, then five lines are written and the first three are
    read back in order. The file is removed at the end of a successful run.

    Raises:
        RuntimeError: If the empty file yields a line, or a line read back
            differs from the expected one.
    """
    target = text_path+'2_w'
    with open(target, 'w+') as fd:
        # Fetching a line from an empty file must stop the iteration.
        try:
            next(fd)
        except StopIteration as err:
            print('next_test 1: {0}'.format(err))
        else:
            raise RuntimeError('next_test 1 Error')
        # Write distinct lines, then read them back sequentially.
        fd.write('abc\nttt\nbbb\nccc\nggg')
        fd.flush()
        fd.seek(0)
        for expected, message in (
            ('abc', 'next_test 2 Error'),
            ('ttt', 'next_test 3 Error'),
            ('bbb', 'next_test 4 Error'),
        ):
            if next(fd).strip() != expected:
                raise RuntimeError(message)
    os.remove(target)
    print('next test over')
def read_test():
    """Check ``read`` edge cases: zero length, end of file, write-only file.

    Raises:
        RuntimeError: If a zero-length read or a read at end of file returns
            data, or if reading a file opened ``'w'`` does not raise
            ``IOError``.
    """
    with open(text_path+'1', 'r+') as fd:
        # Read zero items.
        val = fd.read(0)
        if len(val) != 0:
            raise RuntimeError('read_test 1,', val)
        # Read at the end of the file.
        fd.seek(0, 2)
        val = fd.read(1)
        if len(val) != 0:
            raise RuntimeError('read_test 2 Error')
    # Read from a file opened for writing only.
    with open(text_path+'2_w', 'w') as fd:
        try:
            fd.read(1)
        except IOError as err:
            print('read_test 3: {0}'.format(err))
        else:
            raise RuntimeError('read_test 3 Error')
    print('read test over')
def truncate_test():
    """Check ``truncate`` on ``text_path + '2_w'`` in read and ``'w+'`` modes.

    Raises:
        RuntimeError: If truncating a file opened ``'r'`` does not raise
            ``IOError``, or if the first four characters after truncating at
            position 4 are not ``'abcd'``.
    """
    # Truncate through a read-only handle; this is expected to fail.
    with open(text_path+'2_w', 'r') as fd:
        try:
            fd.truncate()
        except IOError as err:
            print('truncate_test 1: {0}'.format(err))
        else:
            raise RuntimeError('truncate_test 1 Error')
    # Normal truncate.
    with open(text_path+'2_w', 'w+') as fd:
        fd.truncate()
        fd.write('abcd\nefg')
        # Truncate at the current position.
        fd.seek(4, 0)
        fd.truncate()
        fd.seek(0, 0)
        val = fd.read(4)
        print('val = ', val)
        if val != 'abcd':
            raise RuntimeError('truncate_test 2 Error')
        # Truncate to a size past the current end (the original comment calls
        # this an "illegal position"); no exception is expected here.
        fd.truncate(10)
    print('truncate test over')
def tell_test():
    """Check ``tell`` after reads and after seeking past the end of the file.

    Operates on ``text_path + '1'`` opened in ``'r'`` mode.

    Raises:
        RuntimeError: If ``tell`` is not 6 after seeking to 1 and reading 5
            characters, or does not report the position one past the file
            length after seeking there.
    """
    with open(text_path+'1', 'r') as fd:
        # Normal case.
        fd.seek(0, 2)
        file_len = fd.tell()
        fd.seek(1, 0)
        fd.read(5)
        fd.seek(0, 1)
        idx = fd.tell()
        if idx != 6:
            raise RuntimeError('tell_test 1 Error')
        # Position beyond the end of the file.
        fd.seek(file_len+1, 0)
        idx = fd.tell()
        if idx != file_len+1:
            raise RuntimeError('tell_test 2 Error')
    print('tell test over')
def readlines_test():
    """Check ``readline`` with size arguments -1, 0, 2 and a value past EOF.

    Despite its name, this test calls ``readline`` (not ``readlines``). It
    works on ``text_path + '2_w'`` opened ``'w+'``.

    Raises:
        RuntimeError: If any ``readline`` call returns unexpected data.
    """
    with open(text_path+'2_w', 'w+') as fd:
        # Fetch a line from the still-empty file; the result is not checked.
        fd.readline(-1)
        fd.write('abc\nttt\nbbb\nccc\nggg')
        fd.flush()
        fd.seek(0, 2)
        f_len = fd.tell()
        fd.seek(0)
        val = fd.readline(-1)
        if val.strip() != 'abc':
            raise RuntimeError('readlines_test 1 Error')
        val = fd.readline(0)
        if len(val) != 0:
            raise RuntimeError('readlines_test 2 Error')
        val = fd.readline(2)
        if val.strip() != 'tt':
            raise RuntimeError('readlines_test 3 Error')
        # A size larger than the file must still stop at the end of the line.
        val = fd.readline(f_len + 5)
        if val.strip() != 't':
            raise RuntimeError('readlines_test 4 Error')
    print('readlines test over')
def mmap_test():
    """Map ``text_path + '1'`` into memory and verify its first six bytes.

    Raises:
        RuntimeError: If the mapped file does not start with ``b'abcdef'``.

    The mapping and the descriptor are released on failure as well.
    """
    fd = os.open(text_path+'1', os.O_RDWR | os.O_NONBLOCK)
    try:
        # Length 0 maps the whole file.
        with mmap.mmap(fd, 0) as mm:
            if mm[:6] != b'abcdef':
                raise RuntimeError('mmap_test 1 Error')
    finally:
        os.close(fd)
    print('mmap test over')
def os_open_test():
    """Check ``os.writev`` / ``os.read`` round trips on ``text_path + '2_rw'``.

    Writes three 3-byte buffers with one ``writev`` call, reads back five
    bytes through the same descriptor, then re-opens the file read-only and
    reads with a size larger than the file.

    Raises:
        RuntimeError: If either read returns unexpected bytes.

    Each descriptor is closed even when its check fails.
    """
    target = text_path+'2_rw'
    fd = os.open(target, os.O_RDWR | os.O_CREAT | os.O_TRUNC)
    try:
        bufs = [bytearray(b'123'), bytearray(b'abc'), bytearray(b'456')]
        os.writev(fd, bufs)
        os.lseek(fd, 0, 0)
        val = os.read(fd, 5)
        if val != b'123ab':
            raise RuntimeError('os_open_test 1 Error')
    finally:
        os.close(fd)
    # Open the file and read with a buffer size exceeding its length.
    fd = os.open(target, os.O_RDONLY, 0)
    try:
        val = os.read(fd, 20)
        if val != b'123abc456':
            raise RuntimeError('os_open_test 2 Error')
    finally:
        os.close(fd)
    print('os_open test over')
# Directory test: creates the directory text_path+'test_file', creates one
# file inside it, then calls os.rmdir() on the still-populated directory.
# An OSError raised inside the try is printed with a message whose Chinese
# suffix means "expected result". The file and directory are then removed,
# and text_path+'test_file2' is removed if it exists.
# NOTE(review): indentation is lost in this copy of the file, so it cannot be
# determined here whether the os.unlink/os.rmdir cleanup after the print
# belongs to the `except` handler or runs after it - confirm against the
# upstream file before re-indenting.
def os_mkdir_test():
# Create the directory
try:
os.mkdir(text_path+'test_file')
fd = open(text_path+'test_file/1_t', 'w')
fd.close()
# Remove the directory while it still contains a file
os.rmdir(text_path+'test_file')
except OSError as err:
print('os_mkdir_test 1: {0} 预期的结果'.format(err))
os.unlink(text_path+'test_file/1_t')
os.rmdir(text_path+'test_file')
if os.path.exists(text_path+'test_file2'):
os.rmdir(text_path+'test_file2')
print('os_mkdir test over')
def os_link_test():
    """Create two directories and hard-link a file from one into the other.

    The source file ``text_path + 'test_file/1_t'`` is still open for
    writing while ``os.link`` runs and is closed right afterwards. A
    ``PermissionError`` from any step is printed and tolerated; the created
    entries are left in place (``os_unlink_test`` removes the same paths).
    """
    try:
        os.mkdir(text_path+'test_file')
        os.mkdir(text_path+'test_file2')
        with open(text_path+'test_file/1_t', 'w'):
            os.link(text_path+'test_file/1_t', text_path+'test_file2/1_t2')
    except PermissionError as err:
        print('os_link_test 1: {0}'.format(err))
    print('os_link test over')
def os_unlink_test():
    """Remove the two files and two directories used by the link test.

    Unlinks ``test_file/1_t`` and ``test_file2/1_t2`` under ``text_path``
    first, then removes the ``test_file`` and ``test_file2`` directories.
    Any ``OSError`` from these calls propagates to the caller.
    """
    for victim in (text_path+'test_file/1_t', text_path+'test_file2/1_t2'):
        os.unlink(victim)
    for folder in (text_path+'test_file', text_path+'test_file2'):
        os.rmdir(folder)
    print('os_unlink test over')
# Lookup tables for the text pattern checks: the 26 lowercase ASCII letters
# as one-character strings, and the same letters as integer code points
# (97..122). Built with comprehension/range so no loop variable is left
# behind in the module namespace.
letter_ary = [chr(code) for code in range(97, 123)]
binary_ary = list(range(97, 123))
def thread_check_text(start, offset, ary, text_data):
    """Verify element by element that ``text_data`` follows the 26-entry cycle.

    Element ``i`` of ``text_data`` must equal ``ary[(start + i) % 26]``.

    Args:
        start: Position in the cycle of the first element.
        offset: Number of elements of ``text_data`` to check.
        ary: Lookup table indexed by position modulo 26.
        text_data: Indexable data to verify.

    Raises:
        RuntimeError: At the first element that differs from the table.
    """
    template = 'text_data check error: {0}, {1}, {2} right: {3}, real:{4}'
    position = 0
    while position < offset:
        wanted = ary[(start + position) % 26]
        got = text_data[position]
        if wanted != got:
            raise RuntimeError(template.format( start, offset, position, wanted, got))
        position += 1
def thread_check_binary(start, offset, binary_data):
    """Verify element by element that ``binary_data`` follows the 1..126 cycle.

    Element ``i`` must equal ``(start + i + 1) % 126``, with a result of 0
    replaced by 126; that is the same value as ``(start + i) % 126 + 1``.

    Args:
        start: Position in the cycle preceding the first element.
        offset: Number of elements of ``binary_data`` to check.
        binary_data: Indexable sequence of integers (for example ``bytes``).

    Raises:
        RuntimeError: At the first element that differs from the pattern.
    """
    template = 'binary_data check error: start {0} offset {1} index {2} expected: {3} actual: {4}'
    for position in range(offset):
        wanted = (start + position) % 126 + 1
        if wanted != binary_data[position]:
            raise RuntimeError(template.format( start, offset, position, wanted, binary_data[position]))
def thread_check_text_np(start, offset, ary, text_data):
    """Vectorised check that ``text_data`` follows the 26-entry cycle.

    Byte ``i`` of ``text_data`` must equal ``ary[0] + (start + i) % 26``.

    Args:
        start: Position in the cycle of the first byte.
        offset: Number of bytes expected in ``text_data``.
        ary: Table whose first entry is the integer value of cycle position 0.
        text_data: Bytes-like buffer, interpreted as unsigned 8-bit values.

    Raises:
        RuntimeError: If the data differs from the pattern; the message
            shows the first differing index and 16 values from there on.
    """
    # int64 keeps the index range exact for positions at or above 2**31,
    # which an int32 range cannot represent.
    expected = np.arange(start, start + offset, dtype=np.int64)
    expected %= 26
    expected += ary[0]
    # actual = np.array(text_data, dtype=np.int32)
    actual = np.frombuffer(text_data, dtype=np.uint8)
    if not np.array_equal(expected, actual):
        # argmin over the boolean match array yields the first mismatch.
        i = np.argmin(expected == actual)
        raise RuntimeError('text_data check error: start {0} offset {1} index {2} expected: {3} actual: {4}'.format( start, offset, i, expected[i:i+16], actual[i:i+16]))
def thread_check_binary_np(start, offset, binary_data):
    """Vectorised check that ``binary_data`` follows the 1..126 cycle.

    Byte ``i`` must equal ``(start + i + 1) % 126``, with a result of 0
    replaced by 126.

    Args:
        start: Position in the cycle preceding the first byte.
        offset: Number of bytes expected in ``binary_data``.
        binary_data: Bytes-like buffer, interpreted as unsigned 8-bit values.

    Raises:
        RuntimeError: If the data differs from the pattern; the message
            shows the first differing index and 16 values from there on.
    """
    # int64 keeps the index range exact for positions at or above 2**31,
    # which an int32 range cannot represent.
    expected = np.arange(start + 1, start + offset + 1, dtype=np.int64)
    expected %= 126
    expected[expected == 0] = 126
    # actual = np.array(binary_data, dtype=np.int32)
    actual = np.frombuffer(binary_data, dtype=np.uint8)
    if not np.array_equal(expected, actual):
        # argmin over the boolean match array yields the first mismatch.
        i = np.argmin(expected == actual)
        raise RuntimeError('binary_data check error: start {0} offset {1} index {2} expected: {3} actual: {4}'.format( start, offset, i, expected[i:i+16], actual[i:i+16]))
# Text verification
def check_text_data(start, offset, text_data):
    """Verify that ``text_data`` follows the lowercase-letter cycle.

    ``str`` input is compared against ``letter_ary``; any other input (such
    as ``bytes``, ``bytearray`` or ``array.array('B')``) is compared against
    the integer table ``binary_ary``.

    Args:
        start: Position in the 26-entry cycle of the first element.
        offset: Expected number of elements; must equal ``len(text_data)``.
        text_data: Data to verify.

    Raises:
        RuntimeError: If ``offset`` differs from ``len(text_data)`` or the
            data does not match the pattern.
    """
    if offset != len(text_data):
        raise RuntimeError('check_text_data parameter error: offset != len(text_data), {0} {1} {2}'.format(offset, len(text_data), start))
    is_text = isinstance(text_data, str)
    ary = letter_ary if is_text else binary_ary
    # The numpy checker reads the data through the buffer protocol, so it is
    # only usable when numpy was imported and the data is not ``str``.
    if np is None or is_text or len(text_data) <= 1024:
        thread_check_text(start, offset, ary, text_data)
    else:
        thread_check_text_np(start, offset, ary, text_data)
# Binary verification
def check_binary_data(start, offset, binary_data):
    """Verify that ``binary_data`` follows the 1..126 byte cycle.

    Inputs of up to 1024 elements are checked by ``thread_check_binary``;
    longer ones by ``thread_check_binary_np`` when numpy was imported.

    Args:
        start: Position in the cycle preceding the first element.
        offset: Expected number of elements; must equal ``len(binary_data)``.
        binary_data: Data to verify.

    Raises:
        RuntimeError: If ``offset`` differs from ``len(binary_data)`` or the
            data does not match the pattern.
    """
    if offset != len(binary_data):
        raise RuntimeError('check_binary_data parameter error: offset != len(binary_data), {0} {1}'.format(offset, len(binary_data)))
    # Fall back to the pure-Python checker when numpy is unavailable.
    if np is None or len(binary_data) <= 1024:
        thread_check_binary(start, offset, binary_data)
    else:
        thread_check_binary_np(start, offset, binary_data)
def check_readv_results(file_path, read_offset, data_bufs, data_len):
    """Validate the byte count and buffer contents of a vectored read.

    Args:
        file_path: File that was read; its current size is looked up, and a
            path containing ``'binary'`` selects the binary pattern check.
        read_offset: File offset at which the read started.
        data_bufs: Buffers that were filled by the read, in order.
        data_len: Number of bytes the read reported.

    Raises:
        AssertionError: If ``data_len`` is not the size expected from the
            file size, the offset and the total buffer size.
        RuntimeError: From the pattern checkers on a content mismatch.
    """
    readbufsize = sum(len(buf) for buf in data_bufs)
    filesize = os.path.getsize(file_path)
    # Raised explicitly (not via ``assert``) so the checks also run under -O.
    if read_offset > filesize:
        if data_len != 0:
            raise AssertionError("read start exceeding file end, return size should be 0. now it's %d " % (data_len))
        return
    expected_return_size = min(readbufsize, filesize - read_offset)
    if data_len != expected_return_size:
        raise AssertionError("expected size %d, result return size %d" % (expected_return_size, data_len))
    #print('data_bufs size = ', len(data_bufs[0]))
    #print(data_bufs)
    check = check_binary_data if 'binary' in file_path else check_text_data
    index = read_offset
    remaining = data_len
    for buf in data_bufs:
        # Only the part of a buffer that the read actually filled is checked.
        chunk_len = min(len(buf), remaining)
        val = bytes(buf[:chunk_len])
        remaining -= chunk_len
        #print(index, chunk_len, val[:128])
        check(index, chunk_len, val)
        index += chunk_len
        if remaining <= 0:
            break
def os_read_test_base(file_path, index, size, file_len = -1):
    """Read ``size`` bytes at offset ``index`` with ``os.read`` and verify them.

    Args:
        file_path: File to read; a path containing ``'binary'`` selects the
            binary pattern check, anything else the text pattern check.
        index: Offset to seek to before reading.
        size: Number of bytes requested.
        file_len: Not used by this function.

    Raises:
        AssertionError: If the number of bytes returned is not the size
            expected from the file size, ``index`` and ``size``.
        RuntimeError: From the pattern checkers on a content mismatch.
    """
    fd = os.open(file_path, os.O_RDONLY)
    try:
        os.lseek(fd, index, 0)
        val = os.read(fd, size)
    finally:
        # Release the descriptor even if the seek or read fails.
        os.close(fd)
    filesize = os.path.getsize(file_path)
    data_len = len(val)
    # Raised explicitly (not via ``assert``) so the checks also run under -O.
    if index > filesize:
        if data_len != 0:
            raise AssertionError("read start exceeding file end, return size should be 0. now it's %d " % (data_len))
        return
    expected_return_size = min(size, filesize - index)
    if data_len != expected_return_size:
        raise AssertionError("expected size %d, result return size %d" % (expected_return_size, data_len))
    #print(index, l, val[:128])
    if 'binary' in file_path:
        check_binary_data(index, data_len, val)
    else:
        check_text_data(index, data_len, val)
def os_readv_test_base(file_path, index, buffers_size, file_len = -1):
    """Do one ``os.readv`` at offset ``index`` and verify the result.

    Args:
        file_path: File to read.
        index: Offset to seek to before reading.
        buffers_size: Iterable of sizes; one zero-filled ``bytearray`` is
            allocated per entry.
        file_len: Not used by this function.

    The read result is validated by ``check_readv_results``.
    """
    buffers = [bytearray(s) for s in buffers_size]
    print('bufs = ', buffers_size)
    fd = os.open(file_path, os.O_RDONLY)  # | os.O_DIRECT
    try:
        os.lseek(fd, index, 0)
        num_bytes = os.readv(fd, buffers)
    finally:
        # Release the descriptor even if the seek or read fails.
        os.close(fd)
    check_readv_results(file_path, index, buffers, num_bytes)
def test_continuous_readv(fd, index, buffers_size):
    """Do one ``os.readv`` on an already open descriptor and verify the data.

    Args:
        fd: Open file descriptor; it is not closed here.
        index: Offset to seek to before reading.
        buffers_size: Iterable of sizes; one zero-filled ``bytearray`` is
            allocated per entry.

    Raises:
        RuntimeError: If the read does not fill all buffers completely, or
            from ``check_text_data`` on a content mismatch.
    """
    buffers = [bytearray(s) for s in buffers_size]
    size = sum(len(buf) for buf in buffers)
    print('bufs = ', buffers_size)
    os.lseek(fd, index, 0)
    num_bytes = os.readv(fd, buffers)
    #print('buffers size = ', len(buffers), num_bytes, size, buffers)
    if num_bytes != size:
        # The message now names this function; it previously carried the
        # name of os_readv_test_base.
        raise RuntimeError('test_continuous_readv error: num_bytes != size {0}, {1}'.format( num_bytes, size))
    for buf in buffers:
        check_text_data(index, len(buf), bytes(buf))
        index += len(buf)
def posix_io_text():
    """Run the basic file-API tests one after another, in a fixed order.

    Later steps use files left behind by earlier ones (for example
    ``os_unlink_test`` removes what ``os_link_test`` created), so the order
    matters. The first exception raised by a step aborts the run.
    """
    print("=== [start test suite] posix_io_test basic funtions ===")
    steps = (
        open_test,
        seek_test,
        flush_test,
        next_test,
        read_test,
        truncate_test,
        tell_test,
        readlines_test,
        mmap_test,
        os_open_test,
        os_mkdir_test,
        os_link_test,
        os_unlink_test,
    )
    for step in steps:
        step()
    print("=== [finish test suite] posix_io_test basic functions ===")
def os_write_run(path, open_mode, max_file_size, write_data):
    """Write ``write_data`` repeatedly until ``max_file_size`` bytes are reached.

    Whole copies of ``write_data`` are written, so the total may exceed
    ``max_file_size`` by less than one copy.

    Args:
        path: File to open with ``os.open``.
        open_mode: Flags passed to ``os.open``.
        max_file_size: Writing stops once at least this many bytes are written.
        write_data: Bytes-like chunk to write.

    Raises:
        ValueError: If ``write_data`` is empty while ``max_file_size`` is
            positive (the loop could never finish).
    """
    if max_file_size > 0 and len(write_data) == 0:
        raise ValueError('write_data must not be empty when max_file_size > 0')
    fd = os.open(path, open_mode)
    try:
        print("write started", path, max_file_size)
        view = memoryview(write_data).cast('B')
        chunk_len = len(view)
        idx = 0
        while idx < max_file_size:
            # os.write may write fewer bytes than requested; keep going until
            # the whole chunk is out.
            sent = 0
            while sent < chunk_len:
                sent += os.write(fd, view[sent:])
            idx += chunk_len
    finally:
        os.close(fd)
    print("write done", path, max_file_size)
def os_read_run(path, open_mode, max_file_size):
    """Read a file in 26-byte steps while it may still grow, then verify it.

    Reading continues until more than ``max_file_size - 26`` bytes have been
    collected; an empty read triggers a 0.5 s wait before retrying. The
    collected data is then verified from offset 0 with the text checker when
    ``path`` contains ``'text'``, otherwise with the binary checker.

    Args:
        path: File to open with ``os.open``.
        open_mode: Flags passed to ``os.open``.
        max_file_size: Expected final size of the file in bytes.

    Raises:
        RuntimeError: From the pattern checker on a content mismatch.
    """
    check_data = check_text_data if path.find('text') != -1 else check_binary_data
    data = bytearray()
    idx = 0
    fd = os.open(path, open_mode)
    try:
        print("read started", path, max_file_size, flush=True)
        while idx < max_file_size - 26:
            val = os.read(fd, 26)
            data += val
            idx += len(val)
            if not val:
                print("wait for more data...")
                time.sleep(0.5)
            # Progress line roughly every 5% of the expected size.
            if idx % (max_file_size // 20) < 26:
                print(idx * 100 // max_file_size, 'percent done', 'idx', idx, 'max file size', max_file_size, flush=True)
        print("read done", path, flush=True)
    finally:
        os.close(fd)
    print("check data", flush=True)
    # Hand over real ``bytes`` so the text checker picks its integer table.
    check_data(0, len(data), bytes(data))
if __name__ == "__main__":
    # Script entry point: take the directory to test from the command line,
    # derive the text/binary file-name prefixes from it and run the suite.
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("path", help="The path argument")
    args = parser.parse_args()

    path = args.path
    # These are name prefixes, not directories: tests append suffixes such
    # as '1' or '2_w' directly to them.
    text_path = os.path.join(path, "text")
    binary_path = os.path.join(path, "binary")

    print('Path', path)
    print('posix test')
    print('text_path = ', text_path)
    print('binary_path = ', binary_path)
    print('letter_ary = ', letter_ary)
    print('binary_ary = ', binary_ary)

    posix_io_text()
    #
    # os_read_test()
    # os_readv_test()