Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 47 additions & 112 deletions src/server/event_publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ use crate::UDP_BUFFER_SIZE;
use crate::e2e::E2EKey;
use crate::protocol::{Header, Message};
use crate::traits::{PayloadWireFormat, WireFormat};
use crate::transport::{E2ERegistryHandle, SocketHandle, TransportSocket};
#[cfg(any(feature = "embassy_channels", feature = "server"))]
use crate::transport::{E2ERegistryHandle, SharedHandle, TransportSocket};
#[cfg(test)]
Copy link

Copilot AI Apr 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Arc is only used inside the #[cfg(all(test, feature = "server-tokio"))] test module below, but the import is gated with just #[cfg(test)]. This will produce an unused_imports warning when compiling unit tests without server-tokio enabled. Consider moving the Arc import into the test module or gating it with the same all(test, feature = "server-tokio") condition.

Suggested change
#[cfg(test)]
#[cfg(all(test, feature = "server-tokio"))]

Copilot uses AI. Check for mistakes.
use alloc::sync::Arc;
use core::marker::PhantomData;
use core::net::SocketAddrV4;
use heapless::Vec as HeaplessVec;

Expand All @@ -23,11 +24,11 @@ const _: () = assert!(

/// Publishes events to subscribers.
///
/// Generic over `H: SocketHandle` (abstracting the storage of the
/// transport socket — `Arc<TokioSocket>` in the std/tokio path,
/// `StaticSocketHandle<EmbassyNetSocket>` on bare metal, etc.),
/// `R: E2ERegistryHandle`, and `S: SubscriptionHandle`. The
/// underlying socket type is reachable as `H::Socket`.
/// Generic over `H: SharedHandle<T>` (abstracting how the
/// transport socket is shared — `Arc<T>` in alloc-using builds,
/// `&'static T` on bare-metal-no-alloc), `T: TransportSocket`
/// (the concrete underlying socket type), `R: E2ERegistryHandle`,
/// and `S: SubscriptionHandle`.
///
/// Pre-19f revision: this type held an `Arc<T>` directly and required
/// `T: Send + Sync + 'static`. The handle indirection drops the
Expand All @@ -36,33 +37,47 @@ const _: () = assert!(
/// construct an `EventPublisher`. Multi-threaded callers continue
/// to use `Arc<T>` (which is `Send + Sync` whenever `T` is) without
/// any change.
pub struct EventPublisher<R, S, H>
///
/// The explicit `T` parameter is the price of consolidating all
/// three former handle traits (Phase 20e) into a single
/// [`SharedHandle<T>`]: the trait carries `T` as a generic, not
/// as an associated type, so consumers that need to name the
/// socket type spell it out.
pub struct EventPublisher<R, S, H, T>
where
R: E2ERegistryHandle,
S: SubscriptionHandle,
H: SocketHandle,
T: TransportSocket + 'static,
H: SharedHandle<T>,
{
subscriptions: S,
socket: H,
e2e_registry: R,
/// `T` appears only in the bound `H: SharedHandle<T>`; the
/// struct doesn't directly hold a `T`. PhantomData carries the
/// type so the parameter is well-formed without affecting
/// drop-check or auto-trait propagation negatively.
_phantom: PhantomData<T>,
}

impl<R, S, H> EventPublisher<R, S, H>
impl<R, S, H, T> EventPublisher<R, S, H, T>
where
R: E2ERegistryHandle,
S: SubscriptionHandle,
H: SocketHandle,
T: TransportSocket + 'static,
H: SharedHandle<T>,
{
/// Create a new event publisher.
///
/// `socket` is whatever `SocketHandle` impl the caller chose for
/// storage — `Arc<T>` on std, `StaticSocketHandle<T>` on bare
/// metal.
/// `socket` is whatever [`SharedHandle<T>`] impl the caller
/// chose for storage — `Arc<T>` on std/alloc, `&'static T` on
/// bare-metal-no-alloc.
pub fn new(subscriptions: S, socket: H, e2e_registry: R) -> Self {
Self {
subscriptions,
socket,
e2e_registry,
_phantom: PhantomData,
}
}

Expand Down Expand Up @@ -197,7 +212,7 @@ where
let mut sent_count = 0usize;
let mut last_err: Option<crate::transport::TransportError> = None;
for addr in &subscribers {
match self.socket.socket().send_to(datagram, *addr).await {
match self.socket.get().send_to(datagram, *addr).await {
Ok(()) => {
sent_count += 1;
tracing::trace!(
Expand Down Expand Up @@ -323,7 +338,7 @@ where
let mut sent_count = 0usize;
let mut last_err: Option<crate::transport::TransportError> = None;
for addr in &subscribers {
match self.socket.socket().send_to(datagram, *addr).await {
match self.socket.get().send_to(datagram, *addr).await {
Ok(()) => {
sent_count += 1;
}
Expand Down Expand Up @@ -451,99 +466,13 @@ where
}
}

/// Shared handle to the [`EventPublisher`] backing a
/// [`Server`](super::Server).
///
/// Abstracts how the event publisher is shared between the Server's
/// run loop and any external task that wants to publish events
/// (`server.publisher().publish_event(...)`). Two impls ship out
/// of the box, mirroring the pattern established by
/// [`crate::transport::SocketHandle`] and
/// [`super::SdStateHandle`]:
///
/// - `Arc<EventPublisher<R, S, H>>` on alloc-using builds — the
/// default for `Server::new_with_deps` / `new_passive_with_deps`.
/// - `&'static EventPublisher<R, S, H>` on bare-metal-no-alloc
/// — caller declares a `static` somewhere and supplies the
/// reference into a future `Server::new_with_handles`
/// constructor.
///
/// `Clone + 'static` only — neither `Send` nor `Sync` at the trait
/// level. Method-level `where` clauses on Server add Send bounds
/// at use sites that need them (`announcement_loop`'s
/// `+ Send`-bounded return type, etc.).
pub trait EventPublisherHandle<R, S, H>: Clone + 'static
where
R: E2ERegistryHandle,
S: SubscriptionHandle,
H: SocketHandle,
{
/// Borrow the underlying [`EventPublisher`] for read-only
/// access. Used by Server's run loop and accessor.
fn publisher(&self) -> &EventPublisher<R, S, H>;
}

// `&'static EventPublisher<...>`: trivially `Copy + Clone + 'static`
// for any 'static publisher. Caller arranges the static storage.
impl<R, S, H> EventPublisherHandle<R, S, H> for &'static EventPublisher<R, S, H>
where
R: E2ERegistryHandle,
S: SubscriptionHandle,
H: SocketHandle,
{
fn publisher(&self) -> &EventPublisher<R, S, H> {
self
}
}

#[cfg(any(feature = "embassy_channels", feature = "server"))]
impl<R, S, H> EventPublisherHandle<R, S, H> for Arc<EventPublisher<R, S, H>>
where
R: E2ERegistryHandle,
S: SubscriptionHandle,
H: SocketHandle,
{
fn publisher(&self) -> &EventPublisher<R, S, H> {
self
}
}

/// Extension of [`EventPublisherHandle`] for handles that can be
/// constructed inline from an owned [`EventPublisher`].
///
/// Required by `Server` constructors that build the publisher
/// internally (the alloc-using path). The future
/// `Server::new_with_handles` will accept a pre-built `Hep:
/// EventPublisherHandle` directly and won't need this trait —
/// callers using `&'static EventPublisher<...>` declare their
/// `static` storage themselves and pass the reference in.
///
/// Mirrors the [`crate::transport::WrappableSocketHandle`] /
/// [`super::WrappableSdStateHandle`] split: the basic `Handle`
/// trait gives read access; the `Wrappable*` extension adds the
/// inline-construction path that requires an allocator.
pub trait WrappableEventPublisherHandle<R, S, H>: EventPublisherHandle<R, S, H>
where
R: E2ERegistryHandle,
S: SubscriptionHandle,
H: SocketHandle,
{
/// Place an owned [`EventPublisher`] behind this handle's
/// shared storage.
fn wrap(publisher: EventPublisher<R, S, H>) -> Self;
}

#[cfg(any(feature = "embassy_channels", feature = "server"))]
impl<R, S, H> WrappableEventPublisherHandle<R, S, H> for Arc<EventPublisher<R, S, H>>
where
R: E2ERegistryHandle,
S: SubscriptionHandle,
H: SocketHandle,
{
fn wrap(publisher: EventPublisher<R, S, H>) -> Self {
Arc::new(publisher)
}
}
// Phase 20e collapsed `EventPublisherHandle<R, S, H>` /
// `WrappableEventPublisherHandle<R, S, H>` into the unified
// `crate::transport::SharedHandle<EventPublisher<R, S, H, T>>` /
// `WrappableSharedHandle<EventPublisher<R, S, H, T>>` traits. The
// blanket impls there cover both `&'static EventPublisher<...>`
// and `Arc<EventPublisher<...>>`; no dedicated trait survives
// here.

#[cfg(all(test, feature = "server-tokio"))]
mod tests {
Expand All @@ -561,9 +490,13 @@ mod tests {

/// Type alias bringing the tokio-flavor concrete type parameters back
/// into scope so tests can spell `TestEventPublisher` without
/// chasing the three-type-parameter signature on every call site.
type TestEventPublisher =
EventPublisher<Arc<Mutex<E2ERegistry>>, Arc<RwLock<SubscriptionManager>>, Arc<TokioSocket>>;
/// chasing the four-type-parameter signature on every call site.
type TestEventPublisher = EventPublisher<
Arc<Mutex<E2ERegistry>>,
Arc<RwLock<SubscriptionManager>>,
Arc<TokioSocket>,
TokioSocket,
>;

fn test_registry() -> Arc<Mutex<E2ERegistry>> {
Arc::new(Mutex::new(E2ERegistry::new()))
Expand Down Expand Up @@ -738,6 +671,7 @@ mod tests {
Arc<Mutex<E2ERegistry>>,
Arc<RwLock<SubscriptionManager>>,
Arc<AlwaysFailSocket>,
AlwaysFailSocket,
> = EventPublisher::new(subscriptions, Arc::new(AlwaysFailSocket), test_registry());

let msg = make_test_message();
Expand Down Expand Up @@ -799,6 +733,7 @@ mod tests {
Arc<Mutex<E2ERegistry>>,
Arc<RwLock<SubscriptionManager>>,
Arc<AlwaysFailSocket>,
AlwaysFailSocket,
> = EventPublisher::new(subscriptions, Arc::new(AlwaysFailSocket), test_registry());

let err = publisher
Expand Down
Loading