Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 83 additions & 3 deletions core/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package txpool

import (
"errors"
"fmt"
"maps"
"math/big"
Expand Down Expand Up @@ -57,6 +58,9 @@ type TxPool struct {

subs event.SubscriptionScope // Subscription scope to unsubscribe all on shutdown
quit chan chan error // Quit channel to tear down the head updater
term chan struct{} // Termination channel to detect a closed pool

sync chan chan error // Testing / simulator channel to block until internal reset is done
}

// New creates a new transaction pool to gather, sort and filter inbound
Expand All @@ -70,6 +74,8 @@ func New(gasTip uint64, chain BlockChain, subpools []SubPool) (*TxPool, error) {
pool := &TxPool{
subpools: subpools,
quit: make(chan chan error),
term: make(chan struct{}),
sync: make(chan chan error),
}
reserver := NewReservationTracker()
for i, subpool := range subpools {
Expand Down Expand Up @@ -113,6 +119,9 @@ func (p *TxPool) Close() error {
// outside blockchain events as well as for various reporting and transaction
// eviction events.
func (p *TxPool) loop(head *types.Header, chain BlockChain) {
// Close the termination marker when the pool stops
defer close(p.term)

// Subscribe to chain head events to trigger subpool resets
var (
newHeadCh = make(chan core.ChainHeadEvent)
Expand All @@ -129,13 +138,26 @@ func (p *TxPool) loop(head *types.Header, chain BlockChain) {
var (
resetBusy = make(chan struct{}, 1) // Allow 1 reset to run concurrently
resetDone = make(chan *types.Header)

resetForced bool // Whether a forced reset was requested, only used in simulator mode
resetWaiter chan error // Channel waiting on a forced reset, only used in simulator mode
)
// Notify the live reset waiter without blocking if the txpool is closed.
defer func() {
if resetWaiter != nil {
select {
case resetWaiter <- errors.New("pool already terminated"):
default:
}
resetWaiter = nil
}
}()
var errc chan error
for errc == nil {
// Something interesting might have happened, run a reset if there is
// one needed but none is running. The resetter will run on its own
// goroutine to allow chain head events to be consumed contiguously.
if newHead != oldHead {
if newHead != oldHead || resetForced {
// Try to inject a busy marker and start a reset if successful
select {
case resetBusy <- struct{}{}:
Expand All @@ -144,11 +166,23 @@ func (p *TxPool) loop(head *types.Header, chain BlockChain) {
for _, subpool := range p.subpools {
subpool.Reset(oldHead, newHead)
}
resetDone <- newHead
select {
case resetDone <- newHead:
case <-p.term:
}
}(oldHead, newHead)

// If the reset operation was explicitly requested, consider it
// being fulfilled and drop the request marker. If it was not,
// this is a noop.
resetForced = false

default:
// Reset already running, wait until it finishes
// Reset already running, wait until it finishes.
//
// Note, this will not drop any forced reset request. If a forced
// reset was requested, but we were busy, then when the currently
// running reset finishes, a new one will be spun up.
}
}
// Wait for the next chain head event or a previous reset finish
Expand All @@ -162,8 +196,37 @@ func (p *TxPool) loop(head *types.Header, chain BlockChain) {
oldHead = head
<-resetBusy

// If someone is waiting for a reset to finish, notify them, unless
// the forced op is still pending. In that case, wait another round
// of resets.
if resetWaiter != nil && !resetForced {
select {
case resetWaiter <- nil:
// notification delivered
default:
// no active listener; avoid blocking the event loop
}
resetWaiter = nil
}

case errc = <-p.quit:
// Termination requested, break out on the next loop round

case syncc := <-p.sync:
// Transaction pool is running inside a simulator, and we are about
// to create a new block. Request a forced sync operation to ensure
// that any running reset operation finishes to make block imports
// deterministic. On top of that, run a new reset operation to make
// transaction insertions deterministic instead of being stuck in a
// queue waiting for a reset.
if resetWaiter != nil {
// A previous sync waiter is still pending; notify it to avoid
// leaking a goroutine waiting on the old channel.
resetWaiter <- errors.New("sync request superseded by a new request")
resetWaiter = nil
}
resetForced = true
resetWaiter = syncc
}
}
// Notify the closer of termination (no error possible for now)
Expand Down Expand Up @@ -375,3 +438,20 @@ func (pool *TxPool) IsSigner(addr common.Address) bool {
}
return false
}

// Sync is a helper method for unit tests or simulator runs where the chain events
// are arriving in quick succession, without any time in between them to run the
// internal background reset operations. This method will run an explicit reset
// operation to ensure the pool stabilises, thus avoiding flakey behavior.
//
// Note, do not use this in production / live code. In live code, the pool is
// meant to reset on a separate thread to avoid DoS vectors.
func (p *TxPool) Sync() error {
sync := make(chan error)
select {
case p.sync <- sync:
return <-sync
case <-p.term:
return errors.New("pool already terminated")
}
}
Loading