2 changes: 1 addition & 1 deletion cmd/es-node/config.go
@@ -86,7 +86,7 @@ func NewConfig(ctx *cli.Context, lg log.Logger) (*node.Config, error) {

// l2SyncEndpoint := NewL2SyncEndpointConfig(ctx)

scannerConfig, err := scanner.NewConfig(ctx)
scannerConfig, err := scanner.NewConfig(ctx, l1Endpoint.L1BeaconSlotTime)
if err != nil {
return nil, fmt.Errorf("failed to create scanner config: %w", err)
}
12 changes: 12 additions & 0 deletions ethstorage/eth/polling_client.go
@@ -338,6 +338,18 @@ func (w *PollingClient) GetMiningReward(shard uint64, timestamp uint64) (*big.In
return new(big.Int).SetBytes(bs), nil
}

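// GetUpdatedKvIndices returns the KV indices carried by PutBlobEvent logs
// emitted within the given block range.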
func (w *PollingClient) GetUpdatedKvIndices(startBlock, endBlock *big.Int) ([]uint64, error) {
events, err := w.FilterLogsByBlockRange(startBlock, endBlock, PutBlobEvent)
if err != nil {
return nil, err
}
var kvIndices []uint64
for _, event := range events {
kvIndices = append(kvIndices, new(big.Int).SetBytes(event.Topics[1][:]).Uint64())
}
return kvIndices, nil
}

func (w *PollingClient) ReadContractField(fieldName string, blockNumber *big.Int) ([]byte, error) {
h := crypto.Keccak256Hash([]byte(fieldName + "()"))
msg := ethereum.CallMsg{
67 changes: 59 additions & 8 deletions ethstorage/scanner/README.md
@@ -1,14 +1,65 @@
# EthStorage Scanner

A data verification service periodically checks if the data hashes of the blobs in local storage files align with the key-value hashes in the storage contract. If any mismatch is found, the service looks for the correct blob in the p2p network and updates the data in local storage.
A data verification service that periodically checks whether locally stored KV blobs match the on-chain KV meta hash (commit) from the storage contract.

This service offers a lightweight yet effective way to maintain network-wide data consistency.
If a mismatch is detected, the scanner can attempt to repair the local data by re-fetching the blob from the network and rewriting it with meta validation.

### Usage
## Options

The scanner service is enabled with `check meta` mode by default:
- `--scanner.mode` Data scan mode, 0: disabled, 1: check meta, 2: check blob (default: 1)[`ES_NODE_SCANNER_MODE`].
| Flag | Default | Env var | Description |
| --- | --- | --- | --- |
| `--scanner.mode` | `1` | `ES_NODE_SCANNER_MODE` | Data scan mode (bitmask): `0`=disabled, `1`=meta, `2`=blob, `4`=block. Combine via sum/OR (e.g. `3`=`1+2`, `5`=`1+4`, `7`=`1+2+4`). |
| `--scanner.batch-size` | `8192` | `ES_NODE_SCANNER_BATCH_SIZE` | The number of KVs to scan per batch for check-meta and check-blob modes. No impact on check-block mode. |
| `--scanner.interval.meta` | `3` (minutes) | `ES_NODE_SCANNER_INTERVAL_META` | Scan interval for `check-meta`. |
| `--scanner.interval.blob` | `60` (minutes) | `ES_NODE_SCANNER_INTERVAL_BLOB` | Scan interval for `check-blob`. |
| `--scanner.interval.block` | `1440` (minutes) | `ES_NODE_SCANNER_INTERVAL_BLOCK` | Scan interval for `check-block`. |

The following settings are required if the service is not disabled manually:
- `--scanner.batch-size` Data scan batch size (default: 8192) [`$ES_NODE_SCANNER_BATCH_SIZE`]
- `--scanner.interval` Data scan interval in minutes (default: 3) [`$ES_NODE_SCANNER_INTERVAL`]
## Scan modes explained

The flag / env `--scanner.mode` (default: `1`) [`ES_NODE_SCANNER_MODE`] is a bitmask:

- `0`: disabled
- `1`: check-meta (compare local meta with on-chain meta)
- `2`: check-blob (read local blob and validate its commit against on-chain meta)
- `4`: check-block (scan recently updated KVs from finalized blocks, then run check-blob on them)

### Quick comparison

| Name | `--scanner.mode` | What it does | Performance impact | Notes |
| --- | ---: | --- | --- | --- |
| check-meta | `1` | Read local meta and compare with on-chain meta | Low | Minimal impact; may miss some mismatches |
| check-blob | `2` | Compute the commit from a local blob and validate it against on-chain meta | High | Best precision; highest IO/CPU cost when many blobs are stored |
| check-block | `4` | Scan recently finalized blocks for updated KVs, then run `check-blob` on them | High | Ensures newly updated blobs are fetched and verified within the Beacon node's blob retention window |
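
For `check-block`, the number of blocks covered by each run is derived from the scan interval and the L1 slot time. A minimal sketch of that calculation, assuming the default interval and a 12-second slot time:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Number of slots (blocks) that fit into one check-block interval,
	// mirroring batchSize = IntervalBlock / L1SlotTime in the scanner.
	interval := 1440 * time.Minute // --scanner.interval.block default
	slotTime := 12 * time.Second   // assumed L1 beacon slot time
	fmt.Println(uint64(interval / slotTime)) // 7200 blocks per scan
}
```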

### More choices

You can combine modes by summing/OR-ing their values to balance precision and coverage against performance (see the sketch after this list):

- `3` = `1 + 2` = meta + blob
- `5` = `1 + 4` = meta + block
- `6` = `2 + 4` = blob + block
- `7` = meta + blob + block
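
The sketch below shows how such a bitmask can be combined and tested in Go; the constant names are illustrative and simply mirror the flag values documented above.

```go
package main

import "fmt"

// Illustrative mode bits; they mirror the flag values documented above.
const (
	modeMeta  = 1 << iota // 1: check-meta
	modeBlob              // 2: check-blob
	modeBlock             // 4: check-block
)

func main() {
	mode := modeBlob | modeBlock // equivalent to --scanner.mode=6
	fmt.Println("check-meta enabled: ", mode&modeMeta != 0)  // false
	fmt.Println("check-blob enabled: ", mode&modeBlob != 0)  // true
	fmt.Println("check-block enabled:", mode&modeBlock != 0) // true
}
```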

Review comment (Collaborator): What is your recommended mode for our mainnet miner nodes?

Reply (Collaborator, author): Since we don't have many blobs yet, I'd rather use mode 6 to check both all and the most recent ones. Should we update the shells to make it the default?

> [!TIP]
> `--scanner.batch-size` and `--scanner.interval.*` control the batch size and frequency of each scan mode, so you can tune the performance impact further based on the amount of data and hardware resources.

> [!NOTE]
> `--scanner.batch-size` only affects `check-meta` and `check-blob` modes, not `check-block`.

> [!WARNING]
> If `--scanner.batch-size` is set higher than the default, it may cause an `out of gas` error when querying metadata from the L1 contract.

## Status tracking

After es-node starts, the scanner begins its work only once the node has finished syncing all shards from the P2P network.

Once running, the scanner periodically logs scan summaries and statistics (mismatched/unfixed counts). These counts are also exposed in the node state as `scan_stats`.

## Repair behavior

A background repair loop periodically retries/fixes mismatched KVs by fetching blobs from the p2p network and rewriting them locally.

- When a mismatch is detected for the first time, the KV is marked as `pending`, meaning it is scheduled for repair. Sometimes the mismatch is transient (e.g., due to download latency) and may be recovered automatically by the downloader.
- If the KV is repaired successfully or recovered, it is removed from the mismatch list.
- If the repair fails, the KV remains in the mismatch list and is marked as `failed` for future retries.
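
A minimal sketch of the bookkeeping described above; the type and function names are illustrative, not the actual implementation:

```go
package main

// repairStatus models the states described above (illustrative only).
type repairStatus int

const (
	statusPending repairStatus = iota // newly detected, scheduled for repair
	statusFailed                      // repair attempted and failed; retried later
)

// tryRepairAll sketches one pass of the repair loop: for every tracked KV,
// fetch the blob from the p2p network and rewrite it locally; drop the entry
// on success, otherwise mark it failed for a future retry.
func tryRepairAll(mismatches map[uint64]repairStatus, fetchAndRewrite func(kvIndex uint64) error) {
	for kvi := range mismatches {
		if err := fetchAndRewrite(kvi); err != nil {
			mismatches[kvi] = statusFailed
			continue
		}
		delete(mismatches, kvi)
	}
}

func main() {}
```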
63 changes: 42 additions & 21 deletions ethstorage/scanner/config.go
@@ -15,8 +15,9 @@ const (
// Default KVs per scan batch; a bigger value may cause an out-of-gas error
defaultBatchSize = 8192
// Default intervals in minutes
defaultIntervalMeta = 3
defaultIntervalBlob = 60
defaultIntervalMeta = 3
defaultIntervalBlob = 60
defaultIntervalBlock = 1440 // 1 day
// Minutes between fixing loops
fixingInterval = 12
// Minutes to wait before trying to fix pending mismatches
@@ -29,18 +30,22 @@ const (
modeCheckMeta
// Compute meta hashes from local blobs and compare with those in L1 contract
modeCheckBlob
// Scan updated KVs from recent blocks and run "check-blob" on them
modeCheckBlock
)

const (
modeSetMeta scanModeSet = 1 << iota // 1
modeSetBlob // 2
modeSetMeta scanModeSet = 1 << iota // 1
modeSetBlob // 2
modeSetBlock // 4
)

const (
ModeFlagName = "scanner.mode"
BatchSizeFlagName = "scanner.batch-size"
IntervalMetaFlagName = "scanner.interval.meta"
IntervalBlobFlagName = "scanner.interval.blob"
ModeFlagName = "scanner.mode"
BatchSizeFlagName = "scanner.batch-size"
IntervalMetaFlagName = "scanner.interval.meta"
IntervalBlobFlagName = "scanner.interval.blob"
IntervalBlockFlagName = "scanner.interval.block"
)

// scanModeSet is a combination of scanMode values used for configuration purposes.
@@ -61,6 +66,12 @@ func (m scanModeSet) String() string {
}
out += "check-blob"
}
if m&modeSetBlock != 0 {
if out != "" {
out += "+"
}
out += "check-block"
}
return out
}

@@ -69,23 +80,25 @@ func scannerEnv(name string) string {
}

type Config struct {
Mode scanModeSet
BatchSize int
IntervalMeta time.Duration
IntervalBlob time.Duration
Mode scanModeSet
BatchSize int
L1SlotTime time.Duration
IntervalMeta time.Duration
IntervalBlob time.Duration
IntervalBlock time.Duration
}

func CLIFlags() []cli.Flag {
flags := []cli.Flag{
cli.IntFlag{
Name: ModeFlagName,
Usage: "Data scan mode (bitmask) : 0=disabled, 1=meta, 2=blob; combinations via sum/OR: 3=meta+blob",
Usage: "Data scan mode (bitmask) : 0=disabled, 1=meta, 2=blob, 4=block; combinations via sum/OR: 3=meta+blob, 5=meta+block, 6=blob+block, 7=all",
EnvVar: scannerEnv("MODE"),
Value: 1,
},
cli.IntFlag{
Name: BatchSizeFlagName,
Usage: "Data scan batch size",
Usage: "The number of KVs to scan per batch for check-meta and check-blob modes. No impact on check-block mode.",
EnvVar: scannerEnv("BATCH_SIZE"),
Value: defaultBatchSize,
},
@@ -101,26 +114,34 @@ func CLIFlags() []cli.Flag {
EnvVar: scannerEnv("INTERVAL_BLOB"),
Value: defaultIntervalBlob,
},
cli.IntFlag{
Name: IntervalBlockFlagName,
Usage: fmt.Sprintf("Data scan interval of check-block mode in minutes (default %d)", defaultIntervalBlock),
EnvVar: scannerEnv("INTERVAL_BLOCK"),
Value: defaultIntervalBlock,
},
}
return flags
}

const scanModeSetMask = modeSetMeta | modeSetBlob
const scanModeSetMask = modeSetMeta | modeSetBlob | modeSetBlock // 7

func NewConfig(ctx *cli.Context) (*Config, error) {
func NewConfig(ctx *cli.Context, slot uint64) (*Config, error) {
rawMode := scanModeSet(ctx.GlobalInt(ModeFlagName))
if rawMode == 0 {
return nil, nil
}
// Check for invalid bits outside the valid mask
if rawMode&^scanModeSetMask != 0 {
return nil, fmt.Errorf("invalid scanner mode: %d, valid values are 0 (disabled), 1 (meta), 2 (blob), or 3 (meta+blob)", rawMode)
return nil, fmt.Errorf("invalid scanner mode: %d, valid values are 0 (disabled), 1 (meta), 2 (blob), 3 (meta+blob), 4 (block), 5 (meta+block), 6 (blob+block), 7 (all)", rawMode)
}
mode := rawMode & scanModeSetMask
return &Config{
Mode: mode,
BatchSize: ctx.GlobalInt(BatchSizeFlagName),
IntervalMeta: time.Minute * time.Duration(ctx.GlobalInt(IntervalMetaFlagName)),
IntervalBlob: time.Minute * time.Duration(ctx.GlobalInt(IntervalBlobFlagName)),
Mode: mode,
BatchSize: ctx.GlobalInt(BatchSizeFlagName),
L1SlotTime: time.Second * time.Duration(slot),
IntervalMeta: time.Minute * time.Duration(ctx.GlobalInt(IntervalMetaFlagName)),
IntervalBlob: time.Minute * time.Duration(ctx.GlobalInt(IntervalBlobFlagName)),
IntervalBlock: time.Minute * time.Duration(ctx.GlobalInt(IntervalBlockFlagName)),
}, nil
}
26 changes: 20 additions & 6 deletions ethstorage/scanner/scanner.go
@@ -35,7 +35,7 @@ func New(
cfg Config,
sm *es.StorageManager,
fetchBlob es.FetchBlobFunc,
l1 es.Il1Source,
l1 IL1,
feed *event.Feed,
lg log.Logger,
) *Scanner {
@@ -108,6 +108,10 @@ func (s *Scanner) start() {
if s.cfg.Mode&modeSetBlob != 0 {
s.launchScanLoop(s.blobScanLoopRuntime())
}
if s.cfg.Mode&modeSetBlock != 0 {
s.launchScanLoop(s.blockScanLoopRuntime())
}

s.lg.Info("Scanner started", "mode", s.cfg.Mode.String())

s.startReporter()
@@ -165,6 +169,16 @@ func (s *Scanner) updateStats(kvi uint64, sc *scanned) {
}
}

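// metaScanLoopRuntime configures the periodic check-meta scan loop.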
func (s *Scanner) metaScanLoopRuntime() *scanLoopRuntime {
return &scanLoopRuntime{
mode: modeCheckMeta,
nextBatch: s.worker.getKvsInBatch,
interval: s.cfg.IntervalMeta,
batchSize: uint64(s.cfg.BatchSize),
nextIndex: 0,
}
}

func (s *Scanner) blobScanLoopRuntime() *scanLoopRuntime {
return &scanLoopRuntime{
mode: modeCheckBlob,
@@ -175,12 +189,12 @@ func (s *Scanner) blobScanLoopRuntime() *scanLoopRuntime {
}
}

func (s *Scanner) metaScanLoopRuntime() *scanLoopRuntime {
func (s *Scanner) blockScanLoopRuntime() *scanLoopRuntime {
return &scanLoopRuntime{
mode: modeCheckMeta,
nextBatch: s.worker.getKvsInBatch,
interval: s.cfg.IntervalMeta,
batchSize: uint64(s.cfg.BatchSize),
mode: modeCheckBlock,
nextBatch: s.worker.latestUpdated,
interval: s.cfg.IntervalBlock,
batchSize: uint64(s.cfg.IntervalBlock / s.cfg.L1SlotTime), // number of slots in the interval
nextIndex: 0,
}
}
2 changes: 2 additions & 0 deletions ethstorage/scanner/utils.go
@@ -21,6 +21,8 @@ func (m scanMode) String() string {
return "check-meta"
case modeCheckBlob:
return "check-blob"
case modeCheckBlock:
return "check-block"
default:
return fmt.Sprintf("unknown(%d)", int(m))
}
58 changes: 55 additions & 3 deletions ethstorage/scanner/worker.go
@@ -7,10 +7,12 @@ import (
"context"
"errors"
"fmt"
"math/big"
"strings"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"

@@ -27,17 +29,23 @@ type IStorageManager interface {
Shards() []uint64
}

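// IL1 is the subset of the L1 client functionality required by the scanner worker.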
type IL1 interface {
GetKvMetas(kvIndices []uint64, blockNumber int64) ([][32]byte, error)
GetUpdatedKvIndices(startBlock, endBlock *big.Int) ([]uint64, error)
HeaderByNumber(context.Context, *big.Int) (*types.Header, error)
}

type Worker struct {
sm IStorageManager
fetchBlob es.FetchBlobFunc
l1 es.Il1Source
l1 IL1
lg log.Logger
}

func NewWorker(
sm IStorageManager,
fetch es.FetchBlobFunc,
l1 es.Il1Source,
l1 IL1,
lg log.Logger,
) *Worker {
return &Worker{
@@ -86,7 +94,12 @@ func (s *Worker) scanBatch(ctx context.Context, runtime *scanLoopRuntime, update

var commit common.Hash
copy(commit[:], meta[32-es.HashSizeInContract:32])
s.scanKv(runtime.mode, kvsInBatch[i], commit, updateStatus)
mode := runtime.mode
if mode == modeCheckBlock {
// check-block only selects which KVs to scan; verify them the same way as check-blob
mode = modeCheckBlob
}
s.scanKv(mode, kvsInBatch[i], commit, updateStatus)
}

runtime.nextIndex = batchEndExclusive
@@ -197,6 +210,45 @@ func (s *Worker) getKvsInBatch(batchSize uint64, startIndexOfKvIdx uint64) ([]ui
return getKvsInBatch(shards, kvEntries, localKvCount, batchSize, startIndexOfKvIdx, s.lg)
}

// latestUpdated fetches the latest updated KV indices from L1 contract within the given number of blocks to scan.
func (s *Worker) latestUpdated(blocksToScan uint64, lastScannedBlock uint64) ([]uint64, uint64) {
latestFinalized, err := s.l1.HeaderByNumber(context.Background(), big.NewInt(int64(rpc.FinalizedBlockNumber)))
if err != nil {
s.lg.Error("Failed to get latest finalized block header", "error", err)
return []uint64{}, lastScannedBlock
}
startBlock := lastScannedBlock + 1
endBlock := latestFinalized.Number.Uint64()
if lastScannedBlock == 0 {
startBlock = endBlock - blocksToScan
s.lg.Info(fmt.Sprintf("Starting from %d slots ago (block %d)", blocksToScan, startBlock))
}
if startBlock > endBlock {
s.lg.Info("No new finalized blocks to scan", "lastScannedBlock", lastScannedBlock, "latestFinalized", endBlock)
return []uint64{}, lastScannedBlock
}
kvsIndices, err := s.l1.GetUpdatedKvIndices(big.NewInt(int64(startBlock)), big.NewInt(int64(endBlock)))
if err != nil {
s.lg.Error("Failed to get updated KV indices", "startBlock", startBlock, "endBlock", endBlock, "error", err)
return []uint64{}, lastScannedBlock
}
// filter out kv indices that are not stored in local storage
shardSet := make(map[uint64]struct{})
for _, shard := range s.sm.Shards() {
shardSet[shard] = struct{}{}
}
kvEntries := s.sm.KvEntries()
var locallyStored []uint64
for _, kvi := range kvsIndices {
shardIdx := kvi / kvEntries
if _, ok := shardSet[shardIdx]; ok {
locallyStored = append(locallyStored, kvi)
}
}
s.lg.Info("Latest updated KV indices fetched", "startBlock", startBlock, "endBlock", endBlock, "totalUpdatedKvs", len(kvsIndices), "locallyStored", len(locallyStored))
return locallyStored, endBlock
}

func getKvsInBatch(shards []uint64, kvEntries, localKvCount, batchSize, startKvIndex uint64, lg log.Logger) ([]uint64, uint64) {
// Determine batch start and end KV indices
if startKvIndex >= localKvCount {