feat(api): XIP-83 d14n binding — bidirectional Subscribe on QueryApi#2020
feat(api): XIP-83 d14n binding — bidirectional Subscribe on QueryApi#2020tylerhawkes wants to merge 3 commits into
Conversation
Summary by OctaneNew Contracts
Updated Contracts
🔗 Commit Hash: c44b65c |
| m.l.topicsMu.Lock() | ||
| defer m.l.topicsMu.Unlock() | ||
| if m.l.closed { | ||
| return | ||
| } | ||
| m.worker.topicListeners.addListener(keys, m.l) | ||
| for k := range keys { | ||
| m.l.topics[k] = struct{}{} | ||
| } | ||
| } |
There was a problem hiding this comment.
🔴 Critical message/mutable_subscription.go:47
mutableSubscription.addTopics checks m.l.closed while holding topicsMu, but subscribeWorker.closeListener sets l.closed = true without taking that lock. A concurrent mutate can pass the stale closed check and re-add the listener to topicListeners after its channel is closed, causing dispatchToListeners to panic on send.
func (m *mutableSubscription) addTopics(keys map[string]struct{}) {
if len(keys) == 0 {
return
}
- m.l.topicsMu.Lock()
- defer m.l.topicsMu.Unlock()
- if m.l.closed {
+ m.worker.topicListeners.mu.RLock()
+ defer m.worker.topicListeners.mu.RUnlock()
+ m.l.topicsMu.Lock()
+ defer m.l.topicsMu.Unlock()
+ if m.l.closed {
return
}
m.worker.topicListeners.addListener(keys, m.l)🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file @pkg/api/message/mutable_subscription.go around lines 47-56:
`mutableSubscription.addTopics` checks `m.l.closed` while holding `topicsMu`, but `subscribeWorker.closeListener` sets `l.closed = true` without taking that lock. A concurrent mutate can pass the stale `closed` check and re-add the listener to `topicListeners` after its channel is closed, causing `dispatchToListeners` to panic on send.
There was a problem hiding this comment.
Fixed. closeListener now sets l.closed under l.topicsMu, so a concurrent mutableSubscription.addTopics observes it atomically with its addListener call and can no longer re-register a listener after its channel is closed. Added a -race regression test (TestMutableSubscriptionAddRaceWithClose) that hammers addTopics against the reap.
| wire := make([][]byte, 0, len(w.topics)) | ||
| for _, t := range w.topics { | ||
| if w.historyOnly { | ||
| wire = append(wire, t.wire) | ||
| continue | ||
| } |
There was a problem hiding this comment.
🟡 Medium message/subscribe.go:368
handleCatchUp appends history_only topics to wire at line 371 and emits TopicsLive for them at line 392, but these topics are never added to liveTopics so no live envelopes will actually arrive. A client receiving the TopicsLive frame incorrectly believes these topics are now live. Consider skipping the TopicsLive announcement entirely for history_only waves.
wire := make([][]byte, 0, len(w.topics))
for _, t := range w.topics {
if w.historyOnly {
- wire = append(wire, t.wire)
continue
}🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file @pkg/api/message/subscribe.go around lines 368-373:
`handleCatchUp` appends `history_only` topics to `wire` at line 371 and emits `TopicsLive` for them at line 392, but these topics are never added to `liveTopics` so no live envelopes will actually arrive. A client receiving the `TopicsLive` frame incorrectly believes these topics are now live. Consider skipping the `TopicsLive` announcement entirely for `history_only` waves.
There was a problem hiding this comment.
Working as specified, leaving as-is. The proto contract for the history_only field explicitly mandates these markers — it says the adds are caught up via "history, TopicsLive markers, and the wave's CatchupComplete" — and TopicsLive is documented informational-only ("delivery correctness (no duplicates, no gaps) never depends on it"). A client that opted into history_only interprets TopicsLive as the "everything as of now" boundary, not "a live tail follows"; it learns there is no live tail because it sent history_only (and typically half-closes). Suppressing the marker here would make this binding diverge from the field contract and from the v3 MlsApi binding.
ApprovabilityVerdict: Needs human review 5 blocking correctness issues found. This PR introduces a significant new feature (XIP-83 bidirectional Subscribe RPC) with ~1000+ lines of new implementation code including complex concurrency patterns. The author does not own any of the changed files (owned by @xmtp/backend). Additionally, unresolved review comments identify critical and high-severity bugs including a race condition and infinite loop. You can customize Macroscope's approvability policy. Learn more. |
…lace topic add/remove Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Overview
Detailed findings
|
…4 protos Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
…for tests Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
c44b65c to
c8131f9
Compare
|
Pushed an update addressing the Macroscope findings (3 fixed, 1 working-as-specified per the Beyond those, I did a deeper self-review of the concurrent paths and hardened several edge cases, each with a regression test:
|
| errors.New("add targets a topic with an in-flight history_only catch-up"), | ||
| ) | ||
| } | ||
| provided := make(db.VectorClock) |
There was a problem hiding this comment.
🟡 Medium message/subscribe.go:560
Entries from the client-provided LastSeen cursor are copied into provided (line 560–561) without checking that uint32 node IDs fit int32 or that uint64 sequence IDs fit int64. When db.SetPerTopicCursors later builds the catch-up query, it silently drops entries that overflow the signed database column types — so the catch-up fetch skips those originators entirely. Meanwhile the oversized value is still stored verbatim in sess.cursors (line 637), causing advanceTopicCursors to treat every future envelope from that originator as "already seen" (since the real sequence will always be less than the oversized floor). The topic is permanently dead on this stream.
Validate each LastSeen entry here and reject the Mutate with InvalidArgument if any node ID exceeds math.MaxInt32 or any sequence ID exceeds math.MaxInt64.
🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file @pkg/api/message/subscribe.go around line 560:
Entries from the client-provided `LastSeen` cursor are copied into `provided` (line 560–561) without checking that `uint32` node IDs fit `int32` or that `uint64` sequence IDs fit `int64`. When `db.SetPerTopicCursors` later builds the catch-up query, it silently drops entries that overflow the signed database column types — so the catch-up fetch skips those originators entirely. Meanwhile the oversized value is still stored verbatim in `sess.cursors` (line 637), causing `advanceTopicCursors` to treat every future envelope from that originator as "already seen" (since the real sequence will always be less than the oversized floor). The topic is permanently dead on this stream.
Validate each `LastSeen` entry here and reject the `Mutate` with `InvalidArgument` if any node ID exceeds `math.MaxInt32` or any sequence ID exceeds `math.MaxInt64`.
| } | ||
| } | ||
|
|
||
| knownOriginators, err := s.originatorList.GetOriginatorNodeIDs(ctx) |
There was a problem hiding this comment.
🟡 Medium message/subscribe.go:733
runSubscribeCatchUp snapshots the originator set once (line 733) and fills queryCursors from that fixed snapshot. For a history_only wave, no live listener is registered, so if a new originator publishes to one of the subscribed topics between the snapshot and the final pagination page, its envelopes are never queried — yet the client still receives CatchupComplete. This means history_only catch-up can silently miss messages, violating the completeness guarantee the done marker implies.
Consider re-fetching the originator list after the pagination loop finishes and, if new originators appeared, running an additional pass for those originators. Alternatively, document this as a known limitation of history_only if the race window is deemed acceptable.
🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file @pkg/api/message/subscribe.go around line 733:
`runSubscribeCatchUp` snapshots the originator set once (line 733) and fills `queryCursors` from that fixed snapshot. For a `history_only` wave, no live listener is registered, so if a new originator publishes to one of the subscribed topics between the snapshot and the final pagination page, its envelopes are never queried — yet the client still receives `CatchupComplete`. This means `history_only` catch-up can silently miss messages, violating the completeness guarantee the `done` marker implies.
Consider re-fetching the originator list after the pagination loop finishes and, if new originators appeared, running an additional pass for those originators. Alternatively, document this as a known limitation of `history_only` if the race window is deemed acceptable.
| envs := unmarshalEnvelopes(rows, logger) | ||
| // Advance the fetcher's own (filled) cursors so the next page continues; the proto | ||
| // result is discarded here — the writer re-dedups the raw envs against the live cursor. | ||
| _ = advanceTopicCursors(queryCursors, envs, logger) |
There was a problem hiding this comment.
🟠 High message/subscribe.go:767
advanceTopicCursors is called with only the successfully unmarshaled envs, so any row whose OriginatorEnvelope bytes fail to unmarshal is silently dropped and never advances queryCursors. Because the pagination loop breaks on int32(len(rows)) < rowsPerEntry (raw row count), a full page containing even one permanently bad row causes the cursor to stay at the same position, re-fetch the same page, and spin indefinitely — the wave's done marker is never emitted and the subscription is stuck in catch-up. Advance queryCursors from the raw rows instead, so malformed envelopes are skipped past.
envs := unmarshalEnvelopes(rows, logger)
- // Advance the fetcher's own (filled) cursors so the next page continues; the proto
- // result is discarded here — the writer re-dedups the raw envs against the live cursor.
- _ = advanceTopicCursors(queryCursors, envs, logger)
+ // Advance the fetcher's own (filled) cursors from raw rows so the
+ // next page continues even when some rows fail to unmarshal.
+ for _, r := range rows {
+ vc, ok := queryCursors[string(r.Topic)]
+ if !ok {
+ continue
+ }
+ origID := uint32(r.OriginatorNodeID)
+ seqID := uint64(r.OriginatorSequenceID)
+ if cur, exists := vc[origID]; !exists || cur < seqID {
+ vc[origID] = seqID
+ }
+ }🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file @pkg/api/message/subscribe.go around lines 767-770:
`advanceTopicCursors` is called with only the successfully unmarshaled `envs`, so any row whose `OriginatorEnvelope` bytes fail to unmarshal is silently dropped and never advances `queryCursors`. Because the pagination loop breaks on `int32(len(rows)) < rowsPerEntry` (raw row count), a full page containing even one permanently bad row causes the cursor to stay at the same position, re-fetch the same page, and spin indefinitely — the wave's `done` marker is never emitted and the subscription is stuck in catch-up. Advance `queryCursors` from the raw rows instead, so malformed envelopes are skipped past.
XIP-83 d14n binding — bidirectional
QueryApi.SubscribeImplements the decentralized (d14n) binding of XIP-83 mutable subscription streams on xmtpd. A single long-lived bidirectional stream lets a client mutate its topic set in place (add/remove subscriptions) and keep the connection alive with ping/pong — instead of tearing down and reopening a stream every time group membership changes.
This is the xmtpd counterpart to the node-go v3
MlsApi.Subscribebinding. Same control protocol (Mutate/Started/CatchupComplete/TopicsLive/Ping/Pong); the d14n binding deliversOriginatorEnvelopes and uses per-originator vector cursors.Design: gated async catch-up
advanceTopicCursors). Anything that lands during catch-up is either in the snapshot or arrives live; the dedup drops the overlap.maxActiveSubscribeTopics) — ~16–32MB for a consumer that large, deliberately favored over forcing clients into multiple connections (which would invite rate-limiting and wreck some topologies).Ping; the receiver mustPongthe nonce. No pong within the deadline → the node reaps the vanished peer (e.g. a mobile client the OS suspended behind a proxy that still ACKs the transport).Commits
feat(api): add mutableSubscription handle to subscribeWorker— in-place topic add/remove on the existing subscribe worker, guarded bytopicsMu.feat(api): XIP-83 bidirectional Subscribe handler on QueryApi + xmtpv4 protos— the handler (subscribe.go) + regenerated xmtpv4 protos.test(api): Subscribe handler tests + configurable keepalive interval for tests— 5 race-tested scenarios + a test-only knob to shorten the keepalive cadence.Tests
go test -race ./pkg/api/message— all green:TestSubscribe_CatchUpThenLiveTestSubscribe_MutateRemoveStopsDeliveryTestSubscribe_HalfCloseHistoryOnlyDrainsTestSubscribe_HistoryOnlyOnLiveRejectedTestSubscribe_NoPongIsReapedDependency & merge ordering
Built on xmtp/proto#338 (
QueryApi.Subscribe). The committed.pb.gohere is generated from that branch viadev/gen/protos; once #338 merges, regenerating from protomainproduces byte-identical output.🤖 Generated with Claude Code
Note
Add XIP-83 bidirectional
SubscribeRPC toQueryApiwith mutable topic membership and catch-up paginationSubscribebidi-streaming RPC toQueryApi(subscribe.go) implementing XIP-83: clients can mutate their topic set in-flight, receive catch-up history, then transition to live delivery.mutableSubscription(mutable_subscription.go) to manage a listener whose topic set can be atomically added to or removed from while the worker is running.subscribeWorker.closeListener(subscribe_worker.go) where the closed flag and topic map removal were not properly synchronized, risking sends on a closed channel.topicsMulock in thelistenerstruct (listener.go) is acquired on every add/remove/reap path; incorrect lock ordering could deadlock under high concurrency.Macroscope summarized c8131f9.