Skip to content

feat: rewrite durablestream store using conformance-tested client#48

Merged
jilio merged 4 commits into
mainfrom
jilio/durablestream-conformant
Dec 21, 2025
Merged

feat: rewrite durablestream store using conformance-tested client#48
jilio merged 4 commits into
mainfrom
jilio/durablestream-conformant

Conversation

@jilio
Copy link
Copy Markdown
Owner

@jilio jilio commented Dec 21, 2025

Summary

Replace our custom HTTP client implementation with ahimsalabs/durable-streams-go - a conformance-tested Go client library that passes the official durable-streams test suite.

Why this change?

  • Our previous implementation was not fully compatible with the durable-streams protocol
  • The ahimsalabs library is conformance-tested against the reference implementation
  • Ensures we work correctly with any durable-streams server

Changes

  • Create standalone module with go.mod (like sqlite store)
  • Use ahimsalabs/durable-streams-go for protocol compliance
  • Remove custom client.go and export_test.go
  • Simplify options (remove retry config, handled by client library)
  • Update API: New(baseURL, streamPath, opts...) for clearer separation
  • Maintain 100% test coverage using ahimsalabs in-memory handler
  • Update README and PERSISTENCE docs with new API

Breaking Changes

  • New() signature changed from New(streamURL) to New(baseURL, streamPath)
  • WithRetry() option removed (client handles retries internally)

Test plan

  • All tests pass with 100% coverage
  • Tests use ahimsalabs in-memory handler for protocol-correct testing
  • Manual testing against reference durable-streams server (optional)

🤖 Generated with Claude Code

jilio and others added 4 commits December 21, 2025 20:38
- Add 🌍 Remote storage feature to README
- Add Remote Storage section with durable-streams example
- Add Storage Backends table documenting all available options
- Update StoredEvent structure to use Offset type instead of Position
- Add SQLite and Durable-Streams store usage examples
- Update EventStore interface to new Append/Read signatures
- Add SubscriptionStore and EventStoreStreamer interfaces
- Update all replay examples to use eventbus.OffsetOldest

Closes #42

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Add reference to Electric's announcement blog post explaining the
Durable Streams protocol specification.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Update link to point to the official durable-streams organization
repository instead of the fork.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Replace custom HTTP client with ahimsalabs/durable-streams-go - a
conformance-tested Go client library that passes the official
durable-streams test suite.

Changes:
- Create standalone module with go.mod (like sqlite store)
- Use ahimsalabs/durable-streams-go for protocol compliance
- Remove custom client.go and export_test.go
- Simplify options (remove retry config, handled by client library)
- Update API: New(baseURL, streamPath, opts...) for clearer separation
- Maintain 100% test coverage using ahimsalabs in-memory handler
- Compatible with any durable-streams server implementation
- Update README and PERSISTENCE docs with new API

Breaking changes:
- New() signature changed from New(streamURL) to New(baseURL, streamPath)
- WithRetry() option removed (client handles retries internally)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
@claude
Copy link
Copy Markdown

claude Bot commented Dec 21, 2025

Code Review: Durable-Streams Store Rewrite

Summary

This PR replaces the custom durable-streams HTTP client with the conformance-tested ahimsalabs/durable-streams-go library. This is a significant improvement that ensures protocol compliance and reduces maintenance burden.


✅ Strengths

1. Excellent Architectural Decision

  • Moving to a conformance-tested client library is the right call
  • Reduces code complexity (removed 171 lines of custom client code)
  • Better protocol compliance guaranteed by the upstream library
  • Standalone module structure matches the sqlite store pattern

2. Comprehensive Test Coverage

  • Tests cover all major paths: success cases, error handling, edge cases
  • Good use of the ahimsalabs in-memory handler for testing
  • Tests for malformed events, invalid JSON, context cancellation
  • Tests for both embedded and synthetic offsets

3. Clean API Design

  • New(baseURL, streamPath, opts...) provides clear separation
  • NewWithContext allows cancellable initialization
  • Options pattern is well-implemented with sensible defaults
  • Good documentation and comments

4. Error Handling

  • Proper error wrapping with context
  • Validates required parameters (baseURL, streamPath)
  • Handles malformed events gracefully with optional logging
  • Context cancellation is respected throughout

🔍 Code Quality Observations

1. Synthetic Offsets Pattern (stores/durablestream/store.go:168-176)

The synthetic offset generation is clever but has a critical limitation that's properly documented:

// Synthesize unique offset: "nextOffset/index"
// These are ephemeral - use nextOffset for reliable resumption
eventOffset = eventbus.Offset(fmt.Sprintf("%s/%d", result.NextOffset, i))

Observation: This is well-documented in comments, but the limitation is significant:

  • Synthetic offsets containing "/" are not stable across Read calls
  • Only works for single-batch reads
  • Users MUST use the returned nextOffset for resumption, not individual event offsets

Recommendation: Consider adding a runtime warning or panic if synthetic offsets are used with SubscribeWithReplay, as this could lead to duplicate event processing. The logger helps but might be missed.

2. Missing SubscriptionStore Implementation

The store implements EventStore but not SubscriptionStore. This means:

  • ✅ Can be used with eventbus.New(eventbus.WithStore(store))
  • ❌ Cannot track subscription offsets for resumable subscriptions
  • SubscribeWithReplay won't work without a separate subscription store

Question: Is this intentional? The PR description mentions "Maintain 100% test coverage" but doesn't address subscription tracking. Should this store also implement SubscriptionStore, or is it expected that users combine it with another store for subscription tracking?

3. Timestamp Handling (stores/durablestream/store.go:199-210)

The parseTimestamp function silently returns zero time on errors:

func parseTimestamp(s string) time.Time {
    if s == "" {
        return time.Time{}
    }
    t, err := time.Parse(time.RFC3339Nano, s)
    if err != nil {
        return time.Time{}  // Silent failure
    }
    return t
}

Observation: While there's a test for invalid timestamps, the silent failure might mask data issues.

Recommendation: Consider logging invalid timestamps when cfg.logger is set, similar to how malformed events are logged.

4. Context.Background() in New() (stores/durablestream/store.go:43)

func New(baseURL string, streamPath string, opts ...Option) (*Store, error) {
    return NewWithContext(context.Background(), baseURL, streamPath, opts...)
}

Observation: Using context.Background() for stream creation means initialization cannot be cancelled.

Good: The comment documents this and provides NewWithContext as an alternative.

Consideration: In production, stream creation might involve network calls that could hang. Users should prefer NewWithContext for production code.


🛡️ Security & Performance

Security: ✅ No Concerns

  • No SQL injection risks (HTTP-based)
  • No sensitive data exposure
  • Proper error message sanitization
  • Context handling prevents resource leaks

Performance: ✅ Generally Good

Strengths:

  • Uses json.RawMessage to avoid unnecessary unmarshaling
  • Respects context timeouts
  • No obvious memory leaks
  • HTTP client is reusable

Minor Consideration:

  • The Read method unmarshals the entire response into memory: json.Unmarshal(result.Data, &rawEvents) (store.go:151)
  • For very large batches, this could be memory-intensive
  • However, this is appropriate for the durable-streams protocol design

📝 Best Practices Adherence

Follows Go Idioms: ✅

  • Options pattern for configuration
  • Proper error wrapping
  • Interface compliance verification: var _ eventbus.EventStore = (*Store)(nil)
  • Exported test helpers removed (export_test.go deleted)

Documentation: ✅ Excellent

  • Package-level documentation
  • Function-level comments
  • Important limitations documented (synthetic offsets)
  • README and PERSISTENCE.md updated

🧪 Testing

Coverage: Based on the test file, coverage should be ~100%

The tests cover:

  • ✅ Initialization (with/without context, error cases)
  • ✅ Append (success, errors, context cancellation)
  • ✅ Read (from beginning, with limit, from offset, empty stream)
  • ✅ Options (timeout, client, content type, logger)
  • ✅ Timestamp parsing (valid/invalid formats)
  • ✅ Malformed events handling
  • ✅ Embedded vs synthetic offsets
  • ✅ Server errors
  • ✅ Close operation

Recommendation: Before merging, run:

cd stores/durablestream && go test -coverprofile=coverage.out ./... && go tool cover -func=coverage.out

Verify 100% coverage as mentioned in CLAUDE.md.


🚨 Breaking Changes: Well Documented

The PR clearly documents breaking changes:

  1. API signature change: New(streamURL)New(baseURL, streamPath)
  2. Removed option: WithRetry() (now handled by client)

Both are reasonable and well-communicated.


📋 Recommendations

High Priority

  1. Clarify subscription tracking: Add a note in the README about whether SubscriptionStore is supported or if users need a hybrid approach
  2. Verify test coverage: Run go test -cover to confirm 100% coverage before merge

Medium Priority

  1. Log invalid timestamps: When cfg.logger is set, log timestamp parsing failures (similar to malformed events)
  2. Add example: Include a complete example in docs showing the new API with both New() and NewWithContext()

Low Priority (Nice to Have)

  1. Synthetic offset validation: Consider adding a runtime check or warning if synthetic offsets are used inappropriately
  2. Benchmark tests: Add benchmark tests for Read/Append operations to catch performance regressions

✅ Verdict

Approve with minor suggestions. This is a well-executed refactoring that:

  • ✅ Improves protocol compliance
  • ✅ Reduces maintenance burden
  • ✅ Maintains excellent test coverage
  • ✅ Follows Go best practices
  • ✅ Has clear, helpful documentation

The code is production-ready. The main question is around SubscriptionStore support - clarification would help users understand the full capabilities.

Recommendation: Merge after:

  1. Confirming 100% test coverage with go test -cover
  2. Running go fmt ./... per CLAUDE.md
  3. Adding clarification about subscription tracking (if needed)

Great work on this refactoring! 🎉

@jilio jilio merged commit 61068e8 into main Dec 21, 2025
3 checks passed
@jilio jilio deleted the jilio/durablestream-conformant branch December 21, 2025 12:25
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant