Skip to content

feat(xmtp_mls): BidiSubscription actor (mutate-in-place, auto-pong liveness)#3772

Draft
tylerhawkes wants to merge 1 commit into
tyler/xip83-3-bidi-apifrom
tyler/xip83-4-bidi-actor
Draft

feat(xmtp_mls): BidiSubscription actor (mutate-in-place, auto-pong liveness)#3772
tylerhawkes wants to merge 1 commit into
tyler/xip83-3-bidi-apifrom
tyler/xip83-4-bidi-actor

Conversation

@tylerhawkes

@tylerhawkes tylerhawkes commented Jun 12, 2026

Copy link
Copy Markdown
Contributor

Stack 4/4 of the XIP-83 bidi client lane (prev: #3769#3770#3771).

BidiSubscription in xmtp_mls::subscriptions::bidi (native-only): one spawned task owns the inbound wire stream — the client mirror of the server's single-writer — so wire order is preserved verbatim in event order.

  • mutate() adds/removes subscriptions in place (no reconnect on membership change)
  • auto-answers server Pings; ping() probes a resumed link
  • BidiEvent: Started{keepalive, capabilities} / CatchUpComplete / TopicsLive / GroupMessages / WelcomeMessages / Pong
  • next() == None = stream dead → caller re-opens from durable cursors (reconnect policy deliberately lives one layer up)
  • Drop aborts the actor (no zombie keepalive); unknown versions/frames skipped for forward-compat

6 unit tests against a channel-backed mock peer: initial-mutate + Started, auto-pong, wire-order preservation (history → markers → CATCHUP_COMPLETE → live), mutate + ping round-trip, unknown-frame tolerance, stream-end semantics. Integration into the stream_all/stream_conversations machinery is the next lane (per-client router + fallback-to-legacy), not this PR.

🤖 Generated with Claude Code

Note

Add BidiSubscription actor for bidirectional MLS stream subscriptions with auto-pong liveness

  • Adds a new native-only bidi module implementing XIP-83 bidirectional subscription semantics over a single long-lived stream.
  • BidiSubscription::open spawns a single-writer actor that handles inbound/outbound frames, sends an initial Mutate on startup, and exposes ordered events via a BidiEvent enum.
  • The actor automatically responds to server Ping frames with Pong, and callers can send client-initiated pings via BidiSubscription::ping and correlate responses by nonce.
  • BidiSubscription::mutate updates the active topic set in-place without reconnecting; dropping the subscription stops the actor and cancels the server-side stream.
  • The module is conditionally compiled via xmtp_common::if_native! in subscriptions/mod.rs, leaving non-native builds unchanged.
📊 Macroscope summarized e06c6e2. 2 files reviewed, 0 issues evaluated, 0 issues filtered, 0 comments posted

🗂️ Filtered Issues

No issues evaluated.

@claude

claude Bot commented Jun 12, 2026

Copy link
Copy Markdown

Claude finished @tylerhawkes's task —— View job


Code Review in Progress

Todo List:

  • Analyze code for bugs and edge cases not already commented on
  • Review performance considerations
  • Check security concerns
  • Evaluate test coverage and test quality
  • Review API design and usability
  • Post comprehensive review feedback

Code Review - June 16, 2026, 21:06 UTC

I've reviewed the BidiSubscription implementation. The code is generally well-structured with good test coverage. Here are the most important findings (excluding issues already raised by macroscopeapp):

Critical Issues

1. Race Condition in Actor Shutdown (bidi.rs:147-154)
The Drop implementation calls self.actor.end() to prevent zombie keepalives, but there's a potential race: if the actor is currently executing outbound.send(pong).await (line 196) when Drop runs, the send could complete after the subscription is logically "closed" from the caller's perspective. This is benign for liveness but violates the stated invariant that dropping stops all activity.

2. Silent Failure on Channel Full (bidi.rs:196-198)
When auto-ponging a server Ping, if outbound.send(pong).await fails (channel closed), the actor breaks the loop—correct. But if the channel is somehow full (though capacity is 64), the .await blocks indefinitely, delaying all subsequent frame processing including user data. Consider using try_send with a fallback or document that pongs must never queue behind much (already mentioned in comments, but the code doesn't enforce it).

High-Priority Issues

3. Missing Backpressure Documentation (bidi.rs:35-39)
The EVENT_BUFFER comment states "consumer too slow" semantics are intended, but the code doesn't explicitly document what happens to the actor when the channel fills. The actor's emit() at line 256 will block on events.send(event).await, which is correct but should be explicitly documented as the backpressure mechanism.

4. Test Gap: Concurrent Operations (bidi.rs:259-572)
Tests verify sequential operations well, but there's no test for concurrent mutate()/ping() calls. Since both use the same outbound channel, are multiple concurrent mutate() calls from different tasks safe? The FIFO guarantee holds (tokio mpsc is multi-producer), but this should be tested or documented.

5. Error Type Lacks Context (bidi.rs:67-72)
BidiSubscriptionError::Closed is the only error variant. When a send fails, callers can't distinguish between "actor panicked," "network died," or "stream was dropped." For debugging/observability, consider adding context (e.g., "actor_dead," "send_failed").

Medium-Priority Issues

6. Ping Nonce Overflow (bidi.rs:124)
fetch_add(1, Ordering::Relaxed) + 1 will overflow at u64::MAX. While unlikely in practice (requires 2^64 pings), the code doesn't handle wraparound. If nonces must be unique, use Wrapping or document that overflow is acceptable (pong correlation would break for one ping).

7. No Timeout on next() (bidi.rs:132-134)
If the server stops sending frames but doesn't close the stream, next().await hangs indefinitely. The comment at line 131 says "None means the stream ended," but doesn't address liveness detection. Callers must use ping() + timeout separately—this pattern should be documented in the method's doc comment.

Positive Observations

  • The single-writer actor model is clean and ensures wire-order preservation
  • Tests are thorough for the covered scenarios (6 tests with good edge case coverage)
  • Forward-compatibility (skipping unknown frames/versions) is well-handled
  • The Drop implementation preventing zombie streams is a nice touch

Test Coverage

Codecov reports 16 lines uncovered. Reviewing the code:

  • Lines 178-181: Error path when inbound.next() returns Err—tested implicitly if the mock ever errored, but not explicitly
  • Lines 196-198, 201-203, 210-212, etc.: Early break paths when emit() or send() fail—covered by the "stream end" test but could be more explicit

Consider adding:

  • Test for actor behavior when events channel consumer drops (emit returns error)
  • Test for inbound stream error (not just clean close)
  • Test for concurrent mutate() + ping() operations

Overall Assessment: Solid implementation with good design. The two critical issues (Drop race, pong blocking) are edge cases but should be addressed or documented. The macroscopeapp comments about frame ordering and error swallowing are the highest priority.


@codecov

codecov Bot commented Jun 12, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 95.34884% with 16 lines in your changes missing coverage. Please review.
⚠️ Please upload report for BASE (tyler/xip83-3-bidi-api@0f1cead). Learn more about missing BASE report.

Files with missing lines Patch % Lines
crates/xmtp_mls/src/subscriptions/bidi.rs 95.34% 16 Missing ⚠️
Additional details and impacted files
@@                    Coverage Diff                    @@
##             tyler/xip83-3-bidi-api    #3772   +/-   ##
=========================================================
  Coverage                          ?   84.49%           
=========================================================
  Files                             ?      410           
  Lines                             ?    60252           
  Branches                          ?        0           
=========================================================
  Hits                              ?    50911           
  Misses                            ?     9341           
  Partials                          ?        0           

☔ View full report in Codecov by Harness.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Comment on lines +96 to +113
let inbound = api
.subscribe_bidi(Box::pin(ReceiverStream::new(outbound_rx)))
.await?;

let actor = xmtp_common::spawn(
None,
run_actor(Box::pin(inbound), outbound.clone(), event_tx),
);

let this = Self {
outbound,
events,
ping_nonce: AtomicU64::new(0),
actor: actor.abort_handle(),
};
// Buffer capacity is fresh, so this cannot block meaningfully.
this.mutate(initial).await.map_err(|_| ()).ok();
Ok(this)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Medium subscriptions/bidi.rs:96

open spawns run_actor before sending the initial Mutate, so if the server sends a Ping immediately, the actor's auto-generated Pong can be enqueued before the Mutate. This violates open's contract that the first client frame is the initial Mutate, causing the server to observe protocol-invalid frame order. Consider sending the Mutate synchronously before spawning the actor, or ensuring no other frames can be sent until open completes.

-        let inbound = api
-            .subscribe_bidi(Box::pin(ReceiverStream::new(outbound_rx)))
-            .await?;
-
-        let actor = xmtp_common::spawn(
-            None,
-            run_actor(Box::pin(inbound), outbound.clone(), event_tx),
-        );
-
-        let this = Self {
-            outbound,
-            events,
-            ping_nonce: AtomicU64::new(0),
-            actor: actor.abort_handle(),
-        };
-        // Buffer capacity is fresh, so this cannot block meaningfully.
-        this.mutate(initial).await.map_err(|_| ()).ok();
-        Ok(this)
🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file @crates/xmtp_mls/src/subscriptions/bidi.rs around lines 96-113:

`open` spawns `run_actor` before sending the initial `Mutate`, so if the server sends a `Ping` immediately, the actor's auto-generated `Pong` can be enqueued before the `Mutate`. This violates `open`'s contract that the first client frame is the initial `Mutate`, causing the server to observe protocol-invalid frame order. Consider sending the `Mutate` synchronously before spawning the actor, or ensuring no other frames can be sent until `open` completes.

@tylerhawkes tylerhawkes force-pushed the tyler/xip83-3-bidi-api branch from 22b2c8b to a9ffc28 Compare June 12, 2026 21:33
@tylerhawkes tylerhawkes force-pushed the tyler/xip83-4-bidi-actor branch from e06c6e2 to a9b8444 Compare June 12, 2026 21:33
Comment on lines +109 to +111
// Buffer capacity is fresh, so this cannot block meaningfully.
this.mutate(initial).await.map_err(|_| ()).ok();
Ok(this)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟠 High subscriptions/bidi.rs:109

When the server immediately closes the stream and the actor exits before the initial mutate completes, this.mutate(initial) returns Err(BidiSubscriptionError::Closed). The .map_err(|_| ()).ok() at line 110 discards this error, causing open to return Ok(this) despite the subscription failing to initialize. Callers believe the subscription is valid but the required initial topic set was never sent.

-        // Buffer capacity is fresh, so this cannot block meaningfully.
-        this.mutate(initial).await.map_err(|_| ()).ok();
-        Ok(this)
+        // Buffer capacity is fresh, so this cannot block meaningfully.
+        this.mutate(initial).await?;
+        Ok(this)
🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file @crates/xmtp_mls/src/subscriptions/bidi.rs around lines 109-111:

When the server immediately closes the stream and the actor exits before the initial `mutate` completes, `this.mutate(initial)` returns `Err(BidiSubscriptionError::Closed)`. The `.map_err(|_| ()).ok()` at line 110 discards this error, causing `open` to return `Ok(this)` despite the subscription failing to initialize. Callers believe the subscription is valid but the required initial topic set was never sent.

@tylerhawkes tylerhawkes force-pushed the tyler/xip83-4-bidi-actor branch from a9b8444 to 1993216 Compare June 12, 2026 21:59
@tylerhawkes tylerhawkes force-pushed the tyler/xip83-3-bidi-api branch from a9ffc28 to 249cf99 Compare June 12, 2026 21:59
@tylerhawkes tylerhawkes force-pushed the tyler/xip83-4-bidi-actor branch from 1993216 to 294f32b Compare June 15, 2026 20:20
@tylerhawkes tylerhawkes force-pushed the tyler/xip83-3-bidi-api branch from 249cf99 to 13bce00 Compare June 15, 2026 20:20
tylerhawkes added a commit that referenced this pull request Jun 16, 2026
…#3769)

**Stack 1/4** of the XIP-83 bidi client lane: #3769#3770#3771#3772.

Regenerated `xmtp.mls.api.v1` from xmtp/proto#337: the bidirectional
`Subscribe` RPC with versioned `SubscribeRequest`/`SubscribeResponse`,
id-based `Mutate` (cursors, `history_only`), `Ping`/`Pong`,
`TopicsLive`, `CATCHUP_COMPLETE`, and STARTED `capabilities`. Purely
additive (+1,896 generated lines); `proto_version` pinned to that
branch's sha — draft until the proto PR merges.
…veness, TopicsLive/CATCHUP_COMPLETE events)

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
@tylerhawkes tylerhawkes force-pushed the tyler/xip83-4-bidi-actor branch from 294f32b to 630c62a Compare June 16, 2026 21:04
@tylerhawkes tylerhawkes force-pushed the tyler/xip83-3-bidi-api branch from 13bce00 to 0f1cead Compare June 16, 2026 21:04
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant