diff --git a/core/protocol/router/helper/accounts/dispatcher/run.go b/core/protocol/router/helper/accounts/dispatcher/run.go index 2b50bc0..73b24db 100644 --- a/core/protocol/router/helper/accounts/dispatcher/run.go +++ b/core/protocol/router/helper/accounts/dispatcher/run.go @@ -81,14 +81,29 @@ func (d *AccountDispatcher) Run(ctx context.Context) (types.DispatchSummary, err // // Workers never close nonceChan — they only read from it. // +// Stream lifecycle: the per-worker stream is opened LAZILY, on the first page +// the worker actually has to send (and re-opened the same way after a send +// failure). This is deliberate. libp2p uses lazy multistream negotiation once +// the protocol is in the peer's peerstore (via Identify), so the negotiation +// handshake is deferred until the first Write on the stream. If a worker opened +// its stream up-front and then blocked on nonceChan for longer than the remote's +// negotiation timeout (DefaultNegotiationTimeout = 10s), the client would reset +// the un-negotiated stream with StreamProtocolNegotiationFailed (0x1001), and +// the worker's eventual first write would fail instantly with "stream reset by +// remote, error code: 4097". Opening on the first page guarantees the first +// write immediately follows the open, so negotiation always completes before any +// idle window. Once negotiated, idling on nonceChan between pages is harmless +// (yamux keepalive holds the stream open). +// // Time: O(1) per iteration overhead. // Space: O(NoncePageSize) per active page (one DB result set at a time). func (d *AccountDispatcher) dispatchWorker(ctx context.Context) { - stream, err := d.callbacks.OpenStream(ctx) - if err != nil { - return - } - defer stream.Close() + var stream types.AccountSyncStream + defer func() { + if stream != nil { + stream.Close() + } + }() for { select { @@ -97,13 +112,30 @@ func (d *AccountDispatcher) dispatchWorker(ctx context.Context) { case <-ctx.Done(): return case page := <-d.nonceChan: + // Open lazily on the first page (or after a prior failure reset + // stream to nil) so the first write immediately follows the open. + if stream == nil { + s, err := d.callbacks.OpenStream(ctx) + if err != nil { + // Could not open a stream — fail this page (re-queue or + // dead-letter). The stream itself never existed, so there + // is nothing to close; the next page will retry the open. + d.handleFailure(ctx, page, fmt.Errorf("open dispatch stream: %w", err), + types.DispatchPageMetrics{ + PageIndex: page.PageIndex, + NonceCount: len(page.Nonces), + Retries: page.Retries, + Success: false, + }) + continue + } + stream = s + } + streamOK := d.processPage(ctx, stream, page) if !streamOK { stream.Close() - stream, err = d.callbacks.OpenStream(ctx) - if err != nil { - return - } + stream = nil // reopen lazily on the next page } } } diff --git a/core/sync/sync_protocols.go b/core/sync/sync_protocols.go index 32a46f0..1582ebd 100644 --- a/core/sync/sync_protocols.go +++ b/core/sync/sync_protocols.go @@ -658,11 +658,13 @@ func (s *Sync) HandleAccountsSync(ctx context.Context, node host.Host) error { } // HandleAccountsSyncData registers the client-side handler for the -// AccountsSyncDataProtocol stream. The server dials this protocol for each -// page of missing accounts it wants to deliver; the client reads the page, -// routes it to the Datarouter for storage, and acks before the stream closes. +// AccountsSyncDataProtocol stream. The server dials this protocol and streams +// many pages of missing accounts over one persistent stream; the client persists +// each page durably (WAL + Redis stream via the Datarouter) and only then ACKs it, +// so the ACK reflects true durability and no page is buffered beyond write time. // -// Pattern mirrors HandleMerkle: single read → route → single write. +// Pattern: per-page loop of read → persist → ACK (durable-before-ACK). Pages are +// NOT accumulated across the session — receive memory is bounded to one page. func (s *Sync) HandleAccountsSyncData(ctx context.Context, node host.Host) error { node.SetStreamHandler(constants.AccountsSyncDataProtocol, func(str network.Stream) { defer str.Close() @@ -681,10 +683,20 @@ func (s *Sync) HandleAccountsSyncData(ctx context.Context, node host.Host) error s.Debug(ctx, constants.AccountsSyncDataProtocol, node, remoteNodeInfo) - var batch []*accountspb.Account - + // Durable-before-ACK, per page. Each page is persisted (WAL + Redis stream via + // the Datarouter) BEFORE its ACK is sent, so the client ACK reflects TRUE + // durability: a page that cannot be persisted is NAK'd (Ack.Ok=false) and the + // dispatcher retries it (DispatchMaxRetries → dead-letter) instead of the data + // being silently lost. There is NO cross-page accumulation — peak client receive + // memory is one page per stream, not the whole diff range (previously the entire + // session was buffered in one slice and written once at EOF, which OOM'd on large + // syncs). The persist call enqueues to WAL + Redis and returns fast; it does NOT + // block on the downstream ImmuDB commit, so per-page ACK latency stays well under + // DispatchACKTimeout. defer str.SetReadDeadline(time.Time{}) defer str.SetWriteDeadline(time.Time{}) + + var pages, accountsWritten int for { _ = str.SetReadDeadline(time.Now().Add(constants.DispatchACKTimeout)) page := &accountspb.AccountSyncServerMessage{} @@ -706,27 +718,34 @@ func (s *Sync) HandleAccountsSyncData(ctx context.Context, node host.Host) error continue } - batch = append(batch, resp.GetAccounts()...) + accounts := resp.GetAccounts() + // Persist this page durably (WAL + Redis stream) before acking. _ = str.SetWriteDeadline(time.Now().Add(constants.DispatchACKTimeout)) - ack := accountshelper.NewResultFactory(resp.GetPageIndex()).BatchAck() - _ = pbstream.WriteDelimited(str, ack) - } + if err := s.Datarouter.WriteAccountsBatch(ctx, accounts); err != nil { + logging.Logger(logging.Sync).Error(ctx, "accountsync: page persist failed — NAK", err, + ion.Int("page_index", int(resp.GetPageIndex())), + ion.Int("account_count", len(accounts)), + ion.String("from_peer", remoteNodeInfo.PeerID.String()), + ) + // NAK → dispatcher retries this page; data is not lost. + _ = pbstream.WriteDelimited(str, accountshelper.NewResultFactory(resp.GetPageIndex()).ErrBatchAck(err.Error())) + continue + } - if len(batch) == 0 { - return + // Durable in WAL + Redis — ACK so the server proceeds to the next page. + _ = pbstream.WriteDelimited(str, accountshelper.NewResultFactory(resp.GetPageIndex()).BatchAck()) + pages++ + accountsWritten += len(accounts) } - if err := s.Datarouter.WriteAccountsBatch(ctx, batch); err != nil { - logging.Logger(logging.Sync).Error(ctx, "accountsync: batch write failed", err, - ion.Int("account_count", len(batch)), - ion.String("from_peer", remoteNodeInfo.PeerID.String()), - ) + if accountsWritten == 0 { return } - logging.Logger(logging.Sync).Info(ctx, "accountsync: batch written to DB", - ion.Int("account_count", len(batch)), + logging.Logger(logging.Sync).Info(ctx, "accountsync: stream drained (durable-before-ACK)", + ion.Int("pages", pages), + ion.Int("account_count", accountsWritten), ion.String("from_peer", remoteNodeInfo.PeerID.String()), ) })