Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 31 additions & 16 deletions src/server/event_publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use crate::UDP_BUFFER_SIZE;
use crate::e2e::E2EKey;
use crate::protocol::{Header, Message};
use crate::traits::{PayloadWireFormat, WireFormat};
use crate::transport::{E2ERegistryHandle, TransportSocket};
use crate::transport::{E2ERegistryHandle, SocketHandle, TransportSocket};
#[cfg(test)]
use alloc::sync::Arc;
use core::net::SocketAddrV4;
use heapless::Vec as HeaplessVec;
Expand All @@ -22,28 +23,42 @@ const _: () = assert!(

/// Publishes events to subscribers.
///
/// Generic over `T: TransportSocket` (the socket primitive — `TokioSocket`
/// in the std/tokio path, a bare-metal embassy / smoltcp wrapper on
/// firmware), `R: E2ERegistryHandle`, and `S: SubscriptionHandle`.
pub struct EventPublisher<R, S, T>
/// Generic over `H: SocketHandle` (abstracting the storage of the
/// transport socket — `Arc<TokioSocket>` in the std/tokio path,
/// `StaticSocketHandle<EmbassyNetSocket>` on bare metal, etc.),
/// `R: E2ERegistryHandle`, and `S: SubscriptionHandle`. The
/// underlying socket type is reachable as `H::Socket`.
///
/// Pre-19f revision: this type held an `Arc<T>` directly and required
/// `T: Send + Sync + 'static`. The handle indirection drops the
/// Send/Sync requirement so consumers with a `!Sync` socket — most
/// notably `embassy-net`'s `UdpSocket<'static>` — can still
/// construct an `EventPublisher`. Multi-threaded callers continue
/// to use `Arc<T>` (which is `Send + Sync` whenever `T` is) without
/// any change.
pub struct EventPublisher<R, S, H>
where
R: E2ERegistryHandle,
S: SubscriptionHandle,
T: TransportSocket + Send + Sync + 'static,
H: SocketHandle,
{
subscriptions: S,
socket: Arc<T>,
socket: H,
e2e_registry: R,
}

impl<R, S, T> EventPublisher<R, S, T>
impl<R, S, H> EventPublisher<R, S, H>
where
R: E2ERegistryHandle,
S: SubscriptionHandle,
T: TransportSocket + Send + Sync + 'static,
H: SocketHandle,
{
/// Create a new event publisher
pub fn new(subscriptions: S, socket: Arc<T>, e2e_registry: R) -> Self {
/// Create a new event publisher.
///
/// `socket` is whatever `SocketHandle` impl the caller chose for
/// storage — `Arc<T>` on std, `StaticSocketHandle<T>` on bare
/// metal.
pub fn new(subscriptions: S, socket: H, e2e_registry: R) -> Self {
Self {
subscriptions,
socket,
Expand Down Expand Up @@ -182,7 +197,7 @@ where
let mut sent_count = 0usize;
let mut last_err: Option<crate::transport::TransportError> = None;
for addr in &subscribers {
match self.socket.send_to(datagram, *addr).await {
match self.socket.socket().send_to(datagram, *addr).await {
Ok(()) => {
sent_count += 1;
tracing::trace!(
Expand Down Expand Up @@ -308,7 +323,7 @@ where
let mut sent_count = 0usize;
let mut last_err: Option<crate::transport::TransportError> = None;
for addr in &subscribers {
match self.socket.send_to(datagram, *addr).await {
match self.socket.socket().send_to(datagram, *addr).await {
Ok(()) => {
sent_count += 1;
}
Expand Down Expand Up @@ -454,7 +469,7 @@ mod tests {
/// into scope so tests can spell `TestEventPublisher` without
/// chasing the three-type-parameter signature on every call site.
type TestEventPublisher =
EventPublisher<Arc<Mutex<E2ERegistry>>, Arc<RwLock<SubscriptionManager>>, TokioSocket>;
EventPublisher<Arc<Mutex<E2ERegistry>>, Arc<RwLock<SubscriptionManager>>, Arc<TokioSocket>>;

fn test_registry() -> Arc<Mutex<E2ERegistry>> {
Arc::new(Mutex::new(E2ERegistry::new()))
Expand Down Expand Up @@ -628,7 +643,7 @@ mod tests {
let publisher: EventPublisher<
Arc<Mutex<E2ERegistry>>,
Arc<RwLock<SubscriptionManager>>,
AlwaysFailSocket,
Arc<AlwaysFailSocket>,
> = EventPublisher::new(subscriptions, Arc::new(AlwaysFailSocket), test_registry());

let msg = make_test_message();
Expand Down Expand Up @@ -689,7 +704,7 @@ mod tests {
let publisher: EventPublisher<
Arc<Mutex<E2ERegistry>>,
Arc<RwLock<SubscriptionManager>>,
AlwaysFailSocket,
Arc<AlwaysFailSocket>,
> = EventPublisher::new(subscriptions, Arc::new(AlwaysFailSocket), test_registry());

let err = publisher
Expand Down
Loading