Skip to content

Commit c4e761a

Browse files
author
NightCrawler
committed
supernode: add phase-2 self-healing deterministic runner scaffold
1 parent d44a78e commit c4e761a

7 files changed

Lines changed: 281 additions & 1 deletion

File tree

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ gen-supernode:
152152
--grpc-gateway_out=gen \
153153
--grpc-gateway_opt=paths=source_relative \
154154
--openapiv2_out=gen \
155-
proto/supernode/service.proto proto/supernode/status.proto proto/supernode/storage_challenge.proto
155+
proto/supernode/service.proto proto/supernode/status.proto proto/supernode/storage_challenge.proto proto/supernode/self_healing.proto
156156

157157
# Define the paths
158158
SUPERNODE_SRC=supernode/main.go

proto/supernode/self_healing.proto

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
syntax = "proto3";
2+
package supernode;
3+
option go_package = "github.com/LumeraProtocol/supernode/v2/gen/supernode";
4+
5+
// SelfHealingService exposes the minimal control-plane RPCs for deterministic
// expected-owner healing challenges.
service SelfHealingService {
  // RequestSelfHealing takes a RequestSelfHealingRequest and returns a
  // RequestSelfHealingResponse.
  rpc RequestSelfHealing(RequestSelfHealingRequest) returns (RequestSelfHealingResponse);

  // VerifySelfHealing takes a VerifySelfHealingRequest and returns a
  // VerifySelfHealingResponse.
  rpc VerifySelfHealing(VerifySelfHealingRequest) returns (VerifySelfHealingResponse);
}
11+
12+
// RequestSelfHealingRequest is the input of SelfHealingService.RequestSelfHealing.
// NOTE(review): field semantics below are read from the field names only;
// confirm against the server implementation once it exists.
message RequestSelfHealingRequest {
  // Identifier of the challenge and the epoch it belongs to.
  string challenge_id = 1;
  uint64 epoch_id = 2;

  // Key of the file the challenge concerns.
  string file_key = 3;
  // Participants: the issuing node, the addressed node, and any observers.
  string challenger_id = 4;
  string recipient_id = 5;
  repeated string observer_ids = 6;
}
21+
22+
// RequestSelfHealingResponse is the output of SelfHealingService.RequestSelfHealing.
// NOTE(review): field semantics below are read from the field names only;
// confirm against the server implementation once it exists.
message RequestSelfHealingResponse {
  // Identifier of the challenge and the epoch it belongs to.
  string challenge_id = 1;
  uint64 epoch_id = 2;

  // Node that produced this response.
  string recipient_id = 3;
  // Outcome flags reported by the recipient.
  bool accepted = 4;
  bool reconstruction_required = 5;
  // Hex-encoded hash of the reconstructed data (presumably empty when no
  // reconstruction took place — TODO confirm).
  string reconstructed_hash_hex = 6;
  // Error description (presumably empty on success — TODO confirm).
  string error = 7;
}
32+
33+
// VerifySelfHealingRequest is the input of SelfHealingService.VerifySelfHealing.
// NOTE(review): field semantics below are read from the field names only;
// confirm against the server implementation once it exists.
message VerifySelfHealingRequest {
  // Identifier of the challenge and the epoch it belongs to.
  string challenge_id = 1;
  uint64 epoch_id = 2;

  // Key of the file the challenge concerns.
  string file_key = 3;
  // Node whose reconstruction is being verified, and the hash it reported.
  string recipient_id = 4;
  string reconstructed_hash_hex = 5;
  // Node asked to perform the verification.
  string observer_id = 6;
}
42+
43+
// VerifySelfHealingResponse is the output of SelfHealingService.VerifySelfHealing.
// NOTE(review): field semantics below are read from the field names only;
// confirm against the server implementation once it exists.
message VerifySelfHealingResponse {
  // Identifier of the challenge and the epoch it belongs to.
  string challenge_id = 1;
  uint64 epoch_id = 2;
  // Node that produced this verdict.
  string observer_id = 3;
  // Verification verdict.
  bool ok = 4;
  // Error description (presumably empty on success — TODO confirm).
  string error = 5;
}

supernode/cmd/start.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
cascadeService "github.com/LumeraProtocol/supernode/v2/supernode/cascade"
2525
"github.com/LumeraProtocol/supernode/v2/supernode/config"
2626
hostReporterService "github.com/LumeraProtocol/supernode/v2/supernode/host_reporter"
27+
selfHealingService "github.com/LumeraProtocol/supernode/v2/supernode/self_healing"
2728
statusService "github.com/LumeraProtocol/supernode/v2/supernode/status"
2829
storageChallengeService "github.com/LumeraProtocol/supernode/v2/supernode/storage_challenge"
2930
// Legacy supernode metrics reporter (MsgReportSupernodeMetrics) has been superseded by
@@ -214,6 +215,22 @@ The supernode will connect to the Lumera network and begin participating in the
214215
}
215216
}
216217

218+
var selfHealingRunner *selfHealingService.Service
219+
if appConfig.SelfHealingConfig.Enabled {
220+
selfHealingRunner, err = selfHealingService.NewService(
221+
appConfig.SupernodeConfig.Identity,
222+
lumeraClient,
223+
p2pService,
224+
selfHealingService.Config{
225+
Enabled: true,
226+
PollInterval: time.Duration(appConfig.SelfHealingConfig.PollIntervalMs) * time.Millisecond,
227+
},
228+
)
229+
if err != nil {
230+
logtrace.Fatal(ctx, "Failed to initialize self-healing runner", logtrace.Fields{"error": err.Error()})
231+
}
232+
}
233+
217234
// Create supernode server
218235
supernodeServer := server.NewSupernodeServer(statusSvc)
219236

@@ -257,6 +274,9 @@ The supernode will connect to the Lumera network and begin participating in the
257274
if storageChallengeRunner != nil {
258275
services = append(services, storageChallengeRunner)
259276
}
277+
if selfHealingRunner != nil {
278+
services = append(services, selfHealingRunner)
279+
}
260280
servicesErr <- RunServices(ctx, services...)
261281
}()
262282

supernode/config/config.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,13 +53,19 @@ type StorageChallengeConfig struct {
5353
SubmitEvidence bool `yaml:"submit_evidence,omitempty"`
5454
}
5555

56+
// SelfHealingConfig holds the settings read from the "self_healing" section
// of the supernode YAML configuration.
type SelfHealingConfig struct {
	// Enabled switches the self-healing runner on; the start command only
	// constructs the runner when this is true.
	Enabled bool `yaml:"enabled"`
	// PollIntervalMs is the runner's poll period in milliseconds. LoadConfig
	// substitutes DefaultSelfHealingPollIntervalMs when the value is zero.
	PollIntervalMs uint64 `yaml:"poll_interval_ms,omitempty"`
}
60+
5661
type Config struct {
5762
SupernodeConfig `yaml:"supernode"`
5863
KeyringConfig `yaml:"keyring"`
5964
P2PConfig `yaml:"p2p"`
6065
LumeraClientConfig `yaml:"lumera"`
6166
RaptorQConfig `yaml:"raptorq"`
6267
StorageChallengeConfig `yaml:"storage_challenge"`
68+
SelfHealingConfig `yaml:"self_healing"`
6369

6470
// Store base directory (not from YAML)
6571
BaseDir string `yaml:"-"`
@@ -153,6 +159,9 @@ func LoadConfig(filename string, baseDir string) (*Config, error) {
153159
if config.StorageChallengeConfig.PollIntervalMs == 0 {
154160
config.StorageChallengeConfig.PollIntervalMs = DefaultStorageChallengePollIntervalMs
155161
}
162+
if config.SelfHealingConfig.PollIntervalMs == 0 {
163+
config.SelfHealingConfig.PollIntervalMs = DefaultSelfHealingPollIntervalMs
164+
}
156165

157166
// Create directories
158167
if err := config.EnsureDirs(); err != nil {

supernode/config/defaults.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,5 @@ const (
1313
DefaultChainID = "testing"
1414
DefaultRaptorQFilesDir = "raptorq_files"
1515
DefaultStorageChallengePollIntervalMs = 5000
16+
DefaultSelfHealingPollIntervalMs = 10000
1617
)

supernode/config/save.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,5 +62,9 @@ func CreateDefaultConfig(keyName, identity, chainID string, keyringBackend, keyr
6262
PollIntervalMs: DefaultStorageChallengePollIntervalMs,
6363
SubmitEvidence: true,
6464
},
65+
SelfHealingConfig: SelfHealingConfig{
66+
Enabled: false,
67+
PollIntervalMs: DefaultSelfHealingPollIntervalMs,
68+
},
6569
}
6670
}

supernode/self_healing/service.go

Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
package self_healing
2+
3+
import (
4+
"context"
5+
"encoding/hex"
6+
"fmt"
7+
"sort"
8+
"strconv"
9+
"strings"
10+
"time"
11+
12+
audittypes "github.com/LumeraProtocol/lumera/x/audit/v1/types"
13+
"github.com/LumeraProtocol/supernode/v2/p2p"
14+
"github.com/LumeraProtocol/supernode/v2/pkg/logtrace"
15+
"github.com/LumeraProtocol/supernode/v2/pkg/lumera"
16+
"github.com/LumeraProtocol/supernode/v2/pkg/storagechallenge/deterministic"
17+
"lukechampine.com/blake3"
18+
)
19+
20+
// Tuning constants for the self-healing runner.
const (
	// defaultPollInterval is the poll period NewService falls back to when the
	// configured interval is zero or negative.
	defaultPollInterval time.Duration = 10 * time.Second
	// filesPerChallenger is the count handed to deterministic.SelectFileKeys
	// on each epoch run.
	filesPerChallenger uint32 = 2
	// recipientReplicaCount is the count handed to
	// deterministic.SelectReplicaSet when looking for a recipient.
	recipientReplicaCount uint32 = 5
)
25+
26+
// Config carries the runtime settings of the self-healing runner.
type Config struct {
	// Enabled gates the polling loop: when false, Run only waits for its
	// context to be cancelled.
	Enabled bool
	// PollInterval is the period between polls. NewService replaces a zero or
	// negative value with defaultPollInterval.
	PollInterval time.Duration
}
30+
31+
type Service struct {
32+
cfg Config
33+
identity string
34+
lumera lumera.Client
35+
p2p p2p.Client
36+
}
37+
38+
func NewService(identity string, lumeraClient lumera.Client, p2pClient p2p.Client, cfg Config) (*Service, error) {
39+
identity = strings.TrimSpace(identity)
40+
if identity == "" {
41+
return nil, fmt.Errorf("identity is empty")
42+
}
43+
if lumeraClient == nil || lumeraClient.Node() == nil || lumeraClient.Audit() == nil {
44+
return nil, fmt.Errorf("lumera client is missing required modules")
45+
}
46+
if p2pClient == nil {
47+
return nil, fmt.Errorf("p2p client is nil")
48+
}
49+
if cfg.PollInterval <= 0 {
50+
cfg.PollInterval = defaultPollInterval
51+
}
52+
53+
return &Service{cfg: cfg, identity: identity, lumera: lumeraClient, p2p: p2pClient}, nil
54+
}
55+
56+
func (s *Service) Run(ctx context.Context) error {
57+
if !s.cfg.Enabled {
58+
<-ctx.Done()
59+
return nil
60+
}
61+
62+
ticker := time.NewTicker(s.cfg.PollInterval)
63+
defer ticker.Stop()
64+
65+
var lastRunEpoch uint64
66+
var lastRunOK bool
67+
68+
for {
69+
select {
70+
case <-ctx.Done():
71+
return ctx.Err()
72+
case <-ticker.C:
73+
height, ok := s.latestHeight(ctx)
74+
if !ok {
75+
continue
76+
}
77+
params, ok := s.auditParams(ctx)
78+
if !ok {
79+
continue
80+
}
81+
epochID, ok := deterministic.EpochID(height, params.EpochZeroHeight, params.EpochLengthBlocks)
82+
if !ok {
83+
continue
84+
}
85+
if lastRunOK && lastRunEpoch == epochID {
86+
continue
87+
}
88+
89+
anchorResp, err := s.lumera.Audit().GetEpochAnchor(ctx, epochID)
90+
if err != nil || anchorResp == nil || anchorResp.Anchor.EpochId != epochID {
91+
continue
92+
}
93+
94+
anchor := anchorResp.Anchor
95+
challengers := deterministic.SelectChallengers(anchor.ActiveSupernodeAccounts, anchor.Seed, epochID, params.ScChallengersPerEpoch)
96+
if !contains(challengers, s.identity) {
97+
lastRunEpoch = epochID
98+
lastRunOK = true
99+
continue
100+
}
101+
102+
if err := s.runEpoch(ctx, anchor); err != nil {
103+
logtrace.Warn(ctx, "self-healing epoch run error", logtrace.Fields{"epoch_id": epochID, "error": err.Error()})
104+
lastRunEpoch = epochID
105+
lastRunOK = false
106+
continue
107+
}
108+
lastRunEpoch = epochID
109+
lastRunOK = true
110+
}
111+
}
112+
}
113+
114+
func (s *Service) latestHeight(ctx context.Context) (int64, bool) {
115+
resp, err := s.lumera.Node().GetLatestBlock(ctx)
116+
if err != nil || resp == nil {
117+
return 0, false
118+
}
119+
if sdkBlk := resp.GetSdkBlock(); sdkBlk != nil {
120+
return sdkBlk.Header.Height, true
121+
}
122+
if blk := resp.GetBlock(); blk != nil {
123+
return blk.Header.Height, true
124+
}
125+
return 0, false
126+
}
127+
128+
func (s *Service) auditParams(ctx context.Context) (audittypes.Params, bool) {
129+
resp, err := s.lumera.Audit().GetParams(ctx)
130+
if err != nil || resp == nil {
131+
return audittypes.Params{}, false
132+
}
133+
p := resp.Params.WithDefaults()
134+
if err := p.Validate(); err != nil {
135+
return audittypes.Params{}, false
136+
}
137+
return p, true
138+
}
139+
140+
func (s *Service) runEpoch(ctx context.Context, anchor audittypes.EpochAnchor) error {
141+
to := time.Now().UTC()
142+
from := to.Add(-24 * time.Hour)
143+
144+
keys, err := s.p2p.GetLocalKeys(ctx, &from, to)
145+
if err != nil {
146+
return fmt.Errorf("get local keys: %w", err)
147+
}
148+
if len(keys) == 0 {
149+
return nil
150+
}
151+
sort.Strings(keys)
152+
153+
fileKeys := deterministic.SelectFileKeys(keys, anchor.Seed, anchor.EpochId, s.identity, filesPerChallenger)
154+
for _, fileKey := range fileKeys {
155+
replicas, err := deterministic.SelectReplicaSet(anchor.TargetSupernodeAccounts, fileKey, recipientReplicaCount)
156+
if err != nil {
157+
continue
158+
}
159+
recipient := selectRecipient(replicas, s.identity)
160+
if recipient == "" {
161+
continue
162+
}
163+
challengeID := deriveChallengeID(anchor.Seed, anchor.EpochId, fileKey, s.identity, recipient)
164+
logtrace.Info(ctx, "self-healing challenge planned", logtrace.Fields{
165+
"epoch_id": anchor.EpochId,
166+
"challenge_id": challengeID,
167+
"file_key": fileKey,
168+
"challenger_id": s.identity,
169+
"recipient_id": recipient,
170+
})
171+
}
172+
return nil
173+
}
174+
175+
func deriveChallengeID(seed []byte, epochID uint64, fileKey, challenger, recipient string) string {
176+
msg := []byte("sh:challenge:" + hex.EncodeToString(seed) + ":" + strconv.FormatUint(epochID, 10) + ":" + fileKey + ":" + challenger + ":" + recipient)
177+
sum := blake3.Sum256(msg)
178+
return hex.EncodeToString(sum[:])
179+
}
180+
181+
// selectRecipient returns the first entry of replicas that differs from self,
// or the empty string when replicas is empty or holds nothing but self.
func selectRecipient(replicas []string, self string) string {
	for i := range replicas {
		if replicas[i] == self {
			continue
		}
		return replicas[i]
	}
	return ""
}
189+
190+
// contains reports whether target is equal to any element of items. A nil or
// empty slice never contains anything.
func contains(items []string, target string) bool {
	for i := 0; i < len(items); i++ {
		if items[i] == target {
			return true
		}
	}
	return false
}

0 commit comments

Comments
 (0)