diff --git a/common/messaging/messaging.go b/common/messaging/messaging.go index 9925bbb..524a8f5 100644 --- a/common/messaging/messaging.go +++ b/common/messaging/messaging.go @@ -9,6 +9,7 @@ import ( accountspb "github.com/JupiterMetaLabs/JMDN-FastSync/common/proto/accounts" datasyncpb "github.com/JupiterMetaLabs/JMDN-FastSync/common/proto/datasync" + headersyncpb "github.com/JupiterMetaLabs/JMDN-FastSync/common/proto/headersync" potspb "github.com/JupiterMetaLabs/JMDN-FastSync/common/proto/pots" priorsyncpb "github.com/JupiterMetaLabs/JMDN-FastSync/common/proto/priorsync" "github.com/JupiterMetaLabs/JMDN-FastSync/common/types/constants" @@ -461,6 +462,44 @@ func SendDataSyncProtoDelimitedWithHeartbeat( ) } +// SendHeaderSyncProtoDelimitedWithHeartbeat is a heartbeat-aware variant of SendProtoDelimited, +// designed for the HeaderSync protocol. Fetching large header batches from the server's DB +// can exceed the 15s stream deadline; heartbeat frames reset the read deadline on each tick. +func SendHeaderSyncProtoDelimitedWithHeartbeat( + ctx context.Context, + version uint16, + host host.Host, + peerInfo peer.AddrInfo, + protocolID protocol.ID, + request proto.Message, + response *headersyncpb.HeaderSyncResponse, +) error { + if response == nil { + return errors.New("response message is nil") + } + return sendProtoDelimitedWithHeartbeatGeneric(ctx, version, host, peerInfo, protocolID, request, + streamConfig[*headersyncpb.HeaderSyncStreamMessage]{ + newEnvelope: func() *headersyncpb.HeaderSyncStreamMessage { + return &headersyncpb.HeaderSyncStreamMessage{} + }, + isHeartbeat: func(e *headersyncpb.HeaderSyncStreamMessage) bool { + _, ok := e.Payload.(*headersyncpb.HeaderSyncStreamMessage_Heartbeat) + return ok + }, + mergeResponse: func(e *headersyncpb.HeaderSyncStreamMessage) error { + p, ok := e.Payload.(*headersyncpb.HeaderSyncStreamMessage_Response) + if !ok { + return fmt.Errorf("unexpected HeaderSyncStreamMessage payload type: %T", e.Payload) + } + if p.Response != nil { + proto.Merge(response, p.Response) + } + return nil + }, + }, + ) +} + // SendPoTSProtoDelimitedWithHeartbeat is a heartbeat-aware variant of SendProtoDelimited, // designed for the PoTS (Proof of Time Sync) protocol. func SendPoTSProtoDelimitedWithHeartbeat( diff --git a/common/proto/availability/availability.pb.go b/common/proto/availability/availability.pb.go index 11cddea..052f257 100644 --- a/common/proto/availability/availability.pb.go +++ b/common/proto/availability/availability.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.9 -// protoc v6.32.0 +// protoc-gen-go v1.36.11 +// protoc v4.25.3 // source: availability/availability.proto package availability @@ -76,6 +76,7 @@ type AvailabilityResponse struct { BlockMerge uint32 `protobuf:"varint,3,opt,name=block_merge,json=blockMerge,proto3" json:"block_merge,omitempty"` Auth *auth.Auth `protobuf:"bytes,4,opt,name=auth,proto3" json:"auth,omitempty"` Phase *phase.Phase `protobuf:"bytes,5,opt,name=phase,proto3" json:"phase,omitempty"` + BlockHeight uint64 `protobuf:"varint,6,opt,name=block_height,json=blockHeight,proto3" json:"block_height,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -145,13 +146,20 @@ func (x *AvailabilityResponse) GetPhase() *phase.Phase { return nil } +func (x *AvailabilityResponse) GetBlockHeight() uint64 { + if x != nil { + return x.BlockHeight + } + return 0 +} + var File_availability_availability_proto protoreflect.FileDescriptor const file_availability_availability_proto_rawDesc = "" + "\n" + "\x1favailability/availability.proto\x12\favailability\x1a\x11phase/phase.proto\x1a\x13merkle/merkle.proto\x1a\x1cavailability/auth/auth.proto\x1a\x17nodeinfo/nodeinfo.proto\":\n" + "\x13AvailabilityRequest\x12#\n" + - "\x05range\x18\x02 \x01(\v2\r.merkle.rangeR\x05range\"\xce\x01\n" + + "\x05range\x18\x02 \x01(\v2\r.merkle.rangeR\x05range\"\xf1\x01\n" + "\x14AvailabilityResponse\x12!\n" + "\fis_available\x18\x01 \x01(\bR\visAvailable\x12.\n" + "\bnodeinfo\x18\x02 \x01(\v2\x12.nodeinfo.NodeInfoR\bnodeinfo\x12\x1f\n" + @@ -159,7 +167,8 @@ const file_availability_availability_proto_rawDesc = "" + "blockMerge\x12\x1e\n" + "\x04auth\x18\x04 \x01(\v2\n" + ".auth.AuthR\x04auth\x12\"\n" + - "\x05phase\x18\x05 \x01(\v2\f.phase.PhaseR\x05phaseBDZBgithub.com/JupiterMetaLabs/JMDN-FastSync/common/proto/availabilityb\x06proto3" + "\x05phase\x18\x05 \x01(\v2\f.phase.PhaseR\x05phase\x12!\n" + + "\fblock_height\x18\x06 \x01(\x04R\vblockHeightBDZBgithub.com/JupiterMetaLabs/JMDN-FastSync/common/proto/availabilityb\x06proto3" var ( file_availability_availability_proto_rawDescOnce sync.Once diff --git a/common/proto/availability/availability.proto b/common/proto/availability/availability.proto index 0a71ef6..ac6d5dd 100644 --- a/common/proto/availability/availability.proto +++ b/common/proto/availability/availability.proto @@ -18,4 +18,5 @@ message AvailabilityResponse { uint32 block_merge = 3; auth.Auth auth = 4; phase.Phase phase = 5; + uint64 block_height = 6; } \ No newline at end of file diff --git a/common/proto/headersync/headersync.pb.go b/common/proto/headersync/headersync.pb.go index 61ec766..19685b0 100644 --- a/common/proto/headersync/headersync.pb.go +++ b/common/proto/headersync/headersync.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.9 -// protoc v6.32.0 +// protoc-gen-go v1.36.11 +// protoc v4.25.3 // source: headersync/headersync.proto package headersync @@ -161,6 +161,132 @@ func (x *HeaderSyncResponse) GetPhase() *phase.Phase { return nil } +type HeaderSyncHeartbeat struct { + state protoimpl.MessageState `protogen:"open.v1"` + Timestamp int64 `protobuf:"varint,1,opt,name=timestamp,proto3" json:"timestamp,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *HeaderSyncHeartbeat) Reset() { + *x = HeaderSyncHeartbeat{} + mi := &file_headersync_headersync_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *HeaderSyncHeartbeat) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*HeaderSyncHeartbeat) ProtoMessage() {} + +func (x *HeaderSyncHeartbeat) ProtoReflect() protoreflect.Message { + mi := &file_headersync_headersync_proto_msgTypes[2] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use HeaderSyncHeartbeat.ProtoReflect.Descriptor instead. +func (*HeaderSyncHeartbeat) Descriptor() ([]byte, []int) { + return file_headersync_headersync_proto_rawDescGZIP(), []int{2} +} + +func (x *HeaderSyncHeartbeat) GetTimestamp() int64 { + if x != nil { + return x.Timestamp + } + return 0 +} + +type HeaderSyncStreamMessage struct { + state protoimpl.MessageState `protogen:"open.v1"` + // Types that are valid to be assigned to Payload: + // + // *HeaderSyncStreamMessage_Heartbeat + // *HeaderSyncStreamMessage_Response + Payload isHeaderSyncStreamMessage_Payload `protobuf_oneof:"payload"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *HeaderSyncStreamMessage) Reset() { + *x = HeaderSyncStreamMessage{} + mi := &file_headersync_headersync_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *HeaderSyncStreamMessage) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*HeaderSyncStreamMessage) ProtoMessage() {} + +func (x *HeaderSyncStreamMessage) ProtoReflect() protoreflect.Message { + mi := &file_headersync_headersync_proto_msgTypes[3] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use HeaderSyncStreamMessage.ProtoReflect.Descriptor instead. +func (*HeaderSyncStreamMessage) Descriptor() ([]byte, []int) { + return file_headersync_headersync_proto_rawDescGZIP(), []int{3} +} + +func (x *HeaderSyncStreamMessage) GetPayload() isHeaderSyncStreamMessage_Payload { + if x != nil { + return x.Payload + } + return nil +} + +func (x *HeaderSyncStreamMessage) GetHeartbeat() *HeaderSyncHeartbeat { + if x != nil { + if x, ok := x.Payload.(*HeaderSyncStreamMessage_Heartbeat); ok { + return x.Heartbeat + } + } + return nil +} + +func (x *HeaderSyncStreamMessage) GetResponse() *HeaderSyncResponse { + if x != nil { + if x, ok := x.Payload.(*HeaderSyncStreamMessage_Response); ok { + return x.Response + } + } + return nil +} + +type isHeaderSyncStreamMessage_Payload interface { + isHeaderSyncStreamMessage_Payload() +} + +type HeaderSyncStreamMessage_Heartbeat struct { + Heartbeat *HeaderSyncHeartbeat `protobuf:"bytes,1,opt,name=heartbeat,proto3,oneof"` +} + +type HeaderSyncStreamMessage_Response struct { + Response *HeaderSyncResponse `protobuf:"bytes,2,opt,name=response,proto3,oneof"` +} + +func (*HeaderSyncStreamMessage_Heartbeat) isHeaderSyncStreamMessage_Payload() {} + +func (*HeaderSyncStreamMessage_Response) isHeaderSyncStreamMessage_Payload() {} + var File_headersync_headersync_proto protoreflect.FileDescriptor const file_headersync_headersync_proto_rawDesc = "" + @@ -176,7 +302,13 @@ const file_headersync_headersync_proto_rawDesc = "" + "\x06header\x18\x01 \x03(\v2\r.block.HeaderR\x06header\x12\x18\n" + "\aversion\x18\x02 \x01(\rR\aversion\x12\x1a\n" + "\x03ack\x18\x03 \x01(\v2\b.ack.AckR\x03ack\x12\"\n" + - "\x05phase\x18\x04 \x01(\v2\f.phase.PhaseR\x05phaseBBZ@github.com/JupiterMetaLabs/JMDN-FastSync/common/proto/headersyncb\x06proto3" + "\x05phase\x18\x04 \x01(\v2\f.phase.PhaseR\x05phase\"3\n" + + "\x13HeaderSyncHeartbeat\x12\x1c\n" + + "\ttimestamp\x18\x01 \x01(\x03R\ttimestamp\"\xa3\x01\n" + + "\x17HeaderSyncStreamMessage\x12?\n" + + "\theartbeat\x18\x01 \x01(\v2\x1f.headersync.HeaderSyncHeartbeatH\x00R\theartbeat\x12<\n" + + "\bresponse\x18\x02 \x01(\v2\x1e.headersync.HeaderSyncResponseH\x00R\bresponseB\t\n" + + "\apayloadBBZ@github.com/JupiterMetaLabs/JMDN-FastSync/common/proto/headersyncb\x06proto3" var ( file_headersync_headersync_proto_rawDescOnce sync.Once @@ -190,27 +322,31 @@ func file_headersync_headersync_proto_rawDescGZIP() []byte { return file_headersync_headersync_proto_rawDescData } -var file_headersync_headersync_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_headersync_headersync_proto_msgTypes = make([]protoimpl.MessageInfo, 4) var file_headersync_headersync_proto_goTypes = []any{ - (*HeaderSyncRequest)(nil), // 0: headersync.HeaderSyncRequest - (*HeaderSyncResponse)(nil), // 1: headersync.HeaderSyncResponse - (*tagging.Tag)(nil), // 2: tagging.Tag - (*ack.Ack)(nil), // 3: ack.Ack - (*phase.Phase)(nil), // 4: phase.Phase - (*block.Header)(nil), // 5: block.Header + (*HeaderSyncRequest)(nil), // 0: headersync.HeaderSyncRequest + (*HeaderSyncResponse)(nil), // 1: headersync.HeaderSyncResponse + (*HeaderSyncHeartbeat)(nil), // 2: headersync.HeaderSyncHeartbeat + (*HeaderSyncStreamMessage)(nil), // 3: headersync.HeaderSyncStreamMessage + (*tagging.Tag)(nil), // 4: tagging.Tag + (*ack.Ack)(nil), // 5: ack.Ack + (*phase.Phase)(nil), // 6: phase.Phase + (*block.Header)(nil), // 7: block.Header } var file_headersync_headersync_proto_depIdxs = []int32{ - 2, // 0: headersync.HeaderSyncRequest.tag:type_name -> tagging.Tag - 3, // 1: headersync.HeaderSyncRequest.ack:type_name -> ack.Ack - 4, // 2: headersync.HeaderSyncRequest.phase:type_name -> phase.Phase - 5, // 3: headersync.HeaderSyncResponse.header:type_name -> block.Header - 3, // 4: headersync.HeaderSyncResponse.ack:type_name -> ack.Ack - 4, // 5: headersync.HeaderSyncResponse.phase:type_name -> phase.Phase - 6, // [6:6] is the sub-list for method output_type - 6, // [6:6] is the sub-list for method input_type - 6, // [6:6] is the sub-list for extension type_name - 6, // [6:6] is the sub-list for extension extendee - 0, // [0:6] is the sub-list for field type_name + 4, // 0: headersync.HeaderSyncRequest.tag:type_name -> tagging.Tag + 5, // 1: headersync.HeaderSyncRequest.ack:type_name -> ack.Ack + 6, // 2: headersync.HeaderSyncRequest.phase:type_name -> phase.Phase + 7, // 3: headersync.HeaderSyncResponse.header:type_name -> block.Header + 5, // 4: headersync.HeaderSyncResponse.ack:type_name -> ack.Ack + 6, // 5: headersync.HeaderSyncResponse.phase:type_name -> phase.Phase + 2, // 6: headersync.HeaderSyncStreamMessage.heartbeat:type_name -> headersync.HeaderSyncHeartbeat + 1, // 7: headersync.HeaderSyncStreamMessage.response:type_name -> headersync.HeaderSyncResponse + 8, // [8:8] is the sub-list for method output_type + 8, // [8:8] is the sub-list for method input_type + 8, // [8:8] is the sub-list for extension type_name + 8, // [8:8] is the sub-list for extension extendee + 0, // [0:8] is the sub-list for field type_name } func init() { file_headersync_headersync_proto_init() } @@ -218,13 +354,17 @@ func file_headersync_headersync_proto_init() { if File_headersync_headersync_proto != nil { return } + file_headersync_headersync_proto_msgTypes[3].OneofWrappers = []any{ + (*HeaderSyncStreamMessage_Heartbeat)(nil), + (*HeaderSyncStreamMessage_Response)(nil), + } type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_headersync_headersync_proto_rawDesc), len(file_headersync_headersync_proto_rawDesc)), NumEnums: 0, - NumMessages: 2, + NumMessages: 4, NumExtensions: 0, NumServices: 0, }, diff --git a/common/proto/headersync/headersync.proto b/common/proto/headersync/headersync.proto index 1116f99..2cb7074 100644 --- a/common/proto/headersync/headersync.proto +++ b/common/proto/headersync/headersync.proto @@ -21,4 +21,15 @@ message HeaderSyncResponse { uint32 version = 2; ack.Ack ack = 3; phase.Phase phase = 4; +} + +message HeaderSyncHeartbeat { + int64 timestamp = 1; +} + +message HeaderSyncStreamMessage { + oneof payload { + HeaderSyncHeartbeat heartbeat = 1; + HeaderSyncResponse response = 2; + } } \ No newline at end of file diff --git a/common/types/constants/constants.go b/common/types/constants/constants.go index 684781b..8376ad0 100644 --- a/common/types/constants/constants.go +++ b/common/types/constants/constants.go @@ -52,7 +52,7 @@ const ( ) const ( - AUTH_TTL = 2 * time.Minute + AUTH_TTL = 48 * time.Hour ) const ( @@ -67,7 +67,7 @@ const ( MAX_DATA_PER_REQUEST = 30 MIN_BLOCKS = 500 // if number of blocks in the client is less than 500 then do the full sync. MAX_PARALLEL_REQUESTS = 10 - ATMOST_ACCOUNT_ROUTINES = 15 + ATMOST_ACCOUNT_ROUTINES = 3 LRU_CACHE_CAPACITY = 200 ) diff --git a/common/types/nodeinfo.go b/common/types/nodeinfo.go index 0033ba6..9fe9637 100644 --- a/common/types/nodeinfo.go +++ b/common/types/nodeinfo.go @@ -100,8 +100,20 @@ type WriteData interface { type AccountUpdate struct { Address string NewBalance *big.Int - Nonce uint64 - IsNewAccount bool // true = CreateAccount, false = UpdateAccountBalance + Nonce uint64 // max outgoing tx.Nonce in this range (0 if account never sent) + TxNonce uint64 // max outgoing tx.Nonce + 1 (next expected nonce per Processing.go) + TxCountSent uint64 // number of outgoing txs in the range + IsNewAccount bool // true = CreateAccount, false = UpdateAccountBalance +} + +// AccountDelta holds the net balance/nonce effect for one account over a block range. +// Computed in a single O(blocks) BlockIterator pass; avoids per-account DB scans. +type AccountDelta struct { + BalanceDelta *big.Int // net change: negative = debit, positive = credit + Nonce uint64 // max outgoing tx.Nonce (0 if IsSender == false) + TxNonce uint64 // max outgoing tx.Nonce + 1 (per Processing.go TxNonce semantics) + TxCountSent uint64 // number of outgoing txs in the range + IsSender bool // true if the account sent at least one tx } // AccountManager handles account balance operations for reconciliation. @@ -109,6 +121,11 @@ type AccountManager interface { // GetTransactionsForAccount retrieves all transactions where the account is sender or receiver. GetTransactionsForAccount(accountAddress string) ([]DBTransaction, error) + // GetTransactionsForAccountInRange retrieves transactions in [fromBlock, toBlock] inclusive + // where the account is sender or receiver. Pass math.MaxUint64 for toBlock to mean "up to latest." + // Used by delta-only reconciliation so each sync pass replays only new transactions. + GetTransactionsForAccountInRange(accountAddress string, fromBlock, toBlock uint64) ([]DBTransaction, error) + // GetAccountBalance retrieves the current balance and nonce for an account. GetAccountBalance(accountAddress string) (*big.Int, uint64, error) diff --git a/core/catchup/catchup.go b/core/catchup/catchup.go new file mode 100644 index 0000000..e300a01 --- /dev/null +++ b/core/catchup/catchup.go @@ -0,0 +1,438 @@ +package catchup + +import ( + "context" + "fmt" + "math" + + "github.com/JupiterMetaLabs/JMDN-FastSync/common/WAL" + ackpb "github.com/JupiterMetaLabs/JMDN-FastSync/common/proto/ack" + availabilitypb "github.com/JupiterMetaLabs/JMDN-FastSync/common/proto/availability" + datasyncpb "github.com/JupiterMetaLabs/JMDN-FastSync/common/proto/datasync" + headersyncpb "github.com/JupiterMetaLabs/JMDN-FastSync/common/proto/headersync" + phasepb "github.com/JupiterMetaLabs/JMDN-FastSync/common/proto/phase" + taggingpb "github.com/JupiterMetaLabs/JMDN-FastSync/common/proto/tagging" + "github.com/JupiterMetaLabs/JMDN-FastSync/common/types" + "github.com/JupiterMetaLabs/JMDN-FastSync/common/types/constants" + "github.com/JupiterMetaLabs/JMDN-FastSync/core/accountsync" + "github.com/JupiterMetaLabs/JMDN-FastSync/core/availability" + "github.com/JupiterMetaLabs/JMDN-FastSync/core/datasync" + "github.com/JupiterMetaLabs/JMDN-FastSync/core/headersync" + "github.com/JupiterMetaLabs/JMDN-FastSync/core/pots" + potshelper "github.com/JupiterMetaLabs/JMDN-FastSync/core/pots/helper" + "github.com/JupiterMetaLabs/JMDN-FastSync/core/reconsillation" + "github.com/JupiterMetaLabs/JMDN-FastSync/core/reconsillation/LRUCache" + Log "github.com/JupiterMetaLabs/JMDN-FastSync/logging" + "github.com/JupiterMetaLabs/ion" + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/multiformats/go-multiaddr" +) + +const namedlogger = "log:catchup" + +// CatchUp holds state for a single catch-up sync session. +type CatchUp struct { + SyncVars *types.Syncvars +} + +// NewCatchUp returns a CatchUp ready for SetSyncVars. +func NewCatchUp() CatchUp_router { + return &CatchUp{} +} + +func (c *CatchUp) SetSyncVars(ctx context.Context, protocolVersion uint16, nodeInfo types.Nodeinfo, node host.Host, wal *WAL.WAL) CatchUp_router { + if c.SyncVars == nil { + c.SyncVars = &types.Syncvars{} + } + c.SyncVars.Version = protocolVersion + c.SyncVars.NodeInfo = nodeInfo + c.SyncVars.Ctx = ctx + c.SyncVars.WAL = wal + c.SyncVars.Node = node + return c +} + +func (c *CatchUp) GetSyncVars() *types.Syncvars { + return c.SyncVars +} + +func (c *CatchUp) Close() { + c.SyncVars = nil +} + +// Run executes the full catch-up pipeline: +// +// Phase 1 — Availability probe +// Phase 2 — HeaderSync (no Merkle confirmation round-trip) +// Phase 3 — DataSync +// Phase 4 — AccountSync +// Phase 5 — Reconciliation +// Phase 6 — PoTS gap fill (blocks produced while phases 2-5 ran) +func (c *CatchUp) Run(ctx context.Context, fromBlock uint64, peers []types.Nodeinfo) error { + if c.SyncVars == nil { + return fmt.Errorf("catchup: SetSyncVars not called") + } + if len(peers) == 0 { + return fmt.Errorf("catchup: no peers provided") + } + + // ── Phase 1: Availability probe ─────────────────────────────────────── + // Ask each remote what range it has and get an auth token. + // fromBlock is the first block we need; pass it as range start so the + // server can confirm it has data that far. + Log.Logger(namedlogger).Info(ctx, "catchup: phase 1 — availability probe", + ion.Uint64("from_block", fromBlock)) + + avail := availability.NewAvailability().SetSyncVars(ctx, c.SyncVars.Version, c.SyncVars.NodeInfo, c.SyncVars.Node, c.SyncVars.WAL) + remotes, err := avail.SendMultipleAvailabilityRequest(ctx, c.SyncVars, peers, fromBlock, math.MaxUint64) + if err != nil { + return fmt.Errorf("catchup: availability probe: %w", err) + } + + // Filter to peers that are available and have data at fromBlock. + remotes = filterAvailable(remotes, fromBlock) + if len(remotes) == 0 { + return fmt.Errorf("catchup: no available peers have data from block %d", fromBlock) + } + + // remoteTip is the highest block number across all available peers. + remoteTip := highestBlock(remotes) + if remoteTip < fromBlock { + return fmt.Errorf("catchup: remoteTip %d < fromBlock %d — nothing to sync", remoteTip, fromBlock) + } + + Log.Logger(namedlogger).Info(ctx, "catchup: availability ok", + ion.Int("peers", len(remotes)), + ion.Uint64("from_block", fromBlock), + ion.Uint64("remote_tip", remoteTip)) + + // ── PoTS WAL: open before phases 2-5 so blocks produced during the + // catch-up window are captured and replayed afterwards. + potsRouter := pots.NewPoTS(). + SetSyncVars(ctx, c.SyncVars.Version, c.SyncVars.NodeInfo, c.SyncVars.Node). + SetWAL(ctx, c.SyncVars.WAL) + defer potsRouter.Close() + + // ── Build the catch-up tag: one range [fromBlock..remoteTip] ────────── + // We already know exactly what's missing — no Merkle bisection needed. + catchUpTag := &taggingpb.Tag{ + Range: []*taggingpb.RangeTag{ + {Start: fromBlock, End: remoteTip}, + }, + } + + // auth from primary remote (index 0) drives phase and auth fields. + primaryAuth := remotes[0].GetAuth() + + // ── Phase 2: HeaderSync ─────────────────────────────────────────────── + Log.Logger(namedlogger).Info(ctx, "catchup: phase 2 — header sync", + ion.Uint64("from", fromBlock), + ion.Uint64("to", remoteTip)) + + hs := headersync.NewHeaderSync(). + SetSyncVars(ctx, c.SyncVars.Version, c.SyncVars.NodeInfo, c.SyncVars.Node, c.SyncVars.WAL) + + headerReq := &headersyncpb.HeaderSyncRequest{ + Tag: catchUpTag, + Ack: &ackpb.Ack{Ok: true}, + Phase: &phasepb.Phase{ + PresentPhase: constants.HEADER_SYNC_REQUEST, + SuccessivePhase: constants.HEADER_SYNC_RESPONSE, + Success: true, + Auth: primaryAuth, + }, + } + + // syncConfirmation=false: skip Merkle round-trip, we know the exact range. + dataSyncReq, err := hs.HeaderSync(headerReq, remotes, false) + if err != nil { + return fmt.Errorf("catchup: header sync: %w", err) + } + + Log.Logger(namedlogger).Info(ctx, "catchup: phase 2 complete") + + // ── Phase 3: DataSync ───────────────────────────────────────────────── + Log.Logger(namedlogger).Info(ctx, "catchup: phase 3 — data sync") + + ds := datasync.NewDataSync(). + SetSyncVars(ctx, c.SyncVars.Version, c.SyncVars.NodeInfo, c.SyncVars.Node, c.SyncVars.WAL) + + // If HeaderSync returned nil (range was already present), build the + // DataSyncRequest directly from the catch-up tag. + if dataSyncReq == nil { + dataSyncReq = &datasyncpb.DataSyncRequest{ + Tag: catchUpTag, + Version: uint32(c.SyncVars.Version), + Ack: &ackpb.Ack{Ok: true}, + Phase: &phasepb.Phase{ + PresentPhase: constants.DATA_SYNC_REQUEST, + SuccessivePhase: constants.DATA_SYNC_RESPONSE, + Success: true, + Auth: primaryAuth, + }, + } + } + + taggedAccounts, err := ds.DataSync(dataSyncReq, remotes) + if err != nil { + return fmt.Errorf("catchup: data sync: %w", err) + } + + Log.Logger(namedlogger).Info(ctx, "catchup: phase 3 complete") + + // ── Phase 4: AccountSync ────────────────────────────────────────────── + // Syncs zero-tx accounts not covered by DataSync TaggedAccounts. + Log.Logger(namedlogger).Info(ctx, "catchup: phase 4 — account sync") + + as := accountsync.NewAccountSync(). + SetSyncVars(ctx, c.SyncVars.Version, c.SyncVars.NodeInfo, c.SyncVars.Node, c.SyncVars.WAL) + + totalSynced, err := as.AccountSync(remotes[0]) + if err != nil { + // Non-fatal: reconciliation still runs over whatever accounts we have. + Log.Logger(namedlogger).Warn(ctx, "catchup: account sync failed — continuing", + ion.Err(err)) + } else { + Log.Logger(namedlogger).Info(ctx, "catchup: phase 4 complete", + ion.Uint64("accounts_synced", totalSynced)) + } + + // ── Phase 5: Reconciliation ─────────────────────────────────────────── + Log.Logger(namedlogger).Info(ctx, "catchup: phase 5 — reconciliation") + + lru := LRUCache.NewLRUCache(constants.LRU_CACHE_CAPACITY) + reconInst := reconsillation.NewReconciliation() + reconInst.SetLRUCache(lru) + rec := reconInst.SetSyncVars(ctx, c.SyncVars.Version, c.SyncVars.NodeInfo, c.SyncVars.WAL) + + committed, failed, err := rec.Reconcile(taggedAccounts, remotes[0], fromBlock, remoteTip) + if err != nil { + return fmt.Errorf("catchup: reconciliation: %w", err) + } + if len(failed) > 0 { + Log.Logger(namedlogger).Warn(ctx, "catchup: some accounts failed reconciliation", + ion.Int("failed", len(failed)), + ion.Int("committed", committed)) + } else { + Log.Logger(namedlogger).Info(ctx, "catchup: phase 5 complete", + ion.Int("committed", committed)) + } + + // ── Phase 6: PoTS gap fill ──────────────────────────────────────────── + // Phases 2-5 can take minutes to hours for large catch-ups; the server-side + // AUTH_TTL (2 min) will have long expired. Refresh auth before PoTS so the + // PoTS request and the secondary HeaderSync/DataSync pass are authenticated. + Log.Logger(namedlogger).Info(ctx, "catchup: phase 6 — re-auth before PoTS") + + freshAvail := availability.NewAvailability().SetSyncVars(ctx, c.SyncVars.Version, c.SyncVars.NodeInfo, c.SyncVars.Node, c.SyncVars.WAL) + freshRemotes, refreshErr := freshAvail.SendMultipleAvailabilityRequest(ctx, c.SyncVars, peers, 0, math.MaxUint64) + if refreshErr == nil { + freshRemotes = filterAvailable(freshRemotes, 0) + if len(freshRemotes) > 0 { + remotes = freshRemotes + Log.Logger(namedlogger).Info(ctx, "catchup: re-auth ok", + ion.String("uuid", remotes[0].GetAuth().GetUUID())) + } + } else { + Log.Logger(namedlogger).Warn(ctx, "catchup: re-auth failed — proceeding with original token", + ion.Err(refreshErr)) + } + + Log.Logger(namedlogger).Info(ctx, "catchup: phase 6 — PoTS gap fill") + + if err := c.runPoTS(ctx, potsRouter, remotes, remoteTip); err != nil { + return fmt.Errorf("catchup: PoTS gap fill: %w", err) + } + + Log.Logger(namedlogger).Info(ctx, "catchup: all phases complete — node is caught up", + ion.Uint64("synced_to", remoteTip)) + + return nil +} + +// runPoTS builds a PoTSRequest from the PoTS WAL and fetches any gap blocks +// that arrived on the remote while phases 2-5 were running. +func (c *CatchUp) runPoTS(ctx context.Context, potsRouter pots.PoTS_router, remotes []*availabilitypb.AvailabilityResponse, syncedTip uint64) error { + potsWAL, err := potsRouter.GetWAL() + if err != nil { + // No WAL means no blocks were buffered — nothing to do. + Log.Logger(namedlogger).Info(ctx, "catchup: PoTS WAL not initialised — skipping gap fill") + return nil + } + + // Read buffered blocks from the PoTS WAL. + walBlocks, err := potsWAL.Read(ctx, 0, 0) // offset=0, limit=0 → all + if err != nil { + return fmt.Errorf("read PoTS WAL: %w", err) + } + + if len(walBlocks) == 0 { + Log.Logger(namedlogger).Info(ctx, "catchup: PoTS WAL is empty — no gap blocks") + return nil + } + + // Build the blocks map: blockNumber → blockHash. + blocks := make(map[uint64][]byte, len(walBlocks)) + for _, b := range walBlocks { + if b != nil { + blocks[b.BlockNumber] = b.BlockHash[:] + } + } + + latestWALBlock, err := potsWAL.GetLatestBlockNumber(ctx) + if err != nil { + return fmt.Errorf("PoTS WAL latest block: %w", err) + } + + remoteNodeInfo, err := remoteToNodeinfo(remotes[0]) + if err != nil { + return fmt.Errorf("PoTS remote nodeinfo: %w", err) + } + + potsReq := potshelper.NewPoTSRequestBuilder(). + AddMap(blocks). + AddLatestBlock(latestWALBlock). + AddAuth(remotes[0].GetAuth()). + Build() + + potsResp, err := potsRouter.SendPoTSRequest(ctx, potsReq, *remoteNodeInfo) + if err != nil { + return fmt.Errorf("PoTS request: %w", err) + } + + if potsResp.Tag == nil || (len(potsResp.Tag.Range) == 0 && len(potsResp.Tag.BlockNumber) == 0) { + Log.Logger(namedlogger).Info(ctx, "catchup: PoTS — no gap blocks") + return nil + } + + Log.Logger(namedlogger).Info(ctx, "catchup: PoTS gap detected — fetching", + ion.Int("ranges", len(potsResp.Tag.Range)), + ion.Int("blocks", len(potsResp.Tag.BlockNumber))) + + // Fetch gap headers and data using existing HeaderSync/DataSync (no confirmation). + hs := headersync.NewHeaderSync(). + SetSyncVars(ctx, c.SyncVars.Version, c.SyncVars.NodeInfo, c.SyncVars.Node, c.SyncVars.WAL) + + gapHeaderReq := &headersyncpb.HeaderSyncRequest{ + Tag: potsResp.Tag, + Ack: &ackpb.Ack{Ok: true}, + Phase: &phasepb.Phase{ + PresentPhase: constants.HEADER_SYNC_REQUEST, + SuccessivePhase: constants.HEADER_SYNC_RESPONSE, + Success: true, + Auth: remotes[0].GetAuth(), + }, + } + + gapDataReq, err := hs.HeaderSync(gapHeaderReq, remotes, false) + if err != nil { + return fmt.Errorf("PoTS header sync: %w", err) + } + + if gapDataReq == nil { + gapDataReq = &datasyncpb.DataSyncRequest{ + Tag: potsResp.Tag, + Version: uint32(c.SyncVars.Version), + Ack: &ackpb.Ack{Ok: true}, + Phase: &phasepb.Phase{ + PresentPhase: constants.DATA_SYNC_REQUEST, + SuccessivePhase: constants.DATA_SYNC_RESPONSE, + Success: true, + Auth: remotes[0].GetAuth(), + }, + } + } + + ds := datasync.NewDataSync(). + SetSyncVars(ctx, c.SyncVars.Version, c.SyncVars.NodeInfo, c.SyncVars.Node, c.SyncVars.WAL) + + gapTaggedAccounts, err := ds.DataSync(gapDataReq, remotes) + if err != nil { + return fmt.Errorf("PoTS data sync: %w", err) + } + + // Reconcile gap accounts. + if gapTaggedAccounts != nil && len(gapTaggedAccounts.Accounts) > 0 { + lru := LRUCache.NewLRUCache(constants.LRU_CACHE_CAPACITY) + gapReconInst := reconsillation.NewReconciliation() + gapReconInst.SetLRUCache(lru) + rec := gapReconInst.SetSyncVars(ctx, c.SyncVars.Version, c.SyncVars.NodeInfo, c.SyncVars.WAL) + + // PoTS blocks are produced after syncedTip — use syncedTip+1 as fromBlock. + committed, failed, err := rec.Reconcile(gapTaggedAccounts, remotes[0], syncedTip+1, math.MaxUint64) + if err != nil { + return fmt.Errorf("PoTS reconciliation: %w", err) + } + Log.Logger(namedlogger).Info(ctx, "catchup: PoTS reconciliation complete", + ion.Int("committed", committed), + ion.Int("failed", len(failed))) + } + + // Hydrate PoTS WAL into the main DB. + if err := potsWAL.Close(); err != nil { + Log.Logger(namedlogger).Warn(ctx, "catchup: PoTS WAL close failed", ion.Err(err)) + } + + Log.Logger(namedlogger).Info(ctx, "catchup: phase 6 complete") + return nil +} + +// ─── helpers ────────────────────────────────────────────────────────────────── + +// filterAvailable removes peers that are not available, have no auth token, or +// whose reported tip (BlockMerge) does not reach fromBlock. +func filterAvailable(remotes []*availabilitypb.AvailabilityResponse, fromBlock uint64) []*availabilitypb.AvailabilityResponse { + out := remotes[:0] + for _, r := range remotes { + if r == nil || !r.IsAvailable { + continue + } + if r.Auth == nil || r.Auth.UUID == "" { + continue + } + // BlockMerge is the server's current tip block number. + if uint64(r.BlockMerge) < fromBlock { + continue + } + out = append(out, r) + } + return out +} + +// highestBlock returns the maximum BlockMerge (tip) across all availability responses. +func highestBlock(remotes []*availabilitypb.AvailabilityResponse) uint64 { + var tip uint64 + for _, r := range remotes { + if uint64(r.BlockMerge) > tip { + tip = uint64(r.BlockMerge) + } + } + return tip +} + +// remoteToNodeinfo parses a types.Nodeinfo from an AvailabilityResponse. +func remoteToNodeinfo(r *availabilitypb.AvailabilityResponse) (*types.Nodeinfo, error) { + if r == nil || r.Nodeinfo == nil { + return nil, fmt.Errorf("nil availability response or nodeinfo") + } + + var maddrs []multiaddr.Multiaddr + for _, b := range r.Nodeinfo.Multiaddrs { + ma, err := multiaddr.NewMultiaddrBytes(b) + if err == nil { + maddrs = append(maddrs, ma) + } + } + + pid, err := peer.IDFromBytes(r.Nodeinfo.PeerId) + if err != nil { + return nil, fmt.Errorf("parse peer ID: %w", err) + } + + return &types.Nodeinfo{ + PeerID: pid, + Multiaddr: maddrs, + Version: uint16(r.Nodeinfo.Version), + }, nil +} diff --git a/core/catchup/interface.go b/core/catchup/interface.go new file mode 100644 index 0000000..cefb9c2 --- /dev/null +++ b/core/catchup/interface.go @@ -0,0 +1,40 @@ +package catchup + +import ( + "context" + + "github.com/JupiterMetaLabs/JMDN-FastSync/common/WAL" + "github.com/JupiterMetaLabs/JMDN-FastSync/common/types" + "github.com/libp2p/go-libp2p/core/host" +) + +// CatchUp_router drives the catch-up sync that reconciles blocks from +// a known local tip (fromBlock) to the current chain tip on remote peers. +// +// Intended use: after a bootstrap snapshot has loaded blocks [0..X] into +// the local DB, call Run(ctx, X+1, peers) to pull [X+1..remoteTip] and +// replay all account balances. +// +// Phase order: +// 1. Availability probe → get auth tokens, discover remoteTip +// 2. HeaderSync (syncConfirmation=false) → fetch headers [fromBlock..remoteTip] +// 3. DataSync → fetch block bodies [fromBlock..remoteTip] +// 4. AccountSync → sync zero-tx accounts missed by DataSync tagging +// 5. Reconciliation → replay txs, compute final balances +// 6. PoTS gap fill → catch blocks produced while phases 2-5 were running +type CatchUp_router interface { + // SetSyncVars initialises the catch-up module. Must be called before Run. + SetSyncVars(ctx context.Context, protocolVersion uint16, nodeInfo types.Nodeinfo, node host.Host, wal *WAL.WAL) CatchUp_router + + // Run executes phases 1-6 against the supplied remote peers. + // fromBlock is the first block not present in the local DB (bootstrap tip + 1). + // peers is the list of candidate remote nodes; at least one must be reachable. + // Returns when the node is fully caught up and PoTS gaps are filled. + Run(ctx context.Context, fromBlock uint64, peers []types.Nodeinfo) error + + // GetSyncVars returns the current sync configuration. + GetSyncVars() *types.Syncvars + + // Close releases resources. + Close() +} diff --git a/core/protocol/communication/communication.go b/core/protocol/communication/communication.go index 3a1010e..30f0d00 100644 --- a/core/protocol/communication/communication.go +++ b/core/protocol/communication/communication.go @@ -190,6 +190,7 @@ func (c *communication) SendMerkleRequest( } // SendHeaderSyncRequest sends a HeaderSyncRequest to a peer and returns the HeaderSyncResponse. +// Uses a heartbeat-aware stream so large header batches don't hit the 15s read deadline. func (c *communication) SendHeaderSyncRequest( ctx context.Context, peerNode types.Nodeinfo, @@ -199,17 +200,14 @@ func (c *communication) SendHeaderSyncRequest( return nil, errors.New("host is nil") } - // Prepare peer.AddrInfo from types.Nodeinfo peerInfo := libp2p_peer.AddrInfo{ ID: peerNode.PeerID, Addrs: peerNode.Multiaddr, } - // Prepare response container resp := &headersyncpb.HeaderSyncResponse{} - // Send using SendProtoDelimited with HeaderSyncProtocol - if err := messaging.SendProtoDelimited( + if err := messaging.SendHeaderSyncProtoDelimitedWithHeartbeat( ctx, c.protocolVersion, c.host, diff --git a/core/protocol/router/data_router.go b/core/protocol/router/data_router.go index 3dbe793..a98be18 100644 --- a/core/protocol/router/data_router.go +++ b/core/protocol/router/data_router.go @@ -494,6 +494,7 @@ func (router *Datarouter) HandleAvailability(ctx context.Context, req *availabil } template.BlockMerge = uint32(blockmerge.BlockMerge) + template.BlockHeight = router.Nodeinfo.BlockInfo.GetBlockNumber() template.IsAvailable = true template.Phase.SuccessivePhase = constants.SYNC_REQUEST diff --git a/core/reconsillation/interface.go b/core/reconsillation/interface.go index 129c61a..86e764b 100644 --- a/core/reconsillation/interface.go +++ b/core/reconsillation/interface.go @@ -23,9 +23,17 @@ type Reconciliation_router interface { GetSyncVars() *types.Syncvars // Reconcile calculates updated balances for all tagged accounts by querying - // their transactions and updates the accounts table in the database. + // transactions in [fromBlock, toBlock] and adding the delta to each account's + // current DB balance (delta-only approach — safe for repeated sync runs). + // Pass math.MaxUint64 for toBlock to mean "up to latest block in DB." // Returns the number of accounts successfully reconciled and list of failed accounts. - Reconcile(taggedAccounts *tagging.TaggedAccounts, remote *availabilitypb.AvailabilityResponse) (int, []string, error) + Reconcile(taggedAccounts *tagging.TaggedAccounts, remote *availabilitypb.AvailabilityResponse, fromBlock, toBlock uint64) (int, []string, error) + + // ReconcileWithDeltas applies pre-computed per-account balance deltas, skipping + // the per-account GetTransactionsForAccountInRange DB scan entirely. + // deltas must be keyed by lowercase 0x-prefixed hex address. + // Returns the number of accounts committed and a list of addresses that failed. + ReconcileWithDeltas(deltas map[string]*types.AccountDelta, remote *availabilitypb.AvailabilityResponse) (int, []string, error) // Close releases resources and cleans up. Close() diff --git a/core/reconsillation/reconsillation.go b/core/reconsillation/reconsillation.go index f8fab90..e926617 100644 --- a/core/reconsillation/reconsillation.go +++ b/core/reconsillation/reconsillation.go @@ -126,7 +126,7 @@ func (r *Reconciliation) GetBlockFromLRUCache(blockNumber uint64) (*blockpb.Head // // Returns the number of accounts reconciled and the list of any accounts that failed // during the computation phase. A non-nil error always means the DB was NOT mutated. -func (r *Reconciliation) Reconcile(taggedAccounts *taggingpb.TaggedAccounts, remote *availabilitypb.AvailabilityResponse) (int, []string, error) { +func (r *Reconciliation) Reconcile(taggedAccounts *taggingpb.TaggedAccounts, remote *availabilitypb.AvailabilityResponse, fromBlock, toBlock uint64) (int, []string, error) { if taggedAccounts == nil || len(taggedAccounts.Accounts) == 0 { Log.Logger(namedlogger).Info(r.SyncVars.Ctx, "No tagged accounts to reconcile") return 0, nil, nil @@ -137,7 +137,7 @@ func (r *Reconciliation) Reconcile(taggedAccounts *taggingpb.TaggedAccounts, rem defer cancel() numAccounts := len(taggedAccounts.Accounts) - Log.Logger(namedlogger).Debug(ctx, "Starting reconciliation", + Log.Logger(namedlogger).Info(ctx, "Starting reconciliation", ion.Int("tagged_accounts_count", numAccounts)) // ---------------------------------------------------------------- @@ -163,7 +163,7 @@ func (r *Reconciliation) Reconcile(taggedAccounts *taggingpb.TaggedAccounts, rem go func(accounts map[string]bool) { defer wg.Done() for addr := range accounts { - update, err := r.computeAccountUpdate(accountManager, addr) + update, err := r.computeAccountUpdate(accountManager, addr, fromBlock, toBlock) resultCh <- computeResult{update: update, addr: addr, err: err} } }(batch) @@ -194,7 +194,7 @@ func (r *Reconciliation) Reconcile(taggedAccounts *taggingpb.TaggedAccounts, rem } collected++ if collected%logInterval == 0 { - Log.Logger(namedlogger).Debug(ctx, "Phase 1 progress — computing account states", + Log.Logger(namedlogger).Info(ctx, "Phase 1 progress — computing account states", ion.Int("collected", collected), ion.Int("total", numAccounts), ion.Int("failed_so_far", len(computeErrs))) @@ -275,7 +275,7 @@ func (r *Reconciliation) Reconcile(taggedAccounts *taggingpb.TaggedAccounts, rem // ---------------------------------------------------------------- if r.SyncVars.WAL != nil { walStart := time.Now() - Log.Logger(namedlogger).Debug(ctx, "Phase 2 starting — writing WAL batch event", + Log.Logger(namedlogger).Info(ctx, "Phase 2 starting — writing WAL batch event", ion.Int("accounts", len(updates))) entries := make([]WAL.ReconciliationBatchEntry, len(updates)) @@ -305,7 +305,7 @@ func (r *Reconciliation) Reconcile(taggedAccounts *taggingpb.TaggedAccounts, rem // Phase 3: Atomic DB commit — all or none // ---------------------------------------------------------------- dbStart := time.Now() - Log.Logger(namedlogger).Debug(ctx, "Phase 3 starting — atomic DB commit", + Log.Logger(namedlogger).Info(ctx, "Phase 3 starting — atomic DB commit", ion.Int("accounts_to_commit", len(updates))) if err := accountManager.BatchUpdateAccounts(updates); err != nil { @@ -325,20 +325,259 @@ func (r *Reconciliation) Reconcile(taggedAccounts *taggingpb.TaggedAccounts, rem return len(updates), nil, nil } +// ReconcileWithDeltas applies pre-computed per-account balance deltas without +// querying the DB per account. Identical three-phase commit structure as Reconcile +// (compute → WAL → atomic DB write), but skips GetTransactionsForAccountInRange. +// +// deltas must be keyed by lowercase 0x-prefixed hex address. +func (r *Reconciliation) ReconcileWithDeltas(deltas map[string]*types.AccountDelta, remote *availabilitypb.AvailabilityResponse) (int, []string, error) { + if len(deltas) == 0 { + Log.Logger(namedlogger).Info(r.SyncVars.Ctx, "ReconcileWithDeltas: no deltas to apply") + return 0, nil, nil + } + + ctx, cancel := context.WithCancel(r.SyncVars.Ctx) + defer cancel() + + numAccounts := len(deltas) + Log.Logger(namedlogger).Info(ctx, "ReconcileWithDeltas starting", + ion.Int("accounts", numAccounts)) + + // Build a slice of addresses for batch dispatch. + addrs := make([]string, 0, numAccounts) + for addr := range deltas { + addrs = append(addrs, addr) + } + + accountManager := r.SyncVars.NodeInfo.BlockInfo.NewAccountManager() + + type computeResult struct { + update types.AccountUpdate + addr string + err error + } + + // ---------------------------------------------------------------- + // Phase 1: Concurrently apply deltas — no DB writes + // ---------------------------------------------------------------- + numRoutines := min(numAccounts, constants.ATMOST_ACCOUNT_ROUTINES) + batchSize := (numAccounts + numRoutines - 1) / numRoutines + + resultCh := make(chan computeResult, numAccounts) + var wg sync.WaitGroup + + for i := 0; i < len(addrs); i += batchSize { + end := i + batchSize + if end > len(addrs) { + end = len(addrs) + } + batch := addrs[i:end] + wg.Add(1) + go func(batch []string) { + defer wg.Done() + for _, addr := range batch { + delta := deltas[addr] + update, err := r.computeUpdateFromDelta(accountManager, addr, delta) + resultCh <- computeResult{update: update, addr: addr, err: err} + } + }(batch) + } + + go func() { + wg.Wait() + close(resultCh) + }() + + updates := make([]types.AccountUpdate, 0, numAccounts) + var failedAccounts []string + var computeErrs []error + + logInterval := numAccounts / 10 + if logInterval == 0 { + logInterval = 1 + } + collected := 0 + + for res := range resultCh { + if res.err != nil { + failedAccounts = append(failedAccounts, res.addr) + computeErrs = append(computeErrs, res.err) + } else { + updates = append(updates, res.update) + } + collected++ + if collected%logInterval == 0 { + Log.Logger(namedlogger).Info(ctx, "ReconcileWithDeltas phase 1 progress", + ion.Int("collected", collected), + ion.Int("total", numAccounts), + ion.Int("failed_so_far", len(computeErrs))) + } + } + + if len(computeErrs) > 0 { + Log.Logger(namedlogger).Warn(ctx, "ReconcileWithDeltas: computation errors — aborting commit", + ion.Int("failed", len(computeErrs)), + ion.Int("succeeded", len(updates))) + return 0, failedAccounts, fmt.Errorf("delta computation failed for %d accounts, no DB changes made: %v", + len(computeErrs), computeErrs) + } + + // ---------------------------------------------------------------- + // Phase 1.5: Pre-create accounts missing from local DB. + // ---------------------------------------------------------------- + if remote != nil { + missingAddrs := make(map[string]bool) + missingIdx := make(map[string]int) + for updateIdx, update := range updates { + if update.IsNewAccount { + missingAddrs[update.Address] = true + missingIdx[update.Address] = updateIdx + } + } + if len(missingAddrs) > 0 { + Log.Logger(namedlogger).Debug(ctx, "ReconcileWithDeltas phase 1.5: fetching missing accounts from remote", + ion.Int("missing", len(missingAddrs))) + syncVars := r.GetSyncVars() + acctSync := accountsync.NewAccountSync().SetSyncVars(syncVars.Ctx, syncVars.Version, syncVars.NodeInfo, syncVars.Node, syncVars.WAL) + resp, err := acctSync.FetchAccounts(remote, missingAddrs) + if err != nil { + Log.Logger(namedlogger).Warn(ctx, "ReconcileWithDeltas phase 1.5: remote fetch failed", + ion.Err(err)) + } else if resp != nil { + created := 0 + for _, acc := range resp.GetAccounts() { + addrBytes := acc.GetAddress() + if len(addrBytes) == 0 { + continue + } + addr := strings.ToLower(common.BytesToAddress(addrBytes).Hex()) + idx, ok := missingIdx[addr] + if !ok { + continue + } + if err := accountManager.CreateAccount(addr, big.NewInt(0), acc.GetNonce()); err != nil { + Log.Logger(namedlogger).Warn(ctx, "ReconcileWithDeltas phase 1.5: pre-create failed", + ion.String("address", addr), ion.Err(err)) + continue + } + updates[idx].IsNewAccount = false + created++ + } + Log.Logger(namedlogger).Info(ctx, "ReconcileWithDeltas phase 1.5 complete", + ion.Int("requested", len(missingAddrs)), + ion.Int("created", created)) + } + } + } + + // ---------------------------------------------------------------- + // Phase 2: WAL batch write + // ---------------------------------------------------------------- + if r.SyncVars.WAL != nil { + walStart := time.Now() + entries := make([]WAL.ReconciliationBatchEntry, len(updates)) + for i, u := range updates { + entries[i] = WAL.ReconciliationBatchEntry{ + AccountAddress: u.Address, + NewBalance: u.NewBalance.String(), + Nonce: u.Nonce, + } + } + batchEvent := &WAL.ReconciliationBatchEvent{ + Accounts: entries, + Timestamp: time.Now().Unix(), + } + if _, err := r.SyncVars.WAL.WriteEvent(batchEvent); err != nil { + return 0, nil, fmt.Errorf("WAL batch write failed — aborting commit: %w", err) + } + if err := r.SyncVars.WAL.Flush(); err != nil { + return 0, nil, fmt.Errorf("WAL flush failed — aborting commit: %w", err) + } + Log.Logger(namedlogger).Info(ctx, "ReconcileWithDeltas phase 2 complete — WAL flushed", + ion.Int("accounts", len(updates)), + ion.String("duration", time.Since(walStart).String())) + } + + // ---------------------------------------------------------------- + // Phase 3: Atomic DB commit + // ---------------------------------------------------------------- + dbStart := time.Now() + if err := accountManager.BatchUpdateAccounts(updates); err != nil { + return 0, nil, fmt.Errorf("atomic DB commit failed — no accounts updated: %w", err) + } + Log.Logger(namedlogger).Info(ctx, "ReconcileWithDeltas phase 3 complete — committed", + ion.Int("accounts", len(updates)), + ion.String("duration", time.Since(dbStart).String())) + + if r.SyncVars.WAL != nil { + if _, err := r.SyncVars.WAL.CreateCheckpoint(); err != nil { + Log.Logger(namedlogger).Warn(ctx, "WAL checkpoint failed after ReconcileWithDeltas commit", ion.Err(err)) + } + } + + return len(updates), nil, nil +} + +// computeUpdateFromDelta reads the current account balance and applies the pre-computed +// delta to produce a ready-to-commit AccountUpdate. Read-only — no DB writes. +func (r *Reconciliation) computeUpdateFromDelta(accountManager types.AccountManager, addr string, delta *types.AccountDelta) (types.AccountUpdate, error) { + if !strings.HasPrefix(addr, "0x") { + addr = "0x" + addr + } + + currentBalance, _, err := accountManager.GetAccountBalance(addr) + if err != nil { + return types.AccountUpdate{}, fmt.Errorf("GetAccountBalance %s: %w", addr, err) + } + + isNew := currentBalance == nil + if currentBalance == nil { + currentBalance = big.NewInt(0) + } + + newBalance := new(big.Int).Add(currentBalance, delta.BalanceDelta) + if newBalance.Sign() < 0 { + newBalance = big.NewInt(0) + } + + return types.AccountUpdate{ + Address: addr, + NewBalance: newBalance, + Nonce: delta.Nonce, + TxNonce: delta.TxNonce, + TxCountSent: delta.TxCountSent, + IsNewAccount: isNew, + }, nil +} + // computeAccountUpdate reads all transactions for one account, replays them to get // the new balance/nonce, and returns a ready-to-commit AccountUpdate. // This is a read-only operation — it does not touch the DB. -func (r *Reconciliation) computeAccountUpdate(accountManager types.AccountManager, accountAddress string) (types.AccountUpdate, error) { +func (r *Reconciliation) computeAccountUpdate(accountManager types.AccountManager, accountAddress string, fromBlock, toBlock uint64) (types.AccountUpdate, error) { // Normalize address to 0x-prefixed lowercase so calculateAccountState comparisons // against tx.From.Hex() / tx.To.Hex() (which always carry the 0x prefix) are correct. if !strings.HasPrefix(accountAddress, "0x") { accountAddress = "0x" + accountAddress } - transactions, err := accountManager.GetTransactionsForAccount(accountAddress) + fetchStart := time.Now() + transactions, err := accountManager.GetTransactionsForAccountInRange(accountAddress, fromBlock, toBlock) + fetchDur := time.Since(fetchStart) if err != nil { + Log.Logger(namedlogger).Warn(r.SyncVars.Ctx, "GetTransactionsForAccountInRange failed", + ion.String("address", accountAddress), + ion.Uint64("from_block", fromBlock), + ion.Uint64("to_block", toBlock), + ion.String("duration", fetchDur.String()), + ion.Err(err)) return types.AccountUpdate{}, fmt.Errorf("failed to get transactions for account %s: %w", accountAddress, err) } + Log.Logger(namedlogger).Info(r.SyncVars.Ctx, "GetTransactionsForAccountInRange complete", + ion.String("address", accountAddress), + ion.Int("tx_count", len(transactions)), + ion.Uint64("from_block", fromBlock), + ion.Uint64("to_block", toBlock), + ion.String("duration", fetchDur.String())) state := r.calculateAccountState(accountAddress, transactions) @@ -354,7 +593,14 @@ func (r *Reconciliation) computeAccountUpdate(accountManager types.AccountManage currentBalance = big.NewInt(0) } - newBalance := state.ComputedBalance + // Delta-only reconciliation: + // currentBalance = whatever is in DB right now + // - New account: 0 (GetAccountBalance returns 0 for not-found) + // - First sync: bootstrap snapshot balance + // - Nth sync: result of previous reconciliation + // state.ComputedBalance = net effect of transactions in [fromBlock..toBlock] only + // newBalance = currentBalance + delta — correct for all cases and all sync runs. + newBalance := new(big.Int).Add(currentBalance, state.ComputedBalance) if newBalance.Sign() < 0 { newBalance = big.NewInt(0) // Prevent negative balances } diff --git a/core/sync/sync_protocols.go b/core/sync/sync_protocols.go index 1582ebd..7d3173c 100644 --- a/core/sync/sync_protocols.go +++ b/core/sync/sync_protocols.go @@ -2,13 +2,13 @@ package sync import ( "context" - + "fmt" "path/filepath" "os" "github.com/google/uuid" art "github.com/JupiterMetaLabs/JMDN_Merkletree/art" accountshelper "github.com/JupiterMetaLabs/JMDN-FastSync/core/protocol/router/helper/accounts" - + gosync "sync" "time" @@ -268,15 +268,66 @@ func (s *Sync) HandleHeaderSync(ctx context.Context, node host.Host) error { Version: s.nodeinfo.Version, } - // Route to Datarouter - resp := s.Datarouter.HandleHeaderSync(ctx, req, remoteNodeInfo) + // ── Start heartbeat goroutine ────────────────────────────────────── + // Fetching 1500 headers from DB can exceed the 15s stream deadline. + // Heartbeats reset the client's read deadline on every tick. + computeCtx, computeCancel := context.WithCancel(ctx) + defer computeCancel() + + done := make(chan struct{}) + var mu gosync.Mutex + + go func() { + ticker := time.NewTicker(constants.HeartbeatInterval) + defer ticker.Stop() + for { + select { + case <-done: + return + case <-computeCtx.Done(): + return + case <-ticker.C: + hb := &headerpb.HeaderSyncStreamMessage{ + Payload: &headerpb.HeaderSyncStreamMessage_Heartbeat{ + Heartbeat: &headerpb.HeaderSyncHeartbeat{ + Timestamp: time.Now().UnixNano(), + }, + }, + } + mu.Lock() + _ = str.SetWriteDeadline(time.Now().Add(constants.StreamDeadline)) + err := pbstream.WriteDelimited(str, hb) + mu.Unlock() + + if err != nil { + logging.Logger(logging.Sync).Warn(computeCtx, "headersync heartbeat write failed, cancelling computation", + ion.Err(err)) + computeCancel() + return + } + } + } + }() + + // ── Run the (potentially long) DB fetch ─────────────────────────── + resp := s.Datarouter.HandleHeaderSync(computeCtx, req, remoteNodeInfo) s.Debug(ctx, constants.HeaderSyncProtocol, node, remoteNodeInfo) - // Send response - _ = str.SetWriteDeadline(time.Now().Add(10 * time.Second)) - defer str.SetWriteDeadline(time.Time{}) + // ── Stop heartbeats and send final response ─────────────────────── + close(done) - _ = pbstream.WriteDelimited(str, resp) + final := &headerpb.HeaderSyncStreamMessage{ + Payload: &headerpb.HeaderSyncStreamMessage_Response{Response: resp}, + } + + mu.Lock() + _ = str.SetWriteDeadline(time.Now().Add(constants.StreamDeadline)) + err := pbstream.WriteDelimited(str, final) + if err != nil { + logging.Logger(logging.Sync).Warn(ctx, "failed to write final headersync response", + ion.Err(err)) + } + mu.Unlock() }) return nil } @@ -523,15 +574,70 @@ func (s *Sync) HandlePubsub(ctx context.Context, node host.Host) error { return nil } +// drainAndRejectAccountsSync gracefully terminates an AccountSync stream when +// the server cannot service the request (e.g. session ART creation failed). +// +// It drains every incoming AccountNonceSyncRequest, sending a BatchAck for each +// non-final chunk (so the client keeps uploading without blocking), then sends +// an error EndOfStream for the final chunk so the client receives a readable +// error instead of a stream reset. +// +// Without this drain, closing the stream while the client is still writing +// causes quic-go to send a STOP_SENDING frame that the client sees as +// "stream reset (remote): code: 0x1001". +func drainAndRejectAccountsSync(str network.Stream, errMsg string) { + for { + _ = str.SetReadDeadline(time.Now().Add(constants.StreamDeadline)) + req := &accountspb.AccountNonceSyncRequest{} + if err := pbstream.ReadDelimited(str, req); err != nil { + // Network-level failure — stream is already broken, nothing to send. + return + } + _ = str.SetWriteDeadline(time.Now().Add(constants.StreamDeadline)) + if req.IsLast { + // Send error EndOfStream so the client knows the reason. + end := &accountspb.AccountSyncServerMessage{ + Payload: &accountspb.AccountSyncServerMessage_End{ + End: &accountspb.AccountSyncEndOfStream{ + Ack: &ackpb.Ack{Ok: false, Error: errMsg}, + }, + }, + } + _ = pbstream.WriteDelimited(str, end) + return + } + // Client is waiting for a BatchAck before sending the next chunk. + // Send an ok ack to keep the upload flowing. + ack := &accountspb.AccountSyncServerMessage{ + Payload: &accountspb.AccountSyncServerMessage_BatchAck{ + BatchAck: &accountspb.AccountBatchAck{ + Ack: &ackpb.Ack{Ok: true}, + }, + }, + } + _ = pbstream.WriteDelimited(str, ack) + } +} + func (s *Sync) HandleAccountsSync(ctx context.Context, node host.Host) error { node.SetStreamHandler(constants.AccountsSyncProtocol, func(str network.Stream) { defer str.Close() - // Per-session ART: isolated to this connection, cleaned up when goroutine exits. + // Pre-create session directory — NewSwappable may not call MkdirAll internally. sessionDir := filepath.Join(os.TempDir(), constants.TEMP_ART_DIR, uuid.New().String()) + if mkErr := os.MkdirAll(sessionDir, 0755); mkErr != nil { + logging.Logger(logging.Sync).Warn(ctx, + fmt.Sprintf("accountssync: failed to create session dir %s: %v", sessionDir, mkErr)) + drainAndRejectAccountsSync(str, fmt.Sprintf("session dir setup failed: %v", mkErr)) + return + } + sessionSwappable, err := art.NewSwappable(sessionDir, art.DefaultThreshold) if err != nil { - logging.Logger(logging.Sync).Warn(ctx, "accountssync: failed to create session ART", ion.Err(err)) + logging.Logger(logging.Sync).Warn(ctx, + fmt.Sprintf("accountssync: failed to create session ART %s: %v", sessionDir, err)) + os.RemoveAll(sessionDir) + drainAndRejectAccountsSync(str, fmt.Sprintf("session ART setup failed: %v", err)) return } defer os.RemoveAll(sessionDir) diff --git a/tests/example/interfaces.go b/tests/example/interfaces.go index 8e11ad1..d8a8b0a 100644 --- a/tests/example/interfaces.go +++ b/tests/example/interfaces.go @@ -150,6 +150,10 @@ func (e example_accountmanager) GetTransactionsForAccount(accountAddress string) return nil, nil } +func (e example_accountmanager) GetTransactionsForAccountInRange(accountAddress string, fromBlock, toBlock uint64) ([]types.DBTransaction, error) { + return nil, nil +} + func (e example_accountmanager) CreateAccount(address string, balance *big.Int, nonce uint64) error { return nil }