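"""Tests for smallpond partition nodes: dataset/file/row partitioning, hash partitioning,
nested partitions, loading partitioned datasets, and user-defined partitioning."""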
import os.path
import tempfile
import unittest
from typing import List

import pyarrow.compute as pc

from smallpond.common import DATA_PARTITION_COLUMN_NAME, GB
from smallpond.execution.task import RuntimeContext
from smallpond.logical.dataset import DataSet, ParquetDataSet
from smallpond.logical.node import (
    ArrowComputeNode,
    ConsolidateNode,
    Context,
    DataSetPartitionNode,
    DataSinkNode,
    DataSourceNode,
    EvenlyDistributedPartitionNode,
    HashPartitionNode,
    LoadPartitionedDataSetNode,
    LogicalPlan,
    ProjectionNode,
    SqlEngineNode,
    UnionNode,
    UserDefinedPartitionNode,
    UserPartitionedDataSourceNode,
)
from tests.test_execution import parse_url
from tests.test_fabric import TestFabric


class CalculatePartitionFromFilename(UserDefinedPartitionNode):
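    """A UserDefinedPartitionNode that assigns each input file to a partition by hashing its resolved path."""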
    def partition(self, runtime_ctx: RuntimeContext, dataset: DataSet) -> List[DataSet]:
        partitioned_datasets: List[ParquetDataSet] = [ParquetDataSet([]) for _ in range(self.npartitions)]
        for path in dataset.resolved_paths:
            partition_idx = hash(path) % self.npartitions
            partitioned_datasets[partition_idx].paths.append(path)
        return partitioned_datasets


class TestPartition(TestFabric, unittest.TestCase):
    def test_many_file_partitions(self):
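        """Split the input into one partition per file and run a row-count query over each partition."""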
        ctx = Context()
        dataset = ParquetDataSet(["tests/data/mock_urls/*.parquet"] * 10)
        data_files = DataSourceNode(ctx, dataset)
        data_partitions = DataSetPartitionNode(ctx, (data_files,), npartitions=dataset.num_files)
        count_rows = SqlEngineNode(
            ctx,
            (data_partitions,),
            "select count(*) from {0}",
            cpu_limit=1,
            memory_limit=1 * GB,
        )
        plan = LogicalPlan(ctx, count_rows)
        self.execute_plan(plan)

    def test_many_row_partitions(self):
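        """Partition by rows, one row per partition, and check that the counting query produces one output row per input row."""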
        ctx = Context()
        dataset = ParquetDataSet(["tests/data/mock_urls/*.parquet"])
        data_files = DataSourceNode(ctx, dataset)
        data_partitions = DataSetPartitionNode(ctx, (data_files,), npartitions=dataset.num_rows, partition_by_rows=True)
        count_rows = SqlEngineNode(
            ctx,
            (data_partitions,),
            "select count(*) from {0}",
            cpu_limit=1,
            memory_limit=1 * GB,
        )
        plan = LogicalPlan(ctx, count_rows)
        exec_plan = self.execute_plan(plan, num_executors=5)
        self.assertEqual(exec_plan.final_output.to_arrow_table().num_rows, dataset.num_rows)

    def test_empty_dataset_partition(self):
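        """Create more partitions than input files so some partitions are empty, and check results when empty-input tasks are skipped."""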
        ctx = Context()
        dataset = ParquetDataSet(["tests/data/mock_urls/*.parquet"])
        data_files = DataSourceNode(ctx, dataset)
        # create more partitions than files
        data_partitions = EvenlyDistributedPartitionNode(ctx, (data_files,), npartitions=dataset.num_files * 2)
        data_partitions.max_num_producer_tasks = 3
        unique_urls = SqlEngineNode(
            ctx,
            (data_partitions,),
            r"select distinct url from {0}",
            cpu_limit=1,
            memory_limit=1 * GB,
        )
        # nested partition
        nested_partitioned_urls = EvenlyDistributedPartitionNode(ctx, (unique_urls,), npartitions=3, dimension="nested", nested=True)
        parsed_urls = ArrowComputeNode(
            ctx,
            (nested_partitioned_urls,),
            process_func=parse_url,
            cpu_limit=1,
            memory_limit=1 * GB,
        )
        plan = LogicalPlan(ctx, parsed_urls)
        final_output = self.execute_plan(plan, remove_empty_parquet=True, skip_task_with_empty_input=True).final_output
        self.assertTrue(isinstance(final_output, ParquetDataSet))
        self.assertEqual(dataset.num_rows, final_output.num_rows)

    def test_hash_partition(self):
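        """Hash-partition by url with both duckdb and arrow engines and verify the total row count and the number of partitioned datasets."""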
        for engine_type in ("duckdb", "arrow"):
            for partition_by_rows in (False, True):
                for hive_partitioning in (False, True) if engine_type == "duckdb" else (False,):
                    with self.subTest(
                        engine_type=engine_type,
                        partition_by_rows=partition_by_rows,
                        hive_partitioning=hive_partitioning,
                    ):
                        ctx = Context()
                        dataset = ParquetDataSet(["tests/data/arrow/*.parquet"])
                        data_files = DataSourceNode(ctx, dataset)
                        npartitions = 3
                        data_partitions = DataSetPartitionNode(
                            ctx,
                            (data_files,),
                            npartitions=npartitions,
                            partition_by_rows=partition_by_rows,
                        )
                        hash_partitions = HashPartitionNode(
                            ctx,
                            (ProjectionNode(ctx, data_partitions, ["url"]),),
                            npartitions=npartitions,
                            hash_columns=["url"],
                            engine_type=engine_type,
                            hive_partitioning=hive_partitioning,
                            cpu_limit=2,
                            memory_limit=2 * GB,
                            output_name="hash_partitions",
                        )
                        row_count = SqlEngineNode(
                            ctx,
                            (hash_partitions,),
                            r"select count(*) as row_count from {0}",
                            cpu_limit=1,
                            memory_limit=1 * GB,
                        )
                        plan = LogicalPlan(ctx, row_count)
                        exec_plan = self.execute_plan(plan)
                        self.assertEqual(
                            dataset.num_rows,
                            pc.sum(exec_plan.final_output.to_arrow_table().column("row_count")).as_py(),
                        )
                        self.assertEqual(
                            npartitions,
                            len(exec_plan.final_output.load_partitioned_datasets(npartitions, DATA_PARTITION_COLUMN_NAME)),
                        )
                        self.assertEqual(
                            npartitions,
                            len(
                                exec_plan.get_output("hash_partitions").load_partitioned_datasets(
                                    npartitions,
                                    DATA_PARTITION_COLUMN_NAME,
                                    hive_partitioning,
                                )
                            ),
                        )

    def test_empty_hash_partition(self):
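        """Hash-partition a single-row input, plus a nested hash partition, so most partitions are empty, and verify row and partition counts."""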
        for engine_type in ("duckdb", "arrow"):
            for partition_by_rows in (False, True):
                for hive_partitioning in (False, True) if engine_type == "duckdb" else (False,):
                    with self.subTest(
                        engine_type=engine_type,
                        partition_by_rows=partition_by_rows,
                        hive_partitioning=hive_partitioning,
                    ):
                        ctx = Context()
                        dataset = ParquetDataSet(["tests/data/mock_urls/*.parquet"])
                        data_files = DataSourceNode(ctx, dataset)
                        npartitions = 3
                        npartitions_nested = 4
                        num_rows = 1
                        head_rows = SqlEngineNode(ctx, (data_files,), f"select * from {{0}} limit {num_rows}")
                        data_partitions = DataSetPartitionNode(
                            ctx,
                            (head_rows,),
                            npartitions=npartitions,
                            partition_by_rows=partition_by_rows,
                        )
                        hash_partitions = HashPartitionNode(
                            ctx,
                            (data_partitions,),
                            npartitions=npartitions,
                            hash_columns=["url"],
                            data_partition_column="hash_partitions",
                            engine_type=engine_type,
                            hive_partitioning=hive_partitioning,
                            output_name="hash_partitions",
                            cpu_limit=2,
                            memory_limit=1 * GB,
                        )
                        nested_hash_partitions = HashPartitionNode(
                            ctx,
                            (hash_partitions,),
                            npartitions=npartitions_nested,
                            hash_columns=["url"],
                            data_partition_column="nested_hash_partitions",
                            nested=True,
                            engine_type=engine_type,
                            hive_partitioning=hive_partitioning,
                            output_name="nested_hash_partitions",
                            cpu_limit=2,
                            memory_limit=1 * GB,
                        )
                        select_every_row = SqlEngineNode(
                            ctx,
                            (nested_hash_partitions,),
                            r"select * from {0}",
                            cpu_limit=1,
                            memory_limit=1 * GB,
                        )
                        plan = LogicalPlan(ctx, select_every_row)
                        exec_plan = self.execute_plan(plan, skip_task_with_empty_input=True)
                        self.assertEqual(num_rows, exec_plan.final_output.num_rows)
                        self.assertEqual(
                            npartitions,
                            len(exec_plan.final_output.load_partitioned_datasets(npartitions, "hash_partitions")),
                        )
                        self.assertEqual(
                            npartitions_nested,
                            len(exec_plan.final_output.load_partitioned_datasets(npartitions_nested, "nested_hash_partitions")),
                        )
                        self.assertEqual(
                            npartitions,
                            len(exec_plan.get_output("hash_partitions").load_partitioned_datasets(npartitions, "hash_partitions")),
                        )
                        self.assertEqual(
                            npartitions_nested,
                            len(
                                exec_plan.get_output("nested_hash_partitions").load_partitioned_datasets(npartitions_nested, "nested_hash_partitions")
                            ),
                        )
                        if hive_partitioning:
                            self.assertEqual(
                                npartitions,
                                len(
                                    exec_plan.get_output("hash_partitions").load_partitioned_datasets(
                                        npartitions,
                                        "hash_partitions",
                                        hive_partitioning=True,
                                    )
                                ),
                            )
                            self.assertEqual(
                                npartitions_nested,
                                len(
                                    exec_plan.get_output("nested_hash_partitions").load_partitioned_datasets(
                                        npartitions_nested,
                                        "nested_hash_partitions",
                                        hive_partitioning=True,
                                    )
                                ),
                            )

    def test_load_partitioned_datasets(self):
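        """Hash-partition with a custom partition column, reload the outputs via LoadPartitionedDataSetNode, and compare hive and non-hive partitioned results."""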
        def run_test_plan(
            npartitions: int,
            data_partition_column: str,
            engine_type: str,
            hive_partitioning: bool,
        ):
            ctx = Context()
            input_dataset = ParquetDataSet(["tests/data/mock_urls/*.parquet"])
            input_data_files = DataSourceNode(ctx, input_dataset)
            # create hash partitions
            input_partitions = HashPartitionNode(
                ctx,
                (input_data_files,),
                npartitions=npartitions,
                hash_columns=["url"],
                data_partition_column=data_partition_column,
                engine_type=engine_type,
                hive_partitioning=hive_partitioning,
                output_name="input_partitions",
                cpu_limit=1,
                memory_limit=1 * GB,
            )
            split_urls = SqlEngineNode(
                ctx,
                (input_partitions,),
                f"select url, string_split(url, '/')[0] as host from {{0}}",
                cpu_limit=1,
                memory_limit=1 * GB,
            )
            plan = LogicalPlan(ctx, split_urls)
            exec_plan = self.execute_plan(plan)
            self.assertEqual(
                npartitions,
                len(exec_plan.final_output.load_partitioned_datasets(npartitions, data_partition_column)),
            )
            self.assertEqual(
                npartitions,
                len(exec_plan.get_output("input_partitions").load_partitioned_datasets(npartitions, data_partition_column, hive_partitioning)),
            )
            return exec_plan

        npartitions = 5
        data_partition_column = "_human_readable_column_name_"

        for engine_type in ("duckdb", "arrow"):
            with self.subTest(engine_type=engine_type):
                exec_plan1 = run_test_plan(
                    npartitions,
                    data_partition_column,
                    engine_type,
                    hive_partitioning=engine_type == "duckdb",
                )
                exec_plan2 = run_test_plan(
                    npartitions,
                    data_partition_column,
                    engine_type,
                    hive_partitioning=False,
                )

                ctx = Context()
                output1 = DataSourceNode(ctx, dataset=exec_plan1.get_output("input_partitions"))
                output2 = DataSourceNode(ctx, dataset=exec_plan2.get_output("input_partitions"))
                split_urls1 = LoadPartitionedDataSetNode(
                    ctx,
                    (output1,),
                    npartitions=npartitions,
                    data_partition_column=data_partition_column,
                    hive_partitioning=engine_type == "duckdb",
                )
                split_urls2 = LoadPartitionedDataSetNode(
                    ctx,
                    (output2,),
                    npartitions=npartitions,
                    data_partition_column=data_partition_column,
                    hive_partitioning=False,
                )
                split_urls3 = SqlEngineNode(
                    ctx,
                    (split_urls1, split_urls2),
                    f"""
                    select split_urls1.url, string_split(split_urls2.url, '/')[0] as host
                    from {{0}} as split_urls1
                    join {{1}} as split_urls2
                    on split_urls1.url = split_urls2.url
                    """,
                    cpu_limit=1,
                    memory_limit=1 * GB,
                )
                plan = LogicalPlan(ctx, split_urls3)
                exec_plan3 = self.execute_plan(plan)
                # load each partition as arrow table and compare
                final_output_partitions1 = exec_plan1.final_output.load_partitioned_datasets(npartitions, data_partition_column)
                final_output_partitions3 = exec_plan3.final_output.load_partitioned_datasets(npartitions, data_partition_column)
                self.assertEqual(npartitions, len(final_output_partitions3))
                for x, y in zip(final_output_partitions1, final_output_partitions3):
                    self._compare_arrow_tables(x.to_arrow_table(), y.to_arrow_table())

    def test_nested_partition(self):
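        """Count rows per host over nested partitions (host x url, plus a row-based inner dimension) along two paths and check both match an unpartitioned baseline."""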
        ctx = Context()
        parquet_files = ParquetDataSet(["tests/data/mock_urls/*.parquet"])
        data_source = DataSourceNode(ctx, parquet_files)

        SqlEngineNode.default_cpu_limit = 1
        SqlEngineNode.default_memory_limit = 1 * GB
        initial_reduce = r"select host, count(*) as cnt from {0} group by host"
        combine_reduce_results = r"select host, cast(sum(cnt) as bigint) as cnt from {0} group by host"
        join_query = r"select host, cnt from {0} where (exists (select * from {1} where {1}.host = {0}.host)) and (exists (select * from {2} where {2}.host = {0}.host))"

        partition_by_hosts = HashPartitionNode(
            ctx,
            (data_source,),
            npartitions=3,
            hash_columns=["host"],
            data_partition_column="host_partition",
        )
        partition_by_hosts_x_urls = HashPartitionNode(
            ctx,
            (partition_by_hosts,),
            npartitions=5,
            hash_columns=["url"],
            data_partition_column="url_partition",
            nested=True,
        )
        url_count_by_hosts_x_urls1 = SqlEngineNode(
            ctx,
            (partition_by_hosts_x_urls,),
            initial_reduce,
            output_name="url_count_by_hosts_x_urls1",
        )
        url_count_by_hosts1 = SqlEngineNode(
            ctx,
            (ConsolidateNode(ctx, url_count_by_hosts_x_urls1, ["host_partition"]),),
            combine_reduce_results,
            output_name="url_count_by_hosts1",
        )
        join_count_by_hosts_x_urls1 = SqlEngineNode(
            ctx,
            (url_count_by_hosts_x_urls1, url_count_by_hosts1, data_source),
            join_query,
            output_name="join_count_by_hosts_x_urls1",
        )

        partitioned_urls = LoadPartitionedDataSetNode(
            ctx,
            (partition_by_hosts_x_urls,),
            data_partition_column="url_partition",
            npartitions=5,
        )
        partitioned_hosts_x_urls = LoadPartitionedDataSetNode(
            ctx,
            (partitioned_urls,),
            data_partition_column="host_partition",
            npartitions=3,
            nested=True,
        )
        partitioned_3dims = EvenlyDistributedPartitionNode(
            ctx,
            (partitioned_hosts_x_urls,),
            npartitions=2,
            dimension="inner_partition",
            partition_by_rows=True,
            nested=True,
        )
        url_count_by_3dims = SqlEngineNode(ctx, (partitioned_3dims,), initial_reduce)
        url_count_by_hosts_x_urls2 = SqlEngineNode(
            ctx,
            (ConsolidateNode(ctx, url_count_by_3dims, ["host_partition", "url_partition"]),),
            combine_reduce_results,
            output_name="url_count_by_hosts_x_urls2",
        )
        url_count_by_hosts2 = SqlEngineNode(
            ctx,
            (ConsolidateNode(ctx, url_count_by_hosts_x_urls2, ["host_partition"]),),
            combine_reduce_results,
            output_name="url_count_by_hosts2",
        )
        url_count_by_hosts_expected = SqlEngineNode(
            ctx,
            (data_source,),
            initial_reduce,
            per_thread_output=False,
            output_name="url_count_by_hosts_expected",
        )
        join_count_by_hosts_x_urls2 = SqlEngineNode(
            ctx,
            (url_count_by_hosts_x_urls2, url_count_by_hosts2, data_source),
            join_query,
            output_name="join_count_by_hosts_x_urls2",
        )

        union_url_count_by_hosts = UnionNode(ctx, (url_count_by_hosts1, url_count_by_hosts2))
        union_url_count_by_hosts_x_urls = UnionNode(
            ctx,
            (
                url_count_by_hosts_x_urls1,
                url_count_by_hosts_x_urls2,
                join_count_by_hosts_x_urls1,
                join_count_by_hosts_x_urls2,
            ),
        )

        with tempfile.TemporaryDirectory(dir=self.output_root_abspath) as output_dir:
            data_sink = DataSinkNode(
                ctx,
                (
                    url_count_by_hosts_expected,
                    union_url_count_by_hosts,
                    union_url_count_by_hosts_x_urls,
                ),
                output_path=output_dir,
                manifest_only=True,
            )
            plan = LogicalPlan(ctx, data_sink)
            exec_plan = self.execute_plan(plan, remove_empty_parquet=True)
            # verify results
            self._compare_arrow_tables(
                exec_plan.get_output("url_count_by_hosts_x_urls1").to_arrow_table(),
                exec_plan.get_output("url_count_by_hosts_x_urls2").to_arrow_table(),
            )
            self._compare_arrow_tables(
                exec_plan.get_output("join_count_by_hosts_x_urls1").to_arrow_table(),
                exec_plan.get_output("join_count_by_hosts_x_urls2").to_arrow_table(),
            )
            self._compare_arrow_tables(
                exec_plan.get_output("url_count_by_hosts_x_urls1").to_arrow_table(),
                exec_plan.get_output("join_count_by_hosts_x_urls1").to_arrow_table(),
            )
            self._compare_arrow_tables(
                exec_plan.get_output("url_count_by_hosts1").to_arrow_table(),
                exec_plan.get_output("url_count_by_hosts2").to_arrow_table(),
            )
            self._compare_arrow_tables(
                exec_plan.get_output("url_count_by_hosts_expected").to_arrow_table(),
                exec_plan.get_output("url_count_by_hosts1").to_arrow_table(),
            )

    def test_user_defined_partition(self):
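        """Partition with CalculatePartitionFromFilename along two dimensions and verify the per-host counts survive repartitioning unchanged."""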
        ctx = Context()
        parquet_files = ParquetDataSet(["tests/data/mock_urls/*.parquet"])
        data_source = DataSourceNode(ctx, parquet_files)
        file_partitions1 = CalculatePartitionFromFilename(ctx, (data_source,), npartitions=3, dimension="by_filename_hash1")
        url_count1 = SqlEngineNode(
            ctx,
            (file_partitions1,),
            r"select host, count(*) as cnt from {0} group by host",
            output_name="url_count1",
        )
        file_partitions2 = CalculatePartitionFromFilename(ctx, (url_count1,), npartitions=3, dimension="by_filename_hash2")
        url_count2 = SqlEngineNode(
            ctx,
            (file_partitions2,),
            r"select host, cnt from {0}",
            output_name="url_count2",
        )
        plan = LogicalPlan(ctx, url_count2)

        exec_plan = self.execute_plan(plan, enable_diagnostic_metrics=True)
        self._compare_arrow_tables(
            exec_plan.get_output("url_count1").to_arrow_table(),
            exec_plan.get_output("url_count2").to_arrow_table(),
        )

    def test_user_partitioned_data_source(self):
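        """Build a UserPartitionedDataSourceNode from per-file datasets and verify a join against it reproduces the plain per-host counts."""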
        ctx = Context()
        parquet_dataset = ParquetDataSet(["tests/data/mock_urls/*.parquet"])
        data_source = DataSourceNode(ctx, parquet_dataset)
        evenly_dist_data_source = EvenlyDistributedPartitionNode(ctx, (data_source,), npartitions=parquet_dataset.num_files)

        parquet_datasets = [ParquetDataSet([p]) for p in parquet_dataset.resolved_paths]
        partitioned_data_source = UserPartitionedDataSourceNode(ctx, parquet_datasets)

        url_count_by_host1 = SqlEngineNode(
            ctx,
            (evenly_dist_data_source,),
            r"select host, count(*) as cnt from {0} group by host",
            output_name="url_count_by_host1",
            cpu_limit=1,
            memory_limit=1 * GB,
        )

        url_count_by_host2 = SqlEngineNode(
            ctx,
            (evenly_dist_data_source, partitioned_data_source),
            r"select {1}.host, count(*) as cnt from {0} join {1} on {0}.host = {1}.host group by {1}.host",
            output_name="url_count_by_host2",
            cpu_limit=1,
            memory_limit=1 * GB,
        )

        plan = LogicalPlan(ctx, UnionNode(ctx, [url_count_by_host1, url_count_by_host2]))
        exec_plan = self.execute_plan(plan, enable_diagnostic_metrics=True)
        self._compare_arrow_tables(
            exec_plan.get_output("url_count_by_host1").to_arrow_table(),
            exec_plan.get_output("url_count_by_host2").to_arrow_table(),
        )

    def test_partition_info_in_sql_query(self):
        """
        Users can refer to the partition info in the SQL query via the {__data_partition__} placeholder.
        """
        ctx = Context()
        parquet_dataset = ParquetDataSet(["tests/data/mock_urls/*.parquet"])
        data_source = DataSourceNode(ctx, parquet_dataset)
        evenly_dist_data_source = EvenlyDistributedPartitionNode(ctx, (data_source,), npartitions=parquet_dataset.num_files)
        sql_query = SqlEngineNode(
            ctx,
            (evenly_dist_data_source,),
            r"select host, {__data_partition__} as partition_info from {0}",
        )
        plan = LogicalPlan(ctx, sql_query)
        exec_plan = self.execute_plan(plan)