Skip to content
50 changes: 41 additions & 9 deletions core/protocol/router/helper/accounts/dispatcher/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,29 @@ func (d *AccountDispatcher) Run(ctx context.Context) (types.DispatchSummary, err
//
// Workers never close nonceChan — they only read from it.
//
// Stream lifecycle: the per-worker stream is opened LAZILY, on the first page
// the worker actually has to send (and re-opened the same way after a send
// failure). This is deliberate. libp2p uses lazy multistream negotiation once
// the protocol is in the peer's peerstore (via Identify), so the negotiation
// handshake is deferred until the first Write on the stream. If a worker opened
// its stream up-front and then blocked on nonceChan for longer than the remote's
// negotiation timeout (DefaultNegotiationTimeout = 10s), the client would reset
// the un-negotiated stream with StreamProtocolNegotiationFailed (0x1001), and
// the worker's eventual first write would fail instantly with "stream reset by
// remote, error code: 4097". Opening on the first page guarantees the first
// write immediately follows the open, so negotiation always completes before any
// idle window. Once negotiated, idling on nonceChan between pages is harmless
// (yamux keepalive holds the stream open).
//
// Time: O(1) per iteration overhead.
// Space: O(NoncePageSize) per active page (one DB result set at a time).
func (d *AccountDispatcher) dispatchWorker(ctx context.Context) {
stream, err := d.callbacks.OpenStream(ctx)
if err != nil {
return
}
defer stream.Close()
var stream types.AccountSyncStream
defer func() {
if stream != nil {
stream.Close()
}
}()

for {
select {
Expand All @@ -97,13 +112,30 @@ func (d *AccountDispatcher) dispatchWorker(ctx context.Context) {
case <-ctx.Done():
return
case page := <-d.nonceChan:
// Open lazily on the first page (or after a prior failure reset
// stream to nil) so the first write immediately follows the open.
if stream == nil {
s, err := d.callbacks.OpenStream(ctx)
if err != nil {
// Could not open a stream — fail this page (re-queue or
// dead-letter). The stream itself never existed, so there
// is nothing to close; the next page will retry the open.
d.handleFailure(ctx, page, fmt.Errorf("open dispatch stream: %w", err),
types.DispatchPageMetrics{
PageIndex: page.PageIndex,
NonceCount: len(page.Nonces),
Retries: page.Retries,
Success: false,
})
continue
}
stream = s
}

streamOK := d.processPage(ctx, stream, page)
if !streamOK {
stream.Close()
stream, err = d.callbacks.OpenStream(ctx)
if err != nil {
return
}
stream = nil // reopen lazily on the next page
}
}
}
Expand Down
57 changes: 38 additions & 19 deletions core/sync/sync_protocols.go
Original file line number Diff line number Diff line change
Expand Up @@ -658,11 +658,13 @@ func (s *Sync) HandleAccountsSync(ctx context.Context, node host.Host) error {
}

// HandleAccountsSyncData registers the client-side handler for the
// AccountsSyncDataProtocol stream. The server dials this protocol for each
// page of missing accounts it wants to deliver; the client reads the page,
// routes it to the Datarouter for storage, and acks before the stream closes.
// AccountsSyncDataProtocol stream. The server dials this protocol and streams
// many pages of missing accounts over one persistent stream; the client persists
// each page durably (WAL + Redis stream via the Datarouter) and only then ACKs it,
// so the ACK reflects true durability and no page is buffered beyond write time.
//
// Pattern mirrors HandleMerkle: single read → route → single write.
// Pattern: per-page loop of read → persist → ACK (durable-before-ACK). Pages are
// NOT accumulated across the session — receive memory is bounded to one page.
func (s *Sync) HandleAccountsSyncData(ctx context.Context, node host.Host) error {
node.SetStreamHandler(constants.AccountsSyncDataProtocol, func(str network.Stream) {
defer str.Close()
Expand All @@ -681,10 +683,20 @@ func (s *Sync) HandleAccountsSyncData(ctx context.Context, node host.Host) error

s.Debug(ctx, constants.AccountsSyncDataProtocol, node, remoteNodeInfo)

var batch []*accountspb.Account

// Durable-before-ACK, per page. Each page is persisted (WAL + Redis stream via
// the Datarouter) BEFORE its ACK is sent, so the client ACK reflects TRUE
// durability: a page that cannot be persisted is NAK'd (Ack.Ok=false) and the
// dispatcher retries it (DispatchMaxRetries → dead-letter) instead of the data
// being silently lost. There is NO cross-page accumulation — peak client receive
// memory is one page per stream, not the whole diff range (previously the entire
// session was buffered in one slice and written once at EOF, which OOM'd on large
// syncs). The persist call enqueues to WAL + Redis and returns fast; it does NOT
// block on the downstream ImmuDB commit, so per-page ACK latency stays well under
// DispatchACKTimeout.
defer str.SetReadDeadline(time.Time{})
defer str.SetWriteDeadline(time.Time{})

var pages, accountsWritten int
for {
_ = str.SetReadDeadline(time.Now().Add(constants.DispatchACKTimeout))
page := &accountspb.AccountSyncServerMessage{}
Expand All @@ -706,27 +718,34 @@ func (s *Sync) HandleAccountsSyncData(ctx context.Context, node host.Host) error
continue
}

batch = append(batch, resp.GetAccounts()...)
accounts := resp.GetAccounts()

// Persist this page durably (WAL + Redis stream) before acking.
_ = str.SetWriteDeadline(time.Now().Add(constants.DispatchACKTimeout))
ack := accountshelper.NewResultFactory(resp.GetPageIndex()).BatchAck()
_ = pbstream.WriteDelimited(str, ack)
}
if err := s.Datarouter.WriteAccountsBatch(ctx, accounts); err != nil {
logging.Logger(logging.Sync).Error(ctx, "accountsync: page persist failed — NAK", err,
ion.Int("page_index", int(resp.GetPageIndex())),
ion.Int("account_count", len(accounts)),
ion.String("from_peer", remoteNodeInfo.PeerID.String()),
)
// NAK → dispatcher retries this page; data is not lost.
_ = pbstream.WriteDelimited(str, accountshelper.NewResultFactory(resp.GetPageIndex()).ErrBatchAck(err.Error()))
continue
}

if len(batch) == 0 {
return
// Durable in WAL + Redis — ACK so the server proceeds to the next page.
_ = pbstream.WriteDelimited(str, accountshelper.NewResultFactory(resp.GetPageIndex()).BatchAck())
pages++
accountsWritten += len(accounts)
}

if err := s.Datarouter.WriteAccountsBatch(ctx, batch); err != nil {
logging.Logger(logging.Sync).Error(ctx, "accountsync: batch write failed", err,
ion.Int("account_count", len(batch)),
ion.String("from_peer", remoteNodeInfo.PeerID.String()),
)
if accountsWritten == 0 {
return
}

logging.Logger(logging.Sync).Info(ctx, "accountsync: batch written to DB",
ion.Int("account_count", len(batch)),
logging.Logger(logging.Sync).Info(ctx, "accountsync: stream drained (durable-before-ACK)",
ion.Int("pages", pages),
ion.Int("account_count", accountsWritten),
ion.String("from_peer", remoteNodeInfo.PeerID.String()),
)
})
Expand Down
Loading