diff --git a/.gitignore b/.gitignore index e3be5b64..9b11e888 100644 --- a/.gitignore +++ b/.gitignore @@ -53,3 +53,14 @@ internal/WAL/.tmp/* .code-review-graph/* .cursor/* test_results/ +docs/FASTSYNC_V3_MIGRATION_PLAN.md +eventlog.duckdb +Scripts/sign_tx.go +storage/thebe-kv/000017.vlog +storage/thebe-kv/000018.vlog +storage/thebe-kv/DISCARD +storage/thebe-kv/KEYREGISTRY +storage/thebe-kv/MANIFEST +storage/thebe-kv/outbox.db +dlq/DLQ.log +jmdn2 diff --git a/CLI/CLI.go b/CLI/CLI.go index 87e33bb1..37800a44 100644 --- a/CLI/CLI.go +++ b/CLI/CLI.go @@ -6,6 +6,7 @@ import ( "fmt" "log" "os" + "strconv" "strings" "time" @@ -107,8 +108,9 @@ func PrintFuncs() { fmt.Println(" mempoolStats - Show mempool statistics") fmt.Println(" stats - Show messaging statistics") fmt.Println(" broadcast - Broadcast a message to all connected peers") - fmt.Println(" fastsync - Fast sync blockchain data with a peer (V2 Engine)") - fmt.Println(" accountsync - Sync missing accounts only (skip block sync)") + fmt.Println(" fastsync - Fast sync blockchain data with a peer (V2 Engine)") + fmt.Println(" catchup [from_block] - Catch up to chain tip; from_block defaults to auto-detect (localTip+1)") + fmt.Println(" accountsync - Sync missing accounts only (skip block sync)") fmt.Println(" dbstate - Show current ImmuDB database state") fmt.Println(" propagateDID - Propagate a DID to the network") fmt.Println(" getDID - Get a DID document from the network") @@ -268,6 +270,8 @@ func (h *CommandHandler) handleCommand(parts []string) { h.handleBroadcast(parts) case "fastsync", "fastsyncv2", "firstsync": h.handleFastSync(parts) + case "catchup": + h.handleCatchUpSync(parts) case "accountsync": h.handleAccountSync(parts) case "propagateDID": @@ -632,6 +636,41 @@ func (h *CommandHandler) handleFastSync(parts []string) { printDashes() } +func (h *CommandHandler) handleCatchUpSync(parts []string) { + if len(parts) < 2 { + fmt.Println("Usage: catchup [from_block]") + fmt.Println(" peer_multiaddr full multiaddr with peer ID, e.g. /ip4/1.2.3.4/tcp/15000/p2p/12D3KooW...") + fmt.Println(" from_block optional; defaults to 0 (auto-detect from local DB tip)") + fmt.Println(" pass 1 to force a full scan from genesis") + return + } + if h.FastSyncerV2 == nil { + fmt.Println("Error: FastsyncV2 engine is not initialized") + return + } + + var fromBlock uint64 + if len(parts) >= 3 { + var err error + fromBlock, err = strconv.ParseUint(parts[2], 10, 64) + if err != nil { + fmt.Printf("Invalid from_block %q: %v\n", parts[2], err) + return + } + } // fromBlock=0 → auto-detect inside HandleCatchUpSync + + fmt.Printf("Starting catch-up sync (from_block=%d) with peer %s\n", fromBlock, parts[1]) + startTime := time.Now() + + if err := h.FastSyncerV2.HandleCatchUpSync(fromBlock, parts[1]); err != nil { + fmt.Printf("CatchUpSync failed: %v\n", err) + return + } + + fmt.Printf("CatchUpSync completed in %v\n", time.Since(startTime)) + printDashes() +} + func (h *CommandHandler) handleAccountSync(parts []string) { if len(parts) != 2 { fmt.Println("Usage: accountsync ") diff --git a/CLI/CLI_GRPC.go b/CLI/CLI_GRPC.go index efc2fa58..84f71f65 100644 --- a/CLI/CLI_GRPC.go +++ b/CLI/CLI_GRPC.go @@ -330,6 +330,38 @@ func (h *CommandHandler) HandleFastSyncV2(peeraddr string) (SyncStats, error) { }, nil } +func (h *CommandHandler) HandleCatchUpSync(peeraddr string, fromBlock uint64) (SyncStats, error) { + if peeraddr == "" { + return SyncStats{}, fmt.Errorf("usage: catchup [from_block]") + } + // fromBlock=0 → auto-detect inside HandleCatchUpSync + if !h.PullAllowed { + return SyncStats{}, fmt.Errorf("node is configured as a serve-only participant (pulling disabled). cannot pull data") + } + if h.FastSyncerV2 == nil { + return SyncStats{}, fmt.Errorf("FastsyncV2 engine is inactive") + } + + startTime := time.Now().UTC() + if err := h.FastSyncerV2.HandleCatchUpSync(fromBlock, peeraddr); err != nil { + return SyncStats{}, fmt.Errorf("CatchUpSync failed: %w", err) + } + + var newMainState, newAccountsState *schema.ImmutableState + if h.MainClient != nil { + newMainState, _ = DB_OPs.GetDatabaseState(h.MainClient.Client) + } + if h.DIDClient != nil { + newAccountsState, _ = DB_OPs.GetDatabaseState(h.DIDClient.Client) + } + + return SyncStats{ + TimeTaken: time.Since(startTime), + MainState: newMainState, + AccountsState: newAccountsState, + }, nil +} + func (h *CommandHandler) HandleAccountSync(peeraddr string) (SyncStats, error) { if peeraddr == "" { return SyncStats{}, fmt.Errorf("usage: accountsync ") diff --git a/CLI/GRPC_Server.go b/CLI/GRPC_Server.go index 1c849a9d..0c8e3ee2 100644 --- a/CLI/GRPC_Server.go +++ b/CLI/GRPC_Server.go @@ -256,9 +256,19 @@ func (s *CLIServer) AccountSync(ctx context.Context, req *pb.PeerRequest) (*pb.S func (s *CLIServer) FirstSync(ctx context.Context, req *pb.FirstSyncRequest) (*pb.SyncStats, error) { stats, err := s.handler.HandleFirstSync(req.Peer, req.Mode) if err != nil { - return &pb.SyncStats{ - Error: err.Error(), - }, nil + return &pb.SyncStats{Error: err.Error()}, nil + } + return &pb.SyncStats{ + TimeTaken: int64(stats.TimeTaken.Seconds()), + MainState: convertDBState(stats.MainState), + AccountsState: convertDBState(stats.AccountsState), + }, nil +} + +func (s *CLIServer) CatchUpSync(ctx context.Context, req *pb.CatchUpRequest) (*pb.SyncStats, error) { + stats, err := s.handler.HandleCatchUpSync(req.Peer, req.FromBlock) + if err != nil { + return &pb.SyncStats{Error: err.Error()}, nil } return &pb.SyncStats{ TimeTaken: int64(stats.TimeTaken.Seconds()), diff --git a/CLI/client.go b/CLI/client.go index a019353f..3c5d11fa 100644 --- a/CLI/client.go +++ b/CLI/client.go @@ -179,6 +179,15 @@ func (c *Client) FirstSync(peerAddr string, mode string) (*pb.SyncStats, error) }) } +// CatchUpSync reconciles blocks [fromBlock..remoteTip] without Merkle bisection. +func (c *Client) CatchUpSync(peerAddr string, fromBlock uint64) (*pb.SyncStats, error) { + ctx := context.Background() + return c.conn.CatchUpSync(ctx, &pb.CatchUpRequest{ + Peer: peerAddr, + FromBlock: fromBlock, + }) +} + // GetDatabaseState returns the current database state func (c *Client) GetDatabaseState() (*pb.DatabaseStates, error) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) diff --git a/CLI/proto/Connection.pb.go b/CLI/proto/Connection.pb.go index 39e3c52a..cae3c9bf 100644 --- a/CLI/proto/Connection.pb.go +++ b/CLI/proto/Connection.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.36.11 -// protoc v7.34.1 +// protoc v4.25.3 // source: Connection.proto package proto @@ -893,6 +893,58 @@ func (x *FirstSyncRequest) GetMode() string { return "" } +type CatchUpRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + Peer string `protobuf:"bytes,1,opt,name=peer,proto3" json:"peer,omitempty"` // full libp2p multiaddr with peer ID + FromBlock uint64 `protobuf:"varint,2,opt,name=from_block,json=fromBlock,proto3" json:"from_block,omitempty"` // first block NOT in local DB (bootstrap tip + 1) + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *CatchUpRequest) Reset() { + *x = CatchUpRequest{} + mi := &file_Connection_proto_msgTypes[14] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *CatchUpRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CatchUpRequest) ProtoMessage() {} + +func (x *CatchUpRequest) ProtoReflect() protoreflect.Message { + mi := &file_Connection_proto_msgTypes[14] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CatchUpRequest.ProtoReflect.Descriptor instead. +func (*CatchUpRequest) Descriptor() ([]byte, []int) { + return file_Connection_proto_rawDescGZIP(), []int{14} +} + +func (x *CatchUpRequest) GetPeer() string { + if x != nil { + return x.Peer + } + return "" +} + +func (x *CatchUpRequest) GetFromBlock() uint64 { + if x != nil { + return x.FromBlock + } + return 0 +} + type SyncInfo struct { state protoimpl.MessageState `protogen:"open.v1"` BatchSize int64 `protobuf:"varint,1,opt,name=batch_size,json=batchSize,proto3" json:"batch_size,omitempty"` @@ -904,7 +956,7 @@ type SyncInfo struct { func (x *SyncInfo) Reset() { *x = SyncInfo{} - mi := &file_Connection_proto_msgTypes[14] + mi := &file_Connection_proto_msgTypes[15] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -916,7 +968,7 @@ func (x *SyncInfo) String() string { func (*SyncInfo) ProtoMessage() {} func (x *SyncInfo) ProtoReflect() protoreflect.Message { - mi := &file_Connection_proto_msgTypes[14] + mi := &file_Connection_proto_msgTypes[15] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -929,7 +981,7 @@ func (x *SyncInfo) ProtoReflect() protoreflect.Message { // Deprecated: Use SyncInfo.ProtoReflect.Descriptor instead. func (*SyncInfo) Descriptor() ([]byte, []int) { - return file_Connection_proto_rawDescGZIP(), []int{14} + return file_Connection_proto_rawDescGZIP(), []int{15} } func (x *SyncInfo) GetBatchSize() int64 { @@ -964,7 +1016,7 @@ type GethStatus struct { func (x *GethStatus) Reset() { *x = GethStatus{} - mi := &file_Connection_proto_msgTypes[15] + mi := &file_Connection_proto_msgTypes[16] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -976,7 +1028,7 @@ func (x *GethStatus) String() string { func (*GethStatus) ProtoMessage() {} func (x *GethStatus) ProtoReflect() protoreflect.Message { - mi := &file_Connection_proto_msgTypes[15] + mi := &file_Connection_proto_msgTypes[16] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -989,7 +1041,7 @@ func (x *GethStatus) ProtoReflect() protoreflect.Message { // Deprecated: Use GethStatus.ProtoReflect.Descriptor instead. func (*GethStatus) Descriptor() ([]byte, []int) { - return file_Connection_proto_rawDescGZIP(), []int{15} + return file_Connection_proto_rawDescGZIP(), []int{16} } func (x *GethStatus) GetChainId() int32 { @@ -1022,7 +1074,7 @@ type AliasList struct { func (x *AliasList) Reset() { *x = AliasList{} - mi := &file_Connection_proto_msgTypes[16] + mi := &file_Connection_proto_msgTypes[17] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1034,7 +1086,7 @@ func (x *AliasList) String() string { func (*AliasList) ProtoMessage() {} func (x *AliasList) ProtoReflect() protoreflect.Message { - mi := &file_Connection_proto_msgTypes[16] + mi := &file_Connection_proto_msgTypes[17] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1047,7 +1099,7 @@ func (x *AliasList) ProtoReflect() protoreflect.Message { // Deprecated: Use AliasList.ProtoReflect.Descriptor instead. func (*AliasList) Descriptor() ([]byte, []int) { - return file_Connection_proto_rawDescGZIP(), []int{16} + return file_Connection_proto_rawDescGZIP(), []int{17} } func (x *AliasList) GetAliases() []string { @@ -1067,7 +1119,7 @@ type OperationResponse struct { func (x *OperationResponse) Reset() { *x = OperationResponse{} - mi := &file_Connection_proto_msgTypes[17] + mi := &file_Connection_proto_msgTypes[18] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1079,7 +1131,7 @@ func (x *OperationResponse) String() string { func (*OperationResponse) ProtoMessage() {} func (x *OperationResponse) ProtoReflect() protoreflect.Message { - mi := &file_Connection_proto_msgTypes[17] + mi := &file_Connection_proto_msgTypes[18] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1092,7 +1144,7 @@ func (x *OperationResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use OperationResponse.ProtoReflect.Descriptor instead. func (*OperationResponse) Descriptor() ([]byte, []int) { - return file_Connection_proto_rawDescGZIP(), []int{17} + return file_Connection_proto_rawDescGZIP(), []int{18} } func (x *OperationResponse) GetSuccess() bool { @@ -1119,7 +1171,7 @@ type CleanPeersResponse struct { func (x *CleanPeersResponse) Reset() { *x = CleanPeersResponse{} - mi := &file_Connection_proto_msgTypes[18] + mi := &file_Connection_proto_msgTypes[19] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1131,7 +1183,7 @@ func (x *CleanPeersResponse) String() string { func (*CleanPeersResponse) ProtoMessage() {} func (x *CleanPeersResponse) ProtoReflect() protoreflect.Message { - mi := &file_Connection_proto_msgTypes[18] + mi := &file_Connection_proto_msgTypes[19] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1144,7 +1196,7 @@ func (x *CleanPeersResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use CleanPeersResponse.ProtoReflect.Descriptor instead. func (*CleanPeersResponse) Descriptor() ([]byte, []int) { - return file_Connection_proto_rawDescGZIP(), []int{18} + return file_Connection_proto_rawDescGZIP(), []int{19} } func (x *CleanPeersResponse) GetCleanedCount() int32 { @@ -1171,7 +1223,7 @@ type DatabaseStates struct { func (x *DatabaseStates) Reset() { *x = DatabaseStates{} - mi := &file_Connection_proto_msgTypes[19] + mi := &file_Connection_proto_msgTypes[20] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1183,7 +1235,7 @@ func (x *DatabaseStates) String() string { func (*DatabaseStates) ProtoMessage() {} func (x *DatabaseStates) ProtoReflect() protoreflect.Message { - mi := &file_Connection_proto_msgTypes[19] + mi := &file_Connection_proto_msgTypes[20] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1196,7 +1248,7 @@ func (x *DatabaseStates) ProtoReflect() protoreflect.Message { // Deprecated: Use DatabaseStates.ProtoReflect.Descriptor instead. func (*DatabaseStates) Descriptor() ([]byte, []int) { - return file_Connection_proto_rawDescGZIP(), []int{19} + return file_Connection_proto_rawDescGZIP(), []int{20} } func (x *DatabaseStates) GetMainDb() *DatabaseState { @@ -1287,7 +1339,11 @@ const file_Connection_proto_rawDesc = "" + "\abalance\x18\x03 \x01(\tR\abalance\":\n" + "\x10FirstSyncRequest\x12\x12\n" + "\x04peer\x18\x01 \x01(\tR\x04peer\x12\x12\n" + - "\x04mode\x18\x02 \x01(\tR\x04mode\"\x8b\x01\n" + + "\x04mode\x18\x02 \x01(\tR\x04mode\"C\n" + + "\x0eCatchUpRequest\x12\x12\n" + + "\x04peer\x18\x01 \x01(\tR\x04peer\x12\x1d\n" + + "\n" + + "from_block\x18\x02 \x01(\x04R\tfromBlock\"\x8b\x01\n" + "\bSyncInfo\x12\x1d\n" + "\n" + "batch_size\x18\x01 \x01(\x03R\tbatchSize\x12.\n" + @@ -1310,7 +1366,7 @@ const file_Connection_proto_rawDesc = "" + "\x0eDatabaseStates\x12+\n" + "\amain_db\x18\x01 \x01(\v2\x12.cli.DatabaseStateR\x06mainDb\x123\n" + "\vaccounts_db\x18\x02 \x01(\v2\x12.cli.DatabaseStateR\n" + - "accountsDb2\x9e\n" + + "accountsDb2\xd4\n" + "\n" + "\n" + "CLIService\x124\n" + @@ -1331,7 +1387,8 @@ const file_Connection_proto_rawDesc = "" + "\n" + "FastSyncV2\x12\x10.cli.PeerRequest\x1a\x0e.cli.SyncStats\"\x00\x121\n" + "\vAccountSync\x12\x10.cli.PeerRequest\x1a\x0e.cli.SyncStats\"\x00\x124\n" + - "\tFirstSync\x12\x15.cli.FirstSyncRequest\x1a\x0e.cli.SyncStats\"\x00\x12A\n" + + "\tFirstSync\x12\x15.cli.FirstSyncRequest\x1a\x0e.cli.SyncStats\"\x00\x124\n" + + "\vCatchUpSync\x12\x13.cli.CatchUpRequest\x1a\x0e.cli.SyncStats\"\x00\x12A\n" + "\x10GetDatabaseState\x12\x16.google.protobuf.Empty\x1a\x13.cli.DatabaseStates\"\x00\x123\n" + "\vReturnAddrs\x12\x16.google.protobuf.Empty\x1a\n" + ".cli.Addrs\"\x00\x126\n" + @@ -1353,7 +1410,7 @@ func file_Connection_proto_rawDescGZIP() []byte { return file_Connection_proto_rawDescData } -var file_Connection_proto_msgTypes = make([]protoimpl.MessageInfo, 20) +var file_Connection_proto_msgTypes = make([]protoimpl.MessageInfo, 21) var file_Connection_proto_goTypes = []any{ (*Peer)(nil), // 0: cli.Peer (*PeerList)(nil), // 1: cli.PeerList @@ -1369,69 +1426,72 @@ var file_Connection_proto_goTypes = []any{ (*DIDRequest)(nil), // 11: cli.DIDRequest (*DIDPropagationRequest)(nil), // 12: cli.DIDPropagationRequest (*FirstSyncRequest)(nil), // 13: cli.FirstSyncRequest - (*SyncInfo)(nil), // 14: cli.SyncInfo - (*GethStatus)(nil), // 15: cli.GethStatus - (*AliasList)(nil), // 16: cli.AliasList - (*OperationResponse)(nil), // 17: cli.OperationResponse - (*CleanPeersResponse)(nil), // 18: cli.CleanPeersResponse - (*DatabaseStates)(nil), // 19: cli.DatabaseStates - (*timestamppb.Timestamp)(nil), // 20: google.protobuf.Timestamp - (*emptypb.Empty)(nil), // 21: google.protobuf.Empty + (*CatchUpRequest)(nil), // 14: cli.CatchUpRequest + (*SyncInfo)(nil), // 15: cli.SyncInfo + (*GethStatus)(nil), // 16: cli.GethStatus + (*AliasList)(nil), // 17: cli.AliasList + (*OperationResponse)(nil), // 18: cli.OperationResponse + (*CleanPeersResponse)(nil), // 19: cli.CleanPeersResponse + (*DatabaseStates)(nil), // 20: cli.DatabaseStates + (*timestamppb.Timestamp)(nil), // 21: google.protobuf.Timestamp + (*emptypb.Empty)(nil), // 22: google.protobuf.Empty } var file_Connection_proto_depIdxs = []int32{ 0, // 0: cli.PeerList.peers:type_name -> cli.Peer - 20, // 1: cli.DIDDocument.created_at:type_name -> google.protobuf.Timestamp - 20, // 2: cli.DIDDocument.updated_at:type_name -> google.protobuf.Timestamp + 21, // 1: cli.DIDDocument.created_at:type_name -> google.protobuf.Timestamp + 21, // 2: cli.DIDDocument.updated_at:type_name -> google.protobuf.Timestamp 3, // 3: cli.SyncStats.main_state:type_name -> cli.DatabaseState 3, // 4: cli.SyncStats.accounts_state:type_name -> cli.DatabaseState 3, // 5: cli.DatabaseStates.main_db:type_name -> cli.DatabaseState 3, // 6: cli.DatabaseStates.accounts_db:type_name -> cli.DatabaseState - 21, // 7: cli.CLIService.ListPeers:input_type -> google.protobuf.Empty + 22, // 7: cli.CLIService.ListPeers:input_type -> google.protobuf.Empty 8, // 8: cli.CLIService.AddPeer:input_type -> cli.PeerRequest 8, // 9: cli.CLIService.RemovePeer:input_type -> cli.PeerRequest - 21, // 10: cli.CLIService.CleanPeers:input_type -> google.protobuf.Empty + 22, // 10: cli.CLIService.CleanPeers:input_type -> google.protobuf.Empty 9, // 11: cli.CLIService.SendMessage:input_type -> cli.MessageRequest 9, // 12: cli.CLIService.SendYggdrasilMessage:input_type -> cli.MessageRequest 10, // 13: cli.CLIService.SendFile:input_type -> cli.FileRequest 9, // 14: cli.CLIService.BroadcastMessage:input_type -> cli.MessageRequest - 21, // 15: cli.CLIService.GetMessageStats:input_type -> google.protobuf.Empty + 22, // 15: cli.CLIService.GetMessageStats:input_type -> google.protobuf.Empty 11, // 16: cli.CLIService.GetDID:input_type -> cli.DIDRequest 12, // 17: cli.CLIService.PropagateDID:input_type -> cli.DIDPropagationRequest 8, // 18: cli.CLIService.FastSync:input_type -> cli.PeerRequest 8, // 19: cli.CLIService.FastSyncV2:input_type -> cli.PeerRequest 8, // 20: cli.CLIService.AccountSync:input_type -> cli.PeerRequest 13, // 21: cli.CLIService.FirstSync:input_type -> cli.FirstSyncRequest - 21, // 22: cli.CLIService.GetDatabaseState:input_type -> google.protobuf.Empty - 21, // 23: cli.CLIService.ReturnAddrs:input_type -> google.protobuf.Empty - 21, // 24: cli.CLIService.GetSyncInfo:input_type -> google.protobuf.Empty - 21, // 25: cli.CLIService.GetGethStatus:input_type -> google.protobuf.Empty - 21, // 26: cli.CLIService.DiscoverNeighbors:input_type -> google.protobuf.Empty - 21, // 27: cli.CLIService.ListAliases:input_type -> google.protobuf.Empty - 21, // 28: cli.CLIService.GetNodeVersion:input_type -> google.protobuf.Empty - 1, // 29: cli.CLIService.ListPeers:output_type -> cli.PeerList - 17, // 30: cli.CLIService.AddPeer:output_type -> cli.OperationResponse - 17, // 31: cli.CLIService.RemovePeer:output_type -> cli.OperationResponse - 18, // 32: cli.CLIService.CleanPeers:output_type -> cli.CleanPeersResponse - 17, // 33: cli.CLIService.SendMessage:output_type -> cli.OperationResponse - 17, // 34: cli.CLIService.SendYggdrasilMessage:output_type -> cli.OperationResponse - 17, // 35: cli.CLIService.SendFile:output_type -> cli.OperationResponse - 17, // 36: cli.CLIService.BroadcastMessage:output_type -> cli.OperationResponse - 2, // 37: cli.CLIService.GetMessageStats:output_type -> cli.MessageStats - 4, // 38: cli.CLIService.GetDID:output_type -> cli.DIDDocument - 17, // 39: cli.CLIService.PropagateDID:output_type -> cli.OperationResponse - 5, // 40: cli.CLIService.FastSync:output_type -> cli.SyncStats - 5, // 41: cli.CLIService.FastSyncV2:output_type -> cli.SyncStats - 5, // 42: cli.CLIService.AccountSync:output_type -> cli.SyncStats - 5, // 43: cli.CLIService.FirstSync:output_type -> cli.SyncStats - 19, // 44: cli.CLIService.GetDatabaseState:output_type -> cli.DatabaseStates - 6, // 45: cli.CLIService.ReturnAddrs:output_type -> cli.Addrs - 14, // 46: cli.CLIService.GetSyncInfo:output_type -> cli.SyncInfo - 15, // 47: cli.CLIService.GetGethStatus:output_type -> cli.GethStatus - 17, // 48: cli.CLIService.DiscoverNeighbors:output_type -> cli.OperationResponse - 16, // 49: cli.CLIService.ListAliases:output_type -> cli.AliasList - 7, // 50: cli.CLIService.GetNodeVersion:output_type -> cli.VersionInfo - 29, // [29:51] is the sub-list for method output_type - 7, // [7:29] is the sub-list for method input_type + 14, // 22: cli.CLIService.CatchUpSync:input_type -> cli.CatchUpRequest + 22, // 23: cli.CLIService.GetDatabaseState:input_type -> google.protobuf.Empty + 22, // 24: cli.CLIService.ReturnAddrs:input_type -> google.protobuf.Empty + 22, // 25: cli.CLIService.GetSyncInfo:input_type -> google.protobuf.Empty + 22, // 26: cli.CLIService.GetGethStatus:input_type -> google.protobuf.Empty + 22, // 27: cli.CLIService.DiscoverNeighbors:input_type -> google.protobuf.Empty + 22, // 28: cli.CLIService.ListAliases:input_type -> google.protobuf.Empty + 22, // 29: cli.CLIService.GetNodeVersion:input_type -> google.protobuf.Empty + 1, // 30: cli.CLIService.ListPeers:output_type -> cli.PeerList + 18, // 31: cli.CLIService.AddPeer:output_type -> cli.OperationResponse + 18, // 32: cli.CLIService.RemovePeer:output_type -> cli.OperationResponse + 19, // 33: cli.CLIService.CleanPeers:output_type -> cli.CleanPeersResponse + 18, // 34: cli.CLIService.SendMessage:output_type -> cli.OperationResponse + 18, // 35: cli.CLIService.SendYggdrasilMessage:output_type -> cli.OperationResponse + 18, // 36: cli.CLIService.SendFile:output_type -> cli.OperationResponse + 18, // 37: cli.CLIService.BroadcastMessage:output_type -> cli.OperationResponse + 2, // 38: cli.CLIService.GetMessageStats:output_type -> cli.MessageStats + 4, // 39: cli.CLIService.GetDID:output_type -> cli.DIDDocument + 18, // 40: cli.CLIService.PropagateDID:output_type -> cli.OperationResponse + 5, // 41: cli.CLIService.FastSync:output_type -> cli.SyncStats + 5, // 42: cli.CLIService.FastSyncV2:output_type -> cli.SyncStats + 5, // 43: cli.CLIService.AccountSync:output_type -> cli.SyncStats + 5, // 44: cli.CLIService.FirstSync:output_type -> cli.SyncStats + 5, // 45: cli.CLIService.CatchUpSync:output_type -> cli.SyncStats + 20, // 46: cli.CLIService.GetDatabaseState:output_type -> cli.DatabaseStates + 6, // 47: cli.CLIService.ReturnAddrs:output_type -> cli.Addrs + 15, // 48: cli.CLIService.GetSyncInfo:output_type -> cli.SyncInfo + 16, // 49: cli.CLIService.GetGethStatus:output_type -> cli.GethStatus + 18, // 50: cli.CLIService.DiscoverNeighbors:output_type -> cli.OperationResponse + 17, // 51: cli.CLIService.ListAliases:output_type -> cli.AliasList + 7, // 52: cli.CLIService.GetNodeVersion:output_type -> cli.VersionInfo + 30, // [30:53] is the sub-list for method output_type + 7, // [7:30] is the sub-list for method input_type 7, // [7:7] is the sub-list for extension type_name 7, // [7:7] is the sub-list for extension extendee 0, // [0:7] is the sub-list for field type_name @@ -1448,7 +1508,7 @@ func file_Connection_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_Connection_proto_rawDesc), len(file_Connection_proto_rawDesc)), NumEnums: 0, - NumMessages: 20, + NumMessages: 21, NumExtensions: 0, NumServices: 1, }, diff --git a/CLI/proto/Connection.proto b/CLI/proto/Connection.proto index 95ee21a0..c8908efe 100644 --- a/CLI/proto/Connection.proto +++ b/CLI/proto/Connection.proto @@ -94,6 +94,7 @@ service CLIService { rpc FastSyncV2(PeerRequest) returns (SyncStats) {} rpc AccountSync(PeerRequest) returns (SyncStats) {} rpc FirstSync(FirstSyncRequest) returns (SyncStats) {} + rpc CatchUpSync(CatchUpRequest) returns (SyncStats) {} rpc GetDatabaseState(google.protobuf.Empty) returns (DatabaseStates) {} // Node Operations @@ -140,6 +141,11 @@ message FirstSyncRequest { string mode = 2; // "server" or "client" } +message CatchUpRequest { + string peer = 1; // full libp2p multiaddr with peer ID + uint64 from_block = 2; // first block NOT in local DB (bootstrap tip + 1) +} + message SyncInfo { int64 batch_size = 1; int64 request_timeout_sec = 2; diff --git a/CLI/proto/Connection_grpc.pb.go b/CLI/proto/Connection_grpc.pb.go index 4c4d24b5..b086e325 100644 --- a/CLI/proto/Connection_grpc.pb.go +++ b/CLI/proto/Connection_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.6.2 -// - protoc v7.34.1 +// - protoc v4.25.3 // source: Connection.proto package proto @@ -35,6 +35,7 @@ const ( CLIService_FastSyncV2_FullMethodName = "/cli.CLIService/FastSyncV2" CLIService_AccountSync_FullMethodName = "/cli.CLIService/AccountSync" CLIService_FirstSync_FullMethodName = "/cli.CLIService/FirstSync" + CLIService_CatchUpSync_FullMethodName = "/cli.CLIService/CatchUpSync" CLIService_GetDatabaseState_FullMethodName = "/cli.CLIService/GetDatabaseState" CLIService_ReturnAddrs_FullMethodName = "/cli.CLIService/ReturnAddrs" CLIService_GetSyncInfo_FullMethodName = "/cli.CLIService/GetSyncInfo" @@ -69,6 +70,7 @@ type CLIServiceClient interface { FastSyncV2(ctx context.Context, in *PeerRequest, opts ...grpc.CallOption) (*SyncStats, error) AccountSync(ctx context.Context, in *PeerRequest, opts ...grpc.CallOption) (*SyncStats, error) FirstSync(ctx context.Context, in *FirstSyncRequest, opts ...grpc.CallOption) (*SyncStats, error) + CatchUpSync(ctx context.Context, in *CatchUpRequest, opts ...grpc.CallOption) (*SyncStats, error) GetDatabaseState(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*DatabaseStates, error) // Node Operations ReturnAddrs(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*Addrs, error) @@ -239,6 +241,16 @@ func (c *cLIServiceClient) FirstSync(ctx context.Context, in *FirstSyncRequest, return out, nil } +func (c *cLIServiceClient) CatchUpSync(ctx context.Context, in *CatchUpRequest, opts ...grpc.CallOption) (*SyncStats, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(SyncStats) + err := c.cc.Invoke(ctx, CLIService_CatchUpSync_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + func (c *cLIServiceClient) GetDatabaseState(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*DatabaseStates, error) { cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) out := new(DatabaseStates) @@ -334,6 +346,7 @@ type CLIServiceServer interface { FastSyncV2(context.Context, *PeerRequest) (*SyncStats, error) AccountSync(context.Context, *PeerRequest) (*SyncStats, error) FirstSync(context.Context, *FirstSyncRequest) (*SyncStats, error) + CatchUpSync(context.Context, *CatchUpRequest) (*SyncStats, error) GetDatabaseState(context.Context, *emptypb.Empty) (*DatabaseStates, error) // Node Operations ReturnAddrs(context.Context, *emptypb.Empty) (*Addrs, error) @@ -399,6 +412,9 @@ func (UnimplementedCLIServiceServer) AccountSync(context.Context, *PeerRequest) func (UnimplementedCLIServiceServer) FirstSync(context.Context, *FirstSyncRequest) (*SyncStats, error) { return nil, status.Error(codes.Unimplemented, "method FirstSync not implemented") } +func (UnimplementedCLIServiceServer) CatchUpSync(context.Context, *CatchUpRequest) (*SyncStats, error) { + return nil, status.Error(codes.Unimplemented, "method CatchUpSync not implemented") +} func (UnimplementedCLIServiceServer) GetDatabaseState(context.Context, *emptypb.Empty) (*DatabaseStates, error) { return nil, status.Error(codes.Unimplemented, "method GetDatabaseState not implemented") } @@ -711,6 +727,24 @@ func _CLIService_FirstSync_Handler(srv interface{}, ctx context.Context, dec fun return interceptor(ctx, in, info, handler) } +func _CLIService_CatchUpSync_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(CatchUpRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(CLIServiceServer).CatchUpSync(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: CLIService_CatchUpSync_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(CLIServiceServer).CatchUpSync(ctx, req.(*CatchUpRequest)) + } + return interceptor(ctx, in, info, handler) +} + func _CLIService_GetDatabaseState_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(emptypb.Empty) if err := dec(in); err != nil { @@ -904,6 +938,10 @@ var CLIService_ServiceDesc = grpc.ServiceDesc{ MethodName: "FirstSync", Handler: _CLIService_FirstSync_Handler, }, + { + MethodName: "CatchUpSync", + Handler: _CLIService_CatchUpSync_Handler, + }, { MethodName: "GetDatabaseState", Handler: _CLIService_GetDatabaseState_Handler, diff --git a/DB_OPs/Nodeinfo/account_sync_worker.go b/DB_OPs/Nodeinfo/account_sync_worker.go index 84926a5b..c7444a1c 100644 --- a/DB_OPs/Nodeinfo/account_sync_worker.go +++ b/DB_OPs/Nodeinfo/account_sync_worker.go @@ -71,11 +71,13 @@ type dbEntry = struct { // surprises (math/big.Int marshals as a quoted decimal string, but that behaviour // is implementation-defined and not guaranteed across versions). // -// Stored in the stream as: {"address":"0x...","new_balance":"1000000","nonce":42} +// Stored in the stream as: {"address":"0x...","new_balance":"1000000","nonce":42,"tx_nonce":43,"tx_count_sent":5} type accountUpdateWire struct { - Address string `json:"address"` - NewBalance string `json:"new_balance"` // decimal string from big.Int.String() - Nonce uint64 `json:"nonce"` + Address string `json:"address"` + NewBalance string `json:"new_balance"` // decimal string from big.Int.String() + Nonce uint64 `json:"nonce"` + TxNonce uint64 `json:"tx_nonce"` + TxCountSent uint64 `json:"tx_count_sent"` } // ─── Configuration ──────────────────────────────────────────────────────────── @@ -462,6 +464,8 @@ func parseUpdatesPayload(dataStr string) ([]dbEntry, error) { Address: addr, Balance: balance.String(), Nonce: w.Nonce, + TxNonce: w.TxNonce, + TxCountSent: w.TxCountSent, AccountType: "user", UpdatedAt: time.Now().UTC().UnixNano(), } diff --git a/DB_OPs/Nodeinfo/immudb_account_manager.go b/DB_OPs/Nodeinfo/immudb_account_manager.go index 8774029c..31d00ced 100644 --- a/DB_OPs/Nodeinfo/immudb_account_manager.go +++ b/DB_OPs/Nodeinfo/immudb_account_manager.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "log" "math/big" "sort" "strings" @@ -85,7 +86,7 @@ func chunkCount(n int) int { // Time Complexity: O(N) where N is the total number of transactions scanned or retrieved func (am *account_manager) GetTransactionsForAccount(accountAddress string) ([]types.DBTransaction, error) { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) defer cancel() conn, err := DB_OPs.GetMainDBConnectionandPutBack(ctx) @@ -106,6 +107,25 @@ func (am *account_manager) GetTransactionsForAccount(accountAddress string) ([]t return result, nil } +func (am *account_manager) GetTransactionsForAccountInRange(accountAddress string, fromBlock, toBlock uint64) ([]types.DBTransaction, error) { + conn, err := DB_OPs.GetMainDBConnectionandPutBack(context.Background()) + if err != nil { + return nil, fmt.Errorf("failed to get main DB connection: %w", err) + } + + addr := common.HexToAddress(accountAddress) + cfgTxs, err := DB_OPs.GetTransactionsByAccountInRange(conn, &addr, fromBlock, toBlock) + if err != nil { + return nil, fmt.Errorf("failed to get transactions in range [%d..%d]: %w", fromBlock, toBlock, err) + } + + result := make([]types.DBTransaction, 0, len(cfgTxs)) + for _, tx := range cfgTxs { + result = append(result, configTxToDBTx(tx)) + } + return result, nil +} + // Time Complexity: O(1) func (am *account_manager) GetAccountBalance(accountAddress string) (*big.Int, uint64, error) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) @@ -245,16 +265,79 @@ func (am *account_manager) WriteAccounts(accounts []*types.Account) error { } s, mgr := getAccountQueue() if s == nil { - return fmt.Errorf("WriteAccounts: account queue not initialized; call StartAccountSyncWorker before use") + // Redis not available — write directly to ImmuDB synchronously. + // Slower (~15 s/batch) but correct; no external dependency required. + log.Printf("[accountqueue] Redis not available — writing %d accounts directly to ImmuDB", len(accounts)) + return writeAccountsDirect(accounts) } - mgr.EnsureActive() - chunks := chunkCount(len(accounts)) ctx, cancel := context.WithTimeout(context.Background(), enqueueTimeout(chunks)) defer cancel() if err := enqueueRecordsChunked(ctx, s, payloadTypeAccounts, accounts); err != nil { - return fmt.Errorf("WriteAccounts: enqueue %d accounts in %d messages: %w", len(accounts), chunks, err) + // Redis is configured but unreachable (server down, connection refused, etc). + // Fall back to direct ImmuDB write rather than dropping the accounts entirely. + // Do NOT call EnsureActive — no point starting the worker if Redis is down. + log.Printf("[accountqueue] Redis enqueue failed (%v) — falling back to direct ImmuDB write for %d accounts", err, len(accounts)) + return writeAccountsDirect(accounts) } + // Enqueue succeeded — ensure the drain worker is running to process it. + mgr.EnsureActive() + return nil +} + +// writeAccountsDirect writes accounts synchronously to ImmuDB without going through Redis. +// Used when the Redis queue is not configured. Uses the same dbEntry/BatchRestoreAccounts +// path as the worker so the write is LWW-idempotent. +func writeAccountsDirect(accounts []*types.Account) error { + entries := make([]dbEntry, 0, len(accounts)*2) + for _, acc := range accounts { + if acc == nil { + continue + } + dbAcc := &DB_OPs.Account{ + DIDAddress: acc.DIDAddress, + Address: acc.Address, + Balance: acc.Balance, + Nonce: acc.Nonce, + TxNonce: acc.TxNonce, + TxCountSent: acc.TxCountSent, + AccountType: acc.AccountType, + CreatedAt: acc.CreatedAt, + UpdatedAt: acc.UpdatedAt, + Metadata: acc.Metadata, + } + val, err := json.Marshal(dbAcc) + if err != nil { + return fmt.Errorf("writeAccountsDirect: marshal %s: %w", acc.Address.Hex(), err) + } + entries = append(entries, dbEntry{Key: DB_OPs.Prefix + acc.Address.Hex(), Value: val}) + if acc.DIDAddress != "" { + entries = append(entries, dbEntry{Key: DB_OPs.DIDPrefix + acc.DIDAddress, Value: val}) + } + } + + const batchSize = 500 + // Generous timeout: 60 s base + 2 s per batch to cover ImmuDB commit latency. + timeout := 60*time.Second + time.Duration(len(entries)/batchSize+1)*2*time.Second + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + conn, err := DB_OPs.GetAccountsConnections(ctx) + if err != nil { + return fmt.Errorf("writeAccountsDirect: get connection: %w", err) + } + defer DB_OPs.PutAccountsConnection(conn) + + for i := 0; i < len(entries); i += batchSize { + end := i + batchSize + if end > len(entries) { + end = len(entries) + } + if err := DB_OPs.BatchRestoreAccounts(ctx, conn, entries[i:end]); err != nil { + return fmt.Errorf("writeAccountsDirect: batch [%d:%d]: %w", i, end, err) + } + } + log.Printf("[accountqueue] direct write complete: %d accounts written to ImmuDB", len(accounts)) return nil } @@ -396,17 +479,20 @@ func (am *account_manager) BatchUpdateAccounts(updates []types.AccountUpdate) er } s, mgr := getAccountQueue() if s == nil { - return fmt.Errorf("BatchUpdateAccounts: account queue not initialized; call StartAccountSyncWorker before use") + log.Printf("[accountqueue] BatchUpdateAccounts: queue not initialized — writing %d updates directly to ImmuDB", len(updates)) + return batchUpdateAccountsDirect(am, updates) } - mgr.EnsureActive() + // Convert to wire type for stable JSON serialization. // big.Int.String() produces a decimal string; accountUpdateWire makes the format explicit. wires := make([]accountUpdateWire, len(updates)) for i, u := range updates { wires[i] = accountUpdateWire{ - Address: u.Address, - NewBalance: u.NewBalance.String(), - Nonce: u.Nonce, + Address: u.Address, + NewBalance: u.NewBalance.String(), + Nonce: u.Nonce, + TxNonce: u.TxNonce, + TxCountSent: u.TxCountSent, } } @@ -414,7 +500,26 @@ func (am *account_manager) BatchUpdateAccounts(updates []types.AccountUpdate) er ctx, cancel := context.WithTimeout(context.Background(), enqueueTimeout(chunks)) defer cancel() if err := enqueueRecordsChunked(ctx, s, payloadTypeUpdates, wires); err != nil { - return fmt.Errorf("BatchUpdateAccounts: enqueue %d updates in %d messages: %w", len(updates), chunks, err) + log.Printf("[accountqueue] Redis enqueue failed (%v) — falling back to direct ImmuDB write for %d updates", err, len(updates)) + return batchUpdateAccountsDirect(am, updates) + } + mgr.EnsureActive() + return nil +} + +// batchUpdateAccountsDirect writes account balance updates synchronously to ImmuDB, +// bypassing Redis. Used when Redis is unavailable. +func batchUpdateAccountsDirect(am *account_manager, updates []types.AccountUpdate) error { + for _, u := range updates { + if u.IsNewAccount { + if err := am.CreateAccount(u.Address, u.NewBalance, u.Nonce); err != nil { + return fmt.Errorf("batchUpdateAccountsDirect: create %s: %w", u.Address, err) + } + } else { + if err := am.UpdateAccountBalance(u.Address, u.NewBalance, u.Nonce); err != nil { + return fmt.Errorf("batchUpdateAccountsDirect: update %s: %w", u.Address, err) + } + } } return nil } diff --git a/DB_OPs/Nodeinfo/immudb_data_writer.go b/DB_OPs/Nodeinfo/immudb_data_writer.go index d5a04b97..312a6481 100644 --- a/DB_OPs/Nodeinfo/immudb_data_writer.go +++ b/DB_OPs/Nodeinfo/immudb_data_writer.go @@ -36,6 +36,8 @@ func (dw *DataWriter) WriteData(data []*blockpb.NonHeaders) error { return err } + var highestWritten uint64 + for _, nh := range data { if nh == nil { continue @@ -140,12 +142,15 @@ func (dw *DataWriter) WriteData(data []*blockpb.NonHeaders) error { txs = append(txs, cfgTx) } - if len(txs) > 0 { - b.Transactions = txs - } + // Always overwrite Transactions from the DataSync response. + // The previous guard (if len(txs) > 0) was wrong: if the server sends + // transactions for this block, they must be written; if it sends none, + // the block genuinely has no transactions and we must clear any stale + // data left by PubSub/HeaderSync skeleton writes. + b.Transactions = txs if err := DB_OPs.StoreZKBlock(conn, b); err != nil { - // if err not nill, then force write or update + // if err not nil, then force write or update if strings.Contains(err.Error(), "already exists") { blockKey := fmt.Sprintf("%s%d", DB_OPs.PREFIX_BLOCK, b.BlockNumber) if err2 := DB_OPs.Update(blockKey, b); err2 != nil { @@ -157,14 +162,10 @@ func (dw *DataWriter) WriteData(data []*blockpb.NonHeaders) error { return fmt.Errorf("force update hash mapping failed: %w", err2) } - if err2 := DB_OPs.Update("latest_block", b.BlockNumber); err2 != nil { - return fmt.Errorf("force update latest block failed: %w", err2) - } - // Write tx: → blockNumber index for each transaction. // WriteHeaders stores blocks without transactions, so StoreZKBlock's tx // indexing loop runs 0 times there. This is the only place those index - // entries get written — required for GetTransactionByHash to work. + // entries get written for existing blocks — required for GetTransactionByHash. for _, tx := range b.Transactions { txKey := fmt.Sprintf("%s%s", DB_OPs.DEFAULT_PREFIX_TX, tx.Hash) if err2 := DB_OPs.Create(conn, txKey, b.BlockNumber); err2 != nil { @@ -177,6 +178,20 @@ func (dw *DataWriter) WriteData(data []*blockpb.NonHeaders) error { return err } } + + if b.BlockNumber > highestWritten { + highestWritten = b.BlockNumber + } + } + + // Update latest_block once to the highest block number written in this batch. + // Per-block updates (done inside the loop above) are non-deterministic when + // DataSync workers run concurrently — the last worker to finish may not hold + // the highest block. A single update at the end is authoritative. + if highestWritten > 0 { + if err2 := DB_OPs.Update("latest_block", highestWritten); err2 != nil { + return fmt.Errorf("update latest_block to %d failed: %w", highestWritten, err2) + } } return nil diff --git a/DB_OPs/account_immuclient.go b/DB_OPs/account_immuclient.go index 417f6013..2223af1e 100644 --- a/DB_OPs/account_immuclient.go +++ b/DB_OPs/account_immuclient.go @@ -1266,8 +1266,10 @@ func GetTransactionsByAccount(PooledConnection *config.PooledConnection, account var err error var shouldReturnConnection = false - // Define Function wide context for timeout - ctx, cancel := context.WithTimeout(context.Background(), 8*time.Second) + // Define Function wide context for timeout. + // The scan reads every block from 0..latestBlock via batch GetAll calls (~24 batches + // for 11605 blocks). 120s gives ample headroom even under ImmuDB load. + ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) defer cancel() if PooledConnection == nil || PooledConnection.Client == nil { @@ -1318,40 +1320,38 @@ func GetTransactionsByAccount(PooledConnection *config.PooledConnection, account } var matchingTxs []*config.Transaction - batchSize := uint64(100) // Process 100 blocks at a time + // Use large batches so GetAll makes ~24 round-trips for 11605 blocks instead + // of 11605 individual reads. This cuts scan time from minutes to seconds. + const batchSize = uint64(500) - // Start from block 0 (genesis block) to include all blocks for startBlock := uint64(0); startBlock <= latestBlockNumber; startBlock += batchSize { + if ctx.Err() != nil { + return nil, ctx.Err() + } endBlock := startBlock + batchSize - 1 if endBlock > latestBlockNumber { endBlock = latestBlockNumber } - // Process current batch of blocks - for i := startBlock; i <= endBlock; i++ { - if ctx.Err() != nil { - return nil, ctx.Err() - } - block, err := ReadZKBlockByNumber(ctx, PooledConnection, i) - if err != nil { - loggerCtx, cancel := context.WithCancel(context.Background()) - ic.Logger.Warn(loggerCtx, "Error retrieving block, skipping", - ion.String("error", err.Error()), - ion.Uint64("block_number", i), - ion.String("database", config.AccountsDBName), - ion.String("created_at", time.Now().UTC().Format(time.RFC3339)), - ion.String("log_file", LOG_FILE), - ion.String("topic", TOPIC), - ion.String("function", "DB_OPs.GetTransactionsByAccount")) - cancel() - continue - } + blocks, err := GetBlocksRange(PooledConnection, startBlock, endBlock) + if err != nil { + loggerCtx, cancel := context.WithCancel(context.Background()) + ic.Logger.Warn(loggerCtx, "Error retrieving block batch, skipping", + ion.String("error", err.Error()), + ion.Uint64("start_block", startBlock), + ion.Uint64("end_block", endBlock), + ion.String("database", config.DBName), + ion.String("created_at", time.Now().UTC().Format(time.RFC3339)), + ion.String("log_file", LOG_FILE), + ion.String("topic", TOPIC), + ion.String("function", "DB_OPs.GetTransactionsByAccount")) + cancel() + continue + } - // Check each transaction in the current block + for _, block := range blocks { for _, tx := range block.Transactions { - // Check if the transaction involves the given account if isTransactionInvolvingAccount(tx, accountAddr) { - // Create a copy of the transaction to avoid referencing the loop variable txCopy := tx matchingTxs = append(matchingTxs, &txCopy) } @@ -1373,6 +1373,76 @@ func GetTransactionsByAccount(PooledConnection *config.PooledConnection, account return matchingTxs, nil } +// GetTransactionsByAccountInRange retrieves transactions for an account in [fromBlock, toBlock]. +// Pass math.MaxUint64 for toBlock to scan up to the latest block in the DB. +// Identical to GetTransactionsByAccount but scans a bounded block range instead of 0..latest, +// enabling delta-only reconciliation so each sync run replays only new transactions. +func GetTransactionsByAccountInRange(PooledConnection *config.PooledConnection, accountAddr *common.Address, fromBlock, toBlock uint64) ([]*config.Transaction, error) { + var err error + var shouldReturnConnection = false + + ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) + defer cancel() + + if PooledConnection == nil || PooledConnection.Client == nil { + PooledConnection, err = GetMainDBConnectionandPutBack(ctx) + if err != nil { + return nil, fmt.Errorf("failed to get main DB connection from pool: %w - GetTransactionsByAccountInRange", err) + } + shouldReturnConnection = true + } + if shouldReturnConnection { + defer PutMainDBConnection(PooledConnection) + } + + latestBlockNumber, err := GetLatestBlockNumber(ctx, PooledConnection) + if err != nil { + return nil, fmt.Errorf("failed to get latest block number: %w", err) + } + + if toBlock > latestBlockNumber { + toBlock = latestBlockNumber + } + if fromBlock > toBlock { + // Nothing to scan — no new blocks in range + return nil, nil + } + + var matchingTxs []*config.Transaction + const batchSize = uint64(500) + + for startBlock := fromBlock; startBlock <= toBlock; startBlock += batchSize { + if ctx.Err() != nil { + return nil, ctx.Err() + } + endBlock := startBlock + batchSize - 1 + if endBlock > toBlock { + endBlock = toBlock + } + + blocks, err := GetBlocksRange(PooledConnection, startBlock, endBlock) + if err != nil { + PooledConnection.Client.Logger.Warn(ctx, "Error retrieving block batch, skipping", + ion.String("error", err.Error()), + ion.Uint64("start_block", startBlock), + ion.Uint64("end_block", endBlock), + ion.String("function", "DB_OPs.GetTransactionsByAccountInRange")) + continue + } + + for _, block := range blocks { + for _, tx := range block.Transactions { + if isTransactionInvolvingAccount(tx, accountAddr) { + txCopy := tx + matchingTxs = append(matchingTxs, &txCopy) + } + } + } + } + + return matchingTxs, nil +} + // isTransactionInvolvingAccount checks if a transaction involves a specific account func isTransactionInvolvingAccount(tx config.Transaction, accountAddr *common.Address) bool { // Compare address values, not pointers diff --git a/FastsyncV2/catchup.go b/FastsyncV2/catchup.go new file mode 100644 index 00000000..53773bc3 --- /dev/null +++ b/FastsyncV2/catchup.go @@ -0,0 +1,599 @@ +package FastsyncV2 + +// HandleCatchUpSync syncs blocks [fromBlock..remoteTip] without Merkle bisection. +// +// Use this after a bootstrap snapshot has loaded blocks [0..X]: call +// HandleCatchUpSync(X+1, targetPeer) to reconcile the remaining blocks to the +// current chain tip. +// +// Unlike HandleSync / HandleStartupSync, this path skips PriorSync entirely. +// It builds the missing range directly from the availability response: +// +// Phase 1 — Availability → get auth token, discover remoteTip +// Phase 2 — HeaderSync → fetch headers [fromBlock..remoteTip] (no Merkle confirmation) +// Phase 3 — DataSync → fetch block bodies +// Phase 4 — AccountSync → sync zero-tx accounts not covered by DataSync +// Phase 5 — Reconciliation → replay txs, commit account balances +// Phase 6 — Re-auth → refresh expired token before PoTS +// Phase 7 — PoTS → fetch blocks produced while phases 2-5 ran +// +// targetPeer must be a libp2p multiaddr with an embedded peer ID, e.g.: +// +// /ip4/192.168.1.5/tcp/15000/p2p/12D3KooW... +import ( + "context" + "fmt" + "log" + "math" + "time" + + "gossipnode/DB_OPs" + + ackpb "github.com/JupiterMetaLabs/JMDN-FastSync/common/proto/ack" + availabilitypb "github.com/JupiterMetaLabs/JMDN-FastSync/common/proto/availability" + authpb "github.com/JupiterMetaLabs/JMDN-FastSync/common/proto/availability/auth" + datasyncpb "github.com/JupiterMetaLabs/JMDN-FastSync/common/proto/datasync" + headersyncpb "github.com/JupiterMetaLabs/JMDN-FastSync/common/proto/headersync" + phasepb "github.com/JupiterMetaLabs/JMDN-FastSync/common/proto/phase" + taggingpb "github.com/JupiterMetaLabs/JMDN-FastSync/common/proto/tagging" + "github.com/JupiterMetaLabs/JMDN-FastSync/common/types" + "github.com/JupiterMetaLabs/JMDN-FastSync/common/types/constants" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/multiformats/go-multiaddr" +) + +// Note: tryRefreshAuth is defined below but currently unused (AUTH_TTL = 48h). +// Kept for reference — re-enable the commented blocks above if TTL is reduced. + +// HandleCatchUpSync is the public entry point. See package-level doc above. +// +// fromBlock is the first block AFTER the guaranteed-complete bootstrap range. +// It anchors the gap scan — buildMissingTag scans [fromBlock..remoteTip] and +// fetches only what is absent locally. +// +// Lifecycle: +// +// Stage 1 — bootstrap loads [0..X] (complete, no gaps) +// Stage 2 — HandleCatchUpSync(X+1, peer) → syncs [X+1..T1], no gaps expected +// Stage 3 — node offline, misses Y blocks; HandleCatchUpSync(X+1, peer) again +// → buildMissingTag finds any Stage-2 gaps + new [lastSynced+1..T2] +// +// fromBlock should always be bootstrapTip+1 (set in fastsync.catch_up_from_block +// config). Never use localTip+1: if Stage 2 was partial, localTip may be in the +// middle of a gap and the scan would skip missing blocks below it. +func (fs *FastsyncV2) HandleCatchUpSync(fromBlock uint64, targetPeer string) error { + catchUpStart := time.Now() + + // Use a generous timeout — catching up on days of blocks takes much longer + // than a normal incremental sync. Callers can wrap in their own deadline if needed. + ctx, cancel := context.WithTimeout(context.Background(), fs.syncTimeout) + defer cancel() + + // ── Parse and connect ───────────────────────────────────────────────── + maddr, err := multiaddr.NewMultiaddr(targetPeer) + if err != nil { + return fmt.Errorf("catchup: invalid multiaddr %q: %w", targetPeer, err) + } + info, err := peer.AddrInfoFromP2pAddr(maddr) + if err != nil { + return fmt.Errorf("catchup: extract peer info: %w", err) + } + if err := fs.Host.Connect(ctx, *info); err != nil { + return fmt.Errorf("catchup: connect to peer %s: %w", info.ID, err) + } + + peerAddrs := fs.Host.Peerstore().Addrs(info.ID) + if len(peerAddrs) == 0 { + peerAddrs = info.Addrs + } + targetNodeInfo := &types.Nodeinfo{ + PeerID: info.ID, + Multiaddr: peerAddrs, + Version: commsVersion, + } + + log.Printf("[CatchUpSync] starting from block %d → peer %s", fromBlock, info.ID) + + // ── Phase 1: Availability ───────────────────────────────────────────── + log.Printf("[CatchUpSync] phase 1: availability probe") + + availResp, err := fs.AvailRouter.SendAvailabilityRequest( + ctx, fs.PriorRouter.GetSyncVars(), *targetNodeInfo, fromBlock, math.MaxUint64, + ) + if err != nil { + return fmt.Errorf("catchup: availability: %w", err) + } + if !availResp.IsAvailable { + return fmt.Errorf("catchup: peer %s not available", info.ID) + } + if availResp.Auth == nil || availResp.Auth.UUID == "" { + return fmt.Errorf("catchup: peer %s returned no auth token", info.ID) + } + + // remoteTip is the peer's latest block number (BlockHeight field, added in add/catchup). + // Old peers (pre-add/catchup binary) leave this as 0 — fall back to our own local tip + // so we can at least close any gaps within our already-downloaded range. + // New blocks beyond our local tip will be picked up once the peer is updated. + remoteTip := availResp.BlockHeight + if remoteTip == 0 { + localTip := fs.blockInfoAdapter.GetBlockNumber() + if localTip == 0 { + return fmt.Errorf("catchup: peer %s returned block_height=0 and local tip is also 0 — peer needs the add/catchup binary update", info.ID) + } + log.Printf("[CatchUpSync] WARNING: peer %s returned block_height=0 (pre-add/catchup binary). "+ + "Falling back to local tip %d. Update the peer node to sync new blocks beyond local tip.", + info.ID, localTip) + remoteTip = localTip + } + if remoteTip < fromBlock { + log.Printf("[CatchUpSync] remoteTip %d < fromBlock %d — nothing to sync", remoteTip, fromBlock) + return nil + } + + log.Printf("[CatchUpSync] phase 1 complete: remoteTip=%d auth=%s", remoteTip, availResp.Auth.UUID) + + remotes := []*availabilitypb.AvailabilityResponse{availResp} + + // ── Build header-missing tag (sparse gap detection) ───────────────── + // Scan local DB for blocks already present in [fromBlock..remoteTip] and + // compute the complement — only header-missing blocks are fetched here. + // NOTE: PubSub announcements may have already written block headers without + // transaction data. Those blocks appear "present" to the iterator but lack + // NonHeaders data. Phase 3 (DataSync) always runs for the full range to fix + // this independently of whether Phase 2 (HeaderSync) found anything to do. + catchUpTag, err := fs.buildMissingTag(fromBlock, remoteTip) + if err != nil { + return fmt.Errorf("catchup: scan local blocks: %w", err) + } + + // ── Phase 2: HeaderSync ─────────────────────────────────────────────── + log.Printf("[CatchUpSync] phase 2: header sync [%d..%d]", fromBlock, remoteTip) + + if len(catchUpTag.Range) > 0 || len(catchUpTag.BlockNumber) > 0 { + p2Blocks := tagBlockCount(catchUpTag) + p2Batches := (p2Blocks + constants.MAX_HEADERS_PER_REQUEST - 1) / constants.MAX_HEADERS_PER_REQUEST + log.Printf("[CatchUpSync] phase 2: fetching %d headers across ~%d batches (%d blocks/batch)", + p2Blocks, p2Batches, constants.MAX_HEADERS_PER_REQUEST) + p2Start := time.Now() + stopP2 := watchProgress("[CatchUpSync] phase 2", "headers", p2Blocks, p2Batches, p2Start) + _, err = fs.HeaderRouter.HeaderSync( + &headersyncpb.HeaderSyncRequest{Tag: catchUpTag}, + remotes, + false, // syncConfirmation=false: skip Merkle, we know the exact range + ) + stopP2() + if err != nil { + return fmt.Errorf("catchup: header sync: %w", err) + } + log.Printf("[CatchUpSync] phase 2 complete: %d headers in %s", + p2Blocks, time.Since(p2Start).Round(time.Millisecond)) + } else { + log.Printf("[CatchUpSync] phase 2 skipped: all headers present in [%d..%d]", fromBlock, remoteTip) + } + + // ── Phase 3: DataSync ───────────────────────────────────────────────── + // Scan local blocks to find which ones are missing NonHeaders data. + // StarkProof is written ONLY by DataSync (immudb_data_writer.go) — absent or + // empty means the block needs DataSync regardless of whether HeaderSync ran. + // Blocks written only by PubSub/HeaderSync will have StarkProof==nil. + log.Printf("[CatchUpSync] phase 3: scanning for data-missing blocks [%d..%d]", fromBlock, remoteTip) + + dataMissingTag, err := fs.buildDataMissingTag(fromBlock, remoteTip) + if err != nil { + return fmt.Errorf("catchup: scan data-missing blocks: %w", err) + } + + var taggedAccounts *taggingpb.TaggedAccounts + if len(dataMissingTag.Range) == 0 && len(dataMissingTag.BlockNumber) == 0 { + log.Printf("[CatchUpSync] phase 3 skipped: all blocks in [%d..%d] already have data", fromBlock, remoteTip) + } else { + p3Blocks := tagBlockCount(dataMissingTag) + p3Batches := (p3Blocks + constants.MAX_DATA_PER_REQUEST - 1) / constants.MAX_DATA_PER_REQUEST + log.Printf("[CatchUpSync] phase 3: fetching %d blocks across ~%d batches (%d blocks/batch)", + p3Blocks, p3Batches, constants.MAX_DATA_PER_REQUEST) + p3Start := time.Now() + stopP3 := watchProgress("[CatchUpSync] phase 3", "blocks", p3Blocks, p3Batches, p3Start) + dataSyncReq := &datasyncpb.DataSyncRequest{ + Tag: dataMissingTag, + Version: uint32(commsVersion), + Ack: &ackpb.Ack{Ok: true}, + Phase: &phasepb.Phase{ + PresentPhase: constants.HEADER_SYNC_RESPONSE, + SuccessivePhase: constants.DATA_SYNC_REQUEST, + Success: true, + Auth: &authpb.Auth{UUID: availResp.Auth.UUID}, + }, + } + taggedAccounts, err = fs.DataRouter.DataSync(dataSyncReq, remotes) + stopP3() + if err != nil { + return fmt.Errorf("catchup: data sync: %w", err) + } + log.Printf("[CatchUpSync] phase 3 complete: %d blocks in %s", + p3Blocks, time.Since(p3Start).Round(time.Millisecond)) + } + + // Always scan local blocks [fromBlock..remoteTip] for tagged accounts and merge + // with DataSync's results. This serves two purposes: + // 1. When DataSync was skipped (data already present), this is the only source + // of tagged accounts for reconciliation. + // 2. When DataSync ran for SOME new blocks only, previously-failed reconciliation + // accounts from already-synced blocks are re-included here so they are retried. + // With 500-block batch reads this is ~3 DB round-trips for a 1200-block range. + localTagged := fs.collectTaggedAccountsFromBlocks(fromBlock, remoteTip) + if localTagged != nil { + if taggedAccounts == nil { + taggedAccounts = localTagged + log.Printf("[CatchUpSync] phase 3: %d accounts from local scan", len(taggedAccounts.Accounts)) + } else { + before := len(taggedAccounts.Accounts) + for addr := range localTagged.Accounts { + taggedAccounts.Accounts[addr] = true + } + log.Printf("[CatchUpSync] phase 3: merged local scan (+%d) → %d total accounts for reconciliation", + len(taggedAccounts.Accounts)-before, len(taggedAccounts.Accounts)) + } + } + + // ── Phase 3.5: FetchAccounts — pull tagged accounts missing locally ─── + if taggedAccounts != nil && len(taggedAccounts.Accounts) > 0 { + // AUTH_TTL is now 48h so no re-auth needed here. + // if refreshed, ok := fs.tryRefreshAuth(ctx, targetNodeInfo, fromBlock); ok { + // availResp = refreshed + // remotes = []*availabilitypb.AvailabilityResponse{availResp} + // } + + missingMap := make(map[string]bool) + accountMgr := fs.blockInfoAdapter.NewAccountManager() + for addr := range taggedAccounts.Accounts { + acc, err := accountMgr.GetAccountByAddress(addr) + if err == nil && acc == nil { + missingMap[addr] = true + } + } + if len(missingMap) > 0 { + log.Printf("[CatchUpSync] phase 3.5: fetching %d missing tagged accounts", len(missingMap)) + resp, err := fs.AccountSyncRouter.FetchAccounts(availResp, missingMap) + if err != nil { + log.Printf("[CatchUpSync] phase 3.5 warning: FetchAccounts failed: %v", err) + } else if resp != nil && len(resp.GetAccounts()) > 0 { + accounts := protoAccountsToTypes(resp.GetAccounts()) + if writeErr := accountMgr.WriteAccounts(accounts); writeErr != nil { + log.Printf("[CatchUpSync] phase 3.5 warning: WriteAccounts failed: %v", writeErr) + } else { + log.Printf("[CatchUpSync] phase 3.5 complete: wrote %d accounts", len(accounts)) + } + } + } + } + + // ── Phase 4: AccountSync ────────────────────────────────────────────── + // Syncs zero-tx accounts not covered by DataSync TaggedAccounts. + log.Printf("[CatchUpSync] phase 4: account sync") + + totalMissing, err := fs.AccountSyncRouter.AccountSync(availResp) + if err != nil { + log.Printf("[CatchUpSync] phase 4 warning: account sync failed: %v", err) + } else { + log.Printf("[CatchUpSync] phase 4 complete: %d accounts synced", totalMissing) + } + + // ── Phase 5: Reconciliation ─────────────────────────────────────────── + // Single-pass delta approach: one BlockIterator scan over [reconFrom..remoteTip] + // computes all account deltas in memory — no per-account DB scan needed. + log.Printf("[CatchUpSync] phase 5: reconciliation (delta approach)") + reconStart := time.Now() + + reconFrom, reconSkip := fs.effectiveReconRange(fromBlock, remoteTip) + if reconSkip { + log.Printf("[CatchUpSync] phase 5 skipped: range [%d..%d] already reconciled", fromBlock, remoteTip) + } else { + if reconFrom > fromBlock { + log.Printf("[CatchUpSync] phase 5: advancing fromBlock %d → %d (already reconciled)", fromBlock, reconFrom) + } + deltas := fs.computeAccountDeltas(reconFrom, remoteTip) + log.Printf("[CatchUpSync] phase 5: computed deltas for %d accounts", len(deltas)) + reconCount, failedAccounts, err := fs.ReconRouter.ReconcileWithDeltas(deltas, availResp) + if err != nil { + log.Printf("[CatchUpSync] phase 5 warning: %v", err) + } + log.Printf("[CatchUpSync] phase 5 complete: %d committed, %d failed, took %s", + reconCount, len(failedAccounts), time.Since(reconStart).Round(time.Millisecond)) + if err == nil { + fs.markReconComplete(remoteTip) + } + } + + // ── Phase 6: Re-auth before PoTS (disabled — AUTH_TTL is now 48h) ───── + // if refreshed, ok := fs.tryRefreshAuth(ctx, targetNodeInfo, 0); ok { + // availResp = refreshed + // remotes = []*availabilitypb.AvailabilityResponse{availResp} + // log.Printf("[CatchUpSync] phase 6: re-auth ok (UUID=%s)", availResp.Auth.UUID) + // } else { + // log.Printf("[CatchUpSync] phase 6: re-auth failed — proceeding with stale token") + // } + + // ── Phase 7: PoTS ───────────────────────────────────────────────────── + log.Printf("[CatchUpSync] phase 7: PoTS gap fill") + + if err := fs.executePoTS(ctx, targetNodeInfo, remotes, availResp); err != nil { + log.Printf("[CatchUpSync] phase 7 warning: PoTS failed: %v", err) + } else { + log.Printf("[CatchUpSync] phase 7 complete") + } + + // Always update latest_block regardless of which phases ran. + // This is critical when PubSub blocks were header-only before this run. + fs.reconcileLocalLatestBlock() + + // ── Phase 8: Post-sync verification ────────────────────────────────── + // Re-run buildDataMissingTag over the same range. If sync succeeded, the + // returned tag will be empty (all blocks pass blockNeedsDataSync check). + // Any non-empty ranges indicate blocks that are still data-incomplete. + log.Printf("[CatchUpSync] phase 8: verifying sync completeness [%d..%d]", fromBlock, remoteTip) + + verifyTag, verifyErr := fs.buildDataMissingTag(fromBlock, remoteTip) + if verifyErr != nil { + log.Printf("[CatchUpSync] phase 8 warning: verification scan failed: %v", verifyErr) + } else if len(verifyTag.Range) == 0 && len(verifyTag.BlockNumber) == 0 { + log.Printf("[CatchUpSync] phase 8: PASS — all blocks in [%d..%d] have data", fromBlock, remoteTip) + // Advance latest_block to remoteTip. This is the authoritative write: + // phases 2/3 may have been skipped (data already present), so WriteData + // never ran and the DB key was never updated on this run. + if updateErr := DB_OPs.Update("latest_block", remoteTip); updateErr != nil { + log.Printf("[CatchUpSync] phase 8 warning: failed to update latest_block to %d: %v", remoteTip, updateErr) + } else { + log.Printf("[CatchUpSync] phase 8: latest_block advanced to %d", remoteTip) + } + } else { + log.Printf("[CatchUpSync] phase 8: INCOMPLETE — %d range(s) still missing data:", len(verifyTag.Range)) + for _, r := range verifyTag.Range { + log.Printf("[CatchUpSync] missing data: blocks [%d..%d] (%d blocks)", + r.Start, r.End, r.End-r.Start+1) + } + for _, bn := range verifyTag.BlockNumber { + log.Printf("[CatchUpSync] missing data: block %d", bn) + } + } + + log.Printf("[CatchUpSync] done in %s", time.Since(catchUpStart).Round(time.Millisecond)) + return nil +} + +// buildMissingTag scans the local DB over [fromBlock..remoteTip] and returns a +// Tag containing only the ranges absent locally. +// +// Algorithm — O(n) time, O(batch) space: +// +// 1. Iterate local blocks in ascending order via BlockIterator. +// 2. Keep a "cursor" at the next expected block number, starting at fromBlock. +// 3. For each present block B: +// - If cursor < B → gap [cursor..B-1] is missing → emit RangeTag. +// - Advance cursor to B+1. +// 4. After iteration: if cursor ≤ remoteTip, emit the trailing gap. +// +// This produces the minimal set of contiguous ranges to request from the peer. +// Example: present={0,1,3,7,9,10}, fromBlock=0, remoteTip=10 +// +// → gaps: [2..2], [4..6], [8..8] +const catchUpBatchSize = 500 + +// blockNeedsDataSync returns true when a locally-present block is missing its +// NonHeaders data and must be (re-)fetched via DataSync. +// +// Two conditions trigger a re-fetch: +// 1. StarkProof is empty — DataSync has never written ZK proof data for this +// block. StarkProof is set ONLY by DataSync (immudb_data_writer.go:59). +// 2. GasUsed > 0 but Transactions is empty — the block consumed gas so it +// must have transactions, but none were stored. This catches blocks where a +// previous DataSync run set StarkProof but failed to persist transactions +// (e.g. due to the old "if len(txs) > 0" guard that has since been removed). +// +// Limitation: a block with GasUsed=0 and an empty ZK proof (legitimately no +// transactions and no proof) will be re-fetched on every run. This is safe +// (DataSync is idempotent) and extremely rare on a ZK L2 in practice. +func blockNeedsDataSync(blk *types.ZKBlock) bool { + if len(blk.StarkProof) == 0 { + return true + } + if blk.GasUsed > 0 && len(blk.Transactions) == 0 { + return true + } + return false +} + +// tagBlockCount returns the total number of blocks described by a Tag. +// Range entries contribute (end-start+1) each; individual BlockNumbers contribute 1 each. +func tagBlockCount(tag *taggingpb.Tag) uint64 { + if tag == nil { + return 0 + } + var n uint64 + for _, r := range tag.Range { + if r.End >= r.Start { + n += r.End - r.Start + 1 + } + } + n += uint64(len(tag.BlockNumber)) + return n +} + +// watchProgress starts a background goroutine that logs a "[phase] still running…" +// line every 10 s. Call the returned stop func to shut it down before logging completion. +func watchProgress(label, unit string, total, batches uint64, start time.Time) func() { + done := make(chan struct{}) + go func() { + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + for { + select { + case <-done: + return + case <-ticker.C: + log.Printf("%s: still running — %d %s / ~%d batches (elapsed %s)", + label, total, unit, batches, time.Since(start).Round(time.Second)) + } + } + }() + return func() { close(done) } +} + +// buildDataMissingTag scans [fromBlock..remoteTip] and returns a Tag covering +// blocks that need DataSync (absent or data-incomplete per blockNeedsDataSync). +// Consecutive blocks needing DataSync are coalesced into a single RangeTag to +// minimise round-trips. + +func (fs *FastsyncV2) buildMissingTag(fromBlock, remoteTip uint64) (*taggingpb.Tag, error) { + iter := fs.blockInfoAdapter.NewBlockIterator(fromBlock, remoteTip, catchUpBatchSize) + defer iter.Close() + + var ranges []*taggingpb.RangeTag + cursor := fromBlock + + for { + batch, err := iter.Next() + if err != nil { + return nil, fmt.Errorf("block iterator: %w", err) + } + if len(batch) == 0 { + break // end of iteration + } + + for _, blk := range batch { + b := blk.BlockNumber + if b < cursor { + continue // already accounted for (shouldn't happen with sorted iterator) + } + if b > cursor { + // Gap: [cursor .. b-1] is missing + ranges = append(ranges, &taggingpb.RangeTag{Start: cursor, End: b - 1}) + } + cursor = b + 1 + } + } + + // Trailing gap: blocks after the last present one up to remoteTip + if cursor <= remoteTip { + ranges = append(ranges, &taggingpb.RangeTag{Start: cursor, End: remoteTip}) + } + + return &taggingpb.Tag{Range: ranges}, nil +} + +// collectTaggedAccountsFromBlocks scans local blocks [fromBlock..remoteTip] and +// returns a TaggedAccounts containing every unique sender and receiver address +// found in stored transactions. Used when DataSync is skipped (data already +// present) so that Phase 5 reconciliation still updates account balances. +func (fs *FastsyncV2) collectTaggedAccountsFromBlocks(fromBlock, remoteTip uint64) *taggingpb.TaggedAccounts { + iter := fs.blockInfoAdapter.NewBlockIterator(fromBlock, remoteTip, catchUpBatchSize) + defer iter.Close() + + accounts := make(map[string]bool) + for { + batch, err := iter.Next() + if err != nil || len(batch) == 0 { + break + } + for _, blk := range batch { + for _, tx := range blk.Transactions { + if tx.From != nil { + accounts[tx.From.Hex()] = true + } + if tx.To != nil { + accounts[tx.To.Hex()] = true + } + } + } + } + if len(accounts) == 0 { + return nil + } + return &taggingpb.TaggedAccounts{Accounts: accounts} +} + +func (fs *FastsyncV2) buildDataMissingTag(fromBlock, remoteTip uint64) (*taggingpb.Tag, error) { + iter := fs.blockInfoAdapter.NewBlockIterator(fromBlock, remoteTip, catchUpBatchSize) + defer iter.Close() + + var ranges []*taggingpb.RangeTag + cursor := fromBlock + runStart := uint64(0) + inRun := false + + // Diagnostic counters — logged at end to explain why blocks pass/fail. + var nAbsent, nNoProof, nGasNoTx, nComplete uint64 + + // Start a new run at b (or extend if already in one). + addToRun := func(b uint64) { + if !inRun { + runStart = b + inRun = true + } + } + // Close the active run, capping it at end. + endRunAt := func(end uint64) { + if inRun { + ranges = append(ranges, &taggingpb.RangeTag{Start: runStart, End: end}) + inRun = false + } + } + + for { + batch, err := iter.Next() + if err != nil { + return nil, fmt.Errorf("data-missing block iterator: %w", err) + } + if len(batch) == 0 { + // Remaining [cursor..remoteTip] are absent from the DB — include them. + if cursor <= remoteTip { + nAbsent += remoteTip - cursor + 1 + addToRun(cursor) + } + // Close any open run (covers both absent trailing blocks AND the case + // where the last DB block needed DataSync: cursor advanced past remoteTip + // but inRun is still true). endRunAt is a no-op when inRun==false. + endRunAt(remoteTip) + break + } + + for _, blk := range batch { + b := blk.BlockNumber + if b < cursor { + continue // shouldn't happen with a sorted iterator + } + + // Absent blocks [cursor..b-1]: they need DataSync — extend or start run. + if b > cursor { + nAbsent += b - cursor + addToRun(cursor) + // Run is now active through at least b-1. + // We decide below whether b also extends it or closes it. + } + + if len(blk.StarkProof) == 0 { + nNoProof++ + } else if blk.GasUsed > 0 && len(blk.Transactions) == 0 { + nGasNoTx++ + } else { + nComplete++ + } + + if blockNeedsDataSync(blk) { + // Block b is present but data-incomplete — keep the run going. + addToRun(b) + } else { + // Block b is complete — close any active run just before b. + if inRun { + endRunAt(b - 1) + } + } + + cursor = b + 1 + } + } + + log.Printf("[CatchUpSync] phase 3 scan: absent=%d noProof=%d gasNoTx=%d complete=%d", + nAbsent, nNoProof, nGasNoTx, nComplete) + + return &taggingpb.Tag{Range: ranges}, nil +} diff --git a/FastsyncV2/deltas.go b/FastsyncV2/deltas.go new file mode 100644 index 00000000..24533401 --- /dev/null +++ b/FastsyncV2/deltas.go @@ -0,0 +1,165 @@ +package FastsyncV2 + +// computeAccountDeltas performs a single forward pass over the locally stored blocks +// in [fromBlock..toBlock] and computes per-account balance/nonce deltas. +// +// This replaces the prior per-account GetTransactionsForAccountInRange scan: +// instead of O(accounts × blocks) DB queries, this is one O(blocks) iterator pass. +// +// Balance rules follow processBlockTransactions in messaging/BlockProcessing/Processing.go: +// +// Sender → deduct value + gasFee; advance Nonce and TxCountSent +// Receiver → credit value +// Coinbase → credit gasFee/2 + gasFee%2 (half + remainder) +// ZKVM → credit gasFee/2 +// +// Gas fee: +// +// EIP-1559 (type 2): effectiveGasPrice = MaxFee ?? MaxPriorityFee ?? GasPrice ?? 1e9 +// Legacy (type 0/1): effectiveGasPrice = GasPrice ?? MaxFee ?? MaxPriorityFee ?? 1e9 +// gasFee = gasLimit * effectiveGasPrice + +import ( + "math/big" + "strings" + + "github.com/JupiterMetaLabs/JMDN-FastSync/common/types" +) + +// computeAccountDeltas iterates all blocks in [fromBlock..toBlock] and returns a map +// of lowercase-hex-address → *types.AccountDelta. Accounts not touched in the range +// are absent from the map. +func (fs *FastsyncV2) computeAccountDeltas(fromBlock, toBlock uint64) map[string]*types.AccountDelta { + const batchSize = 500 + iter := fs.blockInfoAdapter.NewBlockIterator(fromBlock, toBlock, batchSize) + defer iter.Close() + + deltas := make(map[string]*types.AccountDelta) + + for { + batch, err := iter.Next() + if err != nil || len(batch) == 0 { + break + } + for _, blk := range batch { + applyBlockDeltas(blk, deltas) + } + } + + return deltas +} + +// applyBlockDeltas applies the transaction effects of one ZKBlock to the delta map. +func applyBlockDeltas(blk *types.ZKBlock, deltas map[string]*types.AccountDelta) { + var coinbaseAddr, zkvmAddr string + if blk.CoinbaseAddr != nil { + coinbaseAddr = strings.ToLower(blk.CoinbaseAddr.Hex()) + } + if blk.ZKVMAddr != nil { + zkvmAddr = strings.ToLower(blk.ZKVMAddr.Hex()) + } + + for i := range blk.Transactions { + tx := &blk.Transactions[i] + + var fromAddr, toAddr string + if tx.From != nil { + fromAddr = strings.ToLower(tx.From.Hex()) + } + if tx.To != nil { + toAddr = strings.ToLower(tx.To.Hex()) + } + + gasFee := computeGasFee(tx) + + halfGas := new(big.Int).Div(gasFee, big.NewInt(2)) + remainder := new(big.Int).Mod(gasFee, big.NewInt(2)) + coinbaseGas := new(big.Int).Add(halfGas, remainder) + zkvmGas := new(big.Int).Set(halfGas) + + // Sender: deduct value + gasFee; advance nonce; increment TxCountSent + if fromAddr != "" { + d := getDelta(deltas, fromAddr) + d.BalanceDelta.Sub(d.BalanceDelta, gasFee) + if tx.Value != nil && tx.Value.Sign() > 0 { + d.BalanceDelta.Sub(d.BalanceDelta, tx.Value) + } + if tx.Nonce > d.Nonce { + d.Nonce = tx.Nonce + d.TxNonce = tx.Nonce + 1 + } + d.TxCountSent++ + d.IsSender = true + } + + // Receiver: credit value only + if toAddr != "" && tx.Value != nil && tx.Value.Sign() > 0 { + d := getDelta(deltas, toAddr) + d.BalanceDelta.Add(d.BalanceDelta, tx.Value) + } + + // Coinbase: credit half + remainder of gasFee + if coinbaseAddr != "" { + d := getDelta(deltas, coinbaseAddr) + d.BalanceDelta.Add(d.BalanceDelta, coinbaseGas) + } + + // ZKVM: credit exact half of gasFee + if zkvmAddr != "" { + d := getDelta(deltas, zkvmAddr) + d.BalanceDelta.Add(d.BalanceDelta, zkvmGas) + } + } +} + +// getDelta returns the existing delta for addr, creating a zero entry if absent. +func getDelta(deltas map[string]*types.AccountDelta, addr string) *types.AccountDelta { + d, ok := deltas[addr] + if !ok { + d = &types.AccountDelta{BalanceDelta: big.NewInt(0)} + deltas[addr] = d + } + return d +} + +// computeGasFee returns gasLimit * effectiveGasPrice following Processing.go rules. +func computeGasFee(tx *types.Transaction) *big.Int { + if tx.GasLimit == 0 { + return big.NewInt(0) + } + gasLimit := new(big.Int).SetUint64(tx.GasLimit) + effectivePrice := effectiveGasPrice(tx) + return new(big.Int).Mul(gasLimit, effectivePrice) +} + +var oneGwei = big.NewInt(1_000_000_000) + +// effectiveGasPrice returns the effective gas price for a transaction. +// +// EIP-1559 (type 2): MaxFee → MaxPriorityFee → GasPrice → 1 Gwei +// Legacy (0/1): GasPrice → MaxFee → MaxPriorityFee → 1 Gwei +func effectiveGasPrice(tx *types.Transaction) *big.Int { + switch tx.Type { + case 2: // EIP-1559 + if tx.MaxFee != nil && tx.MaxFee.Sign() > 0 { + return tx.MaxFee + } + if tx.MaxPriorityFee != nil && tx.MaxPriorityFee.Sign() > 0 { + return tx.MaxPriorityFee + } + if tx.GasPrice != nil && tx.GasPrice.Sign() > 0 { + return tx.GasPrice + } + default: // Legacy / EIP-2930 + if tx.GasPrice != nil && tx.GasPrice.Sign() > 0 { + return tx.GasPrice + } + if tx.MaxFee != nil && tx.MaxFee.Sign() > 0 { + return tx.MaxFee + } + if tx.MaxPriorityFee != nil && tx.MaxPriorityFee.Sign() > 0 { + return tx.MaxPriorityFee + } + } + return oneGwei +} diff --git a/FastsyncV2/fastsyncv2.go b/FastsyncV2/fastsyncv2.go index 5158a9e5..1589bcfa 100644 --- a/FastsyncV2/fastsyncv2.go +++ b/FastsyncV2/fastsyncv2.go @@ -20,9 +20,11 @@ import ( "math" "os" "path/filepath" + "strconv" "time" NodeInfo "gossipnode/DB_OPs/Nodeinfo" + "gossipnode/DB_OPs/sqlops" "github.com/JupiterMetaLabs/JMDN-FastSync/common/WAL" accountspb "github.com/JupiterMetaLabs/JMDN-FastSync/common/proto/accounts" @@ -462,19 +464,34 @@ func (fs *FastsyncV2) handleSyncInternal(targetPeer string, startBlock uint64) e // ========================================================================= // PHASE 5: Reconciliation — recompute and commit account balances // ========================================================================= - // Three-phase atomic operation: - // 1. Concurrent balance computation (up to 15 goroutines replay transactions) - // 2. WAL batch write (single ReconciliationBatchEvent for crash recovery) - // 3. Atomic DB commit via AccountManager.BatchUpdateAccounts + // Single-pass approach: one BlockIterator scan computes all account deltas + // (O(blocks)), then applies them — no per-account DB scan. log.Println("[FastsyncV2] Phase 5: Reconciliation") - reconciledCount, failedAccounts, err := fs.ReconRouter.Reconcile(taggedAccounts, availResp) - if err != nil { - log.Printf("[FastsyncV2] Phase 5 warning: reconciliation returned error: %v", err) + remoteBlockNum := availResp.BlockHeight + if remoteBlockNum == 0 { + remoteBlockNum = math.MaxUint64 } - log.Printf("[FastsyncV2] Phase 5 complete: %d accounts reconciled, %d failed", reconciledCount, len(failedAccounts)) - if len(failedAccounts) > 0 { - log.Printf("[FastsyncV2] Failed accounts: %v", failedAccounts) + reconFrom, reconSkip := fs.effectiveReconRange(localBlockNum+1, remoteBlockNum) + if reconSkip { + log.Printf("[FastsyncV2] Phase 5 skipped: range [%d..%d] already reconciled", localBlockNum+1, remoteBlockNum) + } else { + if reconFrom > localBlockNum+1 { + log.Printf("[FastsyncV2] Phase 5: advancing fromBlock %d → %d (already reconciled)", localBlockNum+1, reconFrom) + } + deltas := fs.computeAccountDeltas(reconFrom, remoteBlockNum) + log.Printf("[FastsyncV2] Phase 5: computed deltas for %d accounts", len(deltas)) + reconciledCount, failedAccounts, err := fs.ReconRouter.ReconcileWithDeltas(deltas, availResp) + if err != nil { + log.Printf("[FastsyncV2] Phase 5 warning: reconciliation returned error: %v", err) + } + log.Printf("[FastsyncV2] Phase 5 complete: %d accounts reconciled, %d failed", reconciledCount, len(failedAccounts)) + if len(failedAccounts) > 0 { + log.Printf("[FastsyncV2] Failed accounts: %v", failedAccounts) + } + if err == nil { + fs.markReconComplete(remoteBlockNum) + } } // ========================================================================= @@ -570,12 +587,28 @@ func (fs *FastsyncV2) executePoTS( } // Secondary Reconciliation for accounts affected by PoTS blocks. + // PoTS blocks are produced after availResp.BlockHeight, so fromBlock = BlockHeight+1. if potsTaggedAccts != nil { - reconCount, failed, err := fs.ReconRouter.Reconcile(potsTaggedAccts, availResp) - if err != nil { - log.Printf("[FastsyncV2] PoTS reconciliation warning: %v", err) + potsFromBlock := availResp.BlockHeight + 1 + if availResp.BlockHeight == 0 { + potsFromBlock = 1 + } + potsReconFrom, potsReconSkip := fs.effectiveReconRange(potsFromBlock, math.MaxUint64) + if potsReconSkip { + log.Printf("[FastsyncV2] PoTS reconciliation skipped: already reconciled") + } else { + potsLatest := fs.blockInfoAdapter.GetBlockDetails().Blocknumber + potsDeltas := fs.computeAccountDeltas(potsReconFrom, potsLatest) + log.Printf("[FastsyncV2] PoTS: computed deltas for %d accounts", len(potsDeltas)) + reconCount, failed, err := fs.ReconRouter.ReconcileWithDeltas(potsDeltas, availResp) + if err != nil { + log.Printf("[FastsyncV2] PoTS reconciliation warning: %v", err) + } + log.Printf("[FastsyncV2] PoTS reconciled %d accounts, %d failed", reconCount, len(failed)) + if err == nil { + fs.markReconComplete(potsLatest) + } } - log.Printf("[FastsyncV2] PoTS reconciled %d accounts, %d failed", reconCount, len(failed)) } } } else { @@ -637,6 +670,50 @@ func (fs *FastsyncV2) dumpPoTSWALToDB(ctx context.Context) error { return nil } +// reconBlockKey is the SQLite key_value key used to persist the last successfully +// reconciled block number. Reading it before each Reconcile call prevents +// double-counting on re-runs that cover an already-reconciled range. +const reconBlockKey = "fastsync:last_reconciled_block" + +// effectiveReconRange returns the adjusted [from, to] range that hasn't been +// reconciled yet, plus a skip flag when the entire range is already done. +// +// Algorithm: +// +// lastBlock = SQLite key_value["fastsync:last_reconciled_block"] (0 if absent) +// effectiveFrom = max(fromBlock, lastBlock+1) +// skip = effectiveFrom > toBlock +func (fs *FastsyncV2) effectiveReconRange(fromBlock, toBlock uint64) (from uint64, skip bool) { + udb, err := sqlops.NewUnifiedDB() + if err != nil { + log.Printf("[FastsyncV2] recon anchor: open SQLite failed (%v) — using fromBlock=%d as-is", err, fromBlock) + return fromBlock, false + } + defer udb.Close() + + from = fromBlock + if raw, err := udb.GetKeyValue(reconBlockKey); err == nil && raw != "" { + if last, err := strconv.ParseUint(raw, 10, 64); err == nil && last+1 > fromBlock { + from = last + 1 + } + } + return from, from > toBlock +} + +// markReconComplete stores toBlock as the last successfully reconciled block. +func (fs *FastsyncV2) markReconComplete(toBlock uint64) { + udb, err := sqlops.NewUnifiedDB() + if err != nil { + log.Printf("[FastsyncV2] recon anchor: open SQLite failed (%v) — last_reconciled_block not persisted", err) + return + } + defer udb.Close() + + if err := udb.StoreKeyValue(reconBlockKey, strconv.FormatUint(toBlock, 10)); err != nil { + log.Printf("[FastsyncV2] recon anchor: store failed (%v) — last_reconciled_block not persisted", err) + } +} + // Close tears down all routers and flushes WALs. // Call this when the node shuts down. func (fs *FastsyncV2) Close() { diff --git a/config/ConnectionPool.go b/config/ConnectionPool.go index 4ba92bee..84f846d9 100644 --- a/config/ConnectionPool.go +++ b/config/ConnectionPool.go @@ -59,7 +59,7 @@ type PoolingConfig struct { func DefaultConnectionPoolConfig() *ConnectionPoolConfig { return &ConnectionPoolConfig{ MinConnections: 2, - MaxConnections: 20, + MaxConnections: 30, ConnectionTimeout: 30 * time.Second, IdleTimeout: 5 * time.Minute, MaxLifetime: 30 * time.Minute, diff --git a/config/settings/config.go b/config/settings/config.go index d22dcf52..06b4ba8e 100644 --- a/config/settings/config.go +++ b/config/settings/config.go @@ -156,6 +156,11 @@ type FastSyncSettings struct { // local DB. false = read-only participant (serves data, never updates itself). EnablePulling bool `mapstructure:"enable_pulling" yaml:"enable_pulling"` + // EnableCatchup controls whether the node automatically runs HandleCatchUpSync on + // startup using catch_up_peer and catch_up_from_block. Requires enable_pulling=true. + // Set false to disable automatic catchup while still allowing manual CLI catchup. + EnableCatchup bool `mapstructure:"enable_catchup" yaml:"enable_catchup"` + // PullOnStartup controls whether the node attempts to catch up on missed blocks // automatically when it (re)starts and connects to peers. PullOnStartup bool `mapstructure:"pull_on_startup" yaml:"pull_on_startup"` @@ -167,4 +172,21 @@ type FastSyncSettings struct { // AllowedPeers is an optional whitelist of libp2p peer IDs this node will // accept sync data FROM. Empty list = accept from any peer. AllowedPeers []string `mapstructure:"allowed_peers" yaml:"allowed_peers"` + + // CatchUpFromBlock is the first block AFTER the bootstrap snapshot + // (i.e. bootstrapTip + 1). Set this once after loading the bootstrap and + // never change it. Every catchup run — including after the node goes offline + // and comes back — scans from this block to remoteTip to find all gaps. + // + // 0 = full scan from block 1 (genesis). Use this if no bootstrap was loaded. + // N = scan from N; bootstrap guaranteed to cover [0..N-1] with no gaps. + // + // Do NOT set this to localTip+1: if a previous catchup was partial, + // localTip may be ahead of gaps that would be silently skipped. + CatchUpFromBlock uint64 `mapstructure:"catch_up_from_block" yaml:"catch_up_from_block"` + + // CatchUpPeer is the libp2p peer ID of the node to catch up from. + // Example: 12D3KooWAbCdEf... + // The node must be connected (in peerstore) at startup for addresses to resolve. + CatchUpPeer string `mapstructure:"catch_up_peer" yaml:"catch_up_peer"` } diff --git a/config/settings/defaults.go b/config/settings/defaults.go index 60ae4cac..747ce7b5 100644 --- a/config/settings/defaults.go +++ b/config/settings/defaults.go @@ -82,11 +82,14 @@ func DefaultConfig() NodeConfig { GROTrack: false, }, FastSync: FastSyncSettings{ - Enabled: true, - EnablePulling: true, - PullOnStartup: true, - SyncTimeout: 10 * time.Minute, - AllowedPeers: []string{}, + Enabled: true, + EnablePulling: true, + EnableCatchup: false, + PullOnStartup: true, + SyncTimeout: 10 * time.Minute, + AllowedPeers: []string{}, + CatchUpFromBlock: 0, + CatchUpPeer: "", }, Security: DefaultSecurityConfig(), Alerts: DefaultAlertsConfig(), diff --git a/config/settings/loader.go b/config/settings/loader.go index 3c60233a..017c4765 100644 --- a/config/settings/loader.go +++ b/config/settings/loader.go @@ -167,9 +167,12 @@ func setDefaults(v *viper.Viper) { // FastSync v.SetDefault("fastsync.enabled", d.FastSync.Enabled) v.SetDefault("fastsync.enable_pulling", d.FastSync.EnablePulling) + v.SetDefault("fastsync.enable_catchup", d.FastSync.EnableCatchup) v.SetDefault("fastsync.pull_on_startup", d.FastSync.PullOnStartup) v.SetDefault("fastsync.sync_timeout", d.FastSync.SyncTimeout) v.SetDefault("fastsync.allowed_peers", d.FastSync.AllowedPeers) + v.SetDefault("fastsync.catch_up_from_block", d.FastSync.CatchUpFromBlock) + v.SetDefault("fastsync.catch_up_peer", d.FastSync.CatchUpPeer) // Security v.SetDefault("security.enabled", d.Security.Enabled) diff --git a/go.mod b/go.mod index d30ace05..055357e7 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module gossipnode go 1.25.0 require ( - github.com/JupiterMetaLabs/JMDN-FastSync v0.0.0-20260623100612-88e18669b3f7 + github.com/JupiterMetaLabs/JMDN-FastSync v0.0.0-20260624070201-0e662a93ce62 github.com/JupiterMetaLabs/JMDN_Merkletree v0.0.0-20260413092720-b819e61566f8 github.com/JupiterMetaLabs/goroutine-orchestrator v0.1.5 github.com/JupiterMetaLabs/ion v0.4.2 diff --git a/go.sum b/go.sum index 4f204b47..673f4163 100644 --- a/go.sum +++ b/go.sum @@ -1,8 +1,8 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= -github.com/JupiterMetaLabs/JMDN-FastSync v0.0.0-20260623100612-88e18669b3f7 h1:PQloBFnhSs5YBpbvVNOyWMyV3aaM6x3XiLP1Lzl7ZVE= -github.com/JupiterMetaLabs/JMDN-FastSync v0.0.0-20260623100612-88e18669b3f7/go.mod h1:0erT7gGH4TYtitRik+Y3GfxSa5KGLacr9rJovV3vNB0= +github.com/JupiterMetaLabs/JMDN-FastSync v0.0.0-20260624070201-0e662a93ce62 h1:o2bXM609Fa3BlDfN0FRuo2MKyd15fOOPeeAERew5AzY= +github.com/JupiterMetaLabs/JMDN-FastSync v0.0.0-20260624070201-0e662a93ce62/go.mod h1:0erT7gGH4TYtitRik+Y3GfxSa5KGLacr9rJovV3vNB0= github.com/JupiterMetaLabs/JMDN_Merkletree v0.0.0-20260413092720-b819e61566f8 h1:yPrYb6g6NnqGsiCVqMf0zndEYTuelL3B03Fee+utLWA= github.com/JupiterMetaLabs/JMDN_Merkletree v0.0.0-20260413092720-b819e61566f8/go.mod h1:zM8F31G2SiPXzTo1WzbDFZ5iOOAkqrkuZjS0QVDW4ew= github.com/JupiterMetaLabs/goroutine-orchestrator v0.1.5 h1:S9+s6JeWSrGJ6ooYb4f8iRlJxwPUZ8X/EA4EgxKS3zc= diff --git a/main.go b/main.go index 433e69c6..cc906517 100644 --- a/main.go +++ b/main.go @@ -51,6 +51,7 @@ import ( "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" _ "github.com/mattn/go-sqlite3" "github.com/redis/go-redis/v9" "github.com/rs/zerolog/log" @@ -260,8 +261,9 @@ func runCommand(command string, args []string, grpcPort int) { fmt.Println(" broadcast - Broadcast message") fmt.Println(" getdid - Get DID document") fmt.Println(" propagatedid [balance] - Propagate DID to network") - fmt.Println(" fastsync - Fast sync with peer (V2 Engine)") - fmt.Println(" accountsync - Sync missing accounts only (skip block sync)") + fmt.Println(" fastsync - Fast sync with peer (V2 Engine)") + fmt.Println(" catchup [from_block] - Catch up to chain tip; from_block defaults to auto-detect (localTip+1)") + fmt.Println(" accountsync - Sync missing accounts only (skip block sync)") fmt.Println("\nUsage: ./jmdn -cmd [args...]") fmt.Println("\nNote: Some interactive commands (mempoolStats, seednodeStats, etc.)") fmt.Println("are only available in interactive mode.") @@ -449,6 +451,35 @@ func runCommand(command string, args []string, grpcPort int) { fmt.Printf(" Accounts DB TxID: %d\n", stats.AccountsState.TxId) } + case "catchup": + if len(args) < 1 { + fmt.Println("Usage: jmdn -cmd catchup [from_block]") + fmt.Println(" from_block defaults to 0 (auto-detect from local DB tip)") + os.Exit(1) + } + var fromBlock uint64 + if len(args) >= 2 { + var err error + fromBlock, err = strconv.ParseUint(args[1], 10, 64) + if err != nil { + fmt.Printf("Invalid from_block %q: %v\n", args[1], err) + os.Exit(1) + } + } + fmt.Printf("Starting CatchUpSync (from_block=%d)...\n", fromBlock) + stats, err := client.CatchUpSync(args[0], fromBlock) + if err != nil { + fmt.Printf("Error: %v\n", err) + os.Exit(1) + } + if stats != nil && stats.Error != "" { + fmt.Printf("CatchUpSync failed: %s\n", stats.Error) + os.Exit(1) + } + if stats != nil { + fmt.Printf("CatchUpSync completed in %ds\n", stats.TimeTaken) + } + case "accountsync": if len(args) < 1 { fmt.Println("Usage: jmdn -cmd accountsync ") @@ -509,8 +540,9 @@ func runCommand(command string, args []string, grpcPort int) { fmt.Println(" sendfile - Send file") fmt.Println(" broadcast - Broadcast message") fmt.Println(" getdid - Get DID document") - fmt.Println(" fastsync - Fast sync with peer (V2 Engine)") - fmt.Println(" accountsync - Sync missing accounts only (skip block sync)") + fmt.Println(" fastsync - Fast sync with peer (V2 Engine)") + fmt.Println(" catchup [from_block] - Catch up to chain tip; from_block defaults to auto-detect (localTip+1)") + fmt.Println(" accountsync - Sync missing accounts only (skip block sync)") os.Exit(1) } } @@ -967,57 +999,51 @@ func main() { log.Info().Msg("[FastSync] disabled by config — protocol handlers not registered") } - // Startup sync: catch up on blocks missed while offline. - if fastSyncerV2 != nil && cfg.FastSync.EnablePulling && cfg.FastSync.PullOnStartup { + // CatchUp sync: post-bootstrap reconciliation from a known block to realtime. + // catch_up_peer is a plain peer ID (e.g. 12D3KooW...) — resolved from peerstore, + // same pattern as the old HandleStartupSync startup path. + if fastSyncerV2 != nil && cfg.FastSync.EnablePulling && cfg.FastSync.EnableCatchup && cfg.FastSync.CatchUpPeer != "" { + if cfg.FastSync.CatchUpFromBlock == 0 { + log.Warn().Msg("[CatchUpSync] catch_up_from_block not set — defaulting to 0 (full scan from genesis). Set to bootstrapTip+1 to limit scan range.") + } + catchUpPeerIDStr := cfg.FastSync.CatchUpPeer + fromBlock := cfg.FastSync.CatchUpFromBlock if err := goMaybeTracked(MainLM, GRO.MainAM, GRO.MainLM, GRO.StartupSyncThread, func(ctx context.Context) error { - // Wait for peer connections to establish after node startup - time.Sleep(5 * time.Second) - - peers := n.Host.Network().Peers() - if len(peers) == 0 { - // TODO: Query seed node for available sync peers when no direct peers are connected - log.Info().Msg("[StartupSync] No peers connected, skipping startup sync") - return nil + time.Sleep(5 * time.Second) // allow peer connections to establish + + // Resolve the configured peer ID to a libp2p peer.ID. + catchUpPeerID, err := peer.Decode(catchUpPeerIDStr) + if err != nil { + return fmt.Errorf("[CatchUpSync] invalid catch_up_peer %q: %w", catchUpPeerIDStr, err) } - log.Info().Int("peers", len(peers)).Msg("[StartupSync] Attempting startup sync with connected peers") - - for _, peerID := range peers { - // Honour allowed_peers whitelist if configured - if len(cfg.FastSync.AllowedPeers) > 0 { - allowed := false - for _, ap := range cfg.FastSync.AllowedPeers { - if ap == peerID.String() { - allowed = true - break - } - } - if !allowed { - log.Info().Str("peer", peerID.String()).Msg("[StartupSync] Skipping peer not in allowed_peers") - continue - } - } + // Get addresses from peerstore — same as HandleStartupSync. + addrs := n.Host.Peerstore().Addrs(catchUpPeerID) + if len(addrs) == 0 { + return fmt.Errorf("[CatchUpSync] peer %s not in peerstore — not connected yet", catchUpPeerIDStr) + } - addrs := n.Host.Peerstore().Addrs(peerID) - if len(addrs) == 0 { - continue - } + // Build full multiaddr with embedded peer ID, matching HandleStartupSync pattern. + targetMultiaddr := fmt.Sprintf("%s/p2p/%s", addrs[0].String(), catchUpPeerID.String()) - log.Info().Str("peer", peerID.String()).Msg("[StartupSync] Trying peer") - if err := fastSyncerV2.HandleStartupSync(peerID, addrs); err != nil { - log.Warn().Err(err).Str("peer", peerID.String()).Msg("[StartupSync] Failed, trying next peer") - continue - } + log.Info(). + Uint64("from_block", fromBlock). + Str("peer", catchUpPeerIDStr). + Str("addr", targetMultiaddr). + Msg("[CatchUpSync] starting") - log.Info().Str("peer", peerID.String()).Msg("[StartupSync] Sync completed successfully") - return nil + if err := fastSyncerV2.HandleCatchUpSync(fromBlock, targetMultiaddr); err != nil { + log.Error().Err(err).Msg("[CatchUpSync] failed") + return err } - - log.Warn().Msg("[StartupSync] Failed to sync with any connected peer") + log.Info().Msg("[CatchUpSync] completed successfully") return nil }); err != nil { - log.Error().Err(err).Str("thread", GRO.StartupSyncThread).Msg("Failed to start startup sync goroutine") + log.Error().Err(err).Str("thread", GRO.StartupSyncThread).Msg("Failed to start CatchUpSync goroutine") } + // StartupSync (HandleStartupSync) disabled — catchup is the only startup sync path. + // } else if fastSyncerV2 != nil && cfg.FastSync.EnablePulling && cfg.FastSync.PullOnStartup { + // ... } else if fastSyncerV2 != nil && !cfg.FastSync.EnablePulling { log.Info().Msg("[FastSync] Node configured with enable_pulling=false (serve-only participant); skipping StartupSync") }