[IO] Add TCP backend and benchmark/test coverage #177
Conversation
Pull request overview
Adds a new TCP-based MORI-IO backend to enable engine communication over standard TCP networks (including multi-connection striping), and wires it through the C++ engine + Python bindings with benchmark and test coverage.
Changes:
- Implement TCP transport/backend and integrate it into the IO engine build and backend factory.
- Expose TCP backend configuration via pybind and Python convenience wrappers.
- Extend Python benchmark and add TCP-focused engine tests.
Reviewed changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| src/io/tcp/transport.hpp | Defines the TCP wire protocol, connection/worker structures, and the transport interface. |
| src/io/tcp/transport.cpp | Implements the TCP transport (epoll loop, control/data channels, striping, GPU staging). |
| src/io/tcp/backend_impl.hpp | Declares the TcpBackend / TcpBackendSession integration layer. |
| src/io/tcp/backend_impl.cpp | Implements backend/session forwarding into the transport. |
| src/io/engine.cpp | Adds the BackendType::TCP creation path and ephemeral port propagation to EngineDesc. |
| src/io/CMakeLists.txt | Adds the TCP sources to the mori_io library build. |
| include/mori/io/backend.hpp | Introduces TcpBackendConfig to the public C++ backend config API. |
| src/pybind/mori.cpp | Exposes TcpBackendConfig to Python via pybind. |
| python/mori/io/engine.py | Allows IOEngine.create_backend() to default-configure TCP when config=None. |
| python/mori/io/__init__.py | Re-exports TcpBackendConfig in the Python package surface. |
| tests/python/io/benchmark.py | Adds a "tcp" backend option and routes setup/flow accordingly. |
| tests/python/io/test_engine.py | Adds TCP engine tests (auto-bind port, CPU write/read, session batch write). |
```cpp
bool DataConnectionWorker::RecvExact(uint8_t* dst, uint64_t len) {
  uint64_t got = 0;
  while (got < len) {
    ssize_t n =
        ::recv(fd_, dst + got, static_cast<size_t>(std::min<uint64_t>(len - got, 16ULL << 20)), 0);
    if (n < 0) {
      if (IsWouldBlock(errno)) continue;
      PostEvent({WorkerEventType::CONN_ERROR, peerKey_, 0, 0, 0, false, nullptr, nullptr,
                 std::string("recv payload failed: ") + strerror(errno)});
      return false;
```
DataConnectionWorker::RecvExact (and the similar segment/discard helpers) busy-spins on EAGAIN/EWOULDBLOCK by immediately continuing the loop. With non-blocking sockets this can peg a CPU core when the peer is slow or the payload arrives in small chunks. Consider either using blocking sockets in the worker thread, or restructuring receive to return to poll() on EAGAIN and resume with saved progress.
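The "return to poll() on EAGAIN and resume with saved progress" pattern can be sketched as follows. This is a minimal Python illustration using the standard socket/select modules, not MORI-IO code; `ResumableRecv` and its fields are hypothetical names. The key point is that `BlockingIOError` (EAGAIN/EWOULDBLOCK) yields control back to the poll loop instead of spinning, and accumulated progress survives across readiness events.

```python
import select
import socket


class ResumableRecv:
    """Accumulate exactly `total` bytes across readiness events.

    On EAGAIN/EWOULDBLOCK the handler returns to the caller (the poll
    loop) instead of spinning; progress is saved in `self.got`.
    """

    def __init__(self, total: int):
        self.buf = bytearray(total)
        self.view = memoryview(self.buf)
        self.got = 0
        self.total = total

    def on_readable(self, sock: socket.socket) -> bool:
        """Drain the socket; return True once the payload is complete."""
        while self.got < self.total:
            try:
                n = sock.recv_into(self.view[self.got:], self.total - self.got)
            except BlockingIOError:
                return False  # yield to poll(); resume later from self.got
            if n == 0:
                raise ConnectionError("peer closed mid-payload")
            self.got += n
        return True


# Demo on a non-blocking socketpair:
a, b = socket.socketpair()
b.setblocking(False)
rx = ResumableRecv(8)
assert rx.on_readable(b) is False  # no data yet: EAGAIN returns control
a.sendall(b"abcd")
a.sendall(b"efgh")
while not rx.on_readable(b):       # the poll loop: wait, then resume
    select.select([b], [], [], 5)
assert rx.got == 8 and bytes(rx.buf) == b"abcdefgh"
```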
```cpp
while (true) {
  tcp::CtrlHeaderView hv;
  if (!tcp::TryParseCtrlHeader(c->inbuf.data(), c->inbuf.size(), &hv)) {
    if (c->inbuf.size() >= tcp::kCtrlHeaderSize) {
      MORI_IO_ERROR("TCP: bad ctrl header fd {}", c->fd);
      ClosePeerByFd(c->fd);
    }
    break;
  }
  if (c->inbuf.size() < tcp::kCtrlHeaderSize + hv.bodyLen) break;
```
HandleCtrlReadable trusts hv.bodyLen from the wire protocol and keeps appending to c->inbuf until the full body arrives. A malicious or buggy peer can advertise a huge bodyLen and cause unbounded memory growth (potential OOM/DoS). Consider enforcing a reasonable maximum ctrl frame/body size (and similarly bounding HELLO key length) and closing the connection when the limit is exceeded.
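A bounded parse step along these lines would let the handler reject an oversized claim before buffering anything. The sketch below is illustrative only: the 4-byte magic, little-endian header layout, and 1 MiB cap are assumptions, not the actual MORI-IO wire format.

```python
import struct

# Hypothetical ctrl header: 4-byte magic + 4-byte little-endian bodyLen.
CTRL_HEADER = struct.Struct("<4sI")
MAX_CTRL_BODY = 1 << 20  # cap at 1 MiB; tune to the largest legitimate frame


def try_parse_ctrl_header(buf: bytes):
    """Return bodyLen, or None if more bytes are needed.

    Raise ValueError on malformed or oversized frames so the caller can
    close the connection instead of growing the inbuf unboundedly.
    """
    if len(buf) < CTRL_HEADER.size:
        return None  # incomplete header: keep reading
    magic, body_len = CTRL_HEADER.unpack_from(buf)
    if magic != b"CTRL":
        raise ValueError("bad ctrl magic")
    if body_len > MAX_CTRL_BODY:
        raise ValueError(f"ctrl body {body_len} exceeds cap {MAX_CTRL_BODY}")
    return body_len


assert try_parse_ctrl_header(b"CT") is None                    # need more bytes
assert try_parse_ctrl_header(CTRL_HEADER.pack(b"CTRL", 128)) == 128
oversized_rejected = False
try:
    try_parse_ctrl_header(CTRL_HEADER.pack(b"CTRL", 1 << 30))  # 1 GiB claim
except ValueError:
    oversized_rejected = True
assert oversized_rejected
```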
```cpp
py::arg("sock_sndbuf_bytes") = 4 * 1024 * 1024,
py::arg("sock_rcvbuf_bytes") = 4 * 1024 * 1024, py::arg("op_timeout_ms") = 30 * 1000,
py::arg("enable_keepalive") = true, py::arg("keepalive_idle_sec") = 30,
py::arg("keepalive_intvl_sec") = 10, py::arg("keepalive_cnt") = 3,
py::arg("enable_ctrl_nodelay") = true, py::arg("num_data_conns") = 8,
py::arg("striping_threshold_bytes") = 256 * 1024)
```
The Python binding constructor sets TcpBackendConfig defaults (e.g., 4MiB socket buffers and 256KiB striping threshold) that differ from the C++ struct member defaults (32MiB and 64KiB). This means TcpBackendConfig() behaves differently in C++ vs Python, which is surprising for API consumers. Consider aligning the pybind defaults with the C++ defaults, or exposing py::init<>() (default ctor) so Python inherits the struct’s in-class initializers.
Suggested change:

```diff
-py::arg("sock_sndbuf_bytes") = 4 * 1024 * 1024,
-py::arg("sock_rcvbuf_bytes") = 4 * 1024 * 1024, py::arg("op_timeout_ms") = 30 * 1000,
-py::arg("enable_keepalive") = true, py::arg("keepalive_idle_sec") = 30,
-py::arg("keepalive_intvl_sec") = 10, py::arg("keepalive_cnt") = 3,
-py::arg("enable_ctrl_nodelay") = true, py::arg("num_data_conns") = 8,
-py::arg("striping_threshold_bytes") = 256 * 1024)
+py::arg("sock_sndbuf_bytes") = 32 * 1024 * 1024,
+py::arg("sock_rcvbuf_bytes") = 32 * 1024 * 1024, py::arg("op_timeout_ms") = 30 * 1000,
+py::arg("enable_keepalive") = true, py::arg("keepalive_idle_sec") = 30,
+py::arg("keepalive_intvl_sec") = 10, py::arg("keepalive_cnt") = 3,
+py::arg("enable_ctrl_nodelay") = true, py::arg("num_data_conns") = 8,
+py::arg("striping_threshold_bytes") = 64 * 1024)
```
```python
# Single read (b -> a)
dst.zero_()
uid = a.allocate_transfer_uid()
st = a.read(src_md, 0, dst_md, 0, src.numel() * src.element_size(), uid)
st.Wait()
assert st.Succeeded(), st.Message()
bst = _wait_inbound_status(b, a_desc.key, uid)
assert bst.Succeeded(), bst.Message()
assert torch.equal(src, dst)
```
In the TCP read test, a.read(src_md, ...) uses src_md as the local destination, so it overwrites src. That makes the later batch-write assertions compare against a mutated src (likely all zeros after the read), weakening the test coverage for non-zero batch writes. Consider using a separate local destination tensor for the read (or re-initializing src after the read) so the batch-write checks validate transferring the intended pattern.
Motivation
This PR introduces a usable TCP backend for MORI-IO so that engine communication can run over standard TCP networking environments.
It also improves throughput scalability for large transfers by enabling multi-connection striping and adds coverage in both benchmark and Python tests.
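The multi-connection striping idea can be sketched as follows. This is a minimal illustration under stated assumptions (an even split across data connections, and a size threshold below which a transfer stays on one connection to avoid per-segment overhead); the function name and split policy are hypothetical, not the backend's actual implementation.

```python
def stripe(offset: int, length: int, num_conns: int, threshold: int):
    """Split one transfer into (conn_index, offset, length) segments.

    Transfers at or below `threshold` stay on a single connection;
    larger ones are divided near-evenly across all data connections.
    """
    if length <= threshold or num_conns <= 1:
        return [(0, offset, length)]
    base, rem = divmod(length, num_conns)
    segs, cur = [], offset
    for i in range(num_conns):
        seg_len = base + (1 if i < rem else 0)  # spread the remainder
        if seg_len:
            segs.append((i, cur, seg_len))
        cur += seg_len
    return segs


# A 64 KiB transfer stays on one connection; a 1 MiB transfer stripes
# across all 8 and the segment lengths sum back to the original size.
small = stripe(0, 64 * 1024, num_conns=8, threshold=256 * 1024)
large = stripe(0, 1 << 20, num_conns=8, threshold=256 * 1024)
assert small == [(0, 0, 64 * 1024)]
assert len(large) == 8 and sum(s[2] for s in large) == 1 << 20
```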
Technical Details
- src/io/tcp/transport.cpp
- src/io/tcp/transport.hpp
- src/io/tcp/backend_impl.cpp
- src/io/tcp/backend_impl.hpp
- include/mori/io/backend.hpp
- src/io/engine.cpp
- src/pybind/mori.cpp
- python/mori/io/__init__.py
- python/mori/io/engine.py
- tests/python/io/benchmark.py
- tests/python/io/test_engine.py
- src/io/CMakeLists.txt

Test Plan
Test Result
Benchmark/UT completed successfully.