diff --git a/README.md b/README.md index be3a3a9..0773f25 100644 --- a/README.md +++ b/README.md @@ -60,7 +60,7 @@ standard library. | [`conditions`](./conditions) | Conditional- and range-request value types (ETag, Range, Conditions). | | [`config`](./config) | Layered override → environment → default settings resolver; non-failing typed getters. | | [`serde`](./serde) | Serialization seam (Marshaler/Unmarshaler) with a JSON default, plus Tristate for PATCH payloads. | -| [`sse`](./sse) | Server-Sent Events (text/event-stream) WHATWG parser. | +| [`sse`](./sse) | Server-Sent Events (text/event-stream) WHATWG parser + reconnecting Stream (Last-Event-ID replay). | | [`webhook`](./webhook) | Inbound webhook signature verification (constant-time HMAC + timestamp tolerance). | | [`formdata`](./formdata) | Multipart/form-data request body builder (replayable; file uploads). | | root [`dexpace`](.) | Umbrella `Client` wiring the default policy stack. | diff --git a/doc.go b/doc.go index d4b9c7b..c812461 100644 --- a/doc.go +++ b/doc.go @@ -57,7 +57,8 @@ // decodes an error body into a typed value. // // The sse package parses Server-Sent Events (text/event-stream) into a -// range-over-func iterator of events. +// range-over-func iterator of events, with a reconnecting Stream that replays the +// Last-Event-ID. // // The webhook package verifies inbound webhook signatures (constant-time HMAC // with a timestamp-tolerance window). diff --git a/docs/superpowers/plans/2026-06-16-sse-reconnect.md b/docs/superpowers/plans/2026-06-16-sse-reconnect.md new file mode 100644 index 0000000..001f6bb --- /dev/null +++ b/docs/superpowers/plans/2026-06-16-sse-reconnect.md @@ -0,0 +1,472 @@ +# SSE Reconnecting Stream Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Add `sse.Stream` — a reconnecting SSE consumer built on `Parse` — with `Last-Event-ID` replay, server-`retry` backoff, and a `ConnectFunc` seam. + +**Architecture:** `Stream` loops: call the caller's `ConnectFunc(ctx, lastEventID)`, parse events until the stream ends (EOF/read error), wait the reconnect delay (honoring `ctx`), then reconnect with the latest event id. A connect error is terminal (yielded). An unexported `wait` seam keeps the retry-backoff wiring deterministically testable. + +**Tech Stack:** Go 1.26+, standard library only (`context`, `io`, `iter`, `time`). Zero third-party dependencies. + +**Conventions every task must follow:** +- MIT license header on every `.go` file before the `package` clause. +- Import groups: stdlib only here. +- Tests: external tests are `package sse_test` (`t.Parallel()`); the internal retry test is `package sse`. +- Tools: Go 1.26.3; `gofumpt`/`golangci-lint` NOT installed — use `gofmt`, `go vet`, `go test -race`. +- Run commands from the repo root `/Users/omar/dexpace/go-sdk`. + +--- + +## File Structure + +| Path | Responsibility | +|---|---| +| `sse/stream.go` (new) | `ConnectFunc`, `StreamOption`, `WithReconnectDelay`, `Stream`, unexported `withWait`/`realWait` | +| `sse/stream_test.go` (new, `sse_test`) | reconnect, Last-Event-ID, cancel, connect-error, consumer-break | +| `sse/stream_internal_test.go` (new, `sse`) | retry-overrides-delay via injected wait | +| `sse/doc.go` (modify) | note the reconnecting layer now exists | +| `doc.go`, `README.md` (modify) | update the sse description | + +--- + +## Task 1: `Stream` and reconnection + +**Files:** +- Create: `sse/stream.go`, `sse/stream_test.go`, `sse/stream_internal_test.go` + +- [ ] **Step 1: Write the failing tests** + +```go +// sse/stream_test.go +// Copyright (c) 2026 dexpace and Omar Aljarrah. +// Licensed under the MIT License. See LICENSE in the repository root for details. + +package sse_test + +import ( + "context" + "errors" + "io" + "strings" + "testing" + + "github.com/dexpace/go-sdk/sse" +) + +func nopBody(s string) io.ReadCloser { return io.NopCloser(strings.NewReader(s)) } + +func TestStreamReconnectsAndFlattens(t *testing.T) { + t.Parallel() + + calls := 0 + connect := func(_ context.Context, _ string) (io.ReadCloser, error) { + calls++ + switch calls { + case 1: + return nopBody("data: a\n\ndata: b\n\n"), nil + case 2: + return nopBody("data: c\n\n"), nil + default: + return nil, errors.New("stop") + } + } + + var data []string + var gotErr error + for ev, err := range sse.Stream(context.Background(), connect, sse.WithReconnectDelay(0)) { + if err != nil { + gotErr = err + break + } + data = append(data, ev.Data) + } + + if strings.Join(data, ",") != "a,b,c" { + t.Fatalf("data = %v, want [a b c]", data) + } + if gotErr == nil || gotErr.Error() != "stop" { + t.Fatalf("err = %v, want stop", gotErr) + } +} + +func TestStreamReplaysLastEventID(t *testing.T) { + t.Parallel() + + var seenIDs []string + calls := 0 + connect := func(_ context.Context, lastID string) (io.ReadCloser, error) { + seenIDs = append(seenIDs, lastID) + calls++ + if calls == 1 { + return nopBody("id: 42\ndata: a\n\n"), nil + } + return nil, errors.New("stop") + } + + for _, err := range sse.Stream(context.Background(), connect, sse.WithReconnectDelay(0)) { + if err != nil { + break + } + } + + if len(seenIDs) != 2 || seenIDs[0] != "" || seenIDs[1] != "42" { + t.Fatalf("connect lastIDs = %v, want [\"\" \"42\"]", seenIDs) + } +} + +func TestStreamCancellationStops(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + calls := 0 + connect := func(_ context.Context, _ string) (io.ReadCloser, error) { + calls++ + if calls == 1 { + return nopBody("data: a\n\ndata: b\n\n"), nil + } + return nil, errors.New("should not reconnect after cancel") + } + + var data []string + for ev, err := range sse.Stream(ctx, connect, sse.WithReconnectDelay(0)) { + if err != nil { + t.Fatalf("unexpected error (reconnected after cancel?): %v", err) + } + data = append(data, ev.Data) + if ev.Data == "b" { + cancel() + } + } + + if calls != 1 { + t.Fatalf("connect called %d times, want 1 (no reconnect after cancel)", calls) + } + if strings.Join(data, ",") != "a,b" { + t.Fatalf("data = %v, want [a b]", data) + } +} + +func TestStreamConnectErrorIsTerminal(t *testing.T) { + t.Parallel() + + boom := errors.New("dial failed") + connect := func(_ context.Context, _ string) (io.ReadCloser, error) { + return nil, boom + } + + var events, errs int + for _, err := range sse.Stream(context.Background(), connect, sse.WithReconnectDelay(0)) { + if err != nil { + if !errors.Is(err, boom) { + t.Fatalf("err = %v, want boom", err) + } + errs++ + break + } + events++ + } + if events != 0 || errs != 1 { + t.Fatalf("events=%d errs=%d, want 0 and 1", events, errs) + } +} + +type closeRecorder struct { + io.Reader + closed bool +} + +func (c *closeRecorder) Close() error { c.closed = true; return nil } + +func TestStreamConsumerBreakClosesReader(t *testing.T) { + t.Parallel() + + rec := &closeRecorder{Reader: strings.NewReader("data: a\n\ndata: b\n\n")} + calls := 0 + connect := func(_ context.Context, _ string) (io.ReadCloser, error) { + calls++ + return rec, nil + } + + for ev := range func(yield func(sse.Event) bool) { + for e, err := range sse.Stream(context.Background(), connect, sse.WithReconnectDelay(0)) { + if err != nil { + return + } + if !yield(e) { + return + } + } + } { + _ = ev + break // stop after the first event + } + + if !rec.closed { + t.Fatal("reader was not closed on consumer break") + } + if calls != 1 { + t.Fatalf("connect called %d times, want 1", calls) + } +} +``` + +```go +// sse/stream_internal_test.go +// Copyright (c) 2026 dexpace and Omar Aljarrah. +// Licensed under the MIT License. See LICENSE in the repository root for details. + +package sse + +import ( + "context" + "errors" + "io" + "strings" + "testing" + "time" +) + +func TestStreamRetryOverridesDelay(t *testing.T) { + var recorded []time.Duration + wait := func(ctx context.Context, d time.Duration) bool { + recorded = append(recorded, d) + return ctx.Err() == nil + } + + calls := 0 + connect := func(_ context.Context, _ string) (io.ReadCloser, error) { + calls++ + if calls == 1 { + return io.NopCloser(strings.NewReader("retry: 2000\ndata: a\n\n")), nil + } + return nil, errors.New("stop") + } + + var gotErr error + for _, err := range Stream(context.Background(), connect, + WithReconnectDelay(time.Hour), withWait(wait)) { + if err != nil { + gotErr = err + break + } + } + + if gotErr == nil { + t.Fatal("expected the stop error") + } + if len(recorded) != 1 || recorded[0] != 2000*time.Millisecond { + t.Fatalf("recorded wait delays = %v, want [2s] (retry overrode the hour default)", recorded) + } +} +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `go test ./sse/ -run Stream -v` +Expected: FAIL — `sse.Stream`/`ConnectFunc`/`WithReconnectDelay`/`withWait` undefined. + +- [ ] **Step 3: Create `sse/stream.go`** + +```go +// sse/stream.go +// Copyright (c) 2026 dexpace and Omar Aljarrah. +// Licensed under the MIT License. See LICENSE in the repository root for details. + +package sse + +import ( + "context" + "io" + "iter" + "time" +) + +// defaultReconnectDelay is the wait between reconnects unless overridden by an +// option or a server retry value. +const defaultReconnectDelay = 3 * time.Second + +// ConnectFunc opens a fresh event-stream connection. It receives the most recent +// event id (empty on the first connect) so the server can resume via the +// Last-Event-ID request header. Stream closes the returned reader when the +// connection ends. +type ConnectFunc func(ctx context.Context, lastEventID string) (io.ReadCloser, error) + +// StreamOption configures [Stream]. +type StreamOption func(*streamConfig) + +type streamConfig struct { + delay time.Duration + wait func(ctx context.Context, d time.Duration) bool +} + +// WithReconnectDelay sets the wait between reconnects. It defaults to three +// seconds and is overridden by any server-sent retry value. A delay <= 0 +// reconnects immediately. +func WithReconnectDelay(d time.Duration) StreamOption { + return func(c *streamConfig) { c.delay = d } +} + +// withWait injects the reconnect-wait function. Test seam. +func withWait(fn func(ctx context.Context, d time.Duration) bool) StreamOption { + return func(c *streamConfig) { c.wait = fn } +} + +// Stream yields events from a reconnecting SSE source. It calls connect, parses +// events until the stream ends (EOF or a read error), waits the reconnection +// delay, then reconnects with the most recent event id. A connect error is +// delivered as the iterator error and ends the stream; cancel ctx to stop. The +// iterator is single-pass. +func Stream(ctx context.Context, connect ConnectFunc, opts ...StreamOption) iter.Seq2[Event, error] { + cfg := streamConfig{delay: defaultReconnectDelay, wait: realWait} + for _, opt := range opts { + opt(&cfg) + } + + return func(yield func(Event, error) bool) { + lastID := "" + delay := cfg.delay + for { + if ctx.Err() != nil { + return + } + rc, err := connect(ctx, lastID) + if err != nil { + yield(Event{}, err) + return + } + + stopped := false + for ev, perr := range Parse(rc) { + if perr != nil { + break // mid-stream read error: reconnect transparently + } + if ev.ID != "" { + lastID = ev.ID + } + if ev.Retry > 0 { + delay = ev.Retry + } + if !yield(ev, nil) { + stopped = true + break + } + } + _ = rc.Close() + + if stopped { + return + } + if !cfg.wait(ctx, delay) { + return + } + } + } +} + +// realWait waits for d, returning false if ctx is (or becomes) done. A +// non-positive d returns true immediately, after the context check. +func realWait(ctx context.Context, d time.Duration) bool { + if ctx.Err() != nil { + return false + } + if d <= 0 { + return true + } + t := time.NewTimer(d) + defer t.Stop() + select { + case <-ctx.Done(): + return false + case <-t.C: + return true + } +} +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `go test -race ./sse/ -v` +Expected: PASS — all Stream tests (external + internal) plus the existing Parse tests. + +- [ ] **Step 5: Update `sse/doc.go`** + +Read `sse/doc.go`. Update the package comment to reflect that the reconnecting +layer now exists — replace the "intentionally left to the caller and a future +addition" clause with a sentence pointing at `Stream`: + +```go +// Package sse parses a text/event-stream (Server-Sent Events) per the WHATWG +// algorithm, yielding each dispatched [Event] through [Parse] as a range-over-func +// iterator. [Stream] adds a reconnecting consumer over a caller-supplied +// connection, replaying the Last-Event-ID and honoring the server's retry backoff. +package sse +``` + +- [ ] **Step 6: Commit** + +```bash +git add sse/stream.go sse/stream_test.go sse/stream_internal_test.go sse/doc.go +git commit -m "feat(sse): add reconnecting Stream with Last-Event-ID replay" +``` + +--- + +## Task 2: docs and full gate + +**Files:** +- Modify: `doc.go`, `README.md` + +- [ ] **Step 1: Update `doc.go`** + +Read `doc.go`. The `package dexpace` comment mentions the sse parser; extend that +sentence (within the single contiguous `//` block) to: + +```go +// The sse package parses Server-Sent Events (text/event-stream) into a +// range-over-func iterator of events, with a reconnecting Stream that replays the +// Last-Event-ID. +``` +(Replace the existing sse sentence; do not add a second package clause.) + +- [ ] **Step 2: Update `README.md`** + +Read `README.md`. Update the `sse` row's description to mention the reconnecting +stream: "Server-Sent Events (text/event-stream) WHATWG parser + reconnecting +Stream (Last-Event-ID replay)." + +- [ ] **Step 3: Run the full gate** + +Run: +```bash +gofmt -l . +go vet ./... +go test -race ./... +``` +Expected: `gofmt -l .` prints nothing; `go vet` clean; every package passes under +the race detector. + +- [ ] **Step 4: Commit** + +```bash +git add doc.go README.md +git commit -m "docs: document the sse reconnecting Stream" +``` + +--- + +## Self-Review notes (for the implementer) + +- **Spec coverage:** `Stream` + `ConnectFunc` + `WithReconnectDelay` + reconnect + loop with Last-Event-ID replay and retry backoff (Task 1); docs (Task 2). +- **Type consistency:** `sse.ConnectFunc`, `sse.StreamOption`, + `sse.WithReconnectDelay`, `sse.Stream`, and the unexported `withWait`/`realWait` + used identically across tasks/tests. +- **Deterministic tests:** the internal `withWait` recorder asserts the retry + override without real sleeps; external tests use `WithReconnectDelay(0)` and a + call-counting `connect`. +- **Resource safety:** the connection reader is closed on every path (consumer + stop, reconnect); a terminal connect error has no reader to close. +- **Cancellation:** `realWait` checks `ctx` before the zero-delay shortcut, and the + loop top checks `ctx.Err()`, so a canceled context never triggers a spurious + reconnect. +- **`make check`** green before opening the PR. diff --git a/docs/superpowers/specs/2026-06-16-sse-reconnect-design.md b/docs/superpowers/specs/2026-06-16-sse-reconnect-design.md new file mode 100644 index 0000000..b1f8143 --- /dev/null +++ b/docs/superpowers/specs/2026-06-16-sse-reconnect-design.md @@ -0,0 +1,128 @@ +# SSE reconnecting stream — design + +**Date:** 2026-06-16 +**Status:** Approved (standing delegation); ready for implementation planning +**Subsystem:** deferred-feature #2 (the reconnecting connection from the SSE roadmap item) + +## Context + +The `sse` package parses an event stream from an `io.Reader`. The deferred +resilience layer — auto-reconnect with `Last-Event-ID` replay and server `retry` +backoff — completes parity with the Java/Python SDKs. This adds it as +`sse.Stream`, built on the existing `Parse`. + +## Decisions + +1. **Caller supplies the connection.** A `ConnectFunc(ctx, lastEventID)` returns a + fresh `io.ReadCloser` event stream. This keeps `sse` decoupled from + `dexpace.Client` (the caller wires whatever HTTP they use) and makes + `Last-Event-ID` replay explicit. +2. **Transparent reconnect on stream end.** When a connected stream ends (EOF or a + mid-stream read error), `Stream` waits the reconnection delay and reconnects + with the most recent event id. A **connect** error is terminal (yielded), so a + persistent inability to connect does not loop forever. +3. **Server-driven backoff.** The delay starts at a configurable default + (`WithReconnectDelay`, default 3s) and is updated by any `retry` field the + stream sends. +4. **Deterministic and cancelable.** The iterator is pull-based; the reconnect + wait checks `ctx` first (so a canceled context stops without a spurious + reconnect) and `WithReconnectDelay(0)` makes tests fast. + +## Architecture + +### `sse.Stream` (added to the `sse` package) + +```go +// ConnectFunc opens a fresh event-stream connection. It receives the most recent +// event id (empty on the first connect) so the server can resume via the +// Last-Event-ID request header. The returned reader is closed by Stream when the +// connection ends. +type ConnectFunc func(ctx context.Context, lastEventID string) (io.ReadCloser, error) + +// StreamOption configures Stream. +type StreamOption func(*streamConfig) + +// WithReconnectDelay sets the wait between reconnects. It defaults to three +// seconds and is overridden by any server-sent retry value. A delay <= 0 +// reconnects immediately (useful in tests). +func WithReconnectDelay(d time.Duration) StreamOption + +// Stream yields events from a reconnecting SSE source. It calls connect, parses +// events until the stream ends (EOF or a read error), waits the reconnection +// delay, then reconnects with the most recent event id. A connect error is +// delivered as the iterator error and ends the stream; cancel ctx to stop. The +// iterator is single-pass. +func Stream(ctx context.Context, connect ConnectFunc, opts ...StreamOption) iter.Seq2[Event, error] +``` + +### Behaviour + +- Track `lastID` (carried across reconnects, sticky) and `delay` + (default → updated by `retry`). +- Loop: + 1. If `ctx` is done, stop. + 2. `rc, err := connect(ctx, lastID)`. On error → `yield(Event{}, err)` and stop. + 3. For each `(ev, perr)` from `Parse(rc)`: + - `perr != nil` (mid-stream read error) → close `rc`, break to reconnect + (the error is not surfaced; reconnection is the SSE contract). + - else: if `ev.ID != ""` set `lastID`; if `ev.Retry > 0` set `delay`; + `yield(ev, nil)`. If the consumer stops (`yield` returns false) → close `rc` + and stop. + 4. The stream ended (EOF or read error). Close `rc`. Wait `delay`, honoring + `ctx`; if `ctx` is canceled during the wait, stop. Otherwise reconnect. +- The wait helper returns immediately false if `ctx` is already done (so a + canceled context never triggers another connect), returns true for a + non-positive delay, otherwise selects on a timer vs `ctx.Done()`. + +## Edge cases + +- A connect error on the **first** attempt → yielded immediately, stream ends. +- An empty stream (connect returns a reader that EOFs with no events) → reconnect + after the delay (a heartbeat-less keep-open). +- The consumer breaking out of the range stops fetching and closes the current + reader (no further connects). +- `Last-Event-ID` replay: the id of the most recent dispatched event is passed to + the next `connect`; an event without an id keeps the prior id (sticky, from + `Parse`). +- A server `retry` value updates the delay for all subsequent reconnects. +- `WithReconnectDelay(0)` reconnects with no wait but still stops on a canceled + context (the wait helper checks `ctx` before the zero-delay shortcut). +- Each connection's reader is always closed (on consumer stop, on reconnect, on + terminal connect error there is no reader to close). + +## Package layout + +| Path | Change | +|---|---| +| `sse/stream.go` (new) | `ConnectFunc`, `StreamOption`, `WithReconnectDelay`, `Stream`, unexported `withWait` + `realWait` | +| `sse/stream_test.go` (new, `sse_test`) | reconnect, Last-Event-ID, cancel, connect-error, consumer-break tests | +| `sse/stream_internal_test.go` (new, `sse`) | retry-overrides-delay via injected `withWait` recorder | +| `sse/doc.go` (modify) | note that `Stream` now provides the reconnecting layer | +| `doc.go`, `README.md` | update the sse description | + +## Testing + +- Reconnect + flatten: `connect` returns successive in-memory streams (events + then EOF), with `WithReconnectDelay(0)`; the third connect returns an error to + stop. Assert all events arrive in order, then the connect error. +- `Last-Event-ID` replay: events carry ids; assert the id passed to the second + `connect` equals the last dispatched id from the first stream. +- Retry backoff (deterministic, internal test): an unexported `withWait(fn)` + option injects a recording wait. A stream sends `retry: 2000` then EOF; the + next connect returns a stop-error. The internal test asserts the recorded wait + delay equals 2000ms (the server `retry` overrode the default). The wait seam is + unexported (no public API addition) and used only by `package sse` tests. +- Cancellation: cancel `ctx` after consuming the first stream's events; assert no + further `connect` call and the iterator ends. +- Connect error first attempt → yielded, no events. +- Consumer break → only the consumed events fetched; the reader is closed + (a reader that records Close). +- Table-driven where natural, parallel where no shared env; stdlib-only; `gofmt`/ + `go vet`/`go test -race` clean. + +## Out of scope (deferred) + +- A `dexpace.Client`-integrated SSE client (the `ConnectFunc` seam lets callers + wire the client themselves). +- Exponential backoff / jitter on repeated connect failures (connect errors are + terminal here; add a retry-on-connect policy later if needed). diff --git a/sse/doc.go b/sse/doc.go index 3cd8be1..afb5209 100644 --- a/sse/doc.go +++ b/sse/doc.go @@ -3,7 +3,6 @@ // Package sse parses a text/event-stream (Server-Sent Events) per the WHATWG // algorithm, yielding each dispatched [Event] through [Parse] as a range-over-func -// iterator. It operates on any io.Reader; a reconnecting connection -// (Last-Event-ID replay, server retry backoff) is intentionally left to the -// caller and a future addition. +// iterator. [Stream] adds a reconnecting consumer over a caller-supplied +// connection, replaying the Last-Event-ID and honoring the server's retry backoff. package sse diff --git a/sse/stream.go b/sse/stream.go new file mode 100644 index 0000000..d748d91 --- /dev/null +++ b/sse/stream.go @@ -0,0 +1,116 @@ +// Copyright (c) 2026 dexpace and Omar Aljarrah. +// Licensed under the MIT License. See LICENSE in the repository root for details. + +package sse + +import ( + "context" + "io" + "iter" + "time" +) + +// defaultReconnectDelay is the wait between reconnects unless overridden by an +// option or a server retry value. +const defaultReconnectDelay = 3 * time.Second + +// ConnectFunc opens a fresh event-stream connection. It receives the most recent +// event id (empty on the first connect) so the server can resume via the +// Last-Event-ID request header. Stream closes the returned reader when the +// connection ends. +type ConnectFunc func(ctx context.Context, lastEventID string) (io.ReadCloser, error) + +// StreamOption configures [Stream]. +type StreamOption func(*streamConfig) + +type streamConfig struct { + delay time.Duration + wait func(ctx context.Context, d time.Duration) bool +} + +// WithReconnectDelay sets the initial wait between reconnects (default three +// seconds). A server-sent retry value overrides it for all subsequent reconnects, +// matching browser EventSource semantics. A delay <= 0 reconnects immediately. +func WithReconnectDelay(d time.Duration) StreamOption { + return func(c *streamConfig) { c.delay = d } +} + +// withWait injects the reconnect-wait function. Test seam. +func withWait(fn func(ctx context.Context, d time.Duration) bool) StreamOption { + return func(c *streamConfig) { c.wait = fn } +} + +// Stream yields events from a reconnecting SSE source. It calls connect, parses +// events until the stream ends, waits the reconnection delay, then reconnects +// with the most recent event id (so the server can resume via Last-Event-ID). +// +// A mid-stream read error (including an unexpected EOF or a line exceeding the +// parser's buffer) triggers a transparent reconnect rather than surfacing the +// error; the only error yielded to the caller is a connect error, which is +// terminal. Cancel ctx to stop reconnecting on a persistently broken stream. The +// iterator is single-pass. +func Stream(ctx context.Context, connect ConnectFunc, opts ...StreamOption) iter.Seq2[Event, error] { + cfg := streamConfig{delay: defaultReconnectDelay, wait: realWait} + for _, opt := range opts { + opt(&cfg) + } + + return func(yield func(Event, error) bool) { + lastID := "" + delay := cfg.delay + for { + if ctx.Err() != nil { + return + } + rc, err := connect(ctx, lastID) + if err != nil { + yield(Event{}, err) + return + } + + stopped := false + for ev, perr := range Parse(rc) { + if perr != nil { + break // mid-stream read error: reconnect transparently + } + if ev.ID != "" { + lastID = ev.ID + } + if ev.Retry > 0 { + delay = ev.Retry + } + if !yield(ev, nil) { + stopped = true + break + } + } + _ = rc.Close() + + if stopped { + return + } + if !cfg.wait(ctx, delay) { + return + } + } + } +} + +// realWait waits for d, returning false if ctx is (or becomes) done. A +// non-positive d returns true immediately, after the context check. +func realWait(ctx context.Context, d time.Duration) bool { + if ctx.Err() != nil { + return false + } + if d <= 0 { + return true + } + t := time.NewTimer(d) + defer t.Stop() + select { + case <-ctx.Done(): + return false + case <-t.C: + return true + } +} diff --git a/sse/stream_internal_test.go b/sse/stream_internal_test.go new file mode 100644 index 0000000..5f82805 --- /dev/null +++ b/sse/stream_internal_test.go @@ -0,0 +1,48 @@ +// Copyright (c) 2026 dexpace and Omar Aljarrah. +// Licensed under the MIT License. See LICENSE in the repository root for details. + +package sse + +import ( + "context" + "errors" + "io" + "strings" + "testing" + "time" +) + +func TestStreamRetryOverridesDelay(t *testing.T) { + t.Parallel() + + var recorded []time.Duration + wait := func(ctx context.Context, d time.Duration) bool { + recorded = append(recorded, d) + return ctx.Err() == nil + } + + calls := 0 + connect := func(_ context.Context, _ string) (io.ReadCloser, error) { + calls++ + if calls == 1 { + return io.NopCloser(strings.NewReader("retry: 2000\ndata: a\n\n")), nil + } + return nil, errors.New("stop") + } + + var gotErr error + for _, err := range Stream(context.Background(), connect, + WithReconnectDelay(time.Hour), withWait(wait)) { + if err != nil { + gotErr = err + break + } + } + + if gotErr == nil { + t.Fatal("expected the stop error") + } + if len(recorded) != 1 || recorded[0] != 2000*time.Millisecond { + t.Fatalf("recorded wait delays = %v, want [2s] (retry overrode the hour default)", recorded) + } +} diff --git a/sse/stream_test.go b/sse/stream_test.go new file mode 100644 index 0000000..2aa9f01 --- /dev/null +++ b/sse/stream_test.go @@ -0,0 +1,209 @@ +// Copyright (c) 2026 dexpace and Omar Aljarrah. +// Licensed under the MIT License. See LICENSE in the repository root for details. + +package sse_test + +import ( + "context" + "errors" + "io" + "strings" + "testing" + + "github.com/dexpace/go-sdk/sse" +) + +func nopBody(s string) io.ReadCloser { return io.NopCloser(strings.NewReader(s)) } + +func TestStreamReconnectsAndFlattens(t *testing.T) { + t.Parallel() + + calls := 0 + connect := func(_ context.Context, _ string) (io.ReadCloser, error) { + calls++ + switch calls { + case 1: + return nopBody("data: a\n\ndata: b\n\n"), nil + case 2: + return nopBody("data: c\n\n"), nil + default: + return nil, errors.New("stop") + } + } + + var data []string + var gotErr error + for ev, err := range sse.Stream(context.Background(), connect, sse.WithReconnectDelay(0)) { + if err != nil { + gotErr = err + break + } + data = append(data, ev.Data) + } + + if strings.Join(data, ",") != "a,b,c" { + t.Fatalf("data = %v, want [a b c]", data) + } + if gotErr == nil || gotErr.Error() != "stop" { + t.Fatalf("err = %v, want stop", gotErr) + } +} + +func TestStreamReplaysLastEventID(t *testing.T) { + t.Parallel() + + var seenIDs []string + calls := 0 + connect := func(_ context.Context, lastID string) (io.ReadCloser, error) { + seenIDs = append(seenIDs, lastID) + calls++ + if calls == 1 { + return nopBody("id: 42\ndata: a\n\n"), nil + } + return nil, errors.New("stop") + } + + for _, err := range sse.Stream(context.Background(), connect, sse.WithReconnectDelay(0)) { + if err != nil { + break + } + } + + if len(seenIDs) != 2 || seenIDs[0] != "" || seenIDs[1] != "42" { + t.Fatalf("connect lastIDs = %v, want [\"\" \"42\"]", seenIDs) + } +} + +func TestStreamCancellationStops(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + calls := 0 + connect := func(_ context.Context, _ string) (io.ReadCloser, error) { + calls++ + if calls == 1 { + return nopBody("data: a\n\ndata: b\n\n"), nil + } + return nil, errors.New("should not reconnect after cancel") + } + + var data []string + for ev, err := range sse.Stream(ctx, connect, sse.WithReconnectDelay(0)) { + if err != nil { + t.Fatalf("unexpected error (reconnected after cancel?): %v", err) + } + data = append(data, ev.Data) + if ev.Data == "b" { + cancel() + } + } + + if calls != 1 { + t.Fatalf("connect called %d times, want 1 (no reconnect after cancel)", calls) + } + if strings.Join(data, ",") != "a,b" { + t.Fatalf("data = %v, want [a b]", data) + } +} + +func TestStreamConnectErrorIsTerminal(t *testing.T) { + t.Parallel() + + boom := errors.New("dial failed") + connect := func(_ context.Context, _ string) (io.ReadCloser, error) { + return nil, boom + } + + var events, errs int + for _, err := range sse.Stream(context.Background(), connect, sse.WithReconnectDelay(0)) { + if err != nil { + if !errors.Is(err, boom) { + t.Fatalf("err = %v, want boom", err) + } + errs++ + break + } + events++ + } + if events != 0 || errs != 1 { + t.Fatalf("events=%d errs=%d, want 0 and 1", events, errs) + } +} + +type closeRecorder struct { + io.Reader + closed bool +} + +func (c *closeRecorder) Close() error { c.closed = true; return nil } + +type erroringReader struct{ err error } + +func (r *erroringReader) Read([]byte) (int, error) { return 0, r.err } + +func TestStreamReconnectsOnMidStreamReadError(t *testing.T) { + t.Parallel() + + readErr := errors.New("connection reset") + calls := 0 + connect := func(_ context.Context, _ string) (io.ReadCloser, error) { + calls++ + switch calls { + case 1: + // One event, then a non-EOF read error mid-stream. + return io.NopCloser(io.MultiReader( + strings.NewReader("data: a\n\n"), + &erroringReader{err: readErr}, + )), nil + case 2: + return nopBody("data: b\n\n"), nil + default: + return nil, errors.New("stop") + } + } + + var data []string + var gotErr error + for ev, err := range sse.Stream(context.Background(), connect, sse.WithReconnectDelay(0)) { + if err != nil { + gotErr = err + break + } + data = append(data, ev.Data) + } + + if strings.Join(data, ",") != "a,b" { + t.Fatalf("data = %v, want [a b] (mid-stream read error should reconnect transparently)", data) + } + // The read error must NOT be surfaced; only the terminal connect error is. + if gotErr == nil || gotErr.Error() != "stop" { + t.Fatalf("err = %v, want \"stop\" (read error not surfaced)", gotErr) + } +} + +func TestStreamConsumerBreakClosesReader(t *testing.T) { + t.Parallel() + + rec := &closeRecorder{Reader: strings.NewReader("data: a\n\ndata: b\n\n")} + calls := 0 + connect := func(_ context.Context, _ string) (io.ReadCloser, error) { + calls++ + return rec, nil + } + + for ev, err := range sse.Stream(context.Background(), connect, sse.WithReconnectDelay(0)) { + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + _ = ev + break // stop after the first event + } + + if !rec.closed { + t.Fatal("reader was not closed on consumer break") + } + if calls != 1 { + t.Fatalf("connect called %d times, want 1", calls) + } +}