Skip to content

Commit 6a3eda1

Browse files
authored
Revert "feat(tx-submitter): optimize rollup transaction processing" (#738)
1 parent f923054 commit 6a3eda1

37 files changed

Lines changed: 789 additions & 3253 deletions

tx-submitter/.gitignore

Lines changed: 7 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,56 +1,9 @@
1-
# IDE and Editor files
2-
.idea/
3-
.vscode/
4-
*.swp
5-
*.swo
6-
*~
7-
8-
# OS generated files
9-
.DS_Store
10-
.DS_Store?
11-
._*
12-
.Spotlight-V100
13-
.Trashes
14-
ehthumbs.db
15-
Thumbs.db
16-
17-
# Go specific
18-
*.exe
19-
*.exe~
20-
*.dll
21-
*.so
22-
*.dylib
23-
*.test
24-
*.out
25-
go.work
26-
27-
# Binary files
28-
tx-submitter
29-
**/tx-submitter
30-
build/
31-
*debug_bin*
32-
33-
# Config and Environment files
1+
.idea
342
.env*
35-
36-
# Test and Debug files
37-
test/
38-
testleveldb/
39-
test_data/
40-
debug/
41-
*.test
3+
build
4+
tx-submitter
5+
.DS_Store
6+
.vscode
427
*.log
43-
*.pprof
44-
cpu.prof
45-
mem.prof
46-
trace.out
47-
48-
# Coverage reports
49-
coverage.txt
50-
51-
# Transaction files
52-
journal.rlp
53-
journal*.rlp
54-
55-
# Temporary files
56-
tmp/
8+
*debug_bin*
9+
journal.rlp

tx-submitter/constants/methods.go

Lines changed: 0 additions & 8 deletions
This file was deleted.

tx-submitter/db/db.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,10 @@ import (
88
"morph-l2/tx-submitter/utils"
99

1010
"github.com/morph-l2/go-ethereum/ethdb/leveldb"
11-
"github.com/syndtr/goleveldb/leveldb/errors"
1211
)
1312

1413
var (
15-
ErrKeyNotFound = errors.ErrNotFound
14+
ErrKeyNotFound = fmt.Errorf("not found")
1615
)
1716

1817
type Db struct {
@@ -56,9 +55,6 @@ func (d *Db) GetString(key string) (string, error) {
5655
defer d.m.Unlock()
5756
v, err := d.db.Get([]byte(key))
5857
if err != nil {
59-
if err == errors.ErrNotFound {
60-
return "", ErrKeyNotFound
61-
}
6258
return "", fmt.Errorf("failed to get key from leveldb %w", err)
6359
}
6460
return string(v), nil

tx-submitter/db/interface.go

Lines changed: 0 additions & 10 deletions
This file was deleted.

tx-submitter/event/indexer.go

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,10 @@ type EventIndexer struct {
1616
deployBlock *big.Int // Block number of contract deployment
1717
filterQuery ethereum.FilterQuery
1818
indexStep uint64 // index step
19-
storage IEventStorage
19+
storage *EventInfoStorage
2020
}
2121

22-
func NewEventIndexer(client *ethclient.Client, deployedBlock *big.Int, filter ethereum.FilterQuery, indexStep uint64, storage IEventStorage) *EventIndexer {
22+
func NewEventIndexer(client *ethclient.Client, deployedBlock *big.Int, filter ethereum.FilterQuery, indexStep uint64, storage *EventInfoStorage) *EventIndexer {
2323
return &EventIndexer{
2424
client: client,
2525
deployBlock: deployedBlock,
@@ -32,8 +32,8 @@ func NewEventIndexer(client *ethclient.Client, deployedBlock *big.Int, filter et
3232
func (l *EventIndexer) Index() {
3333
log.Info("event indexer started")
3434

35-
if l.storage.BlockProcessed() == 0 {
36-
l.storage.SetBlockProcessed(l.deployBlock.Uint64())
35+
if l.storage.BlockProcessed == 0 {
36+
l.storage.BlockProcessed = l.deployBlock.Uint64()
3737
err := l.storage.Store()
3838
if err != nil {
3939
log.Error("failed to store initial block number", "error", err)
@@ -45,40 +45,43 @@ func (l *EventIndexer) Index() {
4545
defer ticker.Stop()
4646

4747
for range ticker.C {
48+
4849
// Get the current block number
4950
currentBlock, err := l.client.BlockNumber(context.Background())
5051
if err != nil {
5152
log.Error("failed to get current block number", "error", err)
5253
continue
5354
}
5455

55-
if currentBlock <= l.storage.BlockProcessed() {
56-
log.Info("no new block to index", "current_block", currentBlock, "last_processed_block", l.storage.BlockProcessed())
56+
if currentBlock <= l.storage.BlockProcessed {
57+
log.Info("no new block to index", "current_block", currentBlock, "last_processed_block", l.storage.BlockProcessed)
5758
continue
5859
}
5960

6061
// Perform indexing operation
61-
indexedEventInfo, err := l.index(l.client, big.NewInt(int64(l.storage.BlockProcessed())), big.NewInt(int64(currentBlock)))
62+
indexedEventInfo, err := l.index(l.client, big.NewInt(int64(l.storage.BlockProcessed)), big.NewInt(int64(currentBlock)))
6263
if err != nil {
6364
log.Error("indexing operation failed", "error", err)
6465
continue
6566
}
6667

6768
if indexedEventInfo != nil {
68-
l.storage.SetBlockProcessed(indexedEventInfo.BlockProcessed)
69+
l.storage.EventInfo = *indexedEventInfo
6970
} else {
70-
l.storage.SetBlockProcessed(currentBlock)
71+
l.storage.EventInfo = EventInfo{
72+
BlockProcessed: currentBlock,
73+
}
7174
}
72-
7375
// Update storage
7476
err = l.storage.Store()
7577
if err != nil {
7678
log.Error("event index complete, failed to update storage", "error", err)
7779
} else {
78-
info := l.storage.EventInfo()
79-
log.Info("event index complete, storage updated", "processed_block", info.BlockProcessed, "block_time", info.BlockTime)
80+
log.Info("event index complete, storage updated", "processed_block", l.storage.EventInfo.BlockProcessed, "block_time", l.storage.EventInfo.BlockTime)
8081
}
82+
8183
}
84+
8285
}
8386

8487
// filter logs from from_block to to_block
@@ -140,6 +143,6 @@ func (ei *EventIndexer) index(client *ethclient.Client, fromBlock, toBlock *big.
140143
func (l *EventIndexer) GetFilter() ethereum.FilterQuery {
141144
return l.filterQuery
142145
}
143-
func (l *EventIndexer) GetStorage() IEventStorage {
146+
func (l *EventIndexer) GetStorage() *EventInfoStorage {
144147
return l.storage
145148
}

tx-submitter/event/storage.go

Lines changed: 22 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,12 @@ import (
77

88
"morph-l2/tx-submitter/db"
99
"morph-l2/tx-submitter/params"
10+
"morph-l2/tx-submitter/utils"
1011
)
1112

1213
type IEventStorage interface {
1314
Store() error
1415
Load() error
15-
BlockProcessed() uint64
16-
SetBlockProcessed(blockNum uint64)
17-
BlockTime() uint64
18-
SetBlockTime(blockTime uint64)
19-
EventInfo() EventInfo
2016
}
2117

2218
type EventInfo struct {
@@ -25,20 +21,21 @@ type EventInfo struct {
2521
}
2622

2723
type EventInfoStorage struct {
28-
eventInfo EventInfo
29-
db db.Database
30-
mu sync.RWMutex
24+
EventInfo
25+
db *db.Db
26+
mu sync.Mutex
3127
}
3228

33-
func NewEventInfoStorage(db db.Database) *EventInfoStorage {
29+
func NewEventInfoStorage(db *db.Db) *EventInfoStorage {
3430
return &EventInfoStorage{
3531
db: db,
3632
}
3733
}
3834

3935
func (e *EventInfoStorage) Store() error {
36+
4037
// Convert struct to JSON string
41-
jsonData, err := json.Marshal(e.eventInfo)
38+
jsonData, err := json.Marshal(e.EventInfo)
4239
if err != nil {
4340
return fmt.Errorf("failed to convert struct to JSON: %w", err)
4441
}
@@ -52,57 +49,31 @@ func (e *EventInfoStorage) Store() error {
5249
}
5350
return nil
5451
}
55-
5652
func (e *EventInfoStorage) Load() error {
5753
e.mu.Lock()
5854
defer e.mu.Unlock()
59-
60-
jsonStr, err := e.db.GetString(params.EventInfoKey)
55+
evnetInfo, err := e.db.GetString(params.EventInfoKey)
6156
if err != nil {
62-
if err == db.ErrKeyNotFound {
63-
// Initialize with default values if not found
64-
e.eventInfo = EventInfo{}
57+
if utils.ErrStringMatch(err, db.ErrKeyNotFound) {
58+
e.EventInfo = EventInfo{}
59+
jsonData, err := json.Marshal(e.EventInfo)
60+
if err != nil {
61+
return fmt.Errorf("failed to marshal json: %w", err)
62+
}
63+
err = e.db.PutString(params.EventInfoKey, string(jsonData))
64+
if err != nil {
65+
return fmt.Errorf("failed to init eventinfo to db: %w", err)
66+
}
6567
return nil
6668
}
67-
return fmt.Errorf("failed to read from db: %w", err)
69+
return fmt.Errorf("failed to load eventinfo from db: %w", err)
6870
}
6971

70-
err = json.Unmarshal([]byte(jsonStr), &e.eventInfo)
72+
// parse json data to struct
73+
err = json.Unmarshal([]byte(evnetInfo), &e.EventInfo)
7174
if err != nil {
72-
return fmt.Errorf("failed to parse JSON: %w", err)
75+
return fmt.Errorf("failed to unmarshal JSON: %w", err)
7376
}
7477

7578
return nil
7679
}
77-
78-
func (e *EventInfoStorage) BlockProcessed() uint64 {
79-
e.mu.RLock()
80-
defer e.mu.RUnlock()
81-
return e.eventInfo.BlockProcessed
82-
}
83-
84-
func (e *EventInfoStorage) SetBlockProcessed(blockNum uint64) {
85-
e.mu.Lock()
86-
defer e.mu.Unlock()
87-
e.eventInfo.BlockProcessed = blockNum
88-
}
89-
90-
func (e *EventInfoStorage) BlockTime() uint64 {
91-
e.mu.RLock()
92-
defer e.mu.RUnlock()
93-
return e.eventInfo.BlockTime
94-
}
95-
96-
func (e *EventInfoStorage) SetBlockTime(blockTime uint64) {
97-
e.mu.Lock()
98-
defer e.mu.Unlock()
99-
e.eventInfo.BlockTime = blockTime
100-
}
101-
102-
// EventInfo returns a copy of the current event info.
103-
// This ensures thread safety without exposing the internal state.
104-
func (e *EventInfoStorage) EventInfo() EventInfo {
105-
e.mu.RLock()
106-
defer e.mu.RUnlock()
107-
return e.eventInfo
108-
}

tx-submitter/event/storage_test.go

Lines changed: 5 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,39 +2,27 @@ package event
22

33
import (
44
"morph-l2/tx-submitter/db"
5-
"os"
6-
"path/filepath"
75
"testing"
86

97
"github.com/stretchr/testify/require"
108
)
119

1210
func TestEventInfoStorage(t *testing.T) {
13-
// Create a unique test directory
14-
testDir := filepath.Join(t.TempDir(), "testleveldb")
1511

16-
// Cleanup before test (in case it exists)
17-
os.RemoveAll(testDir)
18-
19-
// Cleanup after test
20-
t.Cleanup(func() {
21-
os.RemoveAll(testDir)
22-
})
23-
24-
db, err := db.New(testDir)
12+
db, err := db.New("./testleveldb")
2513
require.NoError(t, err)
2614
storage := NewEventInfoStorage(db)
2715
err = storage.Load()
2816
require.NoError(t, err)
2917

30-
storage.SetBlockTime(100)
31-
storage.SetBlockProcessed(100)
18+
storage.BlockTime = 100
19+
storage.BlockProcessed = 100
3220
err = storage.Store()
3321
require.NoError(t, err)
3422

3523
storage2 := NewEventInfoStorage(db)
3624
err = storage2.Load()
3725
require.NoError(t, err)
38-
require.Equal(t, storage.BlockTime(), storage2.BlockTime())
39-
require.Equal(t, storage.BlockProcessed(), storage2.BlockProcessed())
26+
require.Equal(t, storage.BlockTime, storage2.BlockTime)
27+
require.Equal(t, storage.BlockProcessed, storage2.BlockProcessed)
4028
}

tx-submitter/iface/metrics.go

Lines changed: 0 additions & 7 deletions
This file was deleted.

tx-submitter/iface/reorg_detector.go

Lines changed: 0 additions & 13 deletions
This file was deleted.

tx-submitter/localpool/journal.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ func (j *Journal) ParseAllTxs() ([]*types.Transaction, error) {
8282
content, err := readFileContent(j.path)
8383

8484
if err != nil {
85-
return nil, fmt.Errorf("failed to read journal file: %w", err)
85+
return nil, fmt.Errorf("failed to parse txs: %w", err)
8686
}
8787

8888
if len(content) == 0 {

0 commit comments

Comments
 (0)