Skip to content

feat: XIP-83 bidirectional Subscribe handler (mutable subscriptions + ping/pong liveness)#562

Merged
tylerhawkes merged 1 commit into
mainfrom
tyler/xip-83-bidi-subscribe
Jun 16, 2026
Merged

feat: XIP-83 bidirectional Subscribe handler (mutable subscriptions + ping/pong liveness)#562
tylerhawkes merged 1 commit into
mainfrom
tyler/xip-83-bidi-subscribe

Conversation

@tylerhawkes

@tylerhawkes tylerhawkes commented Jun 12, 2026

Copy link
Copy Markdown
Contributor

Server side of XIP-83: one bidirectional Subscribe stream on the v3 MLS API that replaces repeated server-streaming subscriptions.

Depends on xmtp/proto#337 (regenerated pkg/proto/mls/api/v1 comes from that branch) — draft until it merges.

What's in here

  • Single-writer handler (pkg/mls/api/v1/subscribe.go): the select loop is the sole owner of all stream state and the sole caller of stream.Send — no mutexes; catch-up fetchers and the frame reader are pure producers over channels.
  • Mutate-in-place: id-based add/remove with per-topic cursors; (*Subscription).Add/Remove are O(1) under the dispatch lock (pkg/subscriptions).
  • Batched catch-up: chunked (256) bounded-pool (4) unnest(...) CROSS JOIN LATERAL queries with per-group pagination (pkg/mls/store/readStore.go); 2MB frame splitting; 64MB pending-buffer cap.
  • Ordering guarantees: live-gate before dispatcher Add + per-topic high-water mark ⇒ history-before-live, no duplicates, no gaps.
  • Live-boundary signals: TopicsLive after each topic's history (including the drained pending buffer); one CATCHUP_COMPLETE per adding Mutate (wave), after the wave's last marker.
  • Bounded catch-up: history_only Mutates never register with the dispatcher; client half-close drains in-flight waves then the server closes OK (no pings post-half-close).
  • Liveness: idle-triggered Ping (≤30s, resets on traffic), reap on missed Pong (DeadlineExceeded), answers client pings.

Testing

8 handler tests against real Postgres via a hand-written MlsApi_SubscribeServer fake: catch-up-then-live no-dupes, mutate-remove, ping/pong keepalive, reap-on-missed-pong, multi-identity multiplexing, TopicsLive boundary ordering, history-only non-delivery, half-close drain. All race-clean across 3 consecutive -race runs; golangci-lint 0 issues.

🤖 Generated with Claude Code

Note

Add bidirectional MlsApi.Subscribe gRPC handler with mutable subscriptions and ping/pong liveness

  • Implements the XIP-83 Subscribe bidirectional streaming RPC in pkg/mls/api/v1/subscribe.go: clients send Mutate requests to add/remove topics in-place, and the server delivers ordered history (catch-up) followed by live messages per topic.
  • Uses a single-writer goroutine model: a dedicated sender owns all mutable state, enforces frame size limits (maxFrameBytes), and aborts with ResourceExhausted when the pending buffer exceeds maxPendingBytes.
  • Emits TopicsLive markers after catch-up per topic and CatchupComplete frames echoing mutate_id once a full wave completes; supports history_only adds that fetch bounded history without live registration.
  • Adds QueryGroupMessagesBatch and QueryWelcomeMessagesBatch to pkg/mls/store/readStore.go for single-round-trip batched catch-up queries with per-topic cursors and limits.
  • Extends Subscription in pkg/subscriptions/subscription.go with Add/Remove methods and SubscriptionDispatcher with NewSubscription to support in-place topic mutation under the dispatcher lock.
  • Risk: streams are reaped on missed pong (configurable pongDeadline) or send stall; non-reading clients receive DeadlineExceeded rather than backpressure.

Macroscope summarized ce3f169.

Comment thread pkg/mls/api/v1/subscribe.go Outdated
Comment thread pkg/mls/api/v1/subscribe.go Outdated
Comment thread pkg/mls/store/readStore.go Outdated
return out, rows.Err()
}

const queryWelcomeMessagesBatch = `

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Medium store/readStore.go:424

The SQL query queryWelcomeMessagesBatch duplicates rows when the same installation_key appears multiple times in filters because unnest preserves duplicates and CROSS JOIN LATERAL executes the subquery once per duplicate. The caller in catchUpWelcomes uses counts[installationKey] == catchUpPerGroupLimit to decide pagination is complete, so duplicate keys can make the count exceed the limit on the first page and incorrectly mark the topic as caught up, causing later welcome messages to be dropped silently.

🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file @pkg/mls/store/readStore.go around line 424:

The SQL query `queryWelcomeMessagesBatch` duplicates rows when the same `installation_key` appears multiple times in `filters` because `unnest` preserves duplicates and `CROSS JOIN LATERAL` executes the subquery once per duplicate. The caller in `catchUpWelcomes` uses `counts[installationKey] == catchUpPerGroupLimit` to decide pagination is complete, so duplicate keys can make the count exceed the limit on the first page and incorrectly mark the topic as caught up, causing later welcome messages to be dropped silently.

Comment on lines +510 to +513
default:
// Parity with the live dispatch path (worker.go): surface, don't swallow.
s.log.Error("unknown welcome message type", zap.Int16("message_type", messageType))
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Medium store/readStore.go:510

QueryWelcomeMessagesBatch drops DB rows with unknown message_type values and returns success. The caller in subscribe.go counts returned proto messages to decide whether more pages exist; when unknown rows are dropped, it receives fewer than perGroupLimit results and incorrectly concludes the installation key is fully caught up. This silently truncates catch-up history after the first batch containing an unknown row.

-		default:
-			// Parity with the live dispatch path (worker.go): surface, don't swallow.
-			s.log.Error("unknown welcome message type", zap.Int16("message_type", messageType))
+		default:
+			return nil, fmt.Errorf("unknown welcome message type: %d", messageType)
🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file @pkg/mls/store/readStore.go around lines 510-513:

`QueryWelcomeMessagesBatch` drops DB rows with unknown `message_type` values and returns success. The caller in `subscribe.go` counts returned proto messages to decide whether more pages exist; when unknown rows are dropped, it receives fewer than `perGroupLimit` results and incorrectly concludes the installation key is fully caught up. This silently truncates catch-up history after the first batch containing an unknown row.

Comment thread pkg/mls/api/v1/subscribe.go Outdated
@tylerhawkes tylerhawkes force-pushed the tyler/xip-83-bidi-subscribe branch from 07a3e27 to 7e60932 Compare June 12, 2026 21:58
Comment thread pkg/mls/api/v1/subscribe.go Outdated
Comment thread pkg/mls/api/v1/subscribe.go
Comment thread pkg/mls/api/v1/subscribe.go
@tylerhawkes tylerhawkes force-pushed the tyler/xip-83-bidi-subscribe branch from 7e60932 to 77d7b5e Compare June 15, 2026 20:12
Comment thread pkg/mls/api/v1/subscribe.go
Comment thread pkg/mls/api/v1/subscribe.go Outdated
@tylerhawkes tylerhawkes marked this pull request as ready for review June 15, 2026 23:39
@tylerhawkes tylerhawkes requested a review from a team as a code owner June 15, 2026 23:39
@macroscopeapp

macroscopeapp Bot commented Jun 15, 2026

Copy link
Copy Markdown

Approvability

Verdict: Needs human review

3 blocking correctness issues found. This PR introduces a significant new feature (XIP-83 bidirectional Subscribe handler) with complex streaming and concurrency logic. The author does not own any of the modified files, and there are multiple unresolved medium-severity review comments identifying potential bugs that could cause silent data loss.

You can customize Macroscope's approvability policy. Learn more.

@tylerhawkes tylerhawkes force-pushed the tyler/xip-83-bidi-subscribe branch from 77d7b5e to bf5ecd2 Compare June 16, 2026 00:13
Comment thread pkg/mls/api/v1/subscribe.go
Comment thread pkg/mls/api/v1/subscribe.go
select {
case <-ctx.Done():
return nil
case <-s.ctx.Done():

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟢 Low v1/subscribe.go:567

lastActivity is only updated inside send at line 172, so inbound client frames like Ping, Pong, or Mutate that don't trigger a response frame never reset the idle timer. On an otherwise idle stream where the client is actively sending keepalive traffic, the server still emits its own Ping after s.pingInterval and can eventually reap the stream with DeadlineExceeded despite active client communication. Consider updating lastActivity whenever any valid client frame is received.

🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file @pkg/mls/api/v1/subscribe.go around line 567:

`lastActivity` is only updated inside `send` at line 172, so inbound client frames like `Ping`, `Pong`, or `Mutate` that don't trigger a response frame never reset the idle timer. On an otherwise idle stream where the client is actively sending keepalive traffic, the server still emits its own `Ping` after `s.pingInterval` and can eventually reap the stream with `DeadlineExceeded` despite active client communication. Consider updating `lastActivity` whenever any valid client frame is received.

@tylerhawkes tylerhawkes force-pushed the tyler/xip-83-bidi-subscribe branch 3 times, most recently from c462cf9 to e6588e2 Compare June 16, 2026 19:51
Comment thread pkg/mls/api/v1/subscribe.go
@tylerhawkes tylerhawkes force-pushed the tyler/xip-83-bidi-subscribe branch from e6588e2 to 47a33fd Compare June 16, 2026 20:29
Comment thread pkg/mls/api/v1/subscribe.go Outdated
… ping/pong liveness)

Single-writer bidi Subscribe on the v3 MLS API: mutate-in-place subscriptions over kind-prefixed topics (XIP-49) with per-topic catch-up (batched, bounded-pool), live-gate + high-water-mark dedup, TopicsLive markers, mutate_id-correlated per-wave CatchupComplete, history_only bounded catch-up with half-close drain, ping/pong reaping, and O(1) Add/Remove on the dispatcher subscription.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
@tylerhawkes tylerhawkes force-pushed the tyler/xip-83-bidi-subscribe branch from 47a33fd to ce3f169 Compare June 16, 2026 20:49
@tylerhawkes tylerhawkes merged commit 658496e into main Jun 16, 2026
7 checks passed
@tylerhawkes tylerhawkes deleted the tyler/xip-83-bidi-subscribe branch June 16, 2026 20:55
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants