SSE: reconnecting Stream with Last-Event-ID replay#13
Merged
Conversation
Stream over a caller ConnectFunc with Last-Event-ID replay, server-retry backoff, and deterministic cancel/reconnect via an injectable wait seam. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Completes the SSE subsystem with
sse.Stream, a reconnecting consumer built onthe existing
Parse. It auto-reconnects on stream end, replays theLast-Event-ID, and honors the server'sretrybackoff.What's included
ConnectFunc(ctx, lastEventID)— the caller opens a fresh event-stream(receiving the most recent event id to resume via
Last-Event-ID). This keepsssedecoupled from any particular HTTP client.Stream(ctx, connect, opts...) iter.Seq2[Event, error]— loops: connect →parse events → on stream end (EOF or a mid-stream read error) wait the
reconnection delay → reconnect with the latest event id. A connect error is
terminal (yielded); a mid-stream read error reconnects transparently. Cancel
ctxto stop.WithReconnectDelay(d)— initial delay (default 3s), overridden by a serverretryvalue for subsequent reconnects (EventSource semantics);<= 0reconnects immediately.
Background
This was the reconnection layer deferred from the SSE subsystem. The reconnect
wait is behind an unexported test seam so the retry-backoff wiring is verified
deterministically (no real sleeps); cancellation is checked before the wait so a
canceled context never triggers a spurious reconnect.
Behavior changes
None — additive to the
ssepackage.Test plan
go build ./...go vet ./...gofmt -l .(clean)go test -race ./...(all packages pass; SSE run repeated with-count=3, no flakes)Last-Event-IDreplayed tothe next connect; mid-stream read error reconnects transparently (error not
surfaced); connect error is terminal; cancellation stops with no further
connect; consumer break closes the reader; a server
retryoverrides the delay(verified via the injected wait recorder).
🤖 Generated with Claude Code