
Per-message allocation and copy in from_rmw_message causes large-message throughput to scale poorly #628

@azerupi

Description


Summary

Every received message goes through a RmwMsg::default() allocation followed by an element-by-element conversion in from_rmw_message. For large payloads this is the dominant bottleneck: at 64 KB, rclrs throughput is ~18x lower than rclcpp's even after eliminating the thread-per-spin overhead (see #627).

Minimal reproduction

Using the same benchmark structure as #627, but with a message size of 65536 bytes instead of 0 bytes.

rclrs (Rust)

use rclrs::{Context, CreateBasicExecutor, IntoPrimitiveOptions, QoSProfile, SpinOptions};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};

fn main() {
    let context = Context::default();
    let mut executor = context.create_basic_executor();
    let node = executor.create_node("bench").unwrap();

    let qos = QoSProfile::default().reliable().keep_last(1000);
    let recv_count = Arc::new(AtomicU64::new(0));

    let pub_handle = node
        .create_publisher::<std_msgs::msg::UInt8MultiArray>("bench_topic".qos(qos.clone()))
        .unwrap();

    let count = Arc::clone(&recv_count);
    let _sub = node
        .create_subscription::<std_msgs::msg::UInt8MultiArray, _>(
            "bench_topic".qos(qos.clone()),
            move |_msg: std_msgs::msg::UInt8MultiArray| {
                count.fetch_add(1, Ordering::Relaxed);
            },
        )
        .unwrap();

    let duration = Duration::from_secs(5);
    let mut sent: u64 = 0;
    let start = Instant::now();

    while start.elapsed() < duration {
        let mut msg = std_msgs::msg::UInt8MultiArray::default();
        msg.data = vec![0u8; 65536];
        pub_handle.publish(msg).ok();
        sent += 1;

        // Assumes a spin_some that runs on the calling thread (see issue #NNN)
        executor.spin_some(SpinOptions::new().timeout(Duration::ZERO));
    }

    let received = recv_count.load(Ordering::Relaxed);
    let elapsed = start.elapsed().as_secs_f64();
    eprintln!("Sent:       {sent}");
    eprintln!("Received:   {received}");
    eprintln!("Throughput: {:.0} msg/s", received as f64 / elapsed);
}

rclcpp (C++) equivalent

#include <rclcpp/rclcpp.hpp>
#include <std_msgs/msg/u_int8_multi_array.hpp>
#include <atomic>
#include <chrono>
#include <cstdio>

int main(int argc, char** argv) {
    rclcpp::init(argc, argv);

    auto executor = std::make_shared<rclcpp::executors::SingleThreadedExecutor>();
    auto node = std::make_shared<rclcpp::Node>("spin_bench");
    executor->add_node(node);

    auto qos = rclcpp::QoS(1000).reliable();
    std::atomic<uint64_t> recv_count{0};

    auto pub_handle = node->create_publisher<std_msgs::msg::UInt8MultiArray>("bench_topic", qos);

    auto sub = node->create_subscription<std_msgs::msg::UInt8MultiArray>(
        "bench_topic", qos,
        [&recv_count](const std_msgs::msg::UInt8MultiArray::SharedPtr) {
            recv_count.fetch_add(1, std::memory_order_relaxed);
        });

    uint64_t sent = 0;
    auto start = std::chrono::steady_clock::now();
    auto duration = std::chrono::seconds(5);

    while (std::chrono::steady_clock::now() - start < duration) {
        std_msgs::msg::UInt8MultiArray msg;
        msg.data.resize(65536, 0);
        pub_handle->publish(msg);
        sent++;

        executor->spin_some(std::chrono::milliseconds(0));
    }

    uint64_t received = recv_count.load();
    double elapsed = std::chrono::duration<double>(
        std::chrono::steady_clock::now() - start).count();

    fprintf(stderr, "Sent:       %llu\n", (unsigned long long)sent);
    fprintf(stderr, "Received:   %llu\n", (unsigned long long)received);
    fprintf(stderr, "Throughput: %.0f msg/s\n", received / elapsed);

    rclcpp::shutdown();
    return 0;
}

Results (release build, 3 runs averaged, same machine)

| Implementation   | 0 B payload | 64 KB payload | Slowdown factor |
|------------------|-------------|---------------|-----------------|
| rclcpp spin_some | ~450k msg/s | ~273k msg/s   | 1.6x            |
| rclrs spin_some  | ~373k msg/s | ~15k msg/s    | 25x             |

At 0 B, rclrs is within 1.2x of rclcpp (see #627). At 64 KB, it is 18x slower. rclcpp barely slows down with larger messages (1.6x), while rclrs degrades dramatically (25x).

Most likely root cause

There are two sources of per-message overhead that scale with payload size.

1. Wasted RmwMsg::default() allocation

Every take() call creates a fresh RMW message via default() (subscription.rs:352):

let mut rmw_message = <T as Message>::RmwMsg::default();   // C heap alloc
Self::take_inner::<T>(self, &mut rmw_message)?;            // rcl_take fills it
Ok((T::from_rmw_message(rmw_message), message_info))       // convert to Rust

RmwMsg::default() calls the C init() function which pre-allocates memory for all sequences and strings. For a message with a 64 KB sequence field, this allocates 64 KB that is immediately overwritten by rcl_take and then freed when the RMW message is dropped. This is a wasted alloc/dealloc cycle per message.

2. Element-by-element sequence conversion in from_rmw_message

The generated from_rmw_message code converts Sequence<T> to Vec<T> via .into_iter().collect(). The SequenceIterator::next() implementation (rosidl_runtime_rs/src/sequence.rs) does this per element:

let elem = ptr.read();
ptr.write(std::mem::zeroed::<T>());  // writes zero back for EVERY element

For a 64 KB Sequence<u8>, this is 65,536 individual read + zero-write + insert cycles instead of a single memcpy.
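The cost difference can be illustrated with a simplified stand-in: a mutable byte buffer playing the role of the C-owned sequence storage (the real Sequence<T> wraps a raw pointer/size pair, so this is a sketch of the access pattern, not the actual type):

```rust
// Per-element drain, mirroring what SequenceIterator::next() does:
// read each element, write a zeroed value back, push into the Vec.
fn drain_per_element(buf: &mut [u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(buf.len());
    for slot in buf.iter_mut() {
        let elem = *slot; // ptr.read()
        *slot = 0;        // ptr.write(zeroed) for EVERY element
        out.push(elem);
    }
    out
}

// Bulk copy: for Copy element types, duplicating the whole buffer
// compiles down to a single memcpy.
fn copy_bulk(buf: &[u8]) -> Vec<u8> {
    buf.to_vec()
}

fn main() {
    let mut a = vec![7u8; 65536];
    let b = a.clone();
    let v1 = drain_per_element(&mut a);
    let v2 = copy_bulk(&b);
    assert_eq!(v1, v2); // same result, very different per-byte cost
    println!("both paths yield {} bytes", v1.len());
}
```

Both functions produce the same Vec, but the first touches every element twice (read plus zero-write) and grows the Vec one push at a time, while the second is one allocation and one memcpy.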

How rclcpp avoids this

rclcpp uses C++ message types directly. The RMW layer writes into the final message object, so there is no temporary buffer, no type conversion step, and no element-by-element copying. The message exists in one place from rcl_take through callback invocation.

Possible improvements

  1. Efficient sequence conversion: For Copy types (all ROS 2 primitives), Sequence::as_slice().to_vec() compiles to a single memcpy. A From<Sequence<T>> for Vec<T> impl with a Copy bound, combined with a code generator change to emit .into() instead of .into_iter().collect() for BasicType sequences, would eliminate the per-element overhead.

  2. RMW message buffer reuse: Store a reusable RmwMsg buffer in the subscription instead of allocating a fresh one per take(). This avoids the wasted alloc/dealloc cycle from RmwMsg::default().
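A rough sketch of the buffer-reuse idea in improvement 2, with hypothetical names standing in for the real rclrs internals (the actual subscription would hold a <T as Message>::RmwMsg and let rcl_take write into it; here a Vec<u8> and a fill closure play those roles):

```rust
use std::sync::Mutex;

// Hypothetical sketch, not the real rclrs API: the subscription owns
// one RMW buffer, allocated once, and every take() reuses it instead
// of calling RmwMsg::default() per message.
struct ReusableTake<M> {
    rmw_buffer: Mutex<M>, // stand-in for a reusable <T as Message>::RmwMsg
}

impl<M: Default + Clone> ReusableTake<M> {
    fn new() -> Self {
        // One-time allocation replaces the per-message default().
        Self { rmw_buffer: Mutex::new(M::default()) }
    }

    /// `fill` stands in for rcl_take writing into existing storage;
    /// the clone stands in for the from_rmw_message conversion step.
    fn take(&self, fill: impl FnOnce(&mut M)) -> M {
        let mut buf = self.rmw_buffer.lock().unwrap();
        fill(&mut *buf);
        buf.clone()
    }
}

fn main() {
    let sub: ReusableTake<Vec<u8>> = ReusableTake::new();
    // Repeated takes reuse the buffer's capacity across messages.
    for _ in 0..3 {
        let msg = sub.take(|buf| {
            buf.clear();
            buf.resize(65536, 0);
        });
        assert_eq!(msg.len(), 65536);
    }
    println!("ok");
}
```

The Mutex is only there to make the sketch self-contained; the real subscription could keep the buffer behind whatever synchronization its take() path already uses.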

Impact

This affects any subscription receiving messages with variable-length fields (sequences, strings). The overhead scales linearly with payload size, making rclrs unsuitable for high-throughput large-message workloads (e.g. point clouds, images) without workarounds like ReadOnlyLoanedMessage.
