diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a7bd067..6bb3a14 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -21,8 +21,6 @@ jobs: run: cargo clippy -- -Dclippy::all -D warnings - name: Run clippy (all targets and features) run: cargo clippy --all-targets --all-features -- -Dclippy::all -D warnings - - name: Run tests - run: cargo test --verbose coverage: runs-on: ubuntu-latest diff --git a/Cargo.lock b/Cargo.lock index 0e62a3d..c628a89 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1290,7 +1290,6 @@ dependencies = [ "hotfix-message", "mongodb", "rcgen", - "redb", "rustls", "rustls-native-certs", "rustls-pemfile", @@ -2650,15 +2649,6 @@ dependencies = [ "yasna", ] -[[package]] -name = "redb" -version = "3.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae323eb086579a3769daa2c753bb96deb95993c534711e0dbe881b5192906a06" -dependencies = [ - "libc", -] - [[package]] name = "redox_syscall" version = "0.5.18" diff --git a/Cargo.toml b/Cargo.toml index ac9f1f6..94a1583 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,7 +41,6 @@ proc-macro2 = "1" quickcheck = "1" quickcheck_macros = "1" quote = "1" -redb = "3.1" reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] } roxmltree = "0.21" rust-embed = "8.7" diff --git a/crates/hotfix/Cargo.toml b/crates/hotfix/Cargo.toml index 3241f3f..ba88ac8 100644 --- a/crates/hotfix/Cargo.toml +++ b/crates/hotfix/Cargo.toml @@ -14,7 +14,6 @@ categories.workspace = true [features] default = ["test-utils"] fix44 = ["hotfix-message/fix44"] -redb = ["dep:redb"] mongodb = ["dep:mongodb"] test-utils = [] @@ -31,7 +30,6 @@ chrono-tz = { workspace = true, features = ["serde"] } futures = { workspace = true } mongodb = { workspace = true, optional = true } rustls-pki-types = { workspace = true } -redb = { workspace = true, optional = true } rustls = { workspace = true } rustls-native-certs = { workspace = true } rustls-pemfile = { workspace = true } diff --git a/crates/hotfix/src/lib.rs b/crates/hotfix/src/lib.rs index 296dace..b919748 100644 --- a/crates/hotfix/src/lib.rs +++ b/crates/hotfix/src/lib.rs @@ -27,7 +27,6 @@ pub mod application; pub mod config; -pub(crate) mod error; pub mod initiator; pub mod message; pub mod message_utils; diff --git a/crates/hotfix/src/message.rs b/crates/hotfix/src/message.rs index 8395f17..5d2ba92 100644 --- a/crates/hotfix/src/message.rs +++ b/crates/hotfix/src/message.rs @@ -14,6 +14,7 @@ pub mod resend_request; pub mod sequence_reset; pub mod test_request; pub mod verification; +pub mod verification_error; pub use parser::RawFixMessage; pub use resend_request::ResendRequest; diff --git a/crates/hotfix/src/message/verification.rs b/crates/hotfix/src/message/verification.rs index 9350b94..0ecc6cf 100644 --- a/crates/hotfix/src/message/verification.rs +++ b/crates/hotfix/src/message/verification.rs @@ -1,5 +1,5 @@ use crate::config::SessionConfig; -use crate::error::{CompIdType, MessageVerificationError}; +use crate::message::verification_error::{CompIdType, MessageVerificationError}; use hotfix_message::Part; use hotfix_message::field_types::Timestamp; use hotfix_message::message::Message; @@ -180,8 +180,9 @@ fn check_target_comp_id( #[cfg(test)] mod tests { - use super::{Message, MessageVerificationError, SessionConfig, verify_message}; - use crate::error::CompIdType; + use super::{Message, SessionConfig, verify_message}; + use crate::message::verification_error::CompIdType; + use crate::message::verification_error::MessageVerificationError; use hotfix_message::field_types::Timestamp; use hotfix_message::{Part, fix44}; diff --git a/crates/hotfix/src/error.rs b/crates/hotfix/src/message/verification_error.rs similarity index 93% rename from crates/hotfix/src/error.rs rename to crates/hotfix/src/message/verification_error.rs index a78e22b..df17fce 100644 --- a/crates/hotfix/src/error.rs +++ b/crates/hotfix/src/message/verification_error.rs @@ -53,9 +53,3 @@ pub enum CompIdType { Sender, Target, } - -#[derive(Debug, Error)] -pub enum SessionError { - #[error("Schedule configuration is invalid: {0}")] - InvalidSchedule(String), -} diff --git a/crates/hotfix/src/session.rs b/crates/hotfix/src/session.rs index 221502a..e663827 100644 --- a/crates/hotfix/src/session.rs +++ b/crates/hotfix/src/session.rs @@ -1,4 +1,5 @@ pub(crate) mod admin_request; +pub mod error; pub(crate) mod event; mod info; mod session_handle; @@ -26,15 +27,17 @@ use tracing::{debug, enabled, error, info, warn}; use crate::Application; use crate::application::{InboundDecision, OutboundDecision}; -use crate::error::{CompIdType, MessageVerificationError}; use crate::message::logout::Logout; use crate::message::reject::Reject; use crate::message::resend_request::ResendRequest; use crate::message::sequence_reset::SequenceReset; use crate::message::test_request::TestRequest; use crate::message::verification::verify_message; +use crate::message::verification_error::{CompIdType, MessageVerificationError}; use crate::message_utils::{is_admin, prepare_message_for_resend}; use crate::session::admin_request::AdminRequest; +pub use crate::session::info::{SessionInfo, Status}; +pub use crate::session::session_handle::SessionHandle; #[cfg(not(feature = "test-utils"))] pub(crate) use crate::session::session_ref::InternalSessionRef; #[cfg(feature = "test-utils")] @@ -49,9 +52,6 @@ use hotfix_message::session_fields::{ SessionRejectReason, TEST_REQ_ID, }; -pub use crate::session::info::{SessionInfo, Status}; -pub use crate::session::session_handle::SessionHandle; - const SCHEDULE_CHECK_INTERVAL: u64 = 1; struct Session { @@ -365,7 +365,8 @@ where } } - self.store.increment_target_seq_number().await + self.store.increment_target_seq_number().await?; + Ok(()) } async fn on_heartbeat(&mut self, message: &Message) -> Result<()> { @@ -378,7 +379,8 @@ where self.reset_peer_timer(None); } - self.store.increment_target_seq_number().await + self.store.increment_target_seq_number().await?; + Ok(()) } async fn on_test_request(&mut self, message: &Message) -> Result<()> { @@ -515,7 +517,8 @@ where return Ok(()); } - self.store.set_target_seq_number(end - 1).await + self.store.set_target_seq_number(end - 1).await?; + Ok(()) } async fn handle_verification_error(&mut self, error: MessageVerificationError) -> Result<()> { @@ -1129,6 +1132,7 @@ mod tests { use super::*; use crate::application::{InboundDecision, OutboundDecision}; use crate::message::{InboundMessage, OutboundMessage}; + use crate::store::{Result as StoreResult, StoreError}; use chrono::{DateTime, Datelike, NaiveDate, NaiveTime, TimeDelta, Timelike}; use chrono_tz::Tz; use hotfix_message::message::Message; @@ -1168,11 +1172,11 @@ mod tests { #[async_trait::async_trait] impl MessageStore for TestStore { - async fn add(&mut self, _sequence_number: u64, _message: &[u8]) -> Result<()> { + async fn add(&mut self, _sequence_number: u64, _message: &[u8]) -> StoreResult<()> { Ok(()) } - async fn get_slice(&self, _begin: usize, _end: usize) -> Result>> { + async fn get_slice(&self, _begin: usize, _end: usize) -> StoreResult>> { Ok(vec![]) } @@ -1184,25 +1188,25 @@ mod tests { self.target_seq } - async fn increment_sender_seq_number(&mut self) -> Result<()> { + async fn increment_sender_seq_number(&mut self) -> StoreResult<()> { self.sender_seq += 1; Ok(()) } - async fn increment_target_seq_number(&mut self) -> Result<()> { + async fn increment_target_seq_number(&mut self) -> StoreResult<()> { self.target_seq += 1; Ok(()) } - async fn set_target_seq_number(&mut self, seq_number: u64) -> Result<()> { + async fn set_target_seq_number(&mut self, seq_number: u64) -> StoreResult<()> { self.target_seq = seq_number; Ok(()) } - async fn reset(&mut self) -> Result<()> { + async fn reset(&mut self) -> StoreResult<()> { self.reset_called.store(true, Ordering::SeqCst); if self.fail_reset.load(Ordering::SeqCst) { - bail!("simulated reset failure") + return Err(StoreError::Reset("simulated reset failure".into())); } self.creation_time = Utc::now(); Ok(()) diff --git a/crates/hotfix/src/session/error.rs b/crates/hotfix/src/session/error.rs new file mode 100644 index 0000000..758de1d --- /dev/null +++ b/crates/hotfix/src/session/error.rs @@ -0,0 +1,13 @@ +use crate::store::StoreError; +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum SessionError { + #[error("Schedule configuration is invalid: {0}")] + InvalidSchedule(String), + + #[error("store operation failed")] + Store(#[from] StoreError), +} + +pub type Result = std::result::Result; diff --git a/crates/hotfix/src/session_schedule.rs b/crates/hotfix/src/session_schedule.rs index 2ce1f0c..08a4756 100644 --- a/crates/hotfix/src/session_schedule.rs +++ b/crates/hotfix/src/session_schedule.rs @@ -1,5 +1,5 @@ use crate::config::ScheduleConfig; -use crate::error::SessionError; +use crate::session::error::SessionError; use chrono::{DateTime, Datelike, Days, NaiveDate, NaiveTime, TimeDelta, Utc, Weekday}; use chrono_tz::Tz; @@ -1043,6 +1043,7 @@ mod tests { SessionError::InvalidSchedule(msg) => { assert!(msg.contains("Weekly sessions cannot have weekdays specified")); } + other => panic!("unexpected error: {other}"), } } diff --git a/crates/hotfix/src/store.rs b/crates/hotfix/src/store.rs index 42dbe1b..cba3438 100644 --- a/crates/hotfix/src/store.rs +++ b/crates/hotfix/src/store.rs @@ -3,6 +3,9 @@ //! By default, only the [in_memory] store is included. Further message store implementations, //! such as `mongodb` and `redb` can be enabled through feature flags. +/// Error types for store operations. +pub mod error; + /// An in-memory message store that loses its state on restart. Only use this for testing. pub mod in_memory; @@ -12,11 +15,9 @@ pub mod mongodb; /// A file-based message store for persistence. pub mod file; -#[cfg(feature = "redb")] -/// A message store using [redb](https://www.redb.org/) for persistence. -pub mod redb; -use anyhow::Result; +pub use error::*; + use chrono::DateTime; #[async_trait::async_trait] diff --git a/crates/hotfix/src/store/error.rs b/crates/hotfix/src/store/error.rs new file mode 100644 index 0000000..0f90898 --- /dev/null +++ b/crates/hotfix/src/store/error.rs @@ -0,0 +1,38 @@ +//! Error types for message store operations. + +use thiserror::Error; + +/// A boxed error type for store errors. +pub type BoxError = Box; + +/// Errors that can occur during message store operations. +#[derive(Debug, Error)] +pub enum StoreError { + /// Failed to persist a message to the store. + #[error("failed to persist message (seq_num: {sequence_number})")] + PersistMessage { + sequence_number: u64, + #[source] + source: BoxError, + }, + + /// Failed to retrieve messages from the store. + #[error("failed to retrieve messages (range: {begin}..={end})")] + RetrieveMessages { + begin: usize, + end: usize, + #[source] + source: BoxError, + }, + + /// Failed to update a sequence number. + #[error("failed to update sequence number")] + UpdateSequenceNumber(#[source] BoxError), + + /// Failed to reset the store. + #[error("failed to reset store")] + Reset(#[source] BoxError), +} + +/// A specialized Result type for store operations. +pub type Result = std::result::Result; diff --git a/crates/hotfix/src/store/file.rs b/crates/hotfix/src/store/file.rs index d8c814d..60bd1f5 100644 --- a/crates/hotfix/src/store/file.rs +++ b/crates/hotfix/src/store/file.rs @@ -1,5 +1,5 @@ -use crate::store::MessageStore; -use anyhow::{Context, Result}; +use crate::store::{MessageStore, Result, StoreError}; +use anyhow::Context; use chrono::{DateTime, Utc}; use std::collections::HashMap; use std::fs::{File, OpenOptions}; @@ -33,7 +33,7 @@ pub struct FileStore { } impl FileStore { - pub fn new(directory: impl AsRef, name: &str) -> Result { + pub fn new(directory: impl AsRef, name: &str) -> anyhow::Result { let base_path = directory.as_ref().join(name); std::fs::create_dir_all(directory)?; @@ -84,7 +84,7 @@ impl FileStore { /// Retrieves the session creation time from the session file. /// /// It initialises the session file if it doesn't exist. - fn get_or_create_session_time(base_path: &Path) -> Result> { + fn get_or_create_session_time(base_path: &Path) -> anyhow::Result> { let session_path = base_path.with_extension("session"); let session_time = if session_path.exists() { let content = std::fs::read_to_string(&session_path)?; @@ -101,7 +101,7 @@ impl FileStore { /// Retrieves the sequence numbers from the seqnums file. /// /// It defaults to `(0, 0)` if the file doesn't exist or if it's empty. - fn read_initial_seqnums(base_path: &Path) -> Result<(u64, u64)> { + fn read_initial_seqnums(base_path: &Path) -> anyhow::Result<(u64, u64)> { let seqnums_path = base_path.with_extension("seqnums"); let (sender_seq_number, target_seq_number) = if seqnums_path.exists() { let content = @@ -118,7 +118,7 @@ impl FileStore { Ok((sender_seq_number, target_seq_number)) } - fn parse_seqnums(content: &str) -> Result<(u64, u64)> { + fn parse_seqnums(content: &str) -> anyhow::Result<(u64, u64)> { let parts: Vec<&str> = content.trim().split(':').map(|s| s.trim()).collect(); if parts.len() != 2 { anyhow::bail!("invalid seqnums format"); @@ -132,7 +132,7 @@ impl FileStore { Ok((sender, target)) } - fn load_message_index(header_path: &Path) -> Result> { + fn load_message_index(header_path: &Path) -> anyhow::Result> { let mut index = HashMap::new(); if !header_path.exists() { @@ -161,22 +161,15 @@ impl FileStore { Ok(index) } - fn write_seqnums(&mut self) -> Result<()> { + fn write_seqnums_with(&mut self, sender: u64, target: u64) -> std::io::Result<()> { self.seqnums_file.seek(SeekFrom::Start(0))?; self.seqnums_file.set_len(0)?; - write!( - self.seqnums_file, - "{:020} : {:020}", - self.sender_seq_number, self.target_seq_number - )?; + write!(self.seqnums_file, "{:020} : {:020}", sender, target)?; self.seqnums_file.flush()?; Ok(()) } -} -#[async_trait::async_trait] -impl MessageStore for FileStore { - async fn add(&mut self, sequence_number: u64, message: &[u8]) -> anyhow::Result<()> { + fn write_message(&mut self, sequence_number: u64, message: &[u8]) -> std::io::Result<()> { let msg_size = message.len(); let offset = self.current_body_offset; @@ -204,58 +197,7 @@ impl MessageStore for FileStore { Ok(()) } - async fn get_slice(&self, begin: usize, end: usize) -> Result>> { - let mut messages = Vec::with_capacity(end - begin + 1); - - let body_path = self.base_path.with_extension("body"); - let mut body_file = - File::open(body_path).context("failed to open body file for reading")?; - - for seq_num in begin..=end { - if let Some(msg_def) = self.message_index.get(&(seq_num as u64)) { - body_file.seek(SeekFrom::Start(msg_def.offset))?; - - let mut buffer = vec![0u8; msg_def.size]; - body_file - .read_exact(&mut buffer) - .context("failed to read message from body file")?; - - messages.push(buffer); - } - } - - Ok(messages) - } - - fn next_sender_seq_number(&self) -> u64 { - self.sender_seq_number + 1 - } - - fn next_target_seq_number(&self) -> u64 { - self.target_seq_number + 1 - } - - async fn increment_sender_seq_number(&mut self) -> Result<()> { - self.sender_seq_number += 1; - self.write_seqnums()?; - - Ok(()) - } - - async fn increment_target_seq_number(&mut self) -> Result<()> { - self.target_seq_number += 1; - self.write_seqnums()?; - - Ok(()) - } - - async fn set_target_seq_number(&mut self, seq_number: u64) -> Result<()> { - self.target_seq_number = seq_number; - self.write_seqnums()?; - Ok(()) - } - - async fn reset(&mut self) -> Result<()> { + fn perform_reset(&mut self) -> std::io::Result<()> { self.body_file.flush()?; self.header_file.flush()?; @@ -313,6 +255,82 @@ impl MessageStore for FileStore { Ok(()) } + fn read_messages(&self, begin: usize, end: usize) -> std::io::Result>> { + let mut messages = Vec::with_capacity(end - begin + 1); + + let body_path = self.base_path.with_extension("body"); + let mut body_file = File::open(body_path)?; + + for seq_num in begin..=end { + if let Some(msg_def) = self.message_index.get(&(seq_num as u64)) { + body_file.seek(SeekFrom::Start(msg_def.offset))?; + + let mut buffer = vec![0u8; msg_def.size]; + body_file.read_exact(&mut buffer)?; + + messages.push(buffer); + } + } + + Ok(messages) + } +} + +#[async_trait::async_trait] +impl MessageStore for FileStore { + async fn add(&mut self, sequence_number: u64, message: &[u8]) -> Result<()> { + self.write_message(sequence_number, message) + .map_err(|err| StoreError::PersistMessage { + sequence_number, + source: err.into(), + }) + } + + async fn get_slice(&self, begin: usize, end: usize) -> Result>> { + self.read_messages(begin, end) + .map_err(|e| StoreError::RetrieveMessages { + begin, + end, + source: e.into(), + }) + } + + fn next_sender_seq_number(&self) -> u64 { + self.sender_seq_number + 1 + } + + fn next_target_seq_number(&self) -> u64 { + self.target_seq_number + 1 + } + + async fn increment_sender_seq_number(&mut self) -> Result<()> { + let new_value = self.sender_seq_number + 1; + self.write_seqnums_with(new_value, self.target_seq_number) + .map_err(|e| StoreError::UpdateSequenceNumber(e.into()))?; + self.sender_seq_number = new_value; + Ok(()) + } + + async fn increment_target_seq_number(&mut self) -> Result<()> { + let new_value = self.target_seq_number + 1; + self.write_seqnums_with(self.sender_seq_number, new_value) + .map_err(|e| StoreError::UpdateSequenceNumber(e.into()))?; + self.target_seq_number = new_value; + Ok(()) + } + + async fn set_target_seq_number(&mut self, seq_number: u64) -> Result<()> { + self.write_seqnums_with(self.sender_seq_number, seq_number) + .map_err(|e| StoreError::UpdateSequenceNumber(e.into()))?; + self.target_seq_number = seq_number; + Ok(()) + } + + async fn reset(&mut self) -> Result<()> { + self.perform_reset() + .map_err(|e| StoreError::Reset(e.into())) + } + fn creation_time(&self) -> DateTime { self.creation_time } diff --git a/crates/hotfix/src/store/in_memory.rs b/crates/hotfix/src/store/in_memory.rs index 9b213e3..5468fe3 100644 --- a/crates/hotfix/src/store/in_memory.rs +++ b/crates/hotfix/src/store/in_memory.rs @@ -1,5 +1,4 @@ -use crate::store::MessageStore; -use anyhow::Result; +use crate::store::{MessageStore, Result}; use chrono::{DateTime, Utc}; use std::collections::HashMap; diff --git a/crates/hotfix/src/store/mongodb.rs b/crates/hotfix/src/store/mongodb.rs index eef8c6e..22e4ee6 100644 --- a/crates/hotfix/src/store/mongodb.rs +++ b/crates/hotfix/src/store/mongodb.rs @@ -1,4 +1,3 @@ -use anyhow::Result; use async_trait::async_trait; use chrono::{DateTime, Utc}; use futures::TryStreamExt; @@ -12,7 +11,7 @@ use serde::{Deserialize, Serialize}; pub use mongodb::Client; -use crate::store::MessageStore; +use crate::store::{MessageStore, Result, StoreError}; #[derive(Debug, Deserialize, Serialize)] struct SequenceMeta { @@ -38,7 +37,7 @@ pub struct MongoDbMessageStore { } impl MongoDbMessageStore { - pub async fn new(db: Database, collection_name: Option<&str>) -> Result { + pub async fn new(db: Database, collection_name: Option<&str>) -> anyhow::Result { let collection_name = collection_name.unwrap_or("messages"); let meta_collection = db.collection(collection_name); let message_collection = db.collection(collection_name); @@ -54,7 +53,7 @@ impl MongoDbMessageStore { Ok(store) } - async fn ensure_indexes(meta_collection: &Collection) -> Result<()> { + async fn ensure_indexes(meta_collection: &Collection) -> anyhow::Result<()> { let meta_index = IndexModel::builder() .keys(doc! { "meta": 1, "_id": -1 }) .build(); @@ -72,7 +71,7 @@ impl MongoDbMessageStore { async fn get_or_default_sequence( meta_collection: &Collection, - ) -> Result { + ) -> anyhow::Result { let options = FindOneOptions::builder().sort(doc! { "_id": -1 }).build(); let res = meta_collection .find_one(doc! { "meta": true }) @@ -86,7 +85,9 @@ impl MongoDbMessageStore { Ok(meta) } - async fn new_sequence(meta_collection: &Collection) -> Result { + async fn new_sequence( + meta_collection: &Collection, + ) -> anyhow::Result { let sequence_id = ObjectId::new(); let initial_meta = SequenceMeta { object_id: sequence_id, @@ -112,12 +113,16 @@ impl MessageStore for MongoDbMessageStore { bytes: message.to_vec(), }, }; - let filter = doc! { "sequence_id": self.current_sequence.object_id, "msg_seq_number": sequence_number as u32 }; + let filter = doc! { "sequence_id": self.current_sequence.object_id, "msg_seq_number": sequence_number as i64 }; let options = ReplaceOptions::builder().upsert(true).build(); self.message_collection .replace_one(filter, message) .with_options(options) - .await?; + .await + .map_err(|e| StoreError::PersistMessage { + sequence_number, + source: e.into(), + })?; Ok(()) } @@ -126,14 +131,29 @@ impl MessageStore for MongoDbMessageStore { let filter = doc! { "sequence_id": self.current_sequence.object_id, "msg_seq_number": doc! { - "$gte": begin as u32, - "$lte": end as u32, + "$gte": begin as i64, + "$lte": end as i64, } }; - let mut cursor = self.message_collection.find(filter).await?; + let mut cursor = self.message_collection.find(filter).await.map_err(|e| { + StoreError::RetrieveMessages { + begin, + end, + source: e.into(), + } + })?; let mut messages = Vec::new(); - while let Some(message) = cursor.try_next().await? { + while let Some(message) = + cursor + .try_next() + .await + .map_err(|e| StoreError::RetrieveMessages { + begin, + end, + source: e.into(), + })? + { messages.push(message.data.bytes); } @@ -149,43 +169,48 @@ impl MessageStore for MongoDbMessageStore { } async fn increment_sender_seq_number(&mut self) -> Result<()> { - self.current_sequence.sender_seq_number += 1; self.meta_collection .update_one( doc! { "_id": self.current_sequence.object_id }, doc! { "$inc": { "sender_seq_number": 1 } }, ) - .await?; + .await + .map_err(|e| StoreError::UpdateSequenceNumber(e.into()))?; + self.current_sequence.sender_seq_number += 1; Ok(()) } async fn increment_target_seq_number(&mut self) -> Result<()> { - self.current_sequence.target_seq_number += 1; self.meta_collection .update_one( doc! { "_id": self.current_sequence.object_id }, doc! { "$inc": { "target_seq_number": 1 } }, ) - .await?; + .await + .map_err(|e| StoreError::UpdateSequenceNumber(e.into()))?; + self.current_sequence.target_seq_number += 1; Ok(()) } async fn set_target_seq_number(&mut self, seq_number: u64) -> Result<()> { - self.current_sequence.target_seq_number = seq_number; self.meta_collection .update_one( doc! { "_id": self.current_sequence.object_id }, - doc! { "$set": { "target_seq_number": seq_number as u32 } }, + doc! { "$set": { "target_seq_number": seq_number as i64 } }, ) - .await?; + .await + .map_err(|e| StoreError::UpdateSequenceNumber(e.into()))?; + self.current_sequence.target_seq_number = seq_number; Ok(()) } async fn reset(&mut self) -> Result<()> { - self.current_sequence = Self::new_sequence(&self.meta_collection).await?; + self.current_sequence = Self::new_sequence(&self.meta_collection) + .await + .map_err(|e| StoreError::Reset(e.into()))?; Ok(()) } diff --git a/crates/hotfix/src/store/redb.rs b/crates/hotfix/src/store/redb.rs deleted file mode 100644 index 62b0230..0000000 --- a/crates/hotfix/src/store/redb.rs +++ /dev/null @@ -1,178 +0,0 @@ -use crate::store::MessageStore; -use anyhow::{Result, bail}; -use chrono::{DateTime, Utc}; -use redb::{Database, ReadOnlyTable, ReadableDatabase, TableDefinition, TableError}; -use std::path::Path; - -const MESSAGES_TABLE: TableDefinition = TableDefinition::new("messages"); -const META_TABLE: TableDefinition<&str, u64> = TableDefinition::new("seq_numbers"); -const SENDER_KEY: &str = "sender"; -const TARGET_KEY: &str = "target"; -const CREATION_TIME_KEY: &str = "creation_time"; - -struct MetaData { - creation_time: DateTime, - sender_seq_number: u64, - target_seq_number: u64, -} - -pub struct RedbMessageStore { - db: Database, - meta: MetaData, -} - -impl RedbMessageStore { - pub fn new(path: impl AsRef) -> Result { - let db = Database::create(path)?; - - let meta = if let Some(stored_metadata) = Self::load_metadata(&db)? { - stored_metadata - } else { - Self::persist_default_metadata(&db)?; - Self::load_metadata(&db)? - .ok_or_else(|| anyhow::anyhow!("failed to read metadata after initialization"))? - }; - - Ok(Self { db, meta }) - } - - fn persist_default_metadata(db: &Database) -> Result<()> { - let creation_timestamp = Utc::now().timestamp_micros() as u64; - let sender_seq_number = 0; - let target_seq_number = 0; - - // if we have just set the creation time, we need to write it to redb - let write_txn = db.begin_write()?; - { - let mut meta_table = write_txn.open_table(META_TABLE)?; - meta_table.insert(CREATION_TIME_KEY, creation_timestamp)?; - meta_table.insert(SENDER_KEY, sender_seq_number)?; - meta_table.insert(TARGET_KEY, target_seq_number)?; - let mut messages_table = write_txn.open_table(MESSAGES_TABLE)?; - messages_table.extract_if(|_, _| true)?.for_each(drop); - } - write_txn.commit()?; - Ok(()) - } - - fn load_metadata(db: &Database) -> Result> { - let read_txn = db.begin_read()?; - let metadata = match read_txn.open_table(META_TABLE) { - Ok(table) => { - let creation_time = Self::parse_timestamp(Self::read_required_meta_field( - &table, - CREATION_TIME_KEY, - )?)?; - let sender_seq_number = Self::read_required_meta_field(&table, SENDER_KEY)?; - let target_seq_number = Self::read_required_meta_field(&table, TARGET_KEY)?; - - Some(MetaData { - creation_time, - sender_seq_number, - target_seq_number, - }) - } - Err(TableError::TableDoesNotExist(_)) => None, - Err(err) => { - return Err(err.into()); - } - }; - - Ok(metadata) - } - - fn read_required_meta_field(table: &ReadOnlyTable<&str, u64>, key: &str) -> Result { - table - .get(key)? - .map(|v| v.value()) - .ok_or_else(|| anyhow::anyhow!("missing required metadata field: {key}")) - } - - fn parse_timestamp(timestamp: u64) -> Result> { - DateTime::from_timestamp_micros(timestamp as i64) - .ok_or_else(|| anyhow::anyhow!("invalid timestamp: {timestamp}")) - } - - async fn update_sequence_number(&mut self, key: &str, value: u64) -> Result<()> { - let write_txn = self.db.begin_write()?; - { - let mut table = write_txn.open_table(META_TABLE)?; - table.insert(key, value)?; - } - write_txn.commit()?; - Ok(()) - } -} - -#[async_trait::async_trait] -impl MessageStore for RedbMessageStore { - async fn add(&mut self, sequence_number: u64, message: &[u8]) -> Result<()> { - let write_txn = self.db.begin_write()?; - { - let mut table = write_txn.open_table(MESSAGES_TABLE)?; - table.insert(sequence_number, message)?; - } - write_txn.commit()?; - Ok(()) - } - - async fn get_slice(&self, begin: usize, end: usize) -> Result>> { - if begin > end { - return Ok(vec![]); - } - - let read_txn = self.db.begin_read()?; - match read_txn.open_table(MESSAGES_TABLE) { - Ok(table) => { - let messages: std::result::Result>, redb::StorageError> = table - .range(begin as u64..=end as u64)? - .map(|m| m.map(|v| v.1.value().to_vec())) - .collect(); - Ok(messages?) - } - Err(TableError::TableDoesNotExist(_)) => Ok(vec![]), - Err(err) => Err(err.into()), - } - } - - fn next_sender_seq_number(&self) -> u64 { - self.meta.sender_seq_number + 1 - } - - fn next_target_seq_number(&self) -> u64 { - self.meta.target_seq_number + 1 - } - - async fn increment_sender_seq_number(&mut self) -> Result<()> { - let sender_seq_number = self.meta.sender_seq_number + 1; - self.update_sequence_number(SENDER_KEY, sender_seq_number) - .await?; - self.meta.sender_seq_number = sender_seq_number; - Ok(()) - } - - async fn increment_target_seq_number(&mut self) -> Result<()> { - self.set_target_seq_number(self.meta.target_seq_number + 1) - .await - } - - async fn set_target_seq_number(&mut self, seq_number: u64) -> Result<()> { - self.update_sequence_number(TARGET_KEY, seq_number).await?; - self.meta.target_seq_number = seq_number; - Ok(()) - } - - async fn reset(&mut self) -> Result<()> { - Self::persist_default_metadata(&self.db)?; - if let Some(meta) = Self::load_metadata(&self.db)? { - self.meta = meta; - Ok(()) - } else { - bail!("meta unexpectedly not found") - } - } - - fn creation_time(&self) -> DateTime { - self.meta.creation_time - } -} diff --git a/crates/hotfix/tests/store_tests.rs b/crates/hotfix/tests/common_store_tests.rs similarity index 91% rename from crates/hotfix/tests/store_tests.rs rename to crates/hotfix/tests/common_store_tests.rs index a137c29..50e3a37 100644 --- a/crates/hotfix/tests/store_tests.rs +++ b/crates/hotfix/tests/common_store_tests.rs @@ -346,14 +346,6 @@ async fn create_test_store_factories() -> Vec> { Box::new(FileStoreTestFactory::new()) as Box, ]; - // Add redb store factory if the feature is enabled - #[cfg(feature = "redb")] - { - stores.push( - Box::new(redb_test_utils::RedbTestStoreFactory::new()) as Box - ); - } - #[cfg(feature = "mongodb")] { stores.push( @@ -408,43 +400,6 @@ impl Drop for FileStoreTestFactory { } } -#[cfg(feature = "redb")] -mod redb_test_utils { - use super::*; - use hotfix::store::MessageStore; - use hotfix::store::redb::RedbMessageStore; - use std::path::PathBuf; - use std::{env, fs}; - - pub(crate) struct RedbTestStoreFactory { - db_path: PathBuf, - } - - impl RedbTestStoreFactory { - pub(crate) fn new() -> Self { - let mut temp_path = env::temp_dir(); - temp_path.push(format!("redb_test_{}", uuid::Uuid::new_v4())); - temp_path.set_extension("db"); - - Self { db_path: temp_path } - } - } - - #[async_trait::async_trait] - impl TestStoreFactory for RedbTestStoreFactory { - async fn create_store(&self) -> Box { - Box::new(RedbMessageStore::new(&self.db_path).expect("Failed to create store")) - } - } - - impl Drop for RedbTestStoreFactory { - fn drop(&mut self) { - // Clean up the database file when the test store is dropped - let _ = fs::remove_file(&self.db_path); - } - } -} - #[cfg(feature = "mongodb")] mod mongodb_test_utils { use crate::TestStoreFactory; diff --git a/crates/hotfix/tests/file_store_tests.rs b/crates/hotfix/tests/file_store_tests.rs new file mode 100644 index 0000000..cc74978 --- /dev/null +++ b/crates/hotfix/tests/file_store_tests.rs @@ -0,0 +1,200 @@ +use hotfix::store::file::FileStore; +use hotfix::store::{MessageStore, StoreError}; +use std::fs; +use tempfile::TempDir; + +fn create_test_store() -> (TempDir, FileStore) { + let dir = TempDir::new().unwrap(); + let store = FileStore::new(dir.path(), "test").unwrap(); + (dir, store) +} + +mod corrupted_file_tests { + use super::*; + + #[tokio::test] + async fn test_corrupted_seqnums_file() { + let dir = TempDir::new().unwrap(); + let base_path = dir.path().join("test"); + + // Create required session file + fs::write( + base_path.with_extension("session"), + chrono::Utc::now().to_rfc3339(), + ) + .unwrap(); + + // Create a corrupted seqnums file (invalid format) + fs::write(base_path.with_extension("seqnums"), "not:valid:format").unwrap(); + + let result = FileStore::new(dir.path(), "test"); + + assert!(result.is_err()); + let err = result.err().unwrap(); + let err_msg = err.to_string(); + assert!( + err_msg.contains("seqnums") || err_msg.contains("parse"), + "Error should mention seqnums parsing: {}", + err_msg + ); + } + + #[tokio::test] + async fn test_corrupted_session_file() { + let dir = TempDir::new().unwrap(); + let base_path = dir.path().join("test"); + + // Create a corrupted session file (invalid datetime) + fs::write(base_path.with_extension("session"), "not-a-valid-datetime").unwrap(); + + let result = FileStore::new(dir.path(), "test"); + + assert!(result.is_err()); + } + + #[tokio::test] + async fn test_corrupted_header_file() { + let dir = TempDir::new().unwrap(); + let base_path = dir.path().join("test"); + + // Create valid session file + fs::write( + base_path.with_extension("session"), + chrono::Utc::now().to_rfc3339(), + ) + .unwrap(); + + // Create body file with some content + fs::write(base_path.with_extension("body"), b"test message content").unwrap(); + + // Create corrupted header file (malformed lines) + // FileStore silently skips malformed lines, so this should succeed + // but the message won't be found + fs::write( + base_path.with_extension("header"), + "invalid_line_without_commas\n1,not_a_number,10\n", + ) + .unwrap(); + + let store = FileStore::new(dir.path(), "test"); + + // Store creation should succeed (malformed lines are skipped) + assert!(store.is_ok()); + + let store = store.unwrap(); + // No messages should be loaded due to malformed headers + let messages = store.get_slice(1, 10).await.unwrap(); + assert_eq!(messages.len(), 0); + } + + #[tokio::test] + async fn test_partial_seqnums_content() { + let dir = TempDir::new().unwrap(); + let base_path = dir.path().join("test"); + + // Create required session file + fs::write( + base_path.with_extension("session"), + chrono::Utc::now().to_rfc3339(), + ) + .unwrap(); + + // Create truncated seqnums file (only one number) + fs::write(base_path.with_extension("seqnums"), "00000000000000000001").unwrap(); + + let result = FileStore::new(dir.path(), "test"); + + assert!(result.is_err()); + let err = result.err().unwrap(); + let err_msg = err.to_string(); + assert!( + err_msg.contains("seqnums") || err_msg.contains("format"), + "Error should mention format issue: {}", + err_msg + ); + } +} + +mod file_system_error_tests { + use super::*; + + #[tokio::test] + #[cfg(unix)] + async fn test_readonly_directory() { + use std::os::unix::fs::PermissionsExt; + + let dir = TempDir::new().unwrap(); + let readonly_dir = dir.path().join("readonly"); + fs::create_dir(&readonly_dir).unwrap(); + + // Make directory read-only + let mut perms = fs::metadata(&readonly_dir).unwrap().permissions(); + perms.set_mode(0o444); + fs::set_permissions(&readonly_dir, perms).unwrap(); + + let result = FileStore::new(&readonly_dir, "test"); + + // Should fail because we can't create files + assert!(result.is_err()); + + // Restore permissions for cleanup + let mut perms = fs::metadata(&readonly_dir).unwrap().permissions(); + perms.set_mode(0o755); + fs::set_permissions(&readonly_dir, perms).unwrap(); + } + + #[tokio::test] + async fn test_missing_body_file_on_read() { + let (dir, mut store) = create_test_store(); + + // Add a message + store.add(1, b"test message").await.unwrap(); + + // Delete the body file + let body_path = dir.path().join("test.body"); + fs::remove_file(&body_path).unwrap(); + + // Attempt to read - should fail + let result = store.get_slice(1, 1).await; + + assert!(matches!(result, Err(StoreError::RetrieveMessages { .. }))); + } + + #[tokio::test] + async fn test_directory_not_exists_creates_it() { + let dir = TempDir::new().unwrap(); + let nested_path = dir + .path() + .join("nested") + .join("path") + .join("to") + .join("store"); + + // Directory doesn't exist yet + assert!(!nested_path.exists()); + + // FileStore should create the directory + let result = FileStore::new(&nested_path, "test"); + assert!(result.is_ok()); + + // Directory should now exist + assert!(nested_path.exists()); + } + + #[tokio::test] + async fn test_body_file_truncated() { + let (dir, mut store) = create_test_store(); + + // Add a message + store.add(1, b"test message content").await.unwrap(); + + // Truncate the body file + let body_path = dir.path().join("test.body"); + fs::write(&body_path, b"short").unwrap(); + + // Attempt to read - should fail because we can't read expected bytes + let result = store.get_slice(1, 1).await; + + assert!(matches!(result, Err(StoreError::RetrieveMessages { .. }))); + } +} diff --git a/crates/hotfix/tests/mongodb_store_tests.rs b/crates/hotfix/tests/mongodb_store_tests.rs new file mode 100644 index 0000000..9e688df --- /dev/null +++ b/crates/hotfix/tests/mongodb_store_tests.rs @@ -0,0 +1,116 @@ +#![cfg(feature = "mongodb")] + +use hotfix::store::mongodb::{Client, MongoDbMessageStore}; +use hotfix::store::{MessageStore, StoreError}; +use testcontainers::runners::AsyncRunner; +use testcontainers::{ContainerAsync, GenericImage}; + +const MONGO_PORT: u16 = 27017; + +async fn create_dedicated_container_and_store() +-> (ContainerAsync, MongoDbMessageStore) { + let container = GenericImage::new("mongo", "8.0").start().await.unwrap(); + let host = container.get_host().await.unwrap(); + let port = container.get_host_port_ipv4(MONGO_PORT).await.unwrap(); + + let client = Client::with_uri_str(format!("mongodb://{host}:{port}")) + .await + .unwrap(); + let db = client.database("test_conn_failure"); + let store = MongoDbMessageStore::new(db, Some("test")).await.unwrap(); + + (container, store) +} + +#[tokio::test] +async fn test_add_after_connection_drop() { + let (container, mut store) = create_dedicated_container_and_store().await; + + // Verify store works initially + store.add(1, b"initial message").await.unwrap(); + + // Stop the container + container.stop().await.unwrap(); + + // Attempt operation - should fail with appropriate error + let result = store.add(2, b"should fail").await; + + assert!(matches!(result, Err(StoreError::PersistMessage { .. }))); +} + +#[tokio::test] +async fn test_get_slice_after_connection_drop() { + let (container, mut store) = create_dedicated_container_and_store().await; + + // Add a message while connected + store.add(1, b"test message").await.unwrap(); + + // Stop the container + container.stop().await.unwrap(); + + // Attempt retrieval - should fail + let result = store.get_slice(1, 1).await; + + assert!(matches!(result, Err(StoreError::RetrieveMessages { .. }))); +} + +#[tokio::test] +async fn test_increment_after_connection_drop() { + let (container, mut store) = create_dedicated_container_and_store().await; + + // Stop the container + container.stop().await.unwrap(); + + // Attempt increment - should fail + let result = store.increment_sender_seq_number().await; + + assert!(matches!(result, Err(StoreError::UpdateSequenceNumber(_)))); +} + +#[tokio::test] +async fn test_reset_after_connection_drop() { + let (container, mut store) = create_dedicated_container_and_store().await; + + // Stop the container + container.stop().await.unwrap(); + + // Attempt reset - should fail + let result = store.reset().await; + + assert!(matches!(result, Err(StoreError::Reset(_)))); +} + +#[tokio::test] +async fn test_state_preserved_after_failed_increment() { + let (container, mut store) = create_dedicated_container_and_store().await; + + let initial_sender_seq = store.next_sender_seq_number(); + let initial_target_seq = store.next_target_seq_number(); + + // Stop the container + container.stop().await.unwrap(); + + // Attempt increments - should fail + let _ = store.increment_sender_seq_number().await; + let _ = store.increment_target_seq_number().await; + + // State should be unchanged since DB write failed first + assert_eq!(store.next_sender_seq_number(), initial_sender_seq); + assert_eq!(store.next_target_seq_number(), initial_target_seq); +} + +#[tokio::test] +async fn test_state_preserved_after_failed_set_target() { + let (container, mut store) = create_dedicated_container_and_store().await; + + let initial_target_seq = store.next_target_seq_number(); + + // Stop the container + container.stop().await.unwrap(); + + // Attempt set - should fail + let _ = store.set_target_seq_number(100).await; + + // State should be unchanged + assert_eq!(store.next_target_seq_number(), initial_target_seq); +} diff --git a/examples/load-testing/Cargo.toml b/examples/load-testing/Cargo.toml index 3dc1964..fb3ac77 100644 --- a/examples/load-testing/Cargo.toml +++ b/examples/load-testing/Cargo.toml @@ -7,7 +7,7 @@ license.workspace = true publish = false [dependencies] -hotfix = { path = "../../crates/hotfix", features = ["fix44", "mongodb", "redb"] } +hotfix = { path = "../../crates/hotfix", features = ["fix44"] } anyhow.workspace = true async-trait.workspace = true diff --git a/examples/load-testing/src/main.rs b/examples/load-testing/src/main.rs index 5686451..61b1d90 100644 --- a/examples/load-testing/src/main.rs +++ b/examples/load-testing/src/main.rs @@ -23,7 +23,6 @@ use crate::messages::{ExecutionReport, NewOrderSingle, OutboundMsg}; enum Database { Memory, File, - Redb, } #[derive(Parser, Debug)] @@ -105,11 +104,6 @@ async fn start_session( .expect("be able to create store"); Initiator::start(session_config, app, store).await } - Database::Redb => { - let store = hotfix::store::redb::RedbMessageStore::new("perf-session.db") - .expect("be able to create store"); - Initiator::start(session_config, app, store).await - } } } diff --git a/examples/simple-new-order/Cargo.toml b/examples/simple-new-order/Cargo.toml index 51e2458..81e7ad5 100644 --- a/examples/simple-new-order/Cargo.toml +++ b/examples/simple-new-order/Cargo.toml @@ -7,7 +7,7 @@ license.workspace = true publish = false [dependencies] -hotfix = { path = "../../crates/hotfix", features = ["fix44", "mongodb", "redb"] } +hotfix = { path = "../../crates/hotfix", features = ["fix44", "mongodb"] } hotfix-web = { path = "../../crates/hotfix-web", features = ["ui"] } anyhow.workspace = true diff --git a/examples/simple-new-order/src/main.rs b/examples/simple-new-order/src/main.rs index e41eb66..7fe194b 100644 --- a/examples/simple-new-order/src/main.rs +++ b/examples/simple-new-order/src/main.rs @@ -8,7 +8,6 @@ use hotfix::field_types::{Date, Timestamp}; use hotfix::fix44; use hotfix::initiator::Initiator; use hotfix::session::SessionHandle; -use hotfix::store::mongodb::Client; use hotfix_web::{RouterConfig, build_router_with_config}; use std::path::Path; use tokio::select; @@ -23,8 +22,8 @@ use crate::messages::{NewOrderSingle, OutboundMsg}; #[derive(ValueEnum, Clone, Debug)] #[clap(rename_all = "lower")] enum Database { - Redb, - Mongodb, + Memory, + File, } #[derive(Parser, Debug)] @@ -66,7 +65,7 @@ async fn main() -> Result<()> { .init(); } - let db_config = args.database.unwrap_or(Database::Redb); + let db_config = args.database.unwrap_or(Database::Memory); let app = TestApplication::default(); let initiator = start_session(&args.config, &db_config, app).await?; @@ -154,20 +153,13 @@ async fn start_session( .context("config must include a session")?; match db_config { - Database::Redb => { - let store = hotfix::store::redb::RedbMessageStore::new("session.db") - .context("failed to create redb store")?; + Database::Memory => { + let store = hotfix::store::in_memory::InMemoryMessageStore::default(); Initiator::start(session_config, app, store).await } - Database::Mongodb => { - let uri = "mongodb://localhost:30001"; - let client = Client::with_uri_str(uri) - .await - .context("failed to create mongodb client")?; - let store = - hotfix::store::mongodb::MongoDbMessageStore::new(client.database("hotfix"), None) - .await - .context("failed to create mongodb store")?; + Database::File => { + let store = hotfix::store::file::FileStore::new("data", "simple-new-order-store") + .context("failed to create file store")?; Initiator::start(session_config, app, store).await } }