diff --git a/contracts/utils.go b/contracts/utils.go index d39a715aec5d..e1a357b76b95 100644 --- a/contracts/utils.go +++ b/contracts/utils.go @@ -93,7 +93,7 @@ func CreateTransactionSign(chainConfig *params.ChainConfig, pool *txpool.TxPool, return err } // Add tx signed to local tx pool. - err = pool.Add([]*types.Transaction{txSigned}, true, true)[0] + err = pool.AddLocal(txSigned, true) if err != nil { log.Error("Fail to add tx sign to local pool.", "error", err, "number", block.NumberU64(), "hash", block.Hash().Hex(), "from", account.Address, "nonce", nonce) return err @@ -121,7 +121,7 @@ func CreateTransactionSign(chainConfig *params.ChainConfig, pool *txpool.TxPool, return err } // Add tx signed to local tx pool. - err = pool.Add([]*types.Transaction{txSigned}, true, true)[0] + err = pool.AddLocal(txSigned, true) if err != nil { log.Error("Fail to add tx secret to local pool.", "error", err, "number", block.NumberU64(), "hash", block.Hash().Hex(), "from", account.Address, "nonce", nonce) return err @@ -150,7 +150,7 @@ func CreateTransactionSign(chainConfig *params.ChainConfig, pool *txpool.TxPool, return err } // Add tx to pool. - err = pool.Add([]*types.Transaction{txSigned}, true, true)[0] + err = pool.AddLocal(txSigned, true) if err != nil { log.Error("Fail to add tx opening to local pool.", "error", err, "number", block.NumberU64(), "hash", block.Hash().Hex(), "from", account.Address, "nonce", nonce) return err diff --git a/core/txpool/legacypool/journal_shared.go b/core/txpool/legacypool/journal_shared.go new file mode 100644 index 000000000000..a141385f5803 --- /dev/null +++ b/core/txpool/legacypool/journal_shared.go @@ -0,0 +1,13 @@ +package legacypool + +import "errors" + +// errNoActiveJournal is returned if a transaction is attempted to be inserted +// into the journal, but no such file is currently open. +var errNoActiveJournal = errors.New("no active journal") + +// devNull is a WriteCloser that just discards anything written into it. +type devNull struct{} + +func (*devNull) Write(p []byte) (n int, err error) { return len(p), nil } +func (*devNull) Close() error { return nil } diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index c0d8f5575819..1448f9017976 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -117,7 +117,6 @@ var ( pendingGauge = metrics.NewRegisteredGauge("txpool/pending", nil) queuedGauge = metrics.NewRegisteredGauge("txpool/queued", nil) - localGauge = metrics.NewRegisteredGauge("txpool/local", nil) slotsGauge = metrics.NewRegisteredGauge("txpool/slots", nil) pendingAddrsGauge = metrics.NewRegisteredGauge("txpool/pending/accounts", nil) @@ -191,10 +190,6 @@ var defaultMaxTip = big.NewInt(1000 * params.GWei) // unreasonable or unworkable. func (config *Config) sanitize() Config { conf := *config - if conf.Rejournal < time.Second { - log.Warn("Sanitizing invalid txpool journal time", "provided", conf.Rejournal, "updated", time.Second) - conf.Rejournal = time.Second - } if conf.PriceBump < 1 { log.Warn("Sanitizing invalid txpool price bump", "provided", conf.PriceBump, "updated", DefaultConfig.PriceBump) conf.PriceBump = DefaultConfig.PriceBump @@ -257,9 +252,6 @@ type LegacyPool struct { pendingNonces *noncer // Pending state tracking virtual nonces reserver *txpool.Reserver // Address reserver to ensure exclusivity across subpools - locals *accountSet // Set of local transaction to exempt from eviction rules - journal *journal // Journal of local transaction to back up to disk - pending map[common.Address]*list // All currently processable transactions queue map[common.Address]*list // Queued but non-processable transactions beats map[common.Address]time.Time // Last heartbeat from each known account @@ -308,16 +300,8 @@ func New(config Config, chain BlockChain) *LegacyPool { initDoneCh: make(chan struct{}), trc21FeeCapacity: map[common.Address]*big.Int{}, } - pool.locals = newAccountSet(pool.signer) - for _, addr := range config.Locals { - log.Info("Setting new local account", "address", addr) - pool.locals.add(addr) - } pool.priced = newPricedList(pool.all) - if !config.NoLocals && config.Journal != "" { - pool.journal = newTxJournal(config.Journal) - } return pool } @@ -333,8 +317,7 @@ func (pool *LegacyPool) Filter(tx *types.Transaction) bool { } // Init sets the gas price needed to keep a transaction in the pool and the chain -// head to allow balance / nonce checks. The transaction journal will be loaded -// from disk and filtered based on the provided starting settings. The internal +// head to allow balance / nonce checks. The internal // goroutines will be spun up and the pool deemed operational afterwards. func (pool *LegacyPool) Init(gasTip uint64, head *types.Header, reserver *txpool.Reserver) error { // Set the address reserver to request exclusive access to pooled accounts @@ -357,19 +340,7 @@ func (pool *LegacyPool) Init(gasTip uint64, head *types.Header, reserver *txpool pool.currentState = statedb pool.pendingNonces = newNoncer(statedb) - // Start the reorg loop early, so it can handle requests generated during - // journal loading. pool.wg.Go(pool.scheduleReorgLoop) - - // If local transactions and journaling is enabled, load from disk - if pool.journal != nil { - if err := pool.journal.load(pool.addLocals); err != nil { - log.Warn("Failed to load transaction journal", "err", err) - } - if err := pool.journal.rotate(pool.local()); err != nil { - log.Warn("Failed to rotate transaction journal", "err", err) - } - } pool.wg.Go(pool.loop) return nil } @@ -382,13 +353,11 @@ func (pool *LegacyPool) loop() { prevPending, prevQueued, prevStales int // Start the stats reporting and transaction eviction tickers - report = time.NewTicker(statsReportInterval) - evict = time.NewTicker(evictionInterval) - journal = time.NewTicker(pool.config.Rejournal) + report = time.NewTicker(statsReportInterval) + evict = time.NewTicker(evictionInterval) ) defer report.Stop() defer evict.Stop() - defer journal.Stop() // Notify tests that the init phase is done close(pool.initDoneCh) @@ -414,11 +383,7 @@ func (pool *LegacyPool) loop() { case <-evict.C: pool.mu.Lock() for addr := range pool.queue { - // Skip local transactions from the eviction mechanism - if pool.locals.contains(addr) { - continue - } - // Any non-locals old enough should be removed + // Any old enough should be removed if time.Since(pool.beats[addr]) > pool.config.Lifetime { list := pool.queue[addr].Flatten() for _, tx := range list { @@ -428,16 +393,6 @@ func (pool *LegacyPool) loop() { } } pool.mu.Unlock() - - // Handle local transaction journal rotation - case <-journal.C: - if pool.journal != nil { - pool.mu.Lock() - if err := pool.journal.rotate(pool.local()); err != nil { - log.Warn("Failed to rotate local tx journal", "err", err) - } - pool.mu.Unlock() - } } } } @@ -448,9 +403,6 @@ func (pool *LegacyPool) Close() error { close(pool.reorgShutdownCh) pool.wg.Wait() - if pool.journal != nil { - pool.journal.close() - } log.Info("Transaction pool stopped") return nil } @@ -501,7 +453,7 @@ func (pool *LegacyPool) SetGasTip(tip *big.Int) error { // If the min miner fee increased, remove transactions below the new threshold if newTip.Cmp(old) > 0 { // pool.priced is sorted by GasFeeCap, so we have to iterate through pool.all instead - drop := pool.all.RemotesBelowTip(tip) + drop := pool.all.TxsBelowTip(tip) for _, tx := range drop { pool.removeTx(tx.Hash(), false, true) } @@ -603,7 +555,7 @@ func (pool *LegacyPool) Pending(filter txpool.PendingFilter) map[common.Address] txs := list.Flatten() // If the miner requests tip enforcement, cap the lists now - if minTipBig != nil && !pool.locals.contains(addr) { + if minTipBig != nil { for i, tx := range txs { if !tx.IsSpecialTransaction() && tx.EffectiveGasTipIntCmp(minTipBig, baseFeeBig) < 0 { txs = txs[:i] @@ -630,35 +582,11 @@ func (pool *LegacyPool) Pending(filter txpool.PendingFilter) map[common.Address] return pending } -// Locals retrieves the accounts currently considered local by the pool. -func (pool *LegacyPool) Locals() []common.Address { - pool.mu.Lock() - defer pool.mu.Unlock() - - return pool.locals.flatten() -} - -// local retrieves all currently known local transactions, grouped by origin -// account and sorted by nonce. The returned transaction set is a copy and can be -// freely modified by calling code. -func (pool *LegacyPool) local() map[common.Address]types.Transactions { - txs := make(map[common.Address]types.Transactions) - for addr := range pool.locals.accounts { - if pending := pool.pending[addr]; pending != nil { - txs[addr] = append(txs[addr], pending.Flatten()...) - } - if queued := pool.queue[addr]; queued != nil { - txs[addr] = append(txs[addr], queued.Flatten()...) - } - } - return txs -} - // validateTxBasics checks whether a transaction is valid according to the consensus // rules, but does not check state-dependent validation such as sufficient balance. // This check is meant as an early check which only needs to be performed once, // and does not require the pool mutex to be held. -func (pool *LegacyPool) validateTxBasics(tx *types.Transaction, local bool) error { +func (pool *LegacyPool) validateTxBasics(tx *types.Transaction) error { opts := &txpool.ValidationOptions{ Config: pool.chainconfig, Accept: 0 | @@ -672,9 +600,6 @@ func (pool *LegacyPool) validateTxBasics(tx *types.Transaction, local bool) erro return pool.isSigner != nil && !pool.isSigner(from) }, } - if local { - opts.MinTip = new(big.Int) - } if err := txpool.ValidateTransaction(tx, pool.currentHead.Load(), pool.signer, opts); err != nil { return err } @@ -797,11 +722,7 @@ func (pool *LegacyPool) validateAuth(tx *types.Transaction) error { // add validates a transaction and inserts it into the non-executable queue for later // pending promotion and execution. If the transaction is a replacement for an already // pending or queued one, it overwrites the previous transaction if its price is higher. -// -// If a newly added transaction is marked as local, its sending account will be -// added to the allowlist, preventing any associated transaction from being dropped -// out of the pool due to pricing constraints. -func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, err error) { +func (pool *LegacyPool) add(tx *types.Transaction) (replaced bool, err error) { // If the transaction is already known, discard it hash := tx.Hash() if pool.all.Get(hash) != nil { @@ -809,9 +730,6 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e knownTxMeter.Mark(1) return false, txpool.ErrAlreadyKnown } - // Make the local flag. If it's from local source or it's from the network but - // the sender is marked as local previously, treat it as the local transaction. - isLocal := local || pool.locals.containsTx(tx) // If the transaction fails basic validation, discard it if err := pool.validateTx(tx); err != nil { @@ -848,13 +766,13 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e // Special transactions must also honor the reservation semantics to keep // the coordinator's ownership accounting balanced. if tx.IsSpecialTransaction() && pool.IsSigner(from) && pool.pendingNonces.get(from) == tx.Nonce() { - return pool.promoteSpecialTx(from, tx, isLocal) + return pool.promoteSpecialTx(from, tx) } // If the transaction pool is full, discard underpriced transactions if uint64(pool.all.Slots()+numSlots(tx)) > pool.config.GlobalSlots+pool.config.GlobalQueue { // If the new transaction is underpriced, don't accept it - if !isLocal && pool.priced.Underpriced(tx) { + if pool.priced.Underpriced(tx) { log.Trace("Discarding underpriced transaction", "hash", hash, "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap()) underpricedTxMeter.Mark(1) return false, txpool.ErrUnderpriced @@ -869,19 +787,18 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e } // New transaction is better than our worse ones, make room for it. - // If it's a local transaction, forcibly discard all available transactions. - // Otherwise if we can't make enough room for new one, abort the operation. - drop, success := pool.priced.Discard(pool.all.Slots()-int(pool.config.GlobalSlots+pool.config.GlobalQueue)+numSlots(tx), isLocal) + // If we can't make enough room for new one, abort the operation. + drop, success := pool.priced.Discard(pool.all.Slots() - int(pool.config.GlobalSlots+pool.config.GlobalQueue) + numSlots(tx)) // Special case, we still can't make the room for the new remote one. - if !isLocal && !success { + if !success { log.Trace("Discarding overflown transaction", "hash", hash) overflowedTxMeter.Mark(1) return false, ErrTxPoolOverflow } // If the new transaction is a future transaction it should never churn pending transactions - if !isLocal && pool.isGapped(from, tx) { + if pool.isGapped(from, tx) { var replacesPending bool for _, dropTx := range drop { dropSender, _ := types.Sender(pool.signer, dropTx) @@ -893,7 +810,7 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e // Add all transactions back to the priced queue if replacesPending { for _, dropTx := range drop { - pool.priced.Put(dropTx, false) + pool.priced.Put(dropTx) } log.Trace("Discarding future transaction replacing pending tx", "hash", hash) return false, ErrFutureReplacePending @@ -926,9 +843,8 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e pool.priced.Removed(1) pendingReplaceMeter.Mark(1) } - pool.all.Add(tx, isLocal) - pool.priced.Put(tx, isLocal) - pool.journalTx(from, tx) + pool.all.Add(tx) + pool.priced.Put(tx) pool.queueTxEvent(tx) log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To()) @@ -937,20 +853,10 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e return old != nil, nil } // New transaction isn't replacing a pending one, push into queue - replaced, err = pool.enqueueTx(hash, tx, isLocal, true) + replaced, err = pool.enqueueTx(hash, tx, true) if err != nil { return false, err } - // Mark local addresses and journal local transactions - if local && !pool.locals.contains(from) { - log.Info("Setting new local account", "address", from) - pool.locals.add(from) - pool.priced.Removed(pool.all.RemoteToLocals(pool.locals)) // Migrate the remotes if it's marked as local first time. - } - if isLocal { - localGauge.Inc(1) - } - pool.journalTx(from, tx) log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To()) return replaced, nil @@ -983,7 +889,7 @@ func (pool *LegacyPool) isGapped(from common.Address, tx *types.Transaction) boo // enqueueTx inserts a new transaction into the non-executable transaction queue. // // Note, this method assumes the pool lock is held! -func (pool *LegacyPool) enqueueTx(hash common.Hash, tx *types.Transaction, local bool, addAll bool) (bool, error) { +func (pool *LegacyPool) enqueueTx(hash common.Hash, tx *types.Transaction, addAll bool) (bool, error) { // Try to insert the transaction into the future queue from, _ := types.Sender(pool.signer, tx) // already validated if pool.queue[from] == nil { @@ -1011,8 +917,8 @@ func (pool *LegacyPool) enqueueTx(hash common.Hash, tx *types.Transaction, local log.Error("Missing transaction in lookup set, please report the issue", "hash", hash) } if addAll { - pool.all.Add(tx, local) - pool.priced.Put(tx, local) + pool.all.Add(tx) + pool.priced.Put(tx) } // If we never record the heartbeat, do it right now. if _, exist := pool.beats[from]; !exist { @@ -1021,18 +927,6 @@ func (pool *LegacyPool) enqueueTx(hash common.Hash, tx *types.Transaction, local return old != nil, nil } -// journalTx adds the specified transaction to the local disk journal if it is -// deemed to have been sent from a local account. -func (pool *LegacyPool) journalTx(from common.Address, tx *types.Transaction) { - // Only journal if it's enabled and the transaction is local - if pool.journal == nil || !pool.locals.contains(from) { - return - } - if err := pool.journal.insert(tx); err != nil { - log.Warn("Failed to journal local transaction", "err", err) - } -} - // promoteTx adds a transaction to the pending (processable) list of transactions // and returns whether it was inserted or an older was better. // @@ -1071,7 +965,7 @@ func (pool *LegacyPool) promoteTx(addr common.Address, hash common.Hash, tx *typ return true } -func (pool *LegacyPool) promoteSpecialTx(addr common.Address, tx *types.Transaction, isLocal bool) (bool, error) { +func (pool *LegacyPool) promoteSpecialTx(addr common.Address, tx *types.Transaction) (bool, error) { list := pool.pending[addr] newPending := list == nil if newPending { @@ -1122,7 +1016,7 @@ func (pool *LegacyPool) promoteSpecialTx(addr common.Address, tx *types.Transact } // Failsafe to work around direct pending inserts (tests) if pool.all.Get(tx.Hash()) == nil { - pool.all.Add(tx, isLocal) + pool.all.Add(tx) } // Set the potentially new pending nonce and notify any subsystems of the new tx pool.beats[addr] = time.Now() @@ -1131,28 +1025,13 @@ func (pool *LegacyPool) promoteSpecialTx(addr common.Address, tx *types.Transact return true, nil } -// AddLocals enqueues a batch of transactions into the pool if they are valid, marking the -// senders as local ones, ensuring they go around the local pricing constraints. -// -// This method is used to add transactions from the RPC API and performs synchronous pool -// reorganization and event propagation. -func (pool *LegacyPool) addLocals(txs []*types.Transaction) []error { - return pool.Add(txs, !pool.config.NoLocals, true) -} - -// AddLocal enqueues a single local transaction into the pool if it is valid. This is -// a convenience wrapper around AddLocals. -func (pool *LegacyPool) addLocal(tx *types.Transaction) error { - return pool.addLocals([]*types.Transaction{tx})[0] -} - -// AddRemotes enqueues a batch of transactions into the pool if they are valid. If the -// senders are not among the locally tracked ones, full pricing constraints will apply. +// AddRemotes enqueues a batch of transactions into the pool if they are valid. +// Full pricing constraints will apply. // // This method is used to add transactions from the p2p network and does not wait for pool // reorganization and internal event propagation. func (pool *LegacyPool) AddRemotes(txs []*types.Transaction) []error { - return pool.Add(txs, false, false) + return pool.Add(txs, false) } // addRemote enqueues a single transaction into the pool if it is valid. This is a convenience @@ -1163,23 +1042,19 @@ func (pool *LegacyPool) addRemote(tx *types.Transaction) error { // AddRemotesSync is like AddRemotes, but waits for pool reorganization. Tests use this method. func (pool *LegacyPool) addRemotesSync(txs []*types.Transaction) []error { - return pool.Add(txs, false, true) + return pool.Add(txs, true) } // This is like AddRemotes with a single transaction, but waits for pool reorganization. Tests use this method. func (pool *LegacyPool) addRemoteSync(tx *types.Transaction) error { - return pool.Add([]*types.Transaction{tx}, false, true)[0] + return pool.Add([]*types.Transaction{tx}, true)[0] } -// Add enqueues a batch of transactions into the pool if they are valid. Depending -// on the local flag, full pricing constraints will or will not be applied. +// Add enqueues a batch of transactions into the pool if they are valid. // // If sync is set, the method will block until all internal maintenance related // to the add is finished. Only use this during tests for determinism! -func (pool *LegacyPool) Add(txs []*types.Transaction, local, sync bool) []error { - // Do not treat as local if local transactions have been disabled - local = local && !pool.config.NoLocals - +func (pool *LegacyPool) Add(txs []*types.Transaction, sync bool) []error { // Filter out known ones without obtaining the pool lock or recovering signatures var ( errs = make([]error, len(txs)) @@ -1195,7 +1070,7 @@ func (pool *LegacyPool) Add(txs []*types.Transaction, local, sync bool) []error // Exclude transactions with basic errors, e.g invalid signatures and // insufficient intrinsic gas as soon as possible and cache senders // in transactions before obtaining lock - if err := pool.validateTxBasics(tx, local); err != nil { + if err := pool.validateTxBasics(tx); err != nil { errs[i] = err invalidTxMeter.Mark(1) continue @@ -1209,7 +1084,7 @@ func (pool *LegacyPool) Add(txs []*types.Transaction, local, sync bool) []error // Process all the new transaction and merge any errors into the original slice pool.mu.Lock() - newErrs, dirtyAddrs := pool.addTxsLocked(news, local) + newErrs, dirtyAddrs := pool.addTxsLocked(news) pool.mu.Unlock() var nilSlot = 0 @@ -1230,11 +1105,11 @@ func (pool *LegacyPool) Add(txs []*types.Transaction, local, sync bool) []error // addTxsLocked attempts to queue a batch of transactions if they are valid. // The transaction pool lock must be held. -func (pool *LegacyPool) addTxsLocked(txs []*types.Transaction, local bool) ([]error, *accountSet) { +func (pool *LegacyPool) addTxsLocked(txs []*types.Transaction) ([]error, *accountSet) { dirty := newAccountSet(pool.signer) errs := make([]error, len(txs)) for i, tx := range txs { - replaced, err := pool.add(tx, local) + replaced, err := pool.add(tx) errs[i] = err if err == nil && !replaced { dirty.addTx(tx) @@ -1320,9 +1195,6 @@ func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool, unreserve bo if outofbound { pool.priced.Removed(1) } - if pool.locals.contains(addr) { - localGauge.Dec(1) - } // Remove the transaction from the pending lists and reset the account nonce if pending := pool.pending[addr]; pending != nil { if removed, invalids := pending.Remove(tx); removed { @@ -1334,7 +1206,7 @@ func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool, unreserve bo // Postpone any invalidated transactions for _, tx := range invalids { // Internal shuffle shouldn't touch the lookup set. - pool.enqueueTx(tx.Hash(), tx, false, false) + pool.enqueueTx(tx.Hash(), tx, false) } // Update the account nonce if needed pool.pendingNonces.setIfLower(addr, tx.Nonce()) @@ -1398,7 +1270,7 @@ func (pool *LegacyPool) scheduleReorgLoop() { launchNextRun bool reset *txpoolResetRequest dirtyAccounts *accountSet - queuedEvents = make(map[common.Address]*sortedMap) + queuedEvents = make(map[common.Address]*SortedMap) ) for { // Launch next background reorg if needed @@ -1411,7 +1283,7 @@ func (pool *LegacyPool) scheduleReorgLoop() { launchNextRun = false reset, dirtyAccounts = nil, nil - queuedEvents = make(map[common.Address]*sortedMap) + queuedEvents = make(map[common.Address]*SortedMap) } select { @@ -1440,7 +1312,7 @@ func (pool *LegacyPool) scheduleReorgLoop() { // request one later if they want the events sent. addr, _ := types.Sender(pool.signer, tx) if _, ok := queuedEvents[addr]; !ok { - queuedEvents[addr] = newSortedMap() + queuedEvents[addr] = NewSortedMap() } queuedEvents[addr].Put(tx) @@ -1459,7 +1331,7 @@ func (pool *LegacyPool) scheduleReorgLoop() { } // runReorg runs reset and promoteExecutables on behalf of scheduleReorgLoop. -func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirtyAccounts *accountSet, events map[common.Address]*sortedMap) { +func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirtyAccounts *accountSet, events map[common.Address]*SortedMap) { defer func(t0 time.Time) { reorgDurationTimer.Update(time.Since(t0)) }(time.Now()) @@ -1526,7 +1398,7 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, for _, tx := range promoted { addr, _ := types.Sender(pool.signer, tx) if _, ok := events[addr]; !ok { - events[addr] = newSortedMap() + events[addr] = NewSortedMap() } events[addr].Put(tx) } @@ -1636,7 +1508,7 @@ func (pool *LegacyPool) reset(oldHead, newHead *types.Header) { // Inject any transactions discarded due to reorgs log.Debug("Reinjecting stale transactions", "count", len(reinject)) core.SenderCacher().Recover(pool.signer, reinject) - pool.addTxsLocked(reinject, false) + pool.addTxsLocked(reinject) } // promoteExecutables moves transactions that have become processable from the @@ -1688,22 +1560,17 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T queuedGauge.Dec(int64(len(readies))) // Drop all transactions over the allowed limit - var caps types.Transactions - if !pool.locals.contains(addr) { - caps = list.Cap(int(pool.config.AccountQueue)) - for _, tx := range caps { - hash := tx.Hash() - pool.all.Remove(hash) - log.Trace("Removed cap-exceeding queued transaction", "hash", hash) - } - queuedRateLimitMeter.Mark(int64(len(caps))) + var caps = list.Cap(int(pool.config.AccountQueue)) + for _, tx := range caps { + hash := tx.Hash() + pool.all.Remove(hash) + log.Trace("Removed cap-exceeding queued transaction", "hash", hash) } + queuedRateLimitMeter.Mark(int64(len(caps))) // Mark all the items dropped as removed pool.priced.Removed(len(forwards) + len(drops) + len(caps)) queuedGauge.Dec(int64(len(forwards) + len(drops) + len(caps))) - if pool.locals.contains(addr) { - localGauge.Dec(int64(len(forwards) + len(drops) + len(caps))) - } + // Delete the entire queue entry if it became empty. if list.Empty() { delete(pool.queue, addr) @@ -1734,14 +1601,14 @@ func (pool *LegacyPool) truncatePending() { spammers := prque.New[int64, common.Address](nil) for addr, list := range pool.pending { // Only evict transactions from high rollers - if !pool.locals.contains(addr) && uint64(list.Len()) > pool.config.AccountSlots { + if uint64(list.Len()) > pool.config.AccountSlots { spammers.Push(addr, int64(list.Len())) } } // Gradually drop transactions from offenders offenders := []common.Address{} for pending > pool.config.GlobalSlots && !spammers.Empty() { - // Retrieve the next offender if not local address + // Retrieve the next offender offender, _ := spammers.Pop() offenders = append(offenders, offender) @@ -1767,9 +1634,6 @@ func (pool *LegacyPool) truncatePending() { } pool.priced.Removed(len(caps)) pendingGauge.Dec(int64(len(caps))) - if pool.locals.contains(offenders[i]) { - localGauge.Dec(int64(len(caps))) - } pending-- } } @@ -1794,9 +1658,6 @@ func (pool *LegacyPool) truncatePending() { } pool.priced.Removed(len(caps)) pendingGauge.Dec(int64(len(caps))) - if pool.locals.contains(addr) { - localGauge.Dec(int64(len(caps))) - } pending-- } } @@ -1817,9 +1678,7 @@ func (pool *LegacyPool) truncateQueue() { // Sort all accounts with queued transactions by heartbeat addresses := make(addressesByHeartbeat, 0, len(pool.queue)) for addr := range pool.queue { - if !pool.locals.contains(addr) { // don't drop locals - addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]}) - } + addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]}) } sort.Sort(sort.Reverse(addresses)) @@ -1887,13 +1746,11 @@ func (pool *LegacyPool) demoteUnexecutables() { log.Trace("Demoting pending transaction", "hash", hash) // Internal shuffle shouldn't touch the lookup set. - pool.enqueueTx(hash, tx, false, false) + pool.enqueueTx(hash, tx, false) } pool.priced.Removed(len(olds) + len(drops)) pendingGauge.Dec(int64(len(olds) + len(drops) + len(invalids))) - if pool.locals.contains(addr) { - localGauge.Dec(int64(len(olds) + len(drops) + len(invalids))) - } + // If there's a gap in front, alert (should never happen) and postpone all transactions if list.Len() > 0 && list.txs.Get(nonce) == nil { gapped := list.Cap(0) @@ -1902,7 +1759,7 @@ func (pool *LegacyPool) demoteUnexecutables() { log.Warn("Demoting invalidated transaction", "hash", hash) // Internal shuffle shouldn't touch the lookup set. - pool.enqueueTx(hash, tx, false, false) + pool.enqueueTx(hash, tx, false) } pendingGauge.Dec(int64(len(gapped))) } @@ -1960,25 +1817,6 @@ func newAccountSet(signer types.Signer, addrs ...common.Address) *accountSet { return as } -// contains checks if a given address is contained within the set. -func (as *accountSet) contains(addr common.Address) bool { - _, exist := as.accounts[addr] - return exist -} - -func (as *accountSet) empty() bool { - return len(as.accounts) == 0 -} - -// containsTx checks if the sender of a given tx is within the set. If the sender -// cannot be derived, this method returns false. -func (as *accountSet) containsTx(tx *types.Transaction) bool { - if addr, err := types.Sender(as.signer, tx); err == nil { - return as.contains(addr) - } - return false -} - // add inserts a new address into the set to track. func (as *accountSet) add(addr common.Address) { as.accounts[addr] = struct{}{} @@ -2008,23 +1846,19 @@ func (as *accountSet) merge(other *accountSet) { as.cache = nil } -// lookup is used internally by TxPool to track transactions while allowing +// lookup is used internally by LegacyPool to track transactions while allowing // lookup without mutex contention. // // Note, although this type is properly protected against concurrent access, it // is **not** a type that should ever be mutated or even exposed outside of the // transaction pool, since its internal state is tightly coupled with the pools // internal mechanisms. The sole purpose of the type is to permit out-of-bound -// peeking into the pool in TxPool.Get without having to acquire the widely scoped -// TxPool.mu mutex. -// -// This lookup set combines the notion of "local transactions", which is useful -// to build upper-level structure. +// peeking into the pool in LegacyPool.Get without having to acquire the widely scoped +// LegacyPool.mu mutex. type lookup struct { - slots int - lock sync.RWMutex - locals map[common.Hash]*types.Transaction - remotes map[common.Hash]*types.Transaction + slots int + lock sync.RWMutex + txs map[common.Hash]*types.Transaction auths map[common.Address][]common.Hash // All accounts with a pooled authorization } @@ -2032,31 +1866,21 @@ type lookup struct { // newLookup returns a new lookup structure. func newLookup() *lookup { return &lookup{ - locals: make(map[common.Hash]*types.Transaction), - remotes: make(map[common.Hash]*types.Transaction), - auths: make(map[common.Address][]common.Hash), + txs: make(map[common.Hash]*types.Transaction), + auths: make(map[common.Address][]common.Hash), } } // Range calls f on each key and value present in the map. The callback passed // should return the indicator whether the iteration needs to be continued. // Callers need to specify which set (or both) to be iterated. -func (t *lookup) Range(f func(hash common.Hash, tx *types.Transaction, local bool) bool, local bool, remote bool) { +func (t *lookup) Range(f func(hash common.Hash, tx *types.Transaction) bool) { t.lock.RLock() defer t.lock.RUnlock() - if local { - for key, value := range t.locals { - if !f(key, value, true) { - return - } - } - } - if remote { - for key, value := range t.remotes { - if !f(key, value, false) { - return - } + for key, value := range t.txs { + if !f(key, value) { + return } } } @@ -2066,26 +1890,7 @@ func (t *lookup) Get(hash common.Hash) *types.Transaction { t.lock.RLock() defer t.lock.RUnlock() - if tx := t.locals[hash]; tx != nil { - return tx - } - return t.remotes[hash] -} - -// GetLocal returns a transaction if it exists in the lookup, or nil if not found. -func (t *lookup) GetLocal(hash common.Hash) *types.Transaction { - t.lock.RLock() - defer t.lock.RUnlock() - - return t.locals[hash] -} - -// GetRemote returns a transaction if it exists in the lookup, or nil if not found. -func (t *lookup) GetRemote(hash common.Hash) *types.Transaction { - t.lock.RLock() - defer t.lock.RUnlock() - - return t.remotes[hash] + return t.txs[hash] } // Count returns the current number of transactions in the lookup. @@ -2093,23 +1898,7 @@ func (t *lookup) Count() int { t.lock.RLock() defer t.lock.RUnlock() - return len(t.locals) + len(t.remotes) -} - -// LocalCount returns the current number of local transactions in the lookup. -func (t *lookup) LocalCount() int { - t.lock.RLock() - defer t.lock.RUnlock() - - return len(t.locals) -} - -// RemoteCount returns the current number of remote transactions in the lookup. -func (t *lookup) RemoteCount() int { - t.lock.RLock() - defer t.lock.RUnlock() - - return len(t.remotes) + return len(t.txs) } // Slots returns the current number of slots used in the lookup. @@ -2121,18 +1910,14 @@ func (t *lookup) Slots() int { } // Add adds a transaction to the lookup. -func (t *lookup) Add(tx *types.Transaction, local bool) { +func (t *lookup) Add(tx *types.Transaction) { t.lock.Lock() defer t.lock.Unlock() t.slots += numSlots(tx) slotsGauge.Update(int64(t.slots)) - if local { - t.locals[tx.Hash()] = tx - } else { - t.remotes[tx.Hash()] = tx - } + t.txs[tx.Hash()] = tx t.addAuthorities(tx) } @@ -2142,10 +1927,7 @@ func (t *lookup) Remove(hash common.Hash) { defer t.lock.Unlock() t.removeAuthorities(hash) - tx, ok := t.locals[hash] - if !ok { - tx, ok = t.remotes[hash] - } + tx, ok := t.txs[hash] if !ok { log.Error("No transaction found to be deleted", "hash", hash) return @@ -2153,36 +1935,18 @@ func (t *lookup) Remove(hash common.Hash) { t.slots -= numSlots(tx) slotsGauge.Update(int64(t.slots)) - delete(t.locals, hash) - delete(t.remotes, hash) -} - -// RemoteToLocals migrates the transactions belongs to the given locals to locals -// set. The assumption is held the locals set is thread-safe to be used. -func (t *lookup) RemoteToLocals(locals *accountSet) int { - t.lock.Lock() - defer t.lock.Unlock() - - var migrated int - for hash, tx := range t.remotes { - if locals.containsTx(tx) { - t.locals[hash] = tx - delete(t.remotes, hash) - migrated += 1 - } - } - return migrated + delete(t.txs, hash) } -// RemotesBelowTip finds all remote transactions below the given tip threshold. -func (t *lookup) RemotesBelowTip(threshold *big.Int) types.Transactions { +// TxsBelowTip finds all remote transactions below the given tip threshold. +func (t *lookup) TxsBelowTip(threshold *big.Int) types.Transactions { found := make(types.Transactions, 0, 128) - t.Range(func(hash common.Hash, tx *types.Transaction, local bool) bool { + t.Range(func(hash common.Hash, tx *types.Transaction) bool { if tx.GasTipCapIntCmp(threshold) < 0 { found = append(found, tx) } return true - }, false, true) // Only iterate remotes + }) return found } @@ -2257,11 +2021,7 @@ func (pool *LegacyPool) Clear() { // The transaction addition may attempt to reserve the sender addr which // can't happen until Clear releases the reservation lock. Clear cannot // acquire the subpool lock until the transaction addition is completed. - for _, tx := range pool.all.locals { - senderAddr, _ := types.Sender(pool.signer, tx) - pool.reserver.Release(senderAddr) - } - for _, tx := range pool.all.remotes { + for _, tx := range pool.all.txs { senderAddr, _ := types.Sender(pool.signer, tx) pool.reserver.Release(senderAddr) } diff --git a/core/txpool/legacypool/legacypool_test.go b/core/txpool/legacypool/legacypool_test.go index c02bb98cdbd6..f06812331606 100644 --- a/core/txpool/legacypool/legacypool_test.go +++ b/core/txpool/legacypool/legacypool_test.go @@ -24,7 +24,6 @@ import ( "math" "math/big" "math/rand" - "os" "sync/atomic" "testing" "time" @@ -217,7 +216,7 @@ func validatePoolInternals(pool *LegacyPool) error { return fmt.Errorf("total transaction count %d != %d pending + %d queued", total, pending, queued) } pool.priced.Reheap() - priced, remote := pool.priced.urgent.Len()+pool.priced.floating.Len(), pool.all.RemoteCount() + priced, remote := pool.priced.urgent.Len()+pool.priced.floating.Len(), pool.all.Count() if priced != remote { return fmt.Errorf("total priced transaction count %d != %d", priced, remote) } @@ -293,7 +292,7 @@ func TestPromoteSpecialTxUpdatesTotalCost(t *testing.T) { if !inserted { t.Fatal("failed to insert baseline transaction") } - if _, err := pool.promoteSpecialTx(addr, special, false); err != nil { + if _, err := pool.promoteSpecialTx(addr, special); err != nil { t.Fatalf("promoteSpecialTx failed: %v", err) } want, overflow := uint256.FromBig(special.Cost()) @@ -383,7 +382,7 @@ func TestPromoteSpecialTxReplacementAvoidsIntermediateOverflow(t *testing.T) { if !inserted { t.Fatal("failed to insert baseline transaction") } - inserted, err = pool.promoteSpecialTx(addr, special, false) + inserted, err = pool.promoteSpecialTx(addr, special) if err != nil { t.Fatalf("promoteSpecialTx failed: %v", err) } @@ -421,7 +420,7 @@ func TestPromoteSpecialTxOverflowReturnsErrorWithoutMutation(t *testing.T) { pool.mu.Lock() defer pool.mu.Unlock() - inserted, err := pool.promoteSpecialTx(addr, special, false) + inserted, err := pool.promoteSpecialTx(addr, special) if inserted { t.Fatal("overflowing special tx must not be inserted") } @@ -558,10 +557,6 @@ func TestInvalidTransactions(t *testing.T) { if err, want := pool.addRemote(tx), txpool.ErrUnderpriced; !errors.Is(err, want) { t.Errorf("want %v have %v", want, err) } - // Local transactions should be accepted even if underpriced - if err := pool.addLocal(tx); err != nil { - t.Error("expected", nil, "got", err) - } } func TestQueue(t *testing.T) { @@ -575,7 +570,7 @@ func TestQueue(t *testing.T) { testAddBalance(pool, from, big.NewInt(1000)) <-pool.requestReset(nil, nil) - pool.enqueueTx(tx.Hash(), tx, false, true) + pool.enqueueTx(tx.Hash(), tx, true) <-pool.requestPromoteExecutables(newAccountSet(pool.signer, from)) if len(pool.pending) != 1 { t.Error("expected valid txs to be 1 is", len(pool.pending)) @@ -584,7 +579,7 @@ func TestQueue(t *testing.T) { tx = transaction(1, 100, key) from, _ = deriveSender(tx) testSetNonce(pool, from, 2) - pool.enqueueTx(tx.Hash(), tx, false, true) + pool.enqueueTx(tx.Hash(), tx, true) <-pool.requestPromoteExecutables(newAccountSet(pool.signer, from)) if _, ok := pool.pending[from].txs.items[tx.Nonce()]; ok { @@ -609,9 +604,9 @@ func TestQueue2(t *testing.T) { testAddBalance(pool, from, big.NewInt(1000)) pool.reset(nil, nil) - pool.enqueueTx(tx1.Hash(), tx1, false, true) - pool.enqueueTx(tx2.Hash(), tx2, false, true) - pool.enqueueTx(tx3.Hash(), tx3, false, true) + pool.enqueueTx(tx1.Hash(), tx1, true) + pool.enqueueTx(tx2.Hash(), tx2, true) + pool.enqueueTx(tx3.Hash(), tx3, true) pool.promoteExecutables([]common.Address{from}) if len(pool.pending) != 1 { @@ -693,7 +688,7 @@ func TestValidateTransactionEIP2681(t *testing.T) { GasPrice: tt.gasPrice, }) signedTx, _ := types.SignTx(tx, types.HomesteadSigner{}, key) - err := pool.validateTxBasics(signedTx, true) + err := pool.validateTxBasics(signedTx) if tt.wantErr == nil && err != nil { t.Errorf("expected nil, got %v", err) @@ -754,14 +749,14 @@ func TestChainFork(t *testing.T) { resetState() tx := pricedTransaction(0, 100000, big.NewInt(300000000), key) - if _, err := pool.add(tx, false); err != nil { + if _, err := pool.add(tx); err != nil { t.Error("didn't expect error", err) } pool.removeTx(tx.Hash(), true, true) // reset the pool's internal state resetState() - if _, err := pool.add(tx, false); err != nil { + if _, err := pool.add(tx); err != nil { t.Error("didn't expect error", err) } } @@ -790,10 +785,10 @@ func TestDoubleNonce(t *testing.T) { tx3, _ := types.SignTx(types.NewTransaction(0, common.Address{}, big.NewInt(100), 1000000, big.NewInt(250000000), nil), signer, key) // Add the first two transaction, ensure higher priced stays only - if replace, err := pool.add(tx1, false); err != nil || replace { + if replace, err := pool.add(tx1); err != nil || replace { t.Errorf("first transaction insert failed (%v) or reported replacement (%v)", err, replace) } - if replace, err := pool.add(tx2, false); err != nil || !replace { + if replace, err := pool.add(tx2); err != nil || !replace { t.Errorf("second transaction insert failed (%v) or not reported replacement (%v)", err, replace) } <-pool.requestPromoteExecutables(newAccountSet(signer, addr)) @@ -805,7 +800,7 @@ func TestDoubleNonce(t *testing.T) { } // Add the third transaction and ensure it's not saved (smaller price) - pool.add(tx3, false) + pool.add(tx3) <-pool.requestPromoteExecutables(newAccountSet(signer, addr)) if pool.pending[addr].Len() != 1 { t.Error("expected 1 pending transactions, got", pool.pending[addr].Len()) @@ -828,7 +823,7 @@ func TestMissingNonce(t *testing.T) { addr := crypto.PubkeyToAddress(key.PublicKey) testAddBalance(pool, addr, big.NewInt(100000000000000)) tx := pricedTransaction(1, 100000, big.NewInt(300000000), key) - if _, err := pool.add(tx, false); err != nil { + if _, err := pool.add(tx); err != nil { t.Error("didn't expect error", err) } if len(pool.pending) != 0 { @@ -887,21 +882,21 @@ func TestDropping(t *testing.T) { tx11 = transaction(11, 200, key) tx12 = transaction(12, 300, key) ) - pool.all.Add(tx0, false) - pool.priced.Put(tx0, false) + pool.all.Add(tx0) + pool.priced.Put(tx0) pool.promoteTx(account, tx0.Hash(), tx0) - pool.all.Add(tx1, false) - pool.priced.Put(tx1, false) + pool.all.Add(tx1) + pool.priced.Put(tx1) pool.promoteTx(account, tx1.Hash(), tx1) - pool.all.Add(tx2, false) - pool.priced.Put(tx2, false) + pool.all.Add(tx2) + pool.priced.Put(tx2) pool.promoteTx(account, tx2.Hash(), tx2) - pool.enqueueTx(tx10.Hash(), tx10, false, true) - pool.enqueueTx(tx11.Hash(), tx11, false, true) - pool.enqueueTx(tx12.Hash(), tx12, false, true) + pool.enqueueTx(tx10.Hash(), tx10, true) + pool.enqueueTx(tx11.Hash(), tx11, true) + pool.enqueueTx(tx12.Hash(), tx12, true) // Check that pre and post validations leave the pool as is if pool.pending[account].Len() != 3 { @@ -1178,14 +1173,6 @@ func TestQueueAccountLimiting(t *testing.T) { // This logic should not hold for local transactions, unless the local tracking // mechanism is disabled. func TestQueueGlobalLimiting(t *testing.T) { - testQueueGlobalLimiting(t, false) -} - -func TestQueueGlobalLimitingNoLocals(t *testing.T) { - testQueueGlobalLimiting(t, true) -} - -func testQueueGlobalLimiting(t *testing.T, nolocals bool) { t.Parallel() // Create the pool to test the limit enforcement with @@ -1193,7 +1180,7 @@ func testQueueGlobalLimiting(t *testing.T, nolocals bool) { blockchain := newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed)) config := testTxPoolConfig - config.NoLocals = nolocals + config.NoLocals = true config.AccountQueue = 1 config.GlobalQueue = config.AccountQueue*3 - 1 // reduce the queue limits to shorten test time (-1 to make it non divisible) @@ -1207,7 +1194,6 @@ func testQueueGlobalLimiting(t *testing.T, nolocals bool) { keys[i], _ = crypto.GenerateKey() testAddBalance(pool, crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(100000000000000)) } - local := keys[len(keys)-1] // Generate and queue a batch of transactions nonces := make(map[common.Address]uint64) @@ -1233,35 +1219,6 @@ func testQueueGlobalLimiting(t *testing.T, nolocals bool) { if queued > int(config.GlobalQueue) { t.Fatalf("total transactions overflow allowance: %d > %d", queued, config.GlobalQueue) } - // Generate a batch of transactions from the local account and import them - txs = txs[:0] - for i := uint64(0); i < 3*config.GlobalQueue; i++ { - txs = append(txs, pricedTransaction(i+1, 100000, big.NewInt(300000000), local)) - } - pool.addLocals(txs) - - // If locals are disabled, the previous eviction algorithm should apply here too - if nolocals { - queued := 0 - for addr, list := range pool.queue { - if list.Len() > int(config.AccountQueue) { - t.Errorf("addr %x: queued accounts overflown allowance: %d > %d", addr, list.Len(), config.AccountQueue) - } - queued += list.Len() - } - if queued > int(config.GlobalQueue) { - t.Fatalf("total transactions overflow allowance: %d > %d", queued, config.GlobalQueue) - } - } else { - // Local exemptions are enabled, make sure the local account owned the queue - if len(pool.queue) != 1 { - t.Errorf("multiple accounts in queue: have %v, want %v", len(pool.queue), 1) - } - // Also ensure no local transactions are ever dropped, even if above global limits - if queued := pool.queue[crypto.PubkeyToAddress(local.PublicKey)].Len(); uint64(queued) != 3*config.GlobalQueue { - t.Fatalf("local account queued transaction count mismatch: have %v, want %v", queued, 3*config.GlobalQueue) - } - } } // Tests that if an account remains idle for a prolonged amount of time, any @@ -1270,12 +1227,7 @@ func testQueueGlobalLimiting(t *testing.T, nolocals bool) { // // This logic should not hold for local transactions, unless the local tracking // mechanism is disabled. -func TestQueueTimeLimiting(t *testing.T) { testQueueTimeLimiting(t, false) } -func TestQueueTimeLimitingNoLocals(t *testing.T) { - testQueueTimeLimiting(t, true) -} - -func testQueueTimeLimiting(t *testing.T, nolocals bool) { +func TestQueueTimeLimiting(t *testing.T) { // Reduce the eviction interval to a testable amount defer func(old time.Duration) { evictionInterval = old }(evictionInterval) evictionInterval = time.Millisecond * 100 @@ -1286,23 +1238,17 @@ func testQueueTimeLimiting(t *testing.T, nolocals bool) { config := testTxPoolConfig config.Lifetime = time.Second - config.NoLocals = nolocals pool := New(config, blockchain) pool.Init(config.PriceLimit, blockchain.CurrentBlock(), newReserver()) defer pool.Close() - // Create two test accounts to ensure remotes expire but locals do not - local, _ := crypto.GenerateKey() + // Create a test account to ensure remotes expire remote, _ := crypto.GenerateKey() - testAddBalance(pool, crypto.PubkeyToAddress(local.PublicKey), big.NewInt(100000000000000)) testAddBalance(pool, crypto.PubkeyToAddress(remote.PublicKey), big.NewInt(100000000000000)) - // Add the two transactions and ensure they both are queued up - if err := pool.addLocal(pricedTransaction(1, 100000, big.NewInt(300000000), local)); err != nil { - t.Fatalf("failed to add local transaction: %v", err) - } + // Add the transaction and ensure it is queued up if err := pool.addRemote(pricedTransaction(1, 100000, big.NewInt(300000000), remote)); err != nil { t.Fatalf("failed to add remote transaction: %v", err) } @@ -1310,8 +1256,8 @@ func testQueueTimeLimiting(t *testing.T, nolocals bool) { if pending != 0 { t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0) } - if queued != 2 { - t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2) + if queued != 1 { + t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1) } if err := validatePoolInternals(pool); err != nil { t.Fatalf("pool internal state corrupted: %v", err) @@ -1325,8 +1271,8 @@ func testQueueTimeLimiting(t *testing.T, nolocals bool) { if pending != 0 { t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0) } - if queued != 2 { - t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2) + if queued != 1 { + t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1) } if err := validatePoolInternals(pool); err != nil { t.Fatalf("pool internal state corrupted: %v", err) @@ -1339,14 +1285,8 @@ func testQueueTimeLimiting(t *testing.T, nolocals bool) { if pending != 0 { t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0) } - if nolocals { - if queued != 0 { - t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0) - } - } else { - if queued != 1 { - t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1) - } + if queued != 0 { + t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0) } if err := validatePoolInternals(pool); err != nil { t.Fatalf("pool internal state corrupted: %v", err) @@ -1354,7 +1294,6 @@ func testQueueTimeLimiting(t *testing.T, nolocals bool) { // remove current transactions and increase nonce to prepare for a reset and cleanup statedb.SetNonce(crypto.PubkeyToAddress(remote.PublicKey), 2) - statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 2) <-pool.requestReset(nil, nil) // make sure queue, pending are cleared @@ -1370,18 +1309,12 @@ func testQueueTimeLimiting(t *testing.T, nolocals bool) { } // Queue gapped transactions - if err := pool.addLocal(pricedTransaction(4, 100000, big.NewInt(300000000), local)); err != nil { - t.Fatalf("failed to add remote transaction: %v", err) - } if err := pool.addRemoteSync(pricedTransaction(4, 100000, big.NewInt(300000000), remote)); err != nil { t.Fatalf("failed to add remote transaction: %v", err) } time.Sleep(5 * evictionInterval) // A half lifetime pass // Queue executable transactions, the life cycle should be restarted. - if err := pool.addLocal(pricedTransaction(2, 100000, big.NewInt(300000000), local)); err != nil { - t.Fatalf("failed to add remote transaction: %v", err) - } if err := pool.addRemoteSync(pricedTransaction(2, 100000, big.NewInt(300000000), remote)); err != nil { t.Fatalf("failed to add remote transaction: %v", err) } @@ -1389,11 +1322,11 @@ func testQueueTimeLimiting(t *testing.T, nolocals bool) { // All gapped transactions shouldn't be kicked out pending, queued = pool.Stats() - if pending != 2 { - t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2) + if pending != 1 { + t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 1) } - if queued != 2 { - t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2) + if queued != 1 { + t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1) } if err := validatePoolInternals(pool); err != nil { t.Fatalf("pool internal state corrupted: %v", err) @@ -1402,17 +1335,11 @@ func testQueueTimeLimiting(t *testing.T, nolocals bool) { // The whole life time pass after last promotion, kick out stale transactions time.Sleep(2 * config.Lifetime) pending, queued = pool.Stats() - if pending != 2 { - t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2) + if pending != 1 { + t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 1) } - if nolocals { - if queued != 0 { - t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0) - } - } else { - if queued != 1 { - t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1) - } + if queued != 0 { + t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0) } if err := validatePoolInternals(pool); err != nil { t.Fatalf("pool internal state corrupted: %v", err) @@ -1675,7 +1602,7 @@ func TestRepricing(t *testing.T) { defer sub.Unsubscribe() // Create a number of test accounts and fund them - keys := make([]*ecdsa.PrivateKey, 4) + keys := make([]*ecdsa.PrivateKey, 3) for i := 0; i < len(keys); i++ { keys[i], _ = crypto.GenerateKey() testAddBalance(pool, crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(200000000000000)) @@ -1695,20 +1622,17 @@ func TestRepricing(t *testing.T) { txs = append(txs, pricedTransaction(2, 100000, big.NewInt(250000000), keys[2])) txs = append(txs, pricedTransaction(3, 100000, big.NewInt(500000000), keys[2])) - ltx := pricedTransaction(0, 100000, big.NewInt(250000000), keys[3]) - // Import the batch and that both pending and queued transactions match up pool.addRemotesSync(txs) - pool.addLocal(ltx) pending, queued := pool.Stats() - if pending != 7 { - t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 7) + if pending != 6 { + t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 6) } if queued != 3 { t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 3) } - if err := validateEvents(events, 7); err != nil { + if err := validateEvents(events, 6); err != nil { t.Fatalf("original event firing failed: %v", err) } if err := validatePoolInternals(pool); err != nil { @@ -1718,8 +1642,8 @@ func TestRepricing(t *testing.T) { pool.SetGasTip(big.NewInt(500000000)) pending, queued = pool.Stats() - if pending != 2 { - t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2) + if pending != 1 { + t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 1) } if queued != 5 { t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 5) @@ -1746,21 +1670,7 @@ func TestRepricing(t *testing.T) { if err := validatePoolInternals(pool); err != nil { t.Fatalf("pool internal state corrupted: %v", err) } - // However we can add local underpriced transactions - tx := pricedTransaction(1, 100000, big.NewInt(250000000), keys[3]) - if err := pool.addLocal(tx); err != nil { - t.Fatalf("failed to add underpriced local transaction: %v", err) - } - if pending, _ = pool.Stats(); pending != 3 { - t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 3) - } - if err := validateEvents(events, 1); err != nil { - t.Fatalf("post-reprice local event firing failed: %v", err) - } - if err := validatePoolInternals(pool); err != nil { - t.Fatalf("pool internal state corrupted: %v", err) - } - // And we can fill gaps with properly priced transactions + // we can fill gaps with properly priced transactions if err := pool.addRemote(pricedTransaction(1, 100000, big.NewInt(500000000), keys[0])); err != nil { t.Fatalf("failed to add pending transaction: %v", err) } @@ -1802,29 +1712,16 @@ func TestMinGasPriceEnforced(t *testing.T) { tx := pricedTransaction(0, 100000, legacyPrice, key) pool.SetGasTip(new(big.Int).Add(legacyPrice, big.NewInt(1))) - if err := pool.addLocal(tx); !errors.Is(err, txpool.ErrUnderpriced) { - t.Fatalf("Min tip not enforced") - } - - if err := pool.Add([]*types.Transaction{tx}, true, false)[0]; !errors.Is(err, txpool.ErrUnderpriced) { + if err := pool.Add([]*types.Transaction{tx}, false)[0]; !errors.Is(err, txpool.ErrUnderpriced) { t.Fatalf("Min tip not enforced") } tx = dynamicFeeTx(0, 100000, dynamicFeeCap, dynamicTip, key) pool.SetGasTip(new(big.Int).Add(dynamicTip, big.NewInt(1))) - if err := pool.addLocal(tx); !errors.Is(err, txpool.ErrUnderpriced) { - t.Fatalf("Min tip not enforced") - } - - if err := pool.Add([]*types.Transaction{tx}, true, false)[0]; !errors.Is(err, txpool.ErrUnderpriced) { + if err := pool.Add([]*types.Transaction{tx}, true)[0]; !errors.Is(err, txpool.ErrUnderpriced) { t.Fatalf("Min tip not enforced") } - // Make sure the tx is accepted if locals are enabled - pool.config.NoLocals = false - if err := pool.Add([]*types.Transaction{tx}, true, false)[0]; err != nil { - t.Fatalf("Min tip enforced with locals enabled, error: %v", err) - } } // Tests that setting the transaction pool gas price to a higher value correctly @@ -1865,20 +1762,17 @@ func TestRepricingDynamicFee(t *testing.T) { txs = append(txs, dynamicFeeTx(2, 100000, big.NewInt(300000000), big.NewInt(300000000), keys[2])) txs = append(txs, dynamicFeeTx(3, 100000, big.NewInt(350000000), big.NewInt(350000000), keys[2])) - ltx := dynamicFeeTx(0, 100000, big.NewInt(350000000), big.NewInt(300000000), keys[3]) - // Import the batch and that both pending and queued transactions match up pool.addRemotesSync(txs) - pool.addLocal(ltx) pending, queued := pool.Stats() - if pending != 7 { - t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 7) + if pending != 6 { + t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 6) } if queued != 3 { t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 3) } - if err := validateEvents(events, 7); err != nil { + if err := validateEvents(events, 6); err != nil { t.Fatalf("original event firing failed: %v", err) } if err := validatePoolInternals(pool); err != nil { @@ -1888,8 +1782,8 @@ func TestRepricingDynamicFee(t *testing.T) { pool.SetGasTip(big.NewInt(350000000)) pending, queued = pool.Stats() - if pending != 2 { - t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2) + if pending != 1 { + t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 1) } if queued != 5 { t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 5) @@ -1919,20 +1813,7 @@ func TestRepricingDynamicFee(t *testing.T) { if err := validatePoolInternals(pool); err != nil { t.Fatalf("pool internal state corrupted: %v", err) } - // However we can add local underpriced transactions - tx = dynamicFeeTx(1, 100000, big.NewInt(300000000), big.NewInt(300000000), keys[3]) - if err := pool.addLocal(tx); err != nil { - t.Fatalf("failed to add underpriced local transaction: %v", err) - } - if pending, _ = pool.Stats(); pending != 3 { - t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 3) - } - if err := validateEvents(events, 1); err != nil { - t.Fatalf("post-reprice local event firing failed: %v", err) - } - if err := validatePoolInternals(pool); err != nil { - t.Fatalf("pool internal state corrupted: %v", err) - } + // And we can fill gaps with properly priced transactions tx = pricedTransaction(1, 100000, big.NewInt(350000000), keys[0]) if err := pool.addRemote(tx); err != nil { @@ -1954,78 +1835,6 @@ func TestRepricingDynamicFee(t *testing.T) { } } -// Tests that setting the transaction pool gas price to a higher value does not -// remove local transactions (legacy & dynamic fee). -func TestRepricingKeepsLocals(t *testing.T) { - t.Parallel() - - // Create the pool to test the pricing enforcement with - statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(rawdb.NewMemoryDatabase())) - blockchain := newTestBlockChain(eip1559Config, 1000000, statedb, new(event.Feed)) - - pool := New(testTxPoolConfig, blockchain) - pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), newReserver()) - defer pool.Close() - - // Create a number of test accounts and fund them - keys := make([]*ecdsa.PrivateKey, 3) - for i := 0; i < len(keys); i++ { - keys[i], _ = crypto.GenerateKey() - testAddBalance(pool, crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1500000000000000)) - } - // Create transaction (both pending and queued) with a linearly growing gasprice - // common.LimitThresholdNonceInQueue = 10 - for i := uint64(0); i < 5; i++ { - // Add pending transaction. - pendingTx := pricedTransaction(i, 100000, big.NewInt(int64((i+1)*250000000)), keys[2]) - if err := pool.addLocal(pendingTx); err != nil { - t.Fatal(err) - } - // Add queued transaction. - queuedTx := pricedTransaction(i+6, 100000, big.NewInt(int64((i+1)*250000000)), keys[2]) - if err := pool.addLocal(queuedTx); err != nil { - t.Fatal(err) - } - - // Add pending dynamic fee transaction. - pendingTx = dynamicFeeTx(i, 100000, big.NewInt(int64((i+1)*250000000)), big.NewInt(int64((i+1)*250000000)), keys[1]) - if err := pool.addLocal(pendingTx); err != nil { - t.Fatal(err) - } - // Add queued dynamic fee transaction. - queuedTx = dynamicFeeTx(i+6, 100000, big.NewInt(int64((i+1)*250000000)), big.NewInt(int64((i+1)*250000000)), keys[1]) - if err := pool.addLocal(queuedTx); err != nil { - t.Fatal(err) - } - } - pending, queued := pool.Stats() - expPending, expQueued := 10, 10 - validate := func() { - pending, queued = pool.Stats() - if pending != expPending { - t.Fatalf("pending transactions mismatched: have %d, want %d", pending, expPending) - } - if queued != expQueued { - t.Fatalf("queued transactions mismatched: have %d, want %d", queued, expQueued) - } - - if err := validatePoolInternals(pool); err != nil { - t.Fatalf("pool internal state corrupted: %v", err) - } - } - validate() - - // Reprice the pool and check that nothing is dropped - pool.SetGasTip(big.NewInt(500000000)) - validate() - - pool.SetGasTip(big.NewInt(500000000)) - pool.SetGasTip(big.NewInt(1000000000)) - pool.SetGasTip(big.NewInt(2000000000)) - pool.SetGasTip(big.NewInt(25000000000)) - validate() -} - // Tests that when the pool reaches its global transaction limit, underpriced // transactions are gradually shifted out for more expensive ones and any gapped // pending transactions are moved into the queue. @@ -2055,24 +1864,18 @@ func TestUnderpricing(t *testing.T) { keys := make([]*ecdsa.PrivateKey, 5) for i := 0; i < len(keys); i++ { keys[i], _ = crypto.GenerateKey() - testAddBalance(pool, crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(600000000000000)) + testAddBalance(pool, crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(6000000000000000)) } // Generate and queue a batch of transactions, both pending and queued - // Gas prices follow an intentional stepped pattern (250M, 500M, 750M, 1000M, 1250M) to test - // transaction replacement and eviction logic based on price competition. The 1:5 ratio - // (250M to 1250M) ensures clear differentiation when testing underpricing behavior. txs := types.Transactions{} - txs = append(txs, pricedTransaction(0, 100000, big.NewInt(250000000), keys[0])) - txs = append(txs, pricedTransaction(1, 100000, big.NewInt(500000000), keys[0])) - - txs = append(txs, pricedTransaction(1, 100000, big.NewInt(250000000), keys[1])) - - ltx := pricedTransaction(0, 100000, big.NewInt(250000000), keys[2]) + txs = append(txs, pricedTransaction(0, 100000, big.NewInt(250000000), keys[0])) // pending + txs = append(txs, pricedTransaction(1, 100000, big.NewInt(500000000), keys[0])) // pending + txs = append(txs, pricedTransaction(0, 100000, big.NewInt(250000000), keys[2])) // pending + txs = append(txs, pricedTransaction(1, 100000, big.NewInt(250000000), keys[1])) // queued // Import the batch and that both pending and queued transactions match up - pool.AddRemotes(txs) - pool.addLocal(ltx) + pool.addRemotesSync(txs) pending, queued := pool.Stats() if pending != 3 { @@ -2102,44 +1905,23 @@ func TestUnderpricing(t *testing.T) { if err := pool.addRemoteSync(pricedTransaction(2, 100000, big.NewInt(1000000000), keys[1])); err != nil { // +K1:2 => -K0:0 => Pend K1:0, K2:0; Que K0:1 K1:2 t.Fatalf("failed to add well priced transaction: %v", err) } - if err := pool.addRemote(pricedTransaction(3, 100000, big.NewInt(1250000000), keys[1])); err != nil { // +K1:3 => -K0:1 => Pend K1:0, K2:0; Que K1:2 K1:3 + if err := pool.addRemoteSync(pricedTransaction(3, 100000, big.NewInt(1250000000), keys[1])); err != nil { // +K1:3 => -K0:1 => Pend K1:0, K2:0; Que K1:2 K1:3 t.Fatalf("failed to add well priced transaction: %v", err) } + // Ensure nonce continuity for key[1] before the future-replacement assertion. + if err := pool.addRemoteSync(pricedTransaction(1, 100000, big.NewInt(1500000000), keys[1])); err != nil { + t.Fatalf("failed to restore contiguous pending set: %v", err) + } // Ensure that replacing a pending transaction with a future transaction fails - if err := pool.addRemote(pricedTransaction(5, 100000, big.NewInt(1500000000), keys[1])); err != ErrFutureReplacePending { + if err := pool.addRemoteSync(pricedTransaction(5, 100000, big.NewInt(1750000000), keys[1])); !errors.Is(err, ErrFutureReplacePending) { t.Fatalf("adding future replace transaction error mismatch: have %v, want %v", err, ErrFutureReplacePending) } pending, queued = pool.Stats() - if pending != 2 { - t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2) - } - if queued != 2 { - t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2) - } - if err := validateEvents(events, 2); err != nil { - t.Fatalf("additional event firing failed: %v", err) - } - if err := validatePoolInternals(pool); err != nil { - t.Fatalf("pool internal state corrupted: %v", err) - } - // Ensure that adding local transactions can push out even higher priced ones - ltx = pricedTransaction(1, 100000, big.NewInt(250000000), keys[2]) - if err := pool.addLocal(ltx); err != nil { - t.Fatalf("failed to append underpriced local transaction: %v", err) - } - ltx = pricedTransaction(0, 100000, big.NewInt(250000000), keys[3]) - if err := pool.addLocal(ltx); err != nil { - t.Fatalf("failed to add new underpriced local transaction: %v", err) - } - pending, queued = pool.Stats() - if pending != 3 { - t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 3) - } - if queued != 1 { - t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1) + if pending != 4 { + t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 4) } - if err := validateEvents(events, 2); err != nil { - t.Fatalf("local event firing failed: %v", err) + if queued != 0 { + t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0) } if err := validatePoolInternals(pool); err != nil { t.Fatalf("pool internal state corrupted: %v", err) @@ -2218,8 +2000,6 @@ func TestStableUnderpricing(t *testing.T) { // Tests that when the pool reaches its global transaction limit, underpriced // transactions (legacy & dynamic fee) are gradually shifted out for more // expensive ones and any gapped pending transactions are moved into the queue. -// -// Note, local transactions are never allowed to be dropped. func TestUnderpricingDynamicFee(t *testing.T) { t.Parallel() @@ -2244,15 +2024,13 @@ func TestUnderpricingDynamicFee(t *testing.T) { // Generate and queue a batch of transactions, both pending and queued txs := types.Transactions{} - txs = append(txs, dynamicFeeTx(0, 100000, big.NewInt(270000000), big.NewInt(260000000), keys[0])) - txs = append(txs, pricedTransaction(1, 100000, big.NewInt(260000000), keys[0])) - txs = append(txs, dynamicFeeTx(1, 100000, big.NewInt(260000000), big.NewInt(250000000), keys[1])) - - ltx := dynamicFeeTx(0, 100000, big.NewInt(260000000), big.NewInt(250000000), keys[2]) + txs = append(txs, dynamicFeeTx(0, 100000, big.NewInt(270000000), big.NewInt(260000000), keys[0])) // pending + txs = append(txs, pricedTransaction(1, 100000, big.NewInt(260000000), keys[0])) // pending + txs = append(txs, dynamicFeeTx(1, 100000, big.NewInt(260000000), big.NewInt(250000000), keys[1])) // queued + txs = append(txs, dynamicFeeTx(0, 100000, big.NewInt(260000000), big.NewInt(250000000), keys[2])) // pending // Import the batch and that both pending and queued transactions match up - pool.AddRemotes(txs) // Pend K0:0, K0:1; Que K1:1 - pool.addLocal(ltx) // +K2:0 => Pend K0:0, K0:1, K2:0; Que K1:1 + pool.addRemotesSync(txs) // Pend K0:0, K0:1; Que K1:1 pending, queued := pool.Stats() if pending != 3 { @@ -2270,13 +2048,13 @@ func TestUnderpricingDynamicFee(t *testing.T) { // Ensure that adding an underpriced transaction fails tx := dynamicFeeTx(0, 100000, big.NewInt(260000000), big.NewInt(250000000), keys[1]) - if err := pool.addRemote(tx); !errors.Is(err, txpool.ErrUnderpriced) { // Pend K0:0, K0:1, K2:0; Que K1:1 + if err := pool.addRemoteSync(tx); !errors.Is(err, txpool.ErrUnderpriced) { // Pend K0:0, K0:1, K2:0; Que K1:1 t.Fatalf("adding underpriced pending transaction error mismatch: have %v, want %v", err, txpool.ErrUnderpriced) } // Ensure that adding high priced transactions drops cheap ones, but not own tx = pricedTransaction(0, 100000, big.NewInt(260000000), keys[1]) - if err := pool.addRemote(tx); err != nil { // +K1:0, -K1:1 => Pend K0:0, K0:1, K1:0, K2:0; Que - + if err := pool.addRemoteSync(tx); err != nil { // +K1:0, -K1:1 => Pend K0:0, K0:1, K1:0, K2:0; Que - t.Fatalf("failed to add well priced transaction: %v", err) } @@ -2289,40 +2067,18 @@ func TestUnderpricingDynamicFee(t *testing.T) { t.Fatalf("failed to add well priced transaction: %v", err) } pending, queued = pool.Stats() - if pending != 2 { - t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2) + if pending != 4 { + t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 4) } - if queued != 2 { - t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2) + if queued != 0 { + t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0) } - if err := validateEvents(events, 2); err != nil { + if err := validateEvents(events, 3); err != nil { t.Fatalf("additional event firing failed: %v", err) } if err := validatePoolInternals(pool); err != nil { t.Fatalf("pool internal state corrupted: %v", err) } - // Ensure that adding local transactions can push out even higher priced ones - ltx = dynamicFeeTx(1, 100000, big.NewInt(250000000), big.NewInt(250000000), keys[2]) - if err := pool.addLocal(ltx); err != nil { - t.Fatalf("failed to append underpriced local transaction: %v", err) - } - ltx = dynamicFeeTx(0, 100000, big.NewInt(250000000), big.NewInt(250000000), keys[3]) - if err := pool.addLocal(ltx); err != nil { - t.Fatalf("failed to add new underpriced local transaction: %v", err) - } - pending, queued = pool.Stats() - if pending != 3 { - t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 3) - } - if queued != 1 { - t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1) - } - if err := validateEvents(events, 2); err != nil { - t.Fatalf("local event firing failed: %v", err) - } - if err := validatePoolInternals(pool); err != nil { - t.Fatalf("pool internal state corrupted: %v", err) - } } // Tests whether highest fee cap transaction is retained after a batch of high effective @@ -2342,7 +2098,7 @@ func TestDualHeapEviction(t *testing.T) { ) check := func(tx *types.Transaction, name string) { - if pool.all.GetRemote(tx.Hash()) == nil { + if pool.all.Get(tx.Hash()) == nil { t.Fatalf("highest %s transaction evicted from the pool", name) } } @@ -2639,123 +2395,6 @@ func TestReplacementDynamicFee(t *testing.T) { } } -// Tests that local transactions are journaled to disk, but remote transactions -// get discarded between restarts. -func TestJournaling(t *testing.T) { testJournaling(t, false) } - -func TestJournalingNoLocals(t *testing.T) { testJournaling(t, true) } - -func testJournaling(t *testing.T, nolocals bool) { - t.Parallel() - - // Create a temporary file for the journal - file, err := os.CreateTemp(t.TempDir(), "") - if err != nil { - t.Fatalf("failed to create temporary journal: %v", err) - } - journal := file.Name() - defer os.Remove(journal) - - // Clean up the temporary file, we only need the path for now - file.Close() - os.Remove(journal) - - // Create the original pool to inject transaction into the journal - statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(rawdb.NewMemoryDatabase())) - blockchain := newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed)) - - config := testTxPoolConfig - config.NoLocals = nolocals - config.Journal = journal - config.Rejournal = time.Second - - pool := New(config, blockchain) - pool.Init(config.PriceLimit, blockchain.CurrentBlock(), newReserver()) - - // Create two test accounts to ensure remotes expire but locals do not - local, _ := crypto.GenerateKey() - remote, _ := crypto.GenerateKey() - - testAddBalance(pool, crypto.PubkeyToAddress(local.PublicKey), big.NewInt(100000000000000)) - testAddBalance(pool, crypto.PubkeyToAddress(remote.PublicKey), big.NewInt(100000000000000)) - - // Add three local and a remote transactions and ensure they are queued up - if err := pool.addLocal(pricedTransaction(0, 100000, big.NewInt(300000000), local)); err != nil { - t.Fatalf("failed to add local transaction: %v", err) - } - if err := pool.addLocal(pricedTransaction(1, 100000, big.NewInt(300000000), local)); err != nil { - t.Fatalf("failed to add local transaction: %v", err) - } - if err := pool.addLocal(pricedTransaction(2, 100000, big.NewInt(300000000), local)); err != nil { - t.Fatalf("failed to add local transaction: %v", err) - } - if err := pool.addRemoteSync(pricedTransaction(0, 100000, big.NewInt(300000000), remote)); err != nil { - t.Fatalf("failed to add remote transaction: %v", err) - } - pending, queued := pool.Stats() - if pending != 4 { - t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 4) - } - if queued != 0 { - t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0) - } - if err := validatePoolInternals(pool); err != nil { - t.Fatalf("pool internal state corrupted: %v", err) - } - // Terminate the old pool, bump the local nonce, create a new pool and ensure relevant transaction survive - pool.Close() - statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1) - blockchain = newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed)) - - pool = New(config, blockchain) - pool.Init(config.PriceLimit, blockchain.CurrentBlock(), newReserver()) - - pending, queued = pool.Stats() - if queued != 0 { - t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0) - } - if nolocals { - if pending != 0 { - t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0) - } - } else { - if pending != 2 { - t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2) - } - } - if err := validatePoolInternals(pool); err != nil { - t.Fatalf("pool internal state corrupted: %v", err) - } - // Bump the nonce temporarily and ensure the newly invalidated transaction is removed - statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 2) - <-pool.requestReset(nil, nil) - time.Sleep(2 * config.Rejournal) - pool.Close() - - statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1) - blockchain = newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed)) - pool = New(config, blockchain) - pool.Init(config.PriceLimit, blockchain.CurrentBlock(), newReserver()) - - pending, queued = pool.Stats() - if pending != 0 { - t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0) - } - if nolocals { - if queued != 0 { - t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0) - } - } else { - if queued != 1 { - t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1) - } - } - if err := validatePoolInternals(pool); err != nil { - t.Fatalf("pool internal state corrupted: %v", err) - } - pool.Close() -} - // TestTransactionStatusCheck tests that the pool can correctly retrieve the // pending status of individual transactions. func TestStatusCheck(t *testing.T) { @@ -3140,7 +2779,7 @@ func benchmarkFuturePromotion(b *testing.B, size int) { for i := 0; i < size; i++ { tx := transaction(uint64(1+i), 100000, key) - pool.enqueueTx(tx.Hash(), tx, false, true) + pool.enqueueTx(tx.Hash(), tx, true) } // Benchmark the speed of pool validation b.ResetTimer() @@ -3150,15 +2789,11 @@ func benchmarkFuturePromotion(b *testing.B, size int) { } // Benchmarks the speed of batched transaction insertion. -func BenchmarkBatchInsert100(b *testing.B) { benchmarkBatchInsert(b, 100, false) } -func BenchmarkBatchInsert1000(b *testing.B) { benchmarkBatchInsert(b, 1000, false) } -func BenchmarkBatchInsert10000(b *testing.B) { benchmarkBatchInsert(b, 10000, false) } +func BenchmarkBatchInsert100(b *testing.B) { benchmarkBatchInsert(b, 100) } +func BenchmarkBatchInsert1000(b *testing.B) { benchmarkBatchInsert(b, 1000) } +func BenchmarkBatchInsert10000(b *testing.B) { benchmarkBatchInsert(b, 10000) } -func BenchmarkBatchLocalInsert100(b *testing.B) { benchmarkBatchInsert(b, 100, true) } -func BenchmarkBatchLocalInsert1000(b *testing.B) { benchmarkBatchInsert(b, 1000, true) } -func BenchmarkBatchLocalInsert10000(b *testing.B) { benchmarkBatchInsert(b, 10000, true) } - -func benchmarkBatchInsert(b *testing.B, size int, local bool) { +func benchmarkBatchInsert(b *testing.B, size int) { // Generate a batch of transactions to enqueue into the pool pool, key := setupPool() defer pool.Close() @@ -3176,46 +2811,7 @@ func benchmarkBatchInsert(b *testing.B, size int, local bool) { // Benchmark importing the transactions into the queue b.ResetTimer() for _, batch := range batches { - if local { - pool.addLocals(batch) - } else { - pool.AddRemotes(batch) - } - } -} - -func BenchmarkInsertRemoteWithAllLocals(b *testing.B) { - // Allocate keys for testing - key, _ := crypto.GenerateKey() - account := crypto.PubkeyToAddress(key.PublicKey) - - remoteKey, _ := crypto.GenerateKey() - remoteAddr := crypto.PubkeyToAddress(remoteKey.PublicKey) - - locals := make([]*types.Transaction, 4096+1024) // Occupy all slots - for i := 0; i < len(locals); i++ { - locals[i] = transaction(uint64(i), 100000, key) - } - remotes := make([]*types.Transaction, 1000) - for i := 0; i < len(remotes); i++ { - remotes[i] = pricedTransaction(uint64(i), 100000, big.NewInt(2), remoteKey) // Higher gasprice - } - // Benchmark importing the transactions into the queue - b.ResetTimer() - for i := 0; i < b.N; i++ { - b.StopTimer() - pool, _ := setupPool() - testAddBalance(pool, account, big.NewInt(100000000)) - for _, local := range locals { - pool.addLocal(local) - } - b.StartTimer() - // Assign a high enough balance for testing - testAddBalance(pool, remoteAddr, big.NewInt(100000000)) - for i := 0; i < len(remotes); i++ { - pool.AddRemotes([]*types.Transaction{remotes[i]}) - } - pool.Close() + pool.AddRemotes(batch) } } @@ -3350,8 +2946,9 @@ func TestPendingKeepsLocalAndSpecialTransactions(t *testing.T) { localKey, _ := crypto.GenerateKey() localAddr := crypto.PubkeyToAddress(localKey.PublicKey) testAddBalance(pool, localAddr, big.NewInt(1_000_000_000_000_000)) - if err := pool.addLocal(pricedTransaction(0, 100000, new(big.Int).Add(new(big.Int).Set(minGasTip), big.NewInt(1)), localKey)); err != nil { - t.Fatalf("failed to add local tx: %v", err) + localTx := pricedTransaction(0, 100000, new(big.Int).Add(new(big.Int).Set(filterTip), big.NewInt(1)), localKey) + if err := pool.addRemoteSync(localTx); err != nil { + t.Fatalf("failed to add local account tx: %v", err) } filtered := pool.Pending(txpool.PendingFilter{MinTip: uint256.MustFromBig(filterTip)}) diff --git a/core/txpool/legacypool/list.go b/core/txpool/legacypool/list.go index 9461bc324edc..afd3e8b32b49 100644 --- a/core/txpool/legacypool/list.go +++ b/core/txpool/legacypool/list.go @@ -52,31 +52,31 @@ func (h *nonceHeap) Pop() interface{} { return x } -// sortedMap is a nonce->transaction hash map with a heap based index to allow +// SortedMap is a nonce->transaction hash map with a heap based index to allow // iterating over the contents in a nonce-incrementing way. -type sortedMap struct { +type SortedMap struct { items map[uint64]*types.Transaction // Hash map storing the transaction data index *nonceHeap // Heap of nonces of all the stored transactions (non-strict mode) cache types.Transactions // Cache of the transactions already sorted cacheMu sync.Mutex // Mutex covering the cache } -// newSortedMap creates a new nonce-sorted transaction map. -func newSortedMap() *sortedMap { - return &sortedMap{ +// NewSortedMap creates a new nonce-sorted transaction map. +func NewSortedMap() *SortedMap { + return &SortedMap{ items: make(map[uint64]*types.Transaction), index: new(nonceHeap), } } // Get retrieves the current transactions associated with the given nonce. -func (m *sortedMap) Get(nonce uint64) *types.Transaction { +func (m *SortedMap) Get(nonce uint64) *types.Transaction { return m.items[nonce] } // Put inserts a new transaction into the map, also updating the map's nonce // index. If a transaction already exists with the same nonce, it's overwritten. -func (m *sortedMap) Put(tx *types.Transaction) { +func (m *SortedMap) Put(tx *types.Transaction) { nonce := tx.Nonce() if m.items[nonce] == nil { heap.Push(m.index, nonce) @@ -89,7 +89,7 @@ func (m *sortedMap) Put(tx *types.Transaction) { // Forward removes all transactions from the map with a nonce lower than the // provided threshold. Every removed transaction is returned for any post-removal // maintenance. -func (m *sortedMap) Forward(threshold uint64) types.Transactions { +func (m *SortedMap) Forward(threshold uint64) types.Transactions { var removed types.Transactions // Pop off heap items until the threshold is reached @@ -112,7 +112,7 @@ func (m *sortedMap) Forward(threshold uint64) types.Transactions { // Filter, as opposed to 'filter', re-initialises the heap after the operation is done. // If you want to do several consecutive filterings, it's therefore better to first // do a .filter(func1) followed by .Filter(func2) or reheap() -func (m *sortedMap) Filter(filterFunc func(*types.Transaction) bool) types.Transactions { +func (m *SortedMap) Filter(filterFunc func(*types.Transaction) bool) types.Transactions { removed := m.filter(filterFunc) // If transactions were removed, the heap and cache are ruined if len(removed) > 0 { @@ -121,7 +121,7 @@ func (m *sortedMap) Filter(filterFunc func(*types.Transaction) bool) types.Trans return removed } -func (m *sortedMap) reheap() { +func (m *SortedMap) reheap() { *m.index = make([]uint64, 0, len(m.items)) for nonce := range m.items { *m.index = append(*m.index, nonce) @@ -134,7 +134,7 @@ func (m *sortedMap) reheap() { // filter is identical to Filter, but **does not** regenerate the heap. This method // should only be used if followed immediately by a call to Filter or reheap() -func (m *sortedMap) filter(filterFunc func(*types.Transaction) bool) types.Transactions { +func (m *SortedMap) filter(filterFunc func(*types.Transaction) bool) types.Transactions { var removed types.Transactions // Collect all the transactions to filter out @@ -154,7 +154,7 @@ func (m *sortedMap) filter(filterFunc func(*types.Transaction) bool) types.Trans // Cap places a hard limit on the number of items, returning all transactions // exceeding that limit. -func (m *sortedMap) Cap(threshold int) types.Transactions { +func (m *SortedMap) Cap(threshold int) types.Transactions { // Short circuit if the number of items is under the limit if len(m.items) <= threshold { return nil @@ -182,7 +182,7 @@ func (m *sortedMap) Cap(threshold int) types.Transactions { // Remove deletes a transaction from the maintained map, returning whether the // transaction was found. -func (m *sortedMap) Remove(nonce uint64) bool { +func (m *SortedMap) Remove(nonce uint64) bool { // Short circuit if no transaction is present _, ok := m.items[nonce] if !ok { @@ -210,7 +210,7 @@ func (m *sortedMap) Remove(nonce uint64) bool { // Note, all transactions with nonces lower than start will also be returned to // prevent getting into an invalid state. This is not something that should ever // happen but better to be self correcting than failing! -func (m *sortedMap) Ready(start uint64) types.Transactions { +func (m *SortedMap) Ready(start uint64) types.Transactions { // Short circuit if no transactions are available if m.index.Len() == 0 || (*m.index)[0] > start { return nil @@ -230,11 +230,11 @@ func (m *sortedMap) Ready(start uint64) types.Transactions { } // Len returns the length of the transaction map. -func (m *sortedMap) Len() int { +func (m *SortedMap) Len() int { return len(m.items) } -func (m *sortedMap) flatten() types.Transactions { +func (m *SortedMap) flatten() types.Transactions { m.cacheMu.Lock() defer m.cacheMu.Unlock() // If the sorting was not cached yet, create and cache it @@ -251,7 +251,7 @@ func (m *sortedMap) flatten() types.Transactions { // Flatten creates a nonce-sorted slice of transactions based on the loosely // sorted internal representation. The result of the sorting is cached in case // it's requested again before any modifications are made to the contents. -func (m *sortedMap) Flatten() types.Transactions { +func (m *SortedMap) Flatten() types.Transactions { cache := m.flatten() // Copy the cache to prevent accidental modification txs := make(types.Transactions, len(cache)) @@ -261,7 +261,7 @@ func (m *sortedMap) Flatten() types.Transactions { // LastElement returns the last element of a flattened list, thus, the // transaction with the highest nonce -func (m *sortedMap) LastElement() *types.Transaction { +func (m *SortedMap) LastElement() *types.Transaction { cache := m.flatten() return cache[len(cache)-1] } @@ -272,7 +272,7 @@ func (m *sortedMap) LastElement() *types.Transaction { // executable/future queue, with minor behavioral changes. type list struct { strict bool // Whether nonces are strictly continuous or not - txs *sortedMap // Heap indexed sorted hash map of the transactions + txs *SortedMap // Heap indexed sorted hash map of the transactions costcap *big.Int // Price of the highest costing transaction (reset only if exceeds balance) gascap uint64 // Gas limit of the highest spending transaction (reset only if exceeds block limit) @@ -284,7 +284,7 @@ type list struct { func newList(strict bool) *list { return &list{ strict: strict, - txs: newSortedMap(), + txs: NewSortedMap(), costcap: new(big.Int), totalcost: new(uint256.Int), } @@ -573,10 +573,7 @@ func newPricedList(all *lookup) *pricedList { } // Put inserts a new transaction into the heap. -func (l *pricedList) Put(tx *types.Transaction, local bool) { - if local { - return - } +func (l *pricedList) Put(tx *types.Transaction) { // Insert every new transaction to the urgent heap first; Discard will balance the heaps heap.Push(&l.urgent, tx) } @@ -610,7 +607,7 @@ func (l *pricedList) underpricedFor(h *priceHeap, tx *types.Transaction) bool { // Discard stale price points if found at the heap start for len(h.list) > 0 { head := h.list[0] - if l.all.GetRemote(head.Hash()) == nil { // Removed or migrated + if l.all.Get(head.Hash()) == nil { // Removed or migrated l.stales.Add(-1) heap.Pop(h) continue @@ -629,15 +626,13 @@ func (l *pricedList) underpricedFor(h *priceHeap, tx *types.Transaction) bool { // Discard finds a number of most underpriced transactions, removes them from the // priced list and returns them for further removal from the entire pool. // If noPending is set to true, we will only consider the floating list -// -// Note local transaction won't be considered for eviction. -func (l *pricedList) Discard(slots int, force bool) (types.Transactions, bool) { +func (l *pricedList) Discard(slots int) (types.Transactions, bool) { drop := make(types.Transactions, 0, slots) // Remote underpriced transactions to drop for slots > 0 { if len(l.urgent.list)*floatingRatio > len(l.floating.list)*urgentRatio { // Discard stale transactions if found during cleanup tx := heap.Pop(&l.urgent).(*types.Transaction) - if l.all.GetRemote(tx.Hash()) == nil { // Removed or migrated + if l.all.Get(tx.Hash()) == nil { // Removed or migrated l.stales.Add(-1) continue } @@ -650,7 +645,7 @@ func (l *pricedList) Discard(slots int, force bool) (types.Transactions, bool) { } // Discard stale transactions if found during cleanup tx := heap.Pop(&l.floating).(*types.Transaction) - if l.all.GetRemote(tx.Hash()) == nil { // Removed or migrated + if l.all.Get(tx.Hash()) == nil { // Removed or migrated l.stales.Add(-1) continue } @@ -660,7 +655,7 @@ func (l *pricedList) Discard(slots int, force bool) (types.Transactions, bool) { } } // If we still can't make enough room for the new transaction - if slots > 0 && !force { + if slots > 0 { for _, tx := range drop { heap.Push(&l.urgent, tx) } @@ -675,11 +670,11 @@ func (l *pricedList) Reheap() { defer l.reheapMu.Unlock() start := time.Now() l.stales.Store(0) - l.urgent.list = make([]*types.Transaction, 0, l.all.RemoteCount()) - l.all.Range(func(hash common.Hash, tx *types.Transaction, local bool) bool { + l.urgent.list = make([]*types.Transaction, 0, l.all.Count()) + l.all.Range(func(hash common.Hash, tx *types.Transaction) bool { l.urgent.list = append(l.urgent.list, tx) return true - }, false, true) // Only iterate remotes + }) heap.Init(&l.urgent) // balance out the two heaps by moving the worse half of transactions into the diff --git a/core/txpool/legacypool/journal.go b/core/txpool/locals/journal.go similarity index 99% rename from core/txpool/legacypool/journal.go rename to core/txpool/locals/journal.go index e34a503c8d31..9eea07c03d7d 100644 --- a/core/txpool/legacypool/journal.go +++ b/core/txpool/locals/journal.go @@ -14,7 +14,7 @@ // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . -package legacypool +package locals import ( "errors" diff --git a/core/txpool/locals/tx_tracker.go b/core/txpool/locals/tx_tracker.go new file mode 100644 index 000000000000..8cd33948bfc6 --- /dev/null +++ b/core/txpool/locals/tx_tracker.go @@ -0,0 +1,212 @@ +// Copyright 2023 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +// Package locals implements tracking for "local" transactions +package locals + +import ( + "cmp" + "slices" + "sync" + "time" + + "github.com/XinFinOrg/XDPoSChain/common" + "github.com/XinFinOrg/XDPoSChain/core/txpool" + "github.com/XinFinOrg/XDPoSChain/core/txpool/legacypool" + "github.com/XinFinOrg/XDPoSChain/core/types" + "github.com/XinFinOrg/XDPoSChain/log" + "github.com/XinFinOrg/XDPoSChain/metrics" + "github.com/XinFinOrg/XDPoSChain/params" +) + +var ( + recheckInterval = time.Minute + localGauge = metrics.GetOrRegisterGauge("txpool/local", nil) +) + +// TxTracker is a struct used to track priority transactions; it will check from +// time to time if the main pool has forgotten about any of the transaction +// it is tracking, and if so, submit it again. +// This is used to track 'locals'. +// This struct does not care about transaction validity, price-bumps or account limits, +// but optimistically accepts transactions. +type TxTracker struct { + all map[common.Hash]*types.Transaction // All tracked transactions + byAddr map[common.Address]*legacypool.SortedMap // Transactions by address + + journal *journal // Journal of local transaction to back up to disk + rejournal time.Duration // How often to rotate journal + pool *txpool.TxPool // The tx pool to interact with + signer types.Signer + + shutdownCh chan struct{} + mu sync.Mutex + wg sync.WaitGroup +} + +// New creates a new TxTracker +func New(journalPath string, journalTime time.Duration, chainConfig *params.ChainConfig, next *txpool.TxPool) *TxTracker { + pool := &TxTracker{ + all: make(map[common.Hash]*types.Transaction), + byAddr: make(map[common.Address]*legacypool.SortedMap), + signer: types.LatestSigner(chainConfig), + shutdownCh: make(chan struct{}), + pool: next, + } + if journalPath != "" { + pool.journal = newTxJournal(journalPath) + pool.rejournal = journalTime + } + return pool +} + +// Track adds a transaction to the tracked set. +func (tracker *TxTracker) Track(tx *types.Transaction) { + tracker.TrackAll([]*types.Transaction{tx}) +} + +// TrackAll adds a list of transactions to the tracked set. +func (tracker *TxTracker) TrackAll(txs []*types.Transaction) { + tracker.mu.Lock() + defer tracker.mu.Unlock() + + for _, tx := range txs { + // If we're already tracking it, it's a no-op + if _, ok := tracker.all[tx.Hash()]; ok { + continue + } + // Theoretically, checking the error here is unnecessary since sender recovery + // is already part of basic validation. However, retrieving the sender address + // from the transaction cache is effectively a no-op if it was previously verified. + // Therefore, the error is still checked just in case. + addr, err := types.Sender(tracker.signer, tx) + if err != nil { // Ignore this tx + continue + } + tracker.all[tx.Hash()] = tx + if tracker.byAddr[addr] == nil { + tracker.byAddr[addr] = legacypool.NewSortedMap() + } + tracker.byAddr[addr].Put(tx) + + if tracker.journal != nil { + _ = tracker.journal.insert(tx) + } + } + localGauge.Update(int64(len(tracker.all))) +} + +// recheck checks and returns any transactions that needs to be resubmitted. +func (tracker *TxTracker) recheck(journalCheck bool) (resubmits []*types.Transaction, rejournal map[common.Address]types.Transactions) { + tracker.mu.Lock() + defer tracker.mu.Unlock() + + var ( + numStales = 0 + numOk = 0 + ) + for sender, txs := range tracker.byAddr { + // Wipe the stales + stales := txs.Forward(tracker.pool.Nonce(sender)) + for _, tx := range stales { + delete(tracker.all, tx.Hash()) + } + numStales += len(stales) + + // Check the non-stale + for _, tx := range txs.Flatten() { + if tracker.pool.Has(tx.Hash()) { + numOk++ + continue + } + resubmits = append(resubmits, tx) + } + } + + if journalCheck { // rejournal + rejournal = make(map[common.Address]types.Transactions) + for _, tx := range tracker.all { + addr, _ := types.Sender(tracker.signer, tx) + rejournal[addr] = append(rejournal[addr], tx) + } + // Sort them + for _, list := range rejournal { + // cmp(a, b) should return a negative number when a < b, + slices.SortFunc(list, func(a, b *types.Transaction) int { + return cmp.Compare(a.Nonce(), b.Nonce()) + }) + } + } + localGauge.Update(int64(len(tracker.all))) + log.Debug("Tx tracker status", "need-resubmit", len(resubmits), "stale", numStales, "ok", numOk) + return resubmits, rejournal +} + +// Start implements node.Lifecycle interface +// Start is called after all services have been constructed and the networking +// layer was also initialized to spawn any goroutines required by the service. +func (tracker *TxTracker) Start() error { + tracker.wg.Add(1) + go tracker.loop() + return nil +} + +// Stop implements node.Lifecycle interface +// Stop terminates all goroutines belonging to the service, blocking until they +// are all terminated. +func (tracker *TxTracker) Stop() error { + close(tracker.shutdownCh) + tracker.wg.Wait() + return nil +} + +func (tracker *TxTracker) loop() { + defer tracker.wg.Done() + + if tracker.journal != nil { + tracker.journal.load(func(transactions []*types.Transaction) []error { + tracker.TrackAll(transactions) + return nil + }) + defer tracker.journal.close() + } + var ( + lastJournal = time.Now() + timer = time.NewTimer(10 * time.Second) // Do initial check after 10 seconds, do rechecks more seldom. + ) + for { + select { + case <-tracker.shutdownCh: + return + case <-timer.C: + checkJournal := tracker.journal != nil && time.Since(lastJournal) > tracker.rejournal + resubmits, rejournal := tracker.recheck(checkJournal) + if len(resubmits) > 0 { + tracker.pool.Add(resubmits, false) + } + if checkJournal { + // Lock to prevent journal.rotate <-> journal.insert (via TrackAll) conflicts + tracker.mu.Lock() + lastJournal = time.Now() + if err := tracker.journal.rotate(rejournal); err != nil { + log.Warn("Transaction journal rotation failed", "err", err) + } + tracker.mu.Unlock() + } + timer.Reset(recheckInterval) + } + } +} diff --git a/core/txpool/subpool.go b/core/txpool/subpool.go index 8ad40718c3fc..a0c7ac169cbb 100644 --- a/core/txpool/subpool.go +++ b/core/txpool/subpool.go @@ -117,7 +117,7 @@ type SubPool interface { // Add enqueues a batch of transactions into the pool if they are valid. Due // to the large transaction churn, add may postpone fully integrating the tx // to a later point to batch multiple ones together. - Add(txs []*types.Transaction, local bool, sync bool) []error + Add(txs []*types.Transaction, sync bool) []error // Pending retrieves all currently processable transactions, grouped by origin // account and sorted by nonce. @@ -147,9 +147,6 @@ type SubPool interface { // pending as well as queued transactions of this address, grouped by nonce. ContentFrom(addr common.Address) ([]*types.Transaction, []*types.Transaction) - // Locals retrieves the accounts currently considered local by the pool. - Locals() []common.Address - // Status returns the known status (unknown/pending/queued) of a transaction // identified by their hashes. Status(hash common.Hash) TxStatus diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index 4e7f801f84f3..b97137551db3 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -56,6 +56,8 @@ type BlockChain interface { type TxPool struct { subpools []SubPool // List of subpools for specialized transaction handling + localTracker LocalTracker // Optional tracker for local tx submissions + subs event.SubscriptionScope // Subscription scope to unsubscribe all on shutdown quit chan chan error // Quit channel to tear down the head updater term chan struct{} // Termination channel to detect a closed pool @@ -63,6 +65,13 @@ type TxPool struct { sync chan chan error // Testing / simulator channel to block until internal reset is done } +// LocalTracker is the minimal local transaction tracking functionality used by +// TxPool local submission helpers. +type LocalTracker interface { + Track(tx *types.Transaction) + TrackAll(txs []*types.Transaction) +} + // New creates a new transaction pool to gather, sort and filter inbound // transactions from the network. func New(gasTip uint64, chain BlockChain, subpools []SubPool) (*TxPool, error) { @@ -268,7 +277,7 @@ func (p *TxPool) Get(hash common.Hash) *types.Transaction { // Add enqueues a batch of transactions into the pool if they are valid. Due // to the large transaction churn, add may postpone fully integrating the tx // to a later point to batch multiple ones together. -func (p *TxPool) Add(txs []*types.Transaction, local bool, sync bool) []error { +func (p *TxPool) Add(txs []*types.Transaction, sync bool) []error { // Split the input transactions between the subpools. It shouldn't really // happen that we receive merged batches, but better graceful than strange // errors. @@ -295,7 +304,7 @@ func (p *TxPool) Add(txs []*types.Transaction, local bool, sync bool) []error { // back the errors into the original sort order. errsets := make([][]error, len(p.subpools)) for i := 0; i < len(p.subpools); i++ { - errsets[i] = p.subpools[i].Add(txsets[i], local, sync) + errsets[i] = p.subpools[i].Add(txsets[i], sync) } errs := make([]error, len(txs)) for i, split := range splits { @@ -311,6 +320,26 @@ func (p *TxPool) Add(txs []*types.Transaction, local bool, sync bool) []error { return errs } +// SetLocalTracker configures an optional tracker that will receive all local +// transaction submissions. +func (p *TxPool) SetLocalTracker(tracker LocalTracker) { + p.localTracker = tracker +} + +// AddLocals enqueues a batch of local transactions into the pool if they are +// valid and tracks them for re-journal/re-submit flows. +func (p *TxPool) AddLocals(txs []*types.Transaction, sync bool) []error { + if p.localTracker != nil { + p.localTracker.TrackAll(txs) + } + return p.Add(txs, sync) +} + +// AddLocal enqueues a single local transaction into the pool if it is valid. +func (p *TxPool) AddLocal(tx *types.Transaction, sync bool) error { + return p.AddLocals([]*types.Transaction{tx}, sync)[0] +} + // Pending retrieves all currently processable transactions, grouped by origin // account and sorted by nonce. // @@ -394,23 +423,6 @@ func (p *TxPool) ContentFrom(addr common.Address) ([]*types.Transaction, []*type return []*types.Transaction{}, []*types.Transaction{} } -// Locals retrieves the accounts currently considered local by the pool. -func (p *TxPool) Locals() []common.Address { - // Retrieve the locals from each subpool and deduplicate them - locals := make(map[common.Address]struct{}) - for _, subpool := range p.subpools { - for _, local := range subpool.Locals() { - locals[local] = struct{}{} - } - } - // Flatten and return the deduplicated local set - flat := make([]common.Address, 0, len(locals)) - for local := range locals { - flat = append(flat, local) - } - return flat -} - // Status returns the known status (unknown/pending/queued) of a transaction // identified by its hashes. func (p *TxPool) Status(hash common.Hash) TxStatus { diff --git a/core/txpool/txpool_local_test.go b/core/txpool/txpool_local_test.go new file mode 100644 index 000000000000..bceec4b05fae --- /dev/null +++ b/core/txpool/txpool_local_test.go @@ -0,0 +1,183 @@ +package txpool + +import ( + "math/big" + "reflect" + "sync" + "testing" + + "github.com/XinFinOrg/XDPoSChain/common" + "github.com/XinFinOrg/XDPoSChain/core" + "github.com/XinFinOrg/XDPoSChain/core/types" + "github.com/XinFinOrg/XDPoSChain/event" +) + +type testChain struct{} + +func (testChain) CurrentBlock() *types.Header { return &types.Header{Number: big.NewInt(0)} } + +func (testChain) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription { + return event.NewSubscription(func(quit <-chan struct{}) error { + <-quit + return nil + }) +} + +type testLocalTracker struct { + mu sync.Mutex + events *[]string + tracked []common.Hash +} + +func (t *testLocalTracker) Track(tx *types.Transaction) { + t.TrackAll([]*types.Transaction{tx}) +} + +func (t *testLocalTracker) TrackAll(txs []*types.Transaction) { + t.mu.Lock() + defer t.mu.Unlock() + *t.events = append(*t.events, "track") + for _, tx := range txs { + t.tracked = append(t.tracked, tx.Hash()) + } +} + +type testSubPool struct { + events *[]string + + lastAdd []*types.Transaction + lastSync bool +} + +func (s *testSubPool) Filter(tx *types.Transaction) bool { return true } + +func (s *testSubPool) Init(gasTip uint64, head *types.Header, reserver *Reserver) error { return nil } + +func (s *testSubPool) Close() error { return nil } + +func (s *testSubPool) Reset(oldHead, newHead *types.Header) {} + +func (s *testSubPool) SetGasTip(tip *big.Int) error { return nil } + +func (s *testSubPool) Has(hash common.Hash) bool { return false } + +func (s *testSubPool) Get(hash common.Hash) *types.Transaction { return nil } + +func (s *testSubPool) Add(txs []*types.Transaction, sync bool) []error { + *s.events = append(*s.events, "add") + s.lastAdd = txs + s.lastSync = sync + return make([]error, len(txs)) +} + +func (s *testSubPool) Pending(filter PendingFilter) map[common.Address][]*LazyTransaction { + return map[common.Address][]*LazyTransaction{} +} + +func (s *testSubPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription { + return event.NewSubscription(func(quit <-chan struct{}) error { + <-quit + return nil + }) +} + +func (s *testSubPool) Nonce(addr common.Address) uint64 { return 0 } + +func (s *testSubPool) Stats() (int, int) { return 0, 0 } + +func (s *testSubPool) Content() (map[common.Address][]*types.Transaction, map[common.Address][]*types.Transaction) { + return map[common.Address][]*types.Transaction{}, map[common.Address][]*types.Transaction{} +} + +func (s *testSubPool) ContentFrom(addr common.Address) ([]*types.Transaction, []*types.Transaction) { + return nil, nil +} + +func (s *testSubPool) Status(hash common.Hash) TxStatus { return TxStatusUnknown } + +func (s *testSubPool) SetSigner(f func(address common.Address) bool) {} + +func (s *testSubPool) IsSigner(addr common.Address) bool { return false } + +func TestAddLocalTracksBeforeAdd(t *testing.T) { + events := []string{} + tracker := &testLocalTracker{events: &events} + subpool := &testSubPool{events: &events} + + pool, err := New(0, testChain{}, []SubPool{subpool}) + if err != nil { + t.Fatalf("failed to create txpool: %v", err) + } + defer pool.Close() + + pool.SetLocalTracker(tracker) + + tx := types.NewTransaction(0, common.Address{0x1}, big.NewInt(1), 21000, big.NewInt(1), nil) + if err := pool.AddLocal(tx, true); err != nil { + t.Fatalf("AddLocal failed: %v", err) + } + + if len(tracker.tracked) != 1 || tracker.tracked[0] != tx.Hash() { + t.Fatalf("tracker did not receive local tx hash") + } + if len(subpool.lastAdd) != 1 || subpool.lastAdd[0].Hash() != tx.Hash() { + t.Fatalf("subpool Add did not receive local tx") + } + if !subpool.lastSync { + t.Fatalf("sync flag not propagated to subpool Add") + } + if !reflect.DeepEqual(events, []string{"track", "add"}) { + t.Fatalf("unexpected call order: have %v", events) + } +} + +func TestAddLocalsTracksBeforeAdd(t *testing.T) { + events := []string{} + tracker := &testLocalTracker{events: &events} + subpool := &testSubPool{events: &events} + + pool, err := New(0, testChain{}, []SubPool{subpool}) + if err != nil { + t.Fatalf("failed to create txpool: %v", err) + } + defer pool.Close() + + pool.SetLocalTracker(tracker) + + tx0 := types.NewTransaction(0, common.Address{0x1}, big.NewInt(1), 21000, big.NewInt(1), nil) + tx1 := types.NewTransaction(1, common.Address{0x1}, big.NewInt(1), 21000, big.NewInt(1), nil) + txs := []*types.Transaction{tx0, tx1} + + errs := pool.AddLocals(txs, true) + if len(errs) != len(txs) { + t.Fatalf("unexpected error result length: have %d, want %d", len(errs), len(txs)) + } + for i, err := range errs { + if err != nil { + t.Fatalf("AddLocals error at index %d: %v", i, err) + } + } + + hashes := []common.Hash{tx0.Hash(), tx1.Hash()} + if len(tracker.tracked) != len(hashes) { + t.Fatalf("tracker tx count mismatch: have %d, want %d", len(tracker.tracked), len(hashes)) + } + if !reflect.DeepEqual(tracker.tracked, hashes) { + t.Fatalf("tracker hashes mismatch: have %v, want %v", tracker.tracked, hashes) + } + + if len(subpool.lastAdd) != len(hashes) { + t.Fatalf("subpool Add tx count mismatch: have %d, want %d", len(subpool.lastAdd), len(hashes)) + } + for i, tx := range subpool.lastAdd { + if tx.Hash() != hashes[i] { + t.Fatalf("subpool Add hash mismatch at index %d", i) + } + } + if !subpool.lastSync { + t.Fatalf("sync flag not propagated to subpool Add") + } + if !reflect.DeepEqual(events, []string{"track", "add"}) { + t.Fatalf("unexpected call order: have %v", events) + } +} diff --git a/eth/api_backend.go b/eth/api_backend.go index 6d14b0a93ede..e703f9f64b8e 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -301,7 +301,10 @@ func (b *EthAPIBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscri } func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error { - return b.eth.txPool.Add([]*types.Transaction{signedTx}, true, false)[0] + if locals := b.eth.localTxTracker; locals != nil { + locals.Track(signedTx) + } + return b.eth.txPool.Add([]*types.Transaction{signedTx}, false)[0] } func (b *EthAPIBackend) GetPoolTransactions() (types.Transactions, error) { diff --git a/eth/backend.go b/eth/backend.go index 8b6fc8148858..df68a1a8f76e 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -25,6 +25,7 @@ import ( "runtime" "sync" "sync/atomic" + "time" "github.com/XinFinOrg/XDPoSChain/XDCx" "github.com/XinFinOrg/XDPoSChain/XDCxlending" @@ -41,6 +42,7 @@ import ( "github.com/XinFinOrg/XDPoSChain/core/rawdb" "github.com/XinFinOrg/XDPoSChain/core/txpool" "github.com/XinFinOrg/XDPoSChain/core/txpool/legacypool" + "github.com/XinFinOrg/XDPoSChain/core/txpool/locals" "github.com/XinFinOrg/XDPoSChain/core/types" "github.com/XinFinOrg/XDPoSChain/core/vm" "github.com/XinFinOrg/XDPoSChain/eth/downloader" @@ -65,17 +67,17 @@ import ( // Ethereum implements the Ethereum full node service. type Ethereum struct { - config *ethconfig.Config + // core protocol objects + config *ethconfig.Config + txPool *txpool.TxPool + localTxTracker *locals.TxTracker + blockchain *core.BlockChain // Channel for shutting down the service shutdownChan chan bool // Channel for shutting down the ethereum - // Handlers - txPool *txpool.TxPool - orderPool *legacypool.OrderPool lendingPool *legacypool.LendingPool - blockchain *core.BlockChain protocolManager *ProtocolManager // DB interfaces @@ -267,14 +269,24 @@ func New(stack *node.Node, config *ethconfig.Config, XDCXServ *XDCx.XDCX, lendin config.TxPool.Journal = stack.ResolvePath(config.TxPool.Journal) } legacyPool := legacypool.New(config.TxPool, eth.blockchain) + eth.orderPool = legacypool.NewOrderPool(eth.blockchain.Config(), eth.blockchain) + eth.lendingPool = legacypool.NewLendingPool(eth.blockchain.Config(), eth.blockchain) eth.txPool, err = txpool.New(config.TxPool.PriceLimit, eth.blockchain, []txpool.SubPool{legacyPool}) if err != nil { return nil, err } - eth.orderPool = legacypool.NewOrderPool(eth.blockchain.Config(), eth.blockchain) - eth.lendingPool = legacypool.NewLendingPool(eth.blockchain.Config(), eth.blockchain) + if !config.TxPool.NoLocals { + rejournal := config.TxPool.Rejournal + if rejournal < time.Second { + log.Warn("Sanitizing invalid txpool journal time", "provided", rejournal, "updated", time.Second) + rejournal = time.Second + } + eth.localTxTracker = locals.New(config.TxPool.Journal, rejournal, eth.blockchain.Config(), eth.txPool) + eth.txPool.SetLocalTracker(eth.localTxTracker) + stack.RegisterLifecycle(eth.localTxTracker) + } if eth.protocolManager, err = NewProtocolManagerEx(eth.blockchain.Config(), config.SyncMode, networkID, eth.eventMux, eth.txPool, eth.orderPool, eth.lendingPool, eth.engine, eth.blockchain, chainDb); err != nil { return nil, err diff --git a/eth/handler.go b/eth/handler.go index 93b48157d55c..3bda8c158e2f 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -780,7 +780,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { pm.knownTxs.Add(tx.Hash(), struct{}{}) } } - pm.txpool.Add(txs, false, false) + pm.txpool.Add(txs, false) case msg.Code == OrderTxMsg: // Transactions arrived, make sure we have a valid and fresh chain to handle them diff --git a/eth/helper_test.go b/eth/helper_test.go index 3db3579eb96f..c98914fc6a63 100644 --- a/eth/helper_test.go +++ b/eth/helper_test.go @@ -132,7 +132,7 @@ func (p *testTxPool) Get(hash common.Hash) *types.Transaction { // Add appends a batch of transactions to the pool, and notifies any // listeners if the addition channel is non nil -func (p *testTxPool) Add(txs []*types.Transaction, local bool, sync bool) []error { +func (p *testTxPool) Add(txs []*types.Transaction, sync bool) []error { p.lock.Lock() defer p.lock.Unlock() diff --git a/eth/protocol.go b/eth/protocol.go index ef3c74bb857a..6490f246e831 100644 --- a/eth/protocol.go +++ b/eth/protocol.go @@ -105,7 +105,7 @@ var errorToString = map[int]string{ type txPool interface { // Add should add the given transactions to the pool. - Add(txs []*types.Transaction, local bool, sync bool) []error + Add(txs []*types.Transaction, sync bool) []error // Pending should return pending transactions. // The slice should be modifiable by the caller. diff --git a/eth/protocol_test.go b/eth/protocol_test.go index 79228ac93b25..c85815150c2e 100644 --- a/eth/protocol_test.go +++ b/eth/protocol_test.go @@ -136,7 +136,7 @@ func testSendTransactions(t *testing.T, protocol int) { tx := newTestTransaction(testAccount, uint64(nonce), txsize) alltxs[nonce] = tx } - pm.txpool.Add(alltxs, false, false) + pm.txpool.Add(alltxs, false) // Connect several peers. They should all receive the pending transactions. var wg sync.WaitGroup