From 14127f024a19e2e19abe48e0dde2cb8db08a2865 Mon Sep 17 00:00:00 2001 From: David Steiner Date: Thu, 22 Jan 2026 10:58:38 +0100 Subject: [PATCH 01/11] Move message verification errors into their own module --- crates/hotfix/src/error.rs | 54 ------------------ crates/hotfix/src/message.rs | 1 + crates/hotfix/src/message/verification.rs | 7 ++- .../hotfix/src/message/verification_error.rs | 55 +++++++++++++++++++ crates/hotfix/src/session.rs | 7 +-- 5 files changed, 63 insertions(+), 61 deletions(-) create mode 100644 crates/hotfix/src/message/verification_error.rs diff --git a/crates/hotfix/src/error.rs b/crates/hotfix/src/error.rs index a78e22b..96cf249 100644 --- a/crates/hotfix/src/error.rs +++ b/crates/hotfix/src/error.rs @@ -1,59 +1,5 @@ -use hotfix_message::field_types::Timestamp; use thiserror::Error; -#[derive(Debug, Error)] -pub enum MessageVerificationError { - /// The message's sequence number is lower than we expected. - #[error( - "sequence number too low (expected {expected:?}, actual {actual:?}, possible duplicate: {possible_duplicate})" - )] - SeqNumberTooLow { - expected: u64, - actual: u64, - possible_duplicate: bool, - }, - - /// The message's sequence number is higher than we expected. - #[error("sequence number too high (expected {expected:?}, actual {actual:?})")] - SeqNumberTooHigh { expected: u64, actual: u64 }, - - /// The begin string is different from our expectations. - #[error("incorrect begin string {0}")] - IncorrectBeginString(String), - - /// The comp ID is different from our expectations. - #[error("incorrect comp id {comp_id} ({comp_id_type:?})")] - IncorrectCompId { - comp_id: String, - comp_id_type: CompIdType, - msg_seq_num: u64, - }, - /// The sending time is not within the latency threshold. - #[error("sending time accuracy issue")] - SendingTimeAccuracyIssue { msg_seq_num: u64 }, - /// The sending time field is missing from the message. 
- #[error("sending time missing")] - SendingTimeMissing { msg_seq_num: u64 }, - /// Original sending time is not provided despite PossDupFlag being set. - #[error("original sending time missing")] - OriginalSendingTimeMissing { msg_seq_num: u64 }, - /// The original sending time is after the sending time of the message. - #[error( - "original sending time {original_sending_time:?} is after sending time {sending_time:?}" - )] - OriginalSendingTimeAfterSendingTime { - msg_seq_num: u64, - original_sending_time: Timestamp, - sending_time: Timestamp, - }, -} - -#[derive(Debug)] -pub enum CompIdType { - Sender, - Target, -} - #[derive(Debug, Error)] pub enum SessionError { #[error("Schedule configuration is invalid: {0}")] diff --git a/crates/hotfix/src/message.rs b/crates/hotfix/src/message.rs index 8395f17..5d2ba92 100644 --- a/crates/hotfix/src/message.rs +++ b/crates/hotfix/src/message.rs @@ -14,6 +14,7 @@ pub mod resend_request; pub mod sequence_reset; pub mod test_request; pub mod verification; +pub mod verification_error; pub use parser::RawFixMessage; pub use resend_request::ResendRequest; diff --git a/crates/hotfix/src/message/verification.rs b/crates/hotfix/src/message/verification.rs index 9350b94..0ecc6cf 100644 --- a/crates/hotfix/src/message/verification.rs +++ b/crates/hotfix/src/message/verification.rs @@ -1,5 +1,5 @@ use crate::config::SessionConfig; -use crate::error::{CompIdType, MessageVerificationError}; +use crate::message::verification_error::{CompIdType, MessageVerificationError}; use hotfix_message::Part; use hotfix_message::field_types::Timestamp; use hotfix_message::message::Message; @@ -180,8 +180,9 @@ fn check_target_comp_id( #[cfg(test)] mod tests { - use super::{Message, MessageVerificationError, SessionConfig, verify_message}; - use crate::error::CompIdType; + use super::{Message, SessionConfig, verify_message}; + use crate::message::verification_error::CompIdType; + use crate::message::verification_error::MessageVerificationError; use 
hotfix_message::field_types::Timestamp; use hotfix_message::{Part, fix44}; diff --git a/crates/hotfix/src/message/verification_error.rs b/crates/hotfix/src/message/verification_error.rs new file mode 100644 index 0000000..df17fce --- /dev/null +++ b/crates/hotfix/src/message/verification_error.rs @@ -0,0 +1,55 @@ +use hotfix_message::field_types::Timestamp; +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum MessageVerificationError { + /// The message's sequence number is lower than we expected. + #[error( + "sequence number too low (expected {expected:?}, actual {actual:?}, possible duplicate: {possible_duplicate})" + )] + SeqNumberTooLow { + expected: u64, + actual: u64, + possible_duplicate: bool, + }, + + /// The message's sequence number is higher than we expected. + #[error("sequence number too high (expected {expected:?}, actual {actual:?})")] + SeqNumberTooHigh { expected: u64, actual: u64 }, + + /// The begin string is different from our expectations. + #[error("incorrect begin string {0}")] + IncorrectBeginString(String), + + /// The comp ID is different from our expectations. + #[error("incorrect comp id {comp_id} ({comp_id_type:?})")] + IncorrectCompId { + comp_id: String, + comp_id_type: CompIdType, + msg_seq_num: u64, + }, + /// The sending time is not within the latency threshold. + #[error("sending time accuracy issue")] + SendingTimeAccuracyIssue { msg_seq_num: u64 }, + /// The sending time field is missing from the message. + #[error("sending time missing")] + SendingTimeMissing { msg_seq_num: u64 }, + /// Original sending time is not provided despite PossDupFlag being set. + #[error("original sending time missing")] + OriginalSendingTimeMissing { msg_seq_num: u64 }, + /// The original sending time is after the sending time of the message. 
+ #[error( + "original sending time {original_sending_time:?} is after sending time {sending_time:?}" + )] + OriginalSendingTimeAfterSendingTime { + msg_seq_num: u64, + original_sending_time: Timestamp, + sending_time: Timestamp, + }, +} + +#[derive(Debug)] +pub enum CompIdType { + Sender, + Target, +} diff --git a/crates/hotfix/src/session.rs b/crates/hotfix/src/session.rs index 221502a..b062a74 100644 --- a/crates/hotfix/src/session.rs +++ b/crates/hotfix/src/session.rs @@ -26,15 +26,17 @@ use tracing::{debug, enabled, error, info, warn}; use crate::Application; use crate::application::{InboundDecision, OutboundDecision}; -use crate::error::{CompIdType, MessageVerificationError}; use crate::message::logout::Logout; use crate::message::reject::Reject; use crate::message::resend_request::ResendRequest; use crate::message::sequence_reset::SequenceReset; use crate::message::test_request::TestRequest; use crate::message::verification::verify_message; +use crate::message::verification_error::{CompIdType, MessageVerificationError}; use crate::message_utils::{is_admin, prepare_message_for_resend}; use crate::session::admin_request::AdminRequest; +pub use crate::session::info::{SessionInfo, Status}; +pub use crate::session::session_handle::SessionHandle; #[cfg(not(feature = "test-utils"))] pub(crate) use crate::session::session_ref::InternalSessionRef; #[cfg(feature = "test-utils")] @@ -49,9 +51,6 @@ use hotfix_message::session_fields::{ SessionRejectReason, TEST_REQ_ID, }; -pub use crate::session::info::{SessionInfo, Status}; -pub use crate::session::session_handle::SessionHandle; - const SCHEDULE_CHECK_INTERVAL: u64 = 1; struct Session { From 32655f8c3b2ff2458d19e99f8bd309aa01dfb824 Mon Sep 17 00:00:00 2001 From: David Steiner Date: Thu, 22 Jan 2026 11:01:14 +0100 Subject: [PATCH 02/11] Move SessionError into the session module --- crates/hotfix/src/lib.rs | 1 - crates/hotfix/src/session.rs | 1 + crates/hotfix/src/{ => session}/error.rs | 2 ++ 
crates/hotfix/src/session_schedule.rs | 2 +- 4 files changed, 4 insertions(+), 2 deletions(-) rename crates/hotfix/src/{ => session}/error.rs (72%) diff --git a/crates/hotfix/src/lib.rs b/crates/hotfix/src/lib.rs index 296dace..b919748 100644 --- a/crates/hotfix/src/lib.rs +++ b/crates/hotfix/src/lib.rs @@ -27,7 +27,6 @@ pub mod application; pub mod config; -pub(crate) mod error; pub mod initiator; pub mod message; pub mod message_utils; diff --git a/crates/hotfix/src/session.rs b/crates/hotfix/src/session.rs index b062a74..dc656ed 100644 --- a/crates/hotfix/src/session.rs +++ b/crates/hotfix/src/session.rs @@ -1,4 +1,5 @@ pub(crate) mod admin_request; +pub mod error; pub(crate) mod event; mod info; mod session_handle; diff --git a/crates/hotfix/src/error.rs b/crates/hotfix/src/session/error.rs similarity index 72% rename from crates/hotfix/src/error.rs rename to crates/hotfix/src/session/error.rs index 96cf249..3c1a448 100644 --- a/crates/hotfix/src/error.rs +++ b/crates/hotfix/src/session/error.rs @@ -5,3 +5,5 @@ pub enum SessionError { #[error("Schedule configuration is invalid: {0}")] InvalidSchedule(String), } + +pub type Result = std::result::Result; diff --git a/crates/hotfix/src/session_schedule.rs b/crates/hotfix/src/session_schedule.rs index 2ce1f0c..e1ffa08 100644 --- a/crates/hotfix/src/session_schedule.rs +++ b/crates/hotfix/src/session_schedule.rs @@ -1,5 +1,5 @@ use crate::config::ScheduleConfig; -use crate::error::SessionError; +use crate::session::error::SessionError; use chrono::{DateTime, Datelike, Days, NaiveDate, NaiveTime, TimeDelta, Utc, Weekday}; use chrono_tz::Tz; From d411a4f20f7ea05b755dd41b76258dbd4c8d0d2b Mon Sep 17 00:00:00 2001 From: David Steiner Date: Thu, 22 Jan 2026 11:37:26 +0100 Subject: [PATCH 03/11] Introduce StoreError for message store related errors --- crates/hotfix/src/session.rs | 24 ++-- crates/hotfix/src/session/error.rs | 4 + crates/hotfix/src/session_schedule.rs | 1 + crates/hotfix/src/store.rs | 6 +- 
crates/hotfix/src/store/error.rs | 38 +++++ crates/hotfix/src/store/file.rs | 191 ++++++++++++++------------ crates/hotfix/src/store/in_memory.rs | 3 +- crates/hotfix/src/store/mongodb.rs | 48 +++++-- crates/hotfix/src/store/redb.rs | 92 ++++++++----- 9 files changed, 258 insertions(+), 149 deletions(-) create mode 100644 crates/hotfix/src/store/error.rs diff --git a/crates/hotfix/src/session.rs b/crates/hotfix/src/session.rs index dc656ed..e663827 100644 --- a/crates/hotfix/src/session.rs +++ b/crates/hotfix/src/session.rs @@ -365,7 +365,8 @@ where } } - self.store.increment_target_seq_number().await + self.store.increment_target_seq_number().await?; + Ok(()) } async fn on_heartbeat(&mut self, message: &Message) -> Result<()> { @@ -378,7 +379,8 @@ where self.reset_peer_timer(None); } - self.store.increment_target_seq_number().await + self.store.increment_target_seq_number().await?; + Ok(()) } async fn on_test_request(&mut self, message: &Message) -> Result<()> { @@ -515,7 +517,8 @@ where return Ok(()); } - self.store.set_target_seq_number(end - 1).await + self.store.set_target_seq_number(end - 1).await?; + Ok(()) } async fn handle_verification_error(&mut self, error: MessageVerificationError) -> Result<()> { @@ -1129,6 +1132,7 @@ mod tests { use super::*; use crate::application::{InboundDecision, OutboundDecision}; use crate::message::{InboundMessage, OutboundMessage}; + use crate::store::{Result as StoreResult, StoreError}; use chrono::{DateTime, Datelike, NaiveDate, NaiveTime, TimeDelta, Timelike}; use chrono_tz::Tz; use hotfix_message::message::Message; @@ -1168,11 +1172,11 @@ mod tests { #[async_trait::async_trait] impl MessageStore for TestStore { - async fn add(&mut self, _sequence_number: u64, _message: &[u8]) -> Result<()> { + async fn add(&mut self, _sequence_number: u64, _message: &[u8]) -> StoreResult<()> { Ok(()) } - async fn get_slice(&self, _begin: usize, _end: usize) -> Result>> { + async fn get_slice(&self, _begin: usize, _end: usize) -> 
StoreResult>> { Ok(vec![]) } @@ -1184,25 +1188,25 @@ mod tests { self.target_seq } - async fn increment_sender_seq_number(&mut self) -> Result<()> { + async fn increment_sender_seq_number(&mut self) -> StoreResult<()> { self.sender_seq += 1; Ok(()) } - async fn increment_target_seq_number(&mut self) -> Result<()> { + async fn increment_target_seq_number(&mut self) -> StoreResult<()> { self.target_seq += 1; Ok(()) } - async fn set_target_seq_number(&mut self, seq_number: u64) -> Result<()> { + async fn set_target_seq_number(&mut self, seq_number: u64) -> StoreResult<()> { self.target_seq = seq_number; Ok(()) } - async fn reset(&mut self) -> Result<()> { + async fn reset(&mut self) -> StoreResult<()> { self.reset_called.store(true, Ordering::SeqCst); if self.fail_reset.load(Ordering::SeqCst) { - bail!("simulated reset failure") + return Err(StoreError::Reset("simulated reset failure".into())); } self.creation_time = Utc::now(); Ok(()) diff --git a/crates/hotfix/src/session/error.rs b/crates/hotfix/src/session/error.rs index 3c1a448..758de1d 100644 --- a/crates/hotfix/src/session/error.rs +++ b/crates/hotfix/src/session/error.rs @@ -1,9 +1,13 @@ +use crate::store::StoreError; use thiserror::Error; #[derive(Debug, Error)] pub enum SessionError { #[error("Schedule configuration is invalid: {0}")] InvalidSchedule(String), + + #[error("store operation failed")] + Store(#[from] StoreError), } pub type Result = std::result::Result; diff --git a/crates/hotfix/src/session_schedule.rs b/crates/hotfix/src/session_schedule.rs index e1ffa08..08a4756 100644 --- a/crates/hotfix/src/session_schedule.rs +++ b/crates/hotfix/src/session_schedule.rs @@ -1043,6 +1043,7 @@ mod tests { SessionError::InvalidSchedule(msg) => { assert!(msg.contains("Weekly sessions cannot have weekdays specified")); } + other => panic!("unexpected error: {other}"), } } diff --git a/crates/hotfix/src/store.rs b/crates/hotfix/src/store.rs index 42dbe1b..640e775 100644 --- a/crates/hotfix/src/store.rs +++ 
b/crates/hotfix/src/store.rs @@ -3,6 +3,9 @@ //! By default, only the [in_memory] store is included. Further message store implementations, //! such as `mongodb` and `redb` can be enabled through feature flags. +/// Error types for store operations. +pub mod error; + /// An in-memory message store that loses its state on restart. Only use this for testing. pub mod in_memory; @@ -16,7 +19,8 @@ pub mod file; /// A message store using [redb](https://www.redb.org/) for persistence. pub mod redb; -use anyhow::Result; +pub use error::*; + use chrono::DateTime; #[async_trait::async_trait] diff --git a/crates/hotfix/src/store/error.rs b/crates/hotfix/src/store/error.rs new file mode 100644 index 0000000..0f90898 --- /dev/null +++ b/crates/hotfix/src/store/error.rs @@ -0,0 +1,38 @@ +//! Error types for message store operations. + +use thiserror::Error; + +/// A boxed error type for store errors. +pub type BoxError = Box; + +/// Errors that can occur during message store operations. +#[derive(Debug, Error)] +pub enum StoreError { + /// Failed to persist a message to the store. + #[error("failed to persist message (seq_num: {sequence_number})")] + PersistMessage { + sequence_number: u64, + #[source] + source: BoxError, + }, + + /// Failed to retrieve messages from the store. + #[error("failed to retrieve messages (range: {begin}..={end})")] + RetrieveMessages { + begin: usize, + end: usize, + #[source] + source: BoxError, + }, + + /// Failed to update a sequence number. + #[error("failed to update sequence number")] + UpdateSequenceNumber(#[source] BoxError), + + /// Failed to reset the store. + #[error("failed to reset store")] + Reset(#[source] BoxError), +} + +/// A specialized Result type for store operations. 
+pub type Result = std::result::Result; diff --git a/crates/hotfix/src/store/file.rs b/crates/hotfix/src/store/file.rs index d8c814d..d31d6c1 100644 --- a/crates/hotfix/src/store/file.rs +++ b/crates/hotfix/src/store/file.rs @@ -1,5 +1,5 @@ -use crate::store::MessageStore; -use anyhow::{Context, Result}; +use crate::store::{MessageStore, Result, StoreError}; +use anyhow::Context; use chrono::{DateTime, Utc}; use std::collections::HashMap; use std::fs::{File, OpenOptions}; @@ -33,7 +33,7 @@ pub struct FileStore { } impl FileStore { - pub fn new(directory: impl AsRef, name: &str) -> Result { + pub fn new(directory: impl AsRef, name: &str) -> anyhow::Result { let base_path = directory.as_ref().join(name); std::fs::create_dir_all(directory)?; @@ -84,7 +84,7 @@ impl FileStore { /// Retrieves the session creation time from the session file. /// /// It initialises the session file if it doesn't exist. - fn get_or_create_session_time(base_path: &Path) -> Result> { + fn get_or_create_session_time(base_path: &Path) -> anyhow::Result> { let session_path = base_path.with_extension("session"); let session_time = if session_path.exists() { let content = std::fs::read_to_string(&session_path)?; @@ -101,7 +101,7 @@ impl FileStore { /// Retrieves the sequence numbers from the seqnums file. /// /// It defaults to `(0, 0)` if the file doesn't exist or if it's empty. 
- fn read_initial_seqnums(base_path: &Path) -> Result<(u64, u64)> { + fn read_initial_seqnums(base_path: &Path) -> anyhow::Result<(u64, u64)> { let seqnums_path = base_path.with_extension("seqnums"); let (sender_seq_number, target_seq_number) = if seqnums_path.exists() { let content = @@ -118,7 +118,7 @@ impl FileStore { Ok((sender_seq_number, target_seq_number)) } - fn parse_seqnums(content: &str) -> Result<(u64, u64)> { + fn parse_seqnums(content: &str) -> anyhow::Result<(u64, u64)> { let parts: Vec<&str> = content.trim().split(':').map(|s| s.trim()).collect(); if parts.len() != 2 { anyhow::bail!("invalid seqnums format"); @@ -132,7 +132,7 @@ impl FileStore { Ok((sender, target)) } - fn load_message_index(header_path: &Path) -> Result> { + fn load_message_index(header_path: &Path) -> anyhow::Result> { let mut index = HashMap::new(); if !header_path.exists() { @@ -161,7 +161,7 @@ impl FileStore { Ok(index) } - fn write_seqnums(&mut self) -> Result<()> { + fn write_seqnums(&mut self) -> std::io::Result<()> { self.seqnums_file.seek(SeekFrom::Start(0))?; self.seqnums_file.set_len(0)?; write!( @@ -176,21 +176,29 @@ impl FileStore { #[async_trait::async_trait] impl MessageStore for FileStore { - async fn add(&mut self, sequence_number: u64, message: &[u8]) -> anyhow::Result<()> { + async fn add(&mut self, sequence_number: u64, message: &[u8]) -> Result<()> { let msg_size = message.len(); let offset = self.current_body_offset; - // write the message itself - self.body_file.write_all(message)?; - self.body_file.flush()?; + let mut persist = || -> std::io::Result<()> { + // write the message itself + self.body_file.write_all(message)?; + self.body_file.flush()?; + + // write the offset to the header file + writeln!( + self.header_file, + "{},{},{}", + sequence_number, offset, msg_size + )?; + self.header_file.flush()?; + Ok(()) + }; - // write the offset to the header file - writeln!( - self.header_file, - "{},{},{}", - sequence_number, offset, msg_size - )?; - 
self.header_file.flush()?; + persist().map_err(|e| StoreError::PersistMessage { + sequence_number, + source: e.into(), + })?; self.message_index.insert( sequence_number, @@ -205,26 +213,31 @@ impl MessageStore for FileStore { } async fn get_slice(&self, begin: usize, end: usize) -> Result>> { - let mut messages = Vec::with_capacity(end - begin + 1); + let retrieve = || -> std::io::Result>> { + let mut messages = Vec::with_capacity(end - begin + 1); - let body_path = self.base_path.with_extension("body"); - let mut body_file = - File::open(body_path).context("failed to open body file for reading")?; + let body_path = self.base_path.with_extension("body"); + let mut body_file = File::open(body_path)?; - for seq_num in begin..=end { - if let Some(msg_def) = self.message_index.get(&(seq_num as u64)) { - body_file.seek(SeekFrom::Start(msg_def.offset))?; + for seq_num in begin..=end { + if let Some(msg_def) = self.message_index.get(&(seq_num as u64)) { + body_file.seek(SeekFrom::Start(msg_def.offset))?; - let mut buffer = vec![0u8; msg_def.size]; - body_file - .read_exact(&mut buffer) - .context("failed to read message from body file")?; + let mut buffer = vec![0u8; msg_def.size]; + body_file.read_exact(&mut buffer)?; - messages.push(buffer); + messages.push(buffer); + } } - } - Ok(messages) + Ok(messages) + }; + + retrieve().map_err(|e| StoreError::RetrieveMessages { + begin, + end, + source: e.into(), + }) } fn next_sender_seq_number(&self) -> u64 { @@ -237,80 +250,82 @@ impl MessageStore for FileStore { async fn increment_sender_seq_number(&mut self) -> Result<()> { self.sender_seq_number += 1; - self.write_seqnums()?; - - Ok(()) + self.write_seqnums() + .map_err(|e| StoreError::UpdateSequenceNumber(e.into())) } async fn increment_target_seq_number(&mut self) -> Result<()> { self.target_seq_number += 1; - self.write_seqnums()?; - - Ok(()) + self.write_seqnums() + .map_err(|e| StoreError::UpdateSequenceNumber(e.into())) } async fn set_target_seq_number(&mut self, 
seq_number: u64) -> Result<()> { self.target_seq_number = seq_number; - self.write_seqnums()?; - Ok(()) + self.write_seqnums() + .map_err(|e| StoreError::UpdateSequenceNumber(e.into())) } async fn reset(&mut self) -> Result<()> { - self.body_file.flush()?; - self.header_file.flush()?; - - // remove all files - let body_path = self.base_path.with_extension("body"); - let header_path = self.base_path.with_extension("header"); - let seqnums_path = self.base_path.with_extension("seqnums"); - let session_path = self.base_path.with_extension("session"); + let do_reset = |this: &mut Self| -> std::io::Result<()> { + this.body_file.flush()?; + this.header_file.flush()?; + + // remove all files + let body_path = this.base_path.with_extension("body"); + let header_path = this.base_path.with_extension("header"); + let seqnums_path = this.base_path.with_extension("seqnums"); + let session_path = this.base_path.with_extension("session"); + + if body_path.exists() { + std::fs::remove_file(&body_path)?; + } + if header_path.exists() { + std::fs::remove_file(&header_path)?; + } + if seqnums_path.exists() { + std::fs::remove_file(&seqnums_path)?; + } + if session_path.exists() { + std::fs::remove_file(&session_path)?; + } - if body_path.exists() { - std::fs::remove_file(&body_path)?; - } - if header_path.exists() { - std::fs::remove_file(&header_path)?; - } - if seqnums_path.exists() { - std::fs::remove_file(&seqnums_path)?; - } - if session_path.exists() { - std::fs::remove_file(&session_path)?; - } + // reset in-memory state + this.sender_seq_number = 0; + this.target_seq_number = 0; + this.creation_time = Utc::now(); + this.message_index.clear(); + this.current_body_offset = 0; - // reset in-memory state - self.sender_seq_number = 0; - self.target_seq_number = 0; - self.creation_time = Utc::now(); - self.message_index.clear(); - self.current_body_offset = 0; + // recreate files + let now = Utc::now(); + std::fs::write(&session_path, now.to_rfc3339())?; - // recreate files - let 
now = Utc::now(); - std::fs::write(&session_path, now.to_rfc3339())?; + let body_file = OpenOptions::new() + .create(true) + .append(true) + .open(&body_path)?; + this.body_file = BufWriter::new(body_file); - let body_file = OpenOptions::new() - .create(true) - .append(true) - .open(&body_path)?; - self.body_file = BufWriter::new(body_file); + let header_file = OpenOptions::new() + .create(true) + .append(true) + .open(&header_path)?; + this.header_file = BufWriter::new(header_file); - let header_file = OpenOptions::new() - .create(true) - .append(true) - .open(&header_path)?; - self.header_file = BufWriter::new(header_file); + this.seqnums_file = OpenOptions::new() + .create(true) + .truncate(true) + .read(true) + .write(true) + .open(&seqnums_path)?; - self.seqnums_file = OpenOptions::new() - .create(true) - .truncate(true) - .read(true) - .write(true) - .open(&seqnums_path)?; + this.creation_time = now; - self.creation_time = now; + Ok(()) + }; - Ok(()) + do_reset(self).map_err(|e| StoreError::Reset(e.into())) } fn creation_time(&self) -> DateTime { diff --git a/crates/hotfix/src/store/in_memory.rs b/crates/hotfix/src/store/in_memory.rs index 9b213e3..5468fe3 100644 --- a/crates/hotfix/src/store/in_memory.rs +++ b/crates/hotfix/src/store/in_memory.rs @@ -1,5 +1,4 @@ -use crate::store::MessageStore; -use anyhow::Result; +use crate::store::{MessageStore, Result}; use chrono::{DateTime, Utc}; use std::collections::HashMap; diff --git a/crates/hotfix/src/store/mongodb.rs b/crates/hotfix/src/store/mongodb.rs index eef8c6e..099f55f 100644 --- a/crates/hotfix/src/store/mongodb.rs +++ b/crates/hotfix/src/store/mongodb.rs @@ -1,4 +1,3 @@ -use anyhow::Result; use async_trait::async_trait; use chrono::{DateTime, Utc}; use futures::TryStreamExt; @@ -12,7 +11,7 @@ use serde::{Deserialize, Serialize}; pub use mongodb::Client; -use crate::store::MessageStore; +use crate::store::{MessageStore, Result, StoreError}; #[derive(Debug, Deserialize, Serialize)] struct SequenceMeta { 
@@ -38,7 +37,7 @@ pub struct MongoDbMessageStore { } impl MongoDbMessageStore { - pub async fn new(db: Database, collection_name: Option<&str>) -> Result { + pub async fn new(db: Database, collection_name: Option<&str>) -> anyhow::Result { let collection_name = collection_name.unwrap_or("messages"); let meta_collection = db.collection(collection_name); let message_collection = db.collection(collection_name); @@ -54,7 +53,7 @@ impl MongoDbMessageStore { Ok(store) } - async fn ensure_indexes(meta_collection: &Collection) -> Result<()> { + async fn ensure_indexes(meta_collection: &Collection) -> anyhow::Result<()> { let meta_index = IndexModel::builder() .keys(doc! { "meta": 1, "_id": -1 }) .build(); @@ -72,7 +71,7 @@ impl MongoDbMessageStore { async fn get_or_default_sequence( meta_collection: &Collection, - ) -> Result { + ) -> anyhow::Result { let options = FindOneOptions::builder().sort(doc! { "_id": -1 }).build(); let res = meta_collection .find_one(doc! { "meta": true }) @@ -86,7 +85,7 @@ impl MongoDbMessageStore { Ok(meta) } - async fn new_sequence(meta_collection: &Collection) -> Result { + async fn new_sequence(meta_collection: &Collection) -> anyhow::Result { let sequence_id = ObjectId::new(); let initial_meta = SequenceMeta { object_id: sequence_id, @@ -117,7 +116,11 @@ impl MessageStore for MongoDbMessageStore { self.message_collection .replace_one(filter, message) .with_options(options) - .await?; + .await + .map_err(|e| StoreError::PersistMessage { + sequence_number, + source: e.into(), + })?; Ok(()) } @@ -130,10 +133,24 @@ impl MessageStore for MongoDbMessageStore { "$lte": end as u32, } }; - let mut cursor = self.message_collection.find(filter).await?; + let mut cursor = self + .message_collection + .find(filter) + .await + .map_err(|e| StoreError::RetrieveMessages { + begin, + end, + source: e.into(), + })?; let mut messages = Vec::new(); - while let Some(message) = cursor.try_next().await? 
{ + while let Some(message) = cursor.try_next().await.map_err(|e| { + StoreError::RetrieveMessages { + begin, + end, + source: e.into(), + } + })? { messages.push(message.data.bytes); } @@ -155,7 +172,8 @@ impl MessageStore for MongoDbMessageStore { doc! { "_id": self.current_sequence.object_id }, doc! { "$inc": { "sender_seq_number": 1 } }, ) - .await?; + .await + .map_err(|e| StoreError::UpdateSequenceNumber(e.into()))?; Ok(()) } @@ -167,7 +185,8 @@ impl MessageStore for MongoDbMessageStore { doc! { "_id": self.current_sequence.object_id }, doc! { "$inc": { "target_seq_number": 1 } }, ) - .await?; + .await + .map_err(|e| StoreError::UpdateSequenceNumber(e.into()))?; Ok(()) } @@ -179,13 +198,16 @@ impl MessageStore for MongoDbMessageStore { doc! { "_id": self.current_sequence.object_id }, doc! { "$set": { "target_seq_number": seq_number as u32 } }, ) - .await?; + .await + .map_err(|e| StoreError::UpdateSequenceNumber(e.into()))?; Ok(()) } async fn reset(&mut self) -> Result<()> { - self.current_sequence = Self::new_sequence(&self.meta_collection).await?; + self.current_sequence = Self::new_sequence(&self.meta_collection) + .await + .map_err(|e| StoreError::Reset(e.into()))?; Ok(()) } diff --git a/crates/hotfix/src/store/redb.rs b/crates/hotfix/src/store/redb.rs index 62b0230..0242367 100644 --- a/crates/hotfix/src/store/redb.rs +++ b/crates/hotfix/src/store/redb.rs @@ -1,5 +1,4 @@ -use crate::store::MessageStore; -use anyhow::{Result, bail}; +use crate::store::{MessageStore, Result, StoreError}; use chrono::{DateTime, Utc}; use redb::{Database, ReadOnlyTable, ReadableDatabase, TableDefinition, TableError}; use std::path::Path; @@ -22,7 +21,7 @@ pub struct RedbMessageStore { } impl RedbMessageStore { - pub fn new(path: impl AsRef) -> Result { + pub fn new(path: impl AsRef) -> anyhow::Result { let db = Database::create(path)?; let meta = if let Some(stored_metadata) = Self::load_metadata(&db)? 
{ @@ -36,7 +35,7 @@ impl RedbMessageStore { Ok(Self { db, meta }) } - fn persist_default_metadata(db: &Database) -> Result<()> { + fn persist_default_metadata(db: &Database) -> anyhow::Result<()> { let creation_timestamp = Utc::now().timestamp_micros() as u64; let sender_seq_number = 0; let target_seq_number = 0; @@ -55,7 +54,7 @@ impl RedbMessageStore { Ok(()) } - fn load_metadata(db: &Database) -> Result> { + fn load_metadata(db: &Database) -> anyhow::Result> { let read_txn = db.begin_read()?; let metadata = match read_txn.open_table(META_TABLE) { Ok(table) => { @@ -81,19 +80,23 @@ impl RedbMessageStore { Ok(metadata) } - fn read_required_meta_field(table: &ReadOnlyTable<&str, u64>, key: &str) -> Result { + fn read_required_meta_field(table: &ReadOnlyTable<&str, u64>, key: &str) -> anyhow::Result { table .get(key)? .map(|v| v.value()) .ok_or_else(|| anyhow::anyhow!("missing required metadata field: {key}")) } - fn parse_timestamp(timestamp: u64) -> Result> { + fn parse_timestamp(timestamp: u64) -> anyhow::Result> { DateTime::from_timestamp_micros(timestamp as i64) .ok_or_else(|| anyhow::anyhow!("invalid timestamp: {timestamp}")) } - async fn update_sequence_number(&mut self, key: &str, value: u64) -> Result<()> { + async fn update_sequence_number( + &mut self, + key: &str, + value: u64, + ) -> std::result::Result<(), redb::Error> { let write_txn = self.db.begin_write()?; { let mut table = write_txn.open_table(META_TABLE)?; @@ -107,13 +110,20 @@ impl RedbMessageStore { #[async_trait::async_trait] impl MessageStore for RedbMessageStore { async fn add(&mut self, sequence_number: u64, message: &[u8]) -> Result<()> { - let write_txn = self.db.begin_write()?; - { - let mut table = write_txn.open_table(MESSAGES_TABLE)?; - table.insert(sequence_number, message)?; - } - write_txn.commit()?; - Ok(()) + let persist = || -> std::result::Result<(), redb::Error> { + let write_txn = self.db.begin_write()?; + { + let mut table = write_txn.open_table(MESSAGES_TABLE)?; + 
table.insert(sequence_number, message)?; + } + write_txn.commit()?; + Ok(()) + }; + + persist().map_err(|e| StoreError::PersistMessage { + sequence_number, + source: e.into(), + }) } async fn get_slice(&self, begin: usize, end: usize) -> Result>> { @@ -121,18 +131,26 @@ impl MessageStore for RedbMessageStore { return Ok(vec![]); } - let read_txn = self.db.begin_read()?; - match read_txn.open_table(MESSAGES_TABLE) { - Ok(table) => { - let messages: std::result::Result>, redb::StorageError> = table - .range(begin as u64..=end as u64)? - .map(|m| m.map(|v| v.1.value().to_vec())) - .collect(); - Ok(messages?) + let retrieve = || -> std::result::Result>, redb::Error> { + let read_txn = self.db.begin_read()?; + match read_txn.open_table(MESSAGES_TABLE) { + Ok(table) => { + let messages: std::result::Result>, redb::StorageError> = table + .range(begin as u64..=end as u64)? + .map(|m| m.map(|v| v.1.value().to_vec())) + .collect(); + Ok(messages?) + } + Err(TableError::TableDoesNotExist(_)) => Ok(vec![]), + Err(err) => Err(err.into()), } - Err(TableError::TableDoesNotExist(_)) => Ok(vec![]), - Err(err) => Err(err.into()), - } + }; + + retrieve().map_err(|e| StoreError::RetrieveMessages { + begin, + end, + source: e.into(), + }) } fn next_sender_seq_number(&self) -> u64 { @@ -146,7 +164,8 @@ impl MessageStore for RedbMessageStore { async fn increment_sender_seq_number(&mut self) -> Result<()> { let sender_seq_number = self.meta.sender_seq_number + 1; self.update_sequence_number(SENDER_KEY, sender_seq_number) - .await?; + .await + .map_err(|e| StoreError::UpdateSequenceNumber(e.into()))?; self.meta.sender_seq_number = sender_seq_number; Ok(()) } @@ -157,19 +176,22 @@ impl MessageStore for RedbMessageStore { } async fn set_target_seq_number(&mut self, seq_number: u64) -> Result<()> { - self.update_sequence_number(TARGET_KEY, seq_number).await?; + self.update_sequence_number(TARGET_KEY, seq_number) + .await + .map_err(|e| StoreError::UpdateSequenceNumber(e.into()))?; 
self.meta.target_seq_number = seq_number; Ok(()) } async fn reset(&mut self) -> Result<()> { - Self::persist_default_metadata(&self.db)?; - if let Some(meta) = Self::load_metadata(&self.db)? { - self.meta = meta; - Ok(()) - } else { - bail!("meta unexpectedly not found") - } + let do_reset = |db: &Database| -> anyhow::Result { + Self::persist_default_metadata(db)?; + Self::load_metadata(db)? + .ok_or_else(|| anyhow::anyhow!("meta unexpectedly not found")) + }; + + self.meta = do_reset(&self.db).map_err(|e| StoreError::Reset(e.into()))?; + Ok(()) } fn creation_time(&self) -> DateTime { From 650869736277f6b265a6017fac89de9ff59f6c0c Mon Sep 17 00:00:00 2001 From: David Steiner Date: Thu, 22 Jan 2026 11:42:36 +0100 Subject: [PATCH 04/11] Remove redb message store as the append-only file one is superior --- Cargo.lock | 10 -- Cargo.toml | 1 - crates/hotfix/Cargo.toml | 2 - crates/hotfix/src/store.rs | 3 - crates/hotfix/src/store/redb.rs | 200 -------------------------- crates/hotfix/tests/store_tests.rs | 45 ------ examples/load-testing/Cargo.toml | 2 +- examples/load-testing/src/main.rs | 6 - examples/simple-new-order/Cargo.toml | 2 +- examples/simple-new-order/src/main.rs | 24 ++-- 10 files changed, 10 insertions(+), 285 deletions(-) delete mode 100644 crates/hotfix/src/store/redb.rs diff --git a/Cargo.lock b/Cargo.lock index 0e62a3d..c628a89 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1290,7 +1290,6 @@ dependencies = [ "hotfix-message", "mongodb", "rcgen", - "redb", "rustls", "rustls-native-certs", "rustls-pemfile", @@ -2650,15 +2649,6 @@ dependencies = [ "yasna", ] -[[package]] -name = "redb" -version = "3.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae323eb086579a3769daa2c753bb96deb95993c534711e0dbe881b5192906a06" -dependencies = [ - "libc", -] - [[package]] name = "redox_syscall" version = "0.5.18" diff --git a/Cargo.toml b/Cargo.toml index ac9f1f6..94a1583 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,7 +41,6 @@ 
proc-macro2 = "1" quickcheck = "1" quickcheck_macros = "1" quote = "1" -redb = "3.1" reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] } roxmltree = "0.21" rust-embed = "8.7" diff --git a/crates/hotfix/Cargo.toml b/crates/hotfix/Cargo.toml index 3241f3f..ba88ac8 100644 --- a/crates/hotfix/Cargo.toml +++ b/crates/hotfix/Cargo.toml @@ -14,7 +14,6 @@ categories.workspace = true [features] default = ["test-utils"] fix44 = ["hotfix-message/fix44"] -redb = ["dep:redb"] mongodb = ["dep:mongodb"] test-utils = [] @@ -31,7 +30,6 @@ chrono-tz = { workspace = true, features = ["serde"] } futures = { workspace = true } mongodb = { workspace = true, optional = true } rustls-pki-types = { workspace = true } -redb = { workspace = true, optional = true } rustls = { workspace = true } rustls-native-certs = { workspace = true } rustls-pemfile = { workspace = true } diff --git a/crates/hotfix/src/store.rs b/crates/hotfix/src/store.rs index 640e775..cba3438 100644 --- a/crates/hotfix/src/store.rs +++ b/crates/hotfix/src/store.rs @@ -15,9 +15,6 @@ pub mod mongodb; /// A file-based message store for persistence. pub mod file; -#[cfg(feature = "redb")] -/// A message store using [redb](https://www.redb.org/) for persistence. 
-pub mod redb; pub use error::*; diff --git a/crates/hotfix/src/store/redb.rs b/crates/hotfix/src/store/redb.rs deleted file mode 100644 index 0242367..0000000 --- a/crates/hotfix/src/store/redb.rs +++ /dev/null @@ -1,200 +0,0 @@ -use crate::store::{MessageStore, Result, StoreError}; -use chrono::{DateTime, Utc}; -use redb::{Database, ReadOnlyTable, ReadableDatabase, TableDefinition, TableError}; -use std::path::Path; - -const MESSAGES_TABLE: TableDefinition = TableDefinition::new("messages"); -const META_TABLE: TableDefinition<&str, u64> = TableDefinition::new("seq_numbers"); -const SENDER_KEY: &str = "sender"; -const TARGET_KEY: &str = "target"; -const CREATION_TIME_KEY: &str = "creation_time"; - -struct MetaData { - creation_time: DateTime, - sender_seq_number: u64, - target_seq_number: u64, -} - -pub struct RedbMessageStore { - db: Database, - meta: MetaData, -} - -impl RedbMessageStore { - pub fn new(path: impl AsRef) -> anyhow::Result { - let db = Database::create(path)?; - - let meta = if let Some(stored_metadata) = Self::load_metadata(&db)? { - stored_metadata - } else { - Self::persist_default_metadata(&db)?; - Self::load_metadata(&db)? - .ok_or_else(|| anyhow::anyhow!("failed to read metadata after initialization"))? 
- }; - - Ok(Self { db, meta }) - } - - fn persist_default_metadata(db: &Database) -> anyhow::Result<()> { - let creation_timestamp = Utc::now().timestamp_micros() as u64; - let sender_seq_number = 0; - let target_seq_number = 0; - - // if we have just set the creation time, we need to write it to redb - let write_txn = db.begin_write()?; - { - let mut meta_table = write_txn.open_table(META_TABLE)?; - meta_table.insert(CREATION_TIME_KEY, creation_timestamp)?; - meta_table.insert(SENDER_KEY, sender_seq_number)?; - meta_table.insert(TARGET_KEY, target_seq_number)?; - let mut messages_table = write_txn.open_table(MESSAGES_TABLE)?; - messages_table.extract_if(|_, _| true)?.for_each(drop); - } - write_txn.commit()?; - Ok(()) - } - - fn load_metadata(db: &Database) -> anyhow::Result> { - let read_txn = db.begin_read()?; - let metadata = match read_txn.open_table(META_TABLE) { - Ok(table) => { - let creation_time = Self::parse_timestamp(Self::read_required_meta_field( - &table, - CREATION_TIME_KEY, - )?)?; - let sender_seq_number = Self::read_required_meta_field(&table, SENDER_KEY)?; - let target_seq_number = Self::read_required_meta_field(&table, TARGET_KEY)?; - - Some(MetaData { - creation_time, - sender_seq_number, - target_seq_number, - }) - } - Err(TableError::TableDoesNotExist(_)) => None, - Err(err) => { - return Err(err.into()); - } - }; - - Ok(metadata) - } - - fn read_required_meta_field(table: &ReadOnlyTable<&str, u64>, key: &str) -> anyhow::Result { - table - .get(key)? 
- .map(|v| v.value()) - .ok_or_else(|| anyhow::anyhow!("missing required metadata field: {key}")) - } - - fn parse_timestamp(timestamp: u64) -> anyhow::Result> { - DateTime::from_timestamp_micros(timestamp as i64) - .ok_or_else(|| anyhow::anyhow!("invalid timestamp: {timestamp}")) - } - - async fn update_sequence_number( - &mut self, - key: &str, - value: u64, - ) -> std::result::Result<(), redb::Error> { - let write_txn = self.db.begin_write()?; - { - let mut table = write_txn.open_table(META_TABLE)?; - table.insert(key, value)?; - } - write_txn.commit()?; - Ok(()) - } -} - -#[async_trait::async_trait] -impl MessageStore for RedbMessageStore { - async fn add(&mut self, sequence_number: u64, message: &[u8]) -> Result<()> { - let persist = || -> std::result::Result<(), redb::Error> { - let write_txn = self.db.begin_write()?; - { - let mut table = write_txn.open_table(MESSAGES_TABLE)?; - table.insert(sequence_number, message)?; - } - write_txn.commit()?; - Ok(()) - }; - - persist().map_err(|e| StoreError::PersistMessage { - sequence_number, - source: e.into(), - }) - } - - async fn get_slice(&self, begin: usize, end: usize) -> Result>> { - if begin > end { - return Ok(vec![]); - } - - let retrieve = || -> std::result::Result>, redb::Error> { - let read_txn = self.db.begin_read()?; - match read_txn.open_table(MESSAGES_TABLE) { - Ok(table) => { - let messages: std::result::Result>, redb::StorageError> = table - .range(begin as u64..=end as u64)? - .map(|m| m.map(|v| v.1.value().to_vec())) - .collect(); - Ok(messages?) 
- } - Err(TableError::TableDoesNotExist(_)) => Ok(vec![]), - Err(err) => Err(err.into()), - } - }; - - retrieve().map_err(|e| StoreError::RetrieveMessages { - begin, - end, - source: e.into(), - }) - } - - fn next_sender_seq_number(&self) -> u64 { - self.meta.sender_seq_number + 1 - } - - fn next_target_seq_number(&self) -> u64 { - self.meta.target_seq_number + 1 - } - - async fn increment_sender_seq_number(&mut self) -> Result<()> { - let sender_seq_number = self.meta.sender_seq_number + 1; - self.update_sequence_number(SENDER_KEY, sender_seq_number) - .await - .map_err(|e| StoreError::UpdateSequenceNumber(e.into()))?; - self.meta.sender_seq_number = sender_seq_number; - Ok(()) - } - - async fn increment_target_seq_number(&mut self) -> Result<()> { - self.set_target_seq_number(self.meta.target_seq_number + 1) - .await - } - - async fn set_target_seq_number(&mut self, seq_number: u64) -> Result<()> { - self.update_sequence_number(TARGET_KEY, seq_number) - .await - .map_err(|e| StoreError::UpdateSequenceNumber(e.into()))?; - self.meta.target_seq_number = seq_number; - Ok(()) - } - - async fn reset(&mut self) -> Result<()> { - let do_reset = |db: &Database| -> anyhow::Result { - Self::persist_default_metadata(db)?; - Self::load_metadata(db)? 
- .ok_or_else(|| anyhow::anyhow!("meta unexpectedly not found")) - }; - - self.meta = do_reset(&self.db).map_err(|e| StoreError::Reset(e.into()))?; - Ok(()) - } - - fn creation_time(&self) -> DateTime { - self.meta.creation_time - } -} diff --git a/crates/hotfix/tests/store_tests.rs b/crates/hotfix/tests/store_tests.rs index a137c29..50e3a37 100644 --- a/crates/hotfix/tests/store_tests.rs +++ b/crates/hotfix/tests/store_tests.rs @@ -346,14 +346,6 @@ async fn create_test_store_factories() -> Vec> { Box::new(FileStoreTestFactory::new()) as Box, ]; - // Add redb store factory if the feature is enabled - #[cfg(feature = "redb")] - { - stores.push( - Box::new(redb_test_utils::RedbTestStoreFactory::new()) as Box - ); - } - #[cfg(feature = "mongodb")] { stores.push( @@ -408,43 +400,6 @@ impl Drop for FileStoreTestFactory { } } -#[cfg(feature = "redb")] -mod redb_test_utils { - use super::*; - use hotfix::store::MessageStore; - use hotfix::store::redb::RedbMessageStore; - use std::path::PathBuf; - use std::{env, fs}; - - pub(crate) struct RedbTestStoreFactory { - db_path: PathBuf, - } - - impl RedbTestStoreFactory { - pub(crate) fn new() -> Self { - let mut temp_path = env::temp_dir(); - temp_path.push(format!("redb_test_{}", uuid::Uuid::new_v4())); - temp_path.set_extension("db"); - - Self { db_path: temp_path } - } - } - - #[async_trait::async_trait] - impl TestStoreFactory for RedbTestStoreFactory { - async fn create_store(&self) -> Box { - Box::new(RedbMessageStore::new(&self.db_path).expect("Failed to create store")) - } - } - - impl Drop for RedbTestStoreFactory { - fn drop(&mut self) { - // Clean up the database file when the test store is dropped - let _ = fs::remove_file(&self.db_path); - } - } -} - #[cfg(feature = "mongodb")] mod mongodb_test_utils { use crate::TestStoreFactory; diff --git a/examples/load-testing/Cargo.toml b/examples/load-testing/Cargo.toml index 3dc1964..fb3ac77 100644 --- a/examples/load-testing/Cargo.toml +++ 
b/examples/load-testing/Cargo.toml @@ -7,7 +7,7 @@ license.workspace = true publish = false [dependencies] -hotfix = { path = "../../crates/hotfix", features = ["fix44", "mongodb", "redb"] } +hotfix = { path = "../../crates/hotfix", features = ["fix44"] } anyhow.workspace = true async-trait.workspace = true diff --git a/examples/load-testing/src/main.rs b/examples/load-testing/src/main.rs index 5686451..61b1d90 100644 --- a/examples/load-testing/src/main.rs +++ b/examples/load-testing/src/main.rs @@ -23,7 +23,6 @@ use crate::messages::{ExecutionReport, NewOrderSingle, OutboundMsg}; enum Database { Memory, File, - Redb, } #[derive(Parser, Debug)] @@ -105,11 +104,6 @@ async fn start_session( .expect("be able to create store"); Initiator::start(session_config, app, store).await } - Database::Redb => { - let store = hotfix::store::redb::RedbMessageStore::new("perf-session.db") - .expect("be able to create store"); - Initiator::start(session_config, app, store).await - } } } diff --git a/examples/simple-new-order/Cargo.toml b/examples/simple-new-order/Cargo.toml index 51e2458..81e7ad5 100644 --- a/examples/simple-new-order/Cargo.toml +++ b/examples/simple-new-order/Cargo.toml @@ -7,7 +7,7 @@ license.workspace = true publish = false [dependencies] -hotfix = { path = "../../crates/hotfix", features = ["fix44", "mongodb", "redb"] } +hotfix = { path = "../../crates/hotfix", features = ["fix44", "mongodb"] } hotfix-web = { path = "../../crates/hotfix-web", features = ["ui"] } anyhow.workspace = true diff --git a/examples/simple-new-order/src/main.rs b/examples/simple-new-order/src/main.rs index e41eb66..7fe194b 100644 --- a/examples/simple-new-order/src/main.rs +++ b/examples/simple-new-order/src/main.rs @@ -8,7 +8,6 @@ use hotfix::field_types::{Date, Timestamp}; use hotfix::fix44; use hotfix::initiator::Initiator; use hotfix::session::SessionHandle; -use hotfix::store::mongodb::Client; use hotfix_web::{RouterConfig, build_router_with_config}; use std::path::Path; use 
tokio::select; @@ -23,8 +22,8 @@ use crate::messages::{NewOrderSingle, OutboundMsg}; #[derive(ValueEnum, Clone, Debug)] #[clap(rename_all = "lower")] enum Database { - Redb, - Mongodb, + Memory, + File, } #[derive(Parser, Debug)] @@ -66,7 +65,7 @@ async fn main() -> Result<()> { .init(); } - let db_config = args.database.unwrap_or(Database::Redb); + let db_config = args.database.unwrap_or(Database::Memory); let app = TestApplication::default(); let initiator = start_session(&args.config, &db_config, app).await?; @@ -154,20 +153,13 @@ async fn start_session( .context("config must include a session")?; match db_config { - Database::Redb => { - let store = hotfix::store::redb::RedbMessageStore::new("session.db") - .context("failed to create redb store")?; + Database::Memory => { + let store = hotfix::store::in_memory::InMemoryMessageStore::default(); Initiator::start(session_config, app, store).await } - Database::Mongodb => { - let uri = "mongodb://localhost:30001"; - let client = Client::with_uri_str(uri) - .await - .context("failed to create mongodb client")?; - let store = - hotfix::store::mongodb::MongoDbMessageStore::new(client.database("hotfix"), None) - .await - .context("failed to create mongodb store")?; + Database::File => { + let store = hotfix::store::file::FileStore::new("data", "simple-new-order-store") + .context("failed to create file store")?; Initiator::start(session_config, app, store).await } } From c14a10acce3d6629e849edae9c6a979d3a787a9f Mon Sep 17 00:00:00 2001 From: David Steiner Date: Thu, 22 Jan 2026 11:51:06 +0100 Subject: [PATCH 05/11] Clean up file store error handling --- crates/hotfix/src/store/file.rs | 202 +++++++++++++++-------------- crates/hotfix/src/store/mongodb.rs | 31 +++-- 2 files changed, 119 insertions(+), 114 deletions(-) diff --git a/crates/hotfix/src/store/file.rs b/crates/hotfix/src/store/file.rs index d31d6c1..914e0b9 100644 --- a/crates/hotfix/src/store/file.rs +++ b/crates/hotfix/src/store/file.rs @@ -172,33 +172,22 
@@ impl FileStore { self.seqnums_file.flush()?; Ok(()) } -} -#[async_trait::async_trait] -impl MessageStore for FileStore { - async fn add(&mut self, sequence_number: u64, message: &[u8]) -> Result<()> { + fn write_message(&mut self, sequence_number: u64, message: &[u8]) -> std::io::Result<()> { let msg_size = message.len(); let offset = self.current_body_offset; - let mut persist = || -> std::io::Result<()> { - // write the message itself - self.body_file.write_all(message)?; - self.body_file.flush()?; - - // write the offset to the header file - writeln!( - self.header_file, - "{},{},{}", - sequence_number, offset, msg_size - )?; - self.header_file.flush()?; - Ok(()) - }; + // write the message itself + self.body_file.write_all(message)?; + self.body_file.flush()?; - persist().map_err(|e| StoreError::PersistMessage { - sequence_number, - source: e.into(), - })?; + // write the offset to the header file + writeln!( + self.header_file, + "{},{},{}", + sequence_number, offset, msg_size + )?; + self.header_file.flush()?; self.message_index.insert( sequence_number, @@ -212,32 +201,102 @@ impl MessageStore for FileStore { Ok(()) } - async fn get_slice(&self, begin: usize, end: usize) -> Result>> { - let retrieve = || -> std::io::Result>> { - let mut messages = Vec::with_capacity(end - begin + 1); + fn perform_reset(&mut self) -> std::io::Result<()> { + self.body_file.flush()?; + self.header_file.flush()?; + + // remove all files + let body_path = self.base_path.with_extension("body"); + let header_path = self.base_path.with_extension("header"); + let seqnums_path = self.base_path.with_extension("seqnums"); + let session_path = self.base_path.with_extension("session"); + + if body_path.exists() { + std::fs::remove_file(&body_path)?; + } + if header_path.exists() { + std::fs::remove_file(&header_path)?; + } + if seqnums_path.exists() { + std::fs::remove_file(&seqnums_path)?; + } + if session_path.exists() { + std::fs::remove_file(&session_path)?; + } + + // reset 
in-memory state + self.sender_seq_number = 0; + self.target_seq_number = 0; + self.creation_time = Utc::now(); + self.message_index.clear(); + self.current_body_offset = 0; + + // recreate files + let now = Utc::now(); + std::fs::write(&session_path, now.to_rfc3339())?; - let body_path = self.base_path.with_extension("body"); - let mut body_file = File::open(body_path)?; + let body_file = OpenOptions::new() + .create(true) + .append(true) + .open(&body_path)?; + self.body_file = BufWriter::new(body_file); + + let header_file = OpenOptions::new() + .create(true) + .append(true) + .open(&header_path)?; + self.header_file = BufWriter::new(header_file); - for seq_num in begin..=end { - if let Some(msg_def) = self.message_index.get(&(seq_num as u64)) { - body_file.seek(SeekFrom::Start(msg_def.offset))?; + self.seqnums_file = OpenOptions::new() + .create(true) + .truncate(true) + .read(true) + .write(true) + .open(&seqnums_path)?; - let mut buffer = vec![0u8; msg_def.size]; - body_file.read_exact(&mut buffer)?; + self.creation_time = now; - messages.push(buffer); - } + Ok(()) + } + + fn read_messages(&self, begin: usize, end: usize) -> std::io::Result>> { + let mut messages = Vec::with_capacity(end - begin + 1); + + let body_path = self.base_path.with_extension("body"); + let mut body_file = File::open(body_path)?; + + for seq_num in begin..=end { + if let Some(msg_def) = self.message_index.get(&(seq_num as u64)) { + body_file.seek(SeekFrom::Start(msg_def.offset))?; + + let mut buffer = vec![0u8; msg_def.size]; + body_file.read_exact(&mut buffer)?; + + messages.push(buffer); } + } - Ok(messages) - }; + Ok(messages) + } +} - retrieve().map_err(|e| StoreError::RetrieveMessages { - begin, - end, - source: e.into(), - }) +#[async_trait::async_trait] +impl MessageStore for FileStore { + async fn add(&mut self, sequence_number: u64, message: &[u8]) -> Result<()> { + self.write_message(sequence_number, message) + .map_err(|err| StoreError::PersistMessage { + sequence_number, + 
source: err.into(), + }) + } + + async fn get_slice(&self, begin: usize, end: usize) -> Result>> { + self.read_messages(begin, end) + .map_err(|e| StoreError::RetrieveMessages { + begin, + end, + source: e.into(), + }) } fn next_sender_seq_number(&self) -> u64 { @@ -267,65 +326,8 @@ impl MessageStore for FileStore { } async fn reset(&mut self) -> Result<()> { - let do_reset = |this: &mut Self| -> std::io::Result<()> { - this.body_file.flush()?; - this.header_file.flush()?; - - // remove all files - let body_path = this.base_path.with_extension("body"); - let header_path = this.base_path.with_extension("header"); - let seqnums_path = this.base_path.with_extension("seqnums"); - let session_path = this.base_path.with_extension("session"); - - if body_path.exists() { - std::fs::remove_file(&body_path)?; - } - if header_path.exists() { - std::fs::remove_file(&header_path)?; - } - if seqnums_path.exists() { - std::fs::remove_file(&seqnums_path)?; - } - if session_path.exists() { - std::fs::remove_file(&session_path)?; - } - - // reset in-memory state - this.sender_seq_number = 0; - this.target_seq_number = 0; - this.creation_time = Utc::now(); - this.message_index.clear(); - this.current_body_offset = 0; - - // recreate files - let now = Utc::now(); - std::fs::write(&session_path, now.to_rfc3339())?; - - let body_file = OpenOptions::new() - .create(true) - .append(true) - .open(&body_path)?; - this.body_file = BufWriter::new(body_file); - - let header_file = OpenOptions::new() - .create(true) - .append(true) - .open(&header_path)?; - this.header_file = BufWriter::new(header_file); - - this.seqnums_file = OpenOptions::new() - .create(true) - .truncate(true) - .read(true) - .write(true) - .open(&seqnums_path)?; - - this.creation_time = now; - - Ok(()) - }; - - do_reset(self).map_err(|e| StoreError::Reset(e.into())) + self.perform_reset() + .map_err(|e| StoreError::Reset(e.into())) } fn creation_time(&self) -> DateTime { diff --git a/crates/hotfix/src/store/mongodb.rs 
b/crates/hotfix/src/store/mongodb.rs index 099f55f..5a9e43d 100644 --- a/crates/hotfix/src/store/mongodb.rs +++ b/crates/hotfix/src/store/mongodb.rs @@ -85,7 +85,9 @@ impl MongoDbMessageStore { Ok(meta) } - async fn new_sequence(meta_collection: &Collection) -> anyhow::Result { + async fn new_sequence( + meta_collection: &Collection, + ) -> anyhow::Result { let sequence_id = ObjectId::new(); let initial_meta = SequenceMeta { object_id: sequence_id, @@ -133,24 +135,25 @@ impl MessageStore for MongoDbMessageStore { "$lte": end as u32, } }; - let mut cursor = self - .message_collection - .find(filter) - .await - .map_err(|e| StoreError::RetrieveMessages { - begin, - end, - source: e.into(), - })?; - - let mut messages = Vec::new(); - while let Some(message) = cursor.try_next().await.map_err(|e| { + let mut cursor = self.message_collection.find(filter).await.map_err(|e| { StoreError::RetrieveMessages { begin, end, source: e.into(), } - })? { + })?; + + let mut messages = Vec::new(); + while let Some(message) = + cursor + .try_next() + .await + .map_err(|e| StoreError::RetrieveMessages { + begin, + end, + source: e.into(), + })? + { messages.push(message.data.bytes); } From 92d3a6d3e9934e29f58e38981648d352ba767858 Mon Sep 17 00:00:00 2001 From: David Steiner Date: Thu, 22 Jan 2026 13:13:04 +0100 Subject: [PATCH 06/11] Robustness improvements for the MongoDB store --- crates/hotfix/src/store/mongodb.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/crates/hotfix/src/store/mongodb.rs b/crates/hotfix/src/store/mongodb.rs index 5a9e43d..22e4ee6 100644 --- a/crates/hotfix/src/store/mongodb.rs +++ b/crates/hotfix/src/store/mongodb.rs @@ -113,7 +113,7 @@ impl MessageStore for MongoDbMessageStore { bytes: message.to_vec(), }, }; - let filter = doc! { "sequence_id": self.current_sequence.object_id, "msg_seq_number": sequence_number as u32 }; + let filter = doc! 
{ "sequence_id": self.current_sequence.object_id, "msg_seq_number": sequence_number as i64 }; let options = ReplaceOptions::builder().upsert(true).build(); self.message_collection .replace_one(filter, message) @@ -131,8 +131,8 @@ impl MessageStore for MongoDbMessageStore { let filter = doc! { "sequence_id": self.current_sequence.object_id, "msg_seq_number": doc! { - "$gte": begin as u32, - "$lte": end as u32, + "$gte": begin as i64, + "$lte": end as i64, } }; let mut cursor = self.message_collection.find(filter).await.map_err(|e| { @@ -169,7 +169,6 @@ impl MessageStore for MongoDbMessageStore { } async fn increment_sender_seq_number(&mut self) -> Result<()> { - self.current_sequence.sender_seq_number += 1; self.meta_collection .update_one( doc! { "_id": self.current_sequence.object_id }, @@ -177,12 +176,12 @@ impl MessageStore for MongoDbMessageStore { ) .await .map_err(|e| StoreError::UpdateSequenceNumber(e.into()))?; + self.current_sequence.sender_seq_number += 1; Ok(()) } async fn increment_target_seq_number(&mut self) -> Result<()> { - self.current_sequence.target_seq_number += 1; self.meta_collection .update_one( doc! { "_id": self.current_sequence.object_id }, @@ -190,19 +189,20 @@ impl MessageStore for MongoDbMessageStore { ) .await .map_err(|e| StoreError::UpdateSequenceNumber(e.into()))?; + self.current_sequence.target_seq_number += 1; Ok(()) } async fn set_target_seq_number(&mut self, seq_number: u64) -> Result<()> { - self.current_sequence.target_seq_number = seq_number; self.meta_collection .update_one( doc! { "_id": self.current_sequence.object_id }, - doc! { "$set": { "target_seq_number": seq_number as u32 } }, + doc! 
{ "$set": { "target_seq_number": seq_number as i64 } }, ) .await .map_err(|e| StoreError::UpdateSequenceNumber(e.into()))?; + self.current_sequence.target_seq_number = seq_number; Ok(()) } From bab9a54fe182a0a140fd0a9b3b5ccee1567bd24c Mon Sep 17 00:00:00 2001 From: David Steiner Date: Thu, 22 Jan 2026 13:17:35 +0100 Subject: [PATCH 07/11] Add dedicated mongodb tests --- crates/hotfix/tests/mongodb_store_tests.rs | 506 +++++++++++++++++++++ 1 file changed, 506 insertions(+) create mode 100644 crates/hotfix/tests/mongodb_store_tests.rs diff --git a/crates/hotfix/tests/mongodb_store_tests.rs b/crates/hotfix/tests/mongodb_store_tests.rs new file mode 100644 index 0000000..d7f6dcf --- /dev/null +++ b/crates/hotfix/tests/mongodb_store_tests.rs @@ -0,0 +1,506 @@ +#![cfg(feature = "mongodb")] + +use hotfix::store::mongodb::{Client, MongoDbMessageStore}; +use hotfix::store::{MessageStore, StoreError}; +use testcontainers::runners::AsyncRunner; +use testcontainers::{ContainerAsync, GenericImage}; +use tokio::sync::OnceCell; + +const MONGO_PORT: u16 = 27017; + +static MONGO_CONTAINER: OnceCell> = OnceCell::const_new(); + +async fn init_container() -> ContainerAsync { + GenericImage::new("mongo", "8.0").start().await.unwrap() +} + +async fn get_mongo_client() -> Client { + let container = MONGO_CONTAINER.get_or_init(init_container).await; + let host = container.get_host().await.unwrap(); + let port = container.get_host_port_ipv4(MONGO_PORT).await.unwrap(); + Client::with_uri_str(format!("mongodb://{host}:{port}")) + .await + .unwrap() +} + +async fn create_test_store(db_name: &str) -> MongoDbMessageStore { + let client = get_mongo_client().await; + let db = client.database(db_name); + let collection_name = format!("test_{}", uuid::Uuid::new_v4()); + MongoDbMessageStore::new(db, Some(&collection_name)) + .await + .unwrap() +} + +// ============================================================================= +// Large Sequence Number Tests +// 
============================================================================= + +mod large_sequence_number_tests { + use super::*; + + #[tokio::test] + async fn test_sequence_number_at_u32_max() { + let mut store = create_test_store("test_seq_u32_max").await; + + let seq_num: u64 = u32::MAX as u64; + let message = b"message at u32::MAX"; + + store.add(seq_num, message).await.unwrap(); + + let messages = store + .get_slice(seq_num as usize, seq_num as usize) + .await + .unwrap(); + + assert_eq!(messages.len(), 1); + assert_eq!(messages[0], message); + } + + #[tokio::test] + async fn test_sequence_number_above_u32_max() { + let mut store = create_test_store("test_seq_above_u32_max").await; + + let large_seq_num: u64 = u32::MAX as u64 + 100; + let message = b"message above u32::MAX"; + + store.add(large_seq_num, message).await.unwrap(); + + let messages = store + .get_slice(large_seq_num as usize, large_seq_num as usize) + .await + .unwrap(); + + assert_eq!(messages.len(), 1); + assert_eq!(messages[0], message); + } + + #[tokio::test] + async fn test_get_slice_with_large_bounds() { + let mut store = create_test_store("test_large_bounds").await; + + let base_seq: u64 = u32::MAX as u64 + 1000; + let messages_to_add = vec![ + (base_seq, b"first".as_slice()), + (base_seq + 1, b"second".as_slice()), + (base_seq + 2, b"third".as_slice()), + ]; + + for (seq, msg) in &messages_to_add { + store.add(*seq, msg).await.unwrap(); + } + + let retrieved = store + .get_slice(base_seq as usize, (base_seq + 2) as usize) + .await + .unwrap(); + + assert_eq!(retrieved.len(), 3); + } + + #[tokio::test] + async fn test_set_target_seq_number_above_u32_max() { + let mut store = create_test_store("test_target_seq_large").await; + + let large_seq: u64 = u32::MAX as u64 + 500; + + store.set_target_seq_number(large_seq).await.unwrap(); + + assert_eq!(store.next_target_seq_number(), large_seq + 1); + + // Note: We can't easily verify persistence without tracking the collection name, + // so we 
just verify the in-memory state was updated correctly + } + + #[tokio::test] + async fn test_no_collision_between_large_and_truncated_values() { + let mut store = create_test_store("test_no_collision").await; + + // These two sequence numbers would collide if truncated to u32 + let small_seq: u64 = 100; + let large_seq: u64 = u32::MAX as u64 + 100 + 1; // Would truncate to 100 if cast to u32 + + let small_msg = b"small sequence message"; + let large_msg = b"large sequence message"; + + store.add(small_seq, small_msg).await.unwrap(); + store.add(large_seq, large_msg).await.unwrap(); + + // Both messages should be retrievable independently + let small_retrieved = store + .get_slice(small_seq as usize, small_seq as usize) + .await + .unwrap(); + let large_retrieved = store + .get_slice(large_seq as usize, large_seq as usize) + .await + .unwrap(); + + assert_eq!(small_retrieved.len(), 1); + assert_eq!(small_retrieved[0], small_msg); + + assert_eq!(large_retrieved.len(), 1); + assert_eq!(large_retrieved[0], large_msg); + } +} + +// ============================================================================= +// Concurrent Access Tests +// ============================================================================= + +mod concurrent_access_tests { + use super::*; + use std::sync::Arc; + use tokio::sync::Mutex; + + #[tokio::test] + async fn test_concurrent_add_same_sequence() { + let client = get_mongo_client().await; + let db = client.database("test_concurrent_add"); + let collection_name = format!("test_{}", uuid::Uuid::new_v4()); + + let mut store1 = MongoDbMessageStore::new(db.clone(), Some(&collection_name)) + .await + .unwrap(); + let mut store2 = MongoDbMessageStore::new(db.clone(), Some(&collection_name)) + .await + .unwrap(); + + let seq_num = 1u64; + let msg1 = b"message from store 1"; + let msg2 = b"message from store 2"; + + // Both stores add a message with the same sequence number + let (result1, result2) = tokio::join!(store1.add(seq_num, msg1), 
store2.add(seq_num, msg2)); + + // Both should succeed (upsert semantics) + assert!(result1.is_ok()); + assert!(result2.is_ok()); + + // One of the messages should be stored (last write wins) + let messages = store1 + .get_slice(seq_num as usize, seq_num as usize) + .await + .unwrap(); + assert_eq!(messages.len(), 1); + // The message should be one of the two (we can't guarantee which due to race) + assert!(messages[0] == msg1 || messages[0] == msg2); + } + + #[tokio::test] + async fn test_concurrent_increment_sender_seq() { + let client = get_mongo_client().await; + let db = client.database("test_concurrent_inc_sender"); + let collection_name = format!("test_{}", uuid::Uuid::new_v4()); + + let store = Arc::new(Mutex::new( + MongoDbMessageStore::new(db.clone(), Some(&collection_name)) + .await + .unwrap(), + )); + + let num_increments = 50; + let mut handles = Vec::new(); + + for _ in 0..num_increments { + let store_clone = Arc::clone(&store); + handles.push(tokio::spawn(async move { + let mut guard = store_clone.lock().await; + guard.increment_sender_seq_number().await + })); + } + + for handle in handles { + handle.await.unwrap().unwrap(); + } + + let final_store = store.lock().await; + assert_eq!(final_store.next_sender_seq_number(), num_increments + 1); + } + + #[tokio::test] + async fn test_concurrent_increment_target_seq() { + let client = get_mongo_client().await; + let db = client.database("test_concurrent_inc_target"); + let collection_name = format!("test_{}", uuid::Uuid::new_v4()); + + let store = Arc::new(Mutex::new( + MongoDbMessageStore::new(db.clone(), Some(&collection_name)) + .await + .unwrap(), + )); + + let num_increments = 50; + let mut handles = Vec::new(); + + for _ in 0..num_increments { + let store_clone = Arc::clone(&store); + handles.push(tokio::spawn(async move { + let mut guard = store_clone.lock().await; + guard.increment_target_seq_number().await + })); + } + + for handle in handles { + handle.await.unwrap().unwrap(); + } + + let 
final_store = store.lock().await; + assert_eq!(final_store.next_target_seq_number(), num_increments + 1); + } + + #[tokio::test] + async fn test_two_store_instances_same_collection() { + let client = get_mongo_client().await; + let db = client.database("test_two_instances"); + let collection_name = format!("test_{}", uuid::Uuid::new_v4()); + + let mut store1 = MongoDbMessageStore::new(db.clone(), Some(&collection_name)) + .await + .unwrap(); + + // Store1 adds a message + store1.add(1, b"message from store 1").await.unwrap(); + store1.increment_sender_seq_number().await.unwrap(); + + // Store2 connects to the same collection + let store2 = MongoDbMessageStore::new(db.clone(), Some(&collection_name)) + .await + .unwrap(); + + // Store2 should see the updated sequence number + assert_eq!(store2.next_sender_seq_number(), 2); + + // Store2 should be able to retrieve the message + let messages = store2.get_slice(1, 1).await.unwrap(); + assert_eq!(messages.len(), 1); + assert_eq!(messages[0], b"message from store 1"); + } +} + +// ============================================================================= +// Connection Failure Tests +// ============================================================================= + +mod connection_failure_tests { + use super::*; + + async fn create_dedicated_container_and_store() + -> (ContainerAsync<GenericImage>, MongoDbMessageStore) { + let container = GenericImage::new("mongo", "8.0").start().await.unwrap(); + let host = container.get_host().await.unwrap(); + let port = container.get_host_port_ipv4(MONGO_PORT).await.unwrap(); + + let client = Client::with_uri_str(format!("mongodb://{host}:{port}")) + .await + .unwrap(); + let db = client.database("test_conn_failure"); + let store = MongoDbMessageStore::new(db, Some("test")).await.unwrap(); + + (container, store) + } + + #[tokio::test] + async fn test_add_after_connection_drop() { + let (container, mut store) = create_dedicated_container_and_store().await; + + // Verify store works initially + 
store.add(1, b"initial message").await.unwrap(); + + // Stop the container + container.stop().await.unwrap(); + + // Attempt operation - should fail with appropriate error + let result = store.add(2, b"should fail").await; + + assert!(matches!(result, Err(StoreError::PersistMessage { .. }))); + } + + #[tokio::test] + async fn test_get_slice_after_connection_drop() { + let (container, mut store) = create_dedicated_container_and_store().await; + + // Add a message while connected + store.add(1, b"test message").await.unwrap(); + + // Stop the container + container.stop().await.unwrap(); + + // Attempt retrieval - should fail + let result = store.get_slice(1, 1).await; + + assert!(matches!(result, Err(StoreError::RetrieveMessages { .. }))); + } + + #[tokio::test] + async fn test_increment_after_connection_drop() { + let (container, mut store) = create_dedicated_container_and_store().await; + + // Stop the container + container.stop().await.unwrap(); + + // Attempt increment - should fail + let result = store.increment_sender_seq_number().await; + + assert!(matches!(result, Err(StoreError::UpdateSequenceNumber(_)))); + } + + #[tokio::test] + async fn test_reset_after_connection_drop() { + let (container, mut store) = create_dedicated_container_and_store().await; + + // Stop the container + container.stop().await.unwrap(); + + // Attempt reset - should fail + let result = store.reset().await; + + assert!(matches!(result, Err(StoreError::Reset(_)))); + } + + #[tokio::test] + async fn test_state_preserved_after_failed_increment() { + let (container, mut store) = create_dedicated_container_and_store().await; + + let initial_sender_seq = store.next_sender_seq_number(); + let initial_target_seq = store.next_target_seq_number(); + + // Stop the container + container.stop().await.unwrap(); + + // Attempt increments - should fail + let _ = store.increment_sender_seq_number().await; + let _ = store.increment_target_seq_number().await; + + // State should be unchanged since DB 
write failed first + assert_eq!(store.next_sender_seq_number(), initial_sender_seq); + assert_eq!(store.next_target_seq_number(), initial_target_seq); + } + + #[tokio::test] + async fn test_state_preserved_after_failed_set_target() { + let (container, mut store) = create_dedicated_container_and_store().await; + + let initial_target_seq = store.next_target_seq_number(); + + // Stop the container + container.stop().await.unwrap(); + + // Attempt set - should fail + let _ = store.set_target_seq_number(100).await; + + // State should be unchanged + assert_eq!(store.next_target_seq_number(), initial_target_seq); + } +} + +// ============================================================================= +// Data Integrity Tests +// ============================================================================= + +mod data_integrity_tests { + use super::*; + + #[tokio::test] + async fn test_store_empty_message() { + let mut store = create_test_store("test_empty_msg").await; + + let empty_message: &[u8] = b""; + + store.add(1, empty_message).await.unwrap(); + + let messages = store.get_slice(1, 1).await.unwrap(); + assert_eq!(messages.len(), 1); + assert_eq!(messages[0], empty_message); + } + + #[tokio::test] + async fn test_store_binary_message() { + let mut store = create_test_store("test_binary_msg").await; + + // Message with null bytes and other binary data + let binary_message: Vec<u8> = (0u8..=255).collect(); + + store.add(1, &binary_message).await.unwrap(); + + let messages = store.get_slice(1, 1).await.unwrap(); + assert_eq!(messages.len(), 1); + assert_eq!(messages[0], binary_message); + } + + #[tokio::test] + async fn test_store_large_message() { + let mut store = create_test_store("test_large_msg").await; + + // 1MB message (well under 16MB BSON limit) + let large_message: Vec<u8> = vec![0xAB; 1024 * 1024]; + + store.add(1, &large_message).await.unwrap(); + + let messages = store.get_slice(1, 1).await.unwrap(); + assert_eq!(messages.len(), 1); + assert_eq!(messages[0], 
large_message); + } + + #[tokio::test] + async fn test_sequence_number_zero() { + let mut store = create_test_store("test_seq_zero").await; + + store.add(0, b"message at seq 0").await.unwrap(); + + let messages = store.get_slice(0, 0).await.unwrap(); + assert_eq!(messages.len(), 1); + assert_eq!(messages[0], b"message at seq 0"); + } + + #[tokio::test] + async fn test_overwrite_message() { + let mut store = create_test_store("test_overwrite").await; + + store.add(1, b"original message").await.unwrap(); + store.add(1, b"updated message").await.unwrap(); + + let messages = store.get_slice(1, 1).await.unwrap(); + assert_eq!(messages.len(), 1); + assert_eq!(messages[0], b"updated message"); + } + + #[tokio::test] + async fn test_sparse_sequence_numbers() { + let mut store = create_test_store("test_sparse").await; + + // Add messages with gaps in sequence numbers + store.add(1, b"first").await.unwrap(); + store.add(100, b"hundredth").await.unwrap(); + store.add(1000, b"thousandth").await.unwrap(); + + // Query the full range - should only return existing messages + let messages = store.get_slice(1, 1000).await.unwrap(); + assert_eq!(messages.len(), 3); + } + + #[tokio::test] + async fn test_reset_creates_new_sequence() { + let mut store = create_test_store("test_reset_new_seq").await; + + // Add data and increment sequence numbers + store.add(1, b"pre-reset message").await.unwrap(); + store.increment_sender_seq_number().await.unwrap(); + store.increment_target_seq_number().await.unwrap(); + + let creation_time_before = store.creation_time(); + + // Small delay to ensure different creation time + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + + // Reset + store.reset().await.unwrap(); + + // Sequence numbers should be reset + assert_eq!(store.next_sender_seq_number(), 1); + assert_eq!(store.next_target_seq_number(), 1); + + // Creation time should be updated + assert!(store.creation_time() > creation_time_before); + } +} From 
bb7ebc5f0204ff42ef0b17ea60ce372165ed08be Mon Sep 17 00:00:00 2001 From: David Steiner Date: Thu, 22 Jan 2026 14:16:02 +0100 Subject: [PATCH 08/11] Add test suite dedicated to file-based storage --- crates/hotfix/src/store/file.rs | 29 +- crates/hotfix/tests/file_store_tests.rs | 602 ++++++++++++++++++++++++ 2 files changed, 617 insertions(+), 14 deletions(-) create mode 100644 crates/hotfix/tests/file_store_tests.rs diff --git a/crates/hotfix/src/store/file.rs b/crates/hotfix/src/store/file.rs index 914e0b9..60bd1f5 100644 --- a/crates/hotfix/src/store/file.rs +++ b/crates/hotfix/src/store/file.rs @@ -161,14 +161,10 @@ impl FileStore { Ok(index) } - fn write_seqnums(&mut self) -> std::io::Result<()> { + fn write_seqnums_with(&mut self, sender: u64, target: u64) -> std::io::Result<()> { self.seqnums_file.seek(SeekFrom::Start(0))?; self.seqnums_file.set_len(0)?; - write!( - self.seqnums_file, - "{:020} : {:020}", - self.sender_seq_number, self.target_seq_number - )?; + write!(self.seqnums_file, "{:020} : {:020}", sender, target)?; self.seqnums_file.flush()?; Ok(()) } @@ -308,21 +304,26 @@ impl MessageStore for FileStore { } async fn increment_sender_seq_number(&mut self) -> Result<()> { - self.sender_seq_number += 1; - self.write_seqnums() - .map_err(|e| StoreError::UpdateSequenceNumber(e.into())) + let new_value = self.sender_seq_number + 1; + self.write_seqnums_with(new_value, self.target_seq_number) + .map_err(|e| StoreError::UpdateSequenceNumber(e.into()))?; + self.sender_seq_number = new_value; + Ok(()) } async fn increment_target_seq_number(&mut self) -> Result<()> { - self.target_seq_number += 1; - self.write_seqnums() - .map_err(|e| StoreError::UpdateSequenceNumber(e.into())) + let new_value = self.target_seq_number + 1; + self.write_seqnums_with(self.sender_seq_number, new_value) + .map_err(|e| StoreError::UpdateSequenceNumber(e.into()))?; + self.target_seq_number = new_value; + Ok(()) } async fn set_target_seq_number(&mut self, seq_number: u64) -> 
Result<()> { + self.write_seqnums_with(self.sender_seq_number, seq_number) + .map_err(|e| StoreError::UpdateSequenceNumber(e.into()))?; self.target_seq_number = seq_number; - self.write_seqnums() - .map_err(|e| StoreError::UpdateSequenceNumber(e.into())) + Ok(()) } async fn reset(&mut self) -> Result<()> { diff --git a/crates/hotfix/tests/file_store_tests.rs b/crates/hotfix/tests/file_store_tests.rs new file mode 100644 index 0000000..7459bf1 --- /dev/null +++ b/crates/hotfix/tests/file_store_tests.rs @@ -0,0 +1,602 @@ +use hotfix::store::file::FileStore; +use hotfix::store::{MessageStore, StoreError}; +use std::fs; +use tempfile::TempDir; + +fn create_test_store() -> (TempDir, FileStore) { + let dir = TempDir::new().unwrap(); + let store = FileStore::new(dir.path(), "test").unwrap(); + (dir, store) +} + +mod corrupted_file_tests { + use super::*; + + #[tokio::test] + async fn test_corrupted_seqnums_file() { + let dir = TempDir::new().unwrap(); + let base_path = dir.path().join("test"); + + // Create required session file + fs::write( + base_path.with_extension("session"), + chrono::Utc::now().to_rfc3339(), + ) + .unwrap(); + + // Create a corrupted seqnums file (invalid format) + fs::write(base_path.with_extension("seqnums"), "not:valid:format").unwrap(); + + let result = FileStore::new(dir.path(), "test"); + + assert!(result.is_err()); + let err = result.err().unwrap(); + let err_msg = err.to_string(); + assert!( + err_msg.contains("seqnums") || err_msg.contains("parse"), + "Error should mention seqnums parsing: {}", + err_msg + ); + } + + #[tokio::test] + async fn test_corrupted_session_file() { + let dir = TempDir::new().unwrap(); + let base_path = dir.path().join("test"); + + // Create a corrupted session file (invalid datetime) + fs::write(base_path.with_extension("session"), "not-a-valid-datetime").unwrap(); + + let result = FileStore::new(dir.path(), "test"); + + assert!(result.is_err()); + } + + #[tokio::test] + async fn test_corrupted_header_file() { + 
let dir = TempDir::new().unwrap(); + let base_path = dir.path().join("test"); + + // Create valid session file + fs::write( + base_path.with_extension("session"), + chrono::Utc::now().to_rfc3339(), + ) + .unwrap(); + + // Create body file with some content + fs::write(base_path.with_extension("body"), b"test message content").unwrap(); + + // Create corrupted header file (malformed lines) + // FileStore silently skips malformed lines, so this should succeed + // but the message won't be found + fs::write( + base_path.with_extension("header"), + "invalid_line_without_commas\n1,not_a_number,10\n", + ) + .unwrap(); + + let store = FileStore::new(dir.path(), "test"); + + // Store creation should succeed (malformed lines are skipped) + assert!(store.is_ok()); + + let store = store.unwrap(); + // No messages should be loaded due to malformed headers + let messages = store.get_slice(1, 10).await.unwrap(); + assert_eq!(messages.len(), 0); + } + + #[tokio::test] + async fn test_partial_seqnums_content() { + let dir = TempDir::new().unwrap(); + let base_path = dir.path().join("test"); + + // Create required session file + fs::write( + base_path.with_extension("session"), + chrono::Utc::now().to_rfc3339(), + ) + .unwrap(); + + // Create truncated seqnums file (only one number) + fs::write(base_path.with_extension("seqnums"), "00000000000000000001").unwrap(); + + let result = FileStore::new(dir.path(), "test"); + + assert!(result.is_err()); + let err = result.err().unwrap(); + let err_msg = err.to_string(); + assert!( + err_msg.contains("seqnums") || err_msg.contains("format"), + "Error should mention format issue: {}", + err_msg + ); + } + + #[tokio::test] + async fn test_seqnums_with_negative_like_value() { + let dir = TempDir::new().unwrap(); + let base_path = dir.path().join("test"); + + // Create required session file + fs::write( + base_path.with_extension("session"), + chrono::Utc::now().to_rfc3339(), + ) + .unwrap(); + + // Create seqnums with what looks like a 
negative number + fs::write(base_path.with_extension("seqnums"), "-1 : 0").unwrap(); + + let result = FileStore::new(dir.path(), "test"); + + // u64 cannot be negative, so this should fail + assert!(result.is_err()); + } +} + +mod file_system_error_tests { + use super::*; + + #[tokio::test] + #[cfg(unix)] + async fn test_readonly_directory() { + use std::os::unix::fs::PermissionsExt; + + let dir = TempDir::new().unwrap(); + let readonly_dir = dir.path().join("readonly"); + fs::create_dir(&readonly_dir).unwrap(); + + // Make directory read-only + let mut perms = fs::metadata(&readonly_dir).unwrap().permissions(); + perms.set_mode(0o444); + fs::set_permissions(&readonly_dir, perms).unwrap(); + + let result = FileStore::new(&readonly_dir, "test"); + + // Should fail because we can't create files + assert!(result.is_err()); + + // Restore permissions for cleanup + let mut perms = fs::metadata(&readonly_dir).unwrap().permissions(); + perms.set_mode(0o755); + fs::set_permissions(&readonly_dir, perms).unwrap(); + } + + #[tokio::test] + async fn test_missing_body_file_on_read() { + let (dir, mut store) = create_test_store(); + + // Add a message + store.add(1, b"test message").await.unwrap(); + + // Delete the body file + let body_path = dir.path().join("test.body"); + fs::remove_file(&body_path).unwrap(); + + // Attempt to read - should fail + let result = store.get_slice(1, 1).await; + + assert!(matches!(result, Err(StoreError::RetrieveMessages { .. 
}))); + } + + #[tokio::test] + async fn test_directory_not_exists_creates_it() { + let dir = TempDir::new().unwrap(); + let nested_path = dir + .path() + .join("nested") + .join("path") + .join("to") + .join("store"); + + // Directory doesn't exist yet + assert!(!nested_path.exists()); + + // FileStore should create the directory + let result = FileStore::new(&nested_path, "test"); + assert!(result.is_ok()); + + // Directory should now exist + assert!(nested_path.exists()); + } + + #[tokio::test] + async fn test_body_file_truncated() { + let (dir, mut store) = create_test_store(); + + // Add a message + store.add(1, b"test message content").await.unwrap(); + + // Truncate the body file + let body_path = dir.path().join("test.body"); + fs::write(&body_path, b"short").unwrap(); + + // Attempt to read - should fail because we can't read expected bytes + let result = store.get_slice(1, 1).await; + + assert!(matches!(result, Err(StoreError::RetrieveMessages { .. }))); + } +} + +mod data_integrity_tests { + use super::*; + + #[tokio::test] + async fn test_store_empty_message() { + let (_dir, mut store) = create_test_store(); + + let empty_message: &[u8] = b""; + + store.add(1, empty_message).await.unwrap(); + + let messages = store.get_slice(1, 1).await.unwrap(); + assert_eq!(messages.len(), 1); + assert_eq!(messages[0], empty_message); + } + + #[tokio::test] + async fn test_store_binary_message() { + let (_dir, mut store) = create_test_store(); + + // Message with null bytes and all possible byte values + let binary_message: Vec<u8> = (0u8..=255).collect(); + + store.add(1, &binary_message).await.unwrap(); + + let messages = store.get_slice(1, 1).await.unwrap(); + assert_eq!(messages.len(), 1); + assert_eq!(messages[0], binary_message); + } + + #[tokio::test] + async fn test_store_large_message() { + let (_dir, mut store) = create_test_store(); + + // 1MB message + let large_message: Vec<u8> = vec![0xAB; 1024 * 1024]; + + store.add(1, &large_message).await.unwrap(); + + let 
messages = store.get_slice(1, 1).await.unwrap(); + assert_eq!(messages.len(), 1); + assert_eq!(messages[0], large_message); + } + + #[tokio::test] + async fn test_overwrite_preserves_latest() { + let (_dir, mut store) = create_test_store(); + + store.add(1, b"original message").await.unwrap(); + store.add(1, b"updated message").await.unwrap(); + + let messages = store.get_slice(1, 1).await.unwrap(); + assert_eq!(messages.len(), 1); + assert_eq!(messages[0], b"updated message"); + } + + #[tokio::test] + async fn test_sparse_sequence_numbers() { + let (_dir, mut store) = create_test_store(); + + // Add messages with gaps in sequence numbers + store.add(1, b"first").await.unwrap(); + store.add(100, b"hundredth").await.unwrap(); + store.add(1000, b"thousandth").await.unwrap(); + + // Query the full range - should only return existing messages + let messages = store.get_slice(1, 1000).await.unwrap(); + assert_eq!(messages.len(), 3); + assert_eq!(messages[0], b"first"); + assert_eq!(messages[1], b"hundredth"); + assert_eq!(messages[2], b"thousandth"); + } + + #[tokio::test] + async fn test_sequence_number_zero() { + let (_dir, mut store) = create_test_store(); + + store.add(0, b"message at seq 0").await.unwrap(); + + let messages = store.get_slice(0, 0).await.unwrap(); + assert_eq!(messages.len(), 1); + assert_eq!(messages[0], b"message at seq 0"); + } + + #[tokio::test] + async fn test_message_with_newlines() { + let (_dir, mut store) = create_test_store(); + + let message_with_newlines = b"line1\nline2\r\nline3\n"; + + store.add(1, message_with_newlines).await.unwrap(); + + let messages = store.get_slice(1, 1).await.unwrap(); + assert_eq!(messages.len(), 1); + assert_eq!(messages[0], message_with_newlines); + } + + #[tokio::test] + async fn test_multiple_messages_in_sequence() { + let (_dir, mut store) = create_test_store(); + + // Add many messages + for i in 1..=100 { + let msg = format!("message {}", i); + store.add(i, msg.as_bytes()).await.unwrap(); + } + + // 
Retrieve all messages + let messages = store.get_slice(1, 100).await.unwrap(); + assert_eq!(messages.len(), 100); + + // Verify each message + for (i, msg) in messages.iter().enumerate() { + let expected = format!("message {}", i + 1); + assert_eq!(msg, expected.as_bytes()); + } + } +} + +mod state_consistency_tests { + use super::*; + + // Note: Testing I/O failure is challenging without mocking. + // We can test the state is correct after normal operations + // and verify the fix by examining the code structure. + + #[tokio::test] + async fn test_state_matches_after_increment() { + let (dir, mut store) = create_test_store(); + + store.increment_sender_seq_number().await.unwrap(); + store.increment_target_seq_number().await.unwrap(); + + assert_eq!(store.next_sender_seq_number(), 2); + assert_eq!(store.next_target_seq_number(), 2); + + // Verify persistence by creating new store + drop(store); + let store2 = FileStore::new(dir.path(), "test").unwrap(); + + assert_eq!(store2.next_sender_seq_number(), 2); + assert_eq!(store2.next_target_seq_number(), 2); + } + + #[tokio::test] + async fn test_state_matches_after_set_target() { + let (dir, mut store) = create_test_store(); + + store.set_target_seq_number(100).await.unwrap(); + + assert_eq!(store.next_target_seq_number(), 101); + + // Verify persistence + drop(store); + let store2 = FileStore::new(dir.path(), "test").unwrap(); + + assert_eq!(store2.next_target_seq_number(), 101); + } + + #[tokio::test] + async fn test_seqnums_format_preserved() { + let (dir, mut store) = create_test_store(); + + store.increment_sender_seq_number().await.unwrap(); + store.set_target_seq_number(42).await.unwrap(); + drop(store); + + // Read the raw seqnums file and verify format + let seqnums_content = fs::read_to_string(dir.path().join("test.seqnums")).unwrap(); + + // Should be in format: "00000000000000000001 : 00000000000000000042" + assert!(seqnums_content.contains(':')); + let parts: Vec<&str> = 
seqnums_content.split(':').collect(); + assert_eq!(parts.len(), 2); + assert_eq!(parts[0].trim().parse::<u64>().unwrap(), 1); + assert_eq!(parts[1].trim().parse::<u64>().unwrap(), 42); + } + + #[tokio::test] + async fn test_header_index_rebuilt_on_reload() { + let (dir, mut store) = create_test_store(); + + // Add messages + store.add(1, b"message 1").await.unwrap(); + store.add(5, b"message 5").await.unwrap(); + store.add(10, b"message 10").await.unwrap(); + + drop(store); + + // Reload store - index should be rebuilt from header file + let store2 = FileStore::new(dir.path(), "test").unwrap(); + + let messages = store2.get_slice(1, 10).await.unwrap(); + assert_eq!(messages.len(), 3); + assert_eq!(messages[0], b"message 1"); + assert_eq!(messages[1], b"message 5"); + assert_eq!(messages[2], b"message 10"); + } +} + +mod concurrent_access_tests { + use super::*; + + #[tokio::test] + async fn test_two_store_instances_same_files() { + let dir = TempDir::new().unwrap(); + + // Create first store and add data + let mut store1 = FileStore::new(dir.path(), "test").unwrap(); + store1.add(1, b"message from store 1").await.unwrap(); + store1.increment_sender_seq_number().await.unwrap(); + + // Create second store pointing to same files + // Note: FileStore reads state at construction time, so it should see + // the current persisted state + let store2 = FileStore::new(dir.path(), "test").unwrap(); + + // Store2 should see the persisted sequence number + assert_eq!(store2.next_sender_seq_number(), 2); + + // Store2 should be able to read the message + let messages = store2.get_slice(1, 1).await.unwrap(); + assert_eq!(messages.len(), 1); + assert_eq!(messages[0], b"message from store 1"); + } + + #[tokio::test] + async fn test_store_instance_isolation() { + let dir = TempDir::new().unwrap(); + + // Create two store instances + let mut store1 = FileStore::new(dir.path(), "test").unwrap(); + let mut store2 = FileStore::new(dir.path(), "test").unwrap(); + + // Both start at the same 
sequence number + assert_eq!(store1.next_sender_seq_number(), 1); + assert_eq!(store2.next_sender_seq_number(), 1); + + // Store1 increments + store1.increment_sender_seq_number().await.unwrap(); + + // Store1 sees the change + assert_eq!(store1.next_sender_seq_number(), 2); + + // Store2 still has its cached state (no automatic refresh) + assert_eq!(store2.next_sender_seq_number(), 1); + + // However, if store2 also writes, it will overwrite with its state + store2.increment_sender_seq_number().await.unwrap(); + assert_eq!(store2.next_sender_seq_number(), 2); + + // Now if we create a new store, it reads what was last written (store2's state) + let store3 = FileStore::new(dir.path(), "test").unwrap(); + assert_eq!(store3.next_sender_seq_number(), 2); + } + + #[tokio::test] + async fn test_reset_affects_all_future_instances() { + let dir = TempDir::new().unwrap(); + + // Create store and add data + let mut store1 = FileStore::new(dir.path(), "test").unwrap(); + store1.add(1, b"will be deleted").await.unwrap(); + store1.increment_sender_seq_number().await.unwrap(); + store1.increment_sender_seq_number().await.unwrap(); + + // Reset + store1.reset().await.unwrap(); + + // New instance should see reset state + let store2 = FileStore::new(dir.path(), "test").unwrap(); + assert_eq!(store2.next_sender_seq_number(), 1); + assert_eq!(store2.next_target_seq_number(), 1); + + let messages = store2.get_slice(1, 100).await.unwrap(); + assert_eq!(messages.len(), 0); + } +} + +mod large_sequence_number_tests { + use super::*; + + #[tokio::test] + async fn test_sequence_number_at_u32_max() { + let (_dir, mut store) = create_test_store(); + + let seq_num: u64 = u32::MAX as u64; + let message = b"message at u32::MAX"; + + store.add(seq_num, message).await.unwrap(); + + let messages = store + .get_slice(seq_num as usize, seq_num as usize) + .await + .unwrap(); + + assert_eq!(messages.len(), 1); + assert_eq!(messages[0], message); + } + + #[tokio::test] + async fn 
test_set_target_seq_number_large() { + let (dir, mut store) = create_test_store(); + + let large_seq: u64 = u32::MAX as u64 + 500; + + store.set_target_seq_number(large_seq).await.unwrap(); + + assert_eq!(store.next_target_seq_number(), large_seq + 1); + + // Verify persistence + drop(store); + let store2 = FileStore::new(dir.path(), "test").unwrap(); + assert_eq!(store2.next_target_seq_number(), large_seq + 1); + } +} + +mod reset_tests { + use super::*; + + #[tokio::test] + async fn test_reset_clears_all_messages() { + let (_dir, mut store) = create_test_store(); + + // Add multiple messages + for i in 1..=10 { + store + .add(i, format!("message {}", i).as_bytes()) + .await + .unwrap(); + } + + let messages = store.get_slice(1, 10).await.unwrap(); + assert_eq!(messages.len(), 10); + + // Reset + store.reset().await.unwrap(); + + // All messages should be gone + let messages = store.get_slice(1, 10).await.unwrap(); + assert_eq!(messages.len(), 0); + } + + #[tokio::test] + async fn test_reset_updates_creation_time() { + let (_dir, mut store) = create_test_store(); + + let original_time = store.creation_time(); + + // Small delay to ensure different time + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + + store.reset().await.unwrap(); + + // Creation time should be updated + assert!(store.creation_time() > original_time); + } + + #[tokio::test] + async fn test_store_works_after_reset() { + let (_dir, mut store) = create_test_store(); + + // Add data + store.add(1, b"before reset").await.unwrap(); + store.increment_sender_seq_number().await.unwrap(); + + // Reset + store.reset().await.unwrap(); + + // Store should work normally after reset + assert_eq!(store.next_sender_seq_number(), 1); + + store.add(1, b"after reset").await.unwrap(); + let messages = store.get_slice(1, 1).await.unwrap(); + assert_eq!(messages[0], b"after reset"); + + store.increment_sender_seq_number().await.unwrap(); + assert_eq!(store.next_sender_seq_number(), 2); + } +} From 
ed0eb2ddd99b2a063e96e54802f09ca545f3096f Mon Sep 17 00:00:00 2001 From: David Steiner Date: Thu, 22 Jan 2026 14:32:17 +0100 Subject: [PATCH 09/11] Remove duplication in store test suites --- .../{store_tests.rs => common_store_tests.rs} | 0 crates/hotfix/tests/file_store_tests.rs | 261 ------------------ crates/hotfix/tests/mongodb_store_tests.rs | 231 ---------------- 3 files changed, 492 deletions(-) rename crates/hotfix/tests/{store_tests.rs => common_store_tests.rs} (100%) diff --git a/crates/hotfix/tests/store_tests.rs b/crates/hotfix/tests/common_store_tests.rs similarity index 100% rename from crates/hotfix/tests/store_tests.rs rename to crates/hotfix/tests/common_store_tests.rs diff --git a/crates/hotfix/tests/file_store_tests.rs b/crates/hotfix/tests/file_store_tests.rs index 7459bf1..cb12008 100644 --- a/crates/hotfix/tests/file_store_tests.rs +++ b/crates/hotfix/tests/file_store_tests.rs @@ -113,27 +113,6 @@ mod corrupted_file_tests { err_msg ); } - - #[tokio::test] - async fn test_seqnums_with_negative_like_value() { - let dir = TempDir::new().unwrap(); - let base_path = dir.path().join("test"); - - // Create required session file - fs::write( - base_path.with_extension("session"), - chrono::Utc::now().to_rfc3339(), - ) - .unwrap(); - - // Create seqnums with what looks like a negative number - fs::write(base_path.with_extension("seqnums"), "-1 : 0").unwrap(); - - let result = FileStore::new(dir.path(), "test"); - - // u64 cannot be negative, so this should fail - assert!(result.is_err()); - } } mod file_system_error_tests { @@ -264,18 +243,6 @@ mod data_integrity_tests { assert_eq!(messages[0], large_message); } - #[tokio::test] - async fn test_overwrite_preserves_latest() { - let (_dir, mut store) = create_test_store(); - - store.add(1, b"original message").await.unwrap(); - store.add(1, b"updated message").await.unwrap(); - - let messages = store.get_slice(1, 1).await.unwrap(); - assert_eq!(messages.len(), 1); - assert_eq!(messages[0], b"updated 
message"); - } - #[tokio::test] async fn test_sparse_sequence_numbers() { let (_dir, mut store) = create_test_store(); @@ -292,132 +259,6 @@ mod data_integrity_tests { assert_eq!(messages[1], b"hundredth"); assert_eq!(messages[2], b"thousandth"); } - - #[tokio::test] - async fn test_sequence_number_zero() { - let (_dir, mut store) = create_test_store(); - - store.add(0, b"message at seq 0").await.unwrap(); - - let messages = store.get_slice(0, 0).await.unwrap(); - assert_eq!(messages.len(), 1); - assert_eq!(messages[0], b"message at seq 0"); - } - - #[tokio::test] - async fn test_message_with_newlines() { - let (_dir, mut store) = create_test_store(); - - let message_with_newlines = b"line1\nline2\r\nline3\n"; - - store.add(1, message_with_newlines).await.unwrap(); - - let messages = store.get_slice(1, 1).await.unwrap(); - assert_eq!(messages.len(), 1); - assert_eq!(messages[0], message_with_newlines); - } - - #[tokio::test] - async fn test_multiple_messages_in_sequence() { - let (_dir, mut store) = create_test_store(); - - // Add many messages - for i in 1..=100 { - let msg = format!("message {}", i); - store.add(i, msg.as_bytes()).await.unwrap(); - } - - // Retrieve all messages - let messages = store.get_slice(1, 100).await.unwrap(); - assert_eq!(messages.len(), 100); - - // Verify each message - for (i, msg) in messages.iter().enumerate() { - let expected = format!("message {}", i + 1); - assert_eq!(msg, expected.as_bytes()); - } - } -} - -mod state_consistency_tests { - use super::*; - - // Note: Testing I/O failure is challenging without mocking. - // We can test the state is correct after normal operations - // and verify the fix by examining the code structure. 
- - #[tokio::test] - async fn test_state_matches_after_increment() { - let (dir, mut store) = create_test_store(); - - store.increment_sender_seq_number().await.unwrap(); - store.increment_target_seq_number().await.unwrap(); - - assert_eq!(store.next_sender_seq_number(), 2); - assert_eq!(store.next_target_seq_number(), 2); - - // Verify persistence by creating new store - drop(store); - let store2 = FileStore::new(dir.path(), "test").unwrap(); - - assert_eq!(store2.next_sender_seq_number(), 2); - assert_eq!(store2.next_target_seq_number(), 2); - } - - #[tokio::test] - async fn test_state_matches_after_set_target() { - let (dir, mut store) = create_test_store(); - - store.set_target_seq_number(100).await.unwrap(); - - assert_eq!(store.next_target_seq_number(), 101); - - // Verify persistence - drop(store); - let store2 = FileStore::new(dir.path(), "test").unwrap(); - - assert_eq!(store2.next_target_seq_number(), 101); - } - - #[tokio::test] - async fn test_seqnums_format_preserved() { - let (dir, mut store) = create_test_store(); - - store.increment_sender_seq_number().await.unwrap(); - store.set_target_seq_number(42).await.unwrap(); - drop(store); - - // Read the raw seqnums file and verify format - let seqnums_content = fs::read_to_string(dir.path().join("test.seqnums")).unwrap(); - - // Should be in format: "00000000000000000001 : 00000000000000000042" - assert!(seqnums_content.contains(':')); - let parts: Vec<&str> = seqnums_content.split(':').collect(); - assert_eq!(parts.len(), 2); - assert_eq!(parts[0].trim().parse::<u64>().unwrap(), 1); - assert_eq!(parts[1].trim().parse::<u64>().unwrap(), 42); - } - - #[tokio::test] - async fn test_header_index_rebuilt_on_reload() { - let (dir, mut store) = create_test_store(); - - // Add messages - store.add(1, b"message 1").await.unwrap(); - store.add(5, b"message 5").await.unwrap(); - store.add(10, b"message 10").await.unwrap(); - - drop(store); - - // Reload store - index should be rebuilt from header file - let store2 = 
FileStore::new(dir.path(), "test").unwrap(); - - let messages = store2.get_slice(1, 10).await.unwrap(); - assert_eq!(messages.len(), 3); - assert_eq!(messages[0], b"message 1"); - assert_eq!(messages[1], b"message 5"); - assert_eq!(messages[2], b"message 10"); - } } mod concurrent_access_tests { @@ -498,105 +339,3 @@ mod concurrent_access_tests { assert_eq!(messages.len(), 0); } } - -mod large_sequence_number_tests { - use super::*; - - #[tokio::test] - async fn test_sequence_number_at_u32_max() { - let (_dir, mut store) = create_test_store(); - - let seq_num: u64 = u32::MAX as u64; - let message = b"message at u32::MAX"; - - store.add(seq_num, message).await.unwrap(); - - let messages = store - .get_slice(seq_num as usize, seq_num as usize) - .await - .unwrap(); - - assert_eq!(messages.len(), 1); - assert_eq!(messages[0], message); - } - - #[tokio::test] - async fn test_set_target_seq_number_large() { - let (dir, mut store) = create_test_store(); - - let large_seq: u64 = u32::MAX as u64 + 500; - - store.set_target_seq_number(large_seq).await.unwrap(); - - assert_eq!(store.next_target_seq_number(), large_seq + 1); - - // Verify persistence - drop(store); - let store2 = FileStore::new(dir.path(), "test").unwrap(); - assert_eq!(store2.next_target_seq_number(), large_seq + 1); - } -} - -mod reset_tests { - use super::*; - - #[tokio::test] - async fn test_reset_clears_all_messages() { - let (_dir, mut store) = create_test_store(); - - // Add multiple messages - for i in 1..=10 { - store - .add(i, format!("message {}", i).as_bytes()) - .await - .unwrap(); - } - - let messages = store.get_slice(1, 10).await.unwrap(); - assert_eq!(messages.len(), 10); - - // Reset - store.reset().await.unwrap(); - - // All messages should be gone - let messages = store.get_slice(1, 10).await.unwrap(); - assert_eq!(messages.len(), 0); - } - - #[tokio::test] - async fn test_reset_updates_creation_time() { - let (_dir, mut store) = create_test_store(); - - let original_time = 
store.creation_time(); - - // Small delay to ensure different time - tokio::time::sleep(std::time::Duration::from_millis(10)).await; - - store.reset().await.unwrap(); - - // Creation time should be updated - assert!(store.creation_time() > original_time); - } - - #[tokio::test] - async fn test_store_works_after_reset() { - let (_dir, mut store) = create_test_store(); - - // Add data - store.add(1, b"before reset").await.unwrap(); - store.increment_sender_seq_number().await.unwrap(); - - // Reset - store.reset().await.unwrap(); - - // Store should work normally after reset - assert_eq!(store.next_sender_seq_number(), 1); - - store.add(1, b"after reset").await.unwrap(); - let messages = store.get_slice(1, 1).await.unwrap(); - assert_eq!(messages[0], b"after reset"); - - store.increment_sender_seq_number().await.unwrap(); - assert_eq!(store.next_sender_seq_number(), 2); - } -} diff --git a/crates/hotfix/tests/mongodb_store_tests.rs b/crates/hotfix/tests/mongodb_store_tests.rs index d7f6dcf..15f90cb 100644 --- a/crates/hotfix/tests/mongodb_store_tests.rs +++ b/crates/hotfix/tests/mongodb_store_tests.rs @@ -32,122 +32,6 @@ async fn create_test_store(db_name: &str) -> MongoDbMessageStore { .unwrap() } -// ============================================================================= -// Large Sequence Number Tests -// ============================================================================= - -mod large_sequence_number_tests { - use super::*; - - #[tokio::test] - async fn test_sequence_number_at_u32_max() { - let mut store = create_test_store("test_seq_u32_max").await; - - let seq_num: u64 = u32::MAX as u64; - let message = b"message at u32::MAX"; - - store.add(seq_num, message).await.unwrap(); - - let messages = store - .get_slice(seq_num as usize, seq_num as usize) - .await - .unwrap(); - - assert_eq!(messages.len(), 1); - assert_eq!(messages[0], message); - } - - #[tokio::test] - async fn test_sequence_number_above_u32_max() { - let mut store = 
create_test_store("test_seq_above_u32_max").await; - - let large_seq_num: u64 = u32::MAX as u64 + 100; - let message = b"message above u32::MAX"; - - store.add(large_seq_num, message).await.unwrap(); - - let messages = store - .get_slice(large_seq_num as usize, large_seq_num as usize) - .await - .unwrap(); - - assert_eq!(messages.len(), 1); - assert_eq!(messages[0], message); - } - - #[tokio::test] - async fn test_get_slice_with_large_bounds() { - let mut store = create_test_store("test_large_bounds").await; - - let base_seq: u64 = u32::MAX as u64 + 1000; - let messages_to_add = vec![ - (base_seq, b"first".as_slice()), - (base_seq + 1, b"second".as_slice()), - (base_seq + 2, b"third".as_slice()), - ]; - - for (seq, msg) in &messages_to_add { - store.add(*seq, msg).await.unwrap(); - } - - let retrieved = store - .get_slice(base_seq as usize, (base_seq + 2) as usize) - .await - .unwrap(); - - assert_eq!(retrieved.len(), 3); - } - - #[tokio::test] - async fn test_set_target_seq_number_above_u32_max() { - let mut store = create_test_store("test_target_seq_large").await; - - let large_seq: u64 = u32::MAX as u64 + 500; - - store.set_target_seq_number(large_seq).await.unwrap(); - - assert_eq!(store.next_target_seq_number(), large_seq + 1); - - // Note: We can't easily verify persistence without tracking the collection name, - // so we just verify the in-memory state was updated correctly - } - - #[tokio::test] - async fn test_no_collision_between_large_and_truncated_values() { - let mut store = create_test_store("test_no_collision").await; - - // These two sequence numbers would collide if truncated to u32 - let small_seq: u64 = 100; - let large_seq: u64 = u32::MAX as u64 + 100 + 1; // Would truncate to 100 if cast to u32 - - let small_msg = b"small sequence message"; - let large_msg = b"large sequence message"; - - store.add(small_seq, small_msg).await.unwrap(); - store.add(large_seq, large_msg).await.unwrap(); - - // Both messages should be retrievable independently - 
let small_retrieved = store - .get_slice(small_seq as usize, small_seq as usize) - .await - .unwrap(); - let large_retrieved = store - .get_slice(large_seq as usize, large_seq as usize) - .await - .unwrap(); - - assert_eq!(small_retrieved.len(), 1); - assert_eq!(small_retrieved[0], small_msg); - - assert_eq!(large_retrieved.len(), 1); - assert_eq!(large_retrieved[0], large_msg); - } -} - -// ============================================================================= -// Concurrent Access Tests -// ============================================================================= - mod concurrent_access_tests { use super::*; use std::sync::Arc; @@ -278,10 +162,6 @@ mod concurrent_access_tests { } } -// ============================================================================= -// Connection Failure Tests -// ============================================================================= - mod connection_failure_tests { use super::*; @@ -393,114 +273,3 @@ mod connection_failure_tests { assert_eq!(store.next_target_seq_number(), initial_target_seq); } } - -// ============================================================================= -// Data Integrity Tests -// ============================================================================= - -mod data_integrity_tests { - use super::*; - - #[tokio::test] - async fn test_store_empty_message() { - let mut store = create_test_store("test_empty_msg").await; - - let empty_message: &[u8] = b""; - - store.add(1, empty_message).await.unwrap(); - - let messages = store.get_slice(1, 1).await.unwrap(); - assert_eq!(messages.len(), 1); - assert_eq!(messages[0], empty_message); - } - - #[tokio::test] - async fn test_store_binary_message() { - let mut store = create_test_store("test_binary_msg").await; - - // Message with null bytes and other binary data - let binary_message: Vec = (0u8..=255).collect(); - - store.add(1, &binary_message).await.unwrap(); - - let messages = store.get_slice(1, 1).await.unwrap(); - 
assert_eq!(messages.len(), 1); - assert_eq!(messages[0], binary_message); - } - - #[tokio::test] - async fn test_store_large_message() { - let mut store = create_test_store("test_large_msg").await; - - // 1MB message (well under 16MB BSON limit) - let large_message: Vec = vec![0xAB; 1024 * 1024]; - - store.add(1, &large_message).await.unwrap(); - - let messages = store.get_slice(1, 1).await.unwrap(); - assert_eq!(messages.len(), 1); - assert_eq!(messages[0], large_message); - } - - #[tokio::test] - async fn test_sequence_number_zero() { - let mut store = create_test_store("test_seq_zero").await; - - store.add(0, b"message at seq 0").await.unwrap(); - - let messages = store.get_slice(0, 0).await.unwrap(); - assert_eq!(messages.len(), 1); - assert_eq!(messages[0], b"message at seq 0"); - } - - #[tokio::test] - async fn test_overwrite_message() { - let mut store = create_test_store("test_overwrite").await; - - store.add(1, b"original message").await.unwrap(); - store.add(1, b"updated message").await.unwrap(); - - let messages = store.get_slice(1, 1).await.unwrap(); - assert_eq!(messages.len(), 1); - assert_eq!(messages[0], b"updated message"); - } - - #[tokio::test] - async fn test_sparse_sequence_numbers() { - let mut store = create_test_store("test_sparse").await; - - // Add messages with gaps in sequence numbers - store.add(1, b"first").await.unwrap(); - store.add(100, b"hundredth").await.unwrap(); - store.add(1000, b"thousandth").await.unwrap(); - - // Query the full range - should only return existing messages - let messages = store.get_slice(1, 1000).await.unwrap(); - assert_eq!(messages.len(), 3); - } - - #[tokio::test] - async fn test_reset_creates_new_sequence() { - let mut store = create_test_store("test_reset_new_seq").await; - - // Add data and increment sequence numbers - store.add(1, b"pre-reset message").await.unwrap(); - store.increment_sender_seq_number().await.unwrap(); - store.increment_target_seq_number().await.unwrap(); - - let 
creation_time_before = store.creation_time(); - - // Small delay to ensure different creation time - tokio::time::sleep(std::time::Duration::from_millis(10)).await; - - // Reset - store.reset().await.unwrap(); - - // Sequence numbers should be reset - assert_eq!(store.next_sender_seq_number(), 1); - assert_eq!(store.next_target_seq_number(), 1); - - // Creation time should be updated - assert!(store.creation_time() > creation_time_before); - } -} From 776dfc6242a166132f1a7fb67a15287321c31c18 Mon Sep 17 00:00:00 2001 From: David Steiner Date: Thu, 22 Jan 2026 14:38:20 +0100 Subject: [PATCH 10/11] Trim store test suites to focus on meaningful cases --- crates/hotfix/tests/file_store_tests.rs | 141 ---------- crates/hotfix/tests/mongodb_store_tests.rs | 303 +++++---------------- 2 files changed, 72 insertions(+), 372 deletions(-) diff --git a/crates/hotfix/tests/file_store_tests.rs b/crates/hotfix/tests/file_store_tests.rs index cb12008..cc74978 100644 --- a/crates/hotfix/tests/file_store_tests.rs +++ b/crates/hotfix/tests/file_store_tests.rs @@ -198,144 +198,3 @@ mod file_system_error_tests { assert!(matches!(result, Err(StoreError::RetrieveMessages { .. 
}))); } } - -mod data_integrity_tests { - use super::*; - - #[tokio::test] - async fn test_store_empty_message() { - let (_dir, mut store) = create_test_store(); - - let empty_message: &[u8] = b""; - - store.add(1, empty_message).await.unwrap(); - - let messages = store.get_slice(1, 1).await.unwrap(); - assert_eq!(messages.len(), 1); - assert_eq!(messages[0], empty_message); - } - - #[tokio::test] - async fn test_store_binary_message() { - let (_dir, mut store) = create_test_store(); - - // Message with null bytes and all possible byte values - let binary_message: Vec = (0u8..=255).collect(); - - store.add(1, &binary_message).await.unwrap(); - - let messages = store.get_slice(1, 1).await.unwrap(); - assert_eq!(messages.len(), 1); - assert_eq!(messages[0], binary_message); - } - - #[tokio::test] - async fn test_store_large_message() { - let (_dir, mut store) = create_test_store(); - - // 1MB message - let large_message: Vec = vec![0xAB; 1024 * 1024]; - - store.add(1, &large_message).await.unwrap(); - - let messages = store.get_slice(1, 1).await.unwrap(); - assert_eq!(messages.len(), 1); - assert_eq!(messages[0], large_message); - } - - #[tokio::test] - async fn test_sparse_sequence_numbers() { - let (_dir, mut store) = create_test_store(); - - // Add messages with gaps in sequence numbers - store.add(1, b"first").await.unwrap(); - store.add(100, b"hundredth").await.unwrap(); - store.add(1000, b"thousandth").await.unwrap(); - - // Query the full range - should only return existing messages - let messages = store.get_slice(1, 1000).await.unwrap(); - assert_eq!(messages.len(), 3); - assert_eq!(messages[0], b"first"); - assert_eq!(messages[1], b"hundredth"); - assert_eq!(messages[2], b"thousandth"); - } -} - -mod concurrent_access_tests { - use super::*; - - #[tokio::test] - async fn test_two_store_instances_same_files() { - let dir = TempDir::new().unwrap(); - - // Create first store and add data - let mut store1 = FileStore::new(dir.path(), "test").unwrap(); - 
store1.add(1, b"message from store 1").await.unwrap(); - store1.increment_sender_seq_number().await.unwrap(); - - // Create second store pointing to same files - // Note: FileStore reads state at construction time, so it should see - // the current persisted state - let store2 = FileStore::new(dir.path(), "test").unwrap(); - - // Store2 should see the persisted sequence number - assert_eq!(store2.next_sender_seq_number(), 2); - - // Store2 should be able to read the message - let messages = store2.get_slice(1, 1).await.unwrap(); - assert_eq!(messages.len(), 1); - assert_eq!(messages[0], b"message from store 1"); - } - - #[tokio::test] - async fn test_store_instance_isolation() { - let dir = TempDir::new().unwrap(); - - // Create two store instances - let mut store1 = FileStore::new(dir.path(), "test").unwrap(); - let mut store2 = FileStore::new(dir.path(), "test").unwrap(); - - // Both start at the same sequence number - assert_eq!(store1.next_sender_seq_number(), 1); - assert_eq!(store2.next_sender_seq_number(), 1); - - // Store1 increments - store1.increment_sender_seq_number().await.unwrap(); - - // Store1 sees the change - assert_eq!(store1.next_sender_seq_number(), 2); - - // Store2 still has its cached state (no automatic refresh) - assert_eq!(store2.next_sender_seq_number(), 1); - - // However, if store2 also writes, it will overwrite with its state - store2.increment_sender_seq_number().await.unwrap(); - assert_eq!(store2.next_sender_seq_number(), 2); - - // Now if we create a new store, it reads what was last written (store2's state) - let store3 = FileStore::new(dir.path(), "test").unwrap(); - assert_eq!(store3.next_sender_seq_number(), 2); - } - - #[tokio::test] - async fn test_reset_affects_all_future_instances() { - let dir = TempDir::new().unwrap(); - - // Create store and add data - let mut store1 = FileStore::new(dir.path(), "test").unwrap(); - store1.add(1, b"will be deleted").await.unwrap(); - store1.increment_sender_seq_number().await.unwrap(); - 
store1.increment_sender_seq_number().await.unwrap(); - - // Reset - store1.reset().await.unwrap(); - - // New instance should see reset state - let store2 = FileStore::new(dir.path(), "test").unwrap(); - assert_eq!(store2.next_sender_seq_number(), 1); - assert_eq!(store2.next_target_seq_number(), 1); - - let messages = store2.get_slice(1, 100).await.unwrap(); - assert_eq!(messages.len(), 0); - } -} diff --git a/crates/hotfix/tests/mongodb_store_tests.rs b/crates/hotfix/tests/mongodb_store_tests.rs index 15f90cb..9e688df 100644 --- a/crates/hotfix/tests/mongodb_store_tests.rs +++ b/crates/hotfix/tests/mongodb_store_tests.rs @@ -4,272 +4,113 @@ use hotfix::store::mongodb::{Client, MongoDbMessageStore}; use hotfix::store::{MessageStore, StoreError}; use testcontainers::runners::AsyncRunner; use testcontainers::{ContainerAsync, GenericImage}; -use tokio::sync::OnceCell; const MONGO_PORT: u16 = 27017; -static MONGO_CONTAINER: OnceCell> = OnceCell::const_new(); - -async fn init_container() -> ContainerAsync { - GenericImage::new("mongo", "8.0").start().await.unwrap() -} - -async fn get_mongo_client() -> Client { - let container = MONGO_CONTAINER.get_or_init(init_container).await; +async fn create_dedicated_container_and_store() +-> (ContainerAsync, MongoDbMessageStore) { + let container = GenericImage::new("mongo", "8.0").start().await.unwrap(); let host = container.get_host().await.unwrap(); let port = container.get_host_port_ipv4(MONGO_PORT).await.unwrap(); - Client::with_uri_str(format!("mongodb://{host}:{port}")) - .await - .unwrap() -} -async fn create_test_store(db_name: &str) -> MongoDbMessageStore { - let client = get_mongo_client().await; - let db = client.database(db_name); - let collection_name = format!("test_{}", uuid::Uuid::new_v4()); - MongoDbMessageStore::new(db, Some(&collection_name)) + let client = Client::with_uri_str(format!("mongodb://{host}:{port}")) .await - .unwrap() -} + .unwrap(); + let db = client.database("test_conn_failure"); + let store = 
MongoDbMessageStore::new(db, Some("test")).await.unwrap(); -mod concurrent_access_tests { - use super::*; - use std::sync::Arc; - use tokio::sync::Mutex; - - #[tokio::test] - async fn test_concurrent_add_same_sequence() { - let client = get_mongo_client().await; - let db = client.database("test_concurrent_add"); - let collection_name = format!("test_{}", uuid::Uuid::new_v4()); - - let mut store1 = MongoDbMessageStore::new(db.clone(), Some(&collection_name)) - .await - .unwrap(); - let mut store2 = MongoDbMessageStore::new(db.clone(), Some(&collection_name)) - .await - .unwrap(); - - let seq_num = 1u64; - let msg1 = b"message from store 1"; - let msg2 = b"message from store 2"; - - // Both stores add a message with the same sequence number - let (result1, result2) = tokio::join!(store1.add(seq_num, msg1), store2.add(seq_num, msg2)); - - // Both should succeed (upsert semantics) - assert!(result1.is_ok()); - assert!(result2.is_ok()); - - // One of the messages should be stored (last write wins) - let messages = store1 - .get_slice(seq_num as usize, seq_num as usize) - .await - .unwrap(); - assert_eq!(messages.len(), 1); - // The message should be one of the two (we can't guarantee which due to race) - assert!(messages[0] == msg1 || messages[0] == msg2); - } - - #[tokio::test] - async fn test_concurrent_increment_sender_seq() { - let client = get_mongo_client().await; - let db = client.database("test_concurrent_inc_sender"); - let collection_name = format!("test_{}", uuid::Uuid::new_v4()); - - let store = Arc::new(Mutex::new( - MongoDbMessageStore::new(db.clone(), Some(&collection_name)) - .await - .unwrap(), - )); - - let num_increments = 50; - let mut handles = Vec::new(); - - for _ in 0..num_increments { - let store_clone = Arc::clone(&store); - handles.push(tokio::spawn(async move { - let mut guard = store_clone.lock().await; - guard.increment_sender_seq_number().await - })); - } - - for handle in handles { - handle.await.unwrap().unwrap(); - } - - let final_store 
= store.lock().await; - assert_eq!(final_store.next_sender_seq_number(), num_increments + 1); - } - - #[tokio::test] - async fn test_concurrent_increment_target_seq() { - let client = get_mongo_client().await; - let db = client.database("test_concurrent_inc_target"); - let collection_name = format!("test_{}", uuid::Uuid::new_v4()); - - let store = Arc::new(Mutex::new( - MongoDbMessageStore::new(db.clone(), Some(&collection_name)) - .await - .unwrap(), - )); - - let num_increments = 50; - let mut handles = Vec::new(); - - for _ in 0..num_increments { - let store_clone = Arc::clone(&store); - handles.push(tokio::spawn(async move { - let mut guard = store_clone.lock().await; - guard.increment_target_seq_number().await - })); - } - - for handle in handles { - handle.await.unwrap().unwrap(); - } - - let final_store = store.lock().await; - assert_eq!(final_store.next_target_seq_number(), num_increments + 1); - } - - #[tokio::test] - async fn test_two_store_instances_same_collection() { - let client = get_mongo_client().await; - let db = client.database("test_two_instances"); - let collection_name = format!("test_{}", uuid::Uuid::new_v4()); - - let mut store1 = MongoDbMessageStore::new(db.clone(), Some(&collection_name)) - .await - .unwrap(); - - // Store1 adds a message - store1.add(1, b"message from store 1").await.unwrap(); - store1.increment_sender_seq_number().await.unwrap(); - - // Store2 connects to the same collection - let store2 = MongoDbMessageStore::new(db.clone(), Some(&collection_name)) - .await - .unwrap(); - - // Store2 should see the updated sequence number - assert_eq!(store2.next_sender_seq_number(), 2); - - // Store2 should be able to retrieve the message - let messages = store2.get_slice(1, 1).await.unwrap(); - assert_eq!(messages.len(), 1); - assert_eq!(messages[0], b"message from store 1"); - } + (container, store) } -mod connection_failure_tests { - use super::*; - - async fn create_dedicated_container_and_store() - -> (ContainerAsync, 
MongoDbMessageStore) { - let container = GenericImage::new("mongo", "8.0").start().await.unwrap(); - let host = container.get_host().await.unwrap(); - let port = container.get_host_port_ipv4(MONGO_PORT).await.unwrap(); - - let client = Client::with_uri_str(format!("mongodb://{host}:{port}")) - .await - .unwrap(); - let db = client.database("test_conn_failure"); - let store = MongoDbMessageStore::new(db, Some("test")).await.unwrap(); +#[tokio::test] +async fn test_add_after_connection_drop() { + let (container, mut store) = create_dedicated_container_and_store().await; - (container, store) - } + // Verify store works initially + store.add(1, b"initial message").await.unwrap(); - #[tokio::test] - async fn test_add_after_connection_drop() { - let (container, mut store) = create_dedicated_container_and_store().await; + // Stop the container + container.stop().await.unwrap(); - // Verify store works initially - store.add(1, b"initial message").await.unwrap(); + // Attempt operation - should fail with appropriate error + let result = store.add(2, b"should fail").await; - // Stop the container - container.stop().await.unwrap(); - - // Attempt operation - should fail with appropriate error - let result = store.add(2, b"should fail").await; - - assert!(matches!(result, Err(StoreError::PersistMessage { .. }))); - } + assert!(matches!(result, Err(StoreError::PersistMessage { .. 
}))); +} - #[tokio::test] - async fn test_get_slice_after_connection_drop() { - let (container, mut store) = create_dedicated_container_and_store().await; +#[tokio::test] +async fn test_get_slice_after_connection_drop() { + let (container, mut store) = create_dedicated_container_and_store().await; - // Add a message while connected - store.add(1, b"test message").await.unwrap(); + // Add a message while connected + store.add(1, b"test message").await.unwrap(); - // Stop the container - container.stop().await.unwrap(); + // Stop the container + container.stop().await.unwrap(); - // Attempt retrieval - should fail - let result = store.get_slice(1, 1).await; + // Attempt retrieval - should fail + let result = store.get_slice(1, 1).await; - assert!(matches!(result, Err(StoreError::RetrieveMessages { .. }))); - } + assert!(matches!(result, Err(StoreError::RetrieveMessages { .. }))); +} - #[tokio::test] - async fn test_increment_after_connection_drop() { - let (container, mut store) = create_dedicated_container_and_store().await; +#[tokio::test] +async fn test_increment_after_connection_drop() { + let (container, mut store) = create_dedicated_container_and_store().await; - // Stop the container - container.stop().await.unwrap(); + // Stop the container + container.stop().await.unwrap(); - // Attempt increment - should fail - let result = store.increment_sender_seq_number().await; + // Attempt increment - should fail + let result = store.increment_sender_seq_number().await; - assert!(matches!(result, Err(StoreError::UpdateSequenceNumber(_)))); - } + assert!(matches!(result, Err(StoreError::UpdateSequenceNumber(_)))); +} - #[tokio::test] - async fn test_reset_after_connection_drop() { - let (container, mut store) = create_dedicated_container_and_store().await; +#[tokio::test] +async fn test_reset_after_connection_drop() { + let (container, mut store) = create_dedicated_container_and_store().await; - // Stop the container - container.stop().await.unwrap(); + // Stop the 
container + container.stop().await.unwrap(); - // Attempt reset - should fail - let result = store.reset().await; + // Attempt reset - should fail + let result = store.reset().await; - assert!(matches!(result, Err(StoreError::Reset(_)))); - } + assert!(matches!(result, Err(StoreError::Reset(_)))); +} - #[tokio::test] - async fn test_state_preserved_after_failed_increment() { - let (container, mut store) = create_dedicated_container_and_store().await; +#[tokio::test] +async fn test_state_preserved_after_failed_increment() { + let (container, mut store) = create_dedicated_container_and_store().await; - let initial_sender_seq = store.next_sender_seq_number(); - let initial_target_seq = store.next_target_seq_number(); + let initial_sender_seq = store.next_sender_seq_number(); + let initial_target_seq = store.next_target_seq_number(); - // Stop the container - container.stop().await.unwrap(); + // Stop the container + container.stop().await.unwrap(); - // Attempt increments - should fail - let _ = store.increment_sender_seq_number().await; - let _ = store.increment_target_seq_number().await; + // Attempt increments - should fail + let _ = store.increment_sender_seq_number().await; + let _ = store.increment_target_seq_number().await; - // State should be unchanged since DB write failed first - assert_eq!(store.next_sender_seq_number(), initial_sender_seq); - assert_eq!(store.next_target_seq_number(), initial_target_seq); - } + // State should be unchanged since DB write failed first + assert_eq!(store.next_sender_seq_number(), initial_sender_seq); + assert_eq!(store.next_target_seq_number(), initial_target_seq); +} - #[tokio::test] - async fn test_state_preserved_after_failed_set_target() { - let (container, mut store) = create_dedicated_container_and_store().await; +#[tokio::test] +async fn test_state_preserved_after_failed_set_target() { + let (container, mut store) = create_dedicated_container_and_store().await; - let initial_target_seq = 
store.next_target_seq_number(); + let initial_target_seq = store.next_target_seq_number(); - // Stop the container - container.stop().await.unwrap(); + // Stop the container + container.stop().await.unwrap(); - // Attempt set - should fail - let _ = store.set_target_seq_number(100).await; + // Attempt set - should fail + let _ = store.set_target_seq_number(100).await; - // State should be unchanged - assert_eq!(store.next_target_seq_number(), initial_target_seq); - } + // State should be unchanged + assert_eq!(store.next_target_seq_number(), initial_target_seq); } From 9143732dd3f3ad7ddeead310ae95dfefbd415c5d Mon Sep 17 00:00:00 2001 From: David Steiner Date: Thu, 22 Jan 2026 14:42:56 +0100 Subject: [PATCH 11/11] Remove step to run tests in CI job which is already covered by the coverage job --- .github/workflows/ci.yml | 2 -- 1 file changed, 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a7bd067..6bb3a14 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -21,8 +21,6 @@ jobs: run: cargo clippy -- -Dclippy::all -D warnings - name: Run clippy (all targets and features) run: cargo clippy --all-targets --all-features -- -Dclippy::all -D warnings - - name: Run tests - run: cargo test --verbose coverage: runs-on: ubuntu-latest