Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 30 additions & 10 deletions src/dds/participant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ pub struct DomainParticipantBuilder {
which interfaces the DomainParticipant will talk to. */
only_networks: Option<Vec<String>>, // if specified, run RTPS only over these interfaces

socket_receive_buffer_size: usize,

#[cfg(feature = "security")]
security_plugins: Option<SecurityPlugins>,
#[cfg(feature = "security")]
Expand All @@ -83,13 +85,21 @@ impl DomainParticipantBuilder {
DomainParticipantBuilder {
domain_id,
only_networks: None,
socket_receive_buffer_size: Self::DEFAULT_SOCKET_RECEIVE_BUFFER_SIZE,
#[cfg(feature = "security")]
security_plugins: None,
#[cfg(feature = "security")]
sec_properties: None,
}
}

pub const DEFAULT_SOCKET_RECEIVE_BUFFER_SIZE: usize = 8 * 1024 * 1024;

pub fn socket_receive_buffer_size(mut self, size: usize) -> Self {
self.socket_receive_buffer_size = size;
self
}

#[cfg(feature = "security")]
/// Low-level security configuration, which allows supplying custom plugins.
pub fn security(
Expand Down Expand Up @@ -250,6 +260,7 @@ impl DomainParticipantBuilder {
status_sender.clone(),
status_receiver,
security_plugins_handle.clone(),
self.socket_receive_buffer_size,
)?;

// outer DP wrapper
Expand Down Expand Up @@ -774,6 +785,7 @@ impl DomainParticipantDisc {
status_sender: StatusChannelSender<DomainParticipantStatusEvent>,
status_receiver: StatusChannelReceiver<DomainParticipantStatusEvent>,
security_plugins_handle: Option<SecurityPluginsHandle>,
socket_receive_buffer_size: usize,
) -> CreateResult<Self> {
let dpi = DomainParticipantInner::new(
domain_id,
Expand All @@ -785,6 +797,7 @@ impl DomainParticipantDisc {
status_sender,
status_receiver,
security_plugins_handle,
socket_receive_buffer_size,
)?;

Ok(Self {
Expand Down Expand Up @@ -999,16 +1012,18 @@ impl DomainParticipantInner {
status_sender: StatusChannelSender<DomainParticipantStatusEvent>,
status_receiver: StatusChannelReceiver<DomainParticipantStatusEvent>,
security_plugins_handle: Option<SecurityPluginsHandle>,
socket_receive_buffer_size: usize,
) -> CreateResult<Self> {
#[cfg(not(feature = "security"))]
let _dummy = _qos_policies; // to make clippy happy

let mut listeners = HashMap::new();

match UDPListener::new_multicast(
match UDPListener::new_multicast_with_buf_size(
"0.0.0.0",
spdp_well_known_multicast_port(domain_id),
Ipv4Addr::new(239, 255, 0, 1),
socket_receive_buffer_size,
) {
Ok(l) => {
listeners.insert(DISCOVERY_MUL_LISTENER_TOKEN, l);
Expand All @@ -1023,9 +1038,10 @@ impl DomainParticipantInner {
// Magic value 120 below is from RTPS spec 2.5 Section "9.6.2.3 Default Port
// Numbers"
while discovery_listener.is_none() && participant_id < 120 {
discovery_listener = UDPListener::new_unicast(
discovery_listener = UDPListener::new_unicast_with_buf_size(
"0.0.0.0",
spdp_well_known_unicast_port(domain_id, participant_id),
socket_receive_buffer_size,
)
.ok();
if discovery_listener.is_none() {
Expand All @@ -1044,31 +1060,35 @@ impl DomainParticipantInner {

// Now the user traffic listeners

match UDPListener::new_multicast(
match UDPListener::new_multicast_with_buf_size(
"0.0.0.0",
user_traffic_multicast_port(domain_id),
Ipv4Addr::new(239, 255, 0, 1),
socket_receive_buffer_size,
) {
Ok(l) => {
listeners.insert(USER_TRAFFIC_MUL_LISTENER_TOKEN, l);
}
Err(e) => warn!("Cannot get multicast user traffic listener: {e:?}"),
}

let user_traffic_listener = UDPListener::new_unicast(
let user_traffic_listener = UDPListener::new_unicast_with_buf_size(
"0.0.0.0",
user_traffic_unicast_port(domain_id, participant_id),
socket_receive_buffer_size,
)
.or_else(|e| {
if matches!(e.kind(), ErrorKind::AddrInUse) {
// If we do not get the preferred listening port,
// try again, with "any" port number.
UDPListener::new_unicast("0.0.0.0", 0).or_else(|e| {
create_error_out_of_resources!(
"Could not open unicast user traffic listener, any port number: {:?}",
e
)
})
UDPListener::new_unicast_with_buf_size("0.0.0.0", 0, socket_receive_buffer_size).or_else(
|e| {
create_error_out_of_resources!(
"Could not open unicast user traffic listener, any port number: {:?}",
e
)
},
)
} else {
create_error_out_of_resources!("Could not open unicast user traffic listener: {e:?}")
}
Expand Down
32 changes: 30 additions & 2 deletions src/network/udp_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,18 @@ impl UDPListener {
host: &str,
port: u16,
reuse_addr: bool,
recv_buffer_size: usize,
) -> io::Result<mio_06::net::UdpSocket> {
let raw_socket = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?;

if recv_buffer_size > 0 {
raw_socket
.set_recv_buffer_size(recv_buffer_size)
.unwrap_or_else(|e| {
warn!("Failed to set SO_RCVBUF to {recv_buffer_size}: {e}. Using OS default.");
});
}

// We set ReuseAddr so that other DomainParticipants on this host can
// bind to the same multicast address and port.
// To have an effect on bind, this must be done before bind call, so must be
Expand Down Expand Up @@ -102,8 +111,17 @@ impl UDPListener {
}
}

#[cfg(test)]
pub fn new_unicast(host: &str, port: u16) -> io::Result<Self> {
let mio_socket = Self::new_listening_socket(host, port, false)?;
Self::new_unicast_with_buf_size(host, port, 0)
}

pub fn new_unicast_with_buf_size(
host: &str,
port: u16,
recv_buffer_size: usize,
) -> io::Result<Self> {
let mio_socket = Self::new_listening_socket(host, port, false, recv_buffer_size)?;

Ok(Self {
socket: mio_socket,
Expand All @@ -112,15 +130,25 @@ impl UDPListener {
})
}

#[cfg(test)]
pub fn new_multicast(host: &str, port: u16, multicast_group: Ipv4Addr) -> io::Result<Self> {
Self::new_multicast_with_buf_size(host, port, multicast_group, 0)
}

pub fn new_multicast_with_buf_size(
host: &str,
port: u16,
multicast_group: Ipv4Addr,
recv_buffer_size: usize,
) -> io::Result<Self> {
if !multicast_group.is_multicast() {
return io::Result::Err(io::Error::new(
io::ErrorKind::Other,
"Not a multicast address",
));
}

let mio_socket = Self::new_listening_socket(host, port, true)?;
let mio_socket = Self::new_listening_socket(host, port, true, recv_buffer_size)?;

for multicast_if_ipaddr in get_local_multicast_ip_addrs()? {
match multicast_if_ipaddr {
Expand Down
Loading