From d1c4e758643b0a7f89380fbedf369ed04a0d4efe Mon Sep 17 00:00:00 2001 From: mmsqe Date: Wed, 11 Mar 2026 18:00:35 +0800 Subject: [PATCH 1/2] Problem: concurrent map writes from ProcessResults bump deps --- chains/ethereum/metrics/collector.go | 11 ++++++++--- go.mod | 11 ++++++----- go.sum | 8 ++++++++ 3 files changed, 22 insertions(+), 8 deletions(-) diff --git a/chains/ethereum/metrics/collector.go b/chains/ethereum/metrics/collector.go index d9f3ac1..45dfeb8 100644 --- a/chains/ethereum/metrics/collector.go +++ b/chains/ethereum/metrics/collector.go @@ -32,6 +32,7 @@ func ProcessResults( wg := sync.WaitGroup{} blockStats := make([]loadtesttypes.BlockStat, endBlock-startBlock+1) receipts := make(map[uint64]gethtypes.Receipts) + var receiptsMu sync.Mutex fetchReceiptsConcurrently := config.EnvFromContext(ctx).ConcurrentReceipts @@ -49,7 +50,8 @@ func ProcessResults( //nolint:gosec // G115: overflow unlikely in practice blockNumBig := big.NewInt(int64(blockNum)) - go func() { + idx := blockNum - startBlock + go func(blockNum uint64, blockNumBig *big.Int, idx uint64) { defer wg.Done() start := time.Now() @@ -68,9 +70,11 @@ func ProcessResults( } if len(blockReceipts) > 0 { + receiptsMu.Lock() receipts[blockReceipts[0].BlockNumber.Uint64()] = blockReceipts + receiptsMu.Unlock() } - blockStats[blockNum-startBlock] = buildBlockStats(block, blockReceipts) + blockStats[idx] = buildBlockStats(block, blockReceipts) logger.Info( "Block collected", @@ -78,7 +82,7 @@ func ProcessResults( zap.Int("receipts", len(blockReceipts)), zap.String("duration", time.Since(start).String()), ) - }() + }(blockNum, blockNumBig, idx) } wg.Wait() @@ -240,6 +244,7 @@ func getReceiptsForBlockTxs( eg.SetLimit(concurrency) for i := 0; i < len(txs); i++ { + i := i eg.Go(func() error { hash := txs[i].Hash() receipt, err := client.TransactionReceipt(ctx, hash) diff --git a/go.mod b/go.mod index 51f3500..e930458 100644 --- a/go.mod +++ b/go.mod @@ -8,11 +8,13 @@ require ( github.com/cosmos/cosmos-sdk v0.53.4 github.com/cosmos/evm v0.3.2 github.com/ethereum/go-ethereum v1.15.11 + github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.22.0 github.com/skip-mev/ironbird v0.0.0-20251024154444-c72f64c42974 github.com/stretchr/testify v1.10.0 github.com/wk8/go-ordered-map/v2 v2.1.9-0.20250401010720-46d686821e33 go.uber.org/zap v1.27.0 + golang.org/x/sync v0.18.0 google.golang.org/grpc v1.74.2 gopkg.in/yaml.v3 v3.0.1 ) @@ -60,12 +62,13 @@ require ( github.com/btcsuite/btcd/btcutil v1.1.6 // indirect github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0 // indirect github.com/buger/jsonparser v1.1.1 // indirect - github.com/bytedance/sonic v1.14.0 // indirect - github.com/bytedance/sonic/loader v0.3.0 // indirect + github.com/bytedance/gopkg v0.1.3 // indirect + github.com/bytedance/sonic v1.15.0 // indirect + github.com/bytedance/sonic/loader v0.5.0 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cilium/ipam v0.0.0-20230509084518-fd66eae7909b // indirect - github.com/cloudwego/base64x v0.1.5 // indirect + github.com/cloudwego/base64x v0.1.6 // indirect github.com/cockroachdb/errors v1.12.0 // indirect github.com/cockroachdb/fifo v0.0.0-20240616162244-4768e80dfb9a // indirect github.com/cockroachdb/logtags v0.0.0-20241215232642-bb51bb14a506 // indirect @@ -209,7 +212,6 @@ require ( github.com/pion/stun/v2 v2.0.0 // indirect github.com/pion/transport/v2 v2.2.10 // indirect github.com/pion/transport/v3 v3.0.7 // indirect - github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus-community/pro-bing v0.4.0 // indirect github.com/prometheus/client_model v0.6.2 // indirect @@ -277,7 +279,6 @@ require ( golang.org/x/exp v0.0.0-20250606033433-dcc06ee1d476 // indirect golang.org/x/mod v0.29.0 // indirect golang.org/x/net v0.47.0 // indirect - golang.org/x/sync v0.18.0 // indirect golang.org/x/sys v0.38.0 // indirect golang.org/x/term v0.37.0 // indirect golang.org/x/text v0.31.0 // indirect diff --git a/go.sum b/go.sum index 5635751..e575c76 100644 --- a/go.sum +++ b/go.sum @@ -185,11 +185,17 @@ github.com/bufbuild/protocompile v0.14.1 h1:iA73zAf/fyljNjQKwYzUHD6AD4R8KMasmwa/ github.com/bufbuild/protocompile v0.14.1/go.mod h1:ppVdAIhbr2H8asPk6k4pY7t9zB1OU5DoEw9xY/FUi1c= github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs= github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0= +github.com/bytedance/gopkg v0.1.3 h1:TPBSwH8RsouGCBcMBktLt1AymVo2TVsBVCY4b6TnZ/M= +github.com/bytedance/gopkg v0.1.3/go.mod h1:576VvJ+eJgyCzdjS+c4+77QF3p7ubbtiKARP3TxducM= github.com/bytedance/sonic v1.14.0 h1:/OfKt8HFw0kh2rj8N0F6C/qPGRESq0BbaNZgcNXXzQQ= github.com/bytedance/sonic v1.14.0/go.mod h1:WoEbx8WTcFJfzCe0hbmyTGrfjt8PzNEBdxlNUO24NhA= +github.com/bytedance/sonic v1.15.0 h1:/PXeWFaR5ElNcVE84U0dOHjiMHQOwNIx3K4ymzh/uSE= +github.com/bytedance/sonic v1.15.0/go.mod h1:tFkWrPz0/CUCLEF4ri4UkHekCIcdnkqXw9VduqpJh0k= github.com/bytedance/sonic/loader v0.1.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU= github.com/bytedance/sonic/loader v0.3.0 h1:dskwH8edlzNMctoruo8FPTJDF3vLtDT0sXZwvZJyqeA= github.com/bytedance/sonic/loader v0.3.0/go.mod h1:N8A3vUdtUebEY2/VQC0MyhYeKUFosQU6FxH2JmUe6VI= +github.com/bytedance/sonic/loader v0.5.0 h1:gXH3KVnatgY7loH5/TkeVyXPfESoqSBSBEiDd5VjlgE= +github.com/bytedance/sonic/loader v0.5.0/go.mod h1:AR4NYCk5DdzZizZ5djGqQ92eEhCCcdf5x77udYiSJRo= github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ= github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= github.com/cenkalti/backoff/v4 v4.1.1/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= @@ -217,6 +223,8 @@ github.com/clbanning/x2j v0.0.0-20191024224557-825249438eec/go.mod h1:jMjuTZXRI4 github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cloudwego/base64x v0.1.5 h1:XPciSp1xaq2VCSt6lF0phncD4koWyULpl5bUxbfCyP4= github.com/cloudwego/base64x v0.1.5/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w= +github.com/cloudwego/base64x v0.1.6 h1:t11wG9AECkCDk5fMSoxmufanudBtJ+/HemLstXDLI2M= +github.com/cloudwego/base64x v0.1.6/go.mod h1:OFcloc187FXDaYHvrNIjxSe8ncn0OOM8gEHfghB2IPU= github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQPiEFhY= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= From 9cb4279524769d3d5a6c2f5a87fe2496f5cc943e Mon Sep 17 00:00:00 2001 From: mmsqe Date: Wed, 11 Mar 2026 18:13:10 +0800 Subject: [PATCH 2/2] add test --- chains/ethereum/metrics/collector_test.go | 88 +++++++++++++++++++++++ 1 file changed, 88 insertions(+) diff --git a/chains/ethereum/metrics/collector_test.go b/chains/ethereum/metrics/collector_test.go index 8cbc176..237d31c 100644 --- a/chains/ethereum/metrics/collector_test.go +++ b/chains/ethereum/metrics/collector_test.go @@ -1,14 +1,102 @@ package metrics import ( + "context" + "crypto/ecdsa" + "math/big" + "reflect" "testing" "time" + "github.com/ethereum/go-ethereum/common" + gethtypes "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/ethclient" + "github.com/ethereum/go-ethereum/ethclient/simulated" "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" + ethtypes "github.com/skip-mev/catalyst/chains/ethereum/types" loadtesttypes "github.com/skip-mev/catalyst/chains/types" + "github.com/skip-mev/catalyst/config" ) +func mustSignedTransferTx( + t *testing.T, + key *ecdsa.PrivateKey, + chainID *big.Int, + gasTipCap *big.Int, + gasFeeCap *big.Int, + nonce uint64, + to common.Address, +) *gethtypes.Transaction { + t.Helper() + tx := gethtypes.NewTx(&gethtypes.DynamicFeeTx{ + ChainID: chainID, + Nonce: nonce, + GasTipCap: gasTipCap, + GasFeeCap: gasFeeCap, + Gas: 21_000, + To: &to, + Value: big.NewInt(1), + }) + signedTx, err := gethtypes.SignTx(tx, gethtypes.LatestSignerForChainID(chainID), key) + require.NoError(t, err) + return signedTx +} + +func TestProcessResultsConcurrent(t *testing.T) { + ctx := config.WithEnv(context.Background(), config.Env{ConcurrentReceipts: true}) + logger := zaptest.NewLogger(t) + key, err := crypto.GenerateKey() + require.NoError(t, err) + from := crypto.PubkeyToAddress(key.PublicKey) + alloc := gethtypes.GenesisAlloc{ + from: {Balance: big.NewInt(9_000_000_000_000_000_000)}, + } + + sim := simulated.NewBackend(alloc) + client := sim.Client() + defer sim.Close() + + chainID, err := client.ChainID(ctx) + require.NoError(t, err) + header, err := client.HeaderByNumber(ctx, nil) + require.NoError(t, err) + require.NotNil(t, header.BaseFee) + + gasTipCap := big.NewInt(2_000_000_000) + gasFeeCap := new(big.Int).Add(new(big.Int).Mul(header.BaseFee, big.NewInt(2)), gasTipCap) + blocks := 150 + sentTxs := make([]*ethtypes.SentTx, 0, blocks) + for i := 0; i < blocks; i++ { + to := common.BigToAddress(big.NewInt(int64(i + 10_000))) + tx := mustSignedTransferTx(t, key, chainID, gasTipCap, gasFeeCap, uint64(i), to) + require.NoError(t, client.SendTransaction(ctx, tx)) + sim.Commit() + sentTxs = append(sentTxs, ðtypes.SentTx{TxHash: tx.Hash(), MsgType: ethtypes.ContractCall, Tx: tx}) + } + + endBlock, err := client.BlockNumber(ctx) + require.NoError(t, err) + require.GreaterOrEqual(t, endBlock, uint64(blocks)) + + f := reflect.Indirect(reflect.ValueOf(client)).FieldByName("Client") + require.True(t, f.IsValid() && f.CanInterface()) + ec, ok := f.Interface().(*ethclient.Client) + require.True(t, ok) + require.NotNil(t, ec) + clients := []*ethclient.Client{ec} + require.NotPanics(t, func() { + for run := 0; run < 25; run++ { + result, err := ProcessResults(ctx, logger, sentTxs, 1, endBlock, clients) + require.NoError(t, err) + require.NotNil(t, result) + require.Greater(t, result.Overall.TotalIncludedTransactions, 0) + } + }) +} + func TestTrimBlocks(t *testing.T) { tests := []struct { name string