From 0f80da8458765ecbb8eff1ce8e635fdb7ecb5b4b Mon Sep 17 00:00:00 2001 From: "fujianhao.fjh" Date: Thu, 10 Apr 2025 18:18:30 +0800 Subject: [PATCH] fix: not output result in some linux system --- tests/test_internode.py | 18 +++++++++--------- tests/test_intranode.py | 18 +++++++++--------- tests/test_low_latency.py | 4 ++-- 3 files changed, 20 insertions(+), 20 deletions(-) diff --git a/tests/test_internode.py b/tests/test_internode.py index 765ba1a..7c73faa 100644 --- a/tests/test_internode.py +++ b/tests/test_internode.py @@ -76,7 +76,7 @@ def test_main(num_sms: int, local_rank: int, num_local_ranks: int, num_ranks: in t = bench(lambda: buffer.get_dispatch_layout(topk_idx, num_experts))[0] if local_rank == 0: print(f'[layout] Kernel performance: {t * 1000:.3f} ms', flush=True) - print() + print('', flush=True) group.barrier() time.sleep(1) @@ -163,7 +163,7 @@ def test_main(num_sms: int, local_rank: int, num_local_ranks: int, num_ranks: in if local_rank == 0: print(' passed', flush=True) if local_rank == 0: - print() + print('', flush=True) # Tune dispatch performance best_dispatch_results = None @@ -180,10 +180,10 @@ def test_main(num_sms: int, local_rank: int, num_local_ranks: int, num_ranks: in if t < best_time: best_time, best_results = t, (num_sms, nvl_chunk_size, rdma_chunk_size) if local_rank == 0: - print(f'[tuning] SMs {num_sms}, NVL chunk {nvl_chunk_size}, RDMA chunk {rdma_chunk_size}: {rdma_send_bytes / 1e9 / t:.2f} GB/s (RDMA), {nvl_recv_bytes / 1e9 / t:.2f} GB/s (NVL) ') + print(f'[tuning] SMs {num_sms}, NVL chunk {nvl_chunk_size}, RDMA chunk {rdma_chunk_size}: {rdma_send_bytes / 1e9 / t:.2f} GB/s (RDMA), {nvl_recv_bytes / 1e9 / t:.2f} GB/s (NVL) ', flush=True) if local_rank == 0: - print(f'[tuning] Best dispatch ({"FP8" if isinstance(current_x, tuple) else "BF16"}): SMs {best_results[0]}, NVL chunk {best_results[1]}, RDMA chunk {best_results[2]}: {rdma_send_bytes / 1e9 / best_time:.2f} GB/s (RDMA), {nvl_recv_bytes / 1e9 / best_time:.2f} GB/s (NVL)') - print() + print(f'[tuning] Best dispatch ({"FP8" if isinstance(current_x, tuple) else "BF16"}): SMs {best_results[0]}, NVL chunk {best_results[1]}, RDMA chunk {best_results[2]}: {rdma_send_bytes / 1e9 / best_time:.2f} GB/s (RDMA), {nvl_recv_bytes / 1e9 / best_time:.2f} GB/s (NVL)', flush=True) + print('', flush=True) if isinstance(current_x, tuple): # Gather FP8 the best config from rank 0 @@ -206,13 +206,13 @@ def test_main(num_sms: int, local_rank: int, num_local_ranks: int, num_ranks: in tune_args = {'x': recv_x, 'handle': handle, 'config': config} t = bench(lambda: buffer.combine(**tune_args))[0] if local_rank == 0: - print(f'[tuning] SMs {num_sms}, NVL chunk {nvl_chunk_size}, RDMA chunk {rdma_chunk_size}: {combine_bf16_rdma_recv_bytes / 1e9 / t:.2f} GB/s (RDMA), {combine_bf16_nvl_send_bytes / 1e9 / t:.2f} GB/s (NVL) ') + print(f'[tuning] SMs {num_sms}, NVL chunk {nvl_chunk_size}, RDMA chunk {rdma_chunk_size}: {combine_bf16_rdma_recv_bytes / 1e9 / t:.2f} GB/s (RDMA), {combine_bf16_nvl_send_bytes / 1e9 / t:.2f} GB/s (NVL) ', flush=True) if t < best_time: best_time, best_results = t, (num_sms, nvl_chunk_size, rdma_chunk_size) if local_rank == 0: - print(f'[tuning] Best combine: SMs {best_results[0]}, NVL chunk {best_results[1]}, RDMA chunk {best_results[2]}: {combine_bf16_rdma_recv_bytes / 1e9 / best_time:.2f} GB/s (RDMA), {combine_bf16_nvl_send_bytes / 1e9 / best_time:.2f} GB/s (NVL)') - print() + print(f'[tuning] Best combine: SMs {best_results[0]}, NVL chunk {best_results[1]}, RDMA chunk {best_results[2]}: {combine_bf16_rdma_recv_bytes / 1e9 / best_time:.2f} GB/s (RDMA), {combine_bf16_nvl_send_bytes / 1e9 / best_time:.2f} GB/s (NVL)', flush=True) + print('', flush=True) # noinspection PyUnboundLocalVariable @@ -231,7 +231,7 @@ def test_loop(local_rank: int, num_local_ranks: int): for i in (24, ): test_main(i, local_rank, num_local_ranks, num_ranks, num_nodes, rank, buffer, group) if local_rank == 0: - print() + print('', flush=True) # Test compatibility with low latency functions if test_ll_compatibility: diff --git a/tests/test_intranode.py b/tests/test_intranode.py index 107fea4..169668c 100644 --- a/tests/test_intranode.py +++ b/tests/test_intranode.py @@ -60,7 +60,7 @@ def test_main(num_sms: int, local_rank: int, num_ranks: int, rank: int, buffer: t = bench(lambda: buffer.get_dispatch_layout(topk_idx, num_experts))[0] if local_rank == 0: print(f'[layout] Kernel performance: {t * 1000:.3f} ms', flush=True) - print() + print('', flush=True) group.barrier() time.sleep(1) @@ -145,7 +145,7 @@ def test_main(num_sms: int, local_rank: int, num_ranks: int, rank: int, buffer: if local_rank == 0: print(' passed', flush=True) if local_rank == 0: - print() + print('', flush=True) # Tune dispatch performance best_dispatch_results = None @@ -160,10 +160,10 @@ def test_main(num_sms: int, local_rank: int, num_ranks: int, rank: int, buffer: if t < best_time: best_time, best_results = t, (num_sms, nvl_chunk_size) if local_rank == 0: - print(f'[tuning] SMs {num_sms}, NVL chunk {nvl_chunk_size}: {nvl_recv_bytes / 1e9 / t:.2f} GB/s (NVL) ') + print(f'[tuning] SMs {num_sms}, NVL chunk {nvl_chunk_size}: {nvl_recv_bytes / 1e9 / t:.2f} GB/s (NVL) ', flush=True) if local_rank == 0: - print(f'[tuning] Best dispatch ({"FP8" if isinstance(current_x, tuple) else "BF16"}): SMs {best_results[0]}, NVL chunk {best_results[1]}, {nvl_recv_bytes / 1e9 / best_time:.2f} GB/s (NVL)') - print() + print(f'[tuning] Best dispatch ({"FP8" if isinstance(current_x, tuple) else "BF16"}): SMs {best_results[0]}, NVL chunk {best_results[1]}, {nvl_recv_bytes / 1e9 / best_time:.2f} GB/s (NVL)', flush=True) + print('', flush=True) if isinstance(current_x, tuple): # Gather FP8 the best config from rank 0 @@ -185,13 +185,13 @@ def test_main(num_sms: int, local_rank: int, num_ranks: int, rank: int, buffer: tune_args = {'x': recv_x, 'handle': handle, 'config': config} t = bench(lambda: buffer.combine(**tune_args))[0] if local_rank == 0: - print(f'[tuning] SMs {num_sms}, NVL chunk {nvl_chunk_size}: {combine_bf16_nvl_send_bytes / 1e9 / t:.2f} GB/s (NVL) ') + print(f'[tuning] SMs {num_sms}, NVL chunk {nvl_chunk_size}: {combine_bf16_nvl_send_bytes / 1e9 / t:.2f} GB/s (NVL) ', flush=True) if t < best_time: best_time, best_results = t, (num_sms, nvl_chunk_size) if local_rank == 0: - print(f'[tuning] Best combine: SMs {best_results[0]}, NVL chunk {best_results[1]}: {combine_bf16_nvl_send_bytes / 1e9 / best_time:.2f} GB/s (NVL)') - print() + print(f'[tuning] Best combine: SMs {best_results[0]}, NVL chunk {best_results[1]}: {combine_bf16_nvl_send_bytes / 1e9 / best_time:.2f} GB/s (NVL)', flush=True) + print('', flush=True) # noinspection PyUnboundLocalVariable @@ -209,7 +209,7 @@ def test_loop(local_rank: int, num_local_ranks: int): for i in (24, ): test_main(i, local_rank, num_ranks, rank, buffer, group) if local_rank == 0: - print() + print('', flush=True) # Test compatibility with low latency functions if test_ll_compatibility: diff --git a/tests/test_low_latency.py b/tests/test_low_latency.py index ed7b32e..3dba6f1 100644 --- a/tests/test_low_latency.py +++ b/tests/test_low_latency.py @@ -137,10 +137,10 @@ def test_main(num_tokens: int, hidden: int, num_experts: int, num_topk: int, suppress_kineto_output=True) if not return_recv_hook: print(f'[rank {rank}] Dispatch bandwidth: {num_dispatch_comm_bytes / 1e9 / dispatch_t:.2f} GB/s, avg_t={dispatch_t * 1e6:.2f} us | ' - f'Combine bandwidth: {num_combine_comm_bytes / 1e9 / combine_t:.2f} GB/s, avg_t={combine_t * 1e6:.2f} us') + f'Combine bandwidth: {num_combine_comm_bytes / 1e9 / combine_t:.2f} GB/s, avg_t={combine_t * 1e6:.2f} us', flush=True) else: print(f'[rank {rank}] Dispatch send/recv time: {dispatch_t * 2 * 1e6:.2f} us | ' - f'Combine send/recv time: {combine_t * 2 * 1e6:.2f} us') + f'Combine send/recv time: {combine_t * 2 * 1e6:.2f} us', flush=True) return hash_value