From 5eb21e781fa127416eccba9b27de944e9e4edc7c Mon Sep 17 00:00:00 2001 From: Andy Date: Tue, 19 May 2026 12:11:42 +0300 Subject: [PATCH] feat: mailbox frame delivery + Server.Snapshot() (ADR-002) Add per-module mailbox slot with latest-wins semantics and Snapshot() method for compositor render-tick polling. Push and pull coexist naturally -- no mode negotiation, no wire protocol changes. Changes: - Frame.Sequence field: maps wire protocol sequence for change detection - moduleConn.storeLatest/loadLatest: mutex-guarded mailbox slot - Server.Snapshot(): returns map[uint64]*Frame of latest per-module frames - readFrameLoop stores every frame in mailbox before OnFrame callback - 8 new tests: basic, latest-wins, no-frames, multi-module, pull, concurrent, backward-compat, headerToFrame sequence mapping --- compose.go | 6 + compose_test.go | 447 ++++++++++++++++++++++++++++++++++++++++++++++++ server.go | 59 ++++++- 3 files changed, 511 insertions(+), 1 deletion(-) diff --git a/compose.go b/compose.go index ae8e88f..184d9cf 100644 --- a/compose.go +++ b/compose.go @@ -32,4 +32,10 @@ type Frame struct { // Timestamp is a monotonic nanosecond timestamp from the module's clock. Timestamp int64 + + // Sequence is the monotonically increasing frame counter assigned by + // the publishing client. It is carried from the wire protocol header + // and can be used for change detection (e.g., comparing against a + // previously seen sequence number in Snapshot). + Sequence uint64 } diff --git a/compose_test.go b/compose_test.go index 052e59f..8008851 100644 --- a/compose_test.go +++ b/compose_test.go @@ -809,3 +809,450 @@ func buildTestHeader(flags uint8, dirtyX, dirtyY, dirtyW, dirtyH uint16) protoco PayloadSize: 256, } } + +// --------------------------------------------------------------------------- +// Mailbox / Snapshot tests (ADR-002) +// --------------------------------------------------------------------------- + +func TestSnapshotBasic(t *testing.T) { + addr := tempSocket(t) + + srv, err := Listen(addr) + if err != nil { + t.Fatalf("Listen: %v", err) + } + t.Cleanup(func() { _ = srv.Close() }) + + client, err := Dial(addr, WithName("snap-basic"), WithFrameSize(4, 4)) + if err != nil { + t.Fatalf("Dial: %v", err) + } + t.Cleanup(func() { _ = client.Close() }) + + pixels := makePixels(4, 4, 0xDD) + if err := client.PublishFrame(Frame{ + Pixels: pixels, + Width: 4, + Height: 4, + }); err != nil { + t.Fatalf("PublishFrame: %v", err) + } + + // Wait for the frame to propagate through the read loop. + var snap map[uint64]*Frame + ok := waitFor(t, func() bool { + snap = srv.Snapshot() + for _, f := range snap { + if f != nil { + return true + } + } + return false + }) + if !ok { + t.Fatal("timed out waiting for Snapshot to contain a frame") + } + + // Verify exactly one module with a non-nil frame. + var frame *Frame + for _, f := range snap { + if f != nil { + frame = f + } + } + if frame == nil { + t.Fatal("Snapshot returned no non-nil frames") + } + if frame.Width != 4 || frame.Height != 4 { + t.Errorf("frame dimensions = %dx%d, want 4x4", frame.Width, frame.Height) + } + if frame.Sequence != 1 { + t.Errorf("frame Sequence = %d, want 1", frame.Sequence) + } +} + +func TestSnapshotLatestWins(t *testing.T) { + addr := tempSocket(t) + + srv, err := Listen(addr) + if err != nil { + t.Fatalf("Listen: %v", err) + } + t.Cleanup(func() { _ = srv.Close() }) + + // Track frame count via OnFrame to know when all 3 arrive. + var frameCount atomic.Int32 + srv.OnFrame(func(_ Frame) { + frameCount.Add(1) + }) + + client, err := Dial(addr, WithName("snap-latest"), WithFrameSize(2, 2)) + if err != nil { + t.Fatalf("Dial: %v", err) + } + t.Cleanup(func() { _ = client.Close() }) + + // Publish 3 frames rapidly with different fill bytes. + for i := byte(1); i <= 3; i++ { + if pubErr := client.PublishFrame(Frame{ + Pixels: makePixels(2, 2, i), + Width: 2, + Height: 2, + }); pubErr != nil { + t.Fatalf("PublishFrame %d: %v", i, pubErr) + } + } + + // Wait for all 3 frames to be processed. + ok := waitFor(t, func() bool { + return frameCount.Load() >= 3 + }) + if !ok { + t.Fatalf("timed out: received %d/3 frames", frameCount.Load()) + } + + snap := srv.Snapshot() + var frame *Frame + for _, f := range snap { + if f != nil { + frame = f + } + } + if frame == nil { + t.Fatal("Snapshot returned no non-nil frames") + } + + // Latest frame (3rd publish) should have sequence 3. + if frame.Sequence != 3 { + t.Errorf("Snapshot Sequence = %d, want 3 (latest)", frame.Sequence) + } + + // Verify pixel content matches the last fill byte (0x03). + for i, b := range frame.Pixels { + if b != 0x03 { + t.Errorf("pixel[%d] = 0x%02X, want 0x03 (latest frame)", i, b) + break + } + } +} + +func TestSnapshotNoFrames(t *testing.T) { + addr := tempSocket(t) + + srv, err := Listen(addr) + if err != nil { + t.Fatalf("Listen: %v", err) + } + t.Cleanup(func() { _ = srv.Close() }) + + var moduleID atomic.Uint64 + srv.OnConnect(func(id uint64, _ string) { + moduleID.Store(id) + }) + + client, err := Dial(addr, WithName("snap-empty"), WithFrameSize(2, 2)) + if err != nil { + t.Fatalf("Dial: %v", err) + } + t.Cleanup(func() { _ = client.Close() }) + + // Wait for the connection to be established. + ok := waitFor(t, func() bool { + return moduleID.Load() != 0 + }) + if !ok { + t.Fatal("timed out waiting for connection") + } + + // Snapshot before any frames published should have nil entry. + snap := srv.Snapshot() + id := moduleID.Load() + f, exists := snap[id] + if !exists { + t.Fatalf("Snapshot missing module %d", id) + } + if f != nil { + t.Errorf("Snapshot for module with no frames should be nil, got Sequence=%d", f.Sequence) + } +} + +func TestSnapshotMultipleModules(t *testing.T) { + addr := tempSocket(t) + + srv, err := Listen(addr, WithMaxModules(4)) + if err != nil { + t.Fatalf("Listen: %v", err) + } + t.Cleanup(func() { _ = srv.Close() }) + + var frameCount atomic.Int32 + srv.OnFrame(func(_ Frame) { + frameCount.Add(1) + }) + + // Connect two clients with different frame sizes. + c1, err := Dial(addr, WithName("mod-a"), WithFrameSize(4, 4)) + if err != nil { + t.Fatalf("Dial mod-a: %v", err) + } + t.Cleanup(func() { _ = c1.Close() }) + + c2, err := Dial(addr, WithName("mod-b"), WithFrameSize(8, 8)) + if err != nil { + t.Fatalf("Dial mod-b: %v", err) + } + t.Cleanup(func() { _ = c2.Close() }) + + // Publish from both. + if pubErr := c1.PublishFrame(Frame{ + Pixels: makePixels(4, 4, 0xAA), + Width: 4, + Height: 4, + }); pubErr != nil { + t.Fatalf("PublishFrame mod-a: %v", pubErr) + } + + if pubErr := c2.PublishFrame(Frame{ + Pixels: makePixels(8, 8, 0xBB), + Width: 8, + Height: 8, + }); pubErr != nil { + t.Fatalf("PublishFrame mod-b: %v", pubErr) + } + + // Wait for both frames. + ok := waitFor(t, func() bool { + return frameCount.Load() >= 2 + }) + if !ok { + t.Fatalf("timed out: received %d/2 frames", frameCount.Load()) + } + + snap := srv.Snapshot() + + // Should have two entries, both non-nil. + nonNil := 0 + for _, f := range snap { + if f != nil { + nonNil++ + } + } + if nonNil != 2 { + t.Errorf("Snapshot has %d non-nil entries, want 2", nonNil) + } + + // Verify different dimensions. + dims := make(map[uint32]bool) + for _, f := range snap { + if f != nil { + dims[f.Width] = true + } + } + if !dims[4] || !dims[8] { + t.Errorf("expected widths 4 and 8 in snapshot, got %v", dims) + } +} + +func TestSnapshotWithPull(t *testing.T) { + addr := tempSocket(t) + + srv, err := Listen(addr) + if err != nil { + t.Fatalf("Listen: %v", err) + } + t.Cleanup(func() { _ = srv.Close() }) + + var moduleID atomic.Uint64 + srv.OnConnect(func(id uint64, _ string) { + moduleID.Store(id) + }) + + client, err := Dial(addr, WithName("snap-pull"), WithFrameSize(2, 2)) + if err != nil { + t.Fatalf("Dial: %v", err) + } + t.Cleanup(func() { _ = client.Close() }) + + // Set up pull-based: respond to FrameRequest with a publish. + client.OnFrameRequest(func() { + _ = client.PublishFrame(Frame{ + Pixels: makePixels(2, 2, 0xEE), + Width: 2, + Height: 2, + }) + }) + + // Wait for the connection. + ok := waitFor(t, func() bool { + return moduleID.Load() != 0 + }) + if !ok { + t.Fatal("timed out waiting for connection") + } + + // Server requests a frame (pull model). + id := moduleID.Load() + if reqErr := srv.RequestFrame(id); reqErr != nil { + t.Fatalf("RequestFrame: %v", reqErr) + } + + // Wait for the pulled frame to appear in Snapshot. + ok = waitFor(t, func() bool { + snap := srv.Snapshot() + f := snap[id] + return f != nil + }) + if !ok { + t.Fatal("timed out waiting for pull frame in Snapshot") + } + + snap := srv.Snapshot() + f := snap[id] + if f.Width != 2 || f.Height != 2 { + t.Errorf("pulled frame dimensions = %dx%d, want 2x2", f.Width, f.Height) + } + if f.Pixels[0] != 0xEE { + t.Errorf("pulled frame pixel[0] = 0x%02X, want 0xEE", f.Pixels[0]) + } +} + +func TestSnapshotConcurrent(t *testing.T) { + addr := tempSocket(t) + + srv, err := Listen(addr) + if err != nil { + t.Fatalf("Listen: %v", err) + } + t.Cleanup(func() { _ = srv.Close() }) + + client, err := Dial(addr, WithName("snap-race"), WithFrameSize(2, 2)) + if err != nil { + t.Fatalf("Dial: %v", err) + } + t.Cleanup(func() { _ = client.Close() }) + + // Parallel writers: push frames from multiple goroutines. + const writers = 4 + const framesPerWriter = 10 + var wg sync.WaitGroup + + for w := 0; w < writers; w++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + for i := 0; i < framesPerWriter; i++ { + _ = client.PublishFrame(Frame{ + Pixels: makePixels(2, 2, byte(id*framesPerWriter+i)), + Width: 2, + Height: 2, + }) + } + }(w) + } + + // Parallel reader: call Snapshot concurrently with writes. + const readers = 4 + for r := 0; r < readers; r++ { + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < framesPerWriter*2; i++ { + snap := srv.Snapshot() + // Snapshot must never panic, and all entries are either nil or valid. + for _, f := range snap { + if f != nil { + _ = f.Sequence + _ = f.Width + } + } + } + }() + } + + wg.Wait() +} + +func TestOnFrameStillFires(t *testing.T) { + addr := tempSocket(t) + + srv, err := Listen(addr) + if err != nil { + t.Fatalf("Listen: %v", err) + } + t.Cleanup(func() { _ = srv.Close() }) + + var callbackCount atomic.Int32 + srv.OnFrame(func(_ Frame) { + callbackCount.Add(1) + }) + + client, err := Dial(addr, WithName("snap-compat"), WithFrameSize(2, 2)) + if err != nil { + t.Fatalf("Dial: %v", err) + } + t.Cleanup(func() { _ = client.Close() }) + + // Publish 5 frames. + const n = 5 + for i := 0; i < n; i++ { + if pubErr := client.PublishFrame(Frame{ + Pixels: makePixels(2, 2, byte(i)), + Width: 2, + Height: 2, + }); pubErr != nil { + t.Fatalf("PublishFrame %d: %v", i, pubErr) + } + } + + // Wait for all 5 callbacks to fire. + ok := waitFor(t, func() bool { + return callbackCount.Load() >= n + }) + if !ok { + t.Fatalf("OnFrame fired %d/%d times", callbackCount.Load(), n) + } + + // Verify: OnFrame fires for EVERY frame, not just latest. + if got := callbackCount.Load(); got != n { + t.Errorf("OnFrame count = %d, want %d (every frame)", got, n) + } + + // Snapshot returns only the latest. + snap := srv.Snapshot() + var frame *Frame + for _, f := range snap { + if f != nil { + frame = f + } + } + if frame == nil { + t.Fatal("Snapshot returned no frames") + } + if frame.Sequence != n { + t.Errorf("Snapshot Sequence = %d, want %d (latest)", frame.Sequence, n) + } +} + +func TestHeaderToFrameSequence(t *testing.T) { + pixels := makePixels(4, 4, 0xFF) + hdr := protocol.Header{ + Magic: protocol.Magic, + Version: protocol.ProtocolVersion, + MsgType: protocol.MsgFrame, + ModuleID: 42, + Sequence: 99, + TimestampNs: 1234, + Width: 4, + Height: 4, + Stride: 16, + } + + f := headerToFrame(hdr, pixels, "seqtest") + + if f.Sequence != 99 { + t.Errorf("Frame.Sequence = %d, want 99", f.Sequence) + } + if f.ModuleID != 42 { + t.Errorf("Frame.ModuleID = %d, want 42", f.ModuleID) + } +} diff --git a/server.go b/server.go index 464f09d..29efca4 100644 --- a/server.go +++ b/server.go @@ -15,10 +15,43 @@ import ( ) // moduleConn tracks a per-module connection on the server side. +// Each moduleConn contains a mailbox slot that stores the most recent frame +// received from this module. The mailbox enables "latest frame wins" semantics +// for compositors that poll via Server.Snapshot() instead of (or in addition to) +// the OnFrame callback. type moduleConn struct { conn *socket.Conn moduleID uint64 name string + + // latestMu guards access to the mailbox slot (latest and seq). + latestMu sync.Mutex + + // latest holds the most recently received frame, or nil if no frame + // has been received yet. Overwritten on every receipt (latest wins). + latest *Frame + + // seq is the wire protocol sequence number of the stored frame. + // Used for change detection by Snapshot consumers. + seq uint64 +} + +// storeLatest overwrites the mailbox with the given frame. Thread-safe. +func (mc *moduleConn) storeLatest(f Frame) { + mc.latestMu.Lock() + mc.latest = &f + mc.seq = f.Sequence + mc.latestMu.Unlock() +} + +// loadLatest returns the current mailbox frame (may be nil) and its sequence. +// Thread-safe. Does not clear the mailbox — compositors may call Snapshot +// multiple times and the frame stays until overwritten by a newer one. +func (mc *moduleConn) loadLatest() *Frame { + mc.latestMu.Lock() + f := mc.latest + mc.latestMu.Unlock() + return f } // Server is the compositor-side endpoint that accepts module connections @@ -182,6 +215,26 @@ func (s *Server) RequestFrame(moduleID uint64) error { return nil } +// Snapshot returns the latest frame from each connected module. +// Returns nil entries for modules that haven't sent any frames yet. +// The returned map is keyed by module ID. +// +// Snapshot does NOT clear the mailbox — the frame stays until overwritten +// by a newer one from the same module. Compositors may call Snapshot +// multiple times per render tick; each call returns the same frame until +// the module publishes a new one. Use Frame.Sequence for change detection. +// +// Thread-safe. Designed to be called once per compositor render tick. +func (s *Server) Snapshot() map[uint64]*Frame { + s.modulesMu.RLock() + result := make(map[uint64]*Frame, len(s.modules)) + for id, mc := range s.modules { + result[id] = mc.loadLatest() + } + s.modulesMu.RUnlock() + return result +} + // Close shuts down the server, disconnects all modules, and releases // resources. After Close returns, no more callbacks will be invoked. func (s *Server) Close() error { @@ -333,13 +386,16 @@ func (s *Server) readFrameLoop(mc *moduleConn) { // Build Frame from header fields. frame := headerToFrame(hdr, pixels, mc.name) + // Store in mailbox (latest-wins for Snapshot consumers). + mc.storeLatest(frame) + // Notify flow controller. s.flow.FrameDelivered(mc.moduleID) // Update registry last frame time. s.manager.Registry().UpdateLastFrame(mc.moduleID, time.Now()) - // Fire OnFrame callback. + // Fire OnFrame callback (backward compatible — fires on every receipt). s.mu.RLock() cb := s.onFrame s.mu.RUnlock() @@ -393,6 +449,7 @@ func headerToFrame(hdr protocol.Header, pixels []byte, name string) Frame { Width: uint32(hdr.Width), Height: uint32(hdr.Height), Timestamp: hdr.TimestampNs, + Sequence: hdr.Sequence, } if hdr.Flags.Has(protocol.FlagDirtyValid) {