From 23ab686403a036e524a220190bb0021a65aed7a4 Mon Sep 17 00:00:00 2001 From: Alvaro Gaona Date: Wed, 11 Mar 2026 01:42:21 +0100 Subject: [PATCH] fix: set SO_RCVBUF on UDP sockets to prevent fragment loss RustDDS used OS-default UDP receive buffers (~786 KB on macOS, ~208 KB on Linux), which caused silent packet loss when receiving large fragmented DDS messages (e.g., 1080p images at ~6 MB). The buffer overflowed during fragment bursts, and with BestEffort QoS, lost fragments are never retransmitted. Set SO_RCVBUF to 8 MB by default (configurable via DomainParticipantBuilder::socket_receive_buffer_size). On Linux, users may need to raise net.core.rmem_max for larger payloads. This matches the approach used by CycloneDDS and FastDDS. --- src/dds/participant.rs | 40 +++++++++++++++++++++++++++---------- src/network/udp_listener.rs | 32 +++++++++++++++++++++++++++-- 2 files changed, 60 insertions(+), 12 deletions(-) diff --git a/src/dds/participant.rs b/src/dds/participant.rs index 8f9af963..cba2b297 100644 --- a/src/dds/participant.rs +++ b/src/dds/participant.rs @@ -72,6 +72,8 @@ pub struct DomainParticipantBuilder { which interfaces the DomainParticipant will talk to. */ only_networks: Option>, // if specified, run RTPS only over these interfaces + socket_receive_buffer_size: usize, + #[cfg(feature = "security")] security_plugins: Option, #[cfg(feature = "security")] @@ -83,6 +85,7 @@ impl DomainParticipantBuilder { DomainParticipantBuilder { domain_id, only_networks: None, + socket_receive_buffer_size: Self::DEFAULT_SOCKET_RECEIVE_BUFFER_SIZE, #[cfg(feature = "security")] security_plugins: None, #[cfg(feature = "security")] @@ -90,6 +93,13 @@ impl DomainParticipantBuilder { } } + pub const DEFAULT_SOCKET_RECEIVE_BUFFER_SIZE: usize = 8 * 1024 * 1024; + + pub fn socket_receive_buffer_size(mut self, size: usize) -> Self { + self.socket_receive_buffer_size = size; + self + } + #[cfg(feature = "security")] /// Low-level security configuration, which allows supplying custom plugins. pub fn security( @@ -250,6 +260,7 @@ impl DomainParticipantBuilder { status_sender.clone(), status_receiver, security_plugins_handle.clone(), + self.socket_receive_buffer_size, )?; // outer DP wrapper @@ -774,6 +785,7 @@ impl DomainParticipantDisc { status_sender: StatusChannelSender, status_receiver: StatusChannelReceiver, security_plugins_handle: Option, + socket_receive_buffer_size: usize, ) -> CreateResult { let dpi = DomainParticipantInner::new( domain_id, @@ -785,6 +797,7 @@ impl DomainParticipantDisc { status_sender, status_receiver, security_plugins_handle, + socket_receive_buffer_size, )?; Ok(Self { @@ -999,16 +1012,18 @@ impl DomainParticipantInner { status_sender: StatusChannelSender, status_receiver: StatusChannelReceiver, security_plugins_handle: Option, + socket_receive_buffer_size: usize, ) -> CreateResult { #[cfg(not(feature = "security"))] let _dummy = _qos_policies; // to make clippy happy let mut listeners = HashMap::new(); - match UDPListener::new_multicast( + match UDPListener::new_multicast_with_buf_size( "0.0.0.0", spdp_well_known_multicast_port(domain_id), Ipv4Addr::new(239, 255, 0, 1), + socket_receive_buffer_size, ) { Ok(l) => { listeners.insert(DISCOVERY_MUL_LISTENER_TOKEN, l); @@ -1023,9 +1038,10 @@ impl DomainParticipantInner { // Magic value 120 below is from RTPS spec 2.5 Section "9.6.2.3 Default Port // Numbers" while discovery_listener.is_none() && participant_id < 120 { - discovery_listener = UDPListener::new_unicast( + discovery_listener = UDPListener::new_unicast_with_buf_size( "0.0.0.0", spdp_well_known_unicast_port(domain_id, participant_id), + socket_receive_buffer_size, ) .ok(); if discovery_listener.is_none() { @@ -1044,10 +1060,11 @@ impl DomainParticipantInner { // Now the user traffic listeners - match UDPListener::new_multicast( + match UDPListener::new_multicast_with_buf_size( "0.0.0.0", user_traffic_multicast_port(domain_id), Ipv4Addr::new(239, 255, 0, 1), + socket_receive_buffer_size, ) { Ok(l) => { listeners.insert(USER_TRAFFIC_MUL_LISTENER_TOKEN, l); @@ -1055,20 +1072,23 @@ impl DomainParticipantInner { Err(e) => warn!("Cannot get multicast user traffic listener: {e:?}"), } - let user_traffic_listener = UDPListener::new_unicast( + let user_traffic_listener = UDPListener::new_unicast_with_buf_size( "0.0.0.0", user_traffic_unicast_port(domain_id, participant_id), + socket_receive_buffer_size, ) .or_else(|e| { if matches!(e.kind(), ErrorKind::AddrInUse) { // If we do not get the preferred listening port, // try again, with "any" port number. - UDPListener::new_unicast("0.0.0.0", 0).or_else(|e| { - create_error_out_of_resources!( - "Could not open unicast user traffic listener, any port number: {:?}", - e - ) - }) + UDPListener::new_unicast_with_buf_size("0.0.0.0", 0, socket_receive_buffer_size).or_else( + |e| { + create_error_out_of_resources!( + "Could not open unicast user traffic listener, any port number: {:?}", + e + ) + }, + ) } else { create_error_out_of_resources!("Could not open unicast user traffic listener: {e:?}") } diff --git a/src/network/udp_listener.rs b/src/network/udp_listener.rs index 05e4b8c8..715ef5a0 100644 --- a/src/network/udp_listener.rs +++ b/src/network/udp_listener.rs @@ -47,9 +47,18 @@ impl UDPListener { host: &str, port: u16, reuse_addr: bool, + recv_buffer_size: usize, ) -> io::Result { let raw_socket = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?; + if recv_buffer_size > 0 { + raw_socket + .set_recv_buffer_size(recv_buffer_size) + .unwrap_or_else(|e| { + warn!("Failed to set SO_RCVBUF to {recv_buffer_size}: {e}. Using OS default."); + }); + } + // We set ReuseAddr so that other DomainParticipants on this host can // bind to the same multicast address and port. // To have an effect on bind, this must be done before bind call, so must be @@ -102,8 +111,17 @@ impl UDPListener { } } + #[cfg(test)] pub fn new_unicast(host: &str, port: u16) -> io::Result { - let mio_socket = Self::new_listening_socket(host, port, false)?; + Self::new_unicast_with_buf_size(host, port, 0) + } + + pub fn new_unicast_with_buf_size( + host: &str, + port: u16, + recv_buffer_size: usize, + ) -> io::Result { + let mio_socket = Self::new_listening_socket(host, port, false, recv_buffer_size)?; Ok(Self { socket: mio_socket, @@ -112,7 +130,17 @@ impl UDPListener { }) } + #[cfg(test)] pub fn new_multicast(host: &str, port: u16, multicast_group: Ipv4Addr) -> io::Result { + Self::new_multicast_with_buf_size(host, port, multicast_group, 0) + } + + pub fn new_multicast_with_buf_size( + host: &str, + port: u16, + multicast_group: Ipv4Addr, + recv_buffer_size: usize, + ) -> io::Result { if !multicast_group.is_multicast() { return io::Result::Err(io::Error::new( io::ErrorKind::Other, @@ -120,7 +148,7 @@ impl UDPListener { )); } - let mio_socket = Self::new_listening_socket(host, port, true)?; + let mio_socket = Self::new_listening_socket(host, port, true, recv_buffer_size)?; for multicast_if_ipaddr in get_local_multicast_ip_addrs()? { match multicast_if_ipaddr {