smallpond/tests/test_driver.py
Runji Wang 770aa417d5 init
2025-02-27 17:23:53 +08:00

47 lines
1.8 KiB
Python

import os.path
import unittest
import uuid
from loguru import logger
from benchmarks.gray_sort_benchmark import gray_sort_benchmark
from examples.sort_mock_urls import sort_mock_urls
from smallpond.common import GB, MB
from smallpond.execution.driver import Driver
from tests.test_fabric import TestFabric
@unittest.skipUnless(os.getenv("ENABLE_DRIVER_TEST"), "unit test disabled")
class TestDriver(TestFabric, unittest.TestCase):
fault_inject_prob = 0.05
def create_driver(self, num_executors: int):
cmdline = f"scheduler --job_id {str(uuid.uuid4())} --job_name {self._testMethodName} --data_root {self.output_root_abspath} --num_executors {num_executors} --fault_inject_prob {self.fault_inject_prob}"
driver = Driver()
driver.parse_arguments(args=cmdline.split())
logger.info(f"{cmdline=} {driver.mode=} {driver.job_id=} {driver.data_root=}")
return driver
def test_standalone_mode(self):
plan = sort_mock_urls(["tests/data/mock_urls/*.tsv"], npartitions=3)
driver = self.create_driver(num_executors=0)
exec_plan = driver.run(plan, stop_process_on_done=False)
self.assertTrue(exec_plan.successful)
self.assertGreater(exec_plan.final_output.num_files, 0)
def test_run_on_remote_executors(self):
driver = self.create_driver(num_executors=2)
plan = gray_sort_benchmark(
record_nbytes=100,
key_nbytes=10,
total_data_nbytes=1 * GB,
gensort_batch_nbytes=100 * MB,
num_data_partitions=10,
num_sort_partitions=10,
validate_results=True,
)
exec_plan = driver.run(plan, stop_process_on_done=False)
self.assertTrue(exec_plan.successful)
self.assertGreater(exec_plan.final_output.num_files, 0)