Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 95 additions & 1 deletion src/server/event_publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::e2e::E2EKey;
use crate::protocol::{Header, Message};
use crate::traits::{PayloadWireFormat, WireFormat};
use crate::transport::{E2ERegistryHandle, SocketHandle, TransportSocket};
#[cfg(test)]
#[cfg(any(feature = "embassy_channels", feature = "server"))]
use alloc::sync::Arc;
use core::net::SocketAddrV4;
use heapless::Vec as HeaplessVec;
Expand Down Expand Up @@ -451,6 +451,100 @@ where
}
}

/// Shared handle to the [`EventPublisher`] backing a
/// [`Server`](super::Server).
///
/// Abstracts how the event publisher is shared between the Server's
/// run loop and any external task that wants to publish events
/// (`server.publisher().publish_event(...)`). Two impls ship out
/// of the box, mirroring the pattern established by
Comment on lines +458 to +460
Copy link

Copilot AI Apr 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The docs/examples here assume callers can do server.publisher().publish_event(...), but Server::publisher() now returns the generic Hep handle and EventPublisherHandle does not require Deref<Target = EventPublisher<...>>. That call pattern only works for the built-in Arc<_> / &'static _ handles (via deref), and will not compile for other valid handle types. Consider updating the docs to show the trait-based access pattern (e.g., borrowing via EventPublisherHandle::publisher()), or alternatively add a Deref bound if direct method calls on the handle are intended to be a guaranteed API.

Suggested change
/// run loop and any external task that wants to publish events
/// (`server.publisher().publish_event(...)`). Two impls ship out
/// of the box, mirroring the pattern established by
/// run loop and any external task that wants to publish events.
/// Callers should borrow the underlying publisher via
/// [`EventPublisherHandle::publisher`] before invoking publisher
/// methods (for example,
/// `server.publisher().publisher().publish_event(...)`). Two impls
/// ship out of the box, mirroring the pattern established by

Copilot uses AI. Check for mistakes.
/// [`crate::transport::SocketHandle`] and
/// [`super::SdStateHandle`]:
///
/// - `Arc<EventPublisher<R, S, H>>` on alloc-using builds — the
/// default for `Server::new_with_deps` / `new_passive_with_deps`.
/// - `&'static EventPublisher<R, S, H>` on bare-metal-no-alloc
/// — caller declares a `static` somewhere and supplies the
/// reference into a future `Server::new_with_handles`
/// constructor.
///
/// `Clone + 'static` only — neither `Send` nor `Sync` at the trait
/// level. Method-level `where` clauses on Server add Send bounds
/// at use sites that need them (`announcement_loop`'s
/// `+ Send`-bounded return type, etc.).
pub trait EventPublisherHandle<R, S, H>: Clone + 'static
where
R: E2ERegistryHandle,
S: SubscriptionHandle,
H: SocketHandle,
{
/// Borrow the underlying [`EventPublisher`] for read-only
/// access. Used by Server's run loop and accessor.
fn publisher(&self) -> &EventPublisher<R, S, H>;
}

// `&'static EventPublisher<...>`: trivially `Copy + Clone + 'static`
// for any 'static publisher. Caller arranges the static storage.
impl<R, S, H> EventPublisherHandle<R, S, H> for &'static EventPublisher<R, S, H>
where
R: E2ERegistryHandle,
S: SubscriptionHandle,
H: SocketHandle,
{
fn publisher(&self) -> &EventPublisher<R, S, H> {
self
}
}

#[cfg(any(feature = "embassy_channels", feature = "server"))]
impl<R, S, H> EventPublisherHandle<R, S, H> for Arc<EventPublisher<R, S, H>>
where
R: E2ERegistryHandle,
S: SubscriptionHandle,
H: SocketHandle,
{
fn publisher(&self) -> &EventPublisher<R, S, H> {
self
}
}

/// Extension of [`EventPublisherHandle`] for handles that can be
/// constructed inline from an owned [`EventPublisher`].
///
/// Required by `Server` constructors that build the publisher
/// internally (the alloc-using path). The future
/// `Server::new_with_handles` will accept a pre-built `Hep:
/// EventPublisherHandle` directly and won't need this trait —
/// callers using `&'static EventPublisher<...>` declare their
/// `static` storage themselves and pass the reference in.
///
/// Mirrors the [`crate::transport::WrappableSocketHandle`] /
/// [`super::WrappableSdStateHandle`] split: the basic `Handle`
/// trait gives read access; the `Wrappable*` extension adds the
/// inline-construction path that requires an allocator.
pub trait WrappableEventPublisherHandle<R, S, H>: EventPublisherHandle<R, S, H>
where
R: E2ERegistryHandle,
S: SubscriptionHandle,
H: SocketHandle,
{
/// Place an owned [`EventPublisher`] behind this handle's
/// shared storage.
fn wrap(publisher: EventPublisher<R, S, H>) -> Self;
}

#[cfg(any(feature = "embassy_channels", feature = "server"))]
impl<R, S, H> WrappableEventPublisherHandle<R, S, H> for Arc<EventPublisher<R, S, H>>
where
R: E2ERegistryHandle,
S: SubscriptionHandle,
H: SocketHandle,
{
fn wrap(publisher: EventPublisher<R, S, H>) -> Self {
Arc::new(publisher)
}
}

#[cfg(all(test, feature = "server-tokio"))]
mod tests {
use super::*;
Expand Down
43 changes: 30 additions & 13 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ mod service_info;
mod subscription_manager;

pub use error::Error;
pub use event_publisher::EventPublisher;
pub use event_publisher::{EventPublisher, EventPublisherHandle, WrappableEventPublisherHandle};
pub use service_info::Subscriber;
#[cfg(feature = "std")]
pub use service_info::{EventGroupInfo, ServiceInfo};
Expand Down Expand Up @@ -143,15 +143,23 @@ where
/// these as `Arc<Mutex<E2ERegistry>>` / `Arc<RwLock<SubscriptionManager>>`
/// / `TokioTransport` / `TokioTimer`. Bare-metal callers use
/// [`Self::new_with_deps`] (under `server`) and supply their own.
pub struct Server<R, S, F, Tm, H = Arc<<F as TransportFactory>::Socket>, Hsd = Arc<SdStateManager>>
where
pub struct Server<
R,
S,
F,
Tm,
H = Arc<<F as TransportFactory>::Socket>,
Hsd = Arc<SdStateManager>,
Hep = Arc<EventPublisher<R, S, H>>,
> where
R: E2ERegistryHandle,
S: SubscriptionHandle,
F: TransportFactory + 'static,
F::Socket: 'static,
Tm: Timer + Clone + 'static,
H: SocketHandle<Socket = F::Socket>,
Hsd: SdStateHandle,
Hep: EventPublisherHandle<R, S, H>,
{
config: ServerConfig,
/// Socket for receiving subscription requests, behind whatever
Expand All @@ -163,8 +171,10 @@ where
sd_socket: H,
/// Subscription manager
subscriptions: S,
/// Event publisher
publisher: Arc<EventPublisher<R, S, H>>,
/// Event publisher, behind whatever shared-storage `Hep` chose
/// (`Arc<EventPublisher<R, S, H>>` on std,
/// `&'static EventPublisher<R, S, H>` on bare-metal-no-alloc).
publisher: Hep,
/// SD session-ID counter and announcement emitter, behind whatever
/// shared-storage `Hsd` chose (`Arc<SdStateManager>` on std,
/// `&'static SdStateManager` on bare-metal-no-alloc).
Expand Down Expand Up @@ -282,7 +292,7 @@ impl
}
}

impl<R, S, F, Tm, H, Hsd> Server<R, S, F, Tm, H, Hsd>
impl<R, S, F, Tm, H, Hsd, Hep> Server<R, S, F, Tm, H, Hsd, Hep>
where
R: E2ERegistryHandle,
S: SubscriptionHandle,
Expand All @@ -291,6 +301,7 @@ where
Tm: Timer + Clone + 'static,
H: WrappableSocketHandle<Socket = F::Socket>,
Hsd: WrappableSdStateHandle,
Hep: WrappableEventPublisherHandle<R, S, H>,
{
/// Bare-metal-friendly constructor that takes every dependency
/// explicitly via a [`ServerDeps`] bundle. The `server-tokio`
Expand Down Expand Up @@ -358,7 +369,7 @@ where
sd::MULTICAST_IP
);

let publisher = Arc::new(EventPublisher::new(
let publisher = Hep::wrap(EventPublisher::new(
subscriptions.clone(),
unicast_socket.clone(),
e2e_registry.clone(),
Expand Down Expand Up @@ -428,7 +439,7 @@ where
sd_placeholder_addr
);

let publisher = Arc::new(EventPublisher::new(
let publisher = Hep::wrap(EventPublisher::new(
subscriptions.clone(),
unicast_socket.clone(),
e2e_registry.clone(),
Expand All @@ -450,7 +461,7 @@ where
}
}

impl<R, S, F, Tm, H, Hsd> Server<R, S, F, Tm, H, Hsd>
impl<R, S, F, Tm, H, Hsd, Hep> Server<R, S, F, Tm, H, Hsd, Hep>
where
R: E2ERegistryHandle,
S: SubscriptionHandle,
Expand All @@ -459,6 +470,7 @@ where
Tm: Timer + Clone + 'static,
H: SocketHandle<Socket = F::Socket>,
Hsd: SdStateHandle,
Hep: EventPublisherHandle<R, S, H>,
{
/// Build the periodic-SD-announcement future.
///
Expand Down Expand Up @@ -694,10 +706,14 @@ where
Ok(())
}

/// Get the event publisher for sending events
/// Get a clone of the event-publisher handle for sending events.
///
/// Returns the [`EventPublisherHandle`] type — `Arc<EventPublisher<R,
/// S, H>>` for std users (the default `Hep`),
/// `&'static EventPublisher<R, S, H>` for bare-metal-no-alloc.
Comment on lines +709 to +713
Copy link

Copilot AI Apr 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This accessor returns the generic Hep handle, but the surrounding docs imply it can always be used directly for publishing events. Since EventPublisherHandle doesn’t require Deref<Target = EventPublisher<...>>, callers using a custom handle type may need to explicitly borrow the inner publisher via the handle trait method before calling publish_event/etc. Consider adjusting the docs to reflect the guaranteed usage pattern for all Hep implementations (or constrain Hep with Deref if the direct-call ergonomics are intended).

Suggested change
/// Get a clone of the event-publisher handle for sending events.
///
/// Returns the [`EventPublisherHandle`] type — `Arc<EventPublisher<R,
/// S, H>>` for std users (the default `Hep`),
/// `&'static EventPublisher<R, S, H>` for bare-metal-no-alloc.
/// Get a clone of the configured event-publisher handle.
///
/// This returns the generic handle type `Hep` implementing
/// [`EventPublisherHandle`]. For the default handle types this is typically
/// `Arc<EventPublisher<R, S, H>>` on `std` and
/// `&'static EventPublisher<R, S, H>` on bare-metal-no-alloc.
///
/// Callers using a custom `Hep` implementation should use the
/// [`EventPublisherHandle`] trait to borrow/access the underlying
/// [`EventPublisher`] before calling publisher methods such as
/// `publish_event`.

Copilot uses AI. Check for mistakes.
#[must_use]
pub fn publisher(&self) -> Arc<EventPublisher<R, S, H>> {
Arc::clone(&self.publisher)
pub fn publisher(&self) -> Hep {
self.publisher.clone()
}

/// Get the local address of the unicast socket.
Expand Down Expand Up @@ -1219,7 +1235,7 @@ fn extract_subscriber_endpoint(
}
}

impl<R, S, F, Tm, H, Hsd> Server<R, S, F, Tm, H, Hsd>
impl<R, S, F, Tm, H, Hsd, Hep> Server<R, S, F, Tm, H, Hsd, Hep>
where
R: E2ERegistryHandle,
S: SubscriptionHandle,
Expand All @@ -1228,6 +1244,7 @@ where
Tm: Timer + Clone + 'static,
H: SocketHandle<Socket = F::Socket>,
Hsd: SdStateHandle,
Hep: EventPublisherHandle<R, S, H>,
{
/// Send `SubscribeAck` from an entry view
async fn send_subscribe_ack_from_view(
Expand Down