diff --git a/core/txpool/locals/tx_tracker_test.go b/core/txpool/locals/tx_tracker_test.go new file mode 100644 index 00000000000..56b13cf63bc --- /dev/null +++ b/core/txpool/locals/tx_tracker_test.go @@ -0,0 +1,124 @@ +// Copyright 2025 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package locals + +import ( + "math/big" + "testing" + "time" + + "github.com/XinFinOrg/XDPoSChain/common" + "github.com/XinFinOrg/XDPoSChain/consensus/ethash" + "github.com/XinFinOrg/XDPoSChain/core" + "github.com/XinFinOrg/XDPoSChain/core/rawdb" + "github.com/XinFinOrg/XDPoSChain/core/txpool" + "github.com/XinFinOrg/XDPoSChain/core/txpool/legacypool" + "github.com/XinFinOrg/XDPoSChain/core/types" + "github.com/XinFinOrg/XDPoSChain/core/vm" + "github.com/XinFinOrg/XDPoSChain/crypto" + "github.com/XinFinOrg/XDPoSChain/ethdb" + "github.com/XinFinOrg/XDPoSChain/params" +) + +var ( + key, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + address = crypto.PubkeyToAddress(key.PublicKey) + funds = big.NewInt(1000000000000000) + gspec = &core.Genesis{ + Config: params.TestChainConfig, + Alloc: types.GenesisAlloc{ + address: {Balance: funds}, + }, + BaseFee: big.NewInt(params.InitialBaseFee), + } + signer = types.LatestSigner(gspec.Config) +) + +type testEnv struct { + chain *core.BlockChain + pool *txpool.TxPool + tracker *TxTracker + genDb ethdb.Database +} + +func newTestEnv(t *testing.T, n int, gasTip uint64, journal string) *testEnv { + genDb, blocks, _ := core.GenerateChainWithGenesis(gspec, ethash.NewFaker(), n, func(i int, gen *core.BlockGen) { + tx, err := types.SignTx(types.NewTransaction(gen.TxNonce(address), common.Address{0x00}, big.NewInt(1000), params.TxGas, gen.BaseFee(), nil), signer, key) + if err != nil { + panic(err) + } + gen.AddTx(tx) + }) + + db := rawdb.NewMemoryDatabase() + chain, _ := core.NewBlockChain(db, nil, gspec, ethash.NewFaker(), vm.Config{}) + + legacyPool := legacypool.New(legacypool.DefaultConfig, chain) + pool, err := txpool.New(gasTip, chain, []txpool.SubPool{legacyPool}) + if err != nil { + t.Fatalf("Failed to create tx pool: %v", err) + } + if n, err := chain.InsertChain(blocks); err != nil { + t.Fatalf("Failed to process block %d: %v", n, err) + } + if err := pool.Sync(); err != nil { + t.Fatalf("Failed to sync the txpool, %v", err) + } + return &testEnv{ + chain: chain, + pool: pool, + tracker: New(journal, time.Minute, gspec.Config, pool), + genDb: genDb, + } +} + +func (env *testEnv) close() { + env.chain.Stop() +} + +func (env *testEnv) setGasTip(gasTip uint64) { + env.pool.SetGasTip(new(big.Int).SetUint64(gasTip)) +} + +func (env *testEnv) makeTx(nonce uint64, gasPrice *big.Int) *types.Transaction { + if nonce == 0 { + head := env.chain.CurrentHeader() + state, _ := env.chain.StateAt(head.Root) + nonce = state.GetNonce(address) + } + if gasPrice == nil { + gasPrice = big.NewInt(params.GWei) + } + tx, _ := types.SignTx(types.NewTransaction(nonce, common.Address{0x00}, big.NewInt(1000), params.TxGas, gasPrice, nil), signer, key) + return tx +} + +func (env *testEnv) commit() { + head := env.chain.CurrentBlock() + block := env.chain.GetBlock(head.Hash(), head.Number.Uint64()) + blocks, _ := core.GenerateChain(env.chain.Config(), block, ethash.NewFaker(), env.genDb, 1, func(i int, gen *core.BlockGen) { + tx, err := types.SignTx(types.NewTransaction(gen.TxNonce(address), common.Address{0x00}, big.NewInt(1000), params.TxGas, gen.BaseFee(), nil), signer, key) + if err != nil { + panic(err) + } + gen.AddTx(tx) + }) + env.chain.InsertChain(blocks) + if err := env.pool.Sync(); err != nil { + panic(err) + } +} diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index 5f2cae60443..66532c5d434 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -21,11 +21,15 @@ import ( "fmt" "maps" "math/big" + "sync" "github.com/XinFinOrg/XDPoSChain/common" "github.com/XinFinOrg/XDPoSChain/core" + "github.com/XinFinOrg/XDPoSChain/core/state" "github.com/XinFinOrg/XDPoSChain/core/types" "github.com/XinFinOrg/XDPoSChain/event" + "github.com/XinFinOrg/XDPoSChain/log" + "github.com/XinFinOrg/XDPoSChain/params" ) // TxStatus is the current status of a transaction as seen by the pool. @@ -41,11 +45,17 @@ const ( // BlockChain defines the minimal set of methods needed to back a tx pool with // a chain. Exists to allow mocking the live chain out of tests. type BlockChain interface { + // Config retrieves the chain's fork configuration. + Config() *params.ChainConfig + // CurrentBlock returns the current head of the chain. CurrentBlock() *types.Header // SubscribeChainHeadEvent subscribes to new blocks being added to the chain. SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription + + // StateAt returns a state database for a given root hash (generally the head). + StateAt(root common.Hash) (*state.StateDB, error) } // TxPool is an aggregator for various transaction specific pools, collectively @@ -55,6 +65,10 @@ type BlockChain interface { // resource constraints. type TxPool struct { subpools []SubPool // List of subpools for specialized transaction handling + chain BlockChain + + stateLock sync.RWMutex // The lock for protecting state instance + state *state.StateDB // Current state at the blockchain head localTracker LocalTracker // Optional tracker for local tx submissions @@ -80,8 +94,20 @@ func New(gasTip uint64, chain BlockChain, subpools []SubPool) (*TxPool, error) { // during initialization. head := chain.CurrentBlock() + // Initialize the state with head block, or fallback to empty one in + // case the head state is not available (might occur when node is not + // fully synced). + statedb, err := chain.StateAt(head.Root) + if err != nil { + statedb, err = chain.StateAt(types.EmptyRootHash) + } + if err != nil { + return nil, err + } pool := &TxPool{ subpools: subpools, + chain: chain, + state: statedb, quit: make(chan chan error), term: make(chan struct{}), sync: make(chan chan error), @@ -95,7 +121,7 @@ func New(gasTip uint64, chain BlockChain, subpools []SubPool) (*TxPool, error) { return nil, err } } - go pool.loop(head, chain) + go pool.loop(head) return pool, nil } @@ -127,14 +153,14 @@ func (p *TxPool) Close() error { // loop is the transaction pool's main event loop, waiting for and reacting to // outside blockchain events as well as for various reporting and transaction // eviction events. -func (p *TxPool) loop(head *types.Header, chain BlockChain) { +func (p *TxPool) loop(head *types.Header) { // Close the termination marker when the pool stops defer close(p.term) // Subscribe to chain head events to trigger subpool resets var ( newHeadCh = make(chan core.ChainHeadEvent) - newHeadSub = chain.SubscribeChainHeadEvent(newHeadCh) + newHeadSub = p.chain.SubscribeChainHeadEvent(newHeadCh) ) defer newHeadSub.Unsubscribe() @@ -170,6 +196,16 @@ func (p *TxPool) loop(head *types.Header, chain BlockChain) { // Try to inject a busy marker and start a reset if successful select { case resetBusy <- struct{}{}: + // Updates the statedb with the new chain head. The head state may be + // unavailable if the initial state sync has not yet completed. + if statedb, err := p.chain.StateAt(newHead.Root); err != nil { + log.Error("Failed to reset txpool state", "err", err) + } else { + p.stateLock.Lock() + p.state = statedb + p.stateLock.Unlock() + } + // Busy marker injected, start a new subpool reset go func(oldHead, newHead *types.Header) { for _, subpool := range p.subpools { @@ -455,15 +491,15 @@ func (pool *TxPool) IsSigner(addr common.Address) bool { // Sync is a helper method for unit tests or simulator runs where the chain events // are arriving in quick succession, without any time in between them to run the // internal background reset operations. This method will run an explicit reset -// operation to ensure the pool stabilises, thus avoiding flakey behavior. +// operation to ensure the pool stabilises, thus avoiding flaky behavior. // // Note, this method is only used for testing and is susceptible to DoS vectors. // In production code, the pool is meant to reset on a separate thread. func (p *TxPool) Sync() error { - sync := make(chan error) + waiter := make(chan error) select { - case p.sync <- sync: - return <-sync + case p.sync <- waiter: + return <-waiter case <-p.term: return errors.New("pool already terminated") } diff --git a/core/txpool/txpool_local_test.go b/core/txpool/txpool_local_test.go index 1613ccbf8af..75acc9aa9fb 100644 --- a/core/txpool/txpool_local_test.go +++ b/core/txpool/txpool_local_test.go @@ -9,14 +9,23 @@ import ( "github.com/XinFinOrg/XDPoSChain/common" "github.com/XinFinOrg/XDPoSChain/core" + "github.com/XinFinOrg/XDPoSChain/core/rawdb" + "github.com/XinFinOrg/XDPoSChain/core/state" "github.com/XinFinOrg/XDPoSChain/core/types" "github.com/XinFinOrg/XDPoSChain/event" + "github.com/XinFinOrg/XDPoSChain/params" ) type testChain struct{} +func (testChain) Config() *params.ChainConfig { return params.TestChainConfig } + func (testChain) CurrentBlock() *types.Header { return &types.Header{Number: big.NewInt(0)} } +func (testChain) StateAt(common.Hash) (*state.StateDB, error) { + return state.New(types.EmptyRootHash, state.NewDatabase(rawdb.NewMemoryDatabase())) +} + func (testChain) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription { return event.NewSubscription(func(quit <-chan struct{}) error { <-quit