-
Notifications
You must be signed in to change notification settings - Fork 70
fix(core/txpool): coordinate reset lifecycle and shutdown signaling #28837 #2132
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -17,6 +17,7 @@ | |||||||||||||||||
| package txpool | ||||||||||||||||||
|
|
||||||||||||||||||
| import ( | ||||||||||||||||||
| "errors" | ||||||||||||||||||
| "fmt" | ||||||||||||||||||
| "maps" | ||||||||||||||||||
| "math/big" | ||||||||||||||||||
|
|
@@ -57,6 +58,9 @@ type TxPool struct { | |||||||||||||||||
|
|
||||||||||||||||||
| subs event.SubscriptionScope // Subscription scope to unsubscribe all on shutdown | ||||||||||||||||||
| quit chan chan error // Quit channel to tear down the head updater | ||||||||||||||||||
| term chan struct{} // Termination channel to detect a closed pool | ||||||||||||||||||
|
|
||||||||||||||||||
| sync chan chan error // Testing / simulator channel to block until internal reset is done | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| // New creates a new transaction pool to gather, sort and filter inbound | ||||||||||||||||||
|
|
@@ -70,6 +74,8 @@ func New(gasTip uint64, chain BlockChain, subpools []SubPool) (*TxPool, error) { | |||||||||||||||||
| pool := &TxPool{ | ||||||||||||||||||
| subpools: subpools, | ||||||||||||||||||
| quit: make(chan chan error), | ||||||||||||||||||
| term: make(chan struct{}), | ||||||||||||||||||
| sync: make(chan chan error), | ||||||||||||||||||
gzliudan marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||
| } | ||||||||||||||||||
| reserver := NewReservationTracker() | ||||||||||||||||||
| for i, subpool := range subpools { | ||||||||||||||||||
|
|
@@ -113,6 +119,9 @@ func (p *TxPool) Close() error { | |||||||||||||||||
| // outside blockchain events as well as for various reporting and transaction | ||||||||||||||||||
| // eviction events. | ||||||||||||||||||
| func (p *TxPool) loop(head *types.Header, chain BlockChain) { | ||||||||||||||||||
| // Close the termination marker when the pool stops | ||||||||||||||||||
| defer close(p.term) | ||||||||||||||||||
|
|
||||||||||||||||||
| // Subscribe to chain head events to trigger subpool resets | ||||||||||||||||||
| var ( | ||||||||||||||||||
| newHeadCh = make(chan core.ChainHeadEvent) | ||||||||||||||||||
|
|
@@ -129,13 +138,26 @@ func (p *TxPool) loop(head *types.Header, chain BlockChain) { | |||||||||||||||||
| var ( | ||||||||||||||||||
| resetBusy = make(chan struct{}, 1) // Allow 1 reset to run concurrently | ||||||||||||||||||
| resetDone = make(chan *types.Header) | ||||||||||||||||||
|
|
||||||||||||||||||
| resetForced bool // Whether a forced reset was requested, only used in simulator mode | ||||||||||||||||||
| resetWaiter chan error // Channel waiting on a forced reset, only used in simulator mode | ||||||||||||||||||
| ) | ||||||||||||||||||
| // Notify the live reset waiter without blocking if the txpool is closed. | ||||||||||||||||||
| defer func() { | ||||||||||||||||||
| if resetWaiter != nil { | ||||||||||||||||||
| select { | ||||||||||||||||||
| case resetWaiter <- errors.New("pool already terminated"): | ||||||||||||||||||
| default: | ||||||||||||||||||
| } | ||||||||||||||||||
| resetWaiter = nil | ||||||||||||||||||
| } | ||||||||||||||||||
| }() | ||||||||||||||||||
| var errc chan error | ||||||||||||||||||
| for errc == nil { | ||||||||||||||||||
| // Something interesting might have happened, run a reset if there is | ||||||||||||||||||
| // one needed but none is running. The resetter will run on its own | ||||||||||||||||||
| // goroutine to allow chain head events to be consumed contiguously. | ||||||||||||||||||
| if newHead != oldHead { | ||||||||||||||||||
| if newHead != oldHead || resetForced { | ||||||||||||||||||
| // Try to inject a busy marker and start a reset if successful | ||||||||||||||||||
| select { | ||||||||||||||||||
| case resetBusy <- struct{}{}: | ||||||||||||||||||
|
|
@@ -147,8 +169,17 @@ func (p *TxPool) loop(head *types.Header, chain BlockChain) { | |||||||||||||||||
| resetDone <- newHead | ||||||||||||||||||
| }(oldHead, newHead) | ||||||||||||||||||
|
|
||||||||||||||||||
| // If the reset operation was explicitly requested, consider it | ||||||||||||||||||
| // being fulfilled and drop the request marker. If it was not, | ||||||||||||||||||
| // this is a noop. | ||||||||||||||||||
| resetForced = false | ||||||||||||||||||
|
|
||||||||||||||||||
| default: | ||||||||||||||||||
| // Reset already running, wait until it finishes | ||||||||||||||||||
| // Reset already running, wait until it finishes. | ||||||||||||||||||
| // | ||||||||||||||||||
| // Note, this will not drop any forced reset request. If a forced | ||||||||||||||||||
| // reset was requested, but we were busy, then when the currently | ||||||||||||||||||
| // running reset finishes, a new one will be spun up. | ||||||||||||||||||
| } | ||||||||||||||||||
| } | ||||||||||||||||||
| // Wait for the next chain head event or a previous reset finish | ||||||||||||||||||
|
|
@@ -162,8 +193,37 @@ func (p *TxPool) loop(head *types.Header, chain BlockChain) { | |||||||||||||||||
| oldHead = head | ||||||||||||||||||
| <-resetBusy | ||||||||||||||||||
|
|
||||||||||||||||||
| // If someone is waiting for a reset to finish, notify them, unless | ||||||||||||||||||
| // the forced op is still pending. In that case, wait another round | ||||||||||||||||||
| // of resets. | ||||||||||||||||||
| if resetWaiter != nil && !resetForced { | ||||||||||||||||||
| select { | ||||||||||||||||||
| case resetWaiter <- nil: | ||||||||||||||||||
| // notification delivered | ||||||||||||||||||
| default: | ||||||||||||||||||
| // no active listener; avoid blocking the event loop | ||||||||||||||||||
| } | ||||||||||||||||||
| resetWaiter = nil | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| case errc = <-p.quit: | ||||||||||||||||||
| // Termination requested, break out on the next loop round | ||||||||||||||||||
|
|
||||||||||||||||||
| case syncc := <-p.sync: | ||||||||||||||||||
| // Transaction pool is running inside a simulator, and we are about | ||||||||||||||||||
| // to create a new block. Request a forced sync operation to ensure | ||||||||||||||||||
| // that any running reset operation finishes to make block imports | ||||||||||||||||||
| // deterministic. On top of that, run a new reset operation to make | ||||||||||||||||||
| // transaction insertions deterministic instead of being stuck in a | ||||||||||||||||||
| // queue waiting for a reset. | ||||||||||||||||||
|
||||||||||||||||||
| // queue waiting for a reset. | |
| // queue waiting for a reset. | |
| if resetWaiter != nil { | |
| // A previous sync waiter is still pending; notify it to avoid | |
| // leaking a goroutine waiting on the old channel. | |
| resetWaiter <- errors.New("sync request superseded by a new request") | |
| resetWaiter = nil | |
| } |
Copilot
AI
Mar 4, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The string "pool already terminated" is used in two separate errors.New() calls (line 145 in the defer and line 432 in Sync()), resulting in two distinct error instances. The existing pattern in errors.go defines all package-level errors as exported sentinel variables (e.g., ErrAlreadyKnown, ErrTxPoolOverflow), which allows callers to compare with errors.Is(). A sentinel error such as ErrPoolTerminated would be consistent with this codebase convention and easier to compare programmatically.
Copilot
AI
Mar 4, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The new Sync() method, resetWaiter handling, and term channel shutdown signaling introduce complex concurrency behavior that lacks any unit test coverage. There are no test files at the core/txpool package level. Given the complexity of the added synchronization logic (e.g., forced reset lifecycle, waiter notification on pool termination), adding test cases to verify correct behavior and prevent regressions would be valuable. For example, tests for: (1) Sync() unblocking after a reset completes, (2) Sync() returning an error when the pool is closed, (3) correct waiter notification on pool shutdown.
Uh oh!
There was an error while loading. Please reload this page.