diff --git a/src/Stratis.FederatedPeg.Features.FederationGateway/Interfaces/IChangeTracker.cs b/src/Stratis.FederatedPeg.Features.FederationGateway/Interfaces/IChangeTracker.cs new file mode 100644 index 00000000..8623c98f --- /dev/null +++ b/src/Stratis.FederatedPeg.Features.FederationGateway/Interfaces/IChangeTracker.cs @@ -0,0 +1,22 @@ +using NBitcoin; + +namespace Stratis.FederatedPeg.Features.FederationGateway.Interfaces +{ + /// + /// Tracks changes made to objects. + /// + public interface IChangeTracker + { + /// + /// Records the object (or part of the object) that was originally read from the database. + /// + /// The object to record the original value of. + void RecordOldValue(IBitcoinSerializable obj); + + /// + /// Instructs the object to record its original value (or part thereof). Typically called after reading it from the database. + /// + /// The object being instructed to record its original value. + void SetOldValue(IBitcoinSerializable obj); + } +} diff --git a/src/Stratis.FederatedPeg.Features.FederationGateway/Interfaces/ICrossChainDB.cs b/src/Stratis.FederatedPeg.Features.FederationGateway/Interfaces/ICrossChainDB.cs new file mode 100644 index 00000000..8631a38f --- /dev/null +++ b/src/Stratis.FederatedPeg.Features.FederationGateway/Interfaces/ICrossChainDB.cs @@ -0,0 +1,19 @@ +using System; +using Stratis.FederatedPeg.Features.FederationGateway.TargetChain; + +namespace Stratis.FederatedPeg.Features.FederationGateway.Interfaces +{ + public interface ICrossChainDB : ICrossChainLookups, IDisposable + { + /// Initializes the cross-chain-transfer store. + void Initialize(); + + /// + /// Creates a for either read or read/write operations. + /// + /// The mode which is either + /// or + /// The object. + CrossChainDBTransaction GetTransaction(CrossChainDBTransactionMode mode = CrossChainDBTransactionMode.Read); + } +} diff --git a/src/Stratis.FederatedPeg.Features.FederationGateway/Interfaces/ICrossChainTransfer.cs b/src/Stratis.FederatedPeg.Features.FederationGateway/Interfaces/ICrossChainTransfer.cs index 4b17fc6b..c06cd334 100644 --- a/src/Stratis.FederatedPeg.Features.FederationGateway/Interfaces/ICrossChainTransfer.cs +++ b/src/Stratis.FederatedPeg.Features.FederationGateway/Interfaces/ICrossChainTransfer.cs @@ -51,6 +51,7 @@ public interface ICrossChainTransfer : IBitcoinSerializable int? BlockHeight { get; } CrossChainTransferStatus Status { get; } + CrossChainTransferStatus? DbStatus { get; } /// /// Depending on the status some fields can't be null. @@ -82,7 +83,14 @@ public interface ICrossChainTransfer : IBitcoinSerializable /// /// Gets the number of sigatures in the first input of the transaction. /// + /// The network associated with the transfer's partial transaction. /// Number of signatures. int GetSignatureCount(Network network); + + /// + /// Used by the store to note down the status as recorded in the database. + /// This allows the store to update its internal deposit-by-status lookup. + /// + void RecordDbStatus(); } } \ No newline at end of file diff --git a/src/Stratis.FederatedPeg.Features.FederationGateway/Interfaces/ICrossChainTransferStore.cs b/src/Stratis.FederatedPeg.Features.FederationGateway/Interfaces/ICrossChainTransferStore.cs index 828180e6..0fc7f329 100644 --- a/src/Stratis.FederatedPeg.Features.FederationGateway/Interfaces/ICrossChainTransferStore.cs +++ b/src/Stratis.FederatedPeg.Features.FederationGateway/Interfaces/ICrossChainTransferStore.cs @@ -3,16 +3,12 @@ using System.Threading.Tasks; using NBitcoin; using Stratis.FederatedPeg.Features.FederationGateway.Models; -using Stratis.FederatedPeg.Features.FederationGateway.TargetChain; namespace Stratis.FederatedPeg.Features.FederationGateway.Interfaces { /// Interface for interacting with the cross-chain transfer database. - public interface ICrossChainTransferStore : IDisposable + public interface ICrossChainTransferStore : IDisposable, ICrossChainDB { - /// Initializes the cross-chain-transfer store. - void Initialize(); - /// Starts the cross-chain-transfer store. void Start(); diff --git a/src/Stratis.FederatedPeg.Features.FederationGateway/Interfaces/IUpdateLookups.cs b/src/Stratis.FederatedPeg.Features.FederationGateway/Interfaces/IUpdateLookups.cs new file mode 100644 index 00000000..b7048c52 --- /dev/null +++ b/src/Stratis.FederatedPeg.Features.FederationGateway/Interfaces/IUpdateLookups.cs @@ -0,0 +1,25 @@ +using System; +using System.Collections.Generic; + +namespace Stratis.FederatedPeg.Features.FederationGateway.Interfaces +{ + /// + /// Provides methods for creating trackers and for updating lookups based on + /// the information contained in the trackers. + /// + public interface ICrossChainLookups + { + /// + /// Creates trackers for recording information on how to update the lookups. + /// + /// Trackers for recording information on how to update the lookups. + Dictionary CreateTrackers(); + + /// + /// Updates lookups based on the changes recorded in the tracker object. + /// + /// Trackers recording information about how to update the lookups. + /// This method is intended be called by the class. + void UpdateLookups(Dictionary trackers); + } +} diff --git a/src/Stratis.FederatedPeg.Features.FederationGateway/TargetChain/CrossChainDB.cs b/src/Stratis.FederatedPeg.Features.FederationGateway/TargetChain/CrossChainDB.cs new file mode 100644 index 00000000..68e09ccd --- /dev/null +++ b/src/Stratis.FederatedPeg.Features.FederationGateway/TargetChain/CrossChainDB.cs @@ -0,0 +1,379 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using DBreeze; +using DBreeze.Utils; +using Microsoft.Extensions.Logging; +using NBitcoin; +using Stratis.Bitcoin.Configuration; +using Stratis.Bitcoin.Utilities; +using Stratis.FederatedPeg.Features.FederationGateway.Interfaces; + +namespace Stratis.FederatedPeg.Features.FederationGateway.TargetChain +{ + /// + /// This class provided the low-level cross-chain database functionality which + /// includes maintaining the various lookups in a transactional manner. + /// Also see . + /// + public class CrossChainDB : ICrossChainDB + { + /// This table contains the cross-chain transfer information. + public const string TransferTableName = "Transfers"; + + /// This table keeps track of the chain tips so that we know exactly what data our transfer table contains. + public const string CommonTableName = "Common"; + + /// Instance logger. + private readonly ILogger logger; + + /// The network. + private readonly Network network; + + /// The chain. + private readonly ConcurrentChain chain; + + /// This contains deposits ids indexed by block hash of the corresponding transaction. + protected readonly Dictionary> depositIdsByBlockHash = new Dictionary>(); + + /// This contains the block heights by block hashes for only the blocks of interest in our chain. + protected readonly Dictionary blockHeightsByBlockHash = new Dictionary(); + + /// This table contains deposits ids by status. + protected readonly Dictionary> depositsIdsByStatus = new Dictionary>(); + + /// The block height on the counter-chain for which the next list of deposits is expected. + public int NextMatureDepositHeight { get; protected set; } + + /// The tip of our chain when we last updated the store. + public ChainedHeader TipHashAndHeight { get; protected set; } + + /// The key of the repository tip in the common table. + public static readonly byte[] RepositoryTipKey = new byte[] { 0 }; + + /// The key of the counter-chain last mature block tip in the common table. + public static readonly byte[] NextMatureTipKey = new byte[] { 1 }; + + /// Access to DBreeze database. + private readonly DBreezeEngine DBreeze; + + /// The DBreeze serializer used to serialize / deserialize objects stored in the DBreeze database. + private readonly DBreezeSerializer DBreezeSerializer; + + /// + /// Constructs the class controlling the underlying DBreeze database. + /// + /// The network type of the transaction recorded in the database. + /// The logger facrtory used to create the logger. + /// The concurrent chain associated with the objects in the database. + /// The datafolder where the database files will be persisted. + /// Used to identify the MultiSigAddress that the database is for. + /// The DBreeze serializer used to serialize / deserialize stored objects. + public CrossChainDB( + Network network, + ILoggerFactory loggerFactory, + ConcurrentChain chain, + DataFolder dataFolder, + IFederationGatewaySettings federationGatewaySettings, + DBreezeSerializer dbreezeSerializer) + { + Guard.NotNull(network, nameof(network)); + Guard.NotNull(loggerFactory, nameof(loggerFactory)); + Guard.NotNull(chain, nameof(chain)); + Guard.NotNull(dataFolder, nameof(dataFolder)); + Guard.NotNull(federationGatewaySettings, nameof(federationGatewaySettings)); + Guard.NotNull(dbreezeSerializer, nameof(dbreezeSerializer)); + + this.network = network; + this.logger = loggerFactory.CreateLogger(this.GetType().FullName); + this.chain = chain; + + Block genesis = network.GetGenesis(); + this.TipHashAndHeight = new ChainedHeader(genesis.Header, genesis.GetHash(), 0); + this.NextMatureDepositHeight = 1; + + // Future-proof store name. + string depositStoreName = "federatedTransfers" + federationGatewaySettings.MultiSigAddress.ToString(); + string folder = Path.Combine(dataFolder.RootPath, depositStoreName); + Directory.CreateDirectory(folder); + this.DBreeze = new DBreezeEngine(folder); + this.DBreezeSerializer = dbreezeSerializer; + + // Initialize tracking deposits by status. + foreach (object status in typeof(CrossChainTransferStatus).GetEnumValues()) + this.depositsIdsByStatus[(CrossChainTransferStatus)status] = new HashSet(); + } + + /// + public void Initialize() + { + using (CrossChainDBTransaction xdbTransaction = this.GetTransaction()) + { + this.LoadTipHashAndHeight(xdbTransaction); + this.LoadNextMatureHeight(xdbTransaction); + this.logger.LogTrace("Loaded TipHashAndHeight {0} and NextMatureDepositHeight {1}.", this.TipHashAndHeight, this.NextMatureDepositHeight); + + // Initialize the lookups. + foreach (ICrossChainTransfer transfer in xdbTransaction.EnumerateTransfers()) + { + this.depositsIdsByStatus[transfer.Status].Add(transfer.DepositTransactionId); + + if (transfer.BlockHash != null && transfer.BlockHeight != null) + { + if (!this.depositIdsByBlockHash.TryGetValue(transfer.BlockHash, out HashSet deposits)) + { + deposits = new HashSet(); + this.depositIdsByBlockHash[transfer.BlockHash] = deposits; + } + + deposits.Add(transfer.DepositTransactionId); + + this.blockHeightsByBlockHash[transfer.BlockHash] = (int)transfer.BlockHeight; + } + } + this.logger.LogTrace("Lookups initialised."); + } + + this.logger.LogTrace("(-)"); + } + + /// + public CrossChainDBTransaction GetTransaction(CrossChainDBTransactionMode mode = CrossChainDBTransactionMode.Read) + { + CrossChainDBTransaction xdbTransaction = new CrossChainDBTransaction(this.DBreeze, this.DBreezeSerializer, (ICrossChainLookups)this, mode); + + this.logger.LogTrace("Transaction '{0}' created for {1}.", xdbTransaction, mode); + + return xdbTransaction; + } + + /// + /// Gets an array of objects corresponding to the array + /// of deposit ids passed to the method. + /// + /// The providing the context for the get. + /// The array of deposit ids. + /// An array of objects or null for non-existing transfers. + protected ICrossChainTransfer[] Get(CrossChainDBTransaction xdbTransaction, uint256[] depositIds) + { + try + { + // To boost performance we will access the deposits sorted by deposit id. + var depositDict = new Dictionary(); + for (int i = 0; i < depositIds.Length; i++) + depositDict[depositIds[i]] = i; + + var byteListComparer = new ByteListComparer(); + List> depositList = depositDict.ToList(); + depositList.Sort((pair1, pair2) => byteListComparer.Compare(pair1.Key.ToBytes(), pair2.Key.ToBytes())); + + var res = new ICrossChainTransfer[depositIds.Length]; + + foreach (KeyValuePair kv in depositList) + { + ICrossChainTransfer crossChainTransfer = xdbTransaction.GetTransfer(kv.Key); + if (crossChainTransfer != null) + res[kv.Value] = crossChainTransfer; + } + + this.logger.LogTrace("Transaction '{0}' read {1} transfers.", xdbTransaction, res.Length); + + return res; + } + catch (Exception err) + { + this.logger.LogError("Transaction '{0}' failed with '{1}'.", xdbTransaction, err.Message); + throw err; + } + } + + /// Persist multiple cross-chain transfer information into the database. + /// The crosschain database transaction context to use. + /// Cross-chain transfers to be inserted. + protected void PutTransfers(CrossChainDBTransaction xdbTransaction, ICrossChainTransfer[] crossChainTransfers) + { + try + { + // Optimal ordering for DB consumption. + var byteListComparer = new ByteListComparer(); + List orderedTransfers = crossChainTransfers.ToList(); + orderedTransfers.Sort((pair1, pair2) => byteListComparer.Compare(pair1.DepositTransactionId.ToBytes(), pair2.DepositTransactionId.ToBytes())); + + // Write each transfer in order. + foreach (ICrossChainTransfer transfer in orderedTransfers) + { + this.logger.LogTrace("Registering transfer: {0}.", transfer); + + xdbTransaction.PutTransfer(transfer); + } + + this.logger.LogTrace("Transaction '{0}' updated {1} transfers.", xdbTransaction, orderedTransfers.Count); + } + catch (Exception err) + { + this.logger.LogError("Transaction '{0}' failed with '{1}'.", xdbTransaction, err.Message); + throw err; + } + } + + /// Rolls back the database if an operation running in the context of a database transaction fails. + /// Database transaction to roll back. + /// Exception to report and re-raise. + /// Short reason/context code of failure. + protected void RollbackAndThrowTransactionError(CrossChainDBTransaction xdbTransaction, Exception exception, string reason = "FAILED_TRANSACTION") + { + this.logger.LogError("Error during database update: {0}", exception.Message); + this.logger.LogTrace("(-):[{0}]", reason); + + xdbTransaction.Rollback(); + throw exception; + } + + /// Loads the tip and hash height. + /// The DBreeze transaction context to use. + /// The hash and height pair. + protected ChainedHeader LoadTipHashAndHeight(CrossChainDBTransaction xdbTransaction) + { + try + { + BlockLocator blockLocator = xdbTransaction.LoadTipHashAndHeight(); + + this.TipHashAndHeight = this.chain.GetBlock(blockLocator.Blocks[0]) ?? this.chain.FindFork(blockLocator); + + this.logger.LogTrace("Transaction '{0}' read TipHashAndHeight {1}.", xdbTransaction, this.TipHashAndHeight); + + return this.TipHashAndHeight; + } + catch (Exception err) + { + this.logger.LogError("Transaction '{0}' failed with '{1}'.", xdbTransaction, err.Message); + throw err; + } + } + + /// Saves the tip and hash height. + /// The crosschain db transaction context to use. + /// The new tip to persist. + protected void SaveTipHashAndHeight(CrossChainDBTransaction xdbTransaction, ChainedHeader newTip) + { + try + { + BlockLocator locator = this.chain.GetBlock(newTip.HashBlock).GetLocator(); + xdbTransaction.SaveTipHashAndHeight(locator); + this.TipHashAndHeight = newTip; + + this.logger.LogTrace("Transaction '{0}' set TipHashAndHeight {1}", xdbTransaction, newTip); + } + catch (Exception err) + { + this.logger.LogError("Transaction '{0}' failed with '{1}'.", xdbTransaction, err.Message); + throw err; + } + } + + /// Loads the counter-chain next mature block height. + /// The crosschain db transaction context to use. + /// The hash and height pair. + protected int LoadNextMatureHeight(CrossChainDBTransaction xdbTransaction) + { + try + { + int height = xdbTransaction.LoadNextMatureHeight() ?? this.NextMatureDepositHeight; + + this.logger.LogTrace("Transaction '{0}' read NextMatureDepositHeight {1}", xdbTransaction, height); + + return height; + } + catch (Exception err) + { + this.logger.LogError("Transaction '{0}' failed with '{1}'.", xdbTransaction, err.Message); + throw err; + } + } + + /// Saves the counter-chain next mature block height. + /// The crosschain db transaction context to use. + /// The next mature block height on the counter-chain. + protected void SaveNextMatureHeight(CrossChainDBTransaction xdbTransaction, int newTip) + { + try + { + xdbTransaction.SaveNextMatureHeight(newTip); + this.NextMatureDepositHeight = newTip; + + this.logger.LogTrace("Transaction '{0}' set NextMatureDepositHeight {1}", newTip); + } + catch (Exception err) + { + this.logger.LogError("Transaction '{0}' failed with '{1}'.", xdbTransaction, err.Message); + throw err; + } + } + + /// + public Dictionary CreateTrackers() + { + var trackers = new Dictionary(); + + trackers[TransferTableName] = new StatusChangeTracker(); + + return trackers; + } + + /// + public void UpdateLookups(Dictionary trackers) + { + StatusChangeTracker tracker = (StatusChangeTracker)trackers[TransferTableName]; + + foreach (uint256 hash in tracker.UniqueBlockHashes()) + { + if (this.depositIdsByBlockHash.ContainsKey(hash)) continue; + this.depositIdsByBlockHash[hash] = new HashSet(); + } + + foreach (KeyValuePair kv in tracker) + { + ICrossChainTransfer transfer = kv.Key; + + if (transfer.DepositHeight == null && transfer.DbStatus != null /* Not new */) + { + // Transfer is being removed. + this.depositsIdsByStatus[transfer.Status].Remove(transfer.DepositTransactionId); + this.depositIdsByBlockHash[transfer.BlockHash].Remove(transfer.DepositTransactionId); + } + else + { + this.TransferStatusUpdated(transfer, tracker[transfer]); + + if (transfer.BlockHash != null && transfer.BlockHeight != null) + { + if (!this.depositIdsByBlockHash[transfer.BlockHash].Contains(transfer.DepositTransactionId)) + this.depositIdsByBlockHash[transfer.BlockHash].Add(transfer.DepositTransactionId); + this.blockHeightsByBlockHash[transfer.BlockHash] = (int)transfer.BlockHeight; + } + } + } + + this.logger.LogTrace("Lookups updated from tracker containing {0} items.", tracker.Count); + } + + /// Updates the status lookup based on a transfer and its previous status. + /// The cross-chain transfer that was updated. + /// The old status. + private void TransferStatusUpdated(ICrossChainTransfer transfer, CrossChainTransferStatus? oldStatus) + { + if (oldStatus != null) + { + this.depositsIdsByStatus[(CrossChainTransferStatus)oldStatus].Remove(transfer.DepositTransactionId); + } + + this.depositsIdsByStatus[transfer.Status].Add(transfer.DepositTransactionId); + } + + public virtual void Dispose() + { + this.DBreeze.Dispose(); + } + } +} diff --git a/src/Stratis.FederatedPeg.Features.FederationGateway/TargetChain/CrossChainDBTransaction.cs b/src/Stratis.FederatedPeg.Features.FederationGateway/TargetChain/CrossChainDBTransaction.cs new file mode 100644 index 00000000..2e4ee446 --- /dev/null +++ b/src/Stratis.FederatedPeg.Features.FederationGateway/TargetChain/CrossChainDBTransaction.cs @@ -0,0 +1,264 @@ +using System; +using System.Collections.Generic; +using DBreeze.DataTypes; +using NBitcoin; +using Stratis.Bitcoin.Utilities; +using Stratis.FederatedPeg.Features.FederationGateway.Interfaces; + +namespace Stratis.FederatedPeg.Features.FederationGateway.TargetChain +{ + /// Supported DBreeze transaction modes. + public enum CrossChainDBTransactionMode + { + Read, + ReadWrite + } + + /// + /// The purpose of this class is to restrict the operations that can be performed on the underlying + /// database - i.e. it provides a "higher level" layer to the underlying DBreeze transaction. + /// As such it provides a guarantee that any transient lookups will be kept in sync with changes + /// to the database. It also handles all required serialization here in one place. + /// + public class CrossChainDBTransaction : IDisposable + { + /// The serializer to use for this transaction. + private readonly DBreezeSerializer dBreezeSerializer; + + /// The underlying DBreeze transaction. + private DBreeze.Transactions.Transaction transaction; + + /// Interface providing control over the updating of transient lookups. + private readonly ICrossChainLookups crossChainLookups; + + /// The mode of the transaction. + private readonly CrossChainDBTransactionMode mode; + + /// Tracking changes allows updating of transient lookups after a successful commit operation. + private Dictionary trackers; + + /// + /// Constructs a transaction object that acts as a wrapper around the database tables. + /// + /// The DBreeze database engine. + /// The DBreeze serializer to use. + /// Interface providing methods to orchestrate the updating of transient lookups. + /// The mode in which to interact with the database. + public CrossChainDBTransaction( + DBreeze.DBreezeEngine dbreeze, + DBreezeSerializer dbreezeSerializer, + ICrossChainLookups updateLookups, + CrossChainDBTransactionMode mode) + { + this.transaction = dbreeze.GetTransaction(); + this.dBreezeSerializer = dbreezeSerializer; + this.crossChainLookups = updateLookups; + this.mode = mode; + + this.transaction.ValuesLazyLoadingIsOn = false; + + if (mode == CrossChainDBTransactionMode.ReadWrite) + { + this.transaction.SynchronizeTables(CrossChainDB.TransferTableName, CrossChainDB.CommonTableName); + } + + this.trackers = updateLookups.CreateTrackers(); + } + + private void Insert(string tableName, TKey key, TObject obj) where TObject : IBitcoinSerializable + { + Guard.Assert(this.mode == CrossChainDBTransactionMode.ReadWrite); + + byte[] keyBytes = this.dBreezeSerializer.Serialize(key); + byte[] objBytes = this.dBreezeSerializer.Serialize(obj); + this.transaction.Insert(tableName, keyBytes, objBytes); + + // If this is a tracked class. + if (this.trackers.TryGetValue(tableName, out IChangeTracker tracker)) + { + // Record the object and its old value. + tracker.RecordOldValue(obj); + } + } + + private bool Select(string tableName, TKey key, out TObject obj) where TObject : IBitcoinSerializable + { + byte[] keyBytes = this.dBreezeSerializer.Serialize(key); + Row row = this.transaction.Select(tableName, keyBytes); + + if (!row.Exists) + { + obj = default(TObject); + return false; + } + + // Temporary workaround until DBreezeSerializer is fixed. + if (typeof(ICrossChainTransfer).IsAssignableFrom(typeof(TObject))) + { + obj = (TObject)Activator.CreateInstance(typeof(CrossChainTransfer)); + obj.ReadWrite(row.Value, this.dBreezeSerializer.Network.Consensus.ConsensusFactory); + } + else + { + obj = this.dBreezeSerializer.Deserialize(row.Value); + } + + // If this is a tracked class. + if (this.trackers.TryGetValue(tableName, out IChangeTracker tracker)) + { + // Set the old value on the object itself so that we can update the lookups if it is changed. + tracker.SetOldValue(obj); + } + + return true; + } + + private IEnumerable SelectForward(string tableName) where TObject : IBitcoinSerializable + { + if (!this.trackers.TryGetValue(tableName, out IChangeTracker tracker)) + tracker = null; + + foreach (Row row in this.transaction.SelectForward(tableName)) + { + TObject obj; + + // Temporary workaround until DBreezeSerializer is fixed. + if (typeof(ICrossChainTransfer).IsAssignableFrom(typeof(TObject))) + { + obj = (TObject)Activator.CreateInstance(typeof(CrossChainTransfer)); + obj.ReadWrite(row.Value, this.dBreezeSerializer.Network.Consensus.ConsensusFactory); + } + else + { + obj = this.dBreezeSerializer.Deserialize(row.Value); + } + + // If this is a tracked class. + if (tracker != null) + { + // Set the old value on the object itself so that we can update the lookups if it is changed. + tracker.SetOldValue(obj); + } + + yield return obj; + } + } + + private void RemoveKey(string tableName, TKey key, TObject obj) where TObject : IBitcoinSerializable + { + Guard.Assert(this.mode == CrossChainDBTransactionMode.ReadWrite); + + byte[] keyBytes = this.dBreezeSerializer.Serialize(key); + this.transaction.RemoveKey(tableName, keyBytes); + + // If this is a tracked class. + if (!this.trackers.TryGetValue(tableName, out IChangeTracker tracker)) + { + // Record the object and its old value. + tracker.RecordOldValue(obj); + } + } + + public ICrossChainTransfer GetTransfer(uint256 depositId) + { + if (!Select(CrossChainDB.TransferTableName, depositId, out CrossChainTransfer crossChainTransfer)) + return null; + + return crossChainTransfer; + } + + public IEnumerable EnumerateTransfers() + { + foreach (ICrossChainTransfer crossChainTransfer in SelectForward(CrossChainDB.TransferTableName)) + yield return crossChainTransfer; + } + + public void PutTransfer(ICrossChainTransfer transfer) + { + Guard.NotNull(transfer, nameof(transfer)); + + // Write the transfer. + Insert(CrossChainDB.TransferTableName, transfer.DepositTransactionId, transfer); + } + + public void DeleteTransfer(ICrossChainTransfer transfer) + { + Guard.NotNull(transfer, nameof(transfer)); + + // Only transfers that exist in the db solely due to being seen in a block will be removed. + Guard.Assert(transfer.DepositHeight == null); + + RemoveKey(CrossChainDB.TransferTableName, transfer.DepositTransactionId, transfer); + } + + public void Commit() + { + Guard.Assert(this.mode == CrossChainDBTransactionMode.ReadWrite); + + this.transaction.Commit(); + this.crossChainLookups.UpdateLookups(this.trackers); + } + + public void Rollback() + { + Guard.Assert(this.mode == CrossChainDBTransactionMode.ReadWrite); + + this.transaction.Rollback(); + } + + public BlockLocator LoadTipHashAndHeight() + { + var blockLocator = new BlockLocator(); + try + { + Row row = this.transaction.Select(CrossChainDB.CommonTableName, CrossChainDB.RepositoryTipKey); + Guard.Assert(row.Exists); + blockLocator.FromBytes(row.Value); + } + catch (Exception) + { + blockLocator.Blocks = new List { this.dBreezeSerializer.Network.GenesisHash }; + } + + return blockLocator; + } + + public void SaveTipHashAndHeight(BlockLocator blockLocator) + { + Guard.Assert(this.mode != CrossChainDBTransactionMode.Read); + + this.transaction.Insert(CrossChainDB.CommonTableName, CrossChainDB.RepositoryTipKey, blockLocator.ToBytes()); + } + + public int? LoadNextMatureHeight() + { + Row row = this.transaction.Select(CrossChainDB.CommonTableName, CrossChainDB.NextMatureTipKey); + + return (row?.Exists ?? false) ? row.Value : (int?)null; + } + + public void SaveNextMatureHeight(int newTip) + { + Guard.Assert(this.mode != CrossChainDBTransactionMode.Read); + + this.transaction.Insert(CrossChainDB.CommonTableName, CrossChainDB.NextMatureTipKey, newTip); + } + + /// A string to identify this transaction by. + /// A concatenation of the creation time and thread id. + public override string ToString() + { + DateTime createdDT = new DateTime(this.transaction.CreatedUdt); + return string.Format("{0}:{1}", createdDT, this.transaction.ManagedThreadId); + } + + public void Dispose() + { + if (this.transaction != null) + { + this.transaction.Dispose(); + this.transaction = null; + } + } + } +} diff --git a/src/Stratis.FederatedPeg.Features.FederationGateway/TargetChain/CrossChainTransfer.cs b/src/Stratis.FederatedPeg.Features.FederationGateway/TargetChain/CrossChainTransfer.cs index 632055d5..d2621cd8 100644 --- a/src/Stratis.FederatedPeg.Features.FederationGateway/TargetChain/CrossChainTransfer.cs +++ b/src/Stratis.FederatedPeg.Features.FederationGateway/TargetChain/CrossChainTransfer.cs @@ -42,6 +42,10 @@ public class CrossChainTransfer : ICrossChainTransfer public CrossChainTransferStatus Status => this.status; private CrossChainTransferStatus status; + /// + public CrossChainTransferStatus? DbStatus => this.dbStatus; + private CrossChainTransferStatus? dbStatus; + /// /// Parameter-less constructor for (de)serialization. /// @@ -212,6 +216,12 @@ public int GetSignatureCount(Network network) return result?.Count(s => s != null) ?? 0; } + /// + public void RecordDbStatus() + { + this.dbStatus = this.status; + } + /// public void CombineSignatures(TransactionBuilder builder, Transaction[] partialTransactions) { @@ -233,5 +243,13 @@ public void SetPartialTransaction(Transaction partialTransaction) { this.partialTransaction = partialTransaction; } + + public override string ToString() + { + return string.Format("DepositId: '{0}', DepositHeight: {1}, Status: '{2}', TransactionHash: '{3}'" + + this.depositTransactionId, this.depositHeight, this.status, this.partialTransaction?.GetHash()) + + ((this.status != CrossChainTransferStatus.SeenInBlock) ? "" : + string.Format(", BlockHeight: {0}, BlockHash: '{1}'", this.blockHeight, this.blockHash)); + } } } diff --git a/src/Stratis.FederatedPeg.Features.FederationGateway/TargetChain/CrossChainTransferStore.cs b/src/Stratis.FederatedPeg.Features.FederationGateway/TargetChain/CrossChainTransferStore.cs index 6ad6f376..cc00a42e 100644 --- a/src/Stratis.FederatedPeg.Features.FederationGateway/TargetChain/CrossChainTransferStore.cs +++ b/src/Stratis.FederatedPeg.Features.FederationGateway/TargetChain/CrossChainTransferStore.cs @@ -1,12 +1,8 @@ using System; using System.Collections.Generic; -using System.IO; using System.Linq; using System.Threading; using System.Threading.Tasks; -using DBreeze; -using DBreeze.DataTypes; -using DBreeze.Utils; using Microsoft.Extensions.Logging; using NBitcoin; using Stratis.Bitcoin; @@ -19,45 +15,13 @@ namespace Stratis.FederatedPeg.Features.FederationGateway.TargetChain { - public class CrossChainTransferStore : ICrossChainTransferStore + public class CrossChainTransferStore : CrossChainDB, ICrossChainTransferStore { - /// This table contains the cross-chain transfer information. - private const string transferTableName = "Transfers"; - - /// This table keeps track of the chain tips so that we know exactly what data our transfer table contains. - private const string commonTableName = "Common"; - // Block batch size for synchronization private const int synchronizationBatchSize = 1000; - /// This contains deposits ids indexed by block hash of the corresponding transaction. - private readonly Dictionary> depositIdsByBlockHash = new Dictionary>(); - - /// This contains the block heights by block hashes for only the blocks of interest in our chain. - private readonly Dictionary blockHeightsByBlockHash = new Dictionary(); - - /// This table contains deposits ids by status. - private readonly Dictionary> depositsIdsByStatus = new Dictionary>(); - - /// - public int NextMatureDepositHeight { get; private set; } - - /// - public ChainedHeader TipHashAndHeight { get; private set; } - - /// The key of the repository tip in the common table. - private static readonly byte[] RepositoryTipKey = new byte[] { 0 }; - - /// The key of the counter-chain last mature block tip in the common table. - private static readonly byte[] NextMatureTipKey = new byte[] { 1 }; - /// Instance logger. private readonly ILogger logger; - - /// Access to DBreeze database. - private readonly DBreezeEngine DBreeze; - - private readonly DBreezeSerializer dBreezeSerializer; private readonly Network network; private readonly ConcurrentChain chain; private readonly IWithdrawalExtractor withdrawalExtractor; @@ -68,17 +32,18 @@ public class CrossChainTransferStore : ICrossChainTransferStore private readonly IFederationGatewaySettings federationGatewaySettings; /// Provider of time functions. + private readonly IDateTimeProvider dateTimeProvider; private readonly object lockObj; public CrossChainTransferStore(Network network, DataFolder dataFolder, ConcurrentChain chain, IFederationGatewaySettings settings, IDateTimeProvider dateTimeProvider, ILoggerFactory loggerFactory, IWithdrawalExtractor withdrawalExtractor, IFullNode fullNode, IBlockRepository blockRepository, - IFederationWalletManager federationWalletManager, IFederationWalletTransactionHandler federationWalletTransactionHandler, DBreezeSerializer dBreezeSerializer) + IFederationWalletManager federationWalletManager, IFederationWalletTransactionHandler federationWalletTransactionHandler, DBreezeSerializer dbreezeSerializer) + : base(network, loggerFactory, chain, dataFolder, settings, dbreezeSerializer) { Guard.NotNull(network, nameof(network)); Guard.NotNull(dataFolder, nameof(dataFolder)); Guard.NotNull(chain, nameof(chain)); Guard.NotNull(settings, nameof(settings)); - Guard.NotNull(dateTimeProvider, nameof(dateTimeProvider)); Guard.NotNull(loggerFactory, nameof(loggerFactory)); Guard.NotNull(withdrawalExtractor, nameof(withdrawalExtractor)); Guard.NotNull(fullNode, nameof(fullNode)); @@ -93,58 +58,11 @@ public CrossChainTransferStore(Network network, DataFolder dataFolder, Concurren this.federationWalletTransactionHandler = federationWalletTransactionHandler; this.federationGatewaySettings = settings; this.withdrawalExtractor = withdrawalExtractor; - this.dBreezeSerializer = dBreezeSerializer; this.lockObj = new object(); this.logger = loggerFactory.CreateLogger(this.GetType().FullName); this.TipHashAndHeight = this.chain.GetBlock(0); this.NextMatureDepositHeight = 1; this.cancellation = new CancellationTokenSource(); - - // Future-proof store name. - string depositStoreName = "federatedTransfers" + settings.MultiSigAddress.ToString(); - string folder = Path.Combine(dataFolder.RootPath, depositStoreName); - Directory.CreateDirectory(folder); - this.DBreeze = new DBreezeEngine(folder); - - // Initialize tracking deposits by status. - foreach (object status in typeof(CrossChainTransferStatus).GetEnumValues()) - this.depositsIdsByStatus[(CrossChainTransferStatus)status] = new HashSet(); - } - - /// Performs any needed initialisation for the database. - public void Initialize() - { - lock (this.lockObj) - { - using (DBreeze.Transactions.Transaction dbreezeTransaction = this.DBreeze.GetTransaction()) - { - dbreezeTransaction.ValuesLazyLoadingIsOn = false; - - this.LoadTipHashAndHeight(dbreezeTransaction); - this.LoadNextMatureHeight(dbreezeTransaction); - - // Initialize the lookups. - foreach (Row transferRow in dbreezeTransaction.SelectForward(transferTableName)) - { - var transfer = new CrossChainTransfer(); - transfer.FromBytes(transferRow.Value, this.network.Consensus.ConsensusFactory); - this.depositsIdsByStatus[transfer.Status].Add(transfer.DepositTransactionId); - - if (transfer.BlockHash != null && transfer.BlockHeight != null) - { - if (!this.depositIdsByBlockHash.TryGetValue(transfer.BlockHash, out HashSet deposits)) - { - deposits = new HashSet(); - this.depositIdsByBlockHash[transfer.BlockHash] = deposits; - } - - deposits.Add(transfer.DepositTransactionId); - - this.blockHeightsByBlockHash[transfer.BlockHash] = (int)transfer.BlockHeight; - } - } - } - } } /// Starts the cross-chain-transfer store. @@ -159,11 +77,15 @@ public void Start() // suspended due to the missing wallet transactions which will rewind the counter- // chain tip to then reprocess them. if (this.federationWalletManager.RemoveTransientTransactions()) + { + this.logger.LogTrace("Unseen transactions have been removed from the wallet."); this.federationWalletManager.SaveWallet(); + } Guard.Assert(this.Synchronize()); - // Any transactions seen in blocks must also be present in the wallet. + this.logger.LogTrace("Adding any missing but seen transactions to wallet."); + FederationWallet wallet = this.federationWalletManager.GetWallet(); ICrossChainTransfer[] transfers = this.GetTransfersByStatus(new[] { CrossChainTransferStatus.SeenInBlock }, true, false).ToArray(); foreach (ICrossChainTransfer transfer in transfers) @@ -240,6 +162,9 @@ private ICrossChainTransfer[] ValidateCrossChainTransfers(ICrossChainTransfer[] if (CrossChainTransfer.TemplatesMatch(this.network, walletTran, partialTransfer.PartialTransaction)) { + this.logger.LogTrace("Could not find transaction by hash {0} but found it by matching template.", partialTransfer.PartialTransaction.GetHash()); + this.logger.LogTrace("Will update transfer with wallet transaction {0}.", walletTran.GetHash()); + partialTransfer.SetPartialTransaction(walletTran); if (walletData[0].Item2.BlockHeight != null) @@ -253,47 +178,45 @@ private ICrossChainTransfer[] ValidateCrossChainTransfers(ICrossChainTransfer[] } } - // Remove any invalid withdrawal transactions. - foreach (IWithdrawal withdrawal in walletData.Select(d => d.Item3)) - this.federationWalletManager.RemoveTransientTransactions(withdrawal.DepositId); - // The chain may have been rewound so that this transaction or its UTXO's have been lost. // Rewind our recorded chain A tip to ensure the transaction is re-built once UTXO's become available. if (partialTransfer.DepositHeight < newChainATip) - newChainATip = partialTransfer.DepositHeight ?? newChainATip; + { + newChainATip = (int)partialTransfer.DepositHeight; + + this.logger.LogTrace("Will rewind NextMatureDepositHeight due to suspended deposit {0} at height {1}.", + partialTransfer.DepositTransactionId, newChainATip); + } tracker.SetTransferStatus(partialTransfer, CrossChainTransferStatus.Suspended); } + // Exit if nothing to do. if (tracker.Count == 0) { this.logger.LogTrace("(-)[NO_CHANGES_IN_TRACKER]"); return crossChainTransfers; } - using (DBreeze.Transactions.Transaction dbreezeTransaction = this.DBreeze.GetTransaction()) + using (CrossChainDBTransaction xdbTransaction = this.GetTransaction(CrossChainDBTransactionMode.ReadWrite)) { - dbreezeTransaction.SynchronizeTables(transferTableName, commonTableName); - int oldChainATip = this.NextMatureDepositHeight; try { - foreach (KeyValuePair kv in tracker) - { - this.PutTransfer(dbreezeTransaction, kv.Key); - } + this.PutTransfers(xdbTransaction, tracker.Select(kv => kv.Key).ToArray()); + this.SaveNextMatureHeight(xdbTransaction, newChainATip); - this.SaveNextMatureHeight(dbreezeTransaction, newChainATip); - dbreezeTransaction.Commit(); - this.UpdateLookups(tracker); + xdbTransaction.Commit(); + + bool walletUpdated = false; // Remove any remnants of suspended transactions from the wallet. foreach (KeyValuePair kv in tracker) { if (kv.Value == CrossChainTransferStatus.Suspended) { - this.federationWalletManager.RemoveTransientTransactions(kv.Key.DepositTransactionId); + walletUpdated |= this.federationWalletManager.RemoveTransientTransactions(kv.Key.DepositTransactionId); } } @@ -302,11 +225,12 @@ private ICrossChainTransfer[] ValidateCrossChainTransfers(ICrossChainTransfer[] { if (t.Item3.BlockNumber >= newChainATip) { - this.federationWalletManager.RemoveTransientTransactions(t.Item3.DepositId); + walletUpdated |= this.federationWalletManager.RemoveTransientTransactions(t.Item3.DepositId); } } - this.federationWalletManager.SaveWallet(); + if (walletUpdated) + this.federationWalletManager.SaveWallet(); return crossChainTransfers; } @@ -315,7 +239,7 @@ private ICrossChainTransfer[] ValidateCrossChainTransfers(ICrossChainTransfer[] // Restore expected store state in case the calling code retries / continues using the store. this.NextMatureDepositHeight = oldChainATip; - this.RollbackAndThrowTransactionError(dbreezeTransaction, err, "SANITY_ERROR"); + this.RollbackAndThrowTransactionError(xdbTransaction, err, "SANITY_ERROR"); // Dummy return as the above method throws. Avoids compiler error. return null; @@ -341,23 +265,29 @@ private Transaction BuildDeterministicTransaction(uint256 depositId, uint blockT Shuffle = false, IgnoreVerify = true, WalletPassword = walletPassword, - Sign = sign, + Sign = sign }; - Transaction transaction = this.federationWalletTransactionHandler.BuildTransaction(multiSigContext); - // Build the transaction. - if (this.network.Consensus.IsProofOfStake) + Transaction transaction = this.federationWalletTransactionHandler.BuildTransaction(multiSigContext); + if (transaction == null) { - transaction.Time = blockTime; - - if (sign) + this.logger.LogTrace("Failed to create deterministic transaction."); + } + else + { + if (this.network.Consensus.IsProofOfStake) { - transaction = multiSigContext.TransactionBuilder.SignTransaction(transaction); + transaction.Time = blockTime; + + if (sign) + { + transaction = multiSigContext.TransactionBuilder.SignTransaction(transaction); + } } - } - this.logger.LogInformation("transaction = {0}", transaction.ToString(this.network, RawFormat.BlockExplorer)); + this.logger.LogInformation("transaction = {0}", transaction.ToString(this.network, RawFormat.BlockExplorer)); + } return transaction; } @@ -370,18 +300,6 @@ private Transaction BuildDeterministicTransaction(uint256 depositId, uint blockT return null; } - /// Rolls back the database if an operation running in the context of a database transaction fails. - /// Database transaction to roll back. - /// Exception to report and re-raise. - /// Short reason/context code of failure. - private void RollbackAndThrowTransactionError(DBreeze.Transactions.Transaction dbreezeTransaction, Exception exception, string reason = "FAILED_TRANSACTION") - { - this.logger.LogError("Error during database update: {0}, reason: {1}", exception.Message, reason); - - dbreezeTransaction.Rollback(); - throw exception; - } - /// public Task SaveCurrentTipAsync() { @@ -389,11 +307,10 @@ public Task SaveCurrentTipAsync() { lock (this.lockObj) { - using (DBreeze.Transactions.Transaction dbreezeTransaction = this.DBreeze.GetTransaction()) + using (CrossChainDBTransaction xdbTransaction = this.GetTransaction(CrossChainDBTransactionMode.ReadWrite)) { - dbreezeTransaction.SynchronizeTables(transferTableName, commonTableName); - this.SaveNextMatureHeight(dbreezeTransaction, this.NextMatureDepositHeight); - dbreezeTransaction.Commit(); + xdbTransaction.SaveNextMatureHeight(this.NextMatureDepositHeight); + xdbTransaction.Commit(); } } }); @@ -409,6 +326,8 @@ public Task RecordLatestMatureDepositsAsync(IList RecordLatestMatureDepositsAsync(IList RecordLatestMatureDepositsAsync(IList RecordLatestMatureDepositsAsync(IList RecordLatestMatureDepositsAsync(IList kv in tracker) - { - this.PutTransfer(dbreezeTransaction, kv.Key); - } + this.PutTransfers(xdbTransaction, tracker.Select(kv => kv.Key).ToArray()); // Ensure we get called for a retry by NOT advancing the chain A tip if the block // contained any suspended transfers. if (!haveSuspendedTransfers) { - this.SaveNextMatureHeight(dbreezeTransaction, this.NextMatureDepositHeight + 1); + this.SaveNextMatureHeight(xdbTransaction, this.NextMatureDepositHeight + 1); } - dbreezeTransaction.Commit(); - this.UpdateLookups(tracker); + xdbTransaction.Commit(); } catch (Exception err) { this.logger.LogError("An error occurred when processing deposits {0}", err); + this.logger.LogTrace("Undoing reserved UTXOs."); - // Undo reserved UTXO's. if (walletUpdated) { foreach (KeyValuePair kv in tracker) @@ -556,11 +472,13 @@ public Task RecordLatestMatureDepositsAsync(IList MergeTransactionSignaturesAsync(uint256 depositId, Tran { lock (this.lockObj) { + this.logger.LogTrace("()"); this.Synchronize(); - this.logger.LogInformation("ValidateCrossChainTransfers : {0}", depositId); + this.logger.LogInformation("Get and ValidateCrossChainTransfers : {0}", depositId); ICrossChainTransfer transfer = this.ValidateCrossChainTransfers(this.Get(new[] { depositId })).FirstOrDefault(); if (transfer == null) { - this.logger.LogInformation("FAILED ValidateCrossChainTransfers : {0}", depositId); - - this.logger.LogTrace("(-)[MERGE_NOT_FOUND]:null"); + this.logger.LogInformation("FAILED Get and ValidateCrossChainTransfers : {0}", depositId); + this.logger.LogTrace("(-)[MERGE_NOTFOUND]"); return null; } @@ -604,32 +522,25 @@ public Task MergeTransactionSignaturesAsync(uint256 depositId, Tran if (transfer.PartialTransaction.GetHash() == oldTransaction.GetHash()) { this.logger.LogInformation("FAILED to combineSignatures : {0}", transfer.DepositTransactionId); - this.logger.LogTrace("(-)[MERGE_UNCHANGED]"); return transfer.PartialTransaction; } - using (DBreeze.Transactions.Transaction dbreezeTransaction = this.DBreeze.GetTransaction()) + using (CrossChainDBTransaction xdbTransaction = this.GetTransaction(CrossChainDBTransactionMode.ReadWrite)) { try { - dbreezeTransaction.SynchronizeTables(transferTableName, commonTableName); - this.federationWalletManager.ProcessTransaction(transfer.PartialTransaction); this.federationWalletManager.SaveWallet(); - if (this.ValidateTransaction(transfer.PartialTransaction, true)) + if (ValidateTransaction(transfer.PartialTransaction, true)) { this.logger.LogInformation("Deposit: {0} collected enough signatures and is FullySigned", transfer.DepositTransactionId); transfer.SetStatus(CrossChainTransferStatus.FullySigned); } - this.PutTransfer(dbreezeTransaction, transfer); - dbreezeTransaction.Commit(); - - // Do this last to maintain DB integrity. We are assuming that this won't throw. - this.logger.LogInformation("Deposit: {0} did not collected enough signatures and is Partial", transfer.DepositTransactionId); - this.TransferStatusUpdated(transfer, CrossChainTransferStatus.Partial); + xdbTransaction.PutTransfer(transfer); + xdbTransaction.Commit(); } catch (Exception err) { @@ -639,10 +550,11 @@ public Task MergeTransactionSignaturesAsync(uint256 depositId, Tran transfer.SetPartialTransaction(oldTransaction); this.federationWalletManager.ProcessTransaction(oldTransaction); this.federationWalletManager.SaveWallet(); - this.RollbackAndThrowTransactionError(dbreezeTransaction, err, "MERGE_ERROR"); + this.RollbackAndThrowTransactionError(xdbTransaction, err, "MERGE_ERROR"); } - return transfer.PartialTransaction; + this.logger.LogTrace("(-)"); + return transfer?.PartialTransaction; } } }); @@ -656,105 +568,106 @@ public Task MergeTransactionSignaturesAsync(uint256 depositId, Tran /// The blocks used to update the store. Must be sorted by ascending height leading up to the new tip. private void Put(List blocks) { + this.logger.LogTrace("Putting {0} blocks.", blocks.Count); + if (blocks.Count == 0) this.logger.LogTrace("(-)[NO_BLOCKS]:0"); Dictionary transferLookup; Dictionary allWithdrawals; - - int blockHeight = this.TipHashAndHeight.Height + 1; - var allDepositIds = new HashSet(); - - allWithdrawals = new Dictionary(); - foreach (Block block in blocks) { - IReadOnlyList blockWithdrawals = this.withdrawalExtractor.ExtractWithdrawalsFromBlock(block, blockHeight++); - allDepositIds.UnionWith(blockWithdrawals.Select(d => d.DepositId).ToArray()); - allWithdrawals[block.GetHash()] = blockWithdrawals.ToArray(); - } + int blockHeight = this.TipHashAndHeight.Height + 1; + var allDepositIds = new HashSet(); - // Nothing to do? - if (allDepositIds.Count == 0) - { - // Exiting here and saving the tip after the sync. - this.TipHashAndHeight = this.chain.GetBlock(blocks.Last().GetHash()); + allWithdrawals = new Dictionary(); + foreach (Block block in blocks) + { + IReadOnlyList blockWithdrawals = this.withdrawalExtractor.ExtractWithdrawalsFromBlock(block, blockHeight++); + allDepositIds.UnionWith(blockWithdrawals.Select(d => d.DepositId).ToArray()); + allWithdrawals[block.GetHash()] = blockWithdrawals.ToArray(); + } - this.logger.LogTrace("(-)[NO_DEPOSIT_IDS]"); - return; - } + // Nothing to do? + if (allDepositIds.Count == 0) + { + // Exiting here and saving the tip after the sync. + this.TipHashAndHeight = this.chain.GetBlock(blocks.Last().GetHash()); - // Create transfer lookup by deposit Id. - uint256[] uniqueDepositIds = allDepositIds.ToArray(); - ICrossChainTransfer[] uniqueTransfers = this.Get(uniqueDepositIds); - transferLookup = new Dictionary(); - for (int i = 0; i < uniqueDepositIds.Length; i++) - transferLookup[uniqueDepositIds[i]] = uniqueTransfers[i]; + this.logger.LogTrace("(-)[NO_DEPOSIT_IDS]"); + return; + } + // Create transfer lookup by deposit Id. + uint256[] uniqueDepositIds = allDepositIds.ToArray(); + ICrossChainTransfer[] uniqueTransfers = this.Get(uniqueDepositIds); + transferLookup = new Dictionary(); + for (int i = 0; i < uniqueDepositIds.Length; i++) + transferLookup[uniqueDepositIds[i]] = uniqueTransfers[i]; + } - // Only create a transaction if there is important work to do. - using (DBreeze.Transactions.Transaction dbreezeTransaction = this.DBreeze.GetTransaction()) + // Find transfer transactions in blocks + foreach (Block block in blocks) { - dbreezeTransaction.SynchronizeTables(transferTableName, commonTableName); - - ChainedHeader prevTip = this.TipHashAndHeight; + // First check the database to see if we already know about these deposits. + IWithdrawal[] withdrawals = allWithdrawals[block.GetHash()].ToArray(); + ICrossChainTransfer[] crossChainTransfers = withdrawals.Select(d => transferLookup[d.DepositId]).ToArray(); - try + // Update the information about these deposits or record their status. + for (int i = 0; i < crossChainTransfers.Length; i++) { - var tracker = new StatusChangeTracker(); - - // Find transfer transactions in blocks - foreach (Block block in blocks) - { - // First check the database to see if we already know about these deposits. - IWithdrawal[] withdrawals = allWithdrawals[block.GetHash()].ToArray(); - ICrossChainTransfer[] crossChainTransfers = withdrawals.Select(d => transferLookup[d.DepositId]).ToArray(); + IWithdrawal withdrawal = withdrawals[i]; + Transaction transaction = block.Transactions.Single(t => t.GetHash() == withdrawal.Id); - // Update the information about these deposits or record their status. - for (int i = 0; i < crossChainTransfers.Length; i++) - { - IWithdrawal withdrawal = withdrawals[i]; - Transaction transaction = block.Transactions.Single(t => t.GetHash() == withdrawal.Id); + // Ensure that the wallet is in step. + this.federationWalletManager.ProcessTransaction(transaction, withdrawal.BlockNumber, block); - // Ensure that the wallet is in step. - this.federationWalletManager.ProcessTransaction(transaction, withdrawal.BlockNumber, block); + if (crossChainTransfers[i] == null) + { + Script scriptPubKey = BitcoinAddress.Create(withdrawal.TargetAddress, this.network).ScriptPubKey; - if (crossChainTransfers[i] == null) - { - Script scriptPubKey = BitcoinAddress.Create(withdrawal.TargetAddress, this.network).ScriptPubKey; + crossChainTransfers[i] = new CrossChainTransfer(CrossChainTransferStatus.SeenInBlock, withdrawal.DepositId, + scriptPubKey, withdrawal.Amount, null, transaction, withdrawal.BlockHash, withdrawal.BlockNumber); - crossChainTransfers[i] = new CrossChainTransfer(CrossChainTransferStatus.SeenInBlock, withdrawal.DepositId, - scriptPubKey, withdrawal.Amount, null, transaction, withdrawal.BlockHash, withdrawal.BlockNumber); + transferLookup[crossChainTransfers[i].DepositTransactionId] = crossChainTransfers[i]; + } + else + { + crossChainTransfers[i].SetPartialTransaction(transaction); + crossChainTransfers[i].SetStatus(CrossChainTransferStatus.SeenInBlock, withdrawal.BlockHash, withdrawal.BlockNumber); + } + } + } - tracker.SetTransferStatus(crossChainTransfers[i]); - } - else - { - crossChainTransfers[i].SetPartialTransaction(transaction); + // Only create a transaction if there is work to do. + if (transferLookup.Count == 0) + { + this.logger.LogTrace("(-)[NOTHING_TO_DO]"); + return; + } - tracker.SetTransferStatus(crossChainTransfers[i], - CrossChainTransferStatus.SeenInBlock, withdrawal.BlockHash, withdrawal.BlockNumber); - } - } - } + using (CrossChainDBTransaction xdbTransaction = this.GetTransaction(CrossChainDBTransactionMode.ReadWrite)) + { + ChainedHeader prevTip = this.TipHashAndHeight; + try + { // Write transfers. - this.PutTransfers(dbreezeTransaction, tracker.Keys.ToArray()); + this.PutTransfers(xdbTransaction, transferLookup.Select(x => x.Value).ToArray()); // Commit additions ChainedHeader newTip = this.chain.GetBlock(blocks.Last().GetHash()); - this.SaveTipHashAndHeight(dbreezeTransaction, newTip); - dbreezeTransaction.Commit(); - - // Update the lookups last to ensure store integrity. - this.UpdateLookups(tracker); + this.SaveTipHashAndHeight(xdbTransaction, newTip); + xdbTransaction.Commit(); } catch (Exception err) { // Restore expected store state in case the calling code retries / continues using the store. this.TipHashAndHeight = prevTip; - this.RollbackAndThrowTransactionError(dbreezeTransaction, err, "PUT_ERROR"); + this.RollbackAndThrowTransactionError(xdbTransaction, err, "PUT_ERROR"); } } + + this.logger.LogTrace("(-):{0}", blocks.Count); } /// @@ -773,9 +686,13 @@ private bool RewindIfRequired() return false; } + this.logger.LogTrace("Rewinding."); + // We are dependent on the wallet manager having dealt with any fork by now. if (this.chain.GetBlock(tipToChase.Hash) == null) { + this.logger.LogTrace("The wallet tip is not found in the chain. Rewinding on behalf of wallet."); + ICollection locators = this.federationWalletManager.GetWallet().BlockLocator; var blockLocator = new BlockLocator { Blocks = locators.ToList() }; ChainedHeader fork = this.chain.FindFork(blockLocator); @@ -787,6 +704,8 @@ private bool RewindIfRequired() if (this.TipHashAndHeight != null && (this.TipHashAndHeight.Height > tipToChase.Height || this.chain.GetBlock(this.TipHashAndHeight.HashBlock)?.Height != this.TipHashAndHeight.Height)) { + this.logger.LogTrace("The chain does not contain our tip."); + // We are ahead of the current chain or on the wrong chain. ChainedHeader fork = this.chain.FindFork(this.TipHashAndHeight.GetLocator()) ?? this.chain.GetBlock(0); @@ -794,33 +713,33 @@ private bool RewindIfRequired() while (fork.Height > tipToChase.Height) fork = fork.Previous; - using (DBreeze.Transactions.Transaction dbreezeTransaction = this.DBreeze.GetTransaction()) - { - dbreezeTransaction.SynchronizeTables(transferTableName, commonTableName); - dbreezeTransaction.ValuesLazyLoadingIsOn = false; + this.logger.LogTrace("Fork height determined to be {0}", fork.Height); + using (CrossChainDBTransaction xdbTransaction = this.GetTransaction(CrossChainDBTransactionMode.ReadWrite)) + { ChainedHeader prevTip = this.TipHashAndHeight; try { - StatusChangeTracker tracker = this.OnDeleteBlocks(dbreezeTransaction, fork.Height); - this.SaveTipHashAndHeight(dbreezeTransaction, fork); - dbreezeTransaction.Commit(); - this.UndoLookups(tracker); + this.OnDeleteBlocks(xdbTransaction, fork.Height); + this.SaveTipHashAndHeight(xdbTransaction, fork); + xdbTransaction.Commit(); } catch (Exception err) { // Restore expected store state in case the calling code retries / continues using the store. this.TipHashAndHeight = prevTip; - this.RollbackAndThrowTransactionError(dbreezeTransaction, err, "REWIND_ERROR"); + this.RollbackAndThrowTransactionError(xdbTransaction, err, "REWIND_ERROR"); } } this.ValidateCrossChainTransfers(); + this.logger.LogTrace("(-):true"); return true; } // Indicate that we are behind the current chain. + this.logger.LogTrace("(-):false"); return false; } @@ -830,6 +749,8 @@ private bool Synchronize() { lock (this.lockObj) { + this.logger.LogTrace("Synchronizing."); + HashHeightPair tipToChase = this.TipToChase(); if (tipToChase.Hash == this.TipHashAndHeight.HashBlock) { @@ -850,19 +771,19 @@ private bool Synchronize() if (this.SynchronizeBatch()) { - using (DBreeze.Transactions.Transaction dbreezeTransaction = this.DBreeze.GetTransaction()) + using (CrossChainDBTransaction xdbTransaction = this.GetTransaction(CrossChainDBTransactionMode.ReadWrite)) { - dbreezeTransaction.SynchronizeTables(transferTableName, commonTableName); - - this.SaveTipHashAndHeight(dbreezeTransaction, this.TipHashAndHeight); + this.SaveTipHashAndHeight(xdbTransaction, this.TipHashAndHeight); - dbreezeTransaction.Commit(); + xdbTransaction.Commit(); } + this.logger.LogTrace("(-):true"); return true; } } + this.logger.LogTrace("(-):false"); return false; } } @@ -890,11 +811,15 @@ private bool SynchronizeBatch() break; } + this.logger.LogTrace("Attempting to synchronize a batch of {0} blocks.", batchSize); + List blocks = this.blockRepository.GetBlocksAsync(blockHashes).GetAwaiter().GetResult(); int availableBlocks = blocks.FindIndex(b => (b == null)); if (availableBlocks < 0) availableBlocks = blocks.Count; + this.logger.LogTrace("Available blocks are {0}", availableBlocks); + if (availableBlocks > 0) { Block lastBlock = blocks[availableBlocks - 1]; @@ -904,61 +829,10 @@ private bool SynchronizeBatch() bool done = availableBlocks < synchronizationBatchSize; + this.logger.LogTrace("(-):{0}", done); return done; } - /// Loads the tip and hash height. - /// The DBreeze transaction context to use. - /// The hash and height pair. - private ChainedHeader LoadTipHashAndHeight(DBreeze.Transactions.Transaction dbreezeTransaction) - { - var blockLocator = new BlockLocator(); - try - { - Row row = dbreezeTransaction.Select(commonTableName, RepositoryTipKey); - Guard.Assert(row.Exists); - blockLocator.FromBytes(row.Value); - } - catch (Exception) - { - blockLocator.Blocks = new List { this.network.GenesisHash }; - } - - this.TipHashAndHeight = this.chain.GetBlock(blockLocator.Blocks[0]) ?? this.chain.FindFork(blockLocator); - return this.TipHashAndHeight; - } - - /// Saves the tip and hash height. - /// The DBreeze transaction context to use. - /// The new tip to persist. - private void SaveTipHashAndHeight(DBreeze.Transactions.Transaction dbreezeTransaction, ChainedHeader newTip) - { - BlockLocator locator = this.chain.Tip.GetLocator(); - this.TipHashAndHeight = newTip; - dbreezeTransaction.Insert(commonTableName, RepositoryTipKey, locator.ToBytes()); - } - - /// Loads the counter-chain next mature block height. - /// The DBreeze transaction context to use. - /// The hash and height pair. - private int LoadNextMatureHeight(DBreeze.Transactions.Transaction dbreezeTransaction) - { - Row row = dbreezeTransaction.Select(commonTableName, NextMatureTipKey); - if (row.Exists) - this.NextMatureDepositHeight = row.Value; - - return this.NextMatureDepositHeight; - } - - /// Saves the counter-chain next mature block height. - /// The DBreeze transaction context to use. - /// The next mature block height on the counter-chain. - private void SaveNextMatureHeight(DBreeze.Transactions.Transaction dbreezeTransaction, int newTip) - { - this.NextMatureDepositHeight = newTip; - dbreezeTransaction.Insert(commonTableName, NextMatureTipKey, this.NextMatureDepositHeight); - } - /// public Task GetAsync(uint256[] depositIds) { @@ -967,48 +841,17 @@ public Task GetAsync(uint256[] depositIds) this.Synchronize(); ICrossChainTransfer[] res = this.ValidateCrossChainTransfers(this.Get(depositIds)); + return res; }); } - private ICrossChainTransfer[] Get(uint256[] depositId) + private ICrossChainTransfer[] Get(uint256[] depositIds) { - using (DBreeze.Transactions.Transaction dbreezeTransaction = this.DBreeze.GetTransaction()) - { - dbreezeTransaction.ValuesLazyLoadingIsOn = false; - - return this.Get(dbreezeTransaction, depositId); - } - } - - private CrossChainTransfer[] Get(DBreeze.Transactions.Transaction transaction, uint256[] depositId) - { - Guard.NotNull(depositId, nameof(depositId)); - - // To boost performance we will access the deposits sorted by deposit id. - var depositDict = new Dictionary(); - for (int i = 0; i < depositId.Length; i++) - depositDict[depositId[i]] = i; - - var byteListComparer = new ByteListComparer(); - List> depositList = depositDict.ToList(); - depositList.Sort((pair1, pair2) => byteListComparer.Compare(pair1.Key.ToBytes(), pair2.Key.ToBytes())); - - var res = new CrossChainTransfer[depositId.Length]; - - foreach (KeyValuePair kv in depositList) + using (CrossChainDBTransaction xdbTransaction = this.GetTransaction(CrossChainDBTransactionMode.Read)) { - Row transferRow = transaction.Select(transferTableName, kv.Key.ToBytes()); - - if (transferRow.Exists) - { - var crossChainTransfer = new CrossChainTransfer(); - crossChainTransfer.FromBytes(transferRow.Value, this.network.Consensus.ConsensusFactory); - res[kv.Value] = crossChainTransfer; - } + return this.Get(xdbTransaction, depositIds); } - - return res; } private OutPoint EarliestOutput(Transaction transaction) @@ -1056,55 +899,12 @@ public Task> GetTransactionsByStatusAsync(Cross }); } - /// Persist the cross-chain transfer information into the database. - /// The DBreeze transaction context to use. - /// Cross-chain transfer information to be inserted. - private void PutTransfer(DBreeze.Transactions.Transaction dbreezeTransaction, ICrossChainTransfer crossChainTransfer) - { - Guard.NotNull(crossChainTransfer, nameof(crossChainTransfer)); - - byte[] crossChainTransferBytes = this.dBreezeSerializer.Serialize(crossChainTransfer); - - dbreezeTransaction.Insert(transferTableName, crossChainTransfer.DepositTransactionId.ToBytes(), crossChainTransferBytes); - } - - /// Persist multiple cross-chain transfer information into the database. - /// The DBreeze transaction context to use. - /// Cross-chain transfers to be inserted. - private void PutTransfers(DBreeze.Transactions.Transaction dbreezeTransaction, ICrossChainTransfer[] crossChainTransfers) - { - Guard.NotNull(crossChainTransfers, nameof(crossChainTransfers)); - - // Optimal ordering for DB consumption. - var byteListComparer = new ByteListComparer(); - List orderedTransfers = crossChainTransfers.ToList(); - orderedTransfers.Sort((pair1, pair2) => byteListComparer.Compare(pair1.DepositTransactionId.ToBytes(), pair2.DepositTransactionId.ToBytes())); - - // Write each transfer in order. - foreach (ICrossChainTransfer transfer in orderedTransfers) - { - byte[] transferBytes = this.dBreezeSerializer.Serialize(transfer); - dbreezeTransaction.Insert(transferTableName, transfer.DepositTransactionId.ToBytes(), transferBytes); - } - } - - /// Deletes the cross-chain transfer information from the database - /// The DBreeze transaction context to use. - /// Cross-chain transfer information to be deleted. - private void DeleteTransfer(DBreeze.Transactions.Transaction dbreezeTransaction, ICrossChainTransfer crossChainTransfer) - { - Guard.NotNull(crossChainTransfer, nameof(crossChainTransfer)); - - dbreezeTransaction.RemoveKey(transferTableName, crossChainTransfer.DepositTransactionId.ToBytes()); - } - /// - /// Forgets transfer information for the blocks being removed and returns information for updating the transient lookups. + /// Forgets transfer information for the blocks being removed. /// - /// The DBreeze transaction context to use. + /// The cross-chain db transaction context to use. /// The last block to retain. - /// A tracker with all the cross chain transfers that were affected. - private StatusChangeTracker OnDeleteBlocks(DBreeze.Transactions.Transaction dbreezeTransaction, int lastBlockHeight) + private void OnDeleteBlocks(CrossChainDBTransaction xdbTransaction, int lastBlockHeight) { // Gather all the deposit ids that may have had transactions in the blocks being deleted. var depositIds = new HashSet(); @@ -1116,89 +916,27 @@ private StatusChangeTracker OnDeleteBlocks(DBreeze.Transactions.Transaction dbre } // Find the transfers related to these deposit ids in the database. - var tracker = new StatusChangeTracker(); - CrossChainTransfer[] crossChainTransfers = this.Get(dbreezeTransaction, depositIds.ToArray()); + ICrossChainTransfer[] crossChainTransfers = this.Get(xdbTransaction, depositIds.ToArray()); foreach (CrossChainTransfer transfer in crossChainTransfers) { // Transfers that only exist in the DB due to having been seen in a block should be removed completely. if (transfer.DepositHeight == null) { - // Trigger deletion from the status lookup. - tracker.SetTransferStatus(transfer); - // Delete the transfer completely. - this.DeleteTransfer(dbreezeTransaction, transfer); + xdbTransaction.DeleteTransfer(transfer); } else { // Transaction is no longer seen. - tracker.SetTransferStatus(transfer, CrossChainTransferStatus.FullySigned); + transfer.SetStatus(CrossChainTransferStatus.FullySigned); // Write the transfer status to the database. - this.PutTransfer(dbreezeTransaction, transfer); - } - } - - return tracker; - } - - /// Updates the status lookup based on a transfer and its previous status. - /// The cross-chain transfer that was update. - /// The old status. - private void TransferStatusUpdated(ICrossChainTransfer transfer, CrossChainTransferStatus? oldStatus) - { - if (oldStatus != null) - { - this.depositsIdsByStatus[(CrossChainTransferStatus)oldStatus].Remove(transfer.DepositTransactionId); - } - - this.depositsIdsByStatus[transfer.Status].Add(transfer.DepositTransactionId); - } - - /// Update the transient lookups after changes have been committed to the store. - /// Information about how to update the lookups. - private void UpdateLookups(StatusChangeTracker tracker) - { - foreach (uint256 hash in tracker.UniqueBlockHashes()) - { - this.depositIdsByBlockHash[hash] = new HashSet(); - } - - foreach (KeyValuePair kv in tracker) - { - this.TransferStatusUpdated(kv.Key, kv.Value); - - if (kv.Key.BlockHash != null && kv.Key.BlockHeight != null) - { - if (!this.depositIdsByBlockHash[kv.Key.BlockHash].Contains(kv.Key.DepositTransactionId)) - this.depositIdsByBlockHash[kv.Key.BlockHash].Add(kv.Key.DepositTransactionId); - this.blockHeightsByBlockHash[kv.Key.BlockHash] = (int)kv.Key.BlockHeight; + xdbTransaction.PutTransfer(transfer); } } } - /// Undoes the transient lookups after block removals have been committed to the store. - /// Information about how to undo the lookups. - private void UndoLookups(StatusChangeTracker tracker) - { - foreach (KeyValuePair kv in tracker) - { - if (kv.Value == null) - { - this.depositsIdsByStatus[kv.Key.Status].Remove(kv.Key.DepositTransactionId); - } - - this.TransferStatusUpdated(kv.Key, kv.Value); - } - - foreach (uint256 hash in tracker.UniqueBlockHashes()) - { - this.depositIdsByBlockHash.Remove(hash); - this.blockHeightsByBlockHash.Remove(hash); - } - } - public bool ValidateTransaction(Transaction transaction, bool checkSignature = false) { return this.federationWalletManager.ValidateTransaction(transaction, checkSignature); @@ -1217,11 +955,11 @@ public Dictionary GetCrossChainTransferStatusCoun } /// - public void Dispose() + public override void Dispose() { this.SaveCurrentTipAsync().GetAwaiter().GetResult(); this.cancellation.Cancel(); - this.DBreeze.Dispose(); + base.Dispose(); } } -} +} \ No newline at end of file diff --git a/src/Stratis.FederatedPeg.Features.FederationGateway/TargetChain/StatusChangeTracker.cs b/src/Stratis.FederatedPeg.Features.FederationGateway/TargetChain/StatusChangeTracker.cs index 36877734..f008e368 100644 --- a/src/Stratis.FederatedPeg.Features.FederationGateway/TargetChain/StatusChangeTracker.cs +++ b/src/Stratis.FederatedPeg.Features.FederationGateway/TargetChain/StatusChangeTracker.cs @@ -5,10 +5,32 @@ namespace Stratis.FederatedPeg.Features.FederationGateway.TargetChain { - public class StatusChangeTracker : Dictionary + /// + /// Tracks changed transfers and records their original status. + /// + public class StatusChangeTracker : Dictionary, IChangeTracker { /// - /// Records changes to transfers for the purpose of synchronizing the transient lookups after the DB commit. + /// Records the status that was originally read from the database. + /// + /// The transfer to record the original status of. + public void RecordOldValue(IBitcoinSerializable transfer) + { + this[(ICrossChainTransfer)transfer] = ((ICrossChainTransfer)transfer).DbStatus; + } + + /// + /// Instructs the transfer to record its (original) status. Typically called after reading it from the database. + /// + /// The transfer that should record its status. + public void SetOldValue(IBitcoinSerializable transfer) + { + ((ICrossChainTransfer)transfer).RecordDbStatus(); + } + + /// + /// This is used by standalone (not created in context) trackers + /// to set the status on a transfer and at the same time note the change in the tracker. /// /// The cross-chain transfer to update. /// The new status. @@ -16,7 +38,7 @@ public class StatusChangeTracker : DictionaryThe block height of the partialTransaction. /// /// Within the store the earliest status is . In this case null - /// is used to flag a new transfer - a transfer with no earlier status. null is not written to the DB. + /// is used to flag a new transfer within the tracker only. It means that there is no earlier status. /// public void SetTransferStatus(ICrossChainTransfer transfer, CrossChainTransferStatus? status = null, uint256 blockHash = null, int blockHeight = 0) { @@ -28,7 +50,7 @@ public void SetTransferStatus(ICrossChainTransfer transfer, CrossChainTransferSt } else { - // If not setting the status then assume there is no previous status. + // This is a new object and there is no previous status. this[transfer] = null; } } @@ -39,7 +61,8 @@ public void SetTransferStatus(ICrossChainTransfer transfer, CrossChainTransferSt /// A list of unique block hashes for the transfers being tracked. public uint256[] UniqueBlockHashes() { - return this.Keys.Where(k => k.BlockHash != null).Select(k => k.BlockHash).Distinct().ToArray(); + // This tests for transfers containing block hashes and checks that this is not a deletion. + return this.Keys.Where(k => k.BlockHash != null && (k.DepositHeight != null || k.DbStatus == null)).Select(k => k.BlockHash).Distinct().ToArray(); } } }