Skip to content

feat(api): XIP-83 d14n binding — bidirectional Subscribe on QueryApi#2020

Open
tylerhawkes wants to merge 3 commits into
mainfrom
tyler/xip-83-d14n-subscribe
Open

feat(api): XIP-83 d14n binding — bidirectional Subscribe on QueryApi#2020
tylerhawkes wants to merge 3 commits into
mainfrom
tyler/xip-83-d14n-subscribe

Conversation

@tylerhawkes

@tylerhawkes tylerhawkes commented Jun 18, 2026

Copy link
Copy Markdown
Contributor

XIP-83 d14n binding — bidirectional QueryApi.Subscribe

Implements the decentralized (d14n) binding of XIP-83 mutable subscription streams on xmtpd. A single long-lived bidirectional stream lets a client mutate its topic set in place (add/remove subscriptions) and keep the connection alive with ping/pong — instead of tearing down and reopening a stream every time group membership changes.

This is the xmtpd counterpart to the node-go v3 MlsApi.Subscribe binding. Same control protocol (Mutate / Started / CatchupComplete / TopicsLive / Ping / Pong); the d14n binding delivers OriginatorEnvelopes and uses per-originator vector cursors.

Design: gated async catch-up

  • Single-writer + single-sender actor per subscription. All topic-set mutation happens on the writer goroutine; all stream sends happen on the sender goroutine (drained via a bounded queue).
  • Per-topic gate + pending buffer. A newly-added topic is caught up by an async fetcher goroutine while already-live topics keep flowing — a slow catch-up never head-of-line-blocks live delivery.
  • No missing messages across the switch. A topic is registered live before its catch-up snapshot is taken, and delivery is deduped by a monotonic per-originator cursor (advanceTopicCursors). Anything that lands during catch-up is either in the snapshot or arrives live; the dedup drops the overlap.
  • Sparse vs. filled cursors. The writer keeps only the originator cursors the client provided (growing them as new originators are seen); the fetcher holds a filled copy purely for the query. Keeps memory bounded at the active-topic ceiling.
  • 1M active-topic ceiling (maxActiveSubscribeTopics) — ~16–32MB for a consumer that large, deliberately favored over forcing clients into multiple connections (which would invite rate-limiting and wreck some topologies).
  • Liveness. Either peer may Ping; the receiver must Pong the nonce. No pong within the deadline → the node reaps the vanished peer (e.g. a mobile client the OS suspended behind a proxy that still ACKs the transport).

Commits

  1. feat(api): add mutableSubscription handle to subscribeWorker — in-place topic add/remove on the existing subscribe worker, guarded by topicsMu.
  2. feat(api): XIP-83 bidirectional Subscribe handler on QueryApi + xmtpv4 protos — the handler (subscribe.go) + regenerated xmtpv4 protos.
  3. test(api): Subscribe handler tests + configurable keepalive interval for tests — 5 race-tested scenarios + a test-only knob to shorten the keepalive cadence.

Tests

go test -race ./pkg/api/message — all green:

  • TestSubscribe_CatchUpThenLive
  • TestSubscribe_MutateRemoveStopsDelivery
  • TestSubscribe_HalfCloseHistoryOnlyDrains
  • TestSubscribe_HistoryOnlyOnLiveRejected
  • TestSubscribe_NoPongIsReaped

Dependency & merge ordering

Built on xmtp/proto#338 (QueryApi.Subscribe). The committed .pb.go here is generated from that branch via dev/gen/protos; once #338 merges, regenerating from proto main produces byte-identical output.

Note: proto#338's Test (Golang) check cross-compiles xmtpd main against the new proto and will stay red until this handler lands on main (xmtpd's *Service must implement Subscribe). Recommended order: merge proto#338 (its red Golang check is expected), then this.

🤖 Generated with Claude Code

Note

Add XIP-83 bidirectional Subscribe RPC to QueryApi with mutable topic membership and catch-up pagination

  • Adds a new Subscribe bidi-streaming RPC to QueryApi (subscribe.go) implementing XIP-83: clients can mutate their topic set in-flight, receive catch-up history, then transition to live delivery.
  • Introduces mutableSubscription (mutable_subscription.go) to manage a listener whose topic set can be atomically added to or removed from while the worker is running.
  • Catch-up is paginated per wave; live envelopes for catching-up topics are buffered and de-duplicated before being released, with a configurable byte cap (ResourceExhausted on overflow).
  • Keepalive ping/pong is enforced: the stream is reaped if a pong deadline passes, with a pre-reap drain to avoid false positives from select fairness.
  • Fixes a concurrency bug in subscribeWorker.closeListener (subscribe_worker.go) where the closed flag and topic map removal were not properly synchronized, risking sends on a closed channel.
  • Risk: new topicsMu lock in the listener struct (listener.go) is acquired on every add/remove/reap path; incorrect lock ordering could deadlock under high concurrency.

Macroscope summarized c8131f9.

@tylerhawkes tylerhawkes requested a review from a team as a code owner June 18, 2026 22:26
@octane-security-app

Copy link
Copy Markdown

Summary by Octane

New Contracts

  • mutable_subscription.go: The smart contract manages dynamic topic subscriptions for message streams, allowing topics to be added or removed in real-time.
  • subscribe.go: The smart contract facilitates bidirectional mutable subscriptions, allowing topic-based stream management with live updates and history retrieval.

Updated Contracts

  • listener.go: Added a mutex for thread-safe topic management in the listener to handle concurrency issues.
  • subscribe_worker.go: Concurrent access protection added for topic mutations using locking, ensuring safe modifications.
  • message_api.pb.go: The smart contract adds new subscription functionalities with "V1 Mutate" and "Ping/Pong" features.
  • query_api.connect.go: Introduced XIP-83 bidirectional mutable subscriptions for dynamic topic management, requiring HTTP/2.
  • query_api.pb.go: Added new "Subscribe" method to the smart contract API.
  • query_api_grpc.pb.go: Added a bidirectional Subscribe method, allowing clients to maintain mutable subscriptions with real-time topic adjustments.
  • api.go: Added configurable keep-alive interval for server ping/pong handling in test API server.

🔗 Commit Hash: c44b65c

Comment thread pkg/api/message/subscribe.go
Comment on lines +47 to +56
m.l.topicsMu.Lock()
defer m.l.topicsMu.Unlock()
if m.l.closed {
return
}
m.worker.topicListeners.addListener(keys, m.l)
for k := range keys {
m.l.topics[k] = struct{}{}
}
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Critical message/mutable_subscription.go:47

mutableSubscription.addTopics checks m.l.closed while holding topicsMu, but subscribeWorker.closeListener sets l.closed = true without taking that lock. A concurrent mutate can pass the stale closed check and re-add the listener to topicListeners after its channel is closed, causing dispatchToListeners to panic on send.

 func (m *mutableSubscription) addTopics(keys map[string]struct{}) {
 	if len(keys) == 0 {
 		return
 	}
-	m.l.topicsMu.Lock()
-	defer m.l.topicsMu.Unlock()
-	if m.l.closed {
+	m.worker.topicListeners.mu.RLock()
+	defer m.worker.topicListeners.mu.RUnlock()
+	m.l.topicsMu.Lock()
+	defer m.l.topicsMu.Unlock()
+	if m.l.closed {
 		return
 	}
 	m.worker.topicListeners.addListener(keys, m.l)
🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file @pkg/api/message/mutable_subscription.go around lines 47-56:

`mutableSubscription.addTopics` checks `m.l.closed` while holding `topicsMu`, but `subscribeWorker.closeListener` sets `l.closed = true` without taking that lock. A concurrent mutate can pass the stale `closed` check and re-add the listener to `topicListeners` after its channel is closed, causing `dispatchToListeners` to panic on send.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. closeListener now sets l.closed under l.topicsMu, so a concurrent mutableSubscription.addTopics observes it atomically with its addListener call and can no longer re-register a listener after its channel is closed. Added a -race regression test (TestMutableSubscriptionAddRaceWithClose) that hammers addTopics against the reap.

Comment thread pkg/api/message/subscribe.go
Comment on lines +368 to +373
wire := make([][]byte, 0, len(w.topics))
for _, t := range w.topics {
if w.historyOnly {
wire = append(wire, t.wire)
continue
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Medium message/subscribe.go:368

handleCatchUp appends history_only topics to wire at line 371 and emits TopicsLive for them at line 392, but these topics are never added to liveTopics so no live envelopes will actually arrive. A client receiving the TopicsLive frame incorrectly believes these topics are now live. Consider skipping the TopicsLive announcement entirely for history_only waves.

 	wire := make([][]byte, 0, len(w.topics))
 	for _, t := range w.topics {
 		if w.historyOnly {
-			wire = append(wire, t.wire)
 			continue
 		}
🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file @pkg/api/message/subscribe.go around lines 368-373:

`handleCatchUp` appends `history_only` topics to `wire` at line 371 and emits `TopicsLive` for them at line 392, but these topics are never added to `liveTopics` so no live envelopes will actually arrive. A client receiving the `TopicsLive` frame incorrectly believes these topics are now live. Consider skipping the `TopicsLive` announcement entirely for `history_only` waves.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Working as specified, leaving as-is. The proto contract for the history_only field explicitly mandates these markers — it says the adds are caught up via "history, TopicsLive markers, and the wave's CatchupComplete" — and TopicsLive is documented informational-only ("delivery correctness (no duplicates, no gaps) never depends on it"). A client that opted into history_only interprets TopicsLive as the "everything as of now" boundary, not "a live tail follows"; it learns there is no live tail because it sent history_only (and typically half-closes). Suppressing the marker here would make this binding diverge from the field contract and from the v3 MlsApi binding.

@macroscopeapp

macroscopeapp Bot commented Jun 18, 2026

Copy link
Copy Markdown
Contributor

Approvability

Verdict: Needs human review

5 blocking correctness issues found. This PR introduces a significant new feature (XIP-83 bidirectional Subscribe RPC) with ~1000+ lines of new implementation code including complex concurrency patterns. The author does not own any of the changed files (owned by @xmtp/backend). Additionally, unresolved review comments identify critical and high-severity bugs including a race condition and infinite loop.

You can customize Macroscope's approvability policy. Learn more.

…lace topic add/remove

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
@octane-security-app

Copy link
Copy Markdown

Overview

Vulnerabilities found: 18                                                                                
Severity breakdown: 6 High, 1 Medium, 6 Low, 5 Informational
Warnings found: 1                                                                                

Detailed findings

pkg/api/message/listener.go

  • Data race on listener.closed and non-idempotent removal in XIP-83 mutable subscription causes durable listener leaks and node-level DoS risk. See more
  • Write-locked O(N) listener removal in mutable Subscribe (teardown/removeTopics) causes server-wide stalls of subscription mutations. See more

pkg/api/message/subscribe.go

  • Per-send uncancelable timer allocation (time.After) in QueryApi.Subscribe causes resource exhaustion/DoS risk. See more
  • Excessive live-topic budget and missing rate limits in QueryApi.Subscribe cause low-effort node DoS via memory exhaustion. See more
  • Missing rate limiting and cursor monotonicity in QueryApi.Subscribe mutable subscriptions causes unmetered in-place history replays and node resource exhaustion. See more
  • Unbatched, size-unchecked flush in mutable Subscribe catch-up in QueryApi.Subscribe causes subscriber stream abort (DoS). See more
  • Unmetered bidi QueryApi.Subscribe without rate limiting and silent ignore of unknown request types causes single-node volumetric DoS. See more
  • Overlapping catch-up waves and missing rate limiting in QueryApi.Subscribe cause resource-exhaustion risk. See more
  • TTL-cached originator enumeration in XIP-83 Subscribe catch-up causes missed historical topic envelopes. See more
  • Count-based buffering of large stream elements in QueryApi.Subscribe causes single-node memory exhaustion/DoS. See more
  • Keepalive Ping/Pong not suspended after client half-close in bidi Subscribe causes premature stream aborts during bounded sync. See more
  • Non-atomic mutate handling in XIP-83 Subscribe causes unexpected partial application and stream teardown. See more
  • Immediate per-topic state removal in mutable Subscribe (pkg/api/message/subscribe.go) causes loss of in-flight envelopes at unsubscribe boundary. See more
  • Lack of per-topic wave serialization and mixed dedup state in mutable Subscribe causes duplicate and out-of-order delivery. See more
  • Half-close + flush path not draining live producer channel in bidi Subscribe causes tail live envelopes to be omitted at stream close. See more
  • Missing sender error check in mutable Subscribe flush() causes false-OK termination and missed final frames. See more

pkg/api/server.go

  • HTTP/2 per-stream read timeout from http.Server.ReadTimeout in API server causes long‑lived QueryApi.Subscribe streams to be force‑closed and enables DoS via reconnection churn. See more

pkg/interceptors/server/rate_limit.go

  • Unbounded concurrent catch-up waves and missing opens limit in QueryApi.Subscribe cause node-level DoS risk. See more

Warnings

pkg/api/message/subscribe.go

  • removes-first mutate ordering in Subscribe allows same-frame history_only reset causing silent loss of live delivery. See more

🔗 Commit Hash: c44b65c
🛡️ Octane Dashboard: All vulnerabilities

tylerhawkes and others added 2 commits June 19, 2026 00:38
…4 protos

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
…for tests

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
@tylerhawkes tylerhawkes force-pushed the tyler/xip-83-d14n-subscribe branch from c44b65c to c8131f9 Compare June 19, 2026 06:40
@tylerhawkes

Copy link
Copy Markdown
Contributor Author

Pushed an update addressing the Macroscope findings (3 fixed, 1 working-as-specified per the history_only proto contract — see inline replies).

Beyond those, I did a deeper self-review of the concurrent paths and hardened several edge cases, each with a regression test:

  • Liveness: decoupled the pong-reap deadline from the delivery/ping cadence (a single shared ticker meant steady outbound traffic could postpone the reap indefinitely; long half-close drains could be falsely reaped). Ping cadence now tracks real sends; the reap deadline is independent and suppressed during half-close.
  • Re-add: a plain re-add of an already-live topic (no remove) is now a no-op instead of resetting the cursor and replaying — replay still requires remove+re-add.
  • Teardown: flush() now propagates a sender send-error instead of returning a false OK with the wave's terminal frames undelivered; the stream context is cancelable so catch-up fetchers tear down promptly.
  • Writer latency: the per-Mutate originator-list lookup (a DB round-trip on a cache miss) moved off the single writer goroutine into the catch-up fetcher, so a slow DB can no longer stall liveness or false-reap a healthy stream.
  • Overlap: rejecting a second overlapping catch-up for the same topic (including via remove+re-add over an in-flight history_only wave).

go test -race ./pkg/api/message is green; golangci-lint clean.

errors.New("add targets a topic with an in-flight history_only catch-up"),
)
}
provided := make(db.VectorClock)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Medium message/subscribe.go:560

Entries from the client-provided LastSeen cursor are copied into provided (line 560–561) without checking that uint32 node IDs fit int32 or that uint64 sequence IDs fit int64. When db.SetPerTopicCursors later builds the catch-up query, it silently drops entries that overflow the signed database column types — so the catch-up fetch skips those originators entirely. Meanwhile the oversized value is still stored verbatim in sess.cursors (line 637), causing advanceTopicCursors to treat every future envelope from that originator as "already seen" (since the real sequence will always be less than the oversized floor). The topic is permanently dead on this stream.

Validate each LastSeen entry here and reject the Mutate with InvalidArgument if any node ID exceeds math.MaxInt32 or any sequence ID exceeds math.MaxInt64.

🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file @pkg/api/message/subscribe.go around line 560:

Entries from the client-provided `LastSeen` cursor are copied into `provided` (line 560–561) without checking that `uint32` node IDs fit `int32` or that `uint64` sequence IDs fit `int64`. When `db.SetPerTopicCursors` later builds the catch-up query, it silently drops entries that overflow the signed database column types — so the catch-up fetch skips those originators entirely. Meanwhile the oversized value is still stored verbatim in `sess.cursors` (line 637), causing `advanceTopicCursors` to treat every future envelope from that originator as "already seen" (since the real sequence will always be less than the oversized floor). The topic is permanently dead on this stream.

Validate each `LastSeen` entry here and reject the `Mutate` with `InvalidArgument` if any node ID exceeds `math.MaxInt32` or any sequence ID exceeds `math.MaxInt64`.

}
}

knownOriginators, err := s.originatorList.GetOriginatorNodeIDs(ctx)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Medium message/subscribe.go:733

runSubscribeCatchUp snapshots the originator set once (line 733) and fills queryCursors from that fixed snapshot. For a history_only wave, no live listener is registered, so if a new originator publishes to one of the subscribed topics between the snapshot and the final pagination page, its envelopes are never queried — yet the client still receives CatchupComplete. This means history_only catch-up can silently miss messages, violating the completeness guarantee the done marker implies.

Consider re-fetching the originator list after the pagination loop finishes and, if new originators appeared, running an additional pass for those originators. Alternatively, document this as a known limitation of history_only if the race window is deemed acceptable.

🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file @pkg/api/message/subscribe.go around line 733:

`runSubscribeCatchUp` snapshots the originator set once (line 733) and fills `queryCursors` from that fixed snapshot. For a `history_only` wave, no live listener is registered, so if a new originator publishes to one of the subscribed topics between the snapshot and the final pagination page, its envelopes are never queried — yet the client still receives `CatchupComplete`. This means `history_only` catch-up can silently miss messages, violating the completeness guarantee the `done` marker implies.

Consider re-fetching the originator list after the pagination loop finishes and, if new originators appeared, running an additional pass for those originators. Alternatively, document this as a known limitation of `history_only` if the race window is deemed acceptable.

Comment on lines +767 to +770
envs := unmarshalEnvelopes(rows, logger)
// Advance the fetcher's own (filled) cursors so the next page continues; the proto
// result is discarded here — the writer re-dedups the raw envs against the live cursor.
_ = advanceTopicCursors(queryCursors, envs, logger)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟠 High message/subscribe.go:767

advanceTopicCursors is called with only the successfully unmarshaled envs, so any row whose OriginatorEnvelope bytes fail to unmarshal is silently dropped and never advances queryCursors. Because the pagination loop breaks on int32(len(rows)) < rowsPerEntry (raw row count), a full page containing even one permanently bad row causes the cursor to stay at the same position, re-fetch the same page, and spin indefinitely — the wave's done marker is never emitted and the subscription is stuck in catch-up. Advance queryCursors from the raw rows instead, so malformed envelopes are skipped past.

 			envs := unmarshalEnvelopes(rows, logger)
-			// Advance the fetcher's own (filled) cursors so the next page continues; the proto
-			// result is discarded here — the writer re-dedups the raw envs against the live cursor.
-			_ = advanceTopicCursors(queryCursors, envs, logger)
+			// Advance the fetcher's own (filled) cursors from raw rows so the
+			// next page continues even when some rows fail to unmarshal.
+			for _, r := range rows {
+				vc, ok := queryCursors[string(r.Topic)]
+				if !ok {
+					continue
+				}
+				origID := uint32(r.OriginatorNodeID)
+				seqID := uint64(r.OriginatorSequenceID)
+				if cur, exists := vc[origID]; !exists || cur < seqID {
+					vc[origID] = seqID
+				}
+			}
🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file @pkg/api/message/subscribe.go around lines 767-770:

`advanceTopicCursors` is called with only the successfully unmarshaled `envs`, so any row whose `OriginatorEnvelope` bytes fail to unmarshal is silently dropped and never advances `queryCursors`. Because the pagination loop breaks on `int32(len(rows)) < rowsPerEntry` (raw row count), a full page containing even one permanently bad row causes the cursor to stay at the same position, re-fetch the same page, and spin indefinitely — the wave's `done` marker is never emitted and the subscription is stuck in catch-up. Advance `queryCursors` from the raw rows instead, so malformed envelopes are skipped past.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant