Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion tx-submitter/constants/methods.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
package constants

const (
// MethodCommitBatch is the method name for committing a batch
// MethodCommitBatch is the method name for committing a batch (with blob when applicable)
MethodCommitBatch = "commitBatch"
// MethodCommitState is the method name for recommitting batch state using stored blob hash (no blob in tx)
MethodCommitState = "commitState"
// MethodFinalizeBatch is the method name for finalizing a batch
MethodFinalizeBatch = "finalizeBatch"
)

// IsCommitLikeMethod returns true for commitBatch or commitState (same calldata shape for batch index parsing).
func IsCommitLikeMethod(method string) bool {
return method == MethodCommitBatch || method == MethodCommitState
}
1 change: 1 addition & 0 deletions tx-submitter/iface/rollup.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type IRollup interface {
BatchInsideChallengeWindow(opts *bind.CallOpts, batchIndex *big.Int) (bool, error)
BatchExist(opts *bind.CallOpts, batchIndex *big.Int) (bool, error)
CommittedBatches(opts *bind.CallOpts, batchIndex *big.Int) ([32]byte, error)
BatchBlobVersionedHashes(opts *bind.CallOpts, batchIndex *big.Int) ([32]byte, error)
BatchDataStore(opts *bind.CallOpts, batchIndex *big.Int) (struct {
OriginTimestamp *big.Int
FinalizeTimestamp *big.Int
Expand Down
5 changes: 5 additions & 0 deletions tx-submitter/mock/rollup.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ func (m *MockRollup) CommittedBatches(opts *bind.CallOpts, batchIndex *big.Int)
return [32]byte{}, nil
}

// BatchBlobVersionedHashes implements IRollup (no stored hash by default)
func (m *MockRollup) BatchBlobVersionedHashes(opts *bind.CallOpts, batchIndex *big.Int) ([32]byte, error) {
return [32]byte{}, nil
}

// BatchDataStore implements IRollup
func (m *MockRollup) BatchDataStore(opts *bind.CallOpts, batchIndex *big.Int) (struct {
OriginTimestamp *big.Int
Expand Down
57 changes: 33 additions & 24 deletions tx-submitter/services/pendingtx.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,6 @@ import (
"github.com/morph-l2/go-ethereum/log"
)

const (
// MethodCommitBatch is the method name for committing a batch
MethodCommitBatch = "commitBatch"
// MethodFinalizeBatch is the method name for finalizing a batch
MethodFinalizeBatch = "finalizeBatch"
)

// Journal defines the interface for transaction journaling
type Journal interface {
Init() error
Expand All @@ -43,21 +36,16 @@ type PendingTxs struct {
pindex uint64 // pending batch index
pfinalize uint64

commitBatchId []byte
finalizeBatchId []byte

journal Journal
}

// NewPendingTxs creates a new PendingTxs instance
func NewPendingTxs(commitBatchMethodId, finalizeBatchMethodId []byte, journal Journal) *PendingTxs {
pt := &PendingTxs{
txinfos: make(map[common.Hash]*types.TxRecord),
journal: journal,
commitBatchId: commitBatchMethodId,
finalizeBatchId: finalizeBatchMethodId,
// NewPendingTxs creates a new PendingTxs instance.
// Commit-like txs (commitBatch / commitState) are detected via utils.ParseMethod + constants.IsCommitLikeMethod, not stored method IDs.
func NewPendingTxs(journal Journal) *PendingTxs {
return &PendingTxs{
txinfos: make(map[common.Hash]*types.TxRecord),
journal: journal,
}
return pt
}

// Store persists a transaction to the journal
Expand Down Expand Up @@ -131,6 +119,13 @@ func (pt *PendingTxs) GetAll() []*types.TxRecord {
return pt.getAll()
}

// Len returns the number of pending transactions (thread-safe).
func (pt *PendingTxs) Len() int {
pt.mu.RLock()
defer pt.mu.RUnlock()
return len(pt.txinfos)
}

func (pt *PendingTxs) getAll() []*types.TxRecord {
txs := make([]*types.TxRecord, 0, len(pt.txinfos))
for _, tx := range pt.txinfos {
Expand Down Expand Up @@ -217,7 +212,7 @@ func (pt *PendingTxs) GetPFinalize() uint64 {

// ExistedIndex checks if a batch index exists
func (pt *PendingTxs) ExistedIndex(index uint64) bool {
txs := pt.GetAll() // already has RLock
txs := pt.GetAll() // snapshot taken under RLock inside GetAll; caller does not hold the mutex
abi, err := bindings.RollupMetaData.GetAbi()
if err != nil {
log.Error("Failed to get ABI", "err", err)
Expand All @@ -226,7 +221,7 @@ func (pt *PendingTxs) ExistedIndex(index uint64) bool {

for i := len(txs) - 1; i >= 0; i-- {
tx := txs[i].Tx
if utils.ParseMethod(tx, abi) == constants.MethodCommitBatch {
if constants.IsCommitLikeMethod(utils.ParseMethod(tx, abi)) {
pindex := utils.ParseParentBatchIndex(tx.Data()) + 1
if index == pindex {
return true
Expand All @@ -252,7 +247,7 @@ func (pt *PendingTxs) Recover(txs []*ethtypes.Transaction, abi *abi.ABI) error {

// Get batch index based on method
var batchIndex uint64
if method == constants.MethodCommitBatch {
if constants.IsCommitLikeMethod(method) {
batchIndex = utils.ParseParentBatchIndex(tx.Data()) + 1
if batchIndex > maxCommitBatchIndex {
maxCommitBatchIndex = batchIndex
Expand Down Expand Up @@ -281,17 +276,31 @@ func (pt *PendingTxs) Recover(txs []*ethtypes.Transaction, abi *abi.ABI) error {
"type", tx.Type(),
)

if err := pt.Add(tx); err != nil {
return fmt.Errorf("failed to add tx during recovery: %w", err)
// Add to in-memory map only; do not write to journal yet.
// The original journal data is preserved until dump() succeeds below,
// so a crash here is safe — the next restart will re-read the original entries.
pt.mu.Lock()
pt.txinfos[tx.Hash()] = &types.TxRecord{
Tx: tx,
SendTime: uint64(time.Now().Unix()),
QueryTimes: 0,
Confirmed: false,
}
pt.mu.Unlock()
}

pt.SetPindex(maxCommitBatchIndex)
pt.SetPFinalize(maxFinalizeBatchIndex)
pt.SetNonce(txs[len(txs)-1].Nonce())

// Rewrite the journal with the deduplicated in-memory set.
// This replaces any duplicate entries accumulated by previous buggy restarts.
if err := pt.dump(); err != nil {
return fmt.Errorf("failed to rewrite journal after recovery: %w", err)
}

log.Info("Recovered from mempool",
"tx_count", len(txs),
"tx_count", len(pt.txinfos),
"max_batch_index", maxCommitBatchIndex,
"max_finalize_index", maxFinalizeBatchIndex,
"max_nonce", pt.GetNonce(),
Expand Down
10 changes: 2 additions & 8 deletions tx-submitter/services/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,18 @@ type ReorgDetector struct {
maxHistory int

l1Client iface.Client
metrics iface.IMetrics
}

type blockInfo struct {
number uint64
hash common.Hash
}

func NewReorgDetector(l1Client iface.Client, metrics iface.IMetrics) *ReorgDetector {
func NewReorgDetector(l1Client iface.Client) *ReorgDetector {
return &ReorgDetector{
blockHistory: make([]blockInfo, 0),
maxHistory: 5, // Track last 50 blocks
maxHistory: 5, // Track last 5 blocks
l1Client: l1Client,
metrics: metrics,
}
}

Expand Down Expand Up @@ -77,10 +75,6 @@ func (r *ReorgDetector) DetectReorg(ctx context.Context) (bool, uint64, error) {
"new_hash", block.Hash(),
"block_number", info.number)

// Update metrics
r.metrics.IncReorgs()
r.metrics.SetReorgDepth(float64(reorgDepth))

// Truncate history before reorg point and rebuild
r.blockHistory = r.blockHistory[:i]
err = r.updateHistory(ctx)
Expand Down
Loading
Loading