diff --git a/.github/workflows/perftest.yml b/.github/workflows/perftest.yml new file mode 100644 index 0000000..30097cc --- /dev/null +++ b/.github/workflows/perftest.yml @@ -0,0 +1,60 @@ +# This workflow runs the SimpleStorage performance test +name: Performance Test + +on: + push: + branches: + - main + - v0.* + pull_request: + branches: + - main + - v0.* + +jobs: + perftest: + runs-on: ubuntu-latest + timeout-minutes: 10 + strategy: + fail-fast: false + matrix: + python-version: ["3.11"] + + steps: + - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@0b93645e9fea7318ecaed2b359559ac225c90a2b # v5.3.0 + with: + python-version: ${{ matrix.python-version }} + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install torch torchvision --index-url https://download.pytorch.org/whl/cpu + pip install -e . + - name: Start Ray cluster + run: | + # Get the host IP + HOST_IP=$(hostname -I | awk '{print $1}') + echo "Host IP: $HOST_IP" + # Start Ray with node resource + ray start --head --resources="{\"node:$HOST_IP\":1}" + ray status + - name: Run SimpleStorage performance test + run: | + # Get the host IP + HOST_IP=$(hostname -I | awk '{print $1}') + echo "Host IP: $HOST_IP" + # Run the perftest with small batch size for quick test + cd scripts/performance_test + python perftest.py \ + --backend_config=../../transfer_queue/config.yaml \ + --device=cpu \ + --global_batch_size=128 \ + --field_num=4 \ + --seq_len=1024 \ + --head_node_ip=$HOST_IP \ + --output_csv=results.csv + - name: Stop Ray cluster + run: | + ray stop + if: always() diff --git a/.gitignore b/.gitignore index 983c6ca..fa410a7 100644 --- a/.gitignore +++ b/.gitignore @@ -220,3 +220,6 @@ __marimo__/ #MacOS **/.DS_Store + +# Perftest +scripts/performance_test/results/ diff --git a/docs/storage_backends/openyuanrong_datasystem.md 
b/docs/storage_backends/openyuanrong_datasystem.md index 084bf55..e25c2f9 100644 --- a/docs/storage_backends/openyuanrong_datasystem.md +++ b/docs/storage_backends/openyuanrong_datasystem.md @@ -132,11 +132,11 @@ from transfer_queue import ( TransferQueueController, process_zmq_server_info, ) -# host, port, manager_type and client_name are the config for booting the datasystem. +# port, manager_type and client_name are the config for booting the datasystem. +# host will be auto-detected by checking local IP addresses. config_str = """ manager_type: YuanrongStorageManager client_name: YuanrongStorageClient - host: 127.0.0.1 port: 31501 """ dict_conf = OmegaConf.create(config_str, flags={"allow_objects": True}) @@ -360,26 +360,22 @@ def main(): config_str = """ manager_type: YuanrongStorageManager client_name: YuanrongStorageClient - host: 10.170.27.24 port: 31501 """ dict_conf = OmegaConf.create(config_str, flags={"allow_objects": True}) # It is important to pay attention to the controller's lifecycle. 
controller, dict_conf.controller_info = initialize_controller() - - conf_writer = dict_conf.copy() - conf_writer.host = HEAD_NODE_IP - conf_reader = dict_conf.copy() - conf_reader.host = WORKER_NODE_IP + + # Note: host is auto-detected on each node, no need to configure explicitly data = TensorDict({ "prompt": torch.ones(3, 512), "big_tensor": torch.randn(3,1024,1024)}, batch_size=[3]) # you could assign npu or gpu devices by 'resources' # resources={f"node:{HEAD_NODE_IP}": 0.001} could Force the actor to run on HEAD_NODE writer = TransferQueueClientActor.options( resources={f"node:{HEAD_NODE_IP}": 0.001}, - ).remote(conf_writer, "train") + ).remote(dict_conf, "train") reader = TransferQueueClientActor.options( resources={f"node:{WORKER_NODE_IP}": 0.001} - ).remote(conf_reader, "rollout") + ).remote(dict_conf, "rollout") ray.get(writer.put.remote(data=data, partition_id="train_0")) diff --git a/scripts/performance_test.py b/scripts/performance_test.py deleted file mode 100644 index 14d06a4..0000000 --- a/scripts/performance_test.py +++ /dev/null @@ -1,350 +0,0 @@ -# Copyright 2025 Huawei Technologies Co., Ltd. All Rights Reserved. -# Copyright 2025 The TransferQueue Team -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -import asyncio -import logging -import math -import random -import sys -import time -from pathlib import Path - -import ray -import torch -from omegaconf import OmegaConf -from tensordict import TensorDict -from tensordict.tensorclass import NonTensorData - -parent_dir = Path(__file__).resolve().parent.parent.parent -sys.path.append(str(parent_dir)) - -from transfer_queue.client import TransferQueueClient # noqa: E402 -from transfer_queue.controller import TransferQueueController # noqa: E402 -from transfer_queue.storage.simple_backend import SimpleStorageUnit # noqa: E402 -from transfer_queue.utils.common import get_placement_group # noqa: E402 -from transfer_queue.utils.zmq_utils import process_zmq_server_info # noqa: E402 - -logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") -logger = logging.getLogger(__name__) - -######################################################################## -# Please set up Ray cluster before running this script -######################################################################## -HEAD_NODE_IP = "NodeA" # Replace with your head node IP -WORKER_NODE_IP = "NodeB" # Replace with your worker node IP - - -# This is the Medium setting of the performance test. 
-# You can modify the parameters according to -# https://www.yuque.com/haomingzi-lfse7/lhp4el/tml8ke0zkgn6roey?singleDoc# -config_str = """ - global_batch_size: 1024 - seq_length: 8192 - field_num: 10 - num_global_batch: 1 - num_data_storage_units: 8 -""" -dict_conf = OmegaConf.create(config_str) - - -def create_complex_test_case(batch_size=None, seq_length=None, field_num=None): - tensor_field_size_bytes = batch_size * seq_length * 4 - tensor_field_size_gb = tensor_field_size_bytes / (1024**3) - - num_tensor_fields = (field_num + 1) // 2 - num_nontensor_fields = field_num // 2 - - total_tensor_size_gb = tensor_field_size_gb * num_tensor_fields - total_nontensor_size_gb = (batch_size * 1024 / (1024**3)) * num_nontensor_fields - total_size_gb = total_tensor_size_gb + total_nontensor_size_gb - - logger.info(f"Total data size: {total_size_gb:.6f} GB") - - fields = {} - - for i in range(field_num): - field_name = f"field_{i}" - - if i % 2 == 0: - # Tensor - tensor_data = torch.randn(batch_size, seq_length, dtype=torch.float32) - fields[field_name] = tensor_data - else: - # NonTensorData - str_length = 1024 - non_tensor_data = [ - "".join(random.choices("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789", k=str_length)) - for _ in range(batch_size) - ] - fields[field_name] = NonTensorData(data=non_tensor_data, batch_size=(batch_size,), device=None) - - batch_size_tuple = (batch_size,) - prompt_batch = TensorDict( - fields, - batch_size=batch_size_tuple, - device=None, - ) - - return prompt_batch, total_size_gb - - -@ray.remote -class RemoteDataStoreObjStore: - def __init__(self): - pass - - def get_data(self, data_handler): - start_get = time.time() - ray.get(data_handler) - end_get = time.time() - - get_time = end_get - start_get - return get_time - - -@ray.remote -class RemoteDataStoreRemote: - def __init__(self): - self.stored_data = None - - def put_data(self, data): - self.stored_data = data - - def get_data(self): - return self.stored_data - - def 
clear_data(self): - self.stored_data = None - - -class RayBandwidthTester: - def __init__(self, config, test_mode="obj_store"): - self.config = config - self.test_mode = test_mode - - if test_mode == "obj_store": - RemoteDataStore = RemoteDataStoreObjStore - else: - RemoteDataStore = RemoteDataStoreRemote - - self.remote_store = RemoteDataStore.options(num_cpus=10, resources={f"node:{WORKER_NODE_IP}": 0.001}).remote() - - logger.info(f"Remote data store created on worker node {WORKER_NODE_IP}") - - def run_bandwidth_test(self): - start_create_data = time.time() - test_data, total_data_size_gb = create_complex_test_case( - batch_size=self.config.global_batch_size, seq_length=self.config.seq_length, field_num=self.config.field_num - ) - end_create_data = time.time() - logger.info(f"Data creation time: {end_create_data - start_create_data:.8f}s") - - if self.test_mode == "obj_store": - self._run_obj_store_test(test_data, total_data_size_gb) - else: - self._run_remote_test(test_data, total_data_size_gb) - - def _run_obj_store_test(self, test_data, total_data_size_gb): - start_time = time.time() - data_handler = ray.put(test_data) - ray.get(self.remote_store.get_data.remote([data_handler])) - end_time = time.time() - - transfer_time = end_time - start_time - throughput = (total_data_size_gb * 8) / transfer_time - - logger.info("=" * 60) - logger.info("RAY OBJECT STORE BANDWIDTH TEST SUMMARY") - logger.info("=" * 60) - logger.info(f"Data Size: {(total_data_size_gb):.6f} GB") - logger.info(f"Transfer Time: {transfer_time:.8f}s") - logger.info(f"Throughput: {throughput:.8f} Gb/s") - - def _run_remote_test(self, test_data, total_data_size_gb): - logger.info("Starting Ray PUT bandwidth test...") - start_put = time.time() - ray.get(self.remote_store.put_data.remote(test_data)) - end_put = time.time() - put_time = end_put - start_put - logger.info(f"PUT Time: {put_time:.8f}s") - - time.sleep(2) - - logger.info("Starting Ray GET bandwidth test...") - start_get = time.time() - 
ray.get(self.remote_store.get_data.remote()) - end_get = time.time() - get_time = end_get - start_get - logger.info(f"GET Time: {get_time:.8f}s") - - ray.get(self.remote_store.clear_data.remote()) - - put_throughput = (total_data_size_gb * 8) / put_time - get_throughput = (total_data_size_gb * 8) / get_time - - logger.info("=" * 60) - logger.info("RAY REMOTE ACTOR BANDWIDTH TEST SUMMARY") - logger.info("=" * 60) - logger.info(f"Data Size: {total_data_size_gb:.6f} GB") - logger.info(f"PUT Time: {put_time:.8f}s") - logger.info(f"GET Time: {get_time:.8f}s") - logger.info(f"PUT Throughput (Head->Worker): {put_throughput:.8f} Gb/s") - logger.info(f"GET Throughput (Worker->Head): {get_throughput:.8f} Gb/s") - logger.info(f"Round-trip Average Throughput: {total_data_size_gb * 16 / (put_time + get_time):.8f} Gb/s") - - -class TQBandwidthTester: - def __init__(self, config, remote_mode=False): - self.config = config - self.remote_mode = remote_mode - self.data_system_client = self._initialize_data_system() - - def _initialize_data_system(self): - total_storage_size = self.config.global_batch_size * self.config.num_global_batch - self.data_system_storage_units = {} - - if self.remote_mode: - for storage_unit_rank in range(self.config.num_data_storage_units): - storage_node = SimpleStorageUnit.options( - num_cpus=10, - resources={f"node:{WORKER_NODE_IP}": 0.001}, - ).remote(storage_unit_size=math.ceil(total_storage_size / self.config.num_data_storage_units)) - self.data_system_storage_units[storage_unit_rank] = storage_node - else: - storage_placement_group = get_placement_group(self.config.num_data_storage_units, num_cpus_per_actor=10) - for storage_unit_rank in range(self.config.num_data_storage_units): - storage_node = SimpleStorageUnit.options( - placement_group=storage_placement_group, - placement_group_bundle_index=storage_unit_rank, - ).remote(storage_unit_size=math.ceil(total_storage_size / self.config.num_data_storage_units)) - 
self.data_system_storage_units[storage_unit_rank] = storage_node - - logger.info(f"TransferQueueStorageSimpleUnit #0 ~ #{storage_unit_rank} has been created.") - - self.data_system_controller = TransferQueueController.remote() - logger.info("TransferQueueController has been created.") - - self.data_system_controller_info = process_zmq_server_info(self.data_system_controller) - self.data_system_storage_unit_infos = process_zmq_server_info(self.data_system_storage_units) - - tq_config = OmegaConf.create({}, flags={"allow_objects": True}) - tq_config.controller_info = self.data_system_controller_info - tq_config.storage_unit_infos = self.data_system_storage_unit_infos - self.config = OmegaConf.merge(tq_config, self.config) - - self.data_system_client = TransferQueueClient( - client_id="Trainer", controller_info=self.data_system_controller_info - ) - self.data_system_client.initialize_storage_manager(manager_type="AsyncSimpleStorageManager", config=self.config) - return self.data_system_client - - def run_bandwidth_test(self): - logger.info("Creating large batch for bandwidth test...") - start_create_data = time.time() - big_input_ids, total_data_size_gb = create_complex_test_case( - batch_size=self.config.global_batch_size, seq_length=self.config.seq_length, field_num=self.config.field_num - ) - end_create_data = time.time() - logger.info(f"Data creation time: {end_create_data - start_create_data:.8f}s") - - logger.info("Starting PUT operation...") - start_async_put = time.time() - asyncio.run(self.data_system_client.async_put(data=big_input_ids, partition_id="train_0")) - end_async_put = time.time() - put_time = end_async_put - start_async_put - - put_throughput_gbps = (total_data_size_gb * 8) / put_time - logger.info(f"async_put cost time: {put_time:.8f}s") - logger.info(f"PUT Throughput: {put_throughput_gbps:.8f} Gb/s") - - time.sleep(2) - - logger.info("Starting GET_META operation...") - start_async_get_meta = time.time() - prompt_meta = asyncio.run( - 
self.data_system_client.async_get_meta( - data_fields=list(big_input_ids.keys()), - batch_size=big_input_ids.size(0), - partition_id="train_0", - task_name="generate_sequences", - ) - ) - end_async_get_meta = time.time() - logger.info(f"async_get_meta cost time: {end_async_get_meta - start_async_get_meta:.8f}s") - - time.sleep(2) - - logger.info("Starting GET_DATA operation...") - start_async_get_data = time.time() - asyncio.run(self.data_system_client.async_get_data(prompt_meta)) - end_async_get_data = time.time() - get_time = end_async_get_data - start_async_get_data - get_throughput_gbps = (total_data_size_gb * 8) / get_time - - logger.info(f"async_get_data cost time: {get_time:.8f}s") - logger.info(f"GET Throughput: {get_throughput_gbps:.8f} Gb/s") - - mode_name = "TQ REMOTE" if self.remote_mode else "TQ NORMAL" - logger.info("=" * 60) - logger.info(f"{mode_name} BANDWIDTH TEST SUMMARY") - logger.info("=" * 60) - logger.info(f"Total Data Size: {total_data_size_gb:.6f} GB") - logger.info(f"PUT Time: {put_time:.8f}s") - logger.info(f"GET Time: {get_time:.8f}s") - logger.info(f"PUT Throughput: {put_throughput_gbps:.8f} Gb/s") - logger.info(f"GET Throughput: {get_throughput_gbps:.8f} Gb/s") - logger.info(f"Network Round-trip Throughput: {(total_data_size_gb * 16) / (put_time + get_time):.8f} Gb/s") - - -def main(): - if len(sys.argv) < 2: - print("Usage: python performance_test.py ") - print("Available test modes:") - print(" ray-obj-store - Ray Object Store bandwidth test") - print(" ray-remote - Ray Remote Actor bandwidth test") - print(" tq-normal - TQ Normal mode bandwidth test") - print(" tq-remote - TQ Remote mode bandwidth test") - return - - test_mode = sys.argv[1] - - if test_mode == "ray-obj-store": - logger.info("Starting Ray Object Store bandwidth test") - tester = RayBandwidthTester(config=dict_conf, test_mode="obj_store") - tester.run_bandwidth_test() - logger.info("Ray Object Store bandwidth test completed successfully!") - - elif test_mode == 
"ray-remote": - logger.info("Starting Ray Remote Actor bandwidth test") - tester = RayBandwidthTester(config=dict_conf, test_mode="remote") - tester.run_bandwidth_test() - logger.info("Ray Remote Actor bandwidth test completed successfully!") - - elif test_mode in ["tq-normal", "tq-remote"]: - remote_mode = test_mode == "tq-remote" - mode_name = "TQ Remote" if remote_mode else "TQ Normal" - logger.info(f"Starting {mode_name} bandwidth test") - - tester = TQBandwidthTester(config=dict_conf, remote_mode=remote_mode) - tester.run_bandwidth_test() - logger.info(f"{mode_name} bandwidth test completed successfully!") - - else: - print(f"Unknown test mode: {test_mode}") - print("Available test modes: ray-obj-store, ray-remote, tq-normal, tq-remote") - - -if __name__ == "__main__": - main() diff --git a/scripts/performance_test/README_PERFTEST.md b/scripts/performance_test/README_PERFTEST.md new file mode 100644 index 0000000..1b5ddc6 --- /dev/null +++ b/scripts/performance_test/README_PERFTEST.md @@ -0,0 +1,184 @@ +# TransferQueue Throughput Test + +This script runs throughput tests for TransferQueue with different backends. + +## Prerequisites + +1. Start Ray cluster with node resources: + ```bash + # On head node + ray start --head --resources='{"node:192.168.0.1":1}' + # On worker node + ray start --address=192.168.0.1:6379 --resources='{"node:192.168.0.2":1}' + ``` + +2. Start the backend service (Yuanrong, MooncakeStore, etc.) if testing non-SimpleStorage backends. 
+ +## Usage + +```bash +python perftest.py \ + --backend_config=perftest_config.yaml \ + --backend=SimpleStorage \ + --device=cpu \ + --global_batch_size=1024 \ + --field_num=9 \ + --seq_len=8192 \ + --head_node_ip=192.168.0.1 \ + --worker_node_ip=192.168.0.2 +``` + +## Arguments + +| Argument | Description | Default | Required | +|----------|-------------|---------|----------| +| `--backend_config` | Path to backend config YAML file | - | Yes | +| `--backend` | Override `storage_backend` in config (`SimpleStorage`, `Yuanrong`, `MooncakeStore`) | None | No | +| `--device` | Device: `cpu`, `npu`, `gpu` | `cpu` | No | +| `--global_batch_size` | Global batch size | 1024 | No | +| `--field_num` | Number of fields in the TensorDict | 10 | No | +| `--seq_len` | Sequence length | 8192 | No | +| `--num_test_iterations` | Number of test iterations | 4 | No | +| `--head_node_ip` | Head node IP address | - | Yes | +| `--worker_node_ip` | Worker node IP address (required for Yuanrong) | None | No | +| `--output_csv` | Path to output CSV file | None | No | + +## Backend Configuration + +The script reads the backend configuration directly from the provided `--backend_config` YAML file. The backend type is determined by `backend.storage_backend` in the config file. When `--backend` is specified, it overrides the value in the config. + +For device support of each backend: +- `SimpleStorage`: `cpu` +- `Yuanrong`: `cpu`, `npu` +- `MooncakeStore`: `cpu`, `gpu` + +## Test Data Format + +The test case creates a `TensorDict` with three types of fields to simulate real training batches: + +1. **Regular tensors**: Shape `(batch_size, seq_length)`, float32. +2. **Nested tensors** (non-NPU devices): Variable-length ragged sequences with lengths forming an arithmetic progression from 1 to `seq_length`. Average length ≈ `seq_length / 2`, so each nested field is roughly half the size of a regular field. +3. 
**NonTensorStack strings**: Each string is `seq_length × 4` bytes, matching the memory footprint of one tensor element. + +Fields are distributed evenly across the three types (rounded up). For NPU devices, nested tensors fall back to regular tensors of shape `(batch_size, seq_length // 2)`. + +## Test Flow + +Each iteration performs a PUT → LIST → GET → DELETE cycle via TransferQueue's KV API: + +1. **PUT** (`kv_batch_put`): Writer sends the TensorDict to storage. +2. **LIST** (`kv_list`): Reader queries available keys in the partition. +3. **GET** (`kv_batch_get`): Reader fetches data for those keys. +4. **DELETE** (`kv_clear`): Writer removes the written data. + +The test runs `--num_test_iterations` iterations. Data creation only happens in the first iteration; subsequent iterations reuse the same TensorDict to isolate transfer overhead. + +## Yuanrong Backend + +For Yuanrong backend, writer runs on the head node and reader runs on the worker node. `--worker_node_ip` is required. + +## Running Full Test Suite + +The `run_perf_test.sh` script automates the full test suite across all backends and data sizes, then generates a comparison chart: + +```bash +cd scripts/performance_test +./run_perf_test.sh +``` + +### Configuration + +Configure via environment variables: + +| Variable | Description | Default | +|----------|-------------|---------| +| `HEAD_NODE_IP` | Head node IP address | `127.0.0.1` | +| `WORKER_NODE_IP` | Worker node IP address | `127.0.0.1` | +| `DEVICE` | Device type (`cpu`, `npu`, `gpu`) | `cpu` | +| `NUM_TEST_ITERATIONS` | Number of iterations per test | `4` | +| `USE_COMPLEX_CASE` | Run with complex test case (nested + nontensor fields) | `false` | + +Example: +```bash +# Simple case (default, regular tensors only) +./run_perf_test.sh + +# Complex case (nested tensors + nontensor strings) +USE_COMPLEX_CASE=true ./run_perf_test.sh + +# With specific node IPs & use NPU +HEAD_NODE_IP=192.168.0.1 WORKER_NODE_IP=192.168.0.2 DEVICE=npu 
./run_perf_test.sh +``` + +### Test Matrix + +- **Backends**: SimpleStorage, Yuanrong, MooncakeStore, Ray (baseline) +- **Data sizes**: Small (batch=1024, fields=9, seq=8192), Medium (batch=4096, fields=15, seq=32768), Large (batch=8192, fields=18, seq=100000) + +### Output + +- CSV results: `results/{backend}_{size}.csv` (e.g., `results/simplestorage_small.csv`, `results/ray_baseline_medium.csv`) +- Performance chart: `results/performance_comparison.pdf` + +### Ray Baseline + +`ray_perftest_baseline.py` measures raw Ray inter-node transfer throughput without TransferQueue, serving as a baseline. It passes a TensorDict directly to a remote Ray actor (via `ray.get`), using the same test data format. It is automatically included in `run_perf_test.sh`. + +### draw_figure.py + +After running the tests, `draw_figure.py` reads all CSV files from `results/` and generates a grouped bar chart comparing total throughput (Gbps) across backends and data sizes. + +## Examples + +### SimpleStorage backend +```bash +python perftest.py --backend_config=perftest_config.yaml --backend=SimpleStorage \ + --head_node_ip=192.168.0.1 +``` + +### Yuanrong backend (inter-node) +```bash +python perftest.py --backend_config=perftest_config.yaml --backend=Yuanrong \ + --head_node_ip=192.168.0.1 --worker_node_ip=192.168.0.2 +``` + +### MooncakeStore backend +```bash +python perftest.py --backend_config=perftest_config.yaml --backend=MooncakeStore \ + --head_node_ip=192.168.0.1 +``` + +### NPU device test (Yuanrong) +```bash +python perftest.py --backend_config=perftest_config.yaml --backend=Yuanrong --device=npu \ + --head_node_ip=192.168.0.1 --worker_node_ip=192.168.0.2 +``` + +### Output to CSV +```bash +python perftest.py --backend_config=perftest_config.yaml --backend=SimpleStorage \ + --head_node_ip=192.168.0.1 --output_csv=results.csv +``` + +## Output Format + +The test prints: +- Total data size +- PUT time and throughput +- GET time and throughput +- Total round-trip throughput + 
+Throughput is shown in both Gb/s (gigabits per second) and GB/s (gigabytes per second). + +### CSV Columns + +| Column | Description | +|--------|-------------| +| `backend` | Backend name | +| `device` | Device type | +| `total_data_size_gb` | Data size in GB | +| `put_time` | PUT duration (seconds) | +| `get_time` | GET duration (seconds) | +| `put_gbit_per_sec` | PUT throughput (Gbps) | +| `get_gbit_per_sec` | GET throughput (Gbps) | +| `total_gbit_per_sec` | Round-trip throughput (Gbps) | diff --git a/scripts/performance_test/draw_figure.py b/scripts/performance_test/draw_figure.py new file mode 100644 index 0000000..400c7b9 --- /dev/null +++ b/scripts/performance_test/draw_figure.py @@ -0,0 +1,169 @@ +#!/usr/bin/env python3 +# Copyright 2025 Huawei Technologies Co., Ltd. All Rights Reserved. +# Copyright 2025 The TransferQueue Team +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from pathlib import Path + +import matplotlib.pyplot as plt +import pandas as pd +import seaborn as sns + +results_dir = Path(__file__).resolve().parent / "results" +csv_files = list(results_dir.glob("*.csv")) + +if not csv_files: + raise FileNotFoundError(f"No CSV files found in {results_dir}") + +size_order = ["Small", "Medium", "Large"] + +# Filename -> display name mapping for backends. +# All normalization lives here so the shell script keeps simple lowercase names. 
+BACKEND_DISPLAY_NAMES = { + "simplestorage": "SimpleStorage", + "yuanrong": "Yuanrong", + "mooncakestore": "MooncakeStore", + "ray_baseline": "Ray", +} + + +def format_size(size_gb: float) -> str: + """Format a data size in GB to a human-readable string with appropriate unit.""" + if size_gb >= 1.0: + return f"{size_gb:.2f} GB" + size_mb = size_gb * 1024 + if size_mb >= 1.0: + return f"{size_mb:.2f} MB" + size_kb = size_mb * 1024 + return f"{size_kb:.2f} KB" + + +dfs = [] +for csv_file in csv_files: + df = pd.read_csv(csv_file) + # Parse size label and backend from filename: {backend}_{size_label}.csv + # Size label is always the last _-separated segment (lowercase). + # Backend is everything before the last underscore. + # e.g. "simplestorage_small.csv" -> backend_key="simplestorage", size_label="Small" + # e.g. "ray_baseline_small.csv" -> backend_key="ray_baseline", size_label="Small" + stem = csv_file.stem + parts = stem.rsplit("_", 1) + if len(parts) != 2: + print(f"Warning: skipping {csv_file.name}, unexpected filename format") + continue + raw_backend, raw_size = parts + size_label = raw_size.capitalize() + if size_label not in size_order: + print(f"Warning: skipping {csv_file.name}, unrecognized size label '{raw_size}'") + continue + df["backend_parsed"] = BACKEND_DISPLAY_NAMES.get(raw_backend, raw_backend) + df["size_label"] = size_label + dfs.append(df) + +df = pd.concat(dfs, ignore_index=True) + +existing_sizes = [s for s in size_order if s in df["size_label"].unique()] + +# Build composite X-axis label: "SizeLabel\n" +size_to_gb = df.groupby("size_label")["total_data_size_gb"].first().to_dict() + + +def make_xlabel(size_label: str) -> str: + return f"{size_label}\n{format_size(size_to_gb.get(size_label, 0))}" + + +df["X_label"] = df["size_label"].apply(make_xlabel) + +# Make X_label categorical with the correct ordering +df["X_label"] = pd.Categorical( + df["X_label"], + categories=[make_xlabel(s) for s in existing_sizes], + ordered=True, +) + 
+df["Bandwidth"] = df["total_gbit_per_sec"] +df["Scenario"] = df["backend_parsed"] + +# Set backend display order: only include backends that actually exist in the data +preferred_backend_order = ["Ray", "SimpleStorage", "Yuanrong", "MooncakeStore"] + +# Get actual backends present in the data, maintaining preferred order +actual_backends = df["Scenario"].unique().tolist() +backend_order = [b for b in preferred_backend_order if b in actual_backends] +# Add any unknown backends at the end (shouldn't happen normally) +backend_order += [b for b in actual_backends if b not in preferred_backend_order] + +df["Scenario"] = pd.Categorical(df["Scenario"], categories=backend_order, ordered=True) + +# ========== Plotting ========== +sns.set_theme(style="white", palette="husl") + +fig, ax = plt.subplots(figsize=(12, 7)) + +# Use Set2 palette to generate colors for all backends +# Set2 has 8 colors, which should be enough for typical use cases +palette = sns.color_palette("Set2", n_colors=len(backend_order)) +barplot = sns.barplot(data=df, x="X_label", y="Bandwidth", hue="Scenario", ax=ax, alpha=0.8, palette=palette) + +# Legend: match old style — at the top center, horizontal, with frame +handles, labels = ax.get_legend_handles_labels() +# Move legend above the plot +ax.get_legend().remove() +fig.legend( + handles, + labels, + bbox_to_anchor=(0.5, 1.0), + loc="upper center", + ncol=len(handles), + title="", + frameon=True, + fancybox=True, + shadow=True, + fontsize=13, +) + +# Annotations on bars +for p in ax.patches: + height = p.get_height() + if height > 0: + ax.annotate( + f"{height:.3f}", + (p.get_x() + p.get_width() / 2.0, height), + ha="center", + va="bottom", + fontsize=11, + rotation=0, + ) + +# Axis formatting +ax.set_title("Performance Comparison (Total Throughput)", fontsize=16, fontweight="bold") +ax.set_xlabel("") +ax.set_ylabel("Bandwidth (Gbps)", fontsize=16) + +# Adjust y range to leave room for annotations +y_max = df["Bandwidth"].max() * 1.15 +ax.set_ylim(0, 
y_max) + +ax.grid(True, alpha=0.3) +ax.tick_params(axis="x", labelsize=14) +ax.tick_params(axis="y", labelsize=13) + +# Unified x-label at the bottom +fig.text(0.5, 0.02, "Data Volume", ha="center", fontsize=20) + +plt.tight_layout(rect=[0, 0.04, 1, 0.95]) # room for legend + x-label +plt.savefig(results_dir / "performance_comparison.pdf", dpi=300, bbox_inches="tight") +plt.show() + +print("Performance comparison plot generated and saved as 'performance_comparison.pdf'") diff --git a/scripts/performance_test/perftest.py b/scripts/performance_test/perftest.py new file mode 100644 index 0000000..95eb45b --- /dev/null +++ b/scripts/performance_test/perftest.py @@ -0,0 +1,625 @@ +#!/usr/bin/env python3 +# Copyright 2025 Huawei Technologies Co., Ltd. All Rights Reserved. +# Copyright 2025 The TransferQueue Team +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import argparse +import csv +import logging +import os +import sys +import time +from pathlib import Path +from typing import Any + +import ray +import torch +from omegaconf import OmegaConf +from tensordict import NonTensorStack, TensorDict + +parent_dir = Path(__file__).resolve().parent.parent.parent +sys.path.append(str(parent_dir)) + +import transfer_queue as tq # noqa: E402 + +logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") +logger = logging.getLogger(__name__) + + +def create_test_case( + batch_size: int | None = None, + seq_length: int | None = None, + field_num: int | None = None, + device: str = "cpu", +) -> tuple[TensorDict, float]: + """Create a test case with tensor data formats. + + Creates TensorDict with: + - Regular tensors: (batch_size, seq_length) shape, each element is float32 + + Args: + batch_size: Batch size for the test case + seq_length: Maximum sequence length (used for regular tensors and + as upper bound for nested tensor lengths) + field_num: Total number of fields to create (distributed across types) + device: Device to create tensors on ("cpu", "npu", or "gpu") + + Returns: + Tuple of (TensorDict, total_size_gb) + """ + bytes_per_element = 4 # float32 + + # Each regular tensor field: batch_size * seq_length * 4 bytes + regular_field_size_bytes = batch_size * seq_length * bytes_per_element + regular_field_size_gb = regular_field_size_bytes / (1024**3) + + # Total size = sum of all field types + total_size_gb = regular_field_size_gb * field_num + + logger.info(f"Total data size: {total_size_gb:.6f} GB") + + # Determine torch device + torch_device = None + if device == "npu": + torch_device = "npu:0" + elif device == "gpu": + torch_device = "cuda:0" + + # Set seeds for reproducibility (within this process) + # For non-NPU: arithmetic progression lengths from 1 to seq_length for each nested field + # For NPU: nested fields become regular tensors of seq_length // 2 + + batch_size_tuple = 
(batch_size,) + + prompt_batch = TensorDict(batch_size=batch_size_tuple) + + # 1. Regular tensor fields + for i in range(field_num): + field_name = f"field_{i}" + tensor_data = torch.randn(batch_size, seq_length, dtype=torch.float32, device=torch_device) + prompt_batch.set(field_name, tensor_data) + + return prompt_batch, total_size_gb + + +def create_complex_test_case( + batch_size: int | None = None, + seq_length: int | None = None, + field_num: int | None = None, + device: str = "cpu", +) -> tuple[TensorDict, float]: + """Create a test case with complex data formats. + + Creates TensorDict with: + - Regular tensors: (batch_size, seq_length) shape, each element is float32 + - Nested Tensors (non-NPU): variable-length sequences with lengths forming an + arithmetic progression from 1 to seq_length (average length ≈ seq_length/2) + - Nested Tensors (NPU): regular tensors of shape (batch_size, seq_length//2) + - NonTensorStack wrapped strings: each string size ~= seq_length * 4 bytes + (to match memory footprint of one tensor element) + + Args: + batch_size: Batch size for the test case + seq_length: Maximum sequence length (used for regular tensors and + as upper bound for nested tensor lengths) + field_num: Total number of fields to create (distributed across types) + device: Device to create tensors on ("cpu", "npu", or "gpu") + + Returns: + Tuple of (TensorDict, total_size_gb) + """ + bytes_per_element = 4 # float32 + + # Calculate field distribution (1/3 each type, last fields may be regular) + num_regular_fields = (field_num + 2) // 3 + num_nested_fields = (field_num + 2) // 3 + num_nontensor_fields = field_num - num_regular_fields - num_nested_fields + + # Each regular tensor field: batch_size * seq_length * 4 bytes + regular_field_size_bytes = batch_size * seq_length * bytes_per_element + regular_field_size_gb = regular_field_size_bytes / (1024**3) + + # Nested tensor field: average length = (1 + seq_length) / 2 (arithmetic progression), + # so avg size = 
batch_size * (1 + seq_length) / 2 * 4 bytes
+    # For NPU, nested fields become regular tensors of seq_length // 2
+    if device == "npu":
+        avg_nested_length = seq_length // 2
+        nested_field_size_bytes = int(batch_size * avg_nested_length * bytes_per_element)
+    else:
+        avg_nested_length = (1 + seq_length) / 2
+        nested_field_size_bytes = int(batch_size * avg_nested_length * bytes_per_element)
+    nested_field_size_gb = nested_field_size_bytes / (1024**3)
+
+    # NonTensorStack string field: each string ~= seq_length * 4 bytes to match one tensor element
+    # Total for field: batch_size strings * seq_length * 4 bytes each
+    string_size_per_elem = seq_length * bytes_per_element
+    nontensor_field_size_bytes = batch_size * string_size_per_elem
+    nontensor_field_size_gb = nontensor_field_size_bytes / (1024**3)
+
+    # Total size = sum of all field types
+    total_size_gb = (
+        regular_field_size_gb * num_regular_fields
+        + nested_field_size_gb * num_nested_fields
+        + nontensor_field_size_gb * num_nontensor_fields
+    )
+
+    logger.info(f"Total data size: {total_size_gb:.6f} GB")
+
+    # Determine torch device
+    torch_device = None
+    if device == "npu":
+        torch_device = "npu:0"
+    elif device == "gpu":
+        torch_device = "cuda:0"
+
+    # NOTE: no RNG seeding is actually performed; seed torch explicitly for reproducibility
+    # For non-NPU: arithmetic progression lengths from 1 to seq_length for each nested field
+    # For NPU: nested fields become regular tensors of seq_length // 2
+
+    batch_size_tuple = (batch_size,)
+
+    prompt_batch = TensorDict(batch_size=batch_size_tuple)
+
+    # 1. Regular tensor fields
+    for i in range(num_regular_fields):
+        field_name = f"field_{i}"
+        tensor_data = torch.randn(batch_size, seq_length, dtype=torch.float32, device=torch_device)
+        prompt_batch.set(field_name, tensor_data)
+
+    # 2. 
Nested Tensor fields (variable-length sequences) or regular tensors for NPU + if device != "npu": + step = (seq_length - 1) / (batch_size - 1) if batch_size > 1 else 0 + lengths = [max(1, min(int(round(1 + j * step)), seq_length)) for j in range(batch_size)] + total_elements = sum(lengths) + + for i in range(num_nested_fields): + field_name = f"nested_field_{i}" + + if device == "npu": + # For NPU: create a regular tensor of seq_length // 2 + tensor_data = torch.randn(batch_size, seq_length // 2, dtype=torch.float32, device=torch_device) + prompt_batch.set(field_name, tensor_data) + else: + flat_data = torch.randn(total_elements, dtype=torch.float32, device=torch_device) + nested_tuple = torch.split(flat_data, lengths) + nested_tensor = torch.nested.as_nested_tensor(nested_tuple, layout=torch.jagged) + prompt_batch.set(field_name, nested_tensor) + + # 3. NonTensorStack wrapped strings + # Each string ~= seq_length * 4 bytes to match one tensor element's memory footprint + string_char_count = seq_length * bytes_per_element # 4 bytes per char (unicode) + + for i in range(num_nontensor_fields): + field_name = f"nontensor_field_{i}" + bytes_needed = string_char_count // 2 + string_data = [os.urandom(bytes_needed).hex() for _ in range(batch_size)] + + prompt_batch.set(field_name, NonTensorStack.from_list(string_data)) + + return prompt_batch, total_size_gb + + +@ray.remote +class TQClientActor: + """Ray actor that uses tq.init(config) to initialize.""" + + def __init__(self, config: dict[str, Any], use_complex_case: bool = False): + self.config = config + self.use_complex_case = use_complex_case + self.test_data = None + self.total_data_size_gb = 0.0 + self.test_keys = None + + def initialize(self) -> None: + """Initialize transfer_queue with the config.""" + tq.init(OmegaConf.create(self.config)) + + def create_test_case( + self, + batch_size: int | None = None, + seq_length: int | None = None, + field_num: int | None = None, + device: str = "cpu", + ) -> 
tuple[list[str], float]: + """Create test case on the actor.""" + if self.use_complex_case: + self.test_data, self.total_data_size_gb = create_complex_test_case( + batch_size, seq_length, field_num, device + ) + else: + self.test_data, self.total_data_size_gb = create_test_case(batch_size, seq_length, field_num, device) + # Create keys for each sample in the batch + self.test_keys = [f"test_key_{i}" for i in range(batch_size)] + return list(self.test_data.keys()), self.total_data_size_gb + + def put(self, partition_id: str) -> None: + """Put data to storage using kv_batch_put.""" + tq.kv_batch_put(keys=self.test_keys, partition_id=partition_id, fields=self.test_data) + + def list_keys(self, partition_id: str) -> list[str]: + """List keys in a partition using kv_list.""" + partition_info = tq.kv_list(partition_id=partition_id) + if partition_id in partition_info: + return list(partition_info[partition_id].keys()) + return [] + + def get_data(self, partition_id: str, keys: list[str] | None = None) -> None: + """Get data from storage using kv_batch_get.""" + if keys is None: + keys = self.test_keys + tq.kv_batch_get(keys=keys, partition_id=partition_id) + + def delete(self, partition_id: str, keys: list[str] | None = None) -> None: + """Delete data from storage using kv_clear.""" + if keys is None: + keys = self.test_keys + tq.kv_clear(keys=keys, partition_id=partition_id) + + def close(self) -> None: + """Close transfer_queue.""" + tq.close() + + +class TQThroughputTester: + """Main throughput tester for TransferQueue backends.""" + + def __init__( + self, + backend_config_path: str, + device: str, + global_batch_size: int, + field_num: int, + seq_len: int, + num_test_iterations: int, + head_node_ip: str, + backend: str | None = None, + worker_node_ip: str | None = None, + output_csv: str | None = None, + use_complex_case: bool = False, + ): + """Initialize the throughput tester. 
+ + Args: + backend_config_path: Path to backend config YAML file + backend: Override storage_backend in config (e.g. "SimpleStorage") + device: Device type ("cpu", "npu", "gpu") + global_batch_size: Global batch size + field_num: Number of fields + seq_len: Sequence length + num_test_iterations: Number of test iterations + head_node_ip: Head node IP address + worker_node_ip: Worker node IP address (required for Yuanrong) + output_csv: Path to output CSV file (optional) + use_complex_case: Whether to use complex test case (nested + nontensor fields) + """ + self.backend_config_path = backend_config_path + self.backend_override = backend + self.device = device + self.global_batch_size = global_batch_size + self.field_num = field_num + self.seq_len = seq_len + self.num_test_iterations = num_test_iterations + self.head_node_ip = head_node_ip + self.worker_node_ip = worker_node_ip + self.output_csv = output_csv + self.use_complex_case = use_complex_case + + # Prepare full config for tq.init() + self.full_config = self._prepare_config() + + # Get backend from config + self.backend = self.full_config["backend"]["storage_backend"] + + # For Yuanrong, always use inter_node + self.use_inter_node = self.backend == "Yuanrong" + + # Validate arguments + self._validate_args() + + # Initialize clients + self._initialize_clients() + + def _validate_args(self) -> None: + """Validate input arguments.""" + # Check worker_node_ip for Yuanrong + if self.use_inter_node and self.worker_node_ip is None: + raise ValueError("worker_node_ip is required for Yuanrong backend") + + def _prepare_config(self) -> dict[str, Any]: + """Prepare the config by directly reading the backend_config file. 
+ + Returns: + Configuration dictionary + """ + # Directly read the backend_config file, no merging with default + config = OmegaConf.load(self.backend_config_path) + + # Override storage_backend if specified via CLI + if self.backend_override is not None: + config.backend.storage_backend = self.backend_override + logger.info(f"Overriding storage_backend to: {self.backend_override}") + + # If backend.storage_backend is SimpleStorage, override total_storage_size + total_storage_size = self.global_batch_size * self.num_test_iterations + if config.backend.storage_backend == "SimpleStorage": + config.backend.SimpleStorage.total_storage_size = total_storage_size + + return OmegaConf.to_container(config, resolve=True) + + def _initialize_clients(self) -> None: + """Initialize writer and reader TQClientActors.""" + # Determine node placement + if self.use_inter_node: + writer_node = self.head_node_ip + reader_node = self.worker_node_ip + else: + writer_node = reader_node = self.head_node_ip + + logger.info(f"Writer is on {writer_node}, Reader is on {reader_node}") + + # Prepare base options + writer_options = { + "num_cpus": 0.001, + "resources": {f"node:{writer_node}": 0.001}, + } + reader_options = { + "num_cpus": 0.001, + "resources": {f"node:{reader_node}": 0.001}, + } + + # Add device-specific options + if self.device == "gpu": + writer_options["num_gpus"] = 1 + reader_options["num_gpus"] = 1 + elif self.device == "npu": + writer_options["resources"]["NPU"] = 1 + reader_options["resources"]["NPU"] = 1 + + # Prepare configs for writer and reader + # Host is auto-detected on each node for Yuanrong backend + writer_config = self.full_config + reader_config = self.full_config + + # Create writer and reader actors + self.writer = TQClientActor.options(**writer_options).remote(writer_config, self.use_complex_case) + self.reader = TQClientActor.options(**reader_options).remote(reader_config, self.use_complex_case) + + # Initialize transfer_queue + logger.info(f"Using 
{self.backend} as storage backend.") + + w = self.writer.initialize.remote() + r = self.reader.initialize.remote() + ray.get([w, r]) + + def run_throughput_test(self, skip_dataset_create=False) -> dict[str, Any]: + """Run the throughput test and print results. + + Returns: + Dictionary with test results + """ + # Create test data + if not skip_dataset_create: + logger.info("Creating large batch for throughput test...") + start_create_data = time.perf_counter() + data_fields, self.total_data_size_gb = ray.get( + self.writer.create_test_case.remote( + batch_size=self.global_batch_size, + seq_length=self.seq_len, + field_num=self.field_num, + device=self.device, + ) + ) + end_create_data = time.perf_counter() + logger.info(f"Total Data Size: {self.total_data_size_gb:.6f} GB") + logger.info(f"Data creation time: {end_create_data - start_create_data:.8f}s") + + partition_id = "train_0" + + # PUT operation using kv_batch_put + logger.info("Starting PUT operation (kv_batch_put)...") + start_put = time.perf_counter() + ray.get(self.writer.put.remote(partition_id=partition_id)) + end_put = time.perf_counter() + put_time = end_put - start_put + put_gbit_per_sec = (self.total_data_size_gb * 8) / put_time + put_gbyte_per_sec = self.total_data_size_gb / put_time + + time.sleep(2) + + # LIST_KEYS operation using kv_list + logger.info("Starting LIST_KEYS operation (kv_list)...") + keys = ray.get(self.reader.list_keys.remote(partition_id=partition_id)) + + time.sleep(2) + + # GET_DATA operation using kv_batch_get + logger.info("Starting GET_DATA operation (kv_batch_get)...") + start_get_data = time.perf_counter() + ray.get(self.reader.get_data.remote(partition_id=partition_id, keys=keys)) + end_get_data = time.perf_counter() + get_time = end_get_data - start_get_data + get_gbit_per_sec = (self.total_data_size_gb * 8) / get_time + get_gbyte_per_sec = self.total_data_size_gb / get_time + + time.sleep(2) + + # DELETE operation using kv_clear + logger.info("Starting DELETE operation 
(kv_clear)...") + ray.get(self.writer.delete.remote(partition_id=partition_id, keys=keys)) + + # Print summary + total_gbit_per_sec = (self.total_data_size_gb * 16) / (put_time + get_time) + total_gbyte_per_sec = (self.total_data_size_gb * 2) / (put_time + get_time) + + logger.info("=" * 60) + logger.info("THROUGHPUT TEST SUMMARY") + logger.info("=" * 60) + logger.info(f"Backend: {self.backend}") + logger.info(f"Device: {self.device}") + logger.info(f"Total Data Size: {self.total_data_size_gb:.6f} GB") + logger.info(f"PUT Time: {put_time:.8f}s") + logger.info(f"GET Time: {get_time:.8f}s") + logger.info(f"PUT Throughput: {put_gbit_per_sec:.8f} Gb/s ({put_gbyte_per_sec:.8f} GB/s)") + logger.info(f"GET Throughput: {get_gbit_per_sec:.8f} Gb/s ({get_gbyte_per_sec:.8f} GB/s)") + logger.info(f"Total Throughput: {total_gbit_per_sec:.8f} Gb/s ({total_gbyte_per_sec:.8f} GB/s)") + logger.info("=" * 60) + + # Return results (only Gb/s for CSV, not GB/s) + return { + "backend": self.backend, + "device": self.device, + "total_data_size_gb": self.total_data_size_gb, + "put_time": put_time, + "get_time": get_time, + "put_gbit_per_sec": put_gbit_per_sec, + "get_gbit_per_sec": get_gbit_per_sec, + "total_gbit_per_sec": total_gbit_per_sec, + } + + def close(self) -> None: + """Close the transfer_queue clients.""" + ray.get([self.writer.close.remote(), self.reader.close.remote()]) + + +def write_results_to_csv(results: list[dict[str, Any]], output_path: str) -> None: + """Write test results to CSV file. 
+ + Args: + results: List of result dictionaries + output_path: Path to output CSV file + """ + if not results: + return + + fieldnames = list(results[0].keys()) + + with open(output_path, "w", newline="") as csvfile: + writer = csv.DictWriter(csvfile, fieldnames=fieldnames) + writer.writeheader() + for result in results: + writer.writerow(result) + + logger.info(f"Results written to {output_path}") + + +def main() -> None: + """Main entry point for the perftest script.""" + parser = argparse.ArgumentParser(description="TransferQueue Throughput Test") + parser.add_argument( + "--backend_config", + type=str, + required=True, + help="Path to backend config YAML file", + ) + parser.add_argument( + "--backend", + type=str, + default=None, + help="Override storage_backend in config (e.g. SimpleStorage, Yuanrong, MooncakeStore)", + ) + parser.add_argument( + "--device", + type=str, + default="cpu", + choices=["cpu", "npu", "gpu"], + help="Device to use (default: cpu)", + ) + parser.add_argument( + "--global_batch_size", + type=int, + default=1024, + help="Global batch size (default: 1024)", + ) + parser.add_argument( + "--field_num", + type=int, + default=10, + help="Number of fields (default: 10)", + ) + parser.add_argument( + "--seq_len", + type=int, + default=8192, + help="Sequence length (default: 8192)", + ) + parser.add_argument( + "--num_test_iterations", + type=int, + default=4, + help="Number of test iterations (default: 4)", + ) + parser.add_argument( + "--head_node_ip", + type=str, + required=True, + help="Head node IP address", + ) + parser.add_argument( + "--worker_node_ip", + type=str, + default=None, + help="Worker node IP address (required for Yuanrong)", + ) + parser.add_argument( + "--output_csv", + type=str, + default=None, + help="Path to output CSV file (optional)", + ) + parser.add_argument( + "--use_complex_case", + action="store_true", + default=False, + help="Use complex test case with nested tensors and nontensor fields (default: False, simple 
case)", + ) + + args = parser.parse_args() + + # Create and run tester + tester = TQThroughputTester( + backend_config_path=args.backend_config, + device=args.device, + global_batch_size=args.global_batch_size, + field_num=args.field_num, + seq_len=args.seq_len, + num_test_iterations=args.num_test_iterations, + head_node_ip=args.head_node_ip, + backend=args.backend, + worker_node_ip=args.worker_node_ip, + output_csv=args.output_csv, + use_complex_case=args.use_complex_case, + ) + + # Run test multiple times for consistent results using a for loop + all_results = [] + for i in range(args.num_test_iterations): + logger.info("-" * 60) + logger.info(f"Iteration {i + 1}/{args.num_test_iterations}") + logger.info("-" * 60) + result = tester.run_throughput_test(skip_dataset_create=(i != 0)) + all_results.append(result) + time.sleep(10) + + # Write to CSV if output path is specified + if args.output_csv: + write_results_to_csv(all_results, args.output_csv) + + # Close transfer_queue + tester.close() + + logger.info("Throughput test completed successfully!") + + +if __name__ == "__main__": + main() diff --git a/scripts/performance_test/perftest_config.yaml b/scripts/performance_test/perftest_config.yaml new file mode 100644 index 0000000..39538bc --- /dev/null +++ b/scripts/performance_test/perftest_config.yaml @@ -0,0 +1,54 @@ +# This is the default configuration of TransferQueue. Users may modify the default value +# and use transfer_queue.init(conf) to overwrite the config entries. + +controller: + # User-defined sampler. User can pass sampler instance to overwrite this string config. + sampler: SequentialSampler + # Whether return an empty BatchMeta to prevent request blocking when no enough data is available + polling_mode: False + # ZMQ Server IP & Ports (automatically generated during init) + zmq_info: null + + +backend: + # Pluggable storage/transport backend of TransferQueue. Choose from: + # SimpleStorage, Yuanrong, MooncakeStore, ... 
+  storage_backend: SimpleStorage
+
+  # For SimpleStorage:
+  SimpleStorage:
+    # Total number of samples
+    total_storage_size: 100000
+    # Number of distributed storage units for SimpleStorage backend
+    num_data_storage_units: 16
+    # ZMQ Server IP & Ports (automatically generated during init)
+    zmq_info: null
+
+  # For MooncakeStore:
+  MooncakeStore:
+    # Whether to let TQ automatically init metadata_server.
+    auto_init: true
+    # Address of the HTTP metadata server
+    metadata_server: localhost:50050
+    # Address of master server
+    master_server_address: localhost:50051
+    # Address of local host. Set to "" to use Ray IP as local host address
+    local_hostname: ""
+    # Protocol for transmission. Choose from: tcp, rdma. (default: rdma)
+    protocol: rdma
+    # Memory segment size in bytes for mounting
+    global_segment_size: 86294967296
+    # Local buffer size in bytes
+    local_buffer_size: 86294967296
+    # Network device name. Set to "" to let Mooncake auto-pick devices
+    device_name: ""
+
+  # For RayStore:
+  RayStore:
+
+  # For Yuanrong:
+  Yuanrong:
+    # Port of local yuanrong datasystem worker
+    port: 31501
+    # Whether to enable NPU transport
+    enable_yr_npu_transport: true
diff --git a/scripts/performance_test/ray_perftest_baseline.py b/scripts/performance_test/ray_perftest_baseline.py
new file mode 100644
index 0000000..fe40788
--- /dev/null
+++ b/scripts/performance_test/ray_perftest_baseline.py
@@ -0,0 +1,451 @@
+#!/usr/bin/env python3
+# Copyright 2025 Huawei Technologies Co., Ltd. All Rights Reserved.
+# Copyright 2025 The TransferQueue Team
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +import csv +import logging +import os +import sys +import time +from pathlib import Path +from typing import Any + +import ray +import torch +from tensordict import NonTensorStack, TensorDict + +parent_dir = Path(__file__).resolve().parent.parent.parent +sys.path.append(str(parent_dir)) + +logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") +logger = logging.getLogger(__name__) + + +def create_test_case( + batch_size: int | None = None, + seq_length: int | None = None, + field_num: int | None = None, + device: str = "cpu", +) -> tuple[TensorDict, float]: + """Create a test case with only regular tensors. 
+ + Creates TensorDict with: + - Regular tensors: (batch_size, seq_length) shape, each element is float32 + + Args: + batch_size: Batch size for the test case + seq_length: Maximum sequence length + field_num: Total number of fields to create + device: Device to create tensors on ("cpu", "npu", or "gpu") + + Returns: + Tuple of (TensorDict, total_size_gb) + """ + bytes_per_element = 4 # float32 + + # Each regular tensor field: batch_size * seq_length * 4 bytes + regular_field_size_bytes = batch_size * seq_length * bytes_per_element + regular_field_size_gb = regular_field_size_bytes / (1024**3) + + total_size_gb = regular_field_size_gb * field_num + + logger.info(f"Total data size: {total_size_gb:.6f} GB") + + # Determine torch device + torch_device = None + if device == "npu": + torch_device = "npu:0" + elif device == "gpu": + torch_device = "cuda:0" + + batch_size_tuple = (batch_size,) + + prompt_batch = TensorDict(batch_size=batch_size_tuple) + + for i in range(field_num): + field_name = f"field_{i}" + tensor_data = torch.randn(batch_size, seq_length, dtype=torch.float32, device=torch_device) + prompt_batch.set(field_name, tensor_data) + + return prompt_batch, total_size_gb + + +def create_complex_test_case( + batch_size: int | None = None, + seq_length: int | None = None, + field_num: int | None = None, + device: str = "cpu", +) -> tuple[TensorDict, float]: + """Create a test case with complex data formats. 
+ + Creates TensorDict with: + - Regular tensors: (batch_size, seq_length) shape, each element is float32 + - Nested Tensors (non-NPU): variable-length sequences with lengths forming an + arithmetic progression from 1 to seq_length (average length ≈ seq_length/2) + - Nested Tensors (NPU): regular tensors of shape (batch_size, seq_length//2) + - NonTensorStack wrapped strings: each string size ~= seq_length * 4 bytes + (to match memory footprint of one tensor element) + + Args: + batch_size: Batch size for the test case + seq_length: Maximum sequence length (used for regular tensors and + as upper bound for nested tensor lengths) + field_num: Total number of fields to create (distributed across types) + device: Device to create tensors on ("cpu", "npu", or "gpu") + + Returns: + Tuple of (TensorDict, total_size_gb) + """ + bytes_per_element = 4 # float32 + + # Calculate field distribution (1/3 each type, last fields may be regular) + num_regular_fields = (field_num + 2) // 3 + num_nested_fields = (field_num + 2) // 3 + num_nontensor_fields = field_num - num_regular_fields - num_nested_fields + + # Each regular tensor field: batch_size * seq_length * 4 bytes + regular_field_size_bytes = batch_size * seq_length * bytes_per_element + regular_field_size_gb = regular_field_size_bytes / (1024**3) + + # Nested tensor field: average length = (1 + seq_length) / 2 (arithmetic progression), + # so avg size = batch_size * (1 + seq_length) / 2 * 4 bytes + # For NPU, nested fields become regular tensors of seq_length // 2 + if device == "npu": + avg_nested_length = seq_length // 2 + nested_field_size_bytes = int(batch_size * avg_nested_length * bytes_per_element) + else: + avg_nested_length = (1 + seq_length) / 2 + nested_field_size_bytes = int(batch_size * avg_nested_length * bytes_per_element) + nested_field_size_gb = nested_field_size_bytes / (1024**3) + + # NonTensorStack string field: each string ~= seq_length * 4 bytes to match one tensor element + # Total for field: 
batch_size strings * seq_length * 4 bytes each
+    string_size_per_elem = seq_length * bytes_per_element
+    nontensor_field_size_bytes = batch_size * string_size_per_elem
+    nontensor_field_size_gb = nontensor_field_size_bytes / (1024**3)
+
+    # Total size = sum of all field types
+    total_size_gb = (
+        regular_field_size_gb * num_regular_fields
+        + nested_field_size_gb * num_nested_fields
+        + nontensor_field_size_gb * num_nontensor_fields
+    )
+
+    logger.info(f"Total data size: {total_size_gb:.6f} GB")
+
+    # Determine torch device
+    torch_device = None
+    if device == "npu":
+        torch_device = "npu:0"
+    elif device == "gpu":
+        torch_device = "cuda:0"
+
+    # NOTE: no RNG seeding is actually performed; seed torch explicitly for reproducibility
+    # For non-NPU: arithmetic progression lengths from 1 to seq_length for each nested field
+    # For NPU: nested fields become regular tensors of seq_length // 2
+
+    batch_size_tuple = (batch_size,)
+
+    prompt_batch = TensorDict(batch_size=batch_size_tuple)
+
+    # 1. Regular tensor fields
+    for i in range(num_regular_fields):
+        field_name = f"field_{i}"
+        tensor_data = torch.randn(batch_size, seq_length, dtype=torch.float32, device=torch_device)
+        prompt_batch.set(field_name, tensor_data)
+
+    # 2. 
Nested Tensor fields (variable-length sequences) or regular tensors for NPU + if device != "npu": + step = (seq_length - 1) / (batch_size - 1) if batch_size > 1 else 0 + lengths = [max(1, min(int(round(1 + j * step)), seq_length)) for j in range(batch_size)] + total_elements = sum(lengths) + + for i in range(num_nested_fields): + field_name = f"nested_field_{i}" + + if device == "npu": + # For NPU: create a regular tensor of seq_length // 2 + tensor_data = torch.randn(batch_size, seq_length // 2, dtype=torch.float32, device=torch_device) + prompt_batch.set(field_name, tensor_data) + else: + flat_data = torch.randn(total_elements, dtype=torch.float32, device=torch_device) + nested_tuple = torch.split(flat_data, lengths) + nested_tensor = torch.nested.as_nested_tensor(nested_tuple, layout=torch.jagged) + prompt_batch.set(field_name, nested_tensor) + + # 3. NonTensorStack wrapped strings + # Each string ~= seq_length * 4 bytes to match one tensor element's memory footprint + string_char_count = seq_length * bytes_per_element # 4 bytes per char (unicode) + + for i in range(num_nontensor_fields): + field_name = f"nontensor_field_{i}" + bytes_needed = string_char_count // 2 + string_data = [os.urandom(bytes_needed).hex() for _ in range(batch_size)] + + prompt_batch.set(field_name, NonTensorStack.from_list(string_data)) + + return prompt_batch, total_size_gb + + +@ray.remote +class RemoteDataStore: + """Ray remote actor that stores and retrieves data directly (without ray.put).""" + + def __init__(self): + self.stored_data = None + + def put_data(self, data: TensorDict) -> None: + self.stored_data = data + + def get_data(self) -> TensorDict: + return self.stored_data + + def clear_data(self) -> None: + self.stored_data = None + + +class RayBaselineTester: + """Ray baseline throughput tester - measures raw Ray data transfer performance.""" + + def __init__( + self, + global_batch_size: int, + field_num: int, + seq_len: int, + num_test_iterations: int, + head_node_ip: str, 
+ worker_node_ip: str | None = None, + output_csv: str | None = None, + use_complex_case: bool = False, + ): + """Initialize the Ray baseline tester. + + Args: + global_batch_size: Global batch size + field_num: Number of fields + seq_len: Sequence length + num_test_iterations: Number of test iterations + head_node_ip: Head node IP address + worker_node_ip: Worker node IP address + output_csv: Path to output CSV file (optional) + use_complex_case: Whether to use complex test case (nested + nontensor fields) + """ + self.global_batch_size = global_batch_size + self.field_num = field_num + self.seq_len = seq_len + self.num_test_iterations = num_test_iterations + self.head_node_ip = head_node_ip + self.worker_node_ip = worker_node_ip + self.output_csv = output_csv + self.use_complex_case = use_complex_case + + # Initialize remote store on worker node + self._initialize_remote_store() + + def _initialize_remote_store(self) -> None: + """Initialize the RemoteDataStore actor on worker node.""" + writer_node = self.head_node_ip + reader_node = self.worker_node_ip if self.worker_node_ip else self.head_node_ip + + logger.info(f"Writer is on {writer_node}, Reader is on {reader_node}") + + self.remote_store = RemoteDataStore.options( + num_cpus=0.001, + resources={f"node:{reader_node}": 0.001}, + ).remote() + + logger.info(f"RemoteDataStore created on {reader_node}") + + def run_throughput_test(self, skip_dataset_create=False) -> dict[str, Any]: + """Run the throughput test and print results. 
+ + Returns: + Dictionary with test results + """ + # Create test data + if not skip_dataset_create: + logger.info("Creating large batch for throughput test...") + start_create_data = time.perf_counter() + if self.use_complex_case: + self.test_data, self.total_data_size_gb = create_complex_test_case( + batch_size=self.global_batch_size, + seq_length=self.seq_len, + field_num=self.field_num, + device="cpu", + ) + else: + self.test_data, self.total_data_size_gb = create_test_case( + batch_size=self.global_batch_size, + seq_length=self.seq_len, + field_num=self.field_num, + device="cpu", + ) + end_create_data = time.perf_counter() + logger.info(f"Data creation time: {end_create_data - start_create_data:.8f}s") + + # PUT operation - pass data directly to remote actor + logger.info("Starting PUT operation...") + start_put = time.perf_counter() + ray.get(self.remote_store.put_data.remote(self.test_data)) + end_put = time.perf_counter() + put_time = end_put - start_put + put_gbit_per_sec = (self.total_data_size_gb * 8) / put_time + + time.sleep(2) + + # GET operation - retrieve data from remote actor + logger.info("Starting GET operation...") + start_get = time.perf_counter() + _ = ray.get(self.remote_store.get_data.remote()) + end_get = time.perf_counter() + get_time = end_get - start_get + get_gbit_per_sec = (self.total_data_size_gb * 8) / get_time + + # Clear data + ray.get(self.remote_store.clear_data.remote()) + + # Calculate total throughput + total_gbit_per_sec = (self.total_data_size_gb * 16) / (put_time + get_time) + + # Print summary + logger.info("=" * 60) + logger.info("RAY BASELINE THROUGHPUT TEST SUMMARY") + logger.info("=" * 60) + logger.info(f"Total Data Size: {self.total_data_size_gb:.6f} GB") + logger.info(f"PUT Time: {put_time:.8f}s") + logger.info(f"GET Time: {get_time:.8f}s") + logger.info(f"PUT Throughput: {put_gbit_per_sec:.8f} Gb/s") + logger.info(f"GET Throughput: {get_gbit_per_sec:.8f} Gb/s") + logger.info(f"Total Throughput (round-trip): 
{total_gbit_per_sec:.8f} Gb/s") + logger.info("=" * 60) + + return { + "backend": "RayBaseline", + "device": "cpu", + "total_data_size_gb": self.total_data_size_gb, + "put_time": put_time, + "get_time": get_time, + "put_gbit_per_sec": put_gbit_per_sec, + "get_gbit_per_sec": get_gbit_per_sec, + "total_gbit_per_sec": total_gbit_per_sec, + } + + +def write_results_to_csv(results: list[dict[str, Any]], output_path: str) -> None: + """Write test results to CSV file. + + Args: + results: List of result dictionaries + output_path: Path to output CSV file + """ + if not results: + return + + fieldnames = list(results[0].keys()) + + with open(output_path, "w", newline="") as csvfile: + writer = csv.DictWriter(csvfile, fieldnames=fieldnames) + writer.writeheader() + for result in results: + writer.writerow(result) + + logger.info(f"Results written to {output_path}") + + +def main() -> None: + """Main entry point for the Ray baseline perftest script.""" + parser = argparse.ArgumentParser(description="Ray Baseline Throughput Test") + parser.add_argument( + "--global_batch_size", + type=int, + default=1024, + help="Global batch size (default: 1024)", + ) + parser.add_argument( + "--field_num", + type=int, + default=10, + help="Number of fields (default: 10)", + ) + parser.add_argument( + "--seq_len", + type=int, + default=8192, + help="Sequence length (default: 8192)", + ) + parser.add_argument( + "--num_test_iterations", + type=int, + default=4, + help="Number of test iterations (default: 4)", + ) + parser.add_argument( + "--head_node_ip", + type=str, + required=True, + help="Head node IP address", + ) + parser.add_argument( + "--worker_node_ip", + type=str, + default=None, + help="Worker node IP address (optional)", + ) + parser.add_argument( + "--output_csv", + type=str, + default=None, + help="Path to output CSV file (optional)", + ) + parser.add_argument( + "--use_complex_case", + action="store_true", + default=False, + help="Use complex test case with nested tensors and 
nontensor fields (default: False, simple case)", + ) + + args = parser.parse_args() + + # Create and run tester + tester = RayBaselineTester( + global_batch_size=args.global_batch_size, + field_num=args.field_num, + seq_len=args.seq_len, + num_test_iterations=args.num_test_iterations, + head_node_ip=args.head_node_ip, + worker_node_ip=args.worker_node_ip, + output_csv=args.output_csv, + use_complex_case=args.use_complex_case, + ) + + # Run test multiple times + all_results = [] + for i in range(args.num_test_iterations): + logger.info("-" * 60) + logger.info(f"Iteration {i + 1}/{args.num_test_iterations}") + logger.info("-" * 60) + result = tester.run_throughput_test(skip_dataset_create=(i != 0)) + all_results.append(result) + + # Write to CSV if output path is specified + if args.output_csv: + write_results_to_csv(all_results, args.output_csv) + + logger.info("Ray baseline throughput test completed successfully!") + + +if __name__ == "__main__": + main() diff --git a/scripts/performance_test/run_perf_test.sh b/scripts/performance_test/run_perf_test.sh new file mode 100755 index 0000000..19aa478 --- /dev/null +++ b/scripts/performance_test/run_perf_test.sh @@ -0,0 +1,84 @@ +#!/bin/bash +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +RESULTS_DIR="${SCRIPT_DIR}/results" +PERFTEST_PY="${SCRIPT_DIR}/perftest.py" +RAY_PERFTEST_PY="${SCRIPT_DIR}/ray_perftest_baseline.py" +CONFIG_YAML="${SCRIPT_DIR}/perftest_config.yaml" + +mkdir -p "${RESULTS_DIR}" + +# ========== User Configuration ========== +# Modify these based on your environment +HEAD_NODE_IP="${HEAD_NODE_IP:-127.0.0.1}" +WORKER_NODE_IP="${WORKER_NODE_IP:-127.0.0.1}" +DEVICE="${DEVICE:-cpu}" +NUM_TEST_ITERATIONS="${NUM_TEST_ITERATIONS:-4}" +USE_COMPLEX_CASE="${USE_COMPLEX_CASE:-false}" +# ======================================== + +# Backends to test (passed via --backend to perftest.py) +BACKENDS=("SimpleStorage" "Yuanrong" "MooncakeStore") + +# Test settings: global_batch_size, 
field_num, seq_len, name +declare -a SETTINGS=( + "1024,9,8192,Small" + "4096,15,32768,Medium" + "8192,18,100000,Large" +) + +# Complex case flag +if [[ "${USE_COMPLEX_CASE}" == "true" ]]; then + COMPLEX_FLAG="--use_complex_case" +else + COMPLEX_FLAG="" +fi + +# ---- TransferQueue perftest ---- +for backend in "${BACKENDS[@]}"; do + echo "==========================================" + echo "Testing backend: ${backend}" + echo "==========================================" + + for setting in "${SETTINGS[@]}"; do + IFS=',' read -r batch_size field_num seq_len name <<< "$setting" + output_csv="${RESULTS_DIR}/${backend,,}_${name,,}.csv" + + echo " Setting: ${name} (batch=${batch_size}, fields=${field_num}, seq=${seq_len})" + + python "${PERFTEST_PY}" --backend_config="${CONFIG_YAML}" --backend="${backend}" \ + --device="${DEVICE}" \ + --global_batch_size="${batch_size}" --field_num="${field_num}" --seq_len="${seq_len}" \ + --num_test_iterations="${NUM_TEST_ITERATIONS}" \ + --head_node_ip="${HEAD_NODE_IP}" --worker_node_ip="${WORKER_NODE_IP}" \ + --output_csv="${output_csv}" \ + ${COMPLEX_FLAG} + + sleep 10 + done +done + +# ---- Ray baseline ---- +echo "==========================================" +echo "Testing backend: Ray (baseline)" +echo "==========================================" +for setting in "${SETTINGS[@]}"; do + IFS=',' read -r batch_size field_num seq_len name <<< "$setting" + output_csv="${RESULTS_DIR}/ray_baseline_${name,,}.csv" + + echo " Setting: ${name} (batch=${batch_size}, fields=${field_num}, seq=${seq_len})" + + python "${RAY_PERFTEST_PY}" \ + --global_batch_size="${batch_size}" --field_num="${field_num}" --seq_len="${seq_len}" \ + --num_test_iterations="${NUM_TEST_ITERATIONS}" \ + --head_node_ip="${HEAD_NODE_IP}" --worker_node_ip="${WORKER_NODE_IP}" \ + --output_csv="${output_csv}" \ + ${COMPLEX_FLAG} +done + +# ---- Draw figures ---- +python "${SCRIPT_DIR}/draw_figure.py" + +echo "" +echo "All tests completed!" 
\ No newline at end of file diff --git a/tests/test_yuanrong_client_zero_copy.py b/tests/test_yuanrong_client_zero_copy.py index b93fd32..423b1c7 100644 --- a/tests/test_yuanrong_client_zero_copy.py +++ b/tests/test_yuanrong_client_zero_copy.py @@ -47,6 +47,7 @@ def mock_kv_client(self, mocker): mocker.patch("yr.datasystem.KVClient", return_value=mock_client) mocker.patch("yr.datasystem.DsTensorClient") + mocker.patch("transfer_queue.storage.clients.yuanrong_client.find_reachable_host", return_value="127.0.0.1") return mock_client diff --git a/tests/test_yuanrong_storage_client_e2e.py b/tests/test_yuanrong_storage_client_e2e.py index 519335f..3cb1f99 100644 --- a/tests/test_yuanrong_storage_client_e2e.py +++ b/tests/test_yuanrong_storage_client_e2e.py @@ -38,18 +38,18 @@ def __init__(self, host, port, device_id): def init(self): pass - def dev_mset(self, keys, values): + def mset_d2h(self, keys, values): for k, v in zip(keys, values, strict=True): assert v.device.type == "npu" self.storage[k] = v - def dev_mget(self, keys, out_tensors): + def mget_h2d(self, keys, out_tensors): for i, k in enumerate(keys): # Note: If key is missing, tensor remains unchanged (mock limitation) if k in self.storage: out_tensors[i].copy_(self.storage[k]) - def dev_delete(self, keys): + def delete(self, keys): for k in keys: self.storage.pop(k, None) @@ -108,10 +108,17 @@ def mock_yr_datasystem(): # - sys.modules: Redirects 'import yr' to our mocks # - YUANRONG_DATASYSTEM_IMPORTED: Forces the existence check to True so initialize the client successfully # - datasystem: Direct attribute patch for the module + # - find_reachable_host: Mock host detection to avoid real network checks + def mock_find_reachable_host(port, timeout=1.0): + return "127.0.0.1" + with ( mock.patch.dict("sys.modules", {"yr": yr_mock, "yr.datasystem": ds_mock}), mock.patch("transfer_queue.storage.clients.yuanrong_client.YUANRONG_DATASYSTEM_IMPORTED", True, create=True), 
mock.patch("transfer_queue.storage.clients.yuanrong_client.datasystem", ds_mock), + mock.patch( + "transfer_queue.storage.clients.yuanrong_client.find_reachable_host", side_effect=mock_find_reachable_host + ), ): yield diff --git a/transfer_queue/config.yaml b/transfer_queue/config.yaml index 98819ed..433c026 100644 --- a/transfer_queue/config.yaml +++ b/transfer_queue/config.yaml @@ -47,4 +47,8 @@ backend: RayStore: # For Yuanrong: - # TODO \ No newline at end of file + Yuanrong: + # Port of local yuanrong datasystem worker + port: 31501 + # If enable npu transport + enable_yr_npu_transport: false diff --git a/transfer_queue/storage/clients/yuanrong_client.py b/transfer_queue/storage/clients/yuanrong_client.py index 41219c2..77a981e 100644 --- a/transfer_queue/storage/clients/yuanrong_client.py +++ b/transfer_queue/storage/clients/yuanrong_client.py @@ -15,6 +15,7 @@ import logging import os +import socket import struct from abc import ABC, abstractmethod from concurrent.futures import ThreadPoolExecutor @@ -30,6 +31,105 @@ logger = logging.getLogger(__name__) logger.setLevel(os.getenv("TQ_LOGGING_LEVEL", logging.WARNING)) + +def get_local_ip_addresses() -> list[str]: + """Get all local IP addresses including 127.0.0.1. + + Returns: + List of local IP addresses, with 127.0.0.1 first. 
+ """ + ips = ["127.0.0.1"] + + try: + hostname = socket.gethostname() + # Add hostname resolution + try: + host_ip = socket.gethostbyname(hostname) + if host_ip not in ips: + ips.append(host_ip) + except socket.gaierror: + pass + + # Get all network interfaces + import netifaces + + for interface in netifaces.interfaces(): + try: + addrs = netifaces.ifaddresses(interface) + if netifaces.AF_INET in addrs: + for addr_info in addrs[netifaces.AF_INET]: + ip = addr_info.get("addr") + if ip and ip not in ips: + ips.append(ip) + except (ValueError, KeyError): + continue + except ImportError: + # Fallback if netifaces is not available + try: + # Try to get IP by connecting to an external address + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + try: + # Doesn't need to be reachable + s.connect(("8.8.8.8", 80)) + ip = s.getsockname()[0] + if ip not in ips: + ips.append(ip) + except Exception: + pass + finally: + s.close() + except Exception: + pass + + return ips + + +def check_port_connectivity(host: str, port: int, timeout: float = 2.0) -> bool: + """Check if a TCP port is reachable on the given host. + + Args: + host: Host IP address to check + port: Port number to check + timeout: Connection timeout in seconds + + Returns: + True if the port is reachable, False otherwise + """ + try: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(timeout) + result = sock.connect_ex((host, port)) + sock.close() + return result == 0 + except Exception: + return False + + +def find_reachable_host(port: int, timeout: float = 1.0) -> Optional[str]: + """Find a reachable local host IP address for the given port. + + Tries all local IP addresses in order and returns the first one + that has the given port open. + + Args: + port: Port number to check + timeout: Connection timeout in seconds per check + + Returns: + The first reachable host IP address, or None if none found. 
+ """ + local_ips = get_local_ip_addresses() + logger.info(f"Checking port {port} on local IPs: {local_ips}") + + for ip in local_ips: + if check_port_connectivity(ip, port, timeout): + logger.info(f"Found reachable host: {ip}:{port}") + return ip + + logger.warning(f"No reachable host found for port {port}") + return None + + YUANRONG_DATASYSTEM_IMPORTED: bool = True try: @@ -83,9 +183,20 @@ class NPUTensorKVClientAdapter(StorageStrategy): KEYS_LIMIT: int = 10_000 def __init__(self, config: dict): - host = config.get("host") port = config.get("port") + if port is None or not isinstance(port, int): + raise ValueError("Missing or invalid 'port' in config") + + logger.info(f"Auto-detecting reachable host for Yuanrong port {port}...") + host = find_reachable_host(port) + if host is None: + raise ValueError( + f"Could not find any reachable host for Yuanrong port {port}. " + "Please ensure yuanrong datasystem is running." + ) + logger.info(f"Using auto-detected host: {host}") + self.device_id = torch.npu.current_device() torch.npu.set_device(self.device_id) @@ -123,12 +234,12 @@ def put(self, keys: list[str], values: list[Any]): for i in range(0, len(keys), self.KEYS_LIMIT): batch_keys = keys[i : i + self.KEYS_LIMIT] batch_values = values[i : i + self.KEYS_LIMIT] - # _npu_ds_client.dev_mset doesn't support to overwrite + # mset_d2h cannot overwrite existing keys try: - self._ds_client.dev_delete(batch_keys) + self._ds_client.delete(batch_keys) except Exception: pass - self._ds_client.dev_mset(batch_keys, batch_values) + self._ds_client.mset_d2h(batch_keys, batch_values) def supports_get(self, strategy_tag: str) -> bool: """Matches 'DsTensorClient' Strategy tag.""" @@ -147,8 +258,8 @@ def get(self, keys: list[str], **kwargs) -> list[Optional[Any]]: batch_dtypes = dtypes[i : i + self.KEYS_LIMIT] batch_values = self._create_empty_npu_tensorlist(batch_shapes, batch_dtypes) - self._ds_client.dev_mget(batch_keys, batch_values) - # Todo(dpj): consider checking and logging 
keys that fail during dev_mget + self._ds_client.mget_h2d(batch_keys, batch_values) + # Todo(dpj): consider checking and logging keys that fail during mget_h2d results.extend(batch_values) return results @@ -161,7 +272,7 @@ def clear(self, keys: list[str]): for i in range(0, len(keys), self.KEYS_LIMIT): batch = keys[i : i + self.KEYS_LIMIT] # Todo(dpj): Test call clear when no (key,value) put in ds - self._ds_client.dev_delete(batch) + self._ds_client.delete(batch) def _create_empty_npu_tensorlist(self, shapes, dtypes): """ @@ -199,9 +310,20 @@ class GeneralKVClientAdapter(StorageStrategy): DS_MAX_WORKERS: int = 16 def __init__(self, config: dict): - host = config.get("host") port = config.get("port") + if port is None or not isinstance(port, int): + raise ValueError("Missing or invalid 'port' in config") + + logger.info(f"Auto-detecting reachable host for Yuanrong port {port}...") + host = find_reachable_host(port) + if host is None: + raise ValueError( + f"Could not find any reachable host for Yuanrong port {port}. " + "Please ensure yuanrong datasystem is running." + ) + logger.info(f"Using auto-detected host: {host}") + self._ds_client = datasystem.KVClient(host, port) self._ds_client.init() logger.info("YuanrongStorageClient: Create KVClient to connect with yuanrong-datasystem backend!") @@ -357,6 +479,11 @@ def __init__(self, config: dict[str, Any]): if not YUANRONG_DATASYSTEM_IMPORTED: raise ImportError("YuanRong DataSystem not installed.") + port = config.get("port") + + if port is None or not isinstance(port, int): + raise ValueError("Missing or invalid 'port' in config") + super().__init__(config) # Storage strategies are prioritized in ascending order of list element index. 
diff --git a/transfer_queue/storage/managers/yuanrong_manager.py b/transfer_queue/storage/managers/yuanrong_manager.py index 54ac094..d527040 100644 --- a/transfer_queue/storage/managers/yuanrong_manager.py +++ b/transfer_queue/storage/managers/yuanrong_manager.py @@ -36,14 +36,12 @@ class YuanrongStorageManager(KVStorageManager): """Storage manager for Yuanrong backend.""" def __init__(self, controller_info: ZMQServerInfo, config: dict[str, Any]): - host = config.get("host", None) port = config.get("port", None) client_name = config.get("client_name", None) - if host is None or not isinstance(host, str): - raise ValueError("Missing or invalid 'host' in config") if port is None or not isinstance(port, int): raise ValueError("Missing or invalid 'port' in config") + if client_name is None: logger.info("Missing 'client_name' in config, using default value('YuanrongStorageClient')") config["client_name"] = "YuanrongStorageClient"