Skip to content

Commit ea2f19a

Browse files
committed
fix(lightclient): fix BTC transaction scan gap and add state management to prevent duplicate processing
1 parent 6d13729 commit ea2f19a

2 files changed

Lines changed: 106 additions & 45 deletions

File tree

plugin/dapp/lightclient/rpc/lightclient/neutrino/bitcoin.go

Lines changed: 63 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package neutrino
77
import (
88
"bytes"
99
"encoding/hex"
10+
"errors"
1011
"strings"
1112
"time"
1213

@@ -141,6 +142,11 @@ func (n *neutrinoClient) depositWatcher() {
141142
}
142143
}
143144
func (n *neutrinoClient) commitDepositTx(pendingTx *btcPendingTx) error {
145+
if state := n.getDepositState(pendingTx.txHash[:]); bytes.Equal(state, depositStatusProcessed) {
146+
log.Debug("commitDepositTx already processed", "txHash", pendingTx.txHash.String())
147+
n.bw.removePendingTx(pendingTx.txHash)
148+
return nil
149+
}
144150
spv, err := n.bw.buildTxExistenceProof(pendingTx)
145151
if err != nil {
146152
log.Error("commitDepositTx buildTxExistenceProof", "txHash", pendingTx.txHash.String(), "err", err)
@@ -164,6 +170,10 @@ func (n *neutrinoClient) commitDepositTx(pendingTx *btcPendingTx) error {
164170
},
165171
}
166172
n.submitMainchainTxUntilSuccess(rtypes.RgbxX, rtypes.NameDepositAssetAction, deposit)
173+
if err = n.setDepositState(pendingTx.txHash[:], depositStatusProcessed); err != nil {
174+
log.Error("commitDepositTx setDepositState processed", "txHash", pendingTx.txHash.String(), "err", err)
175+
}
176+
n.bw.removePendingTx(pendingTx.txHash)
167177
log.Debug("commitDepositTx submit deposit success", "btxHash", pendingTx.txHash.String(),
168178
"depositAddr", deposit.GetDepositAddress(), "amount", deposit.GetAmount())
169179
return nil
@@ -202,6 +212,16 @@ func (n *neutrinoClient) processWithdrawRequest(chain33Pending *rtypes.PendingTx
202212
log.Error("processWithdrawRequest broadcastTransaction", "txHash", txHash, "err", err)
203213
return err
204214
}
215+
n.bw.addPendingTx(&btcPendingTx{
216+
tx: tx,
217+
submitTime: types.Now(),
218+
confirmations: 0,
219+
blockHeight: -1,
220+
txHash: tx.TxHash(),
221+
txType: transactionTypeWithdraw,
222+
withdrawAddress: req.toAddress,
223+
chain33WithdrawTxHash: req.chain33WithDrawHash,
224+
})
205225
btcTxHash := tx.TxHash().String()
206226
if err = n.setWithdrawState(chain33Pending.GetTxHash(), withdrawStatusSent); err != nil {
207227
log.Error("processWithdrawRequest setWithdrawState", "txHash", txHash, "btcTxHash", btcTxHash, "err", err)
@@ -220,24 +240,25 @@ var (
220240
withdrawStateBucket = []byte("rgbx-withdraw-state")
221241
withdrawStatusSent = []byte("broadcasted")
222242
withdrawStatusConfirmed = []byte("confirmed")
223-
)
224243

225-
func (n *neutrinoClient) hasWithdrawState(chain33TxHash []byte) bool {
244+
depositStateBucket = []byte("rgbx-deposit-state")
245+
depositStatusProcessed = []byte("processed")
246+
)
226247

248+
func (n *neutrinoClient) getWithdrawState(chain33TxHash []byte) []byte {
227249
var data []byte
228250
err := walletdb.View(n.neutrinoCfg.Database, func(tx walletdb.ReadTx) error {
229-
bucket := tx.ReadBucket([]byte(withdrawStateBucket))
251+
bucket := tx.ReadBucket(withdrawStateBucket)
230252
if bucket == nil {
231253
return walletdb.ErrBucketNotFound
232254
}
233255
data = bucket.Get(chain33TxHash)
234256
return nil
235257
})
236-
if err != nil {
237-
log.Debug("hasWithdrawState read db", "chain33TxHash", hex.EncodeToString(chain33TxHash), "err", err)
238-
return false
258+
if err != nil && !errors.Is(err, walletdb.ErrBucketNotFound) {
259+
log.Error("getWithdrawState", "txHash", hex.EncodeToString(chain33TxHash), "err", err)
239260
}
240-
return len(data) > 0
261+
return data
241262
}
242263

243264
func (n *neutrinoClient) setWithdrawState(txHash []byte, status []byte) error {
@@ -251,6 +272,32 @@ func (n *neutrinoClient) setWithdrawState(txHash []byte, status []byte) error {
251272
})
252273
}
253274

275+
func (n *neutrinoClient) getDepositState(txHash []byte) []byte {
276+
var data []byte
277+
err := walletdb.View(n.neutrinoCfg.Database, func(tx walletdb.ReadTx) error {
278+
bucket := tx.ReadBucket(depositStateBucket)
279+
if bucket == nil {
280+
return walletdb.ErrBucketNotFound
281+
}
282+
data = bucket.Get(txHash)
283+
return nil
284+
})
285+
if err != nil && !errors.Is(err, walletdb.ErrBucketNotFound) {
286+
log.Error("getDepositState", "txHash", hex.EncodeToString(txHash), "err", err)
287+
}
288+
return data
289+
}
290+
291+
func (n *neutrinoClient) setDepositState(txHash []byte, status []byte) error {
292+
return walletdb.Update(n.neutrinoCfg.Database, func(tx walletdb.ReadWriteTx) error {
293+
bucket, err := tx.CreateTopLevelBucket(depositStateBucket)
294+
if err != nil {
295+
return err
296+
}
297+
return bucket.Put(txHash, status)
298+
})
299+
}
300+
254301
func (n *neutrinoClient) getPendingTxBlockIndex(txHash []byte) *rtypes.TxBlockIndex {
255302
hashStr := hex.EncodeToString(txHash)
256303
pendingTx := n.rgbx.pendingCache.getTx(hashStr)
@@ -306,7 +353,7 @@ func (n *neutrinoClient) withdrawalProcessor() {
306353
}
307354

308355
case pending := <-withdrawReqChan: // 向btc链提交提现交易
309-
if n.hasWithdrawState(pending.GetTxHash()) {
356+
if len(n.getWithdrawState(pending.GetTxHash())) > 0 {
310357
log.Debug("withdrawalProcessor hasWithdrawState", "txHash", hex.EncodeToString(pending.GetTxHash()))
311358
continue
312359
}
@@ -376,13 +423,20 @@ func (n *neutrinoClient) processWithdrawConfirm(confirm *confirmWithdraw) bool {
376423
if confirm.confirmTx == nil {
377424
return false
378425
}
379-
380426
confirmHash := hex.EncodeToString(confirm.confirmTx.TxHash)
427+
if state := n.getWithdrawState(confirm.confirmTx.GetTxHash()); bytes.Equal(state, withdrawStatusConfirmed) {
428+
n.rgbx.pendingCache.removeTx(confirmHash)
429+
n.bw.removePendingTx(confirm.btcPending.txHash)
430+
log.Debug("processWithdrawConfirm already confirmed local state", "confirmHash", confirmHash)
431+
return true
432+
}
433+
381434
txHash, err := n.commitWithdrawConfirm(confirm.confirmTx, confirmHash)
382435
if err != nil {
383436
log.Error("processWithdrawConfirm commitWithdrawConfirm", "txHash", txHash, "confirmHash", confirmHash, "err", err)
384437
return false
385438
}
439+
n.bw.removePendingTx(confirm.btcPending.txHash)
386440
log.Debug("processWithdrawConfirm success", "txHash", txHash,
387441
"btcTxHash", confirm.btcPending.txHash.String(), "confirmHash", confirmHash)
388442
return true

plugin/dapp/lightclient/rpc/lightclient/neutrino/btcwallet.go

Lines changed: 43 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -102,9 +102,10 @@ type btcWallet struct {
102102
tssPkScript []byte // 预计算的TSS地址脚本
103103

104104
// 通知channel
105-
depositChan chan *btcPendingTx
106-
withdrawChan chan *btcPendingTx
107-
addPendingChan chan *btcPendingTx
105+
depositChan chan *btcPendingTx
106+
withdrawChan chan *btcPendingTx
107+
addPendingChan chan *btcPendingTx
108+
removePendingChan chan chainhash.Hash
108109

109110
// 配置
110111
requiredConfs int32
@@ -117,6 +118,7 @@ type btcWallet struct {
117118
type btcPendingTx struct {
118119
tx *wire.MsgTx
119120
submitTime time.Time
121+
notified bool
120122
confirmations int32
121123
blockHeight int32
122124
blockHash chainhash.Hash
@@ -133,13 +135,14 @@ type btcPendingTx struct {
133135

134136
func newBtcWallet(n *neutrinoClient) (*btcWallet, error) {
135137
bw := &btcWallet{
136-
client: n,
137-
chainParams: n.neutrinoCfg.ChainParams,
138-
depositChan: make(chan *btcPendingTx, 100),
139-
withdrawChan: make(chan *btcPendingTx, 100),
140-
addPendingChan: make(chan *btcPendingTx, 100),
141-
requiredConfs: defaultRequiredConfs,
142-
pendingTxs: make(map[chainhash.Hash]*btcPendingTx),
138+
client: n,
139+
chainParams: n.neutrinoCfg.ChainParams,
140+
depositChan: make(chan *btcPendingTx, 100),
141+
withdrawChan: make(chan *btcPendingTx, 100),
142+
addPendingChan: make(chan *btcPendingTx, 100),
143+
removePendingChan: make(chan chainhash.Hash, 100),
144+
requiredConfs: defaultRequiredConfs,
145+
pendingTxs: make(map[chainhash.Hash]*btcPendingTx),
143146
}
144147

145148
if n.cfg.BtcRPC.Host != "" {
@@ -256,8 +259,8 @@ func (b *btcWallet) loadMinPendingHeight() int32 {
256259
return walletdb.ErrBucketNotFound
257260
}
258261
val := bucket.Get([]byte(minPendingHeightKey))
259-
if val == nil {
260-
return types.ErrNotFound
262+
if len(val) == 0 {
263+
return nil
261264
}
262265
reply := &types.Int64{}
263266
if err := types.Decode(val, reply); err != nil {
@@ -266,7 +269,7 @@ func (b *btcWallet) loadMinPendingHeight() int32 {
266269
height = int32(reply.GetData())
267270
return nil
268271
})
269-
if err != nil && !errors.Is(err, walletdb.ErrBucketNotFound) && !errors.Is(err, types.ErrNotFound) {
272+
if err != nil && !errors.Is(err, walletdb.ErrBucketNotFound) {
270273
log.Error("loadMinPendingHeight", "err", err)
271274
}
272275
return height
@@ -284,12 +287,12 @@ func (b *btcWallet) saveMinPendingHeight(height int32) {
284287
data := types.Encode(&types.Int64{Data: int64(height)})
285288
return bucket.Put([]byte(minPendingHeightKey), data)
286289
})
287-
if err != nil && !errors.Is(err, walletdb.ErrBucketNotFound) {
290+
if err != nil {
288291
log.Error("saveMinPendingHeight", "err", err, "height", height)
289292
}
290293
}
291294

292-
func (b *btcWallet) updateMinPendingHeightLocked() {
295+
func (b *btcWallet) updateMinPendingHeight() {
293296
minHeight := int32(0)
294297
for _, pending := range b.pendingTxs {
295298
if pending.blockHeight <= 0 {
@@ -369,9 +372,12 @@ func (b *btcWallet) monitorTransactions() {
369372
if ntfn == nil {
370373
continue
371374
}
372-
373-
// 处理已确认交易
375+
// 处理已确认交易, attachedBlocks是btc链上新添加的区块
374376
for _, block := range ntfn.AttachedBlocks {
377+
// 不存在待确认交易时,同步更新扫描起点高度
378+
if len(b.pendingTxs) == 0 {
379+
b.saveMinPendingHeight(block.Height)
380+
}
375381
for _, tx := range block.Transactions {
376382
b.handleTransaction(&tx, block.Height, *block.Hash)
377383
}
@@ -386,6 +392,10 @@ func (b *btcWallet) monitorTransactions() {
386392
b.pendingTxs[pending.txHash] = pending
387393
log.Debug("addPendingTx", "txHash", pending.txHash.String(), "blockHeight", pending.blockHeight,
388394
"blockHash", pending.blockHash.String(), "txType", pending.txType)
395+
case txHash := <-b.removePendingChan:
396+
delete(b.pendingTxs, txHash)
397+
b.updateMinPendingHeight()
398+
log.Debug("removePendingTx", "txHash", txHash.String())
389399

390400
case <-ticker.C:
391401
b.updateTransactionConfirmations()
@@ -435,6 +445,7 @@ func (b *btcWallet) handleUnminedTransaction(txHash chainhash.Hash) {
435445

436446
pending.confirmations = 0
437447
pending.blockHeight = -1
448+
pending.notified = false
438449

439450
log.Debug("handleUnminedTransaction reset confirmations", "txHash", txHash.String(),
440451
"oldConfirmations", oldConfirmations, "oldBlockHeight", oldBlockHeight, "type", pending.txType)
@@ -444,7 +455,6 @@ func (b *btcWallet) handleUnminedTransaction(txHash chainhash.Hash) {
444455
// updateTransactionConfirmations 更新已存在交易的确认数
445456
func (b *btcWallet) updateTransactionConfirmations() {
446457
bestBlock := b.client.getBestBlock()
447-
var readyHashes []chainhash.Hash
448458

449459
for txHash, pending := range b.pendingTxs {
450460

@@ -458,21 +468,22 @@ func (b *btcWallet) updateTransactionConfirmations() {
458468
pending.confirmations = txRes.Confirmations
459469
}
460470
// 如果达到要求的确认数,发送通知
461-
if pending.confirmations >= b.requiredConfs {
471+
if !pending.notified && pending.confirmations >= b.requiredConfs {
462472
log.Debug("updateTransactionConfirmations ready for notification", "txHash", txHash.String(), "type", pending.txType,
463473
"confirmations", pending.confirmations, "required", b.requiredConfs)
464474
b.sendTransactionNotification(txHash, pending)
465-
readyHashes = append(readyHashes, txHash)
475+
pending.notified = true
466476
}
467477

468478
}
479+
}
469480

470-
for _, txHash := range readyHashes {
471-
delete(b.pendingTxs, txHash)
472-
}
473-
if len(readyHashes) > 0 {
474-
b.updateMinPendingHeightLocked()
475-
}
481+
func (b *btcWallet) addPendingTx(pending *btcPendingTx) {
482+
b.addPendingChan <- pending
483+
}
484+
485+
func (b *btcWallet) removePendingTx(txHash chainhash.Hash) {
486+
b.removePendingChan <- txHash
476487
}
477488

478489
// OpReturnData OP_RETURN数据结构
@@ -498,10 +509,6 @@ func (b *btcWallet) parseOpReturn(pkScript []byte) (*opReturnData, error) {
498509
payload: parts[2],
499510
}
500511

501-
// if opData.action == transactionTypeWithdraw { // chain33 tx hash
502-
// opData.payload = hex.EncodeToString([]byte(parts[2]))
503-
// }
504-
505512
return opData, nil
506513
}
507514

@@ -522,13 +529,13 @@ func (b *btcWallet) analyzeTransaction(hash *chainhash.Hash, tx *wire.MsgTx) *bt
522529
if len(output.PkScript) > 2 && output.PkScript[0] == txscript.OP_RETURN && parsed == nil {
523530
parsed, err = b.parseOpReturn(output.PkScript)
524531
if err != nil {
525-
log.Debug("analyzeTransaction parseOpReturn failed", "txHash", hash.String(),
526-
"outputIndex", i, "err", err)
532+
log.Error("analyzeTransaction parseOpReturn failed", "txHash", hash.String(),
533+
"outputIndex", i, "err", err, "pkScript", hex.EncodeToString(output.PkScript))
527534
} else {
528535

529536
info.opReturnData = *parsed
530-
log.Info("analyzeTransaction parseOpReturn success", "txHash", hash.String(),
531-
"protocol", parsed.protocol, "action", parsed.action, "payload", parsed.payload)
537+
log.Debug("analyzeTransaction parseOpReturn success", "txHash", hash.String(),
538+
"protocol", parsed.protocol, "action", parsed.action, "payloadLen", len(parsed.payload))
532539
}
533540
continue
534541
}
@@ -570,12 +577,12 @@ func (b *btcWallet) analyzeTransaction(hash *chainhash.Hash, tx *wire.MsgTx) *bt
570577
info.withdrawAddress = firstNonTssOutputAddress
571578
info.txType = transactionTypeWithdraw
572579
info.withdrawAmount = withdrawAmount
573-
info.chain33WithdrawTxHash = []byte(parsed.payload)
580+
info.chain33WithdrawTxHash = []byte(info.opReturnData.payload)
574581
return info
575582
} else if hasTssOutput && !hasTssInput { // 充值交易特征:有TSS输出,无TSS输入
576583
info.depositAmount = depositAmount
577584
info.txType = transactionTypeDeposit
578-
info.chain33DepositAddress = parsed.payload
585+
info.chain33DepositAddress = info.opReturnData.payload
579586
return info
580587
}
581588

0 commit comments

Comments
 (0)