feat: add state package implementing Durable Streams State Protocol#50
Conversation
Add an optional `state/` package that provides database-style state synchronization semantics on top of ebu's event bus. This enables interoperability with the durable-streams ecosystem while keeping core ebu unchanged. Key features: - Type-safe helpers (Insert, Update, Delete) using Go generics - Options pattern for message configuration (TxID, timestamps) - Materializer that builds typed state from events - Full JSON interoperability with State Protocol spec - 100% test coverage 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
Pull Request Review: State Package ImplementationSummaryThis PR adds a well-designed ✅ StrengthsCode Quality
Design
Testing
🔍 Issues & Concerns1. Missing Validation in Control Messages (Minor)Location: Control message constructors ( func SnapshotStart(offset string) (*ControlMessage, error) {
if offset == "" {
return nil, fmt.Errorf("state: offset cannot be empty")
}
// ...
}Impact: Low - but could prevent confusing bugs 2. Potential Race Condition in Materializer.lastOffset (Medium)Location:
// Line 217 - Write (no lock)
m.lastOffset = event.Offset
// Line 283 - Read (no lock)
func (m *Materializer) LastOffset() eventbus.Offset {
return m.lastOffset
}Fix: Add mutex protection: func (m *Materializer) LastOffset() eventbus.Offset {
m.mu.RLock()
defer m.mu.RUnlock()
return m.lastOffset
}Impact: Medium - potential data race under concurrent access 3. Error Handling Inconsistency (Minor)Location: In Suggestion: Add logging or explicit error handling for control message parsing failures. 4. Missing Nil Checks (Low)Location: The func (m *Materializer) applyChange(msg *ChangeMessage) error {
if msg == nil {
return fmt.Errorf("state: nil change message")
}
// ...
}5. JSON Marshaling Errors Could Be More Specific (Minor)Location: When JSON marshaling fails for complex types, the error message could be more helpful: if err != nil {
return nil, fmt.Errorf("state: marshal value of type %T: %w", value, err)
}🔒 Security Considerations✅ No Major Security Issues Found
|
Fixes a potential race condition where lastOffset was being written in Apply() and read in LastOffset() without synchronization. Now both reads and writes are protected by the RWMutex. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
Code Review: State Package ImplementationI've completed a comprehensive review of this PR implementing the Durable Streams State Protocol. Overall, this is excellent work with high-quality implementation. Here are my findings: ✅ StrengthsCode Quality & Design
Testing
Thread Safety
🔍 Potential Issues & Suggestions1. Potential Nil Pointer Issue in EntityType (state/message.go:90)func EntityType(entity any) string {
if namer, ok := entity.(TypeNamer); ok {
return namer.StateTypeName()
}
return reflect.TypeOf(entity).String() // ⚠️ Panics if entity is nil
}Issue: If Recommendation: Add nil check: func EntityType(entity any) string {
if namer, ok := entity.(TypeNamer); ok {
return namer.StateTypeName()
}
t := reflect.TypeOf(entity)
if t == nil {
return "nil" // or return an error
}
return t.String()
}2. Missing Validation in Helper FunctionsThe helper functions validate empty keys but don't validate other constraints:
Recommendation: Consider adding validation if strict protocol compliance is important. 3. MemoryStore.All() Returns Shallow Copy (state/materializer.go:71-78)func (s *MemoryStore[T]) All() map[string]T {
s.mu.RLock()
defer s.mu.RUnlock()
result := make(map[string]T, len(s.data))
for k, v := range s.data {
result[k] = v // ⚠️ Shallow copy
}
return result
}Issue: If Recommendation: Document this behavior clearly, or consider returning 4. TypedCollection.Get() Inconsistency// Get retrieves an entity by its key (without the type prefix).
func (c *TypedCollection[T]) Get(key string) (T, bool) {
return c.store.Get(CompositeKey(c.entityType, key))
}Observation: Recommendation: Document this clearly (which you have), or consider adding a 5. Materializer.Apply() Double Unmarshalfunc (m *Materializer) Apply(event *eventbus.StoredEvent) error {
var raw struct {
Headers json.RawMessage `json:"headers"`
}
if err := json.Unmarshal(event.Data, &raw); err != nil {
return fmt.Errorf("state: unmarshal event: %w", err)
}
var ctrlHeaders ControlHeaders
if json.Unmarshal(raw.Headers, &ctrlHeaders) == nil && ctrlHeaders.Control != "" {
// ...
}
var changeMsg ChangeMessage
if err := json.Unmarshal(event.Data, &changeMsg); err != nil { // ⚠️ Second full unmarshal
return fmt.Errorf("state: unmarshal change message: %w", err)
}
// ...
}Issue: Performance - unmarshals Recommendation: Consider unmarshaling once into a union type or using json.RawMessage more efficiently. For small messages this is fine, but could be optimized. 6. Missing Bounds Checking
Recommendation: Consider adding configurable limits if this will be used in production with untrusted input. 🔒 Security ConsiderationsNo Critical Issues Found
Minor Considerations
⚡ Performance ConsiderationsGood
Could Improve
Benchmark RecommendationConsider adding benchmarks for:
📋 Pre-Merge ChecklistPer CLAUDE.md, before merging verify:
🎯 VerdictThis is high-quality code that follows Go best practices and the repository's standards. The implementation is:
Recommendations Priority
The nil pointer issue should be addressed before merge. Other items are minor and can be addressed in follow-up PRs if desired. Great work on this implementation! 🚀 |
Add nil check in EntityType() to return "nil" string instead of panicking when reflect.TypeOf(nil).String() is called. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
Document the new optional state package that implements the Durable Streams State Protocol for database-style state synchronization. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
Code Review for State Package ImplementationSummaryThis is a well-architected implementation of the Durable Streams State Protocol for ebu. The code demonstrates strong design principles, comprehensive testing, and thoughtful integration with the existing ebu architecture. StrengthsCode Quality ✅
Test Coverage ✅
Design Decisions ✅
Issues & Recommendations🟡 Minor Issues1. Potential Race Condition in
|
Code Review: State Package ImplementationOverviewThis PR adds an optional ✅ StrengthsArchitecture & Design
Code Quality
Integration
🔍 Code Quality Observationsmessage.go (lines 87-95)The EntityType() function has good fallback logic. Consider documenting that reflect.TypeOf().String() returns package-qualified names which may not be stable across package refactoring (hence the recommendation to implement TypeNamer). helpers.go (lines 60-75)The zero value instantiation pattern materializer.go (lines 190-223)The Apply() method's type detection logic is clever - checking for control headers first. However, there's a potential edge case: Issue: If json.Unmarshal(event.Data, &raw) succeeds but the Data contains neither a valid ControlMessage nor ChangeMessage, the error from line 212 will be returned, but it may not accurately reflect the actual problem. Suggestion: Consider adding a more descriptive error message that indicates the event data doesn't match either expected message type. materializer.go (lines 237-258)Good error handling with the strictSchema option. The dual-mode behavior (strict vs lenient) is well-designed for different use cases. 🔐 Security ConsiderationsNo Critical Issues FoundThe code handles untrusted JSON input safely:
Minor observation: The package stores arbitrary JSON blobs in Value/OldValue fields. This is fine, but consumers should be aware that malicious JSON could consume memory if entities are large. This is an acceptable trade-off for the flexibility provided. ⚡ Performance ConsiderationsGood Performance Characteristics
Potential Optimizations (Not Required)
🧪 Test CoverageThe PR claims 100% test coverage with 43 tests. The test file (state_test.go) shows:
Recommendation: Before merging, verify coverage with: go test -coverprofile=coverage.out ./state/...
go tool cover -func=coverage.out📋 Best Practices CompliancePer CLAUDE.md requirements: ✅ Go idioms: Excellent use of interfaces, generics, and options pattern 🐛 Potential IssuesMinor: Empty key validation (helpers.go:61-63)The validation checks for empty keys but doesn't validate the key format. Consider if keys containing "/" could cause issues since CompositeKey uses "/" as a separator. Example edge case: msg, _ := Insert("user/admin", User{Name: "Alice"})
// Composite key becomes: "state.User/user/admin"
// This could be ambiguous when parsingRecommendation: Either:
Minor: Materializer offset tracking (materializer.go:203-205, 219-221)The lastOffset is updated inside the Apply() method, but if Apply() is called with events out of order, the lastOffset could be misleading. Consider documenting that events should be replayed in order. 📝 Documentation QualityExcellent Documentation
README.md AdditionThe README addition (lines 387-428) is clear and provides good quick-start examples. Well done. 🎯 RecommendationsBefore Merge (Required per CLAUDE.md):
Consider for Future (Optional):
🏆 Overall AssessmentThis is high-quality code that follows Go best practices and integrates well with ebu's architecture. The implementation is clean, well-tested, and production-ready. Recommendation: ✅ Approve with minor suggestions The code is ready to merge after running the standard pre-merge checks (fmt, test, coverage) as specified in CLAUDE.md. The minor issues noted above are not blockers but could be addressed in follow-up PRs if desired. Great work on this implementation! 🎉 |
Summary
This PR adds an optional
state/package that implements the Durable Streams State Protocol for ebu, enabling database-style state synchronization on top of ebu's event-driven architecture.What's New
Insert,Update,Delete,UpdateWithOldValue,DeleteWithOldValueWithTxID,WithTimestamp,WithAutoTimestamp,WithEntityTypeDesign
The package is completely optional and doesn't modify core ebu. It integrates seamlessly with ebu's
Replayfunctionality and event bus. Messages implement ebu'sTypeNamerinterface and can be published directly.Test Coverage
All 44 tests pass with 100% code coverage. Testing includes unit tests for all types and functions, integration tests with the event bus, JSON conformance tests, concurrent access tests, and error case coverage.
Pre-Merge Verification
go fmt ./...- code is properly formattedgo test ./...- all tests passgo test -coverprofile=coverage.out ./state && go tool cover -func=coverage.out- 100.0% coverage verified🤖 Generated with Claude Code