diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 671c115..eab10af 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -51,6 +51,66 @@ jobs: - uses: Swatinem/rust-cache@v2 - uses: obi1kenobi/cargo-semver-checks-action@v2 + no_std_target: + # Cross-build for a true no_std target (cortex-m4f, no allocator, + # no std). This is the literal phase-18 gate from + # `bare_metal_plan_v3.md`: phases 4–17 shipped the trait surface + # and no-alloc primitives, but until this job is green the crate + # cannot actually be consumed on cortex-m. Each combination here + # is a separate `cargo build` so a failure surfaces the specific + # feature combo that regressed. + # + # `client + bare_metal` is verified alloc-free (no `__rust_alloc` + # symbols in the rlib); `server + bare_metal` and the combined + # build pull `extern crate alloc` for `Arc` / + # `Arc` and so do reference allocator symbols — that's + # documented in `lib.rs` and tracked for a future refactor. + name: no_std target build (thumbv7em-none-eabihf) + needs: check + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v6 + - uses: dtolnay/rust-toolchain@stable + with: + targets: thumbv7em-none-eabihf + - uses: Swatinem/rust-cache@v2 + - name: bare_metal alone + run: cargo build --target thumbv7em-none-eabihf --no-default-features --features bare_metal + - name: server + bare_metal + run: cargo build --target thumbv7em-none-eabihf --no-default-features --features server,bare_metal + - name: client + server + bare_metal + run: cargo build --target thumbv7em-none-eabihf --no-default-features --features client,server,bare_metal + # `client + bare_metal` runs LAST so the rlib in + # target/thumbv7em-none-eabihf/debug/ comes from this exact + # feature set when the alloc-symbol audit reads it. + - name: client + bare_metal + run: | + # Wipe the bare_metal-only artifact from earlier in this + # job so the audit step doesn't accidentally read it; then + # build fresh under client+bare_metal. + rm -f target/thumbv7em-none-eabihf/debug/libsimple_someip*.rlib + cargo build --target thumbv7em-none-eabihf --no-default-features --features client,bare_metal + - name: alloc-symbol audit (client + bare_metal must be alloc-free) + # If `client + bare_metal` ever starts pulling `__rust_alloc`, + # something inside the client engine has regressed onto an + # allocator-bound primitive. Fail loudly so it gets caught in + # the PR rather than discovered downstream. (`server` and + # `client+server` builds DO reference alloc symbols via + # `Arc` — documented; not gated here.) + run: | + rlib=$(find target/thumbv7em-none-eabihf -name 'libsimple_someip*.rlib' | head -1) + if [ -z "$rlib" ]; then + echo "::error::no simple_someip rlib found under target/thumbv7em-none-eabihf" + exit 1 + fi + alloc_refs=$(nm -A "$rlib" 2>/dev/null | grep -c -E '__rust_alloc|__rg_alloc' || true) + echo "client+bare_metal alloc-symbol references: $alloc_refs" + if [ "$alloc_refs" -ne 0 ]; then + echo "::error::client+bare_metal must be alloc-free; found $alloc_refs alloc references." + nm -A "$rlib" 2>/dev/null | grep -E '__rust_alloc|__rg_alloc' || true + exit 1 + fi + test: name: Build, Test & Coverage needs: check diff --git a/CHANGELOG.md b/CHANGELOG.md index c39d428..3e1973f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,8 +14,12 @@ - **`transport::Spawner` trait** (re-exported as `simple_someip::Spawner`) — executor-agnostic task-spawn abstraction. `tokio_transport::TokioSpawner` is the default `std + tokio` impl. - **`transport::LocalSpawner` trait** — single-threaded task-spawn abstraction for `!Send` futures. Enables use on runtimes like `tokio::LocalSet` or embassy's single-threaded executor. - **`transport::TransportSocket` / `TransportFactory` / `Timer` traits** — executor-agnostic UDP transport abstraction. Default `tokio_transport::TokioTransport` / `TokioSocket` / `TokioTimer` impls available behind the `client-tokio` / `server-tokio` features. -- **`bare_metal` cargo feature** — activates embassy-sync as the channel backend and enables the `static_channels` module, `AtomicInterfaceHandle`, and `StaticE2EHandle` types. The heap-backed `EmbassySyncChannels` factory is separately gated by the `embassy_channels` feature (which implies `bare_metal`). See `examples/bare_metal_client/` and `examples/bare_metal_server/` for runnable integration examples. Validate with `cargo build -p bare_metal_client` / `cargo build -p bare_metal_server`, NOT `cargo build --workspace` (workspace builds may unify features and mask regressions). +- **`bare_metal` cargo feature** — activates embassy-sync as the channel backend and enables the `static_channels` module, `AtomicInterfaceHandle`, `StaticE2EHandle`, and `StaticSubscriptionHandle` types. All four are pure `no_std` (no allocator required). The heap-backed `EmbassySyncChannels` factory is separately gated by the `embassy_channels` feature (which implies `bare_metal`). See `examples/bare_metal_client/` and `examples/bare_metal_server/` for runnable integration examples. Validate with `cargo build -p bare_metal_client` / `cargo build -p bare_metal_server`, NOT `cargo build --workspace` (workspace builds may unify features and mask regressions). - **`SubscriptionManager::subscribe` returning a `Result`** — see "Changed" below; the regression test list now exercises the major-version mismatch path explicitly. +- **`StaticSubscriptionHandle` + `StaticSubscriptionStorage`** — no-alloc `SubscriptionHandle` impl backed by `&'static BlockingMutex>`. The bare-metal counterpart to `Arc>`. `SubscriptionManager::new()` is now `const`, so the storage can live in a plain `static` (no `Box::leak`). Gated on `feature = "bare_metal"`, re-exported from `server::*`. +- **`server::Error::InvalidUsage(&'static str)`** — new variant for `Server` API misuse paths. Currently emitted with the tags `"passive_server_announcement_loop"`, `"announcement_loop_already_started"`, and `"passive_server_run"`. Replaces the previous `Error::Io(std::io::Error::new(InvalidInput, ..))` paths so these errors are reachable on no_std builds. +- **`E2ERegistryFull`** — new typed error returned by `E2ERegistry::register` (and propagated through `E2ERegistryHandle::register` / `Client::register_e2e` / `Server::register_e2e`) when the fixed-capacity registry is at its `E2E_REGISTRY_CAP` limit. Replacing an already-registered key still always succeeds. +- **`PayloadWireFormat::for_each_offered_endpoint` / `for_each_service_instance`** — visitor-pattern methods replacing the previous `Vec`-returning `offered_endpoints` / `service_instances`. Lets the `Client` run loop iterate SD entries without per-message heap allocation, which was the last bare-metal blocker on the receive path. The `Vec`-returning forms are preserved as `cfg(feature = "std")` convenience wrappers that delegate to the visitors, so std consumers keep the original ergonomic shape. ### Changed @@ -31,7 +35,19 @@ - **Breaking: `Server::new` type signature now `Server::::new`** — the `Server` struct gained type parameters for the pluggable backends. The tokio-default convenience constructor is now gated behind the `server-tokio` feature (was `server`). Migration: add `features = ["server-tokio"]` to continue using `Server::new`; trait-surface consumers use `Server::new_with_deps`. - **Breaking: `SubscriptionHandle` trait redesigned** — the previous `get_subscribers(&self, …) -> impl Future>` method has been replaced with `for_each_subscriber(&self, …, f: FnMut)` visitor pattern. This allows `EventPublisher::publish_event` to copy subscriber addresses into a stack buffer (`heapless::Vec<_, 16>`) instead of allocating per-event. Implementors of custom `SubscriptionHandle` must migrate. - **Breaking: `SubscriptionHandle` RPITIT futures no longer `+ Send`** — the `subscribe`, `unsubscribe`, and `for_each_subscriber` methods now return `impl Future<…>` without a `+ Send` bound. This enables single-threaded lock-free implementations on bare-metal targets, but means `SubscriptionHandle` trait objects cannot be held across `.await` points in multi-threaded executors. Direct usage with the default `Arc>` is unaffected. -- New optional dependency `dep:futures` (default-features-off) for `futures::select!` + `FusedFuture` plumbing — pulled in transitively by both `client` and `server` features. +- **Breaking: `client` and `server` features no longer imply `std`** — previously `client = ["std", "dep:futures"]` and `server = ["std", "dep:futures"]`; now `client = ["dep:futures-util"]` and `server = ["dep:futures-util"]`. The `std` feature moved to `client-tokio` / `server-tokio`, which is where it belongs (the tokio backends genuinely require std). Bare-metal trait-surface consumers (`features = ["client", "bare_metal"]`) compile in pure no_std now. `server` still pulls `extern crate alloc` because `Server` holds `Arc` and `EventPublisher` holds `Arc` — documented in `lib.rs`; refactor to `&'static` borrows is tracked for a future phase. +- **Breaking: optional dep `futures` replaced with `futures-util`** — direct dependency on `futures-util` with features `["async-await", "async-await-macro"]`. The `futures` umbrella crate's `select!` macro re-export is gated on its `std` feature, which transitively pulls `slab` / `memchr` / `futures-io` and breaks no_std cross-compiles. `futures-util` provides `select_biased!`, `pin_mut!`, and `FutureExt` under just `async-await(-macro)`. +- **Breaking: internal `select!` → `select_biased!`** — `Inner::run_future`, `socket_loop_future`, and `server::run` now poll their select arms top-first instead of pseudo-randomly. For these workloads the bias gives slightly better behavior (control messages, sends, and unicast recvs get priority over their lower-priority siblings) and there is no genuine starvation path because the higher-priority arms are sporadic. The change is observable only under contrived workloads where every arm is permanently ready simultaneously. +- **Breaking: `PayloadWireFormat::offered_endpoints` / `service_instances` replaced by visitor-pattern methods** — see `for_each_offered_endpoint` / `for_each_service_instance` in "Added" above. Implementors of custom `PayloadWireFormat` types must override the visitors instead of the `Vec`-returning forms. The `Vec`-returning forms remain as default-implemented `cfg(feature = "std")` convenience wrappers, so std callers' code keeps compiling unchanged. +- **Breaking: `PayloadWireFormat::new_subscription_sd_header` parameter type** — `client_ip` is now `core::net::Ipv4Addr` (was `std::net::Ipv4Addr`). The two are the same underlying type; the change unblocks no_std builds. Dropping the `#[cfg(feature = "std")]` gate on the method itself makes it reachable in pure no_std. +- **Breaking: `PayloadWireFormat::set_reboot_flag` no longer `cfg(feature = "std")`** — the method is now always available on the trait. Its default impl is still a no-op; downstream payload types that participate in SD reboot tracking must override it. +- **Breaking: `OfferedEndpoint` no longer `cfg(feature = "std")`** — type is always available; its `addr` field is `Option` (was `Option`). Same underlying type; allows no_std consumers to receive offered-endpoint visits. +- **Breaking: `server::Error::Io(std::io::Error)` now `cfg(feature = "std")`** — the variant is gated on `feature = "std"` because `std::io::Error` is itself std-only. No-std consumers receive transport failures via `Error::Transport(TransportError)` which carries the portable `IoErrorKind`. +- **Breaking: misuse paths on `Server::announcement_loop` / `Server::run` return `Error::InvalidUsage(...)`** — previously these returned `Error::Io(std::io::Error::new(InvalidInput, ..))` with a formatted message. The new variant is no_std-friendly and carries a machine-readable `&'static str` tag (`"passive_server_announcement_loop"`, `"announcement_loop_already_started"`, `"passive_server_run"`); the diagnostic moves to `tracing::warn!`. +- **Breaking: `server::SubscriptionManager::get_subscribers` now `cfg(feature = "std")`** — convenience accessor returning a heap `Vec`. Production code paths use `for_each_subscriber` (visitor) since 0.8.0; this accessor remains for std consumers' tests and ad-hoc tooling. No_std consumers must use `for_each_subscriber`. +- **Breaking: `server::ServiceInfo` / `server::EventGroupInfo` now `cfg(feature = "std")`** — both types' `pub` fields hold `Vec<...>`. Bare-metal consumers don't construct these types today; if the use case emerges, a future port will switch to `heapless::Vec`. `Subscriber` is unaffected and stays no_std. +- **Breaking: `E2ERegistry` API change** — backing storage migrated from `std::collections::HashMap` to `heapless::index_map::FnvIndexMap` (cap = `E2E_REGISTRY_CAP = 32`, exposed). `E2ERegistry::register` now returns `Result<(), E2ERegistryFull>`; replacing an already-registered key always succeeds, adding a new key past the cap returns `Err`. `E2ERegistry::new()` is now `const`. The module is no longer `cfg(feature = "std")` — `E2ERegistry` works in pure no_std. +- **Breaking: `E2ERegistryHandle::register` trait method now returns `Result<(), E2ERegistryFull>`** — propagates the new typed overflow from `E2ERegistry::register` through every handle impl. Callers (`Client::register_e2e`, `Server::register_e2e`) lift the `Result` through to their public surface. - `client::Error::Transport` adopts `#[error(transparent)]` Display delegation (the previous wrapping with `{:?}` debug-formatted the inner `TransportError`); user-facing error strings are now stable. - Subscribe-NACK reason strings normalized to `snake_case` for log consistency: `wrong_service_id`, `wrong_instance_id`, `wrong_major_version`, `no_endpoint_in_options`, `subscribers_per_group_full`, `event_groups_full`. Wire format is unchanged (NACK is signalled by `TTL=0`). @@ -47,6 +63,8 @@ ### Notes - **Crate version bumped to 0.8.0** — reflects the breaking changes above. Downstream `Cargo.toml` snippets in `README.md` were updated accordingly. +- **Bare-metal compile gate is now literal.** `cargo build --target thumbv7em-none-eabihf --no-default-features --features client,server,bare_metal` succeeds; `client + bare_metal` is verified alloc-free (zero `__rust_alloc` references in the resulting rlib). CI runs this matrix on every PR. The cortex-m4f target is the closest no_std proxy mainline Rust supports — the project's actual production target (Infineon AURIX TriCore) requires HighTec's commercial Rust distribution because mainline Rust + LLVM don't have a TriCore backend; a future phase will swap or layer in a TriCore CI runner once that infrastructure is in place. See `bare_metal_plan_v3.md`. +- **Known limitation: `server` feature pulls `extern crate alloc`.** `Server` holds `Arc` and `EventPublisher` holds `Arc`; both require an allocator. Pure no_std-without-allocator consumers can use the `client` feature alone (alloc-free) but will need a global allocator for the server side. A refactor to `&'static` borrows is on the v3 phase 21+ backlog. ### Test runner diff --git a/Cargo.lock b/Cargo.lock index 25f4daa..e20bf97 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7,6 +7,7 @@ name = "bare_metal_client" version = "0.0.0" dependencies = [ "critical-section", + "embassy-sync", "simple-someip", "tokio", ] @@ -16,6 +17,7 @@ name = "bare_metal_server" version = "0.0.0" dependencies = [ "critical-section", + "embassy-sync", "simple-someip", "tokio", ] @@ -109,42 +111,12 @@ dependencies = [ "embedded-io 0.6.1", ] -[[package]] -name = "futures" -version = "0.3.32" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b147ee9d1f6d097cef9ce628cd2ee62288d963e16fb287bd9286455b241382d" -dependencies = [ - "futures-channel", - "futures-core", - "futures-io", - "futures-sink", - "futures-task", - "futures-util", -] - -[[package]] -name = "futures-channel" -version = "0.3.32" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "07bbe89c50d7a535e539b8c17bc0b49bdb77747034daa8087407d655f3f7cc1d" -dependencies = [ - "futures-core", - "futures-sink", -] - [[package]] name = "futures-core" version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d" -[[package]] -name = "futures-io" -version = "0.3.32" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cecba35d7ad927e23624b22ad55235f2239cfa44fd10428eecbeba6d6a717718" - [[package]] name = "futures-macro" version = "0.3.32" @@ -174,15 +146,10 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6" dependencies = [ - "futures-channel", "futures-core", - "futures-io", "futures-macro", - "futures-sink", "futures-task", - "memchr", "pin-project-lite", - "slab", ] [[package]] @@ -232,12 +199,6 @@ version = "0.4.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" -[[package]] -name = "memchr" -version = "2.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" - [[package]] name = "mio" version = "1.2.0" @@ -305,7 +266,7 @@ dependencies = [ "critical-section", "embassy-sync", "embedded-io 0.7.1", - "futures", + "futures-util", "heapless 0.9.2", "socket2 0.5.10", "thiserror", @@ -314,12 +275,6 @@ dependencies = [ "tracing-subscriber", ] -[[package]] -name = "slab" -version = "0.4.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c790de23124f9ab44544d7ac05d60440adc586479ce501c1d6d7da3cd8c9cf5" - [[package]] name = "smallvec" version = "1.15.1" diff --git a/Cargo.toml b/Cargo.toml index bb25e8f..a86890b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,9 +28,15 @@ embedded-io = { version = "0.7" } # `select!` macro and `FutureExt::fuse` / `pin_mut!` helpers — used by # the client/server event loops in place of `tokio::select!`. Default # features disabled so we only pull in the parts we use. -futures = { version = "0.3", default-features = false, features = [ +# `futures-util` (not the `futures` umbrella) because the umbrella +# gates the `select!` macro re-export behind its `std` feature, and +# pulling that feature drags in `slab` / `memchr` / `futures-io` etc. +# which do not compile on no_std targets. `futures-util` itself +# provides `select!`, `pin_mut!`, `FutureExt::fuse`, and friends +# under just `async-await` (which is alloc-friendly, no_std-clean). +futures-util = { version = "0.3", default-features = false, features = [ "async-await", - "std", + "async-await-macro", ], optional = true } heapless = "0.9" socket2 = { version = "0.5", optional = true, features = ["all"] } @@ -63,17 +69,18 @@ std = ["embedded-io/std", "thiserror/std", "tracing/std"] # `ChannelFactory` / `TransportFactory` impls). Consumers who want the # `Client::new` shortcut (defaulting to `TokioSpawner` / `TokioTimer` / # `TokioChannels` / `TokioTransport`) enable `client-tokio`. -client = ["std", "dep:futures"] -client-tokio = ["client", "dep:tokio", "dep:socket2"] +client = ["dep:futures-util"] +client-tokio = ["client", "std", "dep:tokio", "dep:socket2"] # Feature split (matches the client side): `server` exposes the -# trait-surface server (no tokio, no socket2). The engine itself uses -# `futures::select!` so `dep:futures` lives here. `server-tokio` adds -# the tokio + socket2 convenience defaults (`Server::new`, -# `Server::new_with_loopback`, `Server::new_passive`), bringing -# `Arc>` / `Arc>` / -# / `TokioTransport` / `TokioTimer` defaults into scope. -server = ["std", "dep:futures"] -server-tokio = ["server", "dep:tokio", "dep:socket2"] +# trait-surface server (no tokio, no socket2, no std). The engine +# itself uses `futures::select!` so `dep:futures` lives here. +# `server-tokio` adds the tokio + socket2 convenience defaults +# (`Server::new`, `Server::new_with_loopback`, `Server::new_passive`), +# bringing `Arc>` / `Arc>` / +# / `TokioTransport` / `TokioTimer` defaults into scope, and forces +# `std`. +server = ["dep:futures-util"] +server-tokio = ["server", "std", "dep:tokio", "dep:socket2"] # Marks a build as intended for bare-metal / no_std consumption. # Activates embassy-sync as the channel backend, the `static_channels` # module, `AtomicInterfaceHandle`, and `StaticE2EHandle`. diff --git a/README.md b/README.md index a8ef040..e5c2a43 100644 --- a/README.md +++ b/README.md @@ -53,15 +53,15 @@ simple-someip = { version = "0.8", features = ["client-tokio", "server-tokio"] } | Feature | Default | Description | |---------|---------|-------------| -| `std` | **yes** | Enables `thiserror`, `tracing`, and `embedded-io/std` | -| `client` | no | Client trait surface; implies `std` + futures (no tokio) | -| `client-tokio` | no | Adds `Client::new` / `TokioSpawner` / `TokioTransport` defaults; implies `client` + tokio + socket2 | -| `server` | no | Server trait surface; implies `std` + futures (no tokio) | -| `server-tokio` | no | Adds `Server::new` / `TokioTimer` / `TokioTransport` defaults; implies `server` + tokio + socket2 | -| `bare_metal` | no | Activates embassy-sync, no-alloc `static_channels` module, `AtomicInterfaceHandle`, and `StaticE2EHandle`. See `examples/bare_metal_client` and `examples/bare_metal_server`; verify with `cargo build -p bare_metal_client` (NOT `cargo build --workspace`, which can unify features). | +| `std` | **yes** | Enables `thiserror`, `tracing`, and `embedded-io/std`. The `Arc>` / `Arc>` default lock-handle impls (used by the tokio backends) live behind this gate. | +| `client` | no | Client trait surface. Pure `no_std`-clean (does not pull `extern crate alloc`). Caller supplies trait impls for transport / channels / spawner / timer / lock handles. | +| `client-tokio` | no | Adds `Client::new` / `TokioSpawner` / `TokioTransport` defaults; implies `client` + std + tokio + socket2. | +| `server` | no | Server trait surface. Pulls `extern crate alloc` (for `Arc` / `Arc`); on no_std, downstream consumers must provide a `#[global_allocator]`. | +| `server-tokio` | no | Adds `Server::new` / `TokioTimer` / `TokioTransport` defaults; implies `server` + std + tokio + socket2. | +| `bare_metal` | no | Activates embassy-sync, no-alloc `static_channels` module, `AtomicInterfaceHandle`, `StaticE2EHandle`, and `StaticSubscriptionHandle` — all five pure `no_std` (no allocator required). See `examples/bare_metal_client` and `examples/bare_metal_server`; verify with `cargo build -p bare_metal_client` (NOT `cargo build --workspace`, which can unify features). | | `embassy_channels` | no | Heap-backed `EmbassySyncChannels` (implies `bare_metal` + `alloc`). Useful for tests before sizing static pools. | -By default the crate enables `std`. To use in a `no_std` environment (e.g., embedded targets), disable default features with `default-features = false`. In that mode the `protocol`, `traits`, `transport`, and `e2e` modules are available; `client` / `server` (and their `tokio_transport` backend) are not. Most applications only need one of `client` or `server`. +By default the crate enables `std`. To use in a `no_std` environment (e.g., embedded targets), disable default features with `default-features = false`. In that mode the `protocol`, `traits`, `transport`, and `e2e` modules are always available; `client` / `server` are usable too (the trait surfaces compile in pure no_std), but the tokio convenience defaults (`Client::new`, `Server::new`) live behind `client-tokio` / `server-tokio` and require std. The `cargo build --target thumbv7em-none-eabihf --no-default-features --features client,server,bare_metal` cross-build is verified in CI on every PR. ## Quick Start diff --git a/examples/bare_metal_client/Cargo.toml b/examples/bare_metal_client/Cargo.toml index 844497a..8908c7b 100644 --- a/examples/bare_metal_client/Cargo.toml +++ b/examples/bare_metal_client/Cargo.toml @@ -10,8 +10,21 @@ publish = false # executor and mock driver; real firmware would use embassy_executor or # a similar bare-metal async runtime instead. [dependencies] -simple-someip = { path = "../..", default-features = false, features = ["client", "bare_metal"] } +# `std` enabled here so the example can use the std-only `RawPayload` +# convenience type. Real firmware drops `"std"` and provides its own +# `PayloadWireFormat` implementation (RawPayload uses heap `Vec` for +# its SD-header storage and is unsuitable for true no_std). The +# `client + bare_metal` shape — pure no_std-clean trait surface — is +# verified by the cortex-m4f cross-build in CI; this host example +# additionally exercises the runtime end-to-end. +simple-someip = { path = "../..", default-features = false, features = ["std", "client", "bare_metal"] } tokio = { version = "1", features = ["macros", "rt-multi-thread", "time"] } # Provides the host platform critical-section implementation required by # embassy-sync (pulled in via simple-someip's bare_metal feature). critical-section = { version = "1", features = ["std"] } +# Used directly by this example's `static StaticE2EStorage` +# declaration to spell the `BlockingMutex>` type. Version pin matches what +# simple-someip's `bare_metal` feature pulls transitively (so we +# don't accidentally fork the dep tree). +embassy-sync = "0.6" diff --git a/examples/bare_metal_client/src/main.rs b/examples/bare_metal_client/src/main.rs index db910fb..383841a 100644 --- a/examples/bare_metal_client/src/main.rs +++ b/examples/bare_metal_client/src/main.rs @@ -26,28 +26,34 @@ //! | Channel factory | `BareMetalChannels` via `define_static_channels!` | same macro, sized to your HWM | //! | Transport | `MockFactory` / `MockSocket` | `embassy_net`, smoltcp, custom Ethernet ISR | //! | Timer | `MockTimer` using `tokio::time::sleep` | `embassy_time::Timer::after` | -//! | Task spawner | `TokioBackedSpawner` | `embassy_executor::Spawner` | -//! | Lock handles | `Arc>` / `Arc>` | stack-allocated handles (see below) | +//! | Task spawner | `TokioBackedSpawner` wrapping `tokio::spawn` | `embassy_executor::Spawner` | +//! | E2E registry handle | `StaticE2EHandle` over `&'static StaticE2EStorage` | same — already firmware-ready | +//! | Interface handle | `AtomicInterfaceHandle` over `&'static AtomicU32` | same — already firmware-ready | //! -//! # What is not yet demonstrated -//! -//! The `E2ERegistry` and interface handles still use heap-allocated -//! `Arc>` / `Arc>` wrappers. A future verification -//! pass will replace these with stack-allocated alternatives and confirm -//! zero heap allocation after `Client::new_with_deps` returns. +//! All five handle/factory types except `Transport` and `Timer` are the +//! actual `no_std` types you'd ship — `Static*` / +//! `Atomic*` over `&'static` storage. The transport and timer are +//! mocks because the example runs on the host; firmware swaps them +//! for embassy-net + embassy-time. `RawPayload` is std-only (it uses +//! a heap `Vec` for SD storage); a true firmware build provides its +//! own `PayloadWireFormat` impl. //! //! [`Client::new_with_deps`]: simple_someip::Client::new_with_deps //! [`ChannelFactory`]: simple_someip::transport::ChannelFactory +use core::cell::RefCell; use core::future::Future; use core::net::{Ipv4Addr, SocketAddrV4}; use core::pin::Pin; +use core::sync::atomic::AtomicU32; use core::task::{Context, Poll}; use core::time::Duration; use std::collections::VecDeque; use std::sync::{Arc, Mutex}; +use embassy_sync::blocking_mutex::Mutex as BlockingMutex; +use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; use simple_someip::client::Error as ClientError; use simple_someip::client::{ClientUpdate, ControlMessage, ReceivedMessage, SendMessage}; use simple_someip::define_static_channels; @@ -57,6 +63,7 @@ use simple_someip::transport::{ ReceivedDatagram, SocketOptions, Spawner, Timer, TransportError, TransportFactory, TransportSocket, }; +use simple_someip::{AtomicInterfaceHandle, StaticE2EHandle, StaticE2EStorage}; use simple_someip::{Client, ClientDeps, RawPayload}; // ── Static-pool channel factory ─────────────────────────────────────── @@ -82,6 +89,21 @@ define_static_channels! { ], } +// ── Bare-metal lock-handle storage ──────────────────────────────────── +// +// `&'static` storage for the no-alloc lock handles. `E2ERegistry::new()` +// is `const`, so the storage lives in plain `static`s — no `Box::leak` +// required. On real firmware you'd write the same `static` declarations +// in boot code. + +static E2E_STORAGE: StaticE2EStorage = + BlockingMutex::>::new(RefCell::new( + E2ERegistry::new(), + )); + +// 127.0.0.1 packed as a big-endian u32. +static IFACE_STORAGE: AtomicU32 = AtomicU32::new(0x7F00_0001); + // ── Mock transport ──────────────────────────────────────────────────── // // Two queues simulate the network. A real firmware transport drives @@ -257,18 +279,17 @@ async fn main() { next_port: Arc::new(Mutex::new(0)), }; - // std Arc/Mutex/RwLock are sufficient here — they implement the - // E2ERegistryHandle / InterfaceHandle lock-handle traits and are - // gated by `feature = "std"`, not by `client-tokio`. A future - // no-alloc port replaces these with stack-allocated handles. - let e2e: Arc> = Arc::new(Mutex::new(E2ERegistry::new())); - let iface: Arc> = - Arc::new(std::sync::RwLock::new(Ipv4Addr::LOCALHOST)); + // Bare-metal lock handles: both pure no_std (no allocator), each + // backed by a `&'static` storage. The `static`s themselves are + // declared at module scope (see top of file) — clippy::pedantic + // dislikes `static` after `let` statements. + let e2e = StaticE2EHandle::new(&E2E_STORAGE); + let iface = AtomicInterfaceHandle::new(&IFACE_STORAGE); let (client, _updates, run_fut) = Client::< RawPayload, - Arc>, - Arc>, + StaticE2EHandle, + AtomicInterfaceHandle, BareMetalChannels, >::new_with_deps( ClientDeps { diff --git a/examples/bare_metal_server/Cargo.toml b/examples/bare_metal_server/Cargo.toml index 4847af6..6d57a15 100644 --- a/examples/bare_metal_server/Cargo.toml +++ b/examples/bare_metal_server/Cargo.toml @@ -10,8 +10,20 @@ publish = false # executor and mock driver; real firmware would use embassy_executor or # a similar bare-metal async runtime instead. [dependencies] -simple-someip = { path = "../..", default-features = false, features = ["server", "bare_metal"] } +# `std` enabled here because the example uses `tokio::spawn` for the +# announcement-loop driver and tokio requires std. The `server + +# bare_metal` shape — std-droppable trait surface (`server` itself +# does not imply std as of 0.8.0) — is verified by the cortex-m4f +# cross-build in CI; this host example additionally exercises the +# runtime end-to-end. +simple-someip = { path = "../..", default-features = false, features = ["std", "server", "bare_metal"] } tokio = { version = "1", features = ["macros", "rt-multi-thread", "time"] } # Provides the host platform critical-section implementation required by # embassy-sync (pulled in via simple-someip's bare_metal feature). critical-section = { version = "1", features = ["std"] } +# Used directly by this example's `static StaticE2EStorage` / +# `static StaticSubscriptionStorage` declarations to spell the +# `BlockingMutex>` types. The +# version pin matches what simple-someip's `bare_metal` feature pulls +# transitively (so we don't accidentally fork the dep tree). +embassy-sync = "0.6" diff --git a/examples/bare_metal_server/src/main.rs b/examples/bare_metal_server/src/main.rs index db0037f..1a5e46c 100644 --- a/examples/bare_metal_server/src/main.rs +++ b/examples/bare_metal_server/src/main.rs @@ -25,18 +25,19 @@ //! |---------|-------------|----------------------| //! | Transport | `MockFactory` / `MockSocket` | `embassy_net`, smoltcp, custom Ethernet ISR | //! | Timer | `MockTimer` using `tokio::time::sleep` | `embassy_time::Timer::after` | -//! | Subscription table | `MockSubscriptions` | `heapless`-backed table behind a CS mutex | -//! | Lock handle | `Arc>` | stack-allocated handle (see below) | +//! | Subscription table | `StaticSubscriptionHandle` over `&'static StaticSubscriptionStorage` | same — already firmware-ready | +//! | E2E registry | `StaticE2EHandle` over `&'static StaticE2EStorage` | same — already firmware-ready | //! -//! # What is not yet demonstrated -//! -//! The `E2ERegistry` handle still uses a heap-allocated `Arc>`. -//! A future verification pass will replace this with a stack-allocated -//! alternative and confirm zero heap allocation after -//! `Server::new_with_deps` returns. +//! Both handles are pure `no_std` (no allocator required) and use a +//! `&'static` critical-section mutex around the underlying state, which +//! is the firmware-target shape. `E2ERegistry::new()` and +//! `SubscriptionManager::new()` are both `const`, so the storage lives +//! in plain `static` declarations at module scope (see `E2E_STORAGE` +//! and `SUBS_STORAGE` near the top of this file). //! //! [`Server::new_with_deps`]: simple_someip::Server::new_with_deps +use core::cell::RefCell; use core::future::Future; use core::net::{Ipv4Addr, SocketAddrV4}; use core::pin::Pin; @@ -47,12 +48,34 @@ use std::collections::VecDeque; use std::sync::{Arc, Mutex}; use std::vec::Vec; +use embassy_sync::blocking_mutex::Mutex as BlockingMutex; +use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; use simple_someip::e2e::E2ERegistry; -use simple_someip::server::{ServerConfig, SubscribeError, Subscriber, SubscriptionHandle}; +use simple_someip::server::{ + ServerConfig, StaticSubscriptionHandle, StaticSubscriptionStorage, SubscriptionManager, +}; use simple_someip::transport::{ ReceivedDatagram, SocketOptions, Timer, TransportError, TransportFactory, TransportSocket, }; -use simple_someip::{Server, ServerDeps}; +use simple_someip::{Server, ServerDeps, StaticE2EHandle, StaticE2EStorage}; + +// ── Bare-metal lock-handle storage ──────────────────────────────────── +// +// `&'static` storage for the no-alloc lock handles. Both +// `E2ERegistry::new()` and `SubscriptionManager::new()` are `const`, +// so the storage lives in plain `static`s — no `Box::leak` required. +// On real firmware you'd write the same `static` declarations in +// boot code. + +static E2E_STORAGE: StaticE2EStorage = + BlockingMutex::>::new(RefCell::new( + E2ERegistry::new(), + )); + +static SUBS_STORAGE: StaticSubscriptionStorage = BlockingMutex::< + CriticalSectionRawMutex, + RefCell, +>::new(RefCell::new(SubscriptionManager::new())); // ── Mock transport ──────────────────────────────────────────────────── // @@ -204,82 +227,6 @@ impl Timer for MockTimer { } } -// ── Mock SubscriptionHandle ─────────────────────────────────────────── -// -// On `server-tokio`, `Arc>` is the built-in -// impl. Bare-metal callers supply their own. A real firmware impl would -// back this with a `critical_section::Mutex>` or -// `spin::Mutex<_>` over a `heapless`-backed table; here we use -// `std::sync::Mutex` over a `Vec` because the example runs on the host. -// The trait impl itself is the portable pattern — only the concurrency -// primitive and storage type change on firmware. - -type SubKey = (u16, u16, u16, SocketAddrV4); - -#[derive(Clone, Default)] -struct MockSubscriptions(Arc>>); - -impl SubscriptionHandle for MockSubscriptions { - fn subscribe( - &self, - service_id: u16, - instance_id: u16, - event_group_id: u16, - subscriber_addr: SocketAddrV4, - ) -> impl Future> + '_ { - let inner = Arc::clone(&self.0); - async move { - let mut guard = inner.lock().unwrap(); - let key = (service_id, instance_id, event_group_id, subscriber_addr); - if !guard.contains(&key) { - guard.push(key); - } - Ok(()) - } - } - - fn unsubscribe( - &self, - service_id: u16, - instance_id: u16, - event_group_id: u16, - subscriber_addr: SocketAddrV4, - ) -> impl Future + '_ { - let inner = Arc::clone(&self.0); - async move { - inner - .lock() - .unwrap() - .retain(|e| *e != (service_id, instance_id, event_group_id, subscriber_addr)); - } - } - - fn for_each_subscriber<'a, F>( - &'a self, - service_id: u16, - instance_id: u16, - event_group_id: u16, - mut f: F, - ) -> impl Future + 'a - where - F: FnMut(&Subscriber) + 'a, - { - let inner = Arc::clone(&self.0); - async move { - let guard = inner.lock().unwrap(); - let mut count = 0; - for (s, i, e, addr) in guard.iter() { - if *s == service_id && *i == instance_id && *e == event_group_id { - let sub = Subscriber::new(*addr, *s, *i, *e); - f(&sub); - count += 1; - } - } - count - } - } -} - // ── Main ────────────────────────────────────────────────────────────── // current_thread matches a single-core bare-metal executor; yields are @@ -293,27 +240,30 @@ async fn main() { next_port: Arc::new(Mutex::new(0)), }; - // std Arc/Mutex implements E2ERegistryHandle and is gated by - // `feature = "std"`, not `server-tokio`. A future no-alloc port - // replaces this with a stack-allocated handle. - let e2e: Arc> = Arc::new(Mutex::new(E2ERegistry::new())); - let subs = MockSubscriptions::default(); + // Bare-metal lock handles: both StaticE2EHandle and + // StaticSubscriptionHandle are pure no_std (alloc-free) and back + // their state with a `&'static` critical-section mutex. The + // `static` storages themselves live at module scope (see top of + // file) — clippy::pedantic dislikes `static` after `let`. + let e2e = StaticE2EHandle::new(&E2E_STORAGE); + let subs = StaticSubscriptionHandle::new(&SUBS_STORAGE); // service_id=0x1234, instance_id=1, bound to LOCALHOST:30490. let config = ServerConfig::new(Ipv4Addr::LOCALHOST, 30490, 0x1234, 1); - let server = Server::< - Arc>, - MockSubscriptions, - MockFactory, - MockTimer, - >::new_with_deps( - ServerDeps { factory, timer: MockTimer, e2e_registry: e2e, subscriptions: subs }, - config, - false, // multicast_loopback - ) - .await - .expect("Server::new_with_deps failed"); + let server = + Server::::new_with_deps( + ServerDeps { + factory, + timer: MockTimer, + e2e_registry: e2e, + subscriptions: subs, + }, + config, + false, // multicast_loopback + ) + .await + .expect("Server::new_with_deps failed"); // The announcement loop periodically multicasts SD OfferService // entries so clients on the network can discover this service. diff --git a/src/client/inner.rs b/src/client/inner.rs index b6c6674..c75a54f 100644 --- a/src/client/inner.rs +++ b/src/client/inner.rs @@ -1,9 +1,8 @@ use core::future; use core::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; use core::task::Poll; -use futures::{FutureExt, pin_mut, select}; +use futures_util::{FutureExt, pin_mut, select_biased}; use heapless::{Deque, index_map::FnvIndexMap}; -use std::borrow::ToOwned; #[cfg(all(test, feature = "client-tokio"))] use std::sync::{Arc, Mutex}; use tracing::{debug, error, info, trace, warn}; @@ -84,8 +83,8 @@ pub enum ControlMessage { ForceSdSessionWrappedForTest(bool, C::OneshotSender>), } -impl std::fmt::Debug for ControlMessage { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +impl core::fmt::Debug for ControlMessage { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { match self { Self::SetInterface(addr, _) => f.debug_tuple("SetInterface").field(addr).finish(), Self::BindDiscovery(_) => f.write_str("BindDiscovery"), @@ -363,10 +362,10 @@ pub(super) struct Inner< phantom: core::marker::PhantomData, } -impl std::fmt::Debug +impl core::fmt::Debug for Inner { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { f.debug_struct("Inner") .field("interface", &self.interface) .field("session_tracker", &self.session_tracker) @@ -379,7 +378,7 @@ impl Inner where - PayloadDefinitions: PayloadWireFormat + Clone + std::fmt::Debug + Send + 'static, + PayloadDefinitions: PayloadWireFormat + Clone + core::fmt::Debug + Send + 'static, Tm: Timer + 'static, R: E2ERegistryHandle, C: ChannelFactory, @@ -591,7 +590,7 @@ where let received = result?; let someip_header = received.message.header().clone(); if let Some(sd_header) = received.message.sd_header() { - Ok((received.source, someip_header, sd_header.to_owned())) + Ok((received.source, someip_header, Clone::clone(sd_header))) } else { Err(Error::UnexpectedDiscoveryMessage(someip_header)) } @@ -616,7 +615,7 @@ where return future::pending().await; } - std::future::poll_fn(|cx| { + core::future::poll_fn(|cx| { // Collect ports of any sockets that report `Ready(None)` // (loop has exited). Evict them after the iteration so we // do not mutate the map while iterating it. @@ -1077,7 +1076,7 @@ where // arm check order each poll so no single arm can // starve the others under sustained load. Matches // the original `tokio::select!` fairness behavior. - select! { + select_biased! { // Receive a control message ctrl = control_fut => { if let Some(ctrl) = ctrl { @@ -1122,7 +1121,7 @@ where // detection works for all SD traffic (FindService, // Subscribe, SubscribeAck, etc.). let mut rebooted = false; - for (svc_id, inst_id) in sd_payload.service_instances() { + sd_payload.for_each_service_instance(|svc_id, inst_id| { let verdict = session_tracker.check( source, TransportKind::Multicast, @@ -1134,11 +1133,11 @@ where if verdict == SessionVerdict::Reboot { rebooted = true; } - } + }); // Auto-populate service registry from offer/stop-offer // SD entries. - for ep in sd_payload.offered_endpoints() { + sd_payload.for_each_offered_endpoint(|ep| { let id = ServiceInstanceId { service_id: ep.service_id, instance_id: ep.instance_id, @@ -1175,7 +1174,7 @@ where ep.service_id, ep.instance_id, ); } - } + }); if rebooted { let _ = update_sender.send_now(ClientUpdate::SenderRebooted(source)); @@ -1290,7 +1289,7 @@ mod tests { #[test] fn reject_with_capacity_notifies_every_sender() { use crate::transport::OneshotCancelled; - use futures::FutureExt; + use futures_util::FutureExt; fn expect_capacity(rx: F, label: &str) where @@ -1462,7 +1461,7 @@ mod tests { /// alive so a future unicast reply can resolve it. #[tokio::test] async fn track_or_reject_pending_response_inserts_when_room_available() { - use futures::FutureExt; + use futures_util::FutureExt; let mut inner = make_inner_for_test(); let (tx, rx) = oneshot::channel::>(); @@ -1556,7 +1555,7 @@ mod tests { /// caller gets a clean `Result` instead of a panicking `RecvError`. #[tokio::test] async fn track_or_reject_pending_response_completes_displaced_sender() { - use futures::FutureExt; + use futures_util::FutureExt; let mut inner = make_inner_for_test(); let key: u32 = 0xCAFE_F00D; diff --git a/src/client/mod.rs b/src/client/mod.rs index a7881e8..4aa28f4 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -93,8 +93,8 @@ pub struct PendingResponse { receiver: C::OneshotReceiver>, } -impl std::fmt::Debug for PendingResponse { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +impl core::fmt::Debug for PendingResponse { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { f.debug_struct("PendingResponse").finish_non_exhaustive() } } @@ -128,8 +128,8 @@ pub struct DiscoveryMessage { pub sd_header: P::SdHeader, } -impl std::fmt::Debug for DiscoveryMessage

{ - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +impl core::fmt::Debug for DiscoveryMessage

{ + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { f.debug_struct("DiscoveryMessage") .field("source", &self.source) .field("someip_header", &self.someip_header) @@ -159,8 +159,8 @@ pub enum ClientUpdate { Error(Error), } -impl std::fmt::Debug for ClientUpdate

{ - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +impl core::fmt::Debug for ClientUpdate

{ + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { match self { Self::DiscoveryUpdated(msg) => f.debug_tuple("DiscoveryUpdated").field(msg).finish(), Self::SenderRebooted(addr) => f.debug_tuple("SenderRebooted").field(addr).finish(), @@ -187,10 +187,10 @@ pub struct ClientUpdates>, } -impl std::fmt::Debug +impl core::fmt::Debug for ClientUpdates { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { f.debug_struct("ClientUpdates").finish_non_exhaustive() } } @@ -260,14 +260,14 @@ pub struct Client< e2e_registry: R, } -impl std::fmt::Debug for Client +impl core::fmt::Debug for Client where MessageDefinitions: PayloadWireFormat + Send + 'static, R: E2ERegistryHandle, I: InterfaceHandle, C: ChannelFactory, { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { f.debug_struct("Client") .field("interface", &self.interface.get()) .finish_non_exhaustive() @@ -284,7 +284,7 @@ where impl Client>, Arc>, TokioChannels> where - MessageDefinitions: PayloadWireFormat + Clone + std::fmt::Debug + 'static, + MessageDefinitions: PayloadWireFormat + Clone + core::fmt::Debug + 'static, { /// Creates a new client bound to the given network interface and returns its run-loop future to be driven by the caller. /// @@ -417,7 +417,7 @@ where /// Methods available on all `Client` regardless of handle types. impl Client where - MessageDefinitions: PayloadWireFormat + Clone + std::fmt::Debug + Send + 'static, + MessageDefinitions: PayloadWireFormat + Clone + core::fmt::Debug + Send + 'static, R: E2ERegistryHandle, I: InterfaceHandle, C: ChannelFactory, @@ -930,14 +930,27 @@ where /// `Err(Error::Shutdown)` after the run-loop has exited; the /// registry is still accessible via any held `Client` clone. /// + /// # Errors + /// + /// Returns [`crate::e2e::E2ERegistryFull`] when the underlying + /// registry has no room for a new key. Replacing the profile of an + /// already-registered key always succeeds. Bare-metal users sizing + /// their E2E registry should set + /// [`crate::e2e::E2E_REGISTRY_CAP`]-equivalent storage to their + /// workload's high-water mark. + /// /// # Panics /// /// May panic if the underlying [`E2ERegistryHandle`] /// implementation panics (e.g., `Arc>` on mutex poison). /// /// [`E2ERegistryHandle`]: crate::transport::E2ERegistryHandle - pub fn register_e2e(&self, key: E2EKey, profile: E2EProfile) { - self.e2e_registry.register(key, profile); + pub fn register_e2e( + &self, + key: E2EKey, + profile: E2EProfile, + ) -> Result<(), crate::e2e::E2ERegistryFull> { + self.e2e_registry.register(key, profile) } /// Remove E2E configuration for the given key. @@ -966,7 +979,7 @@ where #[cfg(feature = "client-tokio")] impl Client where - MessageDefinitions: PayloadWireFormat + Clone + std::fmt::Debug + 'static, + MessageDefinitions: PayloadWireFormat + Clone + core::fmt::Debug + 'static, R: E2ERegistryHandle, I: InterfaceHandle, { @@ -1373,7 +1386,9 @@ mod tests { method_or_event_id: 0x0001, }; let profile = E2EProfile::Profile4(crate::e2e::Profile4Config::new(42, 10)); - client.register_e2e(key, profile); + client + .register_e2e(key, profile) + .expect("E2E registry has capacity for one entry"); client.unregister_e2e(&key); client.shut_down(); } diff --git a/src/client/session.rs b/src/client/session.rs index 268b0b2..558ad06 100644 --- a/src/client/session.rs +++ b/src/client/session.rs @@ -1,6 +1,6 @@ use crate::protocol::sd::RebootFlag; +use core::net::SocketAddr; use heapless::index_map::FnvIndexMap; -use std::net::SocketAddr; /// Max number of distinct `(sender, transport, service, instance)` tuples tracked /// for reboot detection. Must be a power of two (heapless `FnvIndexMap` diff --git a/src/client/socket_manager.rs b/src/client/socket_manager.rs index 6fdad5d..e57c322 100644 --- a/src/client/socket_manager.rs +++ b/src/client/socket_manager.rs @@ -52,11 +52,11 @@ use crate::{ }; use super::error::Error; -use futures::{FutureExt, pin_mut, select}; -use std::{ +use core::{ net::{Ipv4Addr, SocketAddr, SocketAddrV4}, task::{Context, Poll}, }; +use futures_util::{FutureExt, pin_mut, select_biased}; use tracing::{debug, error, info, trace, warn}; /// A received message together with the source address it came from. @@ -80,10 +80,10 @@ pub struct SendMessage { response: C::OneshotSender>, } -impl std::fmt::Debug +impl core::fmt::Debug for SendMessage { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { f.debug_struct("SendMessage") .field("target_addr", &self.target_addr) .field("message", &self.message) @@ -133,10 +133,10 @@ pub struct SocketManager session_has_wrapped: bool, } -impl std::fmt::Debug +impl core::fmt::Debug for SocketManager { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { f.debug_struct("SocketManager") .field("local_port", &self.local_port) .field("session_id", &self.session_id) @@ -584,7 +584,7 @@ where let send_fut = MpscRecv::recv(&mut tx_rx).fuse(); let recv_fut = socket.recv_from(&mut buf).fuse(); pin_mut!(send_fut, recv_fut); - select! { + select_biased! { message = send_fut => Outcome::Send(message), result = recv_fut => Outcome::Recv(result), } @@ -1047,7 +1047,8 @@ mod tests { let message_id = MessageId::new_from_service_and_method(0x1234, 0x5678); let key = E2EKey::from_message_id(message_id); let mut reg = E2ERegistry::new(); - reg.register(key, E2EProfile::Profile4(Profile4Config::new(0, 15))); + reg.register(key, E2EProfile::Profile4(Profile4Config::new(0, 15))) + .expect("E2E registry has capacity for one entry"); let e2e_registry = Arc::new(Mutex::new(reg)); let mut sm = SocketManager::::bind(0, e2e_registry) diff --git a/src/e2e/mod.rs b/src/e2e/mod.rs index 02a52b6..dd1c2d8 100644 --- a/src/e2e/mod.rs +++ b/src/e2e/mod.rs @@ -29,7 +29,6 @@ mod crc; mod e2e_checker; mod e2e_protector; mod error; -#[cfg(feature = "std")] mod registry; mod state; @@ -40,8 +39,7 @@ pub use e2e_protector::{ protect_profile5_with_header, }; pub use error::Error; -#[cfg(feature = "std")] -pub use registry::E2ERegistry; +pub use registry::{E2E_REGISTRY_CAP, E2ERegistry, E2ERegistryFull}; pub use state::{Profile4State, Profile5State}; /// Status result from E2E check operations. @@ -161,7 +159,6 @@ impl E2EKey { } /// Internal E2E state, one per registered key. -#[cfg(feature = "std")] #[derive(Debug, Clone)] pub(crate) enum E2EState { /// State for Profile 4. @@ -170,7 +167,6 @@ pub(crate) enum E2EState { Profile5(Profile5State), } -#[cfg(feature = "std")] impl E2EState { pub(crate) fn from_profile(profile: &E2EProfile) -> Self { match profile { @@ -184,7 +180,6 @@ impl E2EState { /// Run the appropriate E2E check for the given profile, returning the status /// and the best available payload slice (stripped on success, original on error). -#[cfg(feature = "std")] pub(crate) fn e2e_check<'a>( profile: &E2EProfile, state: &mut E2EState, @@ -212,7 +207,6 @@ pub(crate) fn e2e_check<'a>( /// # Errors /// /// Returns [`Error::BufferTooSmall`] if `output` cannot hold the protected payload. -#[cfg(feature = "std")] pub(crate) fn e2e_protect( profile: &E2EProfile, state: &mut E2EState, diff --git a/src/e2e/registry.rs b/src/e2e/registry.rs index 7a7c39b..30c7dfe 100644 --- a/src/e2e/registry.rs +++ b/src/e2e/registry.rs @@ -1,31 +1,86 @@ //! E2E configuration registry for runtime E2E management. +//! +//! Backed by [`heapless::index_map::FnvIndexMap`] so the registry is +//! `no_std`-compatible and allocates no heap memory after construction. +//! The capacity is bounded at compile time to [`E2E_REGISTRY_CAP`]; the +//! registry rejects further registrations once that cap is reached +//! rather than silently dropping or growing — see [`E2ERegistry::register`] +//! and [`E2ERegistryFull`]. -use std::collections::HashMap; +use heapless::index_map::FnvIndexMap; use super::{E2ECheckStatus, E2EKey, E2EProfile, E2EState, Error, e2e_check, e2e_protect}; -/// Registry mapping message keys to E2E profile configurations and state. +/// Maximum number of distinct `(key → profile)` bindings the registry +/// can hold. Sized for typical workloads where a single service +/// instance has at most a few dozen E2E-protected message types. +/// +/// Must be a power of two for [`FnvIndexMap`]; the `const _` assertion +/// below catches any future change that would violate the requirement. +pub const E2E_REGISTRY_CAP: usize = 32; + +const _: () = assert!( + E2E_REGISTRY_CAP.is_power_of_two(), + "E2E_REGISTRY_CAP must be a power of two for heapless::FnvIndexMap" +); + +/// Returned by [`E2ERegistry::register`] when the registry is at +/// capacity. +/// +/// The contained value is the cap that was hit (i.e. +/// [`E2E_REGISTRY_CAP`]); kept in the error so log lines and panic +/// messages name the constant the user can adjust. +#[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)] +#[error("e2e registry at capacity ({0})")] +pub struct E2ERegistryFull(pub usize); + +/// Registry mapping message keys to E2E profile configurations and +/// the per-key counter / sequence state. +/// +/// `no_std`-friendly: backed by a fixed-capacity +/// [`FnvIndexMap`] so construction and the entire lifetime of the +/// registry are heap-free. Construction is `const`, so a `static` +/// instance can be declared in firmware boot code. #[derive(Debug)] pub struct E2ERegistry { - map: HashMap, + map: FnvIndexMap, } impl E2ERegistry { - /// Create an empty registry. + /// Create an empty registry. `const`-constructible so it can live + /// in `static` storage on bare-metal targets. #[must_use] - pub fn new() -> Self { + pub const fn new() -> Self { Self { - map: HashMap::new(), + map: FnvIndexMap::new(), } } /// Register an E2E profile for the given key, creating fresh state. - pub fn register(&mut self, key: E2EKey, profile: E2EProfile) { + /// + /// Replacing the profile of an already-registered key always + /// succeeds (the existing slot is reused). Adding a new key when + /// the registry already holds [`E2E_REGISTRY_CAP`] entries returns + /// [`Err(E2ERegistryFull)`](E2ERegistryFull); the caller is + /// responsible for sizing the cap to its workload's high-water + /// mark. + /// + /// # Errors + /// + /// [`E2ERegistryFull`] when the registry is full and `key` is not + /// already present. + pub fn register(&mut self, key: E2EKey, profile: E2EProfile) -> Result<(), E2ERegistryFull> { let state = E2EState::from_profile(&profile); - self.map.insert(key, (profile, state)); + // `FnvIndexMap::insert` returns `Err((K, V))` only when the + // map is full AND `key` is not already present (replacing an + // existing entry never overflows). + match self.map.insert(key, (profile, state)) { + Ok(_) => Ok(()), + Err(_) => Err(E2ERegistryFull(E2E_REGISTRY_CAP)), + } } - /// Remove E2E configuration for the given key. + /// Remove E2E configuration for the given key. No-op if absent. pub fn unregister(&mut self, key: &E2EKey) { self.map.remove(key); } @@ -85,8 +140,9 @@ mod tests { fn register_and_check_profile4() { let mut reg = E2ERegistry::new(); let key = make_key(); - let config = Profile4Config::new(0x1234_5678, 15); - reg.register(key, E2EProfile::Profile4(config.clone())); + let config = Profile4Config::new(0x12345678, 15); + reg.register(key, E2EProfile::Profile4(config.clone())) + .expect("register fits within E2E_REGISTRY_CAP"); assert!(reg.contains_key(&key)); // Protect a payload @@ -108,7 +164,8 @@ mod tests { let mut reg = E2ERegistry::new(); let key = make_key(); let config = Profile5Config::new(0x1234, 20, 15); - reg.register(key, E2EProfile::Profile5(config)); + reg.register(key, E2EProfile::Profile5(config)) + .expect("register fits within E2E_REGISTRY_CAP"); let mut payload = [0u8; 20]; payload[..5].copy_from_slice(b"Hello"); @@ -136,7 +193,8 @@ mod tests { fn unregister_removes_key() { let mut reg = E2ERegistry::new(); let key = make_key(); - reg.register(key, E2EProfile::Profile4(Profile4Config::new(0, 15))); + reg.register(key, E2EProfile::Profile4(Profile4Config::new(0, 15))) + .expect("register fits within E2E_REGISTRY_CAP"); assert!(reg.contains_key(&key)); reg.unregister(&key); assert!(!reg.contains_key(&key)); @@ -147,4 +205,52 @@ mod tests { let reg = E2ERegistry::default(); assert!(!reg.contains_key(&make_key())); } + + /// Replacing the profile of an already-registered key MUST succeed + /// even when the registry is at capacity — the slot is reused, not + /// added. Regression guard for the FnvIndexMap "full + missing key" + /// branch. + #[test] + fn register_replacement_succeeds_when_full() { + let mut reg = E2ERegistry::new(); + for i in 0..E2E_REGISTRY_CAP { + let key = E2EKey::new(0x1000 + u16::try_from(i).unwrap(), 0); + reg.register(key, E2EProfile::Profile4(Profile4Config::new(0, 15))) + .expect("filling to cap"); + } + // Re-register the first key with a different profile — must succeed. + let key0 = E2EKey::new(0x1000, 0); + let result = reg.register(key0, E2EProfile::Profile4(Profile4Config::new(42, 15))); + assert!( + result.is_ok(), + "replacing an existing entry must succeed even at capacity" + ); + } + + /// Adding a new key beyond the cap MUST return + /// `Err(E2ERegistryFull(E2E_REGISTRY_CAP))` and leave the registry + /// otherwise unchanged. Regression test that locks in the + /// capacity contract documented on `register`. + #[test] + fn register_overflow_returns_err_and_does_not_mutate() { + let mut reg = E2ERegistry::new(); + for i in 0..E2E_REGISTRY_CAP { + reg.register( + E2EKey::new(0x2000 + u16::try_from(i).unwrap(), 0), + E2EProfile::Profile4(Profile4Config::new(0, 15)), + ) + .expect("filling to cap"); + } + // The (cap+1)-th distinct key must be rejected. + let overflow_key = E2EKey::new(0xFFFE, 0); + let err = reg + .register( + overflow_key, + E2EProfile::Profile4(Profile4Config::new(0, 15)), + ) + .expect_err("registering the (cap+1)-th key must overflow"); + assert_eq!(err, E2ERegistryFull(E2E_REGISTRY_CAP)); + // And the rejected key must NOT be present. + assert!(!reg.contains_key(&overflow_key)); + } } diff --git a/src/lib.rs b/src/lib.rs index 39991af..c429cdd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,12 +26,12 @@ //! //! | Feature | Default | Description | //! |---------|---------|-------------| -//! | `std` | yes | Enables std-dependent helpers (`RawPayload`, `VecSdHeader`, `OfferedEndpoint`) | -//! | `client` | no | Trait-surface client; implies `std` + futures (no tokio) | -//! | `client-tokio` | no | Adds the `Client::new` / `TokioSpawner` / `TokioTransport` convenience defaults; implies `client` + tokio + socket2 | -//! | `server` | no | Trait-surface server; implies `std` + futures (no tokio) | -//! | `server-tokio` | no | Adds the `Server::new` / `TokioTransport` / `TokioTimer` convenience defaults; implies `server` + tokio + socket2 | -//! | `bare_metal` | no | Activates embassy-sync, the `static_channels` module (no-alloc `ChannelFactory`), and `AtomicInterfaceHandle`. `StaticE2EHandle` additionally requires `std` because the underlying `E2ERegistry` is currently `std`-only. See `examples/bare_metal_client/` and `examples/bare_metal_server/` for runnable bare-metal integration examples. | +//! | `std` | yes | Enables std-dependent helpers (`RawPayload`, `VecSdHeader`) and the `Arc>` / `Arc>` default lock-handle impls used by the tokio backends. | +//! | `client` | no | Trait-surface client. Pure `no_std`-clean (does not pull `extern crate alloc`). Caller supplies `Spawner` / `Timer` / `ChannelFactory` / `TransportFactory` / `E2ERegistryHandle` / `InterfaceHandle` impls. | +//! | `client-tokio` | no | Adds the `Client::new` / `TokioSpawner` / `TokioTransport` convenience defaults; implies `client` + std + tokio + socket2. | +//! | `server` | no | Trait-surface server. Pulls `extern crate alloc` (for `Arc` / `Arc`); on `no_std`, downstream consumers must provide a `#[global_allocator]`. | +//! | `server-tokio` | no | Adds the `Server::new` / `TokioTransport` / `TokioTimer` convenience defaults; implies `server` + std + tokio + socket2. | +//! | `bare_metal` | no | Activates embassy-sync, the `static_channels` module (no-alloc `ChannelFactory`), `AtomicInterfaceHandle`, `StaticE2EHandle`, and `StaticSubscriptionHandle`. All five are pure `no_std` (no allocator required). See `examples/bare_metal_client/` and `examples/bare_metal_server/` for runnable bare-metal integration examples. | //! | `embassy_channels` | no | Heap-backed `EmbassySyncChannels` `ChannelFactory`. Implies `bare_metal` and pulls `extern crate alloc;` into the crate; **on `no_std`, downstream consumers must provide a `#[global_allocator]`**. Useful for tests / early prototypes before sizing static pools. | //! //! The default feature set is `["std"]`, which links `std` and enables @@ -109,11 +109,22 @@ #[cfg(feature = "std")] extern crate std; -// `embassy_channels` needs `alloc` for `EmbassySyncChannels`'s -// `Arc>` storage (the heap-backed bare-metal channel -// primitive). The `static_channels` module does NOT need alloc — users -// who only enable `bare_metal` (without `embassy_channels`) get no-alloc. -#[cfg(feature = "embassy_channels")] +// `alloc` is required by: +// - `embassy_channels` — `EmbassySyncChannels` heap-allocates an +// `Arc>` per oneshot/bounded/unbounded. +// - `server` — `EventPublisher` and the `Server` struct hold +// `Arc>` / `Arc` for sharing +// between the run loop and external publishing tasks. A +// future refactor may switch to `&'static` borrows so the +// server compiles in pure no_std without an allocator; +// tracked in `bare_metal_plan_v3.md` Phase 21+ backlog. +// +// The `static_channels` module (under `bare_metal` alone) does +// NOT need alloc — users wanting `client` + `bare_metal` without +// allocator get the no-alloc oneshot/mpsc primitives via the +// macro. Pure `bare_metal` without `client` / `server` / +// `embassy_channels` also stays alloc-free. +#[cfg(any(feature = "embassy_channels", feature = "server"))] extern crate alloc; /// Maximum size, in bytes, of UDP payloads for `client` / `server` send @@ -214,5 +225,5 @@ pub use transport::{ MpscSend, OneshotCancelled, OneshotRecv, OneshotSend, ReceivedDatagram, SocketOptions, Spawner, Timer, TransportError, TransportFactory, TransportSocket, UnboundedRecv, UnboundedSend, }; -#[cfg(all(feature = "bare_metal", feature = "std"))] +#[cfg(feature = "bare_metal")] pub use transport::{StaticE2EHandle, StaticE2EStorage}; diff --git a/src/raw_payload.rs b/src/raw_payload.rs index 6533f53..dc7f48d 100644 --- a/src/raw_payload.rs +++ b/src/raw_payload.rs @@ -175,49 +175,49 @@ impl PayloadWireFormat for RawPayload { header.flags = sd::Flags::new(bool::from(reboot), header.flags.unicast()); } - fn offered_endpoints(&self) -> Vec { + fn for_each_offered_endpoint(&self, mut f: F) + where + F: FnMut(crate::OfferedEndpoint), + { let header = match &self.kind { RawPayloadKind::Sd(header) => header, - RawPayloadKind::Raw(_) => return Vec::new(), + RawPayloadKind::Raw(_) => return, }; - header - .entries - .iter() - .filter_map(|entry| match entry { - sd::Entry::OfferService(svc) | sd::Entry::StopOfferService(svc) => { - let is_offer = matches!(entry, sd::Entry::OfferService(_)); - let addr = sd::extract_ipv4_endpoint(&header.options); - Some(crate::OfferedEndpoint { - service_id: svc.service_id, - instance_id: svc.instance_id, - major_version: svc.major_version, - minor_version: svc.minor_version, - addr, - is_offer, - }) - } - _ => None, - }) - .collect() + for entry in &header.entries { + if let sd::Entry::OfferService(svc) | sd::Entry::StopOfferService(svc) = entry { + let is_offer = matches!(entry, sd::Entry::OfferService(_)); + let addr = sd::extract_ipv4_endpoint(&header.options); + f(crate::OfferedEndpoint { + service_id: svc.service_id, + instance_id: svc.instance_id, + major_version: svc.major_version, + minor_version: svc.minor_version, + addr, + is_offer, + }); + } + } } - fn service_instances(&self) -> Vec<(u16, u16)> { + fn for_each_service_instance(&self, mut f: F) + where + F: FnMut(u16, u16), + { let header = match &self.kind { RawPayloadKind::Sd(header) => header, - RawPayloadKind::Raw(_) => return Vec::new(), + RawPayloadKind::Raw(_) => return, }; - header - .entries - .iter() - .map(|entry| match entry { + for entry in &header.entries { + let (svc, inst) = match entry { sd::Entry::FindService(svc) | sd::Entry::OfferService(svc) | sd::Entry::StopOfferService(svc) => (svc.service_id, svc.instance_id), sd::Entry::SubscribeEventGroup(eg) | sd::Entry::SubscribeAckEventGroup(eg) => { (eg.service_id, eg.instance_id) } - }) - .collect() + }; + f(svc, inst); + } } } diff --git a/src/server/error.rs b/src/server/error.rs index 7b6a187..65ec6ec 100644 --- a/src/server/error.rs +++ b/src/server/error.rs @@ -11,6 +11,12 @@ pub enum Error { #[error(transparent)] Protocol(#[from] crate::protocol::Error), /// An I/O error from the underlying network transport. + /// + /// Gated on `feature = "std"` because [`std::io::Error`] is itself + /// std-only. Bare-metal consumers receive transport-layer + /// failures through [`Self::Transport`] instead, which carries a + /// portable [`crate::transport::IoErrorKind`]. + #[cfg(feature = "std")] #[error(transparent)] Io(#[from] std::io::Error), /// A transport-layer error from a [`crate::transport::TransportFactory`] @@ -27,6 +33,19 @@ pub enum Error { /// tags: `"udp_buffer"` (→ `crate::UDP_BUFFER_SIZE`). #[error("internal capacity exceeded: {0}")] Capacity(&'static str), + /// A `Server` API was called in a way that violates its + /// preconditions. The argument is a `&'static str` tag naming the + /// misuse; current tags: + /// - `"passive_server_announcement_loop"` — `announcement_loop` + /// was called on a server constructed via `new_passive`. Passive + /// servers have no real SD socket bound to port 30490, so any + /// announcements would go out with an incorrect source port. + /// Drive announcements from the client side instead. + /// - `"announcement_loop_already_started"` — `announcement_loop` + /// was called twice on the same server. Two announcement + /// futures cannot share the same SD socket and session counter. + #[error("invalid server usage: {0}")] + InvalidUsage(&'static str), } impl From for Error { diff --git a/src/server/event_publisher.rs b/src/server/event_publisher.rs index 3bb850e..fbcb4b3 100644 --- a/src/server/event_publisher.rs +++ b/src/server/event_publisher.rs @@ -7,9 +7,9 @@ use crate::e2e::E2EKey; use crate::protocol::{Header, Message}; use crate::traits::{PayloadWireFormat, WireFormat}; use crate::transport::{E2ERegistryHandle, TransportSocket}; +use alloc::sync::Arc; use core::net::SocketAddrV4; use heapless::Vec as HeaplessVec; -use std::sync::Arc; /// The publish snapshot buffer is sized to `SUBSCRIBERS_PER_GROUP` so /// `for_each_subscriber` can never overflow it. If a future refactor @@ -394,7 +394,7 @@ where service_id: u16, instance_id: u16, event_group_id: u16, - subscriber_addr: std::net::SocketAddrV4, + subscriber_addr: core::net::SocketAddrV4, ) -> Result<(), crate::server::SubscribeError> { self.subscriptions .subscribe(service_id, instance_id, event_group_id, subscriber_addr) @@ -416,7 +416,7 @@ where service_id: u16, instance_id: u16, event_group_id: u16, - subscriber_addr: std::net::SocketAddrV4, + subscriber_addr: core::net::SocketAddrV4, ) { self.subscriptions .unsubscribe(service_id, instance_id, event_group_id, subscriber_addr) @@ -514,7 +514,7 @@ mod tests { // Create a receiver socket to act as subscriber let receiver = UdpSocket::bind("127.0.0.1:0").await.unwrap(); - let std::net::SocketAddr::V4(recv_addr) = receiver.local_addr().unwrap() else { + let core::net::SocketAddr::V4(recv_addr) = receiver.local_addr().unwrap() else { panic!("expected v4 source address"); }; @@ -772,7 +772,8 @@ mod tests { let message_id = MessageId::new_from_service_and_method(0x5B, 0x8001); let key = E2EKey::from_message_id(message_id); let mut reg = E2ERegistry::new(); - reg.register(key, E2EProfile::Profile4(Profile4Config::new(0, 15))); + reg.register(key, E2EProfile::Profile4(Profile4Config::new(0, 15))) + .expect("E2E registry has capacity for one entry"); let e2e_registry = Arc::new(Mutex::new(reg)); // Pre-register a subscriber so we don't short-circuit on the @@ -825,7 +826,7 @@ mod tests { let subscriptions = Arc::new(RwLock::new(SubscriptionManager::new())); let receiver = UdpSocket::bind("127.0.0.1:0").await.unwrap(); - let std::net::SocketAddr::V4(recv_addr) = receiver.local_addr().unwrap() else { + let core::net::SocketAddr::V4(recv_addr) = receiver.local_addr().unwrap() else { panic!("expected v4 source address"); }; diff --git a/src/server/mod.rs b/src/server/mod.rs index 04b2d84..40dadd4 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -14,7 +14,11 @@ mod subscription_manager; pub use error::Error; pub use event_publisher::EventPublisher; -pub use service_info::{EventGroupInfo, ServiceInfo, Subscriber}; +pub use service_info::Subscriber; +#[cfg(feature = "std")] +pub use service_info::{EventGroupInfo, ServiceInfo}; +#[cfg(feature = "bare_metal")] +pub use subscription_manager::{StaticSubscriptionHandle, StaticSubscriptionStorage}; pub use subscription_manager::{SubscribeError, SubscriptionHandle, SubscriptionManager}; use sd_state::SdStateManager; @@ -25,15 +29,11 @@ use crate::Timer; use crate::e2e::{E2EKey, E2EProfile}; use crate::protocol::sd::{self, Entry, Flags, OptionsCount, ServiceEntry, TransportProtocol}; use crate::transport::{E2ERegistryHandle, SocketOptions, TransportFactory, TransportSocket}; -use futures::{FutureExt, pin_mut, select}; +use alloc::sync::Arc; +use core::net::{Ipv4Addr, SocketAddrV4}; +use futures_util::{FutureExt, pin_mut, select_biased}; #[cfg(test)] use std::vec::Vec; -use std::{ - format, - net::{Ipv4Addr, SocketAddrV4}, - sync::Arc, - vec, -}; #[cfg(feature = "server-tokio")] use crate::e2e::E2ERegistry; @@ -476,30 +476,26 @@ where &self, ) -> Result + Send + 'static, Error> { if self.is_passive { - return Err(Error::Io(std::io::Error::new( - std::io::ErrorKind::InvalidInput, - format!( - "announcement_loop called on passive Server for service 0x{:04X}; \ - announcements must be driven externally (e.g. via \ - `simple_someip::Client::sd_announcements_loop`)", - self.config.service_id - ), - ))); + tracing::warn!( + "announcement_loop called on passive Server for service 0x{:04X}; \ + announcements must be driven externally (e.g. via \ + `simple_someip::Client::sd_announcements_loop`)", + self.config.service_id + ); + return Err(Error::InvalidUsage("passive_server_announcement_loop")); } if self .announcement_loop_started .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) .is_err() { - return Err(Error::Io(std::io::Error::new( - std::io::ErrorKind::InvalidInput, - format!( - "announcement_loop already started for service 0x{:04X}; \ - two announcement futures cannot share the same SD socket \ - and session counter", - self.config.service_id - ), - ))); + tracing::warn!( + "announcement_loop already started for service 0x{:04X}; \ + two announcement futures cannot share the same SD socket \ + and session counter", + self.config.service_id + ); + return Err(Error::InvalidUsage("announcement_loop_already_started")); } let config = self.config.clone(); let sd_socket = Arc::clone(&self.sd_socket); @@ -540,7 +536,7 @@ where } /// Send a unicast `OfferService` to a specific address (in response to `FindService`) - async fn send_unicast_offer(&self, target: std::net::SocketAddr) -> Result<(), Error> { + async fn send_unicast_offer(&self, target: core::net::SocketAddr) -> Result<(), Error> { use crate::protocol::Header as SomeIpHeader; use crate::traits::WireFormat; @@ -599,9 +595,9 @@ where /// # Errors /// /// Returns an error if the socket's local address cannot be retrieved. - pub fn unicast_local_addr(&self) -> Result { + pub fn unicast_local_addr(&self) -> Result { match self.unicast_socket.local_addr() { - Ok(v4) => Ok(std::net::SocketAddr::V4(v4)), + Ok(v4) => Ok(core::net::SocketAddr::V4(v4)), Err(e) => Err(Error::Transport(e)), } } @@ -615,8 +611,18 @@ where /// /// Once registered, outgoing events published via [`EventPublisher::publish_event`] /// will have E2E protection applied automatically. - pub fn register_e2e(&self, key: E2EKey, profile: E2EProfile) { - self.e2e_registry.register(key, profile); + /// + /// # Errors + /// + /// Returns [`crate::e2e::E2ERegistryFull`] when the underlying + /// registry has no room for a new key. Replacing the profile of an + /// already-registered key always succeeds. + pub fn register_e2e( + &self, + key: E2EKey, + profile: E2EProfile, + ) -> Result<(), crate::e2e::E2ERegistryFull> { + self.e2e_registry.register(key, profile) } /// Remove E2E configuration for the given key. @@ -643,16 +649,14 @@ where use crate::protocol::MessageView; if self.is_passive { - return Err(Error::Io(std::io::Error::new( - std::io::ErrorKind::InvalidInput, - format!( - "run called on passive Server for service 0x{:04X}; \ - SD receive must be driven externally (e.g. via the \ - Client's discovery socket, routing Subscribes to \ - `EventPublisher::register_subscriber`)", - self.config.service_id - ), - ))); + tracing::warn!( + "run called on passive Server for service 0x{:04X}; \ + SD receive must be driven externally (e.g. via the \ + Client's discovery socket, routing Subscribes to \ + `EventPublisher::register_subscriber`)", + self.config.service_id + ); + return Err(Error::InvalidUsage("passive_server_run")); } // Incoming-peer buffers sized to the IP datagram limit (64 KiB - 1). @@ -664,8 +668,8 @@ where // 1500 by an undersized buffer. Out-going `EventPublisher` paths // do use the smaller `UDP_BUFFER_SIZE` because we control the // wire size of what we emit; that asymmetry is intentional. - let mut unicast_buf = vec![0u8; 65535]; - let mut sd_buf = vec![0u8; 65535]; + let mut unicast_buf = alloc::vec![0u8; 65535]; + let mut sd_buf = alloc::vec![0u8; 65535]; loop { // `select!` (not `select_biased!`) gives pseudo-random fairness @@ -691,12 +695,12 @@ where let unicast_fut = self.unicast_socket.recv_from(&mut unicast_buf).fuse(); let sd_fut = self.sd_socket.recv_from(&mut sd_buf).fuse(); pin_mut!(unicast_fut, sd_fut); - select! { + select_biased! { result = unicast_fut => { let datagram = result?; ( datagram.bytes_received, - std::net::SocketAddr::V4(datagram.source), + core::net::SocketAddr::V4(datagram.source), "unicast", true, ) @@ -705,7 +709,7 @@ where let datagram = result?; ( datagram.bytes_received, - std::net::SocketAddr::V4(datagram.source), + core::net::SocketAddr::V4(datagram.source), "sd-multicast", false, ) @@ -774,7 +778,7 @@ where async fn handle_sd_message( &mut self, sd_view: &sd::SdHeaderView<'_>, - sender: std::net::SocketAddr, + sender: core::net::SocketAddr, ) -> Result<(), Error> { tracing::trace!("Handling SD message from {}", sender); @@ -976,17 +980,17 @@ where } } -/// Convert a [`std::net::SocketAddr`] into a [`SocketAddrV4`] for the +/// Convert a [`core::net::SocketAddr`] into a [`SocketAddrV4`] for the /// transport layer. SOME/IP-SD is IPv4-only at this layer; if a V6 /// address ever surfaces here it indicates a misconfiguration upstream /// (a V6 socket binding the SD port, or a V6 source address surfaced /// by a transport that should not produce one). Returns /// [`TransportError::Unsupported`](crate::transport::TransportError::Unsupported) /// in that case so the caller can log and drop the message instead of panicking. -fn socket_addr_v4(addr: std::net::SocketAddr) -> Result { +fn socket_addr_v4(addr: core::net::SocketAddr) -> Result { match addr { - std::net::SocketAddr::V4(v4) => Ok(v4), - std::net::SocketAddr::V6(_) => Err(Error::Transport( + core::net::SocketAddr::V4(v4) => Ok(v4), + core::net::SocketAddr::V6(_) => Err(Error::Transport( crate::transport::TransportError::Unsupported, )), } @@ -1072,7 +1076,7 @@ where async fn send_subscribe_ack_from_view( &self, entry_view: &sd::EntryView<'_>, - subscriber: std::net::SocketAddr, + subscriber: core::net::SocketAddr, ) -> Result<(), Error> { use crate::protocol::Header as SomeIpHeader; use crate::traits::WireFormat; @@ -1120,7 +1124,7 @@ where async fn send_subscribe_nack_from_view( &self, entry_view: &sd::EntryView<'_>, - subscriber: std::net::SocketAddr, + subscriber: core::net::SocketAddr, reason: &str, ) -> Result<(), Error> { use crate::protocol::Header as SomeIpHeader; @@ -1177,6 +1181,7 @@ mod tests { use crate::traits::WireFormat; use std::format; use std::net::IpAddr; + use std::vec; use tokio::net::UdpSocket; /// Type alias bringing the tokio-flavor concrete type parameters back @@ -1327,7 +1332,7 @@ mod tests { ); let view = MessageView::parse(&bytes).expect("parse Subscribe"); let sd_view = view.sd_header().expect("Subscribe has SD header"); - let sender = std::net::SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 45000)); + let sender = core::net::SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 45000)); // The H3 fix: handle_sd_message must NOT bubble the ACK send // failure as Err — it logs and continues. @@ -1360,16 +1365,15 @@ mod tests { .expect("first announcement_loop call must succeed"); let second = server.announcement_loop(); match second { - Err(Error::Io(io_err)) => { - assert_eq!(io_err.kind(), std::io::ErrorKind::InvalidInput); - let msg = format!("{io_err}"); - assert!( - msg.contains("already started"), - "expected the diagnostic to say 'already started', got: {msg}" - ); + Err(Error::InvalidUsage(tag)) => { + assert_eq!(tag, "announcement_loop_already_started"); } Ok(_) => panic!("second announcement_loop must error, got Ok"), - Err(other) => panic!("expected Error::Io(InvalidInput), got {other:?}"), + Err(other) => { + panic!( + "expected Error::InvalidUsage(\"announcement_loop_already_started\"), got {other:?}" + ) + } } } @@ -1432,8 +1436,8 @@ mod tests { .await .expect("Failed to create server"); let port = match server.unicast_local_addr().unwrap() { - std::net::SocketAddr::V4(addr) => addr.port(), - std::net::SocketAddr::V6(_) => panic!("expected IPv4 address"), + core::net::SocketAddr::V4(addr) => addr.port(), + core::net::SocketAddr::V6(_) => panic!("expected IPv4 address"), }; // Update config to reflect actual bound port server.set_local_port(port); @@ -1502,7 +1506,7 @@ mod tests { let mut buf = vec![0u8; 65535]; let datagram = server.unicast_socket.recv_from(&mut buf).await.unwrap(); let len = datagram.bytes_received; - let addr = std::net::SocketAddr::V4(datagram.source); + let addr = core::net::SocketAddr::V4(datagram.source); let data = &buf[..len]; let view = MessageView::parse(data).unwrap(); let sd_view = view.sd_header().unwrap(); @@ -1556,7 +1560,7 @@ mod tests { let mut buf = vec![0u8; 65535]; let datagram = server.unicast_socket.recv_from(&mut buf).await.unwrap(); let len = datagram.bytes_received; - let addr = std::net::SocketAddr::V4(datagram.source); + let addr = core::net::SocketAddr::V4(datagram.source); let data = &buf[..len]; let view = MessageView::parse(data).unwrap(); let sd_view = view.sd_header().unwrap(); @@ -1607,7 +1611,7 @@ mod tests { let mut buf = vec![0u8; 65535]; let datagram = server.unicast_socket.recv_from(&mut buf).await.unwrap(); let len = datagram.bytes_received; - let addr = std::net::SocketAddr::V4(datagram.source); + let addr = core::net::SocketAddr::V4(datagram.source); let data = &buf[..len]; let view = MessageView::parse(data).unwrap(); let sd_view = view.sd_header().unwrap(); @@ -1656,7 +1660,7 @@ mod tests { let mut buf = vec![0u8; 65535]; let datagram = server.unicast_socket.recv_from(&mut buf).await.unwrap(); let len = datagram.bytes_received; - let addr = std::net::SocketAddr::V4(datagram.source); + let addr = core::net::SocketAddr::V4(datagram.source); let data = &buf[..len]; let view = MessageView::parse(data).unwrap(); let sd_view = view.sd_header().unwrap(); @@ -1708,7 +1712,7 @@ mod tests { let mut buf = vec![0u8; 65535]; let datagram = server.unicast_socket.recv_from(&mut buf).await.unwrap(); let len = datagram.bytes_received; - let addr = std::net::SocketAddr::V4(datagram.source); + let addr = core::net::SocketAddr::V4(datagram.source); let data = &buf[..len]; let view = MessageView::parse(data).unwrap(); let sd_view = view.sd_header().unwrap(); @@ -1757,7 +1761,7 @@ mod tests { let mut buf = vec![0u8; 65535]; let datagram = server.unicast_socket.recv_from(&mut buf).await.unwrap(); let len = datagram.bytes_received; - let addr = std::net::SocketAddr::V4(datagram.source); + let addr = core::net::SocketAddr::V4(datagram.source); let data = &buf[..len]; let view = MessageView::parse(data).unwrap(); let sd_view = view.sd_header().unwrap(); @@ -1799,7 +1803,7 @@ mod tests { let mut buf = vec![0u8; 65535]; let datagram = server.unicast_socket.recv_from(&mut buf).await.unwrap(); let len = datagram.bytes_received; - let addr = std::net::SocketAddr::V4(datagram.source); + let addr = core::net::SocketAddr::V4(datagram.source); let data = &buf[..len]; let view = MessageView::parse(data).unwrap(); let sd_view = view.sd_header().unwrap(); @@ -1878,8 +1882,8 @@ mod tests { let (mut server, server_port) = create_test_server(0x5B, 1).await; let client_socket = UdpSocket::bind("127.0.0.1:0").await.unwrap(); let client_port = match client_socket.local_addr().unwrap() { - std::net::SocketAddr::V4(a) => a.port(), - std::net::SocketAddr::V6(_) => panic!("expected v4 source address"), + core::net::SocketAddr::V4(a) => a.port(), + core::net::SocketAddr::V6(_) => panic!("expected v4 source address"), }; let subscriptions = Arc::clone(&server.subscriptions); @@ -1947,8 +1951,8 @@ mod tests { let (mut server, server_port) = create_test_server(0x5B, 1).await; let client_socket = UdpSocket::bind("127.0.0.1:0").await.unwrap(); let client_port = match client_socket.local_addr().unwrap() { - std::net::SocketAddr::V4(a) => a.port(), - std::net::SocketAddr::V6(_) => panic!("expected v4 source address"), + core::net::SocketAddr::V4(a) => a.port(), + core::net::SocketAddr::V6(_) => panic!("expected v4 source address"), }; let subscriptions = Arc::clone(&server.subscriptions); @@ -2053,7 +2057,7 @@ mod tests { let mut buf = vec![0u8; 65535]; let datagram = server.unicast_socket.recv_from(&mut buf).await.unwrap(); let len = datagram.bytes_received; - let addr = std::net::SocketAddr::V4(datagram.source); + let addr = core::net::SocketAddr::V4(datagram.source); let data = &buf[..len]; let view = MessageView::parse(data).unwrap(); let sd_view = view.sd_header().unwrap(); @@ -2351,7 +2355,7 @@ mod tests { .expect("timeout receiving combined SD packet") .unwrap(); let len = datagram.bytes_received; - let sender = std::net::SocketAddr::V4(datagram.source); + let sender = core::net::SocketAddr::V4(datagram.source); let view = MessageView::parse(&buf[..len]).unwrap(); let sd_view = view.sd_header().unwrap(); server.handle_sd_message(&sd_view, sender).await.unwrap(); @@ -2401,14 +2405,14 @@ mod tests { let server = make_passive_server(0x005C, 0x0001).await; let local = server.unicast_local_addr().unwrap(); match local { - std::net::SocketAddr::V4(v4) => { + core::net::SocketAddr::V4(v4) => { assert_ne!( v4.port(), 0, "kernel should assign an ephemeral port when local_port=0" ); } - std::net::SocketAddr::V6(_) => panic!("expected IPv4 unicast address"), + core::net::SocketAddr::V6(_) => panic!("expected IPv4 unicast address"), } } @@ -2461,19 +2465,12 @@ mod tests { .err() .expect("announcement_loop on a passive server must fail"); match err { - Error::Io(io_err) => { - assert_eq!(io_err.kind(), std::io::ErrorKind::InvalidInput); - let msg = format!("{io_err}"); - assert!( - msg.contains("passive"), - "error message should mention 'passive': {msg}" - ); - assert!( - msg.contains("0x005C"), - "error message should include the service_id: {msg}" - ); + Error::InvalidUsage(tag) => { + assert_eq!(tag, "passive_server_announcement_loop"); } - other => panic!("expected Error::Io(InvalidInput), got {other:?}"), + other => panic!( + "expected Error::InvalidUsage(\"passive_server_announcement_loop\"), got {other:?}" + ), } } @@ -2485,19 +2482,10 @@ mod tests { .await .expect_err("run on a passive server must fail"); match err { - Error::Io(io_err) => { - assert_eq!(io_err.kind(), std::io::ErrorKind::InvalidInput); - let msg = format!("{io_err}"); - assert!( - msg.contains("passive"), - "error message should mention 'passive': {msg}" - ); - assert!( - msg.contains("0x005C"), - "error message should include the service_id: {msg}" - ); + Error::InvalidUsage(tag) => { + assert_eq!(tag, "passive_server_run"); } - other => panic!("expected Error::Io(InvalidInput), got {other:?}"), + other => panic!("expected Error::InvalidUsage(\"passive_server_run\"), got {other:?}"), } } @@ -2549,7 +2537,7 @@ mod tests { s.set_reuse_address(true).unwrap(); #[cfg(unix)] s.set_reuse_port(true).unwrap(); - s.bind(&std::net::SocketAddr::new(IpAddr::V4(iface), sd::MULTICAST_PORT).into()) + s.bind(&core::net::SocketAddr::new(IpAddr::V4(iface), sd::MULTICAST_PORT).into()) .unwrap(); s.set_nonblocking(true).unwrap(); let std_s: std::net::UdpSocket = s.into(); @@ -2639,8 +2627,8 @@ mod tests { .await .expect("blocker bind should succeed"); let blocker_port = match blocker.local_addr().unwrap() { - std::net::SocketAddr::V4(v4) => v4.port(), - std::net::SocketAddr::V6(_) => panic!("expected IPv4"), + core::net::SocketAddr::V4(v4) => v4.port(), + core::net::SocketAddr::V6(_) => panic!("expected IPv4"), }; let config = ServerConfig::new(Ipv4Addr::LOCALHOST, blocker_port, 0x005C, 0x0001); @@ -2781,7 +2769,7 @@ mod tests { raw_rx.set_reuse_port(true).unwrap(); raw_rx.set_multicast_loop_v4(true).unwrap(); raw_rx - .bind(&std::net::SocketAddr::new(IpAddr::V4(interface), sd::MULTICAST_PORT).into()) + .bind(&core::net::SocketAddr::new(IpAddr::V4(interface), sd::MULTICAST_PORT).into()) .unwrap(); raw_rx.set_nonblocking(true).unwrap(); let rx: UdpSocket = UdpSocket::from_std(raw_rx.into()).unwrap(); diff --git a/src/server/sd_state.rs b/src/server/sd_state.rs index 2deec16..08837ff 100644 --- a/src/server/sd_state.rs +++ b/src/server/sd_state.rs @@ -10,8 +10,8 @@ //! parameter on [`SdStateManager::send_offer_service`] becomes the single //! migration point for the announcement path. +use core::net::SocketAddrV4; use core::sync::atomic::{AtomicU32, Ordering}; -use std::net::SocketAddrV4; use crate::protocol::sd::{ self, Entry, Flags, OptionsCount, RebootFlag, ServiceEntry, TransportProtocol, diff --git a/src/server/service_info.rs b/src/server/service_info.rs index a702278..c910a7b 100644 --- a/src/server/service_info.rs +++ b/src/server/service_info.rs @@ -1,8 +1,16 @@ //! Service and event group information -use std::{net::SocketAddrV4, vec::Vec}; +use core::net::SocketAddrV4; +#[cfg(feature = "std")] +use std::vec::Vec; -/// Information about a SOME/IP service being provided +/// Information about a SOME/IP service being provided. +/// +/// Gated on `feature = "std"` because the `event_groups` field is a +/// heap `Vec`. Bare-metal consumers don't construct this type today; +/// a future port will switch to `heapless::Vec` if a use case +/// emerges. +#[cfg(feature = "std")] #[derive(Debug, Clone)] pub struct ServiceInfo { /// Service ID @@ -17,7 +25,11 @@ pub struct ServiceInfo { pub event_groups: Vec, } -/// Information about an event group +/// Information about an event group. +/// +/// Gated on `feature = "std"` for the same reason as +/// [`ServiceInfo`]. +#[cfg(feature = "std")] #[derive(Debug, Clone)] pub struct EventGroupInfo { /// Event group ID @@ -26,6 +38,7 @@ pub struct EventGroupInfo { pub event_ids: Vec, } +#[cfg(feature = "std")] impl EventGroupInfo { /// Create a new event group #[must_use] diff --git a/src/server/subscription_manager.rs b/src/server/subscription_manager.rs index 57d180c..4822563 100644 --- a/src/server/subscription_manager.rs +++ b/src/server/subscription_manager.rs @@ -2,10 +2,10 @@ use super::service_info::Subscriber; use core::future::Future; +use core::net::SocketAddrV4; use heapless::{Vec as HeaplessVec, index_map::FnvIndexMap}; #[cfg(feature = "server-tokio")] use std::sync::Arc; -use std::{net::SocketAddrV4, vec::Vec}; #[cfg(feature = "server-tokio")] use tokio::sync::RwLock; @@ -72,9 +72,11 @@ pub struct SubscriptionManager { } impl SubscriptionManager { - /// Create a new subscription manager + /// Create a new subscription manager. `const`-constructible so a + /// `static` instance can be declared in firmware boot code (used by + /// `StaticSubscriptionHandle` on bare-metal targets). #[must_use] - pub fn new() -> Self { + pub const fn new() -> Self { Self { subscriptions: FnvIndexMap::new(), } @@ -231,14 +233,25 @@ impl SubscriptionManager { } } - /// Get all subscribers for an event group + /// Get all subscribers for an event group as a heap-allocated `Vec`. + /// + /// Convenience accessor for `std` consumers (testing, ad-hoc tooling). + /// **Production code paths use [`Self::for_each_subscriber`] instead** + /// — that visitor walks the same data structure under the lock without + /// allocating per call, which is required for the bare-metal / + /// no-alloc story. + /// + /// Gated on `feature = "std"` because the return type forces an + /// `alloc` dependency. Without `std`, callers should use + /// [`Self::for_each_subscriber`]. + #[cfg(feature = "std")] #[must_use] pub fn get_subscribers( &self, service_id: u16, instance_id: u16, event_group_id: u16, - ) -> Vec { + ) -> std::vec::Vec { let key = (service_id, instance_id, event_group_id); self.subscriptions .get(&key) @@ -377,10 +390,143 @@ impl SubscriptionHandle for Arc> { } } +/// No-alloc [`SubscriptionHandle`] backed by a `&'static` +/// critical-section mutex around a [`SubscriptionManager`]. +/// +/// The bare-metal counterpart to `Arc>`. +/// All clones are the same thin pointer; the mutex serializes +/// concurrent subscribe/unsubscribe/visit calls. The futures returned +/// by the [`SubscriptionHandle`] methods are `!Send`-friendly because +/// the embassy-sync mutex's lock closure is synchronous — no `.await` +/// inside the critical section. +/// +/// # Example +/// +/// ```ignore +/// use core::cell::RefCell; +/// use embassy_sync::blocking_mutex::Mutex; +/// use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; +/// use simple_someip::server::{StaticSubscriptionHandle, StaticSubscriptionStorage, SubscriptionManager}; +/// +/// // Place the storage in a `static` so the handle can borrow it for +/// // `'static`. `SubscriptionManager::new()` is `const`, so no +/// // `Box::leak` is needed. +/// static SUBS: StaticSubscriptionStorage = +/// Mutex::new(RefCell::new(SubscriptionManager::new())); +/// +/// let handle = StaticSubscriptionHandle::new(&SUBS); +/// ``` +#[cfg(feature = "bare_metal")] +pub mod bare_metal_subscription_impl { + use super::{SubscribeError, Subscriber, SubscriptionHandle, SubscriptionManager}; + use core::cell::RefCell; + use core::future::Future; + use core::net::SocketAddrV4; + use embassy_sync::blocking_mutex::Mutex; + use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; + + /// Convenience type alias for the embassy-sync critical-section + /// mutex backing [`StaticSubscriptionHandle`]. + pub type StaticSubscriptionStorage = + Mutex>; + + /// No-alloc [`SubscriptionHandle`] backed by a `&'static` + /// critical-section mutex. + /// + /// All clones are the same thin pointer. Construct via + /// [`Self::new`] and supply a `&'static StaticSubscriptionStorage`. + /// Because [`SubscriptionManager::new`] is `const`, the storage can + /// live in a plain `static` — no `Box::leak` required. + #[derive(Clone, Copy)] + pub struct StaticSubscriptionHandle(&'static StaticSubscriptionStorage); + + impl StaticSubscriptionHandle { + /// Wraps a static reference to the backing mutex. + #[must_use] + pub const fn new(storage: &'static StaticSubscriptionStorage) -> Self { + Self(storage) + } + } + + impl SubscriptionHandle for StaticSubscriptionHandle { + fn subscribe( + &self, + service_id: u16, + instance_id: u16, + event_group_id: u16, + subscriber_addr: SocketAddrV4, + ) -> impl Future> + '_ { + let storage = self.0; + async move { + storage.lock(|cell| { + cell.borrow_mut().subscribe( + service_id, + instance_id, + event_group_id, + subscriber_addr, + ) + }) + } + } + + fn unsubscribe( + &self, + service_id: u16, + instance_id: u16, + event_group_id: u16, + subscriber_addr: SocketAddrV4, + ) -> impl Future + '_ { + let storage = self.0; + async move { + storage.lock(|cell| { + cell.borrow_mut().unsubscribe( + service_id, + instance_id, + event_group_id, + subscriber_addr, + ); + }); + } + } + + fn for_each_subscriber<'a, F>( + &'a self, + service_id: u16, + instance_id: u16, + event_group_id: u16, + mut f: F, + ) -> impl Future + 'a + where + F: FnMut(&Subscriber) + 'a, + { + let storage = self.0; + async move { + storage.lock(|cell| { + let guard = cell.borrow(); + let key = (service_id, instance_id, event_group_id); + match guard.subscriptions.get(&key) { + Some(list) => { + for sub in list { + f(sub); + } + list.len() + } + None => 0, + } + }) + } + } + } +} + +#[cfg(feature = "bare_metal")] +pub use bare_metal_subscription_impl::{StaticSubscriptionHandle, StaticSubscriptionStorage}; + #[cfg(test)] mod tests { use super::*; use std::net::Ipv4Addr; + use std::vec::Vec; #[test] fn test_subscription_management() { @@ -607,4 +753,73 @@ mod tests { assert_eq!(visited, [a2]); } } + + /// `StaticSubscriptionHandle` must satisfy the full + /// [`SubscriptionHandle`] contract so a bare-metal Server can be + /// constructed with it as the `S: SubscriptionHandle` parameter. + /// Walks subscribe → for_each_subscriber → unsubscribe → + /// for_each_subscriber to lock in each method's wiring. + #[cfg(feature = "bare_metal")] + mod static_handle { + use super::*; + use crate::server::{StaticSubscriptionHandle, StaticSubscriptionStorage}; + use core::cell::RefCell; + use embassy_sync::blocking_mutex::Mutex as BlockingMutex; + use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; + + // Driver for poll-once tests: SubscriptionHandle methods return + // a Future that may complete synchronously when the underlying + // storage is a critical-section mutex (no actual yield point). + // We poll with a noop waker to avoid spinning up a runtime. + fn block_on_sync(fut: F) -> F::Output { + use core::pin::pin; + use core::task::{Context, Poll, Waker}; + let mut fut = pin!(fut); + let waker = Waker::noop(); + let mut cx = Context::from_waker(waker); + match fut.as_mut().poll(&mut cx) { + Poll::Ready(v) => v, + Poll::Pending => panic!( + "StaticSubscriptionHandle methods must complete \ + synchronously (no .await inside the lock); got Pending" + ), + } + } + + #[test] + fn static_subscription_handle_full_contract() { + // Box::leak rather than a #[test]-local `static` so we + // don't need to thread const-init constraints through + // every test. + let storage: &'static StaticSubscriptionStorage = + std::boxed::Box::leak(std::boxed::Box::new(BlockingMutex::< + CriticalSectionRawMutex, + RefCell, + >::new(RefCell::new( + SubscriptionManager::new(), + )))); + let handle = StaticSubscriptionHandle::new(storage); + let a1 = SocketAddrV4::new(Ipv4Addr::new(10, 0, 0, 1), 8001); + let a2 = SocketAddrV4::new(Ipv4Addr::new(10, 0, 0, 2), 8002); + + block_on_sync(handle.subscribe(0x5B, 1, 0x01, a1)).unwrap(); + block_on_sync(handle.subscribe(0x5B, 1, 0x01, a2)).unwrap(); + + let mut visited: std::vec::Vec = std::vec::Vec::new(); + let count = block_on_sync( + handle.for_each_subscriber(0x5B, 1, 0x01, |s| visited.push(s.address)), + ); + assert_eq!(count, 2); + assert!(visited.contains(&a1)); + assert!(visited.contains(&a2)); + + block_on_sync(handle.unsubscribe(0x5B, 1, 0x01, a1)); + visited.clear(); + let count = block_on_sync( + handle.for_each_subscriber(0x5B, 1, 0x01, |s| visited.push(s.address)), + ); + assert_eq!(count, 1); + assert_eq!(visited, [a2]); + } + } } diff --git a/src/tokio_transport.rs b/src/tokio_transport.rs index e720a86..b0c8dd7 100644 --- a/src/tokio_transport.rs +++ b/src/tokio_transport.rs @@ -279,22 +279,36 @@ impl crate::transport::Spawner for TokioSpawner { // their owning `SocketManager` drops its channel ends, at // which point the future completes naturally. // - // Wrap in `catch_unwind` so a panic inside the spawned task is - // logged through the `tracing` pipeline that the rest of the - // crate uses, instead of being swallowed silently to stderr by - // tokio's default panic handler. The caller's - // `Error::SocketClosedUnexpectedly` (surfaced when the - // panicking task drops its channel ends) then has a - // corresponding diagnostic in the operator's logs. - use futures::FutureExt; + // Spawn the future on tokio. If it panics, tokio aborts the + // task and the `JoinHandle::await` resolves to a `JoinError` + // with `is_panic() == true`; we log through the `tracing` + // pipeline so the panic is visible alongside the rest of the + // crate's diagnostics, instead of being swallowed to stderr. + // The caller's `Error::SocketClosedUnexpectedly` (surfaced + // when the panicking task drops its channel ends) then has a + // corresponding log line. Done via a watcher task rather than + // `futures::FutureExt::catch_unwind` so we don't need + // futures-util's std feature on the bare-metal builds (the + // tokio backend pulls std anyway, but the dep wiring is + // simpler this way). + let join = tokio::spawn(future); drop(tokio::spawn(async move { - let result = std::panic::AssertUnwindSafe(future).catch_unwind().await; - if let Err(payload) = result { - let msg = panic_payload_str(&payload); - tracing::error!( - panic_message = msg, - "spawned task panicked; channels will close", - ); + match join.await { + Ok(()) => {} + Err(e) if e.is_panic() => { + let payload = e.into_panic(); + let msg = panic_payload_str(&payload); + tracing::error!( + panic_message = msg, + "spawned task panicked; channels will close", + ); + } + Err(e) => { + tracing::debug!( + join_error = ?e, + "spawned task ended without panic (e.g. cancelled)", + ); + } } })); } diff --git a/src/traits.rs b/src/traits.rs index 6cd8c2f..261a081 100644 --- a/src/traits.rs +++ b/src/traits.rs @@ -1,9 +1,7 @@ -#[cfg(feature = "std")] use crate::protocol::sd; use crate::protocol::{self, MessageId, sd::Flags}; /// Information about a service endpoint extracted from an SD message. -#[cfg(feature = "std")] pub struct OfferedEndpoint { /// The SOME/IP service ID. pub service_id: u16, @@ -14,7 +12,7 @@ pub struct OfferedEndpoint { /// The minor version of the offered service interface. pub minor_version: u32, /// The IPv4 socket address extracted from the SD options, if present. - pub addr: Option, + pub addr: Option, /// `true` for `OfferService`, `false` for `StopOfferService`. pub is_offer: bool, } @@ -87,7 +85,6 @@ pub trait PayloadWireFormat: core::fmt::Debug + Send + Sized + Sync { fn encode(&self, writer: &mut T) -> Result; /// Construct an SD header for subscribing to an event group. - #[cfg(feature = "std")] #[allow(clippy::too_many_arguments)] fn new_subscription_sd_header( service_id: u16, @@ -95,7 +92,7 @@ pub trait PayloadWireFormat: core::fmt::Debug + Send + Sized + Sync { major_version: u8, ttl: u32, event_group_id: u16, - client_ip: std::net::Ipv4Addr, + client_ip: core::net::Ipv4Addr, protocol: sd::TransportProtocol, client_port: u16, reboot_flag: sd::RebootFlag, @@ -103,31 +100,66 @@ pub trait PayloadWireFormat: core::fmt::Debug + Send + Sized + Sync { /// Override the reboot flag on an SD header in-place. /// - /// Used by `Client::sd_announcements_loop` (when the `client` feature is - /// enabled) to refresh the reboot flag per-tick from the client's - /// tracked state. Defaults to a no-op so that `std`-but-not-`client` - /// consumers (e.g. host-side tooling that builds SD headers manually - /// without ever driving an announcement loop) don't have to provide - /// an impl that will never be called. - #[cfg(feature = "std")] + /// Used by `Client::sd_announcements_loop` to refresh the reboot + /// flag per-tick from the client's tracked state. Defaults to a + /// no-op so payload types that never participate in SD reboot + /// tracking (e.g. `RawPayload` for static-only SD use) don't have + /// to provide an impl that will never be called. fn set_reboot_flag(_header: &mut Self::SdHeader, _reboot: sd::RebootFlag) {} - /// Extract offered/stopped service endpoints from this SD payload. + /// Visit each offered / stopped service endpoint in this SD + /// payload with `f`. + /// + /// Visitor pattern (rather than returning a `Vec`) so the trait + /// is `no_std`-compatible: the implementation walks its internal + /// SD entries and invokes `f` for each `OfferedEndpoint`. The + /// `Client` run loop uses this to auto-populate its service + /// registry from inbound discovery messages. + /// + /// The default implementation visits nothing — payload types + /// that don't carry SD entries (e.g. application payloads) leave + /// it unimplemented; SD-bearing types (e.g. `RawPayload`'s + /// `VecSdHeader` payload) override. + fn for_each_offered_endpoint(&self, _f: F) + where + F: FnMut(OfferedEndpoint), + { + } + + /// Visit `(service_id, instance_id)` for every SD entry in this + /// payload, regardless of entry type, with `f`. + /// + /// Used by the `Client` run loop for per-service-instance + /// session/reboot tracking so that all SD traffic (not just + /// offers) contributes to reboot detection. /// - /// Default implementation returns an empty vec. Concrete implementations - /// that have access to SD entries and options should override this. + /// Visitor pattern for the same `no_std` reason as + /// [`Self::for_each_offered_endpoint`]; default visits nothing. + fn for_each_service_instance(&self, _f: F) + where + F: FnMut(u16, u16), + { + } + + /// Convenience accessor returning all offered endpoints as a heap + /// `Vec`. Wraps [`Self::for_each_offered_endpoint`] so std users + /// get the original ergonomic shape; bare-metal users use the + /// visitor directly. Gated on `feature = "std"`. #[cfg(feature = "std")] fn offered_endpoints(&self) -> std::vec::Vec { - std::vec::Vec::new() + let mut out = std::vec::Vec::new(); + self.for_each_offered_endpoint(|ep| out.push(ep)); + out } - /// Return `(service_id, instance_id)` pairs for every SD entry in this - /// payload, regardless of entry type. - /// - /// Used for per-service-instance session/reboot tracking so that all SD - /// traffic (not just offers) contributes to reboot detection. + /// Convenience accessor returning all `(service_id, instance_id)` + /// pairs as a heap `Vec`. Wraps + /// [`Self::for_each_service_instance`] for std users. Gated on + /// `feature = "std"`. #[cfg(feature = "std")] fn service_instances(&self) -> std::vec::Vec<(u16, u16)> { - std::vec::Vec::new() + let mut out = std::vec::Vec::new(); + self.for_each_service_instance(|svc, inst| out.push((svc, inst))); + out } } diff --git a/src/transport.rs b/src/transport.rs index df98ae8..b44bfac 100644 --- a/src/transport.rs +++ b/src/transport.rs @@ -732,7 +732,17 @@ pub trait Spawner { /// event loop. pub trait E2ERegistryHandle: Clone + Send + Sync + 'static { /// Register an E2E profile for the given key, replacing any prior entry. - fn register(&self, key: E2EKey, profile: E2EProfile); + /// + /// # Errors + /// + /// Returns [`E2ERegistryFull`] when the underlying registry has no + /// capacity for a new key. Replacing an already-registered key + /// always succeeds (the existing slot is reused). Implementations + /// that wrap [`crate::e2e::E2ERegistry`] forward this error + /// directly; backends with their own storage should pick an + /// equivalent overflow contract. + fn register(&self, key: E2EKey, profile: E2EProfile) + -> Result<(), crate::e2e::E2ERegistryFull>; /// Remove the E2E configuration for the given key. No-op if absent. fn unregister(&self, key: &E2EKey); @@ -794,15 +804,15 @@ pub trait InterfaceHandle: Clone + Send + Sync + 'static { mod std_handle_impls { use super::{E2ERegistryHandle, InterfaceHandle}; use crate::e2e::Error as E2EError; - use crate::e2e::{E2ECheckStatus, E2EKey, E2EProfile, E2ERegistry}; + use crate::e2e::{E2ECheckStatus, E2EKey, E2EProfile, E2ERegistry, E2ERegistryFull}; use core::net::Ipv4Addr; use std::sync::{Arc, Mutex, RwLock}; impl E2ERegistryHandle for Arc> { - fn register(&self, key: E2EKey, profile: E2EProfile) { + fn register(&self, key: E2EKey, profile: E2EProfile) -> Result<(), E2ERegistryFull> { self.lock() .expect("e2e registry lock poisoned") - .register(key, profile); + .register(key, profile) } fn unregister(&self, key: &E2EKey) { @@ -949,15 +959,17 @@ pub mod bare_metal_handle_impls { } /// `StaticE2EHandle` — no-alloc `E2ERegistryHandle` backed by a -/// `&'static` critical-section mutex. Requires `feature = "std"` because -/// the underlying [`crate::e2e::E2ERegistry`] currently uses `HashMap`. -/// On a pure-`no_std` target the registry must be ported (see crate -/// roadmap); until then, callers wanting bare-metal interface handles -/// (the more common need) can use [`AtomicInterfaceHandle`] alone. -#[cfg(all(feature = "bare_metal", feature = "std"))] +/// `&'static` critical-section mutex. +/// +/// Available in pure `no_std` builds: [`crate::e2e::E2ERegistry`] is +/// backed by [`heapless::index_map::FnvIndexMap`] (since phase 18a), +/// so no allocator is required. +#[cfg(feature = "bare_metal")] pub mod bare_metal_e2e_impl { use super::E2ERegistryHandle; - use crate::e2e::{E2ECheckStatus, E2EKey, E2EProfile, E2ERegistry, Error as E2EError}; + use crate::e2e::{ + E2ECheckStatus, E2EKey, E2EProfile, E2ERegistry, E2ERegistryFull, Error as E2EError, + }; use core::cell::RefCell; use embassy_sync::blocking_mutex::Mutex; use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; @@ -983,8 +995,8 @@ pub mod bare_metal_e2e_impl { } impl E2ERegistryHandle for StaticE2EHandle { - fn register(&self, key: E2EKey, profile: E2EProfile) { - self.0.lock(|cell| cell.borrow_mut().register(key, profile)); + fn register(&self, key: E2EKey, profile: E2EProfile) -> Result<(), E2ERegistryFull> { + self.0.lock(|cell| cell.borrow_mut().register(key, profile)) } fn unregister(&self, key: &E2EKey) { @@ -1023,7 +1035,7 @@ pub mod bare_metal_e2e_impl { #[cfg(feature = "bare_metal")] pub use bare_metal_handle_impls::AtomicInterfaceHandle; -#[cfg(all(feature = "bare_metal", feature = "std"))] +#[cfg(feature = "bare_metal")] pub use bare_metal_e2e_impl::{StaticE2EHandle, StaticE2EStorage}; // ── Channel-handle abstraction ──────────────────────────────────────────── @@ -1422,7 +1434,13 @@ mod tests { struct NullE2ERegistry; impl E2ERegistryHandle for NullE2ERegistry { - fn register(&self, _key: E2EKey, _profile: E2EProfile) {} + fn register( + &self, + _key: E2EKey, + _profile: E2EProfile, + ) -> Result<(), crate::e2e::E2ERegistryFull> { + Ok(()) + } fn unregister(&self, _key: &E2EKey) {} fn contains_key(&self, _key: &E2EKey) -> bool { false @@ -1463,7 +1481,8 @@ mod tests { r.register( key, crate::e2e::E2EProfile::Profile4(crate::e2e::Profile4Config::new(0, 8)), - ); + ) + .expect("NullE2ERegistry::register is infallible"); assert!(!r.contains_key(&key)); assert!(r.check(key, b"hello", [0; 8]).is_none()); } diff --git a/tests/client_server.rs b/tests/client_server.rs index 459f6bb..a93e676 100644 --- a/tests/client_server.rs +++ b/tests/client_server.rs @@ -421,7 +421,9 @@ async fn test_e2e_protect_on_publish_and_check_on_receive() { method_or_event_id: 0x0001, }; let profile = E2EProfile::Profile4(Profile4Config::new(0x12345678, 15)); - server.register_e2e(key, profile.clone()); + server + .register_e2e(key, profile.clone()) + .expect("E2E registry has capacity for one entry"); let server_handle = tokio::spawn(async move { server.run().await }); @@ -429,7 +431,9 @@ async fn test_e2e_protect_on_publish_and_check_on_receive() { let _run_handle = tokio::spawn(run_fut); // Register matching E2E profile on client - client.register_e2e(key, profile); + client + .register_e2e(key, profile) + .expect("E2E registry has capacity for one entry"); let server_addr = SocketAddrV4::new(Ipv4Addr::LOCALHOST, server_port); client diff --git a/tests/no_alloc_witness.rs b/tests/no_alloc_witness.rs index db4c1f2..0466ffd 100644 --- a/tests/no_alloc_witness.rs +++ b/tests/no_alloc_witness.rs @@ -179,11 +179,15 @@ fn witness_static_e2e_handle_reads() { >::new(RefCell::new(E2ERegistry::new())))); let handle = StaticE2EHandle::new(storage); - // register() allocates into the HashMap — also construction-time. - handle.register( - E2EKey::new(0x1234, 0x0001), - E2EProfile::Profile4(Profile4Config::new(0xDEAD_BEEF, 15)), - ); + // register() writes into the heapless FnvIndexMap — fits within the + // E2E_REGISTRY_CAP, so no allocation. Done at construction-time + // (outside the assert_no_alloc closures below). + handle + .register( + E2EKey::new(0x1234, 0x0001), + E2EProfile::Profile4(Profile4Config::new(0xDEAD_BEEF, 15)), + ) + .expect("register fits within E2E_REGISTRY_CAP"); // Hot-path reads must be alloc-free. assert_no_alloc("StaticE2EHandle::contains_key (hit)", || { @@ -211,19 +215,23 @@ fn witness_static_e2e_handle_protect_check() { >::new(RefCell::new(E2ERegistry::new())))); let handle = StaticE2EHandle::new(storage); - handle.register( - E2EKey::new(0x0001, 0x8001), - E2EProfile::Profile4(Profile4Config::new(0x1234_5678, 15)), - ); + handle + .register( + E2EKey::new(0x0001, 0x8001), + E2EProfile::Profile4(Profile4Config::new(0x1234_5678, 15)), + ) + .expect("register fits within E2E_REGISTRY_CAP"); // Register a second profile (Profile5) so the protect/check witness // covers both profile families' hot paths, not just Profile4. - handle.register( - E2EKey::new(0x0002, 0x8002), - // data_length must equal payload length (5 = b"hello".len()) - // — a mismatch routes through `tracing::warn!`, which is fine in - // production but adds noise to a no-alloc witness. - E2EProfile::Profile5(Profile5Config::new(0xABCD, 5, 15)), - ); + handle + .register( + E2EKey::new(0x0002, 0x8002), + // data_length must equal payload length (5 = b"hello".len()) + // — a mismatch routes through `tracing::warn!`, which is fine in + // production but adds noise to a no-alloc witness. + E2EProfile::Profile5(Profile5Config::new(0xABCD, 5, 15)), + ) + .expect("register fits within E2E_REGISTRY_CAP"); let key = E2EKey::new(0x0001, 0x8001); let payload = b"hello";