mirror of
https://github.com/deepseek-ai/smallpond
synced 2025-06-26 18:27:45 +00:00
47 lines
1.8 KiB
Python
47 lines
1.8 KiB
Python
import os.path
|
|
import unittest
|
|
import uuid
|
|
|
|
from loguru import logger
|
|
|
|
from benchmarks.gray_sort_benchmark import gray_sort_benchmark
|
|
from examples.sort_mock_urls import sort_mock_urls
|
|
from smallpond.common import GB, MB
|
|
from smallpond.execution.driver import Driver
|
|
from tests.test_fabric import TestFabric
|
|
|
|
|
|
@unittest.skipUnless(os.getenv("ENABLE_DRIVER_TEST"), "unit test disabled")
|
|
class TestDriver(TestFabric, unittest.TestCase):
|
|
|
|
fault_inject_prob = 0.05
|
|
|
|
def create_driver(self, num_executors: int):
|
|
cmdline = f"scheduler --job_id {str(uuid.uuid4())} --job_name {self._testMethodName} --data_root {self.output_root_abspath} --num_executors {num_executors} --fault_inject_prob {self.fault_inject_prob}"
|
|
driver = Driver()
|
|
driver.parse_arguments(args=cmdline.split())
|
|
logger.info(f"{cmdline=} {driver.mode=} {driver.job_id=} {driver.data_root=}")
|
|
return driver
|
|
|
|
def test_standalone_mode(self):
|
|
plan = sort_mock_urls(["tests/data/mock_urls/*.tsv"], npartitions=3)
|
|
driver = self.create_driver(num_executors=0)
|
|
exec_plan = driver.run(plan, stop_process_on_done=False)
|
|
self.assertTrue(exec_plan.successful)
|
|
self.assertGreater(exec_plan.final_output.num_files, 0)
|
|
|
|
def test_run_on_remote_executors(self):
|
|
driver = self.create_driver(num_executors=2)
|
|
plan = gray_sort_benchmark(
|
|
record_nbytes=100,
|
|
key_nbytes=10,
|
|
total_data_nbytes=1 * GB,
|
|
gensort_batch_nbytes=100 * MB,
|
|
num_data_partitions=10,
|
|
num_sort_partitions=10,
|
|
validate_results=True,
|
|
)
|
|
exec_plan = driver.run(plan, stop_process_on_done=False)
|
|
self.assertTrue(exec_plan.successful)
|
|
self.assertGreater(exec_plan.final_output.num_files, 0)
|