diff --git a/tx-submitter/constants/methods.go b/tx-submitter/constants/methods.go index aefe24d0..7de9eb49 100644 --- a/tx-submitter/constants/methods.go +++ b/tx-submitter/constants/methods.go @@ -1,8 +1,15 @@ package constants const ( - // MethodCommitBatch is the method name for committing a batch + // MethodCommitBatch is the method name for committing a batch (with blob when applicable) MethodCommitBatch = "commitBatch" + // MethodCommitState is the method name for recommitting batch state using stored blob hash (no blob in tx) + MethodCommitState = "commitState" // MethodFinalizeBatch is the method name for finalizing a batch MethodFinalizeBatch = "finalizeBatch" ) + +// IsCommitLikeMethod returns true for commitBatch or commitState (same calldata shape for batch index parsing). +func IsCommitLikeMethod(method string) bool { + return method == MethodCommitBatch || method == MethodCommitState +} diff --git a/tx-submitter/iface/rollup.go b/tx-submitter/iface/rollup.go index afa7d46a..9a60a64b 100644 --- a/tx-submitter/iface/rollup.go +++ b/tx-submitter/iface/rollup.go @@ -18,6 +18,7 @@ type IRollup interface { BatchInsideChallengeWindow(opts *bind.CallOpts, batchIndex *big.Int) (bool, error) BatchExist(opts *bind.CallOpts, batchIndex *big.Int) (bool, error) CommittedBatches(opts *bind.CallOpts, batchIndex *big.Int) ([32]byte, error) + BatchBlobVersionedHashes(opts *bind.CallOpts, batchIndex *big.Int) ([32]byte, error) BatchDataStore(opts *bind.CallOpts, batchIndex *big.Int) (struct { OriginTimestamp *big.Int FinalizeTimestamp *big.Int diff --git a/tx-submitter/mock/rollup.go b/tx-submitter/mock/rollup.go index dbf424a5..d5757093 100644 --- a/tx-submitter/mock/rollup.go +++ b/tx-submitter/mock/rollup.go @@ -65,6 +65,11 @@ func (m *MockRollup) CommittedBatches(opts *bind.CallOpts, batchIndex *big.Int) return [32]byte{}, nil } +// BatchBlobVersionedHashes implements IRollup (no stored hash by default) +func (m *MockRollup) BatchBlobVersionedHashes(opts *bind.CallOpts, batchIndex *big.Int) ([32]byte, error) { + return [32]byte{}, nil +} + // BatchDataStore implements IRollup func (m *MockRollup) BatchDataStore(opts *bind.CallOpts, batchIndex *big.Int) (struct { OriginTimestamp *big.Int diff --git a/tx-submitter/services/pendingtx.go b/tx-submitter/services/pendingtx.go index fc5ac18c..b94735b9 100644 --- a/tx-submitter/services/pendingtx.go +++ b/tx-submitter/services/pendingtx.go @@ -18,13 +18,6 @@ import ( "github.com/morph-l2/go-ethereum/log" ) -const ( - // MethodCommitBatch is the method name for committing a batch - MethodCommitBatch = "commitBatch" - // MethodFinalizeBatch is the method name for finalizing a batch - MethodFinalizeBatch = "finalizeBatch" -) - // Journal defines the interface for transaction journaling type Journal interface { Init() error @@ -43,21 +36,16 @@ type PendingTxs struct { pindex uint64 // pending batch index pfinalize uint64 - commitBatchId []byte - finalizeBatchId []byte - journal Journal } -// NewPendingTxs creates a new PendingTxs instance -func NewPendingTxs(commitBatchMethodId, finalizeBatchMethodId []byte, journal Journal) *PendingTxs { - pt := &PendingTxs{ - txinfos: make(map[common.Hash]*types.TxRecord), - journal: journal, - commitBatchId: commitBatchMethodId, - finalizeBatchId: finalizeBatchMethodId, +// NewPendingTxs creates a new PendingTxs instance. +// Commit-like txs (commitBatch / commitState) are detected via utils.ParseMethod + constants.IsCommitLikeMethod, not stored method IDs. +func NewPendingTxs(journal Journal) *PendingTxs { + return &PendingTxs{ + txinfos: make(map[common.Hash]*types.TxRecord), + journal: journal, } - return pt } // Store persists a transaction to the journal @@ -131,6 +119,13 @@ func (pt *PendingTxs) GetAll() []*types.TxRecord { return pt.getAll() } +// Len returns the number of pending transactions (thread-safe). +func (pt *PendingTxs) Len() int { + pt.mu.RLock() + defer pt.mu.RUnlock() + return len(pt.txinfos) +} + func (pt *PendingTxs) getAll() []*types.TxRecord { txs := make([]*types.TxRecord, 0, len(pt.txinfos)) for _, tx := range pt.txinfos { @@ -217,7 +212,7 @@ func (pt *PendingTxs) GetPFinalize() uint64 { // ExistedIndex checks if a batch index exists func (pt *PendingTxs) ExistedIndex(index uint64) bool { - txs := pt.GetAll() // already has RLock + txs := pt.GetAll() // snapshot taken under RLock inside GetAll; caller does not hold the mutex abi, err := bindings.RollupMetaData.GetAbi() if err != nil { log.Error("Failed to get ABI", "err", err) @@ -226,7 +221,7 @@ func (pt *PendingTxs) ExistedIndex(index uint64) bool { for i := len(txs) - 1; i >= 0; i-- { tx := txs[i].Tx - if utils.ParseMethod(tx, abi) == constants.MethodCommitBatch { + if constants.IsCommitLikeMethod(utils.ParseMethod(tx, abi)) { pindex := utils.ParseParentBatchIndex(tx.Data()) + 1 if index == pindex { return true @@ -252,7 +247,7 @@ func (pt *PendingTxs) Recover(txs []*ethtypes.Transaction, abi *abi.ABI) error { // Get batch index based on method var batchIndex uint64 - if method == constants.MethodCommitBatch { + if constants.IsCommitLikeMethod(method) { batchIndex = utils.ParseParentBatchIndex(tx.Data()) + 1 if batchIndex > maxCommitBatchIndex { maxCommitBatchIndex = batchIndex @@ -281,17 +276,31 @@ func (pt *PendingTxs) Recover(txs []*ethtypes.Transaction, abi *abi.ABI) error { "type", tx.Type(), ) - if err := pt.Add(tx); err != nil { - return fmt.Errorf("failed to add tx during recovery: %w", err) + // Add to in-memory map only; do not write to journal yet. + // The original journal data is preserved until dump() succeeds below, + // so a crash here is safe — the next restart will re-read the original entries. + pt.mu.Lock() + pt.txinfos[tx.Hash()] = &types.TxRecord{ + Tx: tx, + SendTime: uint64(time.Now().Unix()), + QueryTimes: 0, + Confirmed: false, } + pt.mu.Unlock() } pt.SetPindex(maxCommitBatchIndex) pt.SetPFinalize(maxFinalizeBatchIndex) pt.SetNonce(txs[len(txs)-1].Nonce()) + // Rewrite the journal with the deduplicated in-memory set. + // This replaces any duplicate entries accumulated by previous buggy restarts. + if err := pt.dump(); err != nil { + return fmt.Errorf("failed to rewrite journal after recovery: %w", err) + } + log.Info("Recovered from mempool", - "tx_count", len(txs), + "tx_count", len(pt.txinfos), "max_batch_index", maxCommitBatchIndex, "max_finalize_index", maxFinalizeBatchIndex, "max_nonce", pt.GetNonce(), diff --git a/tx-submitter/services/reorg.go b/tx-submitter/services/reorg.go index 61c65d68..8ba385a1 100644 --- a/tx-submitter/services/reorg.go +++ b/tx-submitter/services/reorg.go @@ -21,7 +21,6 @@ type ReorgDetector struct { maxHistory int l1Client iface.Client - metrics iface.IMetrics } type blockInfo struct { @@ -29,12 +28,11 @@ type blockInfo struct { hash common.Hash } -func NewReorgDetector(l1Client iface.Client, metrics iface.IMetrics) *ReorgDetector { +func NewReorgDetector(l1Client iface.Client) *ReorgDetector { return &ReorgDetector{ blockHistory: make([]blockInfo, 0), - maxHistory: 5, // Track last 50 blocks + maxHistory: 5, // Track last 5 blocks l1Client: l1Client, - metrics: metrics, } } @@ -77,10 +75,6 @@ func (r *ReorgDetector) DetectReorg(ctx context.Context) (bool, uint64, error) { "new_hash", block.Hash(), "block_number", info.number) - // Update metrics - r.metrics.IncReorgs() - r.metrics.SetReorgDepth(float64(reorgDepth)) - // Truncate history before reorg point and rebuild r.blockHistory = r.blockHistory[:i] err = r.updateHistory(ctx) diff --git a/tx-submitter/services/rollup.go b/tx-submitter/services/rollup.go index e9d6b875..00ffa014 100644 --- a/tx-submitter/services/rollup.go +++ b/tx-submitter/services/rollup.go @@ -106,7 +106,7 @@ func NewRollup( eventInfoStorage *event.EventInfoStorage, l2Caller *types.L2Caller, ) *Rollup { - reorgDetector := NewReorgDetector(l1, metrics) + reorgDetector := NewReorgDetector(l1) r := &Rollup{ ctx: ctx, metrics: metrics, @@ -151,7 +151,7 @@ func (r *Rollup) Start() error { log.Crit("journal file init failed", "err", err) } // pendingtxs - r.pendingTxs = NewPendingTxs(r.abi.Methods[constants.MethodCommitBatch].ID, r.abi.Methods[constants.MethodFinalizeBatch].ID, jn) + r.pendingTxs = NewPendingTxs(jn) txs, err := jn.ParseAllTxs() if err != nil { log.Crit("parse l1 mempool error", "error", err) @@ -298,10 +298,15 @@ func (r *Rollup) Start() error { func (r *Rollup) ProcessTx() error { // Check for reorgs first with exponential backoff retry - _, _, err := r.detectReorgWithRetry() + hasReorg, depth, err := r.detectReorgWithRetry() if err != nil { log.Warn("Failed to check for reorgs", "error", err) } + if hasReorg { + if err := r.handleReorg(depth); err != nil { + log.Warn("Post-reorg handling failed", "error", err) + } + } // Get all local transactions txRecords := r.pendingTxs.GetAll() @@ -312,8 +317,20 @@ func (r *Rollup) ProcessTx() error { // Check if this submitter should process transactions if err = r.checkSubmitterTurn(); err != nil { if errors.Is(err, errNotMyTurn) { + // If rotator is not configured or not yet initialized, just skip this round safely. + if r.rotator == nil || r.rotator.startTime == nil || r.rotator.epoch == nil { + log.Info("Awaiting turn for transaction processing, but rotator state is not initialized", + "has_rotator", r.rotator != nil) + return nil + } + // Get current submitter index for logging - activeSubmitter, activeIndex, _ := r.rotator.CurrentSubmitter(r.L2Clients, r.Staking) + activeSubmitter, activeIndex, rotErr := r.rotator.CurrentSubmitter(r.L2Clients, r.Staking) + if rotErr != nil || activeSubmitter == nil { + log.Warn("Failed to get current submitter while awaiting turn", + "error", rotErr) + return nil + } // Calculate rotation timing information past := (time.Now().Unix() - r.rotator.startTime.Int64()) % r.rotator.epoch.Int64() @@ -366,9 +383,11 @@ func (r *Rollup) detectReorgWithRetry() (bool, uint64, error) { var errNotMyTurn = errors.New("not my turn") func (r *Rollup) checkSubmitterTurn() error { - if r.cfg.PriorityRollup { + // If we are in priority rollup mode or rotator is not configured, always allow processing. + if r.cfg.PriorityRollup || r.rotator == nil { return nil } + activeSubmitter, submitterIndex, err := r.rotator.CurrentSubmitter(r.L2Clients, r.Staking) if err != nil { return fmt.Errorf("rollup: get current submitter err, %w", err) @@ -379,6 +398,11 @@ func (r *Rollup) checkSubmitterTurn() error { isMyTurn := activeAddress == myAddress // Calculate rotation timing information + if r.rotator.startTime == nil || r.rotator.epoch == nil { + log.Warn("rotator state not initialized, skipping submitter turn check", + "has_rotator", r.rotator != nil) + return errNotMyTurn + } past := (time.Now().Unix() - r.rotator.startTime.Int64()) % r.rotator.epoch.Int64() start := time.Now().Unix() - past end := start + r.rotator.epoch.Int64() @@ -409,11 +433,13 @@ func (r *Rollup) checkSubmitterTurn() error { return nil } -// Handle chain reorganization +// Handle chain reorganization (metrics + pending pool; DetectReorg only detects and updates block history). func (r *Rollup) handleReorg(depth uint64) error { - // Update metrics r.metrics.SetReorgDepth(float64(depth)) r.metrics.IncReorgs() + if r.pendingTxs != nil { + r.pendingTxs.ClearConfirmedTxs() + } return nil } @@ -467,7 +493,7 @@ func (r *Rollup) processSingleTx(txRecord *types.TxRecord) error { r.metrics.IncTxConfirmed(method) return nil } - return r.handleConfirmedTx(txRecord, rtx, method) + return r.handleConfirmedTx(txRecord, rtx, status, currentBlock) case txStatusMissing: return r.handleMissingTx(txRecord, rtx, method) default: @@ -481,7 +507,7 @@ func (r *Rollup) updateFeeMetrics(tx *ethtypes.Transaction, receipt *ethtypes.Re txFeeFloat, _ := txFeeEth.Float64() // Update metrics based on transaction type - if method == constants.MethodCommitBatch { + if constants.IsCommitLikeMethod(method) { r.rollupFeeSum += txFeeFloat r.metrics.RollupCostSum.Add(txFeeFloat) r.metrics.RollupCost.Set(txFeeFloat) @@ -574,19 +600,8 @@ func (r *Rollup) getTxStatus(tx *ethtypes.Transaction) (*txStatus, error) { } func (r *Rollup) handlePendingTx(txRecord *types.TxRecord, tx *ethtypes.Transaction, method string) error { - // Check for timeout - if txRecord.SendTime+uint64(r.cfg.TxTimeout.Seconds()) < uint64(time.Now().Unix()) { - log.Info("Transaction timed out", - "tx", tx.Hash().Hex(), - "nonce", tx.Nonce(), - "method", method) - - // Try to replace the transaction with higher gas price - return r.replaceTimedOutTx(tx) - } - - // Check if transaction might fail - if method == constants.MethodCommitBatch { + // Obsolete / doomed txs: cancel before timeout handling so we do not resubmit a tx that will revert. + if constants.IsCommitLikeMethod(method) { batchIndex := utils.ParseParentBatchIndex(tx.Data()) + 1 lastCommitted, err := r.Rollup.LastCommittedBatchIndex(nil) if err != nil { @@ -594,13 +609,11 @@ func (r *Rollup) handlePendingTx(txRecord *types.TxRecord, tx *ethtypes.Transact } if batchIndex <= lastCommitted.Uint64() { - // This batch is already committed by another submitter log.Info("Batch already committed by another submitter, trying to cancel transaction", "batch_index", batchIndex, "last_committed", lastCommitted.Uint64(), "tx_hash", tx.Hash().String()) - // Try to cancel the transaction since it will fail cancelTx, err := r.CancelTx(tx) if err != nil { log.Error("Failed to cancel commit batch transaction", @@ -623,9 +636,7 @@ func (r *Rollup) handlePendingTx(txRecord *types.TxRecord, tx *ethtypes.Transact if err := r.pendingTxs.Remove(tx.Hash()); err != nil { log.Error("failed to remove transaction", "hash", tx.Hash().String(), "error", err) } - if err := r.pendingTxs.Add(cancelTx); err != nil { - log.Error("failed to add cancel transaction", "hash", cancelTx.Hash().String(), "error", err) - } + // CancelTx -> SendTx already adds the cancel tx to the pool return nil } } else if method == constants.MethodFinalizeBatch { @@ -636,13 +647,11 @@ func (r *Rollup) handlePendingTx(txRecord *types.TxRecord, tx *ethtypes.Transact } if batchIndex <= lastFinalized.Uint64() { - // This batch is already finalized by another submitter log.Info("Batch already finalized by another submitter, trying to cancel transaction", "batch_index", batchIndex, "last_finalized", lastFinalized.Uint64(), "tx_hash", tx.Hash().String()) - // Try to cancel the transaction since it will fail cancelTx, err := r.CancelTx(tx) if err != nil { log.Error("Failed to cancel finalize batch transaction", @@ -665,13 +674,19 @@ func (r *Rollup) handlePendingTx(txRecord *types.TxRecord, tx *ethtypes.Transact if err := r.pendingTxs.Remove(tx.Hash()); err != nil { log.Error("failed to remove transaction", "hash", tx.Hash().String(), "error", err) } - if err := r.pendingTxs.Add(cancelTx); err != nil { - log.Error("failed to add cancel transaction", "hash", cancelTx.Hash().String(), "error", err) - } + // CancelTx -> SendTx already adds the cancel tx to the pool return nil } } + if txRecord.SendTime+uint64(r.cfg.TxTimeout.Seconds()) < uint64(time.Now().Unix()) { + log.Info("Transaction timed out", + "tx", tx.Hash().Hex(), + "nonce", tx.Nonce(), + "method", method) + return r.replaceTimedOutTx(tx) + } + return nil } @@ -693,9 +708,7 @@ func (r *Rollup) replaceTimedOutTx(tx *ethtypes.Transaction) error { if err := r.pendingTxs.Remove(tx.Hash()); err != nil { log.Error("failed to remove transaction", "hash", tx.Hash().String(), "error", err) } - if err := r.pendingTxs.Add(newTx); err != nil { - log.Error("failed to add new transaction", "hash", newTx.Hash().String(), "error", err) - } + // ReSubmitTx -> SendTx already adds newTx to the pool return nil } @@ -768,17 +781,12 @@ func (r *Rollup) handleDiscardedTx(txRecord *types.TxRecord, tx *ethtypes.Transa return nil } -// handleConfirmedTx handles a confirmed transaction -func (r *Rollup) handleConfirmedTx(txRecord *types.TxRecord, tx *ethtypes.Transaction, txType string) error { - status, err := r.getTxStatus(tx) - if err != nil { - return fmt.Errorf("get tx status error: %w", err) - } - - // Get the current block number for confirmation count - currentBlock, err := r.L1Client.BlockNumber(context.Background()) - if err != nil { - return fmt.Errorf("get current block number error: %w", err) +// handleConfirmedTx handles a confirmed transaction. +// status and currentBlock must come from the same processSingleTx pass as getTxStatus / BlockNumber +// so a reorg between polls cannot leave receipt nil while we still treat the tx as confirmed-on-chain. +func (r *Rollup) handleConfirmedTx(txRecord *types.TxRecord, tx *ethtypes.Transaction, status *txStatus, currentBlock uint64) error { + if status == nil || status.receipt == nil { + return nil } confirmations := currentBlock - status.receipt.BlockNumber.Uint64() @@ -790,7 +798,7 @@ func (r *Rollup) handleConfirmedTx(txRecord *types.TxRecord, tx *ethtypes.Transa method := utils.ParseMethod(tx, r.abi) if status.receipt.Status == ethtypes.ReceiptStatusFailed { - if method == constants.MethodCommitBatch { + if constants.IsCommitLikeMethod(method) { batchIndex := utils.ParseParentBatchIndex(tx.Data()) + 1 lastCommitted, err := r.Rollup.LastCommittedBatchIndex(nil) if err != nil { @@ -820,23 +828,15 @@ func (r *Rollup) handleConfirmedTx(txRecord *types.TxRecord, tx *ethtypes.Transa } } } else { // Transaction succeeded - // Get current block number for confirmation count only for successful transactions - currentBlock, err = r.L1Client.BlockNumber(context.Background()) - if err != nil { - return fmt.Errorf("get current block number error: %w", err) - } - confirmations = currentBlock - status.receipt.BlockNumber.Uint64() - - if method == constants.MethodCommitBatch { + if constants.IsCommitLikeMethod(method) { batchIndex := utils.ParseParentBatchIndex(tx.Data()) + 1 - log.Info("Successfully committed batch", "batch_index", batchIndex, "tx_hash", tx.Hash().String(), "block_number", status.receipt.BlockNumber.Uint64(), "gas_used", status.receipt.GasUsed, "confirm", confirmations) + log.Info("Successfully committed batch", "method", method, "batch_index", batchIndex, "tx_hash", tx.Hash().String(), "block_number", status.receipt.BlockNumber.Uint64(), "gas_used", status.receipt.GasUsed, "confirm", confirmations) } else if method == constants.MethodFinalizeBatch { batchIndex := utils.ParseFBatchIndex(tx.Data()) if batchIndex > 0 { if r.cfg.SealBatch { - err = r.batchCache.Delete(batchIndex - 1) - if err != nil { - log.Error("failed to delete batch", "batch_index", batchIndex, "tx_hash", tx.Hash().String()) + if delErr := r.batchCache.Delete(batchIndex - 1); delErr != nil { + log.Error("failed to delete batch", "batch_index", batchIndex, "tx_hash", tx.Hash().String(), "error", delErr) } } else { r.batchCacheLegacy.Delete(batchIndex - 1) @@ -862,7 +862,7 @@ func (r *Rollup) finalize() error { return fmt.Errorf("get last committed error:%v", err) } - target := big.NewInt(int64(r.pendingTxs.pfinalize + 1)) + target := big.NewInt(int64(r.pendingTxs.GetPFinalize() + 1)) if target.Cmp(lastFinalized) <= 0 { target = new(big.Int).Add(lastFinalized, big.NewInt(1)) } @@ -954,8 +954,9 @@ func (r *Rollup) finalize() error { gas = r.BumpGas(gas) var nonce uint64 - if r.pendingTxs.pnonce != 0 { - nonce = r.pendingTxs.pnonce + 1 + pn := r.pendingTxs.GetNonce() + if pn != 0 { + nonce = pn + 1 } else { nonce, err = r.L1Client.PendingNonceAt(context.Background(), r.WalletAddr()) if err != nil { @@ -1013,9 +1014,7 @@ func (r *Rollup) finalize() error { r.pendingTxs.SetNonce(signedTx.Nonce()) r.pendingTxs.SetPFinalize(target.Uint64()) - if err = r.pendingTxs.Add(signedTx); err != nil { - log.Error("failed to add signed transaction", "hash", signedTx.Hash().String(), "error", err) - } + // SendTx already adds signedTx to the pending pool } return nil } @@ -1099,9 +1098,9 @@ func (r *Rollup) rollup() error { } } - if len(r.pendingTxs.txinfos) > int(r.cfg.MaxTxsInPendingPool) { + if pendingN := r.pendingTxs.Len(); pendingN > int(r.cfg.MaxTxsInPendingPool) { log.Info("Pending pool full", - "current_size", len(r.pendingTxs.txinfos), + "current_size", pendingN, "max_size", r.cfg.MaxTxsInPendingPool) return nil } @@ -1114,14 +1113,16 @@ func (r *Rollup) rollup() error { } cindex := cindexBig.Uint64() batchIndex = cindex + 1 - if len(r.pendingTxs.getAll()) != 0 && r.pendingTxs.pindex != 0 { - batchIndex = max(cindex, r.pendingTxs.pindex) + 1 + pendingN := r.pendingTxs.Len() + pidx := r.pendingTxs.GetPindex() + if pendingN != 0 && pidx != 0 { + batchIndex = max(cindex, pidx) + 1 } log.Debug("Batch status", "last_committed", cindex, "next_batch", batchIndex, - "current_processing", r.pendingTxs.pindex) + "current_processing", pidx) if r.pendingTxs.ExistedIndex(batchIndex) { log.Debug("Batch already committed", @@ -1155,14 +1156,28 @@ func (r *Rollup) rollup() error { WithdrawalRoot: rpcRollupBatch.WithdrawRoot, } + storedBlobHash, err := r.Rollup.BatchBlobVersionedHashes(nil, big.NewInt(int64(batchIndex))) + if err != nil { + return fmt.Errorf("get batch blob versioned hash: %w", err) + } + useCommitState := storedBlobHash != [32]byte{} + if useCommitState { + log.Info("Using commitState (L1 holds blob versioned hash for batch)", "batch_index", batchIndex) + } + // tip and cap tip, gasFeeCap, blobFee, head, err := r.GetGasTipAndCap() if err != nil { return fmt.Errorf("get gas tip and cap error:%v", err) } - // calldata encode - calldata, err := r.abi.Pack("commitBatch", rollupBatch, *signature) + // calldata encode — commitState reuses stored blob hash and must not carry blob data in tx + var calldata []byte + if useCommitState { + calldata, err = r.abi.Pack("commitState", rollupBatch, *signature) + } else { + calldata, err = r.abi.Pack("commitBatch", rollupBatch, *signature) + } if err != nil { return fmt.Errorf("pack calldata error:%v", err) } @@ -1192,8 +1207,13 @@ func (r *Rollup) rollup() error { return fmt.Errorf("failed to get next nonce") } - // Create and sign transaction - tx, err := r.createRollupTx(rpcRollupBatch, nonce, gas, tip, gasFeeCap, blobFee, calldata, head) + // Create and sign transaction (commitState is always type-2, never blob) + var tx *ethtypes.Transaction + if useCommitState { + tx, err = r.createDynamicFeeTx(nonce, gas, tip, gasFeeCap, calldata) + } else { + tx, err = r.createRollupTx(rpcRollupBatch, nonce, gas, tip, gasFeeCap, blobFee, calldata, head) + } if err != nil { return fmt.Errorf("failed to create rollup tx: %w", err) } @@ -1214,16 +1234,14 @@ func (r *Rollup) rollup() error { // Update pending state r.pendingTxs.SetPindex(batchIndex) r.pendingTxs.SetNonce(tx.Nonce()) - if err = r.pendingTxs.Add(signedTx); err != nil { - log.Error("Failed to track transaction", "error", err) - } + // SendTx already adds signedTx to the pending pool return nil } func (r *Rollup) getNextNonce() uint64 { - if r.pendingTxs.pnonce != 0 { - return r.pendingTxs.pnonce + 1 + if pn := r.pendingTxs.GetNonce(); pn != 0 { + return pn + 1 } nonce, err := r.L1Client.PendingNonceAt(context.Background(), r.WalletAddr()) @@ -1292,6 +1310,90 @@ func (r *Rollup) createDynamicFeeTx(nonce, gas uint64, tip, gasFeeCap *big.Int, }), nil } +// tryRebuildRollupCommitTx aligns an in-flight commit-like tx with L1 blob-hash state: +// - stored blob versioned hash set -> must use commitState (no blob), upgrading commitBatch / blob txs +// - stored hash cleared -> must use commitBatch (+ blob sidecar when present), downgrading stale commitState +// +// Returns handled=true when newTx must be used and the default ReSubmitTx copy path must be skipped. +func (r *Rollup) tryRebuildRollupCommitTx(tx *ethtypes.Transaction, tip, gasFeeCap, blobFeeCap *big.Int, head *ethtypes.Header) (newTx *ethtypes.Transaction, handled bool, err error) { + method := utils.ParseMethod(tx, r.abi) + if !constants.IsCommitLikeMethod(method) { + return nil, false, nil + } + batchIndex := utils.ParseParentBatchIndex(tx.Data()) + 1 + stored, err := r.Rollup.BatchBlobVersionedHashes(nil, big.NewInt(int64(batchIndex))) + if err != nil { + return nil, false, err + } + + if stored != ([32]byte{}) { + if method == constants.MethodCommitState && tx.Type() == ethtypes.DynamicFeeTxType { + return nil, false, nil + } + } else { + if method == constants.MethodCommitBatch { + return nil, false, nil + } + } + + var rpcRollupBatch *eth.RPCRollupBatch + exist := true + if r.cfg.SealBatch { + rpcRollupBatch, err = r.batchCache.Get(batchIndex) + } else { + rpcRollupBatch, exist = r.batchCacheLegacy.Get(batchIndex) + } + if err != nil || !exist || rpcRollupBatch == nil { + return nil, true, fmt.Errorf("cannot rebuild rollup commit tx: batch %d not in cache (err=%v)", batchIndex, err) + } + signature, err := r.buildSignatureInput(rpcRollupBatch) + if err != nil { + return nil, true, err + } + rollupBatch := bindings.IRollupBatchDataInput{ + Version: uint8(rpcRollupBatch.Version), + ParentBatchHeader: rpcRollupBatch.ParentBatchHeader, + LastBlockNumber: rpcRollupBatch.LastBlockNumber, + NumL1Messages: rpcRollupBatch.NumL1Messages, + PrevStateRoot: rpcRollupBatch.PrevStateRoot, + PostStateRoot: rpcRollupBatch.PostStateRoot, + WithdrawalRoot: rpcRollupBatch.WithdrawRoot, + } + + if stored != ([32]byte{}) { + calldata, err := r.abi.Pack("commitState", rollupBatch, *signature) + if err != nil { + return nil, true, err + } + newTx, err = r.createDynamicFeeTx(tx.Nonce(), tx.Gas(), tip, gasFeeCap, calldata) + if err != nil { + return nil, true, err + } + log.Info("Rebuilt pending rollup tx as commitState (stored blob hash on L1)", + "batch_index", batchIndex, + "old_tx_type", tx.Type(), + "old_method", method) + return newTx, true, nil + } + + calldata, err := r.abi.Pack("commitBatch", rollupBatch, *signature) + if err != nil { + return nil, true, err + } + if head == nil { + return nil, true, fmt.Errorf("cannot rebuild commitBatch: nil L1 head") + } + newTx, err = r.createRollupTx(rpcRollupBatch, tx.Nonce(), tx.Gas(), tip, gasFeeCap, blobFeeCap, calldata, head) + if err != nil { + return nil, true, err + } + log.Info("Rebuilt pending rollup tx as commitBatch (no stored blob hash on L1)", + "batch_index", batchIndex, + "old_tx_type", tx.Type(), + "old_method", method) + return newTx, true, nil +} + func (r *Rollup) logTxInfo(tx *ethtypes.Transaction, batchIndex uint64) { log.Info("Rollup transaction created", "batch_index", batchIndex, @@ -1696,47 +1798,82 @@ func (r *Rollup) ReSubmitTx(resend bool, tx *ethtypes.Transaction) (*ethtypes.Tr } var newTx *ethtypes.Transaction - switch tx.Type() { - case ethtypes.DynamicFeeTxType: - newTx = ethtypes.NewTx(ðtypes.DynamicFeeTx{ - ChainID: tx.ChainId(), - To: tx.To(), - Nonce: tx.Nonce(), - GasFeeCap: gasFeeCap, - GasTipCap: tip, - Gas: tx.Gas(), - Value: tx.Value(), - Data: tx.Data(), - }) - case ethtypes.BlobTxType: - sidecar := tx.BlobTxSidecar() - version := types.DetermineBlobVersion(head, r.chainId.Uint64()) - if sidecar != nil { - if sidecar.Version == ethtypes.BlobSidecarVersion0 && version == ethtypes.BlobSidecarVersion1 { - err = types.BlobSidecarVersionToV1(sidecar) - if err != nil { - return nil, err + if rebuilt, ok, err := r.tryRebuildRollupCommitTx(tx, tip, gasFeeCap, blobFeeCap, head); ok { + if err != nil { + return nil, err + } + newTx = rebuilt + } else if err != nil { + return nil, err + } else { + switch tx.Type() { + case ethtypes.DynamicFeeTxType: + newTx = ethtypes.NewTx(ðtypes.DynamicFeeTx{ + ChainID: tx.ChainId(), + To: tx.To(), + Nonce: tx.Nonce(), + GasFeeCap: gasFeeCap, + GasTipCap: tip, + Gas: tx.Gas(), + Value: tx.Value(), + Data: tx.Data(), + }) + case ethtypes.BlobTxType: + sidecar := tx.BlobTxSidecar() + version := types.DetermineBlobVersion(head, r.chainId.Uint64()) + if sidecar != nil { + if sidecar.Version == ethtypes.BlobSidecarVersion0 && version == ethtypes.BlobSidecarVersion1 { + err = types.BlobSidecarVersionToV1(sidecar) + if err != nil { + return nil, err + } } } - } - - newTx = ethtypes.NewTx(ðtypes.BlobTx{ - ChainID: uint256.MustFromBig(tx.ChainId()), - Nonce: tx.Nonce(), - GasTipCap: uint256.MustFromBig(tip), - GasFeeCap: uint256.MustFromBig(gasFeeCap), - Gas: tx.Gas(), - To: *tx.To(), - Value: uint256.MustFromBig(tx.Value()), - Data: tx.Data(), - BlobFeeCap: uint256.MustFromBig(blobFeeCap), - BlobHashes: tx.BlobHashes(), - Sidecar: sidecar, - }) - - default: - return nil, fmt.Errorf("replace unknown tx type:%v", tx.Type()) + newTx = ethtypes.NewTx(ðtypes.BlobTx{ + ChainID: uint256.MustFromBig(tx.ChainId()), + Nonce: tx.Nonce(), + GasTipCap: uint256.MustFromBig(tip), + GasFeeCap: uint256.MustFromBig(gasFeeCap), + Gas: tx.Gas(), + To: *tx.To(), + Value: uint256.MustFromBig(tx.Value()), + Data: tx.Data(), + BlobFeeCap: uint256.MustFromBig(blobFeeCap), + BlobHashes: tx.BlobHashes(), + Sidecar: sidecar, + }) + + default: + return nil, fmt.Errorf("replace unknown tx type:%v", tx.Type()) + + } + } + + // Original tx was not a blob tx, but rebuild (e.g. commitState -> commitBatch) may produce a blob tx. + // In that case blobFeeCap was never bumped above (bump only ran for BlobTxType). Apply the same bump as for blob replacements. + if !resend && newTx != nil && newTx.Type() == ethtypes.BlobTxType && tx.Type() != ethtypes.BlobTxType { + embeddedCap := newTx.BlobGasFeeCap() + if embeddedCap != nil { + bumpedBlob := calcThresholdValue(embeddedCap, true) + if bumpedBlob.Cmp(blobFeeCap) > 0 { + blobFeeCap = bumpedBlob + } + sidecar := newTx.BlobTxSidecar() + newTx = ethtypes.NewTx(ðtypes.BlobTx{ + ChainID: uint256.MustFromBig(newTx.ChainId()), + Nonce: newTx.Nonce(), + GasTipCap: uint256.MustFromBig(newTx.GasTipCap()), + GasFeeCap: uint256.MustFromBig(newTx.GasFeeCap()), + Gas: newTx.Gas(), + To: *newTx.To(), + Value: uint256.MustFromBig(newTx.Value()), + Data: newTx.Data(), + BlobFeeCap: uint256.MustFromBig(blobFeeCap), + BlobHashes: newTx.BlobHashes(), + Sidecar: sidecar, + }) + } } // weiToGwei converts wei value to gwei string representation @@ -1869,18 +2006,6 @@ func (r *Rollup) InitFeeMetricsSum() error { return nil } -// ClearPendingTxs clears all pending transactions -func (p *PendingTxs) ClearPendingTxs() { - p.txinfos = make(map[common.Hash]*types.TxRecord) -} - -// MarkUnconfirmed marks a transaction as unconfirmed in the pending pool -func (p *PendingTxs) MarkUnconfirmed(hash common.Hash) { - if txRecord, ok := p.txinfos[hash]; ok { - txRecord.Confirmed = false - } -} - // CancelTx creates a new transaction with empty calldata to cancel the original transaction func (r *Rollup) CancelTx(tx *ethtypes.Transaction) (*ethtypes.Transaction, error) { if tx == nil { diff --git a/tx-submitter/services/rollup_handle_test.go b/tx-submitter/services/rollup_handle_test.go index 46596527..8842fed4 100644 --- a/tx-submitter/services/rollup_handle_test.go +++ b/tx-submitter/services/rollup_handle_test.go @@ -118,7 +118,7 @@ func setupTestRollup(t *testing.T) (*Rollup, *mock.L1ClientWrapper, *mock.L2Clie ) // Initialize pending transactions - rollup.pendingTxs = NewPendingTxs([]byte{}, []byte{}, mockJournal) + rollup.pendingTxs = NewPendingTxs(mockJournal) // Initialize reorg detector // Use the mock implementation for controlled testing diff --git a/tx-submitter/utils/utils.go b/tx-submitter/utils/utils.go index 06de87e6..7001e5da 100644 --- a/tx-submitter/utils/utils.go +++ b/tx-submitter/utils/utils.go @@ -15,11 +15,9 @@ import ( ntype "morph-l2/node/types" "github.com/morph-l2/go-ethereum/accounts/abi" - "github.com/morph-l2/go-ethereum/common" "github.com/morph-l2/go-ethereum/common/hexutil" "github.com/morph-l2/go-ethereum/core/types" "github.com/morph-l2/go-ethereum/log" - "github.com/morph-l2/go-ethereum/rpc" ) // Loop Run the f func periodically. @@ -71,12 +69,42 @@ func ParseParentBatchIndex(calldata []byte) uint64 { /// * batchIndex 8 uint64 1 The index of the batch /// * l1MessagePopped 8 uint64 9 Number of L1 messages popped in the batch - abi, _ := bindings.RollupMetaData.GetAbi() - parms, _ := abi.Methods["commitBatch"].Inputs.UnpackValues(calldata[4:]) + if len(calldata) < 4 { + return 0 + } + rollupAbi, err := bindings.RollupMetaData.GetAbi() + if err != nil { + return 0 + } + sel := calldata[:4] + var method abi.Method + var ok bool + if bytes.Equal(sel, rollupAbi.Methods["commitState"].ID) { + method, ok = rollupAbi.Methods["commitState"] + } else if bytes.Equal(sel, rollupAbi.Methods["commitBatch"].ID) { + method, ok = rollupAbi.Methods["commitBatch"] + } else { + // Unknown selector: keep legacy behavior (unpack as commitBatch). Matches older fixtures and + // any tx whose first tuple matches BatchDataInput layout even if the selector differs. + method, ok = rollupAbi.Methods["commitBatch"] + } + if !ok { + return 0 + } + parms, err := method.Inputs.UnpackValues(calldata[4:]) + if err != nil || len(parms) == 0 { + return 0 + } v := reflect.ValueOf(parms[0]) pbh := v.FieldByName("ParentBatchHeader") - batchIndex := binary.BigEndian.Uint64(pbh.Bytes()[1:9]) - return batchIndex + if !pbh.IsValid() { + return 0 + } + b := pbh.Bytes() + if len(b) < 9 { + return 0 + } + return binary.BigEndian.Uint64(b[1:9]) } // SetFBatchIndex sets the batch index in the calldata while preserving all other data @@ -119,51 +147,6 @@ func SetFBatchIndex(calldata []byte, batchIndex uint64) error { return nil } -// ParseL1Mempool parses the L1 mempool and returns the transactions. -func ParseL1Mempool(rpc *rpc.Client, addr common.Address) ([]*types.Transaction, error) { - - var result map[string]map[string]*types.Transaction - err := rpc.Call(&result, "txpool_contentFrom", addr) - if err != nil { - return nil, fmt.Errorf("failed to get txpool content: %v", err) - } - - var txs []*types.Transaction - - // get pending txs - if pendingTxs, ok := result["pending"]; ok { - for _, tx := range pendingTxs { - txs = append(txs, tx) - } - } - - // get queued txs - if pendingTxs, ok := result["queued"]; ok { - for _, tx := range pendingTxs { - txs = append(txs, tx) - } - } - - return txs, nil - -} - -func ParseMempoolLatestBatchIndex(id []byte, txs []*types.Transaction) uint64 { - - var res uint64 - for _, tx := range txs { - if bytes.Equal(tx.Data()[:4], id) { - pindex := ParseParentBatchIndex(tx.Data()) - if pindex > res { - res = pindex - } - } - } - - return res + 1 - -} - func ParseBusinessInfo(tx *types.Transaction, a *abi.ABI) []interface{} { // var method string // var batchIndex uint64 @@ -178,6 +161,13 @@ func ParseBusinessInfo(tx *types.Transaction, a *abi.ABI) []interface{} { "method", method, "batchIndex", batchIndex, ) + } else if bytes.Equal(id, a.Methods["commitState"].ID) { + method := "commitState" + batchIndex := ParseParentBatchIndex(tx.Data()) + 1 + res = append(res, + "method", method, + "batchIndex", batchIndex, + ) } else if bytes.Equal(id, a.Methods["finalizeBatch"].ID) { method := "finalizeBatch" parms, err := a.Methods["finalizeBatch"].Inputs.Unpack(tx.Data()[4:]) @@ -205,6 +195,8 @@ func ParseMethod(tx *types.Transaction, a *abi.ABI) string { id := tx.Data()[:4] if bytes.Equal(id, a.Methods["commitBatch"].ID) { return "commitBatch" + } else if bytes.Equal(id, a.Methods["commitState"].ID) { + return "commitState" } else if bytes.Equal(id, a.Methods["finalizeBatch"].ID) { return "finalizeBatch" } else {