This commit is contained in:
Runji Wang
2025-02-25 18:16:31 +08:00
commit 770aa417d5
77 changed files with 18785 additions and 0 deletions

0
examples/__init__.py Normal file
View File

225
examples/fstest.py Normal file
View File

@@ -0,0 +1,225 @@
# Test the correctness of file system read and write.
#
# This script runs multiple tasks to write and read data to/from the file system.
# Each task writes to an individual file in the given directory.
# Then it reads the data back and verifies the correctness.
import argparse
import glob
import logging
import os
import random
import time
from typing import Any, Dict, Iterator, Optional, Tuple, Union
import numpy as np
import smallpond
from smallpond.dataframe import Session
def fswrite(
    path: str,
    size: int,
    blocksize: Union[int, Tuple[int, int]],
) -> Dict[str, Any]:
    """
    Write `size` bytes of the deterministic test pattern to `path`.

    The file is written front to back in chunks produced by `iter_io_slice`;
    the content of every chunk comes from `generate_data`.

    Args:
        path: Destination file. It is opened in "wb" mode, i.e. created or truncated.
        size: Total number of bytes to write.
        blocksize: A fixed I/O size, or a (min, max) range to draw each I/O size from.

    Returns:
        A dict with the keys "path", "size", "elapsed(s)" and "throughput(MB/s)".
        The throughput is NaN when the elapsed time is too small to be measured.
    """
    # perf_counter is monotonic: the measured interval cannot go negative or
    # jump when the wall clock is adjusted while the task is running.
    t0 = time.perf_counter()
    with open(path, "wb") as f:
        for start, length in iter_io_slice(0, size, blocksize):
            logging.debug(f"writing {length} bytes at offset {start}")
            f.write(generate_data(start, length))
    # The clock stops after the file is closed so the final buffer flush is counted.
    elapsed = time.perf_counter() - t0
    logging.info(f"write done: {path} in {elapsed:.2f}s")
    return {
        "path": path,
        "size": size,
        "elapsed(s)": elapsed,
        # Avoid ZeroDivisionError for an interval below the timer resolution.
        "throughput(MB/s)": (
            size / elapsed / 1024 / 1024 if elapsed > 0 else float("nan")
        ),
    }
def fsread(
    path: str,
    blocksize: Union[int, Tuple[int, int]],
    randread: bool,
) -> Dict[str, Any]:
    """
    Read `path` back and verify every byte against the deterministic test pattern.

    The file is split into chunks by `iter_io_slice`. Each chunk is read at its
    offset and compared with `generate_data` through `check_data`, which raises
    on the first mismatch.

    Args:
        path: File to read.
        blocksize: A fixed I/O size, or a (min, max) range to draw each I/O size from.
        randread: If true, the chunks are visited in a shuffled order.

    Returns:
        A dict with the keys "path", "size", "elapsed(s)" and "throughput(MB/s)".
        The throughput is NaN when the elapsed time is too small to be measured.

    Raises:
        ValueError: Propagated from `check_data` when the content does not match.
    """
    # perf_counter is monotonic: the measured interval cannot go negative or
    # jump when the wall clock is adjusted while the task is running.
    t0 = time.perf_counter()
    with open(path, "rb") as f:
        size = os.path.getsize(path)
        slices = iter_io_slice(0, size, blocksize)
        if randread:
            # Only a shuffle needs the whole I/O plan in memory. A sequential
            # pass stays lazy, which matters for large files with small blocks.
            slices = list(slices)
            random.shuffle(slices)
        for start, length in slices:
            logging.debug(f"reading {length} bytes at offset {start}")
            f.seek(start)
            data = f.read(length)
            expected_data = generate_data(start, length)
            check_data(data, expected_data, start)
    elapsed = time.perf_counter() - t0
    logging.info(f"read done: {path} in {elapsed:.2f}s")
    return {
        "path": path,
        "size": size,
        "elapsed(s)": elapsed,
        # Avoid ZeroDivisionError for an interval below the timer resolution.
        "throughput(MB/s)": (
            size / elapsed / 1024 / 1024 if elapsed > 0 else float("nan")
        ),
    }
def check_data(actual: bytes, expected: bytes, offset: int) -> None:
    """
    Verify that `actual` equals `expected`; raise ValueError describing the first difference.

    `offset` is the file position of the first byte of both buffers and is only
    used to report the absolute position of the mismatch. The error message shows
    up to 16 bytes of each buffer starting at the mismatch.
    """
    if actual != expected:
        # When one buffer is a prefix of the other, the mismatch is where the
        # shorter one ends.
        mismatch = min(len(actual), len(expected))
        for pos, (got, want) in enumerate(zip(actual, expected)):
            if got != want:
                mismatch = pos
                break
        window = slice(mismatch, mismatch + 16)
        expected = expected[window]
        actual = actual[window]
        raise ValueError(
            f"Data mismatch at offset {offset + mismatch}.\nexpect: {expected}\nactual: {actual}"
        )
def generate_data(offset: int, length: int) -> bytes:
    """
    Generate data for the slice [offset, offset + length).

    The full data is a repeated sequence of [0x00000000, 0x00000001, ..., 0xffffffff],
    each value stored as 4 bytes in little-endian order. The byte order is fixed
    explicitly, so the pattern does not depend on the machine that generates it.

    Args:
        offset: Byte position of the first byte of the slice.
        length: Number of bytes to generate.

    Returns:
        Exactly `length` bytes of the pattern.
    """
    # Indices of the first and one-past-the-last 32-bit word touched by the slice.
    istart = offset // 4
    iend = (offset + length + 3) // 4
    # The slice may start in the middle of a word; drop the leading bytes.
    skip = offset % 4
    # Count in int64 and let the cast to 32 bits wrap, which makes the sequence
    # repeat. "<u4" pins little-endian regardless of the native byte order.
    words = np.arange(istart, iend, dtype=np.int64).astype("<u4")
    return words.tobytes()[skip : skip + length]
def iter_io_slice(
    offset: int, length: int, block_size: Union[int, Tuple[int, int]]
) -> Iterator[Tuple[int, int]]:
    """
    Generate the IO (offset, size) for the slice [offset, offset + length) with the given block size.

    `block_size` can be an integer or a range [start, end]. If a range is provided,
    the IO size will be randomly selected from the range (both ends inclusive).
    The last IO is shortened so that it ends exactly at `offset + length`.

    Raises:
        ValueError: If a block size is not positive or the range is reversed.
            A zero size would otherwise never make progress. As this is a
            generator, the error surfaces when iteration starts.
    """
    fixed = isinstance(block_size, int)
    if fixed:
        smin = smax = block_size
    else:
        smin, smax = block_size
    if smin <= 0 or smax < smin:
        raise ValueError(
            f"block size must be positive and the range non-empty, got {block_size!r}"
        )

    start = offset
    end = offset + length
    while start < end:
        size = smin if fixed else random.randint(smin, smax)
        # Never run past the end of the requested slice.
        size = min(size, end - start)
        yield (start, size)
        start += size
def size_str_to_bytes(size_str: str) -> int:
    """
    Parse size string to bytes.

    e.g. 1k -> 1024, 1M -> 1024^2, 1G -> 1024^3, 1T -> 1024^4

    The unit suffix is case-insensitive ("4K" and "4k" are the same) and
    surrounding whitespace is ignored. A string without suffix is a plain
    number of bytes.

    Raises:
        ValueError: If the numeric part is not an integer.
    """
    units = {"k": 1024, "m": 1024**2, "g": 1024**3, "t": 1024**4}
    text = size_str.strip()
    # text[-1:] is "" for an empty string, which falls through to int() below.
    multiplier = units.get(text[-1:].lower())
    if multiplier is None:
        return int(text)
    return int(text[:-1]) * multiplier
def fstest(
    sp: Session,
    input_path: Optional[str],
    output_path: Optional[str],
    size: Optional[str],
    npartitions: int,
    blocksize: Optional[str] = "4k",
    blocksize_range: Optional[str] = None,
    randread: bool = False,
) -> None:
    """
    Run the write phase and/or the read phase of the file system test.

    If `output_path` is given, `npartitions` files named "0", "1", ... are written
    into that directory, each `size` bytes long. If `input_path` is given, every
    file matching that glob pattern is read back and verified. When both are given
    the write phase runs first. `blocksize_range` ("min-max") takes precedence
    over `blocksize`; sizes are parsed with `size_str_to_bytes`.

    Raises:
        ValueError: If `output_path` is set without `size`, or if neither
            `blocksize` nor `blocksize_range` is set.
    """
    # ---- argument preprocessing ----
    writing = output_path is not None
    if writing and size is None:
        raise ValueError("--size is required if --output_path is provided")
    nbytes = None if size is None else size_str_to_bytes(size)

    if blocksize_range is not None:
        lower, upper = blocksize_range.split("-")
        io_size = (size_str_to_bytes(lower), size_str_to_bytes(upper))
    elif blocksize is not None:
        io_size = size_str_to_bytes(blocksize)
    else:
        raise ValueError("either --blocksize or --blocksize_range must be provided")

    # ---- write phase: one row (and one file) per partition ----
    if writing:
        os.makedirs(output_path, exist_ok=True)
        targets = [
            {"path": os.path.join(output_path, f"{i}")} for i in range(npartitions)
        ]
        write_df = sp.from_items(targets)
        write_df = write_df.repartition(npartitions, by_rows=True)
        stats = write_df.map(lambda x: fswrite(x["path"], nbytes, io_size)).to_pandas()
        logging.info(f"write stats:\n{stats}")

    # ---- read phase: one row per matched file ----
    if input_path is not None:
        paths = glob.glob(input_path)
        read_df = sp.from_items([{"path": path} for path in paths])
        read_df = read_df.repartition(len(paths), by_rows=True)
        stats = read_df.map(lambda x: fsread(x["path"], io_size, randread)).to_pandas()
        logging.info(f"read stats:\n{stats}")
if __name__ == "__main__":
    # Example usage:
    # - write only:
    #     python examples/fstest.py -o 'fstest' -j 8 -s 1G
    # - read only:
    #     python examples/fstest.py -i 'fstest/*'
    # - write and then read:
    #     python examples/fstest.py -o 'fstest' -j 8 -s 1G -i 'fstest/*'
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-o", "--output_path", type=str, help="The output path to write data to."
    )
    parser.add_argument(
        "-i",
        "--input_path",
        type=str,
        # The read phase runs whenever -i is given, also together with -o.
        help=(
            "A glob pattern of the files to read and verify. "
            "If -o is also provided, the files are written first and then read."
        ),
    )
    parser.add_argument(
        "-j", "--npartitions", type=int, help="The number of parallel jobs", default=10
    )
    parser.add_argument(
        "-s",
        "--size",
        type=str,
        help="The size for each file. Required if -o is provided.",
    )
    parser.add_argument("-bs", "--blocksize", type=str, help="Block size", default="4k")
    parser.add_argument(
        "-bsrange",
        "--blocksize_range",
        type=str,
        help="A range of I/O block sizes. e.g. 4k-128k",
    )
    parser.add_argument(
        "-randread",
        "--randread",
        action="store_true",
        help="Whether to read data randomly",
        default=False,
    )
    args = parser.parse_args()
    sp = smallpond.init()
    # The argparse destinations match the keyword parameters of fstest().
    fstest(sp, **vars(args))

78
examples/shuffle_data.py Normal file
View File

@@ -0,0 +1,78 @@
from smallpond.contrib.copy_table import StreamCopy
from smallpond.execution.driver import Driver
from smallpond.logical.dataset import ParquetDataSet
from smallpond.logical.node import (
Context,
DataSetPartitionNode,
DataSourceNode,
HashPartitionNode,
LogicalPlan,
SqlEngineNode,
)
def shuffle_data(
    input_paths,
    num_out_data_partitions: int = 0,
    num_data_partitions: int = 10,
    num_hash_partitions: int = 10,
    engine_type="duckdb",
    skip_hash_partition=False,
) -> LogicalPlan:
    """
    Build the logical plan of the shuffle example.

    The plan chains these nodes:
      1. a DataSourceNode over the parquet files in `input_paths`;
      2. a DataSetPartitionNode with `num_data_partitions` partitions (by rows);
      3. unless `skip_hash_partition` is set, a HashPartitionNode with
         `num_hash_partitions` partitions and `random_shuffle=True`;
      4. a SqlEngineNode that adds a random `sort_key` column and orders by it;
      5. a DataSetPartitionNode with `num_out_data_partitions` partitions (by rows);
      6. a StreamCopy node named "data_copy", which is the root of the plan.
    """
    ctx = Context()
    source = DataSourceNode(ctx, ParquetDataSet(input_paths, union_by_name=True))
    row_partitions = DataSetPartitionNode(
        ctx,
        (source,),
        npartitions=num_data_partitions,
        partition_by_rows=True,
        # When the hash partition step is skipped, this node is asked to shuffle instead.
        random_shuffle=skip_hash_partition,
    )

    if not skip_hash_partition:
        sql_input = HashPartitionNode(
            ctx,
            (row_partitions,),
            npartitions=num_hash_partitions,
            hash_columns=None,
            random_shuffle=True,
            engine_type=engine_type,
        )
    else:
        sql_input = row_partitions

    sort_sql = r"select *, cast(random() * 2147483647 as integer) as sort_key from {0} order by sort_key"
    randomly_sorted = SqlEngineNode(ctx, (sql_input,), sort_sql, cpu_limit=16)
    output_partitions = DataSetPartitionNode(
        ctx,
        (randomly_sorted,),
        npartitions=num_out_data_partitions,
        partition_by_rows=True,
    )
    copied = StreamCopy(
        ctx, (output_partitions,), output_name="data_copy", cpu_limit=1
    )
    return LogicalPlan(ctx, copied)
def main():
    """Register the options on a Driver, build the plan from its arguments and run it."""
    driver = Driver()
    driver.add_argument("-i", "--input_paths", nargs="+")
    # The three partition counts share the same shape: (short flag, long flag, default).
    partition_options = (
        ("-nd", "--num_data_partitions", 1024),
        ("-nh", "--num_hash_partitions", 3840),
        ("-no", "--num_out_data_partitions", 1920),
    )
    for short_flag, long_flag, default_count in partition_options:
        driver.add_argument(short_flag, long_flag, type=int, default=default_count)
    driver.add_argument(
        "-e", "--engine_type", default="duckdb", choices=("duckdb", "arrow")
    )
    driver.add_argument("-x", "--skip_hash_partition", action="store_true")

    options = driver.get_arguments()
    driver.run(shuffle_data(**options))


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,73 @@
from smallpond.common import GB
from smallpond.execution.driver import Driver
from smallpond.logical.dataset import ParquetDataSet
from smallpond.logical.node import (
Context,
DataSetPartitionNode,
DataSourceNode,
HashPartitionNode,
LogicalPlan,
SqlEngineNode,
)
def shuffle_mock_urls(
    input_paths, npartitions: int = 10, sort_rand_keys=True, engine_type="duckdb"
) -> LogicalPlan:
    """
    Build the logical plan that shuffles the mock URL dataset.

    The parquet files in `input_paths` are split into `npartitions` partitions,
    passed through a HashPartitionNode with `random_shuffle=True`, and then
    reordered inside each partition by a SqlEngineNode. `sort_rand_keys` selects
    the SQL used for the reordering: ordering by a random key, or a 100%
    reservoir sample.
    """
    ctx = Context()
    source = DataSourceNode(ctx, ParquetDataSet(input_paths))
    partitions = DataSetPartitionNode(ctx, (source,), npartitions=npartitions)
    hashed = HashPartitionNode(
        ctx,
        (partitions,),
        npartitions=npartitions,
        hash_columns=None,
        random_shuffle=True,
        engine_type=engine_type,
        output_name="urls_partitions",
        cpu_limit=1,
        memory_limit=20 * GB,
    )

    if sort_rand_keys:
        # shuffle as sorting partition keys
        shuffle_sql = r"select *, random() as partition_key from {0} order by partition_key"
    else:
        # shuffle as reservoir sampling
        shuffle_sql = r"select * from {0} using sample 100% (reservoir, {rand_seed})"

    # Both variants run with the same node settings; only the query differs.
    shuffled = SqlEngineNode(
        ctx,
        (hashed,),
        shuffle_sql,
        output_name="shuffled_urls",
        cpu_limit=1,
        memory_limit=40 * GB,
    )
    return LogicalPlan(ctx, shuffled)
def main():
    """Register the options on a Driver, build the plan from its arguments and run it."""
    driver = Driver()
    driver.add_argument("-i", "--input_paths", nargs="+")
    driver.add_argument("-n", "--npartitions", type=int, default=500)
    driver.add_argument("-s", "--sort_rand_keys", action="store_true")
    engine_choices = ("duckdb", "arrow")
    driver.add_argument(
        "-e", "--engine_type", default="duckdb", choices=engine_choices
    )

    options = driver.get_arguments()
    driver.run(shuffle_mock_urls(**options))


if __name__ == "__main__":
    main()

104
examples/sort_mock_urls.py Normal file
View File

@@ -0,0 +1,104 @@
import logging
import os.path
from typing import List, Optional, OrderedDict
import pyarrow as arrow
from smallpond.execution.driver import Driver
from smallpond.execution.task import RuntimeContext
from smallpond.logical.dataset import CsvDataSet
from smallpond.logical.node import (
ArrowComputeNode,
Context,
DataSetPartitionNode,
DataSinkNode,
DataSourceNode,
HashPartitionNode,
LogicalPlan,
SqlEngineNode,
)
class SortUrlsNode(ArrowComputeNode):
    """Arrow compute node that sorts its first input table by the 'host' column."""

    def process(
        self, runtime_ctx: RuntimeContext, input_tables: List[arrow.Table]
    ) -> arrow.Table:
        """Log the shape of the first input table and return it sorted by 'host'."""
        urls = input_tables[0]
        logging.info(f"sorting urls by 'host', table shape: {urls.shape}")
        return urls.sort_by("host")
def sort_mock_urls(
    input_paths,
    npartitions: int,
    engine_type="duckdb",
    external_output_path: Optional[str] = None,
) -> LogicalPlan:
    """
    Build the logical plan that imports, hash-partitions and sorts the mock URLs.

    The tab-separated input (`urlstr`, `valstr`) is split into `npartitions`
    partitions, turned into (`host`, `url`, `payload`) rows by SQL, hash-partitioned
    on `host` and sorted by `host`: with SQL when `engine_type` is "duckdb",
    otherwise with `SortUrlsNode`. The result is collected into one partition.
    When `external_output_path` is set, a DataSinkNode writing to its "data_sink"
    sub-directory becomes the root of the plan.
    """
    ctx = Context()
    url_schema = OrderedDict([("urlstr", "varchar"), ("valstr", "varchar")])
    source = DataSourceNode(
        ctx, CsvDataSet(input_paths, schema=url_schema, delim=r"\t")
    )
    partitions = DataSetPartitionNode(ctx, (source,), npartitions=npartitions)

    import_sql = r"""
select split_part(urlstr, '/', 1) as host, split_part(urlstr, ' ', 1) as url, from_base64(valstr) AS payload from {0}
"""
    imported = SqlEngineNode(
        ctx,
        (partitions,),
        import_sql,
        output_name="imported_urls",
        output_path=external_output_path,
    )
    by_host = HashPartitionNode(
        ctx,
        (imported,),
        npartitions=npartitions,
        hash_columns=["host"],
        engine_type=engine_type,
        output_name="urls_partitions",
        output_path=external_output_path,
    )

    if engine_type != "duckdb":
        # Any other engine sorts with the Arrow-based node.
        ordered = SortUrlsNode(
            ctx,
            (by_host,),
            output_name="sorted_urls",
            output_path=external_output_path,
        )
    else:
        ordered = SqlEngineNode(
            ctx,
            (by_host,),
            r"select * from {0} order by host",
            output_name="sorted_urls",
        )

    result = DataSetPartitionNode(ctx, (ordered,), npartitions=1)
    if external_output_path:
        sink_dir = os.path.join(external_output_path, "data_sink")
        result = DataSinkNode(ctx, (result,), output_path=sink_dir)
    return LogicalPlan(ctx, result)
def main():
    """Register the options on a Driver, build the plan from its arguments and run it."""
    driver = Driver()
    default_inputs = ["tests/data/mock_urls/*.tsv"]
    driver.add_argument("-i", "--input_paths", nargs="+", default=default_inputs)
    driver.add_argument("-n", "--npartitions", type=int, default=10)
    driver.add_argument("-e", "--engine_type", default="duckdb")

    options = driver.get_arguments()
    driver.run(sort_mock_urls(**options))


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,36 @@
import argparse
from typing import List
import smallpond
from smallpond.dataframe import Session
def sort_mock_urls_v2(
    sp: Session, input_paths: List[str], output_path: str, npartitions: int
):
    """
    Sort the mock URLs with the DataFrame API and write them as parquet.

    The tab-separated input (`urlstr`, `valstr`) is read and repartitioned into
    `npartitions` partitions, mapped to (`host`, `url`, `payload`), repartitioned
    by hash of `host`, passed through `partial_sort(by=["host"])` and written to
    `output_path`.
    """
    url_schema = {"urlstr": "varchar", "valstr": "varchar"}
    raw = sp.read_csv(input_paths, schema=url_schema, delim=r"\t")
    raw = raw.repartition(npartitions)

    projection = """
split_part(urlstr, '/', 1) as host,
split_part(urlstr, ' ', 1) as url,
from_base64(valstr) AS payload
"""
    by_host = raw.map(projection).repartition(npartitions, hash_by="host")
    by_host.partial_sort(by=["host"]).write_parquet(output_path)
if __name__ == "__main__":
    # The argparse destinations match the keyword parameters of sort_mock_urls_v2().
    parser = argparse.ArgumentParser()
    default_inputs = ["tests/data/mock_urls/*.tsv"]
    parser.add_argument("-i", "--input_paths", nargs="+", default=default_inputs)
    parser.add_argument("-o", "--output_path", type=str, default="sort_mock_urls")
    parser.add_argument("-n", "--npartitions", type=int, default=10)
    options = vars(parser.parse_args())
    # The session is created only after the command line has been parsed.
    sp = smallpond.init()
    sort_mock_urls_v2(sp, **options)