feat: XIP-83 bidirectional Subscribe handler (mutable subscriptions + ping/pong liveness) #562
	return out, rows.Err()
}

const queryWelcomeMessagesBatch = `
🟡 Medium store/readStore.go:424
The SQL query queryWelcomeMessagesBatch duplicates rows when the same installation_key appears multiple times in filters because unnest preserves duplicates and CROSS JOIN LATERAL executes the subquery once per duplicate. The caller in catchUpWelcomes uses counts[installationKey] == catchUpPerGroupLimit to decide pagination is complete, so duplicate keys can make the count exceed the limit on the first page and incorrectly mark the topic as caught up, causing later welcome messages to be dropped silently.
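One way to avoid the duplicate-row problem described above is to collapse repeated installation keys before the filters reach the query, so `unnest` expands each key once. The following is a minimal sketch, not the PR's code: the `welcomeFilter` type, its fields, and `dedupeFilters` are hypothetical names, and keeping the lowest cursor for a repeated key is an assumption made so that no history is skipped.

```go
package main

import "fmt"

// welcomeFilter is a hypothetical stand-in for one catch-up filter:
// an installation key plus the cursor to resume from.
type welcomeFilter struct {
	InstallationKey []byte
	Cursor          int64
}

// dedupeFilters keeps one filter per installation key, preserving first-seen
// order. When a key repeats, the lowest cursor wins (an assumption of this
// sketch) so the single remaining filter still covers all requested history.
func dedupeFilters(filters []welcomeFilter) []welcomeFilter {
	index := make(map[string]int, len(filters))
	out := make([]welcomeFilter, 0, len(filters))
	for _, f := range filters {
		k := string(f.InstallationKey)
		if i, seen := index[k]; seen {
			if f.Cursor < out[i].Cursor {
				out[i].Cursor = f.Cursor
			}
			continue
		}
		index[k] = len(out)
		out = append(out, f)
	}
	return out
}

func main() {
	in := []welcomeFilter{
		{InstallationKey: []byte("a"), Cursor: 10},
		{InstallationKey: []byte("b"), Cursor: 3},
		{InstallationKey: []byte("a"), Cursor: 7},
	}
	for _, f := range dedupeFilters(in) {
		fmt.Printf("%s %d\n", f.InstallationKey, f.Cursor)
	}
}
```

With unique keys, each key contributes at most one page of rows, so a per-key count can no longer exceed the limit because of duplication. Deduplicating inside the SQL itself would be another option.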
default:
	// Parity with the live dispatch path (worker.go): surface, don't swallow.
	s.log.Error("unknown welcome message type", zap.Int16("message_type", messageType))
}
🟡 Medium store/readStore.go:510
QueryWelcomeMessagesBatch drops DB rows with unknown message_type values and returns success. The caller in subscribe.go counts returned proto messages to decide whether more pages exist; when unknown rows are dropped, it receives fewer than perGroupLimit results and incorrectly concludes the installation key is fully caught up. This silently truncates catch-up history after the first batch containing an unknown row.
- default:
- // Parity with the live dispatch path (worker.go): surface, don't swallow.
- s.log.Error("unknown welcome message type", zap.Int16("message_type", messageType))
+ default:
+ return nil, fmt.Errorf("unknown welcome message type: %d", messageType)
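Besides returning an error, the truncation described above could also be avoided by basing the "more pages?" decision on the number of rows the query returned rather than on the number of messages that decoded. The sketch below illustrates that alternative under stated assumptions: `storedRow`, `page`, `decodePage`, and `hasMore` are hypothetical names, and message types 1 and 2 are placeholders for whatever types the store actually recognizes.

```go
package main

import "fmt"

// storedRow is a hypothetical stand-in for one welcome-message row.
type storedRow struct {
	ID          int64
	MessageType int16
}

// page reports what was decoded and, separately, how many rows were read.
type page struct {
	DecodedIDs []int64 // rows that decoded into messages
	RowsRead   int     // every row returned by the query, decoded or not
	LastID     int64   // cursor for the next page
}

// decodePage skips rows with unknown message types but still counts them
// and still advances the cursor past them.
func decodePage(rows []storedRow) page {
	var p page
	for _, r := range rows {
		p.RowsRead++
		p.LastID = r.ID
		switch r.MessageType {
		case 1, 2: // placeholder types this sketch pretends to understand
			p.DecodedIDs = append(p.DecodedIDs, r.ID)
		default:
			// Unknown type: not delivered, but it still counts toward RowsRead.
		}
	}
	return p
}

// hasMore decides pagination from raw rows, so skipped rows cannot make a
// full page look like the final one.
func hasMore(p page, perGroupLimit int) bool {
	return p.RowsRead >= perGroupLimit
}

func main() {
	p := decodePage([]storedRow{{1, 1}, {2, 99}, {3, 2}})
	fmt.Println(len(p.DecodedIDs), p.RowsRead, hasMore(p, 3)) // prints: 2 3 true
}
```

Here the page holds three rows but only two decoded messages; counting decoded messages against a limit of 3 would wrongly end pagination, while counting rows does not.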
Approvability
Verdict: Needs human review. 3 blocking correctness issues found. This PR introduces a significant new feature (XIP-83 bidirectional Subscribe handler) with complex streaming and concurrency logic. The author does not own any of the modified files, and there are multiple unresolved medium-severity review comments identifying potential bugs that could cause silent data loss.
select {
case <-ctx.Done():
	return nil
case <-s.ctx.Done():
🟢 Low v1/subscribe.go:567
lastActivity is only updated inside send at line 172, so inbound client frames like Ping, Pong, or Mutate that don't trigger a response frame never reset the idle timer. On an otherwise idle stream where the client is actively sending keepalive traffic, the server still emits its own Ping after s.pingInterval and can eventually reap the stream with DeadlineExceeded despite active client communication. Consider updating lastActivity whenever any valid client frame is received.
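The suggestion above, resetting the idle timer on any valid inbound frame, can be illustrated with a small sketch. It is not the handler's code: the `liveness` type, `touch`, `pingDue`, and `simulate` are hypothetical names, and the 30-second interval is chosen only to match the "≤30s" figure in the PR description.

```go
package main

import (
	"fmt"
	"time"
)

// liveness tracks the last time anything happened on the stream.
type liveness struct {
	lastActivity time.Time
	pingInterval time.Duration
}

// touch records activity. In this sketch it is meant to be called both after
// a successful send and whenever a valid client frame (Ping, Pong, Mutate)
// is received.
func (l *liveness) touch(now time.Time) { l.lastActivity = now }

// pingDue reports whether the stream has been idle for a full interval.
func (l *liveness) pingDue(now time.Time) bool {
	return now.Sub(l.lastActivity) >= l.pingInterval
}

// simulate checks whether a server ping is due at t=30s, first with no
// inbound traffic and then with one client frame received at t=20s.
func simulate() (withoutTouch, withTouch bool) {
	start := time.Unix(0, 0)
	at := func(s int) time.Time { return start.Add(time.Duration(s) * time.Second) }

	a := liveness{lastActivity: start, pingInterval: 30 * time.Second}
	withoutTouch = a.pingDue(at(30))

	b := liveness{lastActivity: start, pingInterval: 30 * time.Second}
	b.touch(at(20)) // inbound client frame resets the idle timer
	withTouch = b.pingDue(at(30))
	return withoutTouch, withTouch
}

func main() {
	without, with := simulate()
	fmt.Println(without, with) // prints: true false
}
```

Passing the current time in explicitly keeps the logic testable without sleeping; a real handler would presumably call `touch(time.Now())` from its frame-receive path.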
… ping/pong liveness)

Single-writer bidi Subscribe on the v3 MLS API: mutate-in-place subscriptions over kind-prefixed topics (XIP-49) with per-topic catch-up (batched, bounded-pool), live-gate + high-water-mark dedup, TopicsLive markers, mutate_id-correlated per-wave CatchupComplete, history_only bounded catch-up with half-close drain, ping/pong reaping, and O(1) Add/Remove on the dispatcher subscription.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Server side of XIP-83: one bidirectional `Subscribe` stream on the v3 MLS API that replaces repeated server-streaming subscriptions.

Depends on xmtp/proto#337 (the regenerated `pkg/proto/mls/api/v1` comes from that branch); draft until it merges.

What's in here
- … (`pkg/mls/api/v1/subscribe.go`): the select loop is the sole owner of all stream state and the sole caller of `stream.Send`, so there are no mutexes; catch-up fetchers and the frame reader are pure producers over channels.
- `(*Subscription).Add`/`Remove` are O(1) under the dispatch lock (`pkg/subscriptions`).
- `unnest(...) CROSS JOIN LATERAL` queries with per-group pagination (`pkg/mls/store/readStore.go`); 2MB frame splitting; 64MB pending-buffer cap.
- `TopicsLive` after each topic's history (including the drained pending buffer); one `CATCHUP_COMPLETE` per adding Mutate (wave), after the wave's last marker.
- `history_only` Mutates never register with the dispatcher; client half-close drains in-flight waves, then the server closes `OK` (no pings post-half-close).
- `Ping` (≤30s, resets on traffic), reap on missed `Pong` (`DeadlineExceeded`), answers client pings.

Testing
8 handler tests against real Postgres via a hand-written `MlsApi_SubscribeServer` fake: catch-up-then-live no-dupes, mutate-remove, ping/pong keepalive, reap-on-missed-pong, multi-identity multiplexing, TopicsLive boundary ordering, history-only non-delivery, half-close drain. All race-clean across 3 consecutive `-race` runs; `golangci-lint` 0 issues.

🤖 Generated with Claude Code
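The single-writer design described in the PR description (one select loop that owns all stream state and is the only caller of `stream.Send`, with fetchers and the reader acting as channel producers) can be sketched roughly as follows. This is an illustration under assumptions, not the handler itself: `frame`, `runLoop`, `produce`, and `demo` are hypothetical names, the `send` callback stands in for `stream.Send`, and the sketch does not model the live gate, catch-up ordering, or ping/pong.

```go
package main

import (
	"context"
	"fmt"
)

// frame is a hypothetical stand-in for one outbound stream frame.
type frame struct {
	Source string
	Seq    int
}

// runLoop is the only place that calls send, so state touched inside send
// needs no mutex. A closed channel is set to nil, which removes its case
// from the select; the loop ends once both producers are done.
func runLoop(ctx context.Context, catchUp, live <-chan frame, send func(frame) error) error {
	for catchUp != nil || live != nil {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case f, ok := <-catchUp:
			if !ok {
				catchUp = nil
				continue
			}
			if err := send(f); err != nil {
				return err
			}
		case f, ok := <-live:
			if !ok {
				live = nil
				continue
			}
			if err := send(f); err != nil {
				return err
			}
		}
	}
	return nil
}

// produce is a pure producer: it only writes to its own channel.
func produce(source string, n int) <-chan frame {
	ch := make(chan frame)
	go func() {
		defer close(ch)
		for i := 0; i < n; i++ {
			ch <- frame{Source: source, Seq: i}
		}
	}()
	return ch
}

// demo runs the loop with two producers and counts frames sent.
func demo() (int, error) {
	sent := 0 // owned by the loop goroutine only
	err := runLoop(context.Background(), produce("catch-up", 3), produce("live", 2),
		func(frame) error {
			sent++
			return nil
		})
	return sent, err
}

func main() {
	n, err := demo()
	fmt.Println(n, err) // prints: 5 <nil>
}
```

Because `sent` is only ever modified from inside the loop goroutine, the sketch stays race-free without locks. A production loop would also need producers that stop when the context is cancelled, which this sketch omits.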
Note
Add bidirectional `MlsApi.Subscribe` gRPC handler with mutable subscriptions and ping/pong liveness
- … `Subscribe` bidirectional streaming RPC in `pkg/mls/api/v1/subscribe.go`: clients send `Mutate` requests to add/remove topics in-place, and the server delivers ordered history (catch-up) followed by live messages per topic.
- … `maxFrameBytes`), and aborts with `ResourceExhausted` when the pending buffer exceeds `maxPendingBytes`.
- … `TopicsLive` markers after catch-up per topic and `CatchupComplete` frames echoing `mutate_id` once a full wave completes; supports `history_only` adds that fetch bounded history without live registration.
- … `QueryGroupMessagesBatch` and `QueryWelcomeMessagesBatch` to `pkg/mls/store/readStore.go` for single-round-trip batched catch-up queries with per-topic cursors and limits.
- … `Subscription` in `pkg/subscriptions/subscription.go` with `Add`/`Remove` methods and `SubscriptionDispatcher` with `NewSubscription` to support in-place topic mutation under the dispatcher lock.
- … `pongDeadline`) or send stall; non-reading clients receive `DeadlineExceeded` rather than backpressure.

Macroscope summarized ce3f169.
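The frame-size limit mentioned in the summary (`maxFrameBytes`) suggests greedy batching of messages into frames. The sketch below shows one plausible way to do that; it is an assumption about the general technique, not a description of the PR's implementation. `splitFrames` is a hypothetical name, payload length is used as a stand-in for encoded size, and a single message larger than the limit is placed in a frame of its own rather than rejected.

```go
package main

import "fmt"

// splitFrames greedily packs messages, in order, into frames whose total
// payload size stays at or below maxFrameBytes. A message that is larger
// than the limit on its own still gets a dedicated frame (an assumption of
// this sketch).
func splitFrames(msgs [][]byte, maxFrameBytes int) [][][]byte {
	var frames [][][]byte
	var cur [][]byte
	size := 0
	for _, m := range msgs {
		// Flush the current frame if adding m would exceed the limit.
		if len(cur) > 0 && size+len(m) > maxFrameBytes {
			frames = append(frames, cur)
			cur, size = nil, 0
		}
		cur = append(cur, m)
		size += len(m)
	}
	if len(cur) > 0 {
		frames = append(frames, cur)
	}
	return frames
}

func main() {
	msgs := [][]byte{
		make([]byte, 3),
		make([]byte, 3),
		make([]byte, 3),
		make([]byte, 10),
	}
	for i, f := range splitFrames(msgs, 6) {
		fmt.Printf("frame %d: %d message(s)\n", i, len(f))
	}
}
```

Message order is preserved across frames, which matters when the receiver relies on ordered history per topic.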