feat(xmtp_mls): BidiSubscription actor (mutate-in-place, auto-pong liveness)#3772
feat(xmtp_mls): BidiSubscription actor (mutate-in-place, auto-pong liveness)#3772tylerhawkes wants to merge 1 commit into
Conversation
|
Claude finished @tylerhawkes's task —— View job Code Review in Progress
|
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## tyler/xip83-3-bidi-api #3772 +/- ##
=========================================================
Coverage ? 84.49%
=========================================================
Files ? 410
Lines ? 60252
Branches ? 0
=========================================================
Hits ? 50911
Misses ? 9341
Partials ? 0 ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
| let inbound = api | ||
| .subscribe_bidi(Box::pin(ReceiverStream::new(outbound_rx))) | ||
| .await?; | ||
|
|
||
| let actor = xmtp_common::spawn( | ||
| None, | ||
| run_actor(Box::pin(inbound), outbound.clone(), event_tx), | ||
| ); | ||
|
|
||
| let this = Self { | ||
| outbound, | ||
| events, | ||
| ping_nonce: AtomicU64::new(0), | ||
| actor: actor.abort_handle(), | ||
| }; | ||
| // Buffer capacity is fresh, so this cannot block meaningfully. | ||
| this.mutate(initial).await.map_err(|_| ()).ok(); | ||
| Ok(this) |
There was a problem hiding this comment.
🟡 Medium subscriptions/bidi.rs:96
open spawns run_actor before sending the initial Mutate, so if the server sends a Ping immediately, the actor's auto-generated Pong can be enqueued before the Mutate. This violates open's contract that the first client frame is the initial Mutate, causing the server to observe protocol-invalid frame order. Consider sending the Mutate synchronously before spawning the actor, or ensuring no other frames can be sent until open completes.
- let inbound = api
- .subscribe_bidi(Box::pin(ReceiverStream::new(outbound_rx)))
- .await?;
-
- let actor = xmtp_common::spawn(
- None,
- run_actor(Box::pin(inbound), outbound.clone(), event_tx),
- );
-
- let this = Self {
- outbound,
- events,
- ping_nonce: AtomicU64::new(0),
- actor: actor.abort_handle(),
- };
- // Buffer capacity is fresh, so this cannot block meaningfully.
- this.mutate(initial).await.map_err(|_| ()).ok();
- Ok(this)🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file @crates/xmtp_mls/src/subscriptions/bidi.rs around lines 96-113:
`open` spawns `run_actor` before sending the initial `Mutate`, so if the server sends a `Ping` immediately, the actor's auto-generated `Pong` can be enqueued before the `Mutate`. This violates `open`'s contract that the first client frame is the initial `Mutate`, causing the server to observe protocol-invalid frame order. Consider sending the `Mutate` synchronously before spawning the actor, or ensuring no other frames can be sent until `open` completes.
22b2c8b to
a9ffc28
Compare
e06c6e2 to
a9b8444
Compare
| // Buffer capacity is fresh, so this cannot block meaningfully. | ||
| this.mutate(initial).await.map_err(|_| ()).ok(); | ||
| Ok(this) |
There was a problem hiding this comment.
🟠 High subscriptions/bidi.rs:109
When the server immediately closes the stream and the actor exits before the initial mutate completes, this.mutate(initial) returns Err(BidiSubscriptionError::Closed). The .map_err(|_| ()).ok() at line 110 discards this error, causing open to return Ok(this) despite the subscription failing to initialize. Callers believe the subscription is valid but the required initial topic set was never sent.
- // Buffer capacity is fresh, so this cannot block meaningfully.
- this.mutate(initial).await.map_err(|_| ()).ok();
- Ok(this)
+ // Buffer capacity is fresh, so this cannot block meaningfully.
+ this.mutate(initial).await?;
+ Ok(this)🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file @crates/xmtp_mls/src/subscriptions/bidi.rs around lines 109-111:
When the server immediately closes the stream and the actor exits before the initial `mutate` completes, `this.mutate(initial)` returns `Err(BidiSubscriptionError::Closed)`. The `.map_err(|_| ()).ok()` at line 110 discards this error, causing `open` to return `Ok(this)` despite the subscription failing to initialize. Callers believe the subscription is valid but the required initial topic set was never sent.
a9b8444 to
1993216
Compare
a9ffc28 to
249cf99
Compare
1993216 to
294f32b
Compare
249cf99 to
13bce00
Compare
…#3769) **Stack 1/4** of the XIP-83 bidi client lane: #3769 → #3770 → #3771 → #3772. Regenerated `xmtp.mls.api.v1` from xmtp/proto#337: the bidirectional `Subscribe` RPC with versioned `SubscribeRequest`/`SubscribeResponse`, id-based `Mutate` (cursors, `history_only`), `Ping`/`Pong`, `TopicsLive`, `CATCHUP_COMPLETE`, and STARTED `capabilities`. Purely additive (+1,896 generated lines); `proto_version` pinned to that branch's sha — draft until the proto PR merges.
…veness, TopicsLive/CATCHUP_COMPLETE events) Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
294f32b to
630c62a
Compare
13bce00 to
0f1cead
Compare

Stack 4/4 of the XIP-83 bidi client lane (prev: #3769 → #3770 → #3771).
BidiSubscriptioninxmtp_mls::subscriptions::bidi(native-only): one spawned task owns the inbound wire stream — the client mirror of the server's single-writer — so wire order is preserved verbatim in event order.mutate()adds/removes subscriptions in place (no reconnect on membership change)Pings;ping()probes a resumed linkBidiEvent:Started{keepalive, capabilities}/CatchUpComplete/TopicsLive/GroupMessages/WelcomeMessages/Pongnext() == None= stream dead → caller re-opens from durable cursors (reconnect policy deliberately lives one layer up)Dropaborts the actor (no zombie keepalive); unknown versions/frames skipped for forward-compat6 unit tests against a channel-backed mock peer: initial-mutate + Started, auto-pong, wire-order preservation (history → markers → CATCHUP_COMPLETE → live), mutate + ping round-trip, unknown-frame tolerance, stream-end semantics. Integration into the stream_all/stream_conversations machinery is the next lane (per-client router + fallback-to-legacy), not this PR.
🤖 Generated with Claude Code
Note
Add
BidiSubscriptionactor for bidirectional MLS stream subscriptions with auto-pong livenessbidimodule implementing XIP-83 bidirectional subscription semantics over a single long-lived stream.BidiSubscription::openspawns a single-writer actor that handles inbound/outbound frames, sends an initialMutateon startup, and exposes ordered events via aBidiEventenum.Pingframes withPong, and callers can send client-initiated pings viaBidiSubscription::pingand correlate responses by nonce.BidiSubscription::mutateupdates the active topic set in-place without reconnecting; dropping the subscription stops the actor and cancels the server-side stream.xmtp_common::if_native!insubscriptions/mod.rs, leaving non-native builds unchanged.📊 Macroscope summarized e06c6e2. 2 files reviewed, 0 issues evaluated, 0 issues filtered, 0 comments posted
🗂️ Filtered Issues
No issues evaluated.