diff --git a/cmd/bee/cmd/cmd.go b/cmd/bee/cmd/cmd.go index 4c7d4fd712e..8fbafe55ddf 100644 --- a/cmd/bee/cmd/cmd.go +++ b/cmd/bee/cmd/cmd.go @@ -84,6 +84,8 @@ const ( optionSkipPostageSnapshot = "skip-postage-snapshot" optionNameMinimumGasTipCap = "minimum-gas-tip-cap" optionNameGasLimitFallback = "gas-limit-fallback" + optionNameMaxTxCost = "max-tx-cost" + optionNameMaxTxCostTolerancePercent = "max-tx-cost-tolerance-percent" optionNameP2PWSSEnable = "p2p-wss-enable" optionP2PWSSAddr = "p2p-wss-addr" optionNATWSSAddr = "nat-wss-addr" @@ -341,6 +343,21 @@ func (c *command) setAllFlags(cmd *cobra.Command) { cmd.Flags().Bool(optionSkipPostageSnapshot, false, "skip postage snapshot") cmd.Flags().Uint64(optionNameMinimumGasTipCap, 0, "minimum gas tip cap in wei for transactions, 0 means use suggested gas tip cap") cmd.Flags().Uint64(optionNameGasLimitFallback, 500_000, "gas limit fallback when estimation fails for contract transactions") + // Default 1.5e15 wei (= 0.0015 xDAI = 1.5 mxDAI) is calibrated against the + // upper-bound formula used by isTxCostAcceptable: + // estimated = gasUnits × (2·baseFee + boostedTip) + // where gasUnits = 500_000 (gas-limit-fallback when not overridden via ctx), + // boostedTip = suggestedTip × 1.5 (BoostTipPercent=50, ≈0.15 gwei on Gnosis). + // With max-tx-cost-tolerance-percent=10 the effective threshold is 1.65e15 wei, + // which permits a gasFeeCap of up to ≈3.3 gwei → roughly baseFee ≤ ~1.5 gwei. + // Observed network-wide effective gas prices over the last 10 rounds were + // 0.3–0.5 gwei in normal conditions and peaked at ≈1.5 gwei during a spike; + // this default keeps commit/reveal flowing through ordinary operation and + // only rejects sustained spikes above ≈1.5 gwei baseFee. Claim has its own + // override path (see ClaimOpts) and is unaffected by spikes when the + // expected reward covers the upper-bound cost. + cmd.Flags().Uint64(optionNameMaxTxCost, 1_500_000_000_000_000, "maximum total cost in wei per redistribution transaction (gas limit × max fee per gas); 0 means no limit. Default 1.5e15 wei (= 0.0015 xDAI = 1.5 mxDAI) is calibrated for typical Gnosis baseFee up to ~1.5 gwei with default gas-limit-fallback=500000 and 10% tolerance.") + cmd.Flags().Uint64(optionNameMaxTxCostTolerancePercent, 10, "percentage above max-tx-cost within which the transaction is still allowed (effective threshold = max-tx-cost × (1 + tol/100))") cmd.Flags().Int(optionNameTransactionRetryMaxRetries, 5, "maximum broadcast attempts for SendWithRetry (e.g. redistribution txs)") cmd.Flags().Duration(optionNameTransactionRetryDelay, time.Minute, "how long to wait for a receipt before escalating fees in transactions with retry") cmd.Flags().Int(optionNameTransactionRetryGasIncreasePercent, 20, "percent increase applied to priority fee after each transactions with retry escalation step") diff --git a/cmd/bee/cmd/start.go b/cmd/bee/cmd/start.go index e4b1e289065..96326db2513 100644 --- a/cmd/bee/cmd/start.go +++ b/cmd/bee/cmd/start.go @@ -362,6 +362,8 @@ func buildBeeNode(ctx context.Context, c *command, cmd *cobra.Command, logger lo WarmupTime: c.config.GetDuration(optionWarmUpTime), WelcomeMessage: c.config.GetString(optionWelcomeMessage), WhitelistedWithdrawalAddress: c.config.GetStringSlice(optionNameWhitelistedWithdrawalAddress), + MaxTxCost: c.config.GetUint64(optionNameMaxTxCost), + MaxTxCostTolerancePercent: c.config.GetUint64(optionNameMaxTxCostTolerancePercent), }) return b, err diff --git a/go.mod b/go.mod index c8b0cc496f5..4c090c92a6d 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/ethereum/go-ethereum v1.16.9 github.com/ethersphere/batch-archive v0.0.6 github.com/ethersphere/go-price-oracle-abi v0.6.9 - github.com/ethersphere/go-storage-incentives-abi v0.9.4 + github.com/ethersphere/go-storage-incentives-abi v0.9.3-rc4 github.com/ethersphere/go-sw3-abi v0.6.9 github.com/ethersphere/langos v1.0.0 github.com/go-playground/validator/v10 v10.19.0 diff --git a/go.sum b/go.sum index a8510646f91..d0b3a86bd67 100644 --- a/go.sum +++ b/go.sum @@ -254,8 +254,8 @@ github.com/ethersphere/batch-archive v0.0.6 h1:Nt9mundj8CXT42qgGdq1sqKIVOk4KkKC4 github.com/ethersphere/batch-archive v0.0.6/go.mod h1:41BPb192NoK9CYjNB8BAE1J2MtiI/5aq0Wtas5O7A7Q= github.com/ethersphere/go-price-oracle-abi v0.6.9 h1:bseen6he3PZv5GHOm+KD6s4awaFmVSD9LFx+HpB6rCU= github.com/ethersphere/go-price-oracle-abi v0.6.9/go.mod h1:sI/Qj4/zJ23/b1enzwMMv0/hLTpPNVNacEwCWjo6yBk= -github.com/ethersphere/go-storage-incentives-abi v0.9.4 h1:mSIWXQXg5OQmH10QvXMV5w0vbSibFMaRlBL37gPLTM0= -github.com/ethersphere/go-storage-incentives-abi v0.9.4/go.mod h1:SXvJVtM4sEsaSKD0jc1ClpDLw8ErPoROZDme4Wrc/Nc= +github.com/ethersphere/go-storage-incentives-abi v0.9.3-rc4 h1:YK9FpiQz29ctU5V46CuwMt+4X5Xn8FTBwy6E2v/ix8s= +github.com/ethersphere/go-storage-incentives-abi v0.9.3-rc4/go.mod h1:SXvJVtM4sEsaSKD0jc1ClpDLw8ErPoROZDme4Wrc/Nc= github.com/ethersphere/go-sw3-abi v0.6.9 h1:TnWLnYkWE5UvC17mQBdUmdkzhPhO8GcqvWy4wvd1QJQ= github.com/ethersphere/go-sw3-abi v0.6.9/go.mod h1:BmpsvJ8idQZdYEtWnvxA8POYQ8Rl/NhyCdF0zLMOOJU= github.com/ethersphere/langos v1.0.0 h1:NBtNKzXTTRSue95uOlzPN4py7Aofs0xWPzyj4AI1Vcc= diff --git a/pkg/api/api_test.go b/pkg/api/api_test.go index 4dd7aa1d3f7..86fdc14c92b 100644 --- a/pkg/api/api_test.go +++ b/pkg/api/api_test.go @@ -752,7 +752,7 @@ func (m *mockContract) IsWinner(context.Context) (bool, error) { return false, nil } -func (m *mockContract) Claim(context.Context, redistribution.ChunkInclusionProofs) (common.Hash, error) { +func (m *mockContract) Claim(context.Context, redistribution.ChunkInclusionProofs, *redistribution.ClaimOpts) (common.Hash, error) { m.mtx.Lock() defer m.mtx.Unlock() m.callsList = append(m.callsList, claimCall) diff --git a/pkg/node/node.go b/pkg/node/node.go index 7983bd883e6..f2ba2ef9f68 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -199,6 +199,8 @@ type Options struct { WarmupTime time.Duration WelcomeMessage string WhitelistedWithdrawalAddress []string + MaxTxCost uint64 + MaxTxCostTolerancePercent uint64 } func txRetryConfigFromOptions(o *Options) transaction.TransactionsRetryConfig { @@ -1350,6 +1352,7 @@ func NewBee( if agent != nil { apiService.MustRegisterMetrics(agent.Metrics()...) + apiService.MustRegisterMetrics(redistribution.Metrics()...) } apiService.MustRegisterMetrics(pushSyncProtocol.Metrics()...) diff --git a/pkg/postage/postagecontract/contract.go b/pkg/postage/postagecontract/contract.go index 62242c9630e..433469c6cb7 100644 --- a/pkg/postage/postagecontract/contract.go +++ b/pkg/postage/postagecontract/contract.go @@ -46,6 +46,7 @@ type Interface interface { TopUpBatch(ctx context.Context, batchID []byte, topupBalance *big.Int) (common.Hash, error) DiluteBatch(ctx context.Context, batchID []byte, newDepth uint8) (common.Hash, error) Paused(ctx context.Context) (bool, error) + ExpectedReward(ctx context.Context) (*big.Int, error) PostageBatchExpirer } @@ -338,6 +339,15 @@ func (c *postageContract) getProperty(ctx context.Context, propertyName string, return nil } +// ExpectedReward returns the current redistribution pot (totalPot) from the postage stamp contract. +func (c *postageContract) ExpectedReward(ctx context.Context) (*big.Int, error) { + pot := new(big.Int) + if err := c.getProperty(ctx, "totalPot", pot); err != nil { + return nil, fmt.Errorf("totalPot: %w", err) + } + return pot, nil +} + func (c *postageContract) getMinInitialBalance(ctx context.Context) (uint64, error) { var lastPrice uint64 err := c.getProperty(ctx, "lastPrice", &lastPrice) @@ -556,6 +566,10 @@ func (m *noOpPostageContract) ExpireBatches(context.Context) error { return ErrChainDisabled } +func (m *noOpPostageContract) ExpectedReward(context.Context) (*big.Int, error) { + return nil, ErrChainDisabled +} + func LookupERC20Address(ctx context.Context, transactionService transaction.Service, postageStampContractAddress common.Address, postageStampContractABI abi.ABI, chainEnabled bool) (common.Address, error) { if !chainEnabled { return common.Address{}, nil diff --git a/pkg/postage/postagecontract/contract_test.go b/pkg/postage/postagecontract/contract_test.go index 6592a7f23a7..66d56f2c455 100644 --- a/pkg/postage/postagecontract/contract_test.go +++ b/pkg/postage/postagecontract/contract_test.go @@ -74,7 +74,7 @@ func TestCreateBatch(t *testing.T) { postageStampContractABI, bzzTokenAddress, transactionMock.New( - transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest) (common.Hash, *types.Receipt, error) { + transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest, _ ...transaction.RetryOption) (common.Hash, *types.Receipt, error) { switch *request.To { case bzzTokenAddress: return txHashApprove, &types.Receipt{Status: 1}, nil @@ -308,7 +308,7 @@ func TestTopUpBatch(t *testing.T) { postageStampContractABI, bzzTokenAddress, transactionMock.New( - transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest) (common.Hash, *types.Receipt, error) { + transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest, _ ...transaction.RetryOption) (common.Hash, *types.Receipt, error) { switch *request.To { case bzzTokenAddress: return txHashApprove, &types.Receipt{Status: 1}, nil @@ -468,7 +468,7 @@ func TestDiluteBatch(t *testing.T) { postageStampContractABI, bzzTokenAddress, transactionMock.New( - transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest) (common.Hash, *types.Receipt, error) { + transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest, _ ...transaction.RetryOption) (common.Hash, *types.Receipt, error) { if *request.To == postageStampAddress { if bytes.Equal(expectedCallDataForExpireLimitedBatches[:32], request.Data[:32]) { return txHashApprove, &types.Receipt{Status: 1}, nil @@ -630,7 +630,7 @@ func TestBatchExpirer(t *testing.T) { } } return nil, errors.New("unexpected call") - }), transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest) (common.Hash, *types.Receipt, error) { + }), transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest, _ ...transaction.RetryOption) (common.Hash, *types.Receipt, error) { return common.Hash{}, &types.Receipt{Status: 1}, nil }), ), @@ -768,7 +768,7 @@ func TestBatchExpirer(t *testing.T) { } } return nil, errors.New("unexpected call") - }), transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest) (common.Hash, *types.Receipt, error) { + }), transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest, _ ...transaction.RetryOption) (common.Hash, *types.Receipt, error) { if *request.To == postageContractAddress { if bytes.Equal(expectedCallDataForExpireLimitedBatches[:32], request.Data[:32]) { return common.Hash{}, nil, fmt.Errorf("some error") @@ -891,7 +891,7 @@ func TestBatchExpirer(t *testing.T) { } } return nil, errors.New("unexpected call") - }), transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest) (common.Hash, *types.Receipt, error) { + }), transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest, _ ...transaction.RetryOption) (common.Hash, *types.Receipt, error) { return common.Hash{}, &types.Receipt{Status: 0}, transaction.ErrTransactionReverted }), ), diff --git a/pkg/postage/postagecontract/mock/contract.go b/pkg/postage/postagecontract/mock/contract.go index c670cf62dce..fdaeceec0d1 100644 --- a/pkg/postage/postagecontract/mock/contract.go +++ b/pkg/postage/postagecontract/mock/contract.go @@ -13,11 +13,12 @@ import ( ) type contractMock struct { - createBatch func(ctx context.Context, initialBalance *big.Int, depth uint8, immutable bool, label string) (common.Hash, []byte, error) - topupBatch func(ctx context.Context, id []byte, amount *big.Int) (common.Hash, error) - diluteBatch func(ctx context.Context, id []byte, newDepth uint8) (common.Hash, error) - expireBatches func(ctx context.Context) error - paused func(ctx context.Context) (bool, error) + createBatch func(ctx context.Context, initialBalance *big.Int, depth uint8, immutable bool, label string) (common.Hash, []byte, error) + topupBatch func(ctx context.Context, id []byte, amount *big.Int) (common.Hash, error) + diluteBatch func(ctx context.Context, id []byte, newDepth uint8) (common.Hash, error) + expireBatches func(ctx context.Context) error + paused func(ctx context.Context) (bool, error) + expectedReward func(ctx context.Context) (*big.Int, error) } func (c *contractMock) CreateBatch(ctx context.Context, initialBalance *big.Int, depth uint8, immutable bool, label string) (common.Hash, []byte, error) { @@ -40,6 +41,13 @@ func (s *contractMock) Paused(ctx context.Context) (bool, error) { return s.paused(ctx) } +func (c *contractMock) ExpectedReward(ctx context.Context) (*big.Int, error) { + if c.expectedReward != nil { + return c.expectedReward(ctx) + } + return big.NewInt(1_000_000), nil +} + // Option is an option passed to New type Option func(*contractMock) @@ -83,3 +91,9 @@ func WithPaused(f func(ctx context.Context) (bool, error)) Option { mock.paused = f } } + +func WithExpectedRewardFunc(f func(ctx context.Context) (*big.Int, error)) Option { + return func(m *contractMock) { + m.expectedReward = f + } +} diff --git a/pkg/storageincentives/agent.go b/pkg/storageincentives/agent.go index 5142e97836e..bb9341851f3 100644 --- a/pkg/storageincentives/agent.go +++ b/pkg/storageincentives/agent.go @@ -41,6 +41,10 @@ const ( // average tx gas used by transactions issued from agent avgTxGas = 250_000 + + // forceClaimBlocksBeforeEnd is how many blocks before round end claim may + // bypass max-tx-cost when economics justify it (see redistribution.ClaimOpts). + forceClaimBlocksBeforeEnd = 10 ) type ChainBackend interface { @@ -59,8 +63,9 @@ type Agent struct { metrics metrics backend ChainBackend blocksPerRound uint64 + blockTime time.Duration contract redistribution.Contract - batchExpirer postagecontract.PostageBatchExpirer + postageContract postagecontract.Interface redistributionStatuser staking.RedistributionStatuser store storer.Reserve fullSyncedFunc func() bool @@ -78,7 +83,7 @@ func New(overlay swarm.Address, ethAddress common.Address, backend ChainBackend, contract redistribution.Contract, - batchExpirer postagecontract.PostageBatchExpirer, + postageContract postagecontract.Interface, redistributionStatuser staking.RedistributionStatuser, store storer.Reserve, fullSyncedFunc func() bool, @@ -98,10 +103,11 @@ func New(overlay swarm.Address, backend: backend, logger: logger.WithName(loggerName).Register(), contract: contract, - batchExpirer: batchExpirer, + postageContract: postageContract, store: store, fullSyncedFunc: fullSyncedFunc, blocksPerRound: blocksPerRound, + blockTime: blockTime, quit: make(chan struct{}), redistributionStatuser: redistributionStatuser, health: health, @@ -116,7 +122,7 @@ func New(overlay swarm.Address, a.state = state a.wg.Add(1) - go a.start(blockTime, a.blocksPerRound, blocksPerPhase) + go a.start(a.blockTime, a.blocksPerRound, blocksPerPhase) return a, nil } @@ -311,7 +317,7 @@ func (a *Agent) handleReveal(ctx context.Context, round uint64) error { a.metrics.ErrReveal.Inc() return err } - a.state.AddFee(ctx, txHash) + a.state.AddRoundFee(ctx, round, txHash) a.state.SetHasRevealed(round) @@ -344,7 +350,7 @@ func (a *Agent) handleClaim(ctx context.Context, round uint64) error { // In case when there are too many expired batches, Claim trx could runs out of gas. // To prevent this, node should first expire batches before Claiming a reward. - err = a.batchExpirer.ExpireBatches(ctx) + err = a.postageContract.ExpireBatches(ctx) if err != nil { a.logger.Info("expire batches failed", "err", err) // Even when error happens, proceed with claim handler @@ -353,7 +359,7 @@ func (a *Agent) handleClaim(ctx context.Context, round uint64) error { errBalance := a.state.SetBalance(ctx) if errBalance != nil { - a.logger.Info("could not set balance", "err", err) + a.logger.Info("could not set balance", "err", errBalance) } sampleData, exists := a.state.SampleData(round - 1) @@ -371,8 +377,33 @@ func (a *Agent) handleClaim(ctx context.Context, round uint64) error { return fmt.Errorf("making inclusion proofs: %w", err) } - txHash, err := a.contract.Claim(ctx, proofs) + claimCtx := ctx + phaseEndBlock := (round+1)*a.blocksPerRound - 1 + if rem := int64(phaseEndBlock) - int64(a.state.currentBlock()); rem > 0 { + var cancel context.CancelFunc + claimCtx, cancel = context.WithDeadline(ctx, time.Now().Add(time.Duration(rem)*a.blockTime)) + defer cancel() + } + + reward, err := a.postageContract.ExpectedReward(ctx) + if err != nil { + a.logger.Warning("could not estimate claim reward, override max_tx_cost option will be disabled", "error", err) + } + + opts := &redistribution.ClaimOpts{ + OverrideAfterBlock: (round+1)*a.blocksPerRound - forceClaimBlocksBeforeEnd, + CurrentBlockFn: func() uint64 { return a.state.currentBlock() }, + ExpectedReward: reward, + RoundFees: a.state.RoundFees(round), + } + + txHash, err := a.contract.Claim(claimCtx, proofs, opts) if err != nil { + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + a.logger.Info("claim aborted by context", "round", round, "err", err) + a.metrics.SkippedExpensivePhase.Inc() + return nil + } a.metrics.ErrClaim.Inc() return fmt.Errorf("claiming win: %w", err) } @@ -382,11 +413,11 @@ func (a *Agent) handleClaim(ctx context.Context, round uint64) error { if errBalance == nil { errReward := a.state.CalculateWinnerReward(ctx) if errReward != nil { - a.logger.Info("calculate winner reward", "err", err) + a.logger.Info("calculate winner reward", "err", errReward) } } - a.state.AddFee(ctx, txHash) + a.state.AddRoundFee(ctx, round, txHash) return nil } @@ -539,7 +570,7 @@ func (a *Agent) commit(ctx context.Context, sample SampleData, round uint64) err a.metrics.ErrCommit.Inc() return err } - a.state.AddFee(ctx, txHash) + a.state.AddRoundFee(ctx, round, txHash) a.state.SetCommitKey(round, key) diff --git a/pkg/storageincentives/agent_test.go b/pkg/storageincentives/agent_test.go index 6449ede9059..cbc28d8a813 100644 --- a/pkg/storageincentives/agent_test.go +++ b/pkg/storageincentives/agent_test.go @@ -307,7 +307,7 @@ func (m *mockContract) IsWinner(context.Context) (bool, error) { return false, nil } -func (m *mockContract) Claim(context.Context, redistribution.ChunkInclusionProofs) (common.Hash, error) { +func (m *mockContract) Claim(context.Context, redistribution.ChunkInclusionProofs, *redistribution.ClaimOpts) (common.Hash, error) { m.mtx.Lock() defer m.mtx.Unlock() m.callsList = append(m.callsList, claimCall) diff --git a/pkg/storageincentives/metrics.go b/pkg/storageincentives/metrics.go index b376d9d20b2..3db40b39caa 100644 --- a/pkg/storageincentives/metrics.go +++ b/pkg/storageincentives/metrics.go @@ -31,6 +31,9 @@ type metrics struct { ErrClaim prometheus.Counter ErrWinner prometheus.Counter ErrCheckIsPlaying prometheus.Counter + + // cost control metrics + SkippedExpensivePhase prometheus.Counter } func newMetrics() metrics { @@ -137,6 +140,12 @@ func newMetrics() metrics { Name: "is_playing_errors", Help: "total neighborhood selected errors while processing", }), + SkippedExpensivePhase: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "skipped_expensive_phase", + Help: "Count of phases skipped because estimated tx cost exceeded configured limit.", + }), } } diff --git a/pkg/storageincentives/redistribution/redistribution.go b/pkg/storageincentives/redistribution/redistribution.go index 98cf737d3fe..8fb09b280cb 100644 --- a/pkg/storageincentives/redistribution/redistribution.go +++ b/pkg/storageincentives/redistribution/redistribution.go @@ -12,22 +12,32 @@ import ( "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/common" "github.com/ethersphere/bee/v2/pkg/log" - "github.com/ethersphere/bee/v2/pkg/sctx" "github.com/ethersphere/bee/v2/pkg/swarm" "github.com/ethersphere/bee/v2/pkg/transaction" ) const ( - loggerName = "redistributionContract" - // BoostTipPercent is used where the node sends transactions without retry. + loggerName = "redistributionContract" BoostTipPercent = 50 + + minEstimatedGasLimit = 250_000 ) +// ClaimOpts configures optional claim behaviour: after OverrideAfterBlock (absolute +// chain block number), if ExpectedReward covers upper-bound claim cost plus +// RoundFees, the max-tx-cost limit is bypassed for that send attempt. +type ClaimOpts struct { + OverrideAfterBlock uint64 + CurrentBlockFn func() uint64 + ExpectedReward *big.Int + RoundFees *big.Int +} + type Contract interface { ReserveSalt(context.Context) ([]byte, error) IsPlaying(context.Context, uint8) (bool, error) IsWinner(context.Context) (bool, error) - Claim(context.Context, ChunkInclusionProofs) (common.Hash, error) + Claim(context.Context, ChunkInclusionProofs, *ClaimOpts) (common.Hash, error) Commit(context.Context, []byte, uint64) (common.Hash, error) Reveal(context.Context, uint8, []byte, []byte) (common.Hash, error) } @@ -101,8 +111,12 @@ func (c *contract) IsWinner(ctx context.Context) (isWinner bool, err error) { return results[0].(bool), nil } -// Claim sends a transaction to blockchain if a win is claimed. -func (c *contract) Claim(ctx context.Context, proofs ChunkInclusionProofs) (common.Hash, error) { +// Claim sends a transaction to blockchain if a win is claimed. When opts is +// non-nil and the configured max-tx-price would block the broadcast, +// canOverrideClaim is consulted: if the override block threshold has passed +// and ExpectedReward covers the estimated cost plus previous round fees, +// the price cap is bypassed. +func (c *contract) Claim(ctx context.Context, proofs ChunkInclusionProofs, opts *ClaimOpts) (txHash common.Hash, err error) { callData, err := c.incentivesContractABI.Pack("claim", proofs.A, proofs.B, proofs.C) if err != nil { return common.Hash{}, err @@ -110,20 +124,61 @@ func (c *contract) Claim(ctx context.Context, proofs ChunkInclusionProofs) (comm request := &transaction.TxRequest{ To: &c.incentivesContractAddress, Data: callData, - GasPrice: sctx.GetGasPrice(ctx), - GasLimit: max(sctx.GetGasLimit(ctx), c.gasLimit), - MinEstimatedGasLimit: 500_000, + GasLimit: c.gasLimit, + MinEstimatedGasLimit: minEstimatedGasLimit, Value: big.NewInt(0), Description: "claim win transaction", } - txHash, err := c.sendAndWait(ctx, request) + + retryOpts := []transaction.RetryOption{ + transaction.WithIgnoreMaxPrice(func(gasFeeCap *big.Int) bool { + return c.canOverrideClaim(opts, gasFeeCap) + }), + } + + txHash, err = c.sendAndWait(ctx, request, retryOpts...) if err != nil { return txHash, fmt.Errorf("claim: %w", err) } - return txHash, nil } +// canOverrideClaim decides whether the claim transaction should bypass the +// max-tx-price cap. gasFeeCap is the actual max fee per gas (wei) that the +// retry loop wants to use — it is provided by suggestGasFeeGasTipCapWithHistory +// so there is no redundant estimation. +func (c *contract) canOverrideClaim(opts *ClaimOpts, gasFeeCap *big.Int) bool { + if opts == nil || opts.OverrideAfterBlock == 0 || opts.CurrentBlockFn == nil || opts.RoundFees == nil { + return false + } + + if opts.CurrentBlockFn() < opts.OverrideAfterBlock { + return false + } + + if opts.ExpectedReward == nil || opts.ExpectedReward.Sign() <= 0 { + return false + } + + gasUnits := c.gasLimit + if gasUnits <= 0 { + gasUnits = minEstimatedGasLimit + } + + txCost := new(big.Int).Mul(gasFeeCap, big.NewInt(int64(gasUnits))) + totalSpent := new(big.Int).Add(txCost, opts.RoundFees) + if opts.ExpectedReward.Cmp(totalSpent) < 0 { + c.logger.Info("claim override: reward does not cover cost", + "tx_cost", txCost, + "round_fees", opts.RoundFees, + "total_spent", totalSpent, + "expected_reward", opts.ExpectedReward, + ) + return false + } + return true +} + // Commit submits the obfusHash hash by sending a transaction to the blockchain. func (c *contract) Commit(ctx context.Context, obfusHash []byte, round uint64) (common.Hash, error) { callData, err := c.incentivesContractABI.Pack("commit", common.BytesToHash(obfusHash), round) @@ -133,9 +188,8 @@ func (c *contract) Commit(ctx context.Context, obfusHash []byte, round uint64) ( request := &transaction.TxRequest{ To: &c.incentivesContractAddress, Data: callData, - GasPrice: sctx.GetGasPrice(ctx), - GasLimit: max(sctx.GetGasLimit(ctx), c.gasLimit), - MinEstimatedGasLimit: 500_000, + GasLimit: c.gasLimit, + MinEstimatedGasLimit: minEstimatedGasLimit, Value: big.NewInt(0), Description: "commit transaction", } @@ -156,9 +210,8 @@ func (c *contract) Reveal(ctx context.Context, storageDepth uint8, reserveCommit request := &transaction.TxRequest{ To: &c.incentivesContractAddress, Data: callData, - GasPrice: sctx.GetGasPrice(ctx), - GasLimit: max(sctx.GetGasLimit(ctx), c.gasLimit), - MinEstimatedGasLimit: 500_000, + GasLimit: c.gasLimit, + MinEstimatedGasLimit: minEstimatedGasLimit, Value: big.NewInt(0), Description: "reveal transaction", } @@ -190,7 +243,7 @@ func (c *contract) ReserveSalt(ctx context.Context) ([]byte, error) { return salt[:], nil } -func (c *contract) sendAndWait(ctx context.Context, request *transaction.TxRequest) (txHash common.Hash, err error) { +func (c *contract) sendAndWait(ctx context.Context, request *transaction.TxRequest, opts ...transaction.RetryOption) (txHash common.Hash, err error) { defer func() { err = c.txService.UnwrapABIError( ctx, @@ -200,14 +253,8 @@ func (c *contract) sendAndWait(ctx context.Context, request *transaction.TxReque ) }() - txHash, receipt, err := c.txService.SendWithRetry(ctx, request) - if err != nil { - return txHash, err - } - if receipt == nil { - return txHash, fmt.Errorf("missing receipt after send with retry") - } - return txHash, nil + txHash, _, err = c.txService.SendWithRetry(ctx, request, opts...) + return txHash, err } // callTx simulates a transaction based on tx request. diff --git a/pkg/storageincentives/redistribution/redistribution_test.go b/pkg/storageincentives/redistribution/redistribution_test.go index 85ec99b7b19..e9092c3ee02 100644 --- a/pkg/storageincentives/redistribution/redistribution_test.go +++ b/pkg/storageincentives/redistribution/redistribution_test.go @@ -11,6 +11,7 @@ import ( "errors" "fmt" "math/big" + "sync/atomic" "testing" "github.com/ethereum/go-ethereum/common" @@ -24,9 +25,12 @@ import ( transactionMock "github.com/ethersphere/bee/v2/pkg/transaction/mock" "github.com/ethersphere/bee/v2/pkg/util/abiutil" "github.com/ethersphere/bee/v2/pkg/util/testutil" + "github.com/stretchr/testify/assert" ) -var redistributionContractABI = abiutil.MustParseABI(chaincfg.Testnet.RedistributionABI) +var ( + redistributionContractABI = abiutil.MustParseABI(chaincfg.Testnet.RedistributionABI) +) func randChunkInclusionProof(t *testing.T) redistribution.ChunkInclusionProof { t.Helper() @@ -203,7 +207,7 @@ func TestRedistribution(t *testing.T) { owner, log.Noop, transactionMock.New( - transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest) (common.Hash, *types.Receipt, error) { + transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest, _ ...transaction.RetryOption) (common.Hash, *types.Receipt, error) { if *request.To == redistributionContractAddress { if !bytes.Equal(expectedCallData[:32], request.Data[:32]) { return common.Hash{}, nil, fmt.Errorf("got wrong call data. wanted %x, got %x", expectedCallData, request.Data) @@ -218,7 +222,7 @@ func TestRedistribution(t *testing.T) { 0, ) - _, err = contract.Claim(ctx, proofs) + _, err = contract.Claim(ctx, proofs, nil) if err != nil { t.Fatal(err) } @@ -237,7 +241,7 @@ func TestRedistribution(t *testing.T) { owner, log.Noop, transactionMock.New( - transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest) (common.Hash, *types.Receipt, error) { + transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest, _ ...transaction.RetryOption) (common.Hash, *types.Receipt, error) { if *request.To == redistributionContractAddress { if !bytes.Equal(expectedCallData[:32], request.Data[:32]) { return common.Hash{}, nil, fmt.Errorf("got wrong call data. wanted %x, got %x", expectedCallData, request.Data) @@ -252,7 +256,7 @@ func TestRedistribution(t *testing.T) { 0, ) - _, err = contract.Claim(ctx, proofs) + _, err = contract.Claim(ctx, proofs, nil) if !errors.Is(err, transaction.ErrTransactionReverted) { t.Fatal(err) } @@ -272,7 +276,7 @@ func TestRedistribution(t *testing.T) { owner, log.Noop, transactionMock.New( - transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest) (common.Hash, *types.Receipt, error) { + transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest, _ ...transaction.RetryOption) (common.Hash, *types.Receipt, error) { if *request.To == redistributionContractAddress { if !bytes.Equal(expectedCallData[:32], request.Data[:32]) { return common.Hash{}, nil, fmt.Errorf("got wrong call data. wanted %x, got %x", expectedCallData, request.Data) @@ -309,7 +313,7 @@ func TestRedistribution(t *testing.T) { owner, log.Noop, transactionMock.New( - transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest) (common.Hash, *types.Receipt, error) { + transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest, _ ...transaction.RetryOption) (common.Hash, *types.Receipt, error) { if *request.To == redistributionContractAddress { if !bytes.Equal(expectedCallData[:32], request.Data[:32]) { return common.Hash{}, nil, fmt.Errorf("got wrong call data. wanted %x, got %x", expectedCallData, request.Data) @@ -401,7 +405,7 @@ func TestRedistribution(t *testing.T) { owner, log.Noop, transactionMock.New( - transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest) (common.Hash, *types.Receipt, error) { + transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest, _ ...transaction.RetryOption) (common.Hash, *types.Receipt, error) { if *request.To == redistributionContractAddress { if !bytes.Equal(expectedCallData, request.Data) { return common.Hash{}, nil, fmt.Errorf("got wrong call data. wanted %x, got %x", expectedCallData, request.Data) @@ -422,3 +426,151 @@ func TestRedistribution(t *testing.T) { } }) } + +func TestCommit_CriticalErrorFails(t *testing.T) { + t.Parallel() + + ctx := context.Background() + owner := common.HexToAddress("abcd") + overlay := swarm.NewAddress(common.HexToHash("cbd").Bytes()) + redistributionContractAddress := common.HexToAddress("ffff") + testobfus := common.Hex2Bytes("hash") + + txSvc := transactionMock.New( + transactionMock.WithSendWithRetryFunc(func(_ context.Context, _ *transaction.TxRequest, _ ...transaction.RetryOption) (common.Hash, *types.Receipt, error) { + return common.Hash{}, nil, transaction.ErrTransactionReverted + }), + ) + + c := redistribution.New( + overlay, + owner, + log.Noop, + txSvc, + redistributionContractAddress, + redistributionContractABI, + 0, + ) + + _, err := c.Commit(ctx, testobfus, 0) + assert.Error(t, err) + assert.ErrorIs(t, err, transaction.ErrTransactionReverted) +} + +func TestCommit_withoutGasFeeCapOnRequest(t *testing.T) { + t.Parallel() + + ctx := context.Background() + owner := common.HexToAddress("abcd") + overlay := swarm.NewAddress(common.HexToHash("cbd").Bytes()) + redistributionContractAddress := common.HexToAddress("ffff") + testobfus := common.Hex2Bytes("hash") + expectedHash := common.HexToHash("bbbb") + + txSvc := transactionMock.New( + transactionMock.WithSendWithRetryFunc(func(_ context.Context, request *transaction.TxRequest, _ ...transaction.RetryOption) (common.Hash, *types.Receipt, error) { + assert.Nil(t, request.GasFeeCap) + assert.Nil(t, request.GasPrice) + return expectedHash, &types.Receipt{Status: 1}, nil + }), + ) + + c := redistribution.New( + overlay, + owner, + log.Noop, + txSvc, + redistributionContractAddress, + redistributionContractABI, + 0, + ) + + h, err := c.Commit(ctx, testobfus, 0) + assert.NoError(t, err) + assert.Equal(t, expectedHash, h) +} + +func TestClaim_sendsWithRetryOptions(t *testing.T) { + t.Parallel() + + ctx := context.Background() + owner := common.HexToAddress("abcd") + overlay := swarm.NewAddress(common.HexToHash("cbd").Bytes()) + redistributionContractAddress := common.HexToAddress("ffff") + proofs := randChunkInclusionProofs(t) + expectedHash := common.HexToHash("cafe") + + var sendCalls atomic.Int32 + var retryOptsLen int + txSvc := transactionMock.New( + transactionMock.WithSendWithRetryFunc(func(_ context.Context, request *transaction.TxRequest, opts ...transaction.RetryOption) (common.Hash, *types.Receipt, error) { + sendCalls.Add(1) + retryOptsLen = len(opts) + callData, err := redistributionContractABI.Pack("claim", proofs.A, proofs.B, proofs.C) + assert.NoError(t, err) + assert.Equal(t, callData, request.Data) + return expectedHash, &types.Receipt{Status: 1}, nil + }), + ) + + c := redistribution.New( + overlay, + owner, + log.Noop, + txSvc, + redistributionContractAddress, + redistributionContractABI, + 0, + ) + + opts := &redistribution.ClaimOpts{ + OverrideAfterBlock: 100, + CurrentBlockFn: func() uint64 { return 110 }, + ExpectedReward: new(big.Int).Mul(big.NewInt(50), big.NewInt(1_000_000)), + RoundFees: big.NewInt(100_000), + } + + h, err := c.Claim(ctx, proofs, opts) + assert.NoError(t, err) + assert.Equal(t, expectedHash, h) + assert.EqualValues(t, 1, sendCalls.Load()) + assert.Equal(t, 1, retryOptsLen, "Claim must pass WithIgnoreMaxPrice retry option") +} + +func TestClaim_contextCanceled(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + owner := common.HexToAddress("abcd") + overlay := swarm.NewAddress(common.HexToHash("cbd").Bytes()) + redistributionContractAddress := common.HexToAddress("ffff") + proofs := randChunkInclusionProofs(t) + + txSvc := transactionMock.New( + transactionMock.WithSendWithRetryFunc(func(ctx context.Context, _ *transaction.TxRequest, _ ...transaction.RetryOption) (common.Hash, *types.Receipt, error) { + return common.Hash{}, nil, ctx.Err() + }), + ) + + c := redistribution.New( + overlay, + owner, + log.Noop, + txSvc, + redistributionContractAddress, + redistributionContractABI, + 0, + ) + + opts := &redistribution.ClaimOpts{ + OverrideAfterBlock: 100, + CurrentBlockFn: func() uint64 { return 200 }, + ExpectedReward: big.NewInt(1000), + RoundFees: big.NewInt(1), + } + + _, err := c.Claim(ctx, proofs, opts) + assert.ErrorIs(t, err, context.Canceled) +} diff --git a/pkg/storageincentives/redistributionstate.go b/pkg/storageincentives/redistributionstate.go index 9929a9e7831..adbcbdcbcc3 100644 --- a/pkg/storageincentives/redistributionstate.go +++ b/pkg/storageincentives/redistributionstate.go @@ -61,6 +61,7 @@ type RoundData struct { CommitKey []byte SampleData *SampleData HasRevealed bool + RoundFees *big.Int } type SampleData struct { @@ -184,8 +185,8 @@ func (r *RedistributionState) SetLastSelectedRound(round uint64) { r.save() } -// AddFee sets the internal node status -func (r *RedistributionState) AddFee(ctx context.Context, txHash common.Hash) { +// AddRoundFee tracks fees spent in a specific round. +func (r *RedistributionState) AddRoundFee(ctx context.Context, round uint64, txHash common.Hash) { fee, err := r.txService.TransactionFee(ctx, txHash) if err != nil { return @@ -194,10 +195,28 @@ func (r *RedistributionState) AddFee(ctx context.Context, txHash common.Hash) { r.mtx.Lock() defer r.mtx.Unlock() + rd := r.status.RoundData[round] + if rd.RoundFees == nil { + rd.RoundFees = new(big.Int) + } + rd.RoundFees.Add(rd.RoundFees, fee) + r.status.RoundData[round] = rd r.status.Fees.Add(r.status.Fees, fee) r.save() } +// RoundFees returns the total fees spent in a given round. +func (r *RedistributionState) RoundFees(round uint64) *big.Int { + r.mtx.Lock() + defer r.mtx.Unlock() + + rd, ok := r.status.RoundData[round] + if !ok || rd.RoundFees == nil { + return new(big.Int) + } + return new(big.Int).Set(rd.RoundFees) +} + // CalculateWinnerReward calculates the reward for the winner func (r *RedistributionState) CalculateWinnerReward(ctx context.Context) error { currentBalance, err := r.erc20Service.BalanceOf(ctx, r.ethAddress) diff --git a/pkg/storageincentives/redistributionstate_test.go b/pkg/storageincentives/redistributionstate_test.go index 9e2930642a4..318aca7e264 100644 --- a/pkg/storageincentives/redistributionstate_test.go +++ b/pkg/storageincentives/redistributionstate_test.go @@ -295,9 +295,10 @@ func TestReward(t *testing.T) { } } -// TestFee check if fees increments when called multiple times -func TestFee(t *testing.T) { +// TestRoundFee check if fees increments when called multiple times +func TestRoundFee(t *testing.T) { t.Parallel() + const round = 1 firstFee := big.NewInt(10) state := createRedistribution(t, nil, []transactionmock.Option{ transactionmock.WithTransactionFeeFunc(func(ctx context.Context, txHash common.Hash) (*big.Int, error) { @@ -305,7 +306,7 @@ func TestFee(t *testing.T) { }), }) ctx := context.Background() - state.AddFee(ctx, common.Hash{}) + state.AddRoundFee(ctx, round, common.Hash{}) gotFirstResult, err := state.Status() if err != nil { t.Fatal("failed to get status") @@ -320,7 +321,7 @@ func TestFee(t *testing.T) { }), }...) - state.AddFee(ctx, common.Hash{}) + state.AddRoundFee(ctx, round, common.Hash{}) gotSecondResult, err := state.Status() if err != nil { t.Fatal("failed to get status") diff --git a/pkg/transaction/export_test.go b/pkg/transaction/export_test.go index 8b6a9070fdb..4cf9af5246a 100644 --- a/pkg/transaction/export_test.go +++ b/pkg/transaction/export_test.go @@ -25,6 +25,7 @@ func SuggestGasFeeGasTipCapWithHistory( maxTxPrice *big.Int, ctx context.Context, prevGasTipCap *big.Int, + overrides *RetryOverrides, ) (gasFeeCap, gasTipCap *big.Int, err error) { svc := &transactionService{ logger: log.Noop, @@ -32,5 +33,5 @@ func SuggestGasFeeGasTipCapWithHistory( txRetryGasIncreasePercent: gasIncreasePercent, maxTxPrice: maxTxPrice, } - return svc.suggestGasFeeGasTipCapWithHistory(ctx, prevGasTipCap) + return svc.suggestGasFeeGasTipCapWithHistory(ctx, prevGasTipCap, overrides) } diff --git a/pkg/transaction/mock/transaction.go b/pkg/transaction/mock/transaction.go index 66ad081fbe4..8491ca0fa49 100644 --- a/pkg/transaction/mock/transaction.go +++ b/pkg/transaction/mock/transaction.go @@ -18,8 +18,9 @@ import ( ) type transactionServiceMock struct { + estimateTxCost func(ctx context.Context, gasUnits int64, tip int) (cost *big.Int, gasFeeCap *big.Int, err error) send func(ctx context.Context, request *transaction.TxRequest, boost int) (txHash common.Hash, err error) - sendWithRetry func(ctx context.Context, request *transaction.TxRequest) (txHash common.Hash, receipt *types.Receipt, err error) + sendWithRetry func(ctx context.Context, request *transaction.TxRequest, opts ...transaction.RetryOption) (txHash common.Hash, receipt *types.Receipt, err error) waitForReceipt func(ctx context.Context, txHash common.Hash) (receipt *types.Receipt, err error) watchSentTransaction func(txHash common.Hash) (chan types.Receipt, chan error, error) call func(ctx context.Context, request *transaction.TxRequest) (result []byte, err error) @@ -30,13 +31,20 @@ type transactionServiceMock struct { transactionFee func(ctx context.Context, txHash common.Hash) (*big.Int, error) } -func (m *transactionServiceMock) SendWithRetry(ctx context.Context, request *transaction.TxRequest) (common.Hash, *types.Receipt, error) { +func (m *transactionServiceMock) SendWithRetry(ctx context.Context, request *transaction.TxRequest, opts ...transaction.RetryOption) (common.Hash, *types.Receipt, error) { if m.sendWithRetry != nil { - return m.sendWithRetry(ctx, request) + return m.sendWithRetry(ctx, request, opts...) } return common.Hash{}, nil, errors.New("not implemented") } +func (m *transactionServiceMock) EstimateTxCost(ctx context.Context, gasUnits int64, tip int) (*big.Int, *big.Int, error) { + if m.estimateTxCost != nil { + return m.estimateTxCost(ctx, gasUnits, tip) + } + return big.NewInt(0), big.NewInt(0), nil +} + func (m *transactionServiceMock) Send(ctx context.Context, request *transaction.TxRequest, boostPercent int) (txHash common.Hash, err error) { if m.send != nil { return m.send(ctx, request, boostPercent) @@ -118,12 +126,18 @@ type optionFunc func(*transactionServiceMock) func (f optionFunc) apply(r *transactionServiceMock) { f(r) } -func WithSendWithRetryFunc(f func(context.Context, *transaction.TxRequest) (common.Hash, *types.Receipt, error)) Option { +func WithSendWithRetryFunc(f func(context.Context, *transaction.TxRequest, ...transaction.RetryOption) (common.Hash, *types.Receipt, error)) Option { return optionFunc(func(s *transactionServiceMock) { s.sendWithRetry = f }) } +func WithEstimateTxCostFunc(f func(context.Context, int64, int) (*big.Int, *big.Int, error)) Option { + return optionFunc(func(s *transactionServiceMock) { + s.estimateTxCost = f + }) +} + func WithSendFunc(f func(context.Context, *transaction.TxRequest, int) (txHash common.Hash, err error)) Option { return optionFunc(func(s *transactionServiceMock) { s.send = f diff --git a/pkg/transaction/send_tx_with_retry.go b/pkg/transaction/send_tx_with_retry.go index a7dcbfa0377..fda9936451e 100644 --- a/pkg/transaction/send_tx_with_retry.go +++ b/pkg/transaction/send_tx_with_retry.go @@ -19,6 +19,46 @@ import ( const retryStatePrefix = "transaction_retry_" +// RetryOverrides controls per-call behaviour overrides for SendWithRetry. +// Fields are optional; nil means "use default behaviour". +type RetryOverrides struct { + // IgnoreMaxPrice is called when maxTxPrice would block a broadcast. + // It receives the gasFeeCap (max fee per gas, wei) that would be used + // for this attempt. If it returns true, the price cap is bypassed. + IgnoreMaxPrice func(gasFeeCap *big.Int) bool + + // RetryDelay, if set, overrides the configured delay between attempts. + RetryDelay func(attempt int) time.Duration +} + +// RetryOption configures per-call overrides for SendWithRetry. +type RetryOption func(*RetryOverrides) + +// WithIgnoreMaxPrice returns a RetryOption that installs a predicate called +// whenever the configured maxTxPrice would block a broadcast. The predicate +// receives the gasFeeCap (max fee per gas, wei) that would be used. When fn +// returns true the price cap is bypassed for that attempt. +func WithIgnoreMaxPrice(fn func(gasFeeCap *big.Int) bool) RetryOption { + return func(o *RetryOverrides) { o.IgnoreMaxPrice = fn } +} + +// WithRetryDelay returns a RetryOption that overrides the configured +// per-attempt delay. +func WithRetryDelay(fn func(attempt int) time.Duration) RetryOption { + return func(o *RetryOverrides) { o.RetryDelay = fn } +} + +func applyRetryOptions(opts []RetryOption) *RetryOverrides { + if len(opts) == 0 { + return nil + } + var o RetryOverrides + for _, fn := range opts { + fn(&o) + } + return &o +} + // TransactionRetryState is persisted so transactions with retry can resume after a node restart. type TransactionRetryState struct { Nonce uint64 `json:"nonce"` @@ -43,13 +83,14 @@ func retryStateKey(nonce uint64) string { // SendWithRetry sends an EIP-1559 transaction using one eth_feeHistory snapshot for the initial tip, // then increases gas tip by gas_increase_percent after each unsuccessful wait, up to max_retries. -func (t *transactionService) SendWithRetry(ctx context.Context, request *TxRequest) (txHash common.Hash, receipt *types.Receipt, err error) { +// Optional RetryOption values can override per-call retry behaviour (e.g. bypass price cap). +func (t *transactionService) SendWithRetry(ctx context.Context, request *TxRequest, opts ...RetryOption) (txHash common.Hash, receipt *types.Receipt, err error) { if request.GasPrice != nil { err = errors.New("send txs with retry requires automatic gas pricing") t.recordRetryComplete(0, err) return common.Hash{}, nil, err } - return t.retry(ctx, "", request) + return t.retry(ctx, "", request, applyRetryOptions(opts)) } // escalateGasTip returns tip * (100+increasePct)/100 — a single escalation step. @@ -72,7 +113,9 @@ func escalateGasTip(tip *big.Int, increasePct int) *big.Int { // When maxTxPrice is set and 2*baseFee + escalated tip exceeds it, the function broadcasts with the // un-escalated previous tip (2*baseFee + prevGasTipCap) instead. If that fee cap still exceeds // maxTxPrice, it returns ErrTxMaxPriceExceeded. -func (t *transactionService) suggestGasFeeGasTipCapWithHistory(ctx context.Context, prevGasTipCap *big.Int) (gasFeeCap, gasTipCap *big.Int, err error) { +// +// When overrides.IgnoreMaxPrice is set and returns true, the maxTxPrice cap is bypassed. +func (t *transactionService) suggestGasFeeGasTipCapWithHistory(ctx context.Context, prevGasTipCap *big.Int, overrides *RetryOverrides) (gasFeeCap, gasTipCap *big.Int, err error) { header, err := t.backend.HeaderByNumber(ctx, nil) if err != nil { return nil, prevGasTipCap, err @@ -108,7 +151,18 @@ func (t *transactionService) suggestGasFeeGasTipCapWithHistory(ctx context.Conte "gas_fee_cap_with_previous_tip", gasFeeCapWithPreviousTip, "max_tx_price", t.maxTxPrice) + canOverride := func(feeCap *big.Int) bool { + return overrides != nil && overrides.IgnoreMaxPrice != nil && overrides.IgnoreMaxPrice(feeCap) + } + if t.maxTxPrice != nil && gasFeeCapWithEscalatedTip.Cmp(t.maxTxPrice) > 0 { + if canOverride(gasFeeCapWithEscalatedTip) { + t.logger.Info("max price override: bypassing limit", + "escalated_gas_fee_cap", gasFeeCapWithEscalatedTip, + "max_tx_price", t.maxTxPrice) + return gasFeeCapWithEscalatedTip, escalatedGasTip, nil + } + t.logger.Warning("gas cap fee with escalated gas tip is too high, fallback to previous gas tip", "escalated_gas_tip_cap", escalatedGasTip.String(), "escalated_gas_fee_cap", gasFeeCapWithEscalatedTip.String(), @@ -122,13 +176,13 @@ func (t *transactionService) suggestGasFeeGasTipCapWithHistory(ctx context.Conte return gasFeeCapWithEscalatedTip, escalatedGasTip, nil } -func (t *transactionService) prepareTransactionWithRetry(ctx context.Context, request *TxRequest, nonce uint64, prevGasTipCap *big.Int) (*types.Transaction, error) { +func (t *transactionService) prepareTransactionWithRetry(ctx context.Context, request *TxRequest, nonce uint64, prevGasTipCap *big.Int, overrides *RetryOverrides) (*types.Transaction, error) { gasLimit, err := t.estimateGasLimit(ctx, request) if err != nil { return nil, err } - gasFeeCap, newGasTipCap, err := t.suggestGasFeeGasTipCapWithHistory(ctx, prevGasTipCap) + gasFeeCap, newGasTipCap, err := t.suggestGasFeeGasTipCapWithHistory(ctx, prevGasTipCap, overrides) if err != nil { return nil, err } @@ -149,7 +203,7 @@ func (t *transactionService) prepareTransactionWithRetry(ctx context.Context, re // broadcastTx prepares, signs, and sends a transaction. // When fixedNonce is nil a new nonce is allocated (first attempt); // otherwise the supplied nonce is reused (replacement transaction). -func (t *transactionService) broadcastTx(ctx context.Context, request *TxRequest, fixedNonce *uint64, gasTipCap *big.Int, attempt int) (*types.Transaction, error) { +func (t *transactionService) broadcastTx(ctx context.Context, request *TxRequest, fixedNonce *uint64, gasTipCap *big.Int, attempt int, overrides *RetryOverrides) (*types.Transaction, error) { var nonce uint64 if fixedNonce != nil { @@ -164,7 +218,7 @@ func (t *transactionService) broadcastTx(ctx context.Context, request *TxRequest } nonce = n } - tx, err := t.prepareTransactionWithRetry(ctx, request, nonce, gasTipCap) + tx, err := t.prepareTransactionWithRetry(ctx, request, nonce, gasTipCap, overrides) if err != nil { return nil, err } @@ -205,7 +259,7 @@ func (t *transactionService) deleteRetryStateAndPending(retryKey string, state T _ = t.store.Delete(pendingTransactionKey(state.LastTxHash)) } } -func (t *transactionService) retry(ctx context.Context, txRetryKey string, request *TxRequest) (common.Hash, *types.Receipt, error) { +func (t *transactionService) retry(ctx context.Context, txRetryKey string, request *TxRequest, overrides *RetryOverrides) (common.Hash, *types.Receipt, error) { var ( txState TransactionRetryState nonce *uint64 @@ -238,12 +292,19 @@ func (t *transactionService) retry(ctx context.Context, txRetryKey string, reque "nonce_assigned", txState.NonceAssigned, "previous_tip", txState.PreviousTip) + retryDelay := func(attempt int) time.Duration { + if overrides != nil && overrides.RetryDelay != nil { + return overrides.RetryDelay(attempt) + } + return t.txRetryDelay + } + for attempt := txState.NextAttempt; attempt < t.txMaxRetries; attempt++ { if txState.NonceAssigned { nonce = &txState.Nonce } - signedTx, err := t.broadcastTx(ctx, request, nonce, txState.PreviousTip, attempt) + signedTx, err := t.broadcastTx(ctx, request, nonce, txState.PreviousTip, attempt, overrides) if err != nil { if isErrCritical(err) { t.logger.Error(err, @@ -277,22 +338,24 @@ func (t *transactionService) retry(ctx context.Context, txRetryKey string, reque "previous_tip", txState.PreviousTip, "description", request.Description) + delay := retryDelay(attempt) + if txState.LastTxHash == (common.Hash{}) { loggerV1.Debug("send with retry: no tx hash after broadcast failure, waiting before next attempt", "attempt", attempt, - "retry_delay", t.txRetryDelay, + "retry_delay", delay, "description", request.Description) select { case <-ctx.Done(): err := ctx.Err() t.recordRetryComplete(txState.NextAttempt, err) return common.Hash{}, nil, err - case <-time.After(t.txRetryDelay): + case <-time.After(delay): continue } } - waitCtx, cancel := context.WithTimeout(ctx, t.txRetryDelay) + waitCtx, cancel := context.WithTimeout(ctx, delay) rec, waitErr := t.WaitForReceipt(waitCtx, txState.LastTxHash) cancel() @@ -499,7 +562,7 @@ func (t *transactionService) resumeRetryTransactions() error { sk := key st := state t.wg.Go(func() { - if _, _, err := t.retry(t.ctx, sk, nil); err != nil { + if _, _, err := t.retry(t.ctx, sk, nil, nil); err != nil { t.logger.Error(err, "resumed transaction retry aborted", "nonce", st.Nonce, "description", st.Description) } }) diff --git a/pkg/transaction/send_tx_with_retry_test.go b/pkg/transaction/send_tx_with_retry_test.go index ecd7a2b3161..5a54f780600 100644 --- a/pkg/transaction/send_tx_with_retry_test.go +++ b/pkg/transaction/send_tx_with_retry_test.go @@ -67,7 +67,7 @@ func TestSuggestGasFeeGasTipCapWithHistory(t *testing.T) { backend := backendmock.New(headerOption(), feeHistoryOption(&feeHistoryCalls)) gasFeeCap, gasTipCap, err := transaction.SuggestGasFeeGasTipCapWithHistory( - backend, gasIncreasePct, nil, context.Background(), nil, + backend, gasIncreasePct, nil, context.Background(), nil, nil, ) require.NoError(t, err) @@ -83,7 +83,7 @@ func TestSuggestGasFeeGasTipCapWithHistory(t *testing.T) { backend := backendmock.New(headerOption(), feeHistoryOption(&feeHistoryCalls)) gasFeeCap, gasTipCap, err := transaction.SuggestGasFeeGasTipCapWithHistory( - backend, gasIncreasePct, nil, context.Background(), big.NewInt(prevTip), + backend, gasIncreasePct, nil, context.Background(), big.NewInt(prevTip), nil, ) require.NoError(t, err) @@ -101,7 +101,7 @@ func TestSuggestGasFeeGasTipCapWithHistory(t *testing.T) { backend := backendmock.New(headerOption()) gasFeeCap, gasTipCap, err := transaction.SuggestGasFeeGasTipCapWithHistory( - backend, gasIncreasePct, maxTxPrice, context.Background(), big.NewInt(prevTip), + backend, gasIncreasePct, maxTxPrice, context.Background(), big.NewInt(prevTip), nil, ) require.NoError(t, err) @@ -117,13 +117,56 @@ func TestSuggestGasFeeGasTipCapWithHistory(t *testing.T) { backend := backendmock.New(headerOption()) gasFeeCap, gasTipCap, err := transaction.SuggestGasFeeGasTipCapWithHistory( - backend, gasIncreasePct, maxTxPrice, context.Background(), big.NewInt(prevTip), + backend, gasIncreasePct, maxTxPrice, context.Background(), big.NewInt(prevTip), nil, ) assert.ErrorIs(t, err, transaction.ErrTxMaxPriceExceeded) assert.Nil(t, gasFeeCap) assert.Nil(t, gasTipCap) }) + + t.Run("IgnoreMaxPrice override bypasses limit", func(t *testing.T) { + t.Parallel() + + maxTxPrice := big.NewInt(baseFeeCap + prevTip - 1) + backend := backendmock.New(headerOption()) + + var receivedFeeCap *big.Int + overrides := &transaction.RetryOverrides{ + IgnoreMaxPrice: func(feeCap *big.Int) bool { + receivedFeeCap = feeCap + return true + }, + } + + gasFeeCap, gasTipCap, err := transaction.SuggestGasFeeGasTipCapWithHistory( + backend, gasIncreasePct, maxTxPrice, context.Background(), big.NewInt(prevTip), overrides, + ) + + require.NoError(t, err, "override should bypass ErrTxMaxPriceExceeded") + assert.Equal(t, escalatedTip, gasTipCap.Int64(), "must use escalated tip despite exceeding max") + assert.Equal(t, baseFeeCap+escalatedTip, gasFeeCap.Int64()) + assert.NotNil(t, receivedFeeCap, "IgnoreMaxPrice must receive gasFeeCap") + assert.Equal(t, gasFeeCap.Int64(), receivedFeeCap.Int64(), + "IgnoreMaxPrice must receive the actual gasFeeCap that would be used") + }) + + t.Run("IgnoreMaxPrice false does not bypass limit", func(t *testing.T) { + t.Parallel() + + maxTxPrice := big.NewInt(baseFeeCap + prevTip - 1) + backend := backendmock.New(headerOption()) + + overrides := &transaction.RetryOverrides{ + IgnoreMaxPrice: func(_ *big.Int) bool { return false }, + } + + _, _, err := transaction.SuggestGasFeeGasTipCapWithHistory( + backend, gasIncreasePct, maxTxPrice, context.Background(), big.NewInt(prevTip), overrides, + ) + + assert.ErrorIs(t, err, transaction.ErrTxMaxPriceExceeded) + }) } // capturedBroadcast records the parameters of a transaction as seen by SendTransaction. @@ -855,6 +898,128 @@ func TestSendWithRetry_MaxTxPriceCap(t *testing.T) { }) } +// WithIgnoreMaxPrice override allows transactions to be sent despite exceeding maxTxPrice. +func TestSendWithRetry_IgnoreMaxPriceOverride(t *testing.T) { + t.Parallel() + s := newRetryTestSetup() + store := storemock.NewStateStore() + testutil.CleanupCloser(t, store) + + marketTip := s.expectedMarketTip() + maxTxPrice := new(big.Int).Sub(s.expectedGasFeeCap(marketTip), big.NewInt(1)) + + cfg := s.retryConfig() + cfg.MaxTxPrice = maxTxPrice + + var broadcasts []capturedBroadcast + var overrideCalls atomic.Int32 + + svc, err := transaction.NewService(log.Noop, s.sender, + backendmock.New( + s.nonceOption(), + s.feeHistoryOption(nil), + s.headerOption(), + s.estimateGasOption(), + backendmock.WithSendTransactionFunc(func(ctx context.Context, tx *types.Transaction) error { + broadcasts = append(broadcasts, captureTx(tx)) + return nil + }), + ), + signermock.New(s.passThroughSigner(), s.signerAddr()), + store, + s.chainID, + monitormock.New( + monitormock.WithWatchTransactionFunc(func(txHash common.Hash, nonce uint64) (<-chan types.Receipt, <-chan error, error) { + ch := make(chan types.Receipt, 1) + ch <- types.Receipt{TxHash: txHash, Status: 1} + return ch, nil, nil + }), + ), + 0, + cfg, + ) + require.NoError(t, err) + testutil.CleanupCloser(t, svc) + + var receivedFeeCaps []*big.Int + txHash, receipt, err := svc.SendWithRetry(context.Background(), s.request(), + transaction.WithIgnoreMaxPrice(func(gasFeeCap *big.Int) bool { + overrideCalls.Add(1) + receivedFeeCaps = append(receivedFeeCaps, new(big.Int).Set(gasFeeCap)) + return true + }), + ) + + require.NoError(t, err) + assert.NotEqual(t, common.Hash{}, txHash) + require.NotNil(t, receipt) + assert.Equal(t, uint64(1), receipt.Status) + + require.Len(t, broadcasts, 1, "transaction should be sent despite exceeding maxTxPrice") + assert.Equal(t, marketTip.Int64(), broadcasts[0].GasTipCap.Int64()) + assert.GreaterOrEqual(t, int(overrideCalls.Load()), 1, "override function must have been called") + require.Len(t, receivedFeeCaps, 1, "override should have received gasFeeCap") + assert.Equal(t, broadcasts[0].GasFeeCap.Int64(), receivedFeeCaps[0].Int64(), + "override must receive the same gasFeeCap as the broadcast") +} + +// WithRetryDelay override changes the delay between retry attempts. +func TestSendWithRetry_RetryDelayOverride(t *testing.T) { + t.Parallel() + s := newRetryTestSetup() + store := storemock.NewStateStore() + testutil.CleanupCloser(t, store) + + cfg := s.retryConfig() + cfg.RetryDelay = 10 * time.Second + + var broadcastCount atomic.Int32 + + svc, err := transaction.NewService(log.Noop, s.sender, + backendmock.New( + s.nonceOption(), + s.feeHistoryOption(nil), + s.headerOption(), + s.estimateGasOption(), + backendmock.WithSendTransactionFunc(func(ctx context.Context, tx *types.Transaction) error { + broadcastCount.Add(1) + return nil + }), + ), + signermock.New(s.passThroughSigner(), s.signerAddr()), + store, + s.chainID, + monitormock.New( + monitormock.WithWatchTransactionFunc(func(txHash common.Hash, nonce uint64) (<-chan types.Receipt, <-chan error, error) { + if broadcastCount.Load() <= 1 { + return make(chan types.Receipt), make(chan error), nil + } + ch := make(chan types.Receipt, 1) + ch <- types.Receipt{TxHash: txHash, Status: 1} + return ch, nil, nil + }), + ), + 0, + cfg, + ) + require.NoError(t, err) + testutil.CleanupCloser(t, svc) + + start := time.Now() + _, receipt, err := svc.SendWithRetry(context.Background(), s.request(), + transaction.WithRetryDelay(func(attempt int) time.Duration { + return 50 * time.Millisecond + }), + ) + + require.NoError(t, err) + require.NotNil(t, receipt) + elapsed := time.Since(start) + assert.Less(t, elapsed, 2*time.Second, + "with overridden short delay, retry should complete much faster than the 10s default") + assert.GreaterOrEqual(t, int(broadcastCount.Load()), 2, "should have retried at least once") +} + // failOnNthPutStore wraps a StateStorer and fails the Nth Put call with putErr. type failOnNthPutStore struct { storage.StateStorer diff --git a/pkg/transaction/transaction.go b/pkg/transaction/transaction.go index cdc927f83c7..0234724e1c7 100644 --- a/pkg/transaction/transaction.go +++ b/pkg/transaction/transaction.go @@ -113,10 +113,12 @@ type StoredTransaction struct { // limit and nonce management. type Service interface { io.Closer + EstimateTxCost(ctx context.Context, gasUnits int64, tip int) (cost *big.Int, gasFeeCap *big.Int, err error) // Send creates a transaction based on the request (with gasprice increased by provided percentage) and sends it. Send(ctx context.Context, request *TxRequest, tipCapBoostPercent int) (txHash common.Hash, err error) // SendWithRetry sends a transaction using fee-history tiers and automatic fee escalation; see send_tx_with_retry.go. - SendWithRetry(ctx context.Context, request *TxRequest) (txHash common.Hash, receipt *types.Receipt, err error) + // Optional RetryOption values can override per-call retry behaviour (e.g. bypass price cap). + SendWithRetry(ctx context.Context, request *TxRequest, opts ...RetryOption) (txHash common.Hash, receipt *types.Receipt, err error) // Call simulate a transaction based on the request. Call(ctx context.Context, request *TxRequest) (result []byte, err error) // WaitForReceipt waits until either the transaction with the given hash has been mined or the context is cancelled. @@ -243,6 +245,15 @@ func (t *transactionService) waitForAllPendingTx() error { return nil } +func (t *transactionService) EstimateTxCost(ctx context.Context, gasUnits int64, tip int) (cost *big.Int, gasFeeCap *big.Int, err error) { + gasFeeCap, _, err = t.backend.SuggestedFeeAndTip(ctx, nil, tip) + if err != nil { + return nil, nil, err + } + cost = new(big.Int).Mul(big.NewInt(gasUnits), gasFeeCap) + return cost, gasFeeCap, nil +} + // Send creates and signs a transaction based on the request and sends it. func (t *transactionService) Send(ctx context.Context, request *TxRequest, boostPercent int) (txHash common.Hash, err error) { loggerV1 := t.logger.V(1).Register() @@ -427,6 +438,18 @@ func (t *transactionService) prepareTransaction(ctx context.Context, request *Tx if err != nil { return nil, err } + if request.GasFeeCap != nil { + if request.GasFeeCap.Sign() <= 0 { + return nil, errors.New("gas fee cap must be greater than zero") + } + if gasFeeCap.Cmp(request.GasFeeCap) > 0 { + return nil, fmt.Errorf("gas fee cap exceeded: suggested=%s requested=%s", gasFeeCap, request.GasFeeCap) + } + gasFeeCap = new(big.Int).Set(request.GasFeeCap) + if gasTipCap.Cmp(gasFeeCap) > 0 { + gasTipCap = new(big.Int).Set(gasFeeCap) + } + } t.logger.Debug("prepared transaction", "to", request.To, @@ -573,6 +596,15 @@ func (t *transactionService) ResendTransaction(ctx context.Context, txHash commo if err != nil { return err } + if storedTransaction.GasFeeCap != nil && gasFeeCap.Cmp(storedTransaction.GasFeeCap) > 0 { + gasFeeCap = new(big.Int).Set(storedTransaction.GasFeeCap) + } + if storedTransaction.GasTipCap != nil && gasTipCap.Cmp(storedTransaction.GasTipCap) > 0 { + gasTipCap = new(big.Int).Set(storedTransaction.GasTipCap) + } + if gasTipCap.Cmp(gasFeeCap) > 0 { + gasTipCap = new(big.Int).Set(gasFeeCap) + } tx := types.NewTx(&types.DynamicFeeTx{ Nonce: storedTransaction.Nonce,