Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 59 additions & 16 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -732,12 +732,36 @@ where
self.e2e_registry.unregister(key);
}

/// Run the server event loop
/// Run the server event loop with caller-provided receive buffers.
///
/// Handles incoming subscription requests and manages event groups.
/// Listens on both the unicast socket (for direct requests) and the
/// SD multicast socket (for `FindService` and `SubscribeEventGroup`).
///
/// `unicast_buf` and `sd_buf` are caller-supplied scratch buffers
/// for incoming datagrams. Each must be at least one MTU
/// (~1500 bytes) and ideally up to the IP datagram limit
/// (64 KiB - 1) — peer SD messages are bounded by the link MTU,
/// but a SOME/IP server should not silently cap at 1500 because
/// it is a sink for any peer datagram landing on its SD or
/// unicast port. Backends that surface truncation
/// (`ReceivedDatagram::truncated`) emit a `tracing::warn!` when
/// the caller's buffer was too small; backends that don't
/// (TokioSocket today) silently truncate at the OS level.
Comment on lines +747 to +750
///
/// On bare-metal, callers typically place the buffers in
/// `static` storage:
/// ```ignore
/// static mut UNICAST_BUF: [u8; 65535] = [0; 65535];
/// static mut SD_BUF: [u8; 65535] = [0; 65535];
/// // SAFETY: only one task drives `run_with_buffers` for a given Server.
/// unsafe { server.run_with_buffers(&mut UNICAST_BUF, &mut SD_BUF).await }?;
/// ```
///
/// On std (or any alloc-using target), [`Self::run`] is the
/// convenience shim that heap-allocates 64 KiB buffers and
/// delegates here.
///
/// # Errors
///
/// Returns [`Error::Io`] with [`std::io::ErrorKind::InvalidInput`] if
Expand All @@ -747,7 +771,11 @@ where
///
/// Otherwise returns an error if receiving from a socket fails or
/// handling an SD message fails.
pub async fn run(&mut self) -> Result<(), Error> {
pub async fn run_with_buffers(
&mut self,
unicast_buf: &mut [u8],
sd_buf: &mut [u8],
) -> Result<(), Error> {
use crate::protocol::MessageView;

if self.is_passive {
Expand All @@ -761,18 +789,6 @@ where
return Err(Error::InvalidUsage("passive_server_run"));
}

// Incoming-peer buffers sized to the IP datagram limit (64 KiB - 1).
// Do NOT shrink to `UDP_BUFFER_SIZE` (1500): peer SD messages are
// bounded by the link MTU but `recv_from` here is a server-side
// sink for any peer datagram landing on the SD/unicast port, and
// larger-than-MTU peer messages must surface (or be cleanly
// truncated by the kernel) rather than being silently capped at
// 1500 by an undersized buffer. Out-going `EventPublisher` paths
// do use the smaller `UDP_BUFFER_SIZE` because we control the
// wire size of what we emit; that asymmetry is intentional.
let mut unicast_buf = alloc::vec![0u8; 65535];
let mut sd_buf = alloc::vec![0u8; 65535];

loop {
// `select!` (not `select_biased!`) gives pseudo-random fairness
// across ready arms each poll — matches the prior
Expand All @@ -794,12 +810,16 @@ where
// select macro returns, freeing the buffer we index into
// below.
let (len, addr, source, from_unicast) = {
// Reborrow `&mut *foo` rather than `&mut foo` because
// `unicast_buf` / `sd_buf` are `&mut [u8]` parameters
// here (caller-owned), not owned `Vec<u8>` locals
// — direct `&mut foo` would produce `&mut &mut [u8]`.
let unicast_fut = self
.unicast_socket
.socket()
.recv_from(&mut unicast_buf)
.recv_from(&mut *unicast_buf)
.fuse();
let sd_fut = self.sd_socket.socket().recv_from(&mut sd_buf).fuse();
let sd_fut = self.sd_socket.socket().recv_from(&mut *sd_buf).fuse();
Comment on lines +813 to +822
pin_mut!(unicast_fut, sd_fut);
select_biased! {
result = unicast_fut => {
Expand Down Expand Up @@ -879,6 +899,29 @@ where
}
}

/// Run the server event loop with heap-allocated 64 KiB recv buffers.
///
/// Convenience wrapper over [`Self::run_with_buffers`] for callers
/// who have an allocator available — this is the simplest entry
/// point for std and bare-metal-with-alloc consumers. Bare-metal
/// callers without an allocator must use
/// [`Self::run_with_buffers`] directly with caller-supplied
/// buffers (e.g. `static`-declared `[u8; N]` arrays).
///
/// The 64 KiB sizing matches the IP datagram limit so the server
/// surfaces (or cleanly truncates at the OS level) any peer
/// datagram that exceeds the link MTU. See
/// [`Self::run_with_buffers`] for the full sizing rationale.
///
/// # Errors
///
/// Same as [`Self::run_with_buffers`].
pub async fn run(&mut self) -> Result<(), Error> {
let mut unicast_buf = alloc::vec![0u8; 65535];
let mut sd_buf = alloc::vec![0u8; 65535];
self.run_with_buffers(&mut unicast_buf, &mut sd_buf).await
}

/// Handle a Service Discovery message
#[allow(clippy::too_many_lines)]
async fn handle_sd_message(
Expand Down