[IO] Add TCP backend and benchmark/test coverage#177

Open
maning00 wants to merge 12 commits into main from feature/tcp-backend

Conversation

Contributor

maning00 commented Mar 2, 2026

Motivation

This PR introduces a usable TCP backend for MORI-IO so that engine communication can run over standard TCP networks.
It also improves throughput scalability for large transfers by enabling multi-connection striping and adds coverage in both benchmark and Python tests.

Technical Details

  • Add TCP backend implementation and integration points:
    • src/io/tcp/transport.cpp
    • src/io/tcp/transport.hpp
    • src/io/tcp/backend_impl.cpp
    • src/io/tcp/backend_impl.hpp
  • Wire TCP backend into engine and public interfaces:
    • include/mori/io/backend.hpp
    • src/io/engine.cpp
    • src/pybind/mori.cpp
    • python/mori/io/__init__.py
    • python/mori/io/engine.py
  • Add validation and usability updates:
    • benchmark TCP option and benchmark-side improvements in tests/python/io/benchmark.py
    • basic TCP engine tests in tests/python/io/test_engine.py
    • build integration updates in src/io/CMakeLists.txt

Test Plan

  • Run TCP benchmark in initiator/target setup with batching enabled.
  • Run Python IO tests including TCP engine test cases.

Test Result

Benchmark/UT completed successfully.

[2026-03-02 03:48:06.320] [44761] [io] [info] Set MORI-IO log level to info
MORI-IO Benchmark Configurations:
  backend: TCP
  op_type: write
  host: 10.24.16.111
  port: 40149
  node_rank: 0
  role: EngineRole.INITIATOR
  role_rank: 0
  num_initiator_dev: 1
  num_target_dev: 1
  num_qp_per_transfer: 1
  num_worker_threads: 1
  poll_cq_mode: PollCqMode.POLLING
  buffer_size: 1048576 B
  transfer_batch_size: 256
  enable_batch_transfer: True
  enable_sess: True
  iters: 128

[Gloo] Rank 0 is connected to 1 peer ranks. Expected number of connected peer ranks is : 1
[2026-03-02 03:48:07.487] [44761] [io] [info] Create engine key INITIATOR-0 node_id mia1-p01-g03 hostname mia1-p01-g03
[2026-03-02 03:48:07.487] [44761] [io] [info] TCP: listen on 10.24.16.111:40149 (port=40149)
[2026-03-02 03:48:07.487] [44761] [io] [info] TcpBackend created key=INITIATOR-0
[2026-03-02 03:48:07.487] [44761] [io] [info] Create backend type 3
[2026-03-02 03:48:07.487] [44761] [io] [info] Register remote engine TARGET-0 node_id mia1-p01-g04 hostname mia1-p01-g04
+--------------------------------------------------------------------------------------------------------+
|                                    TCP Benchmark: Initiator Rank 0                                     |
+-------------+-----------+----------------+---------------+---------------+--------------+--------------+
| MsgSize (B) | BatchSize | TotalSize (MB) | Max BW (GB/s) | Avg BW (GB/s) | Min Lat (us) | Avg Lat (us) |
+-------------+-----------+----------------+---------------+---------------+--------------+--------------+
|      8      |    256    |      0.00      |      0.01     |      0.01     |    202.89    |    214.65    |
|      16     |    256    |      0.00      |      0.03     |      0.02     |    140.67    |    210.58    |
|      32     |    256    |      0.01      |      0.06     |      0.06     |    135.90    |    147.20    |
|      64     |    256    |      0.02      |      0.12     |      0.11     |    138.28    |    146.57    |
|     128     |    256    |      0.03      |      0.34     |      0.24     |    95.37     |    137.73    |
|     256     |    256    |      0.07      |      0.55     |      0.48     |    118.73    |    136.74    |
|     512     |    256    |      0.13      |      1.07     |      0.92     |    122.79    |    141.78    |
|     1024    |    256    |      0.26      |      1.03     |      0.95     |    254.15    |    276.45    |
|     2048    |    256    |      0.52      |      1.88     |      1.58     |    278.47    |    331.33    |
|     4096    |    256    |      1.05      |      3.99     |      3.14     |    262.98    |    333.54    |
|     8192    |    256    |      2.10      |      5.09     |      4.34     |    411.99    |    483.70    |
|    16384    |    256    |      4.19      |      6.67     |      5.66     |    628.47    |    741.43    |
|    32768    |    256    |      8.39      |      8.65     |      6.74     |    969.65    |   1243.70    |
|    65536    |    256    |     16.78      |      8.88     |      6.98     |   1888.28    |   2404.48    |
|    131072   |    256    |     33.55      |     10.48     |      7.79     |   3200.77    |   4309.03    |
|    262144   |    256    |     67.11      |     11.60     |      7.36     |   5783.08    |   9112.76    |
|    524288   |    256    |     134.22     |     10.50     |      7.67     |   12779.47   |   17510.23   |
|   1048576   |    256    |     268.44     |     11.86     |      8.54     |   22641.66   |   31428.41   |
+-------------+-----------+----------------+---------------+---------------+--------------+--------------+


Copilot AI left a comment


Pull request overview

Adds a new TCP-based MORI-IO backend to enable engine communication over standard TCP networks (including multi-connection striping), and wires it through the C++ engine + Python bindings with benchmark and test coverage.

Changes:

  • Implement TCP transport/backend and integrate it into the IO engine build and backend factory.
  • Expose TCP backend configuration via pybind and Python convenience wrappers.
  • Extend Python benchmark and add TCP-focused engine tests.

Reviewed changes

Copilot reviewed 12 out of 12 changed files in this pull request and generated 4 comments.

Summary per file:

  • src/io/tcp/transport.hpp: Defines the TCP wire protocol, connection/worker structures, and the transport interface.
  • src/io/tcp/transport.cpp: Implements the TCP transport (epoll loop, control/data channels, striping, GPU staging).
  • src/io/tcp/backend_impl.hpp: Declares the TcpBackend / TcpBackendSession integration layer.
  • src/io/tcp/backend_impl.cpp: Implements backend/session forwarding into the transport.
  • src/io/engine.cpp: Adds the BackendType::TCP creation path and ephemeral port propagation to EngineDesc.
  • src/io/CMakeLists.txt: Adds TCP sources to the mori_io library build.
  • include/mori/io/backend.hpp: Introduces TcpBackendConfig to the public C++ backend config API.
  • src/pybind/mori.cpp: Exposes TcpBackendConfig to Python via pybind.
  • python/mori/io/engine.py: Allows IOEngine.create_backend() to default-configure TCP when config=None.
  • python/mori/io/__init__.py: Re-exports TcpBackendConfig in the Python package surface.
  • tests/python/io/benchmark.py: Adds the "tcp" backend option and routes setup/flow accordingly.
  • tests/python/io/test_engine.py: Adds TCP engine tests (auto-bind port, CPU write/read, session batch write).


Comment on lines +362 to +371
bool DataConnectionWorker::RecvExact(uint8_t* dst, uint64_t len) {
  uint64_t got = 0;
  while (got < len) {
    ssize_t n =
        ::recv(fd_, dst + got, static_cast<size_t>(std::min<uint64_t>(len - got, 16ULL << 20)), 0);
    if (n < 0) {
      if (IsWouldBlock(errno)) continue;
      PostEvent({WorkerEventType::CONN_ERROR, peerKey_, 0, 0, 0, false, nullptr, nullptr,
                 std::string("recv payload failed: ") + strerror(errno)});
      return false;

Copilot AI Mar 2, 2026


DataConnectionWorker::RecvExact (and the similar segment/discard helpers) busy-spins on EAGAIN/EWOULDBLOCK by immediately continuing the loop. With non-blocking sockets this can peg a CPU core when the peer is slow or the payload arrives in small chunks. Consider either using blocking sockets in the worker thread, or restructuring receive to return to poll() on EAGAIN and resume with saved progress.

Comment on lines +1354 to +1364
while (true) {
  tcp::CtrlHeaderView hv;
  if (!tcp::TryParseCtrlHeader(c->inbuf.data(), c->inbuf.size(), &hv)) {
    if (c->inbuf.size() >= tcp::kCtrlHeaderSize) {
      MORI_IO_ERROR("TCP: bad ctrl header fd {}", c->fd);
      ClosePeerByFd(c->fd);
    }
    break;
  }
  if (c->inbuf.size() < tcp::kCtrlHeaderSize + hv.bodyLen) break;


Copilot AI Mar 2, 2026


HandleCtrlReadable trusts hv.bodyLen from the wire protocol and keeps appending to c->inbuf until the full body arrives. A malicious or buggy peer can advertise a huge bodyLen and cause unbounded memory growth (potential OOM/DoS). Consider enforcing a reasonable maximum ctrl frame/body size (and similarly bounding HELLO key length) and closing the connection when the limit is exceeded.

Comment on lines +763 to +768
py::arg("sock_sndbuf_bytes") = 4 * 1024 * 1024,
py::arg("sock_rcvbuf_bytes") = 4 * 1024 * 1024, py::arg("op_timeout_ms") = 30 * 1000,
py::arg("enable_keepalive") = true, py::arg("keepalive_idle_sec") = 30,
py::arg("keepalive_intvl_sec") = 10, py::arg("keepalive_cnt") = 3,
py::arg("enable_ctrl_nodelay") = true, py::arg("num_data_conns") = 8,
py::arg("striping_threshold_bytes") = 256 * 1024)

Copilot AI Mar 2, 2026


The Python binding constructor sets TcpBackendConfig defaults (e.g., 4MiB socket buffers and 256KiB striping threshold) that differ from the C++ struct member defaults (32MiB and 64KiB). This means TcpBackendConfig() behaves differently in C++ vs Python, which is surprising for API consumers. Consider aligning the pybind defaults with the C++ defaults, or exposing py::init<>() (default ctor) so Python inherits the struct’s in-class initializers.

Suggested change
py::arg("sock_sndbuf_bytes") = 4 * 1024 * 1024,
py::arg("sock_rcvbuf_bytes") = 4 * 1024 * 1024, py::arg("op_timeout_ms") = 30 * 1000,
py::arg("enable_keepalive") = true, py::arg("keepalive_idle_sec") = 30,
py::arg("keepalive_intvl_sec") = 10, py::arg("keepalive_cnt") = 3,
py::arg("enable_ctrl_nodelay") = true, py::arg("num_data_conns") = 8,
py::arg("striping_threshold_bytes") = 256 * 1024)
py::arg("sock_sndbuf_bytes") = 32 * 1024 * 1024,
py::arg("sock_rcvbuf_bytes") = 32 * 1024 * 1024, py::arg("op_timeout_ms") = 30 * 1000,
py::arg("enable_keepalive") = true, py::arg("keepalive_idle_sec") = 30,
py::arg("keepalive_intvl_sec") = 10, py::arg("keepalive_cnt") = 3,
py::arg("enable_ctrl_nodelay") = true, py::arg("num_data_conns") = 8,
py::arg("striping_threshold_bytes") = 64 * 1024)

Comment on lines +787 to +795
# Single read (b -> a)
dst.zero_()
uid = a.allocate_transfer_uid()
st = a.read(src_md, 0, dst_md, 0, src.numel() * src.element_size(), uid)
st.Wait()
assert st.Succeeded(), st.Message()
bst = _wait_inbound_status(b, a_desc.key, uid)
assert bst.Succeeded(), bst.Message()
assert torch.equal(src, dst)

Copilot AI Mar 2, 2026


In the TCP read test, a.read(src_md, ...) uses src_md as the local destination, so it overwrites src. That makes the later batch-write assertions compare against a mutated src (likely all zeros after the read), weakening the test coverage for non-zero batch writes. Consider using a separate local destination tensor for the read (or re-initializing src after the read) so the batch-write checks validate transferring the intended pattern.
