Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ jobs:
run: cargo clippy -- -Dclippy::all -D warnings
- name: Run clippy (all targets and features)
run: cargo clippy --all-targets --all-features -- -Dclippy::all -D warnings
- name: Run tests
run: cargo test --verbose

coverage:
runs-on: ubuntu-latest
Expand Down
10 changes: 0 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ proc-macro2 = "1"
quickcheck = "1"
quickcheck_macros = "1"
quote = "1"
redb = "3.1"
reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] }
roxmltree = "0.21"
rust-embed = "8.7"
Expand Down
2 changes: 0 additions & 2 deletions crates/hotfix/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ categories.workspace = true
[features]
default = ["test-utils"]
fix44 = ["hotfix-message/fix44"]
redb = ["dep:redb"]
mongodb = ["dep:mongodb"]
test-utils = []

Expand All @@ -31,7 +30,6 @@ chrono-tz = { workspace = true, features = ["serde"] }
futures = { workspace = true }
mongodb = { workspace = true, optional = true }
rustls-pki-types = { workspace = true }
redb = { workspace = true, optional = true }
rustls = { workspace = true }
rustls-native-certs = { workspace = true }
rustls-pemfile = { workspace = true }
Expand Down
1 change: 0 additions & 1 deletion crates/hotfix/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@

pub mod application;
pub mod config;
pub(crate) mod error;
pub mod initiator;
pub mod message;
pub mod message_utils;
Expand Down
1 change: 1 addition & 0 deletions crates/hotfix/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub mod resend_request;
pub mod sequence_reset;
pub mod test_request;
pub mod verification;
pub mod verification_error;

pub use parser::RawFixMessage;
pub use resend_request::ResendRequest;
Expand Down
7 changes: 4 additions & 3 deletions crates/hotfix/src/message/verification.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::config::SessionConfig;
use crate::error::{CompIdType, MessageVerificationError};
use crate::message::verification_error::{CompIdType, MessageVerificationError};
use hotfix_message::Part;
use hotfix_message::field_types::Timestamp;
use hotfix_message::message::Message;
Expand Down Expand Up @@ -180,8 +180,9 @@ fn check_target_comp_id(

#[cfg(test)]
mod tests {
use super::{Message, MessageVerificationError, SessionConfig, verify_message};
use crate::error::CompIdType;
use super::{Message, SessionConfig, verify_message};
use crate::message::verification_error::CompIdType;
use crate::message::verification_error::MessageVerificationError;
use hotfix_message::field_types::Timestamp;
use hotfix_message::{Part, fix44};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,3 @@ pub enum CompIdType {
Sender,
Target,
}

#[derive(Debug, Error)]
pub enum SessionError {
#[error("Schedule configuration is invalid: {0}")]
InvalidSchedule(String),
}
32 changes: 18 additions & 14 deletions crates/hotfix/src/session.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub(crate) mod admin_request;
pub mod error;
pub(crate) mod event;
mod info;
mod session_handle;
Expand Down Expand Up @@ -26,15 +27,17 @@ use tracing::{debug, enabled, error, info, warn};

use crate::Application;
use crate::application::{InboundDecision, OutboundDecision};
use crate::error::{CompIdType, MessageVerificationError};
use crate::message::logout::Logout;
use crate::message::reject::Reject;
use crate::message::resend_request::ResendRequest;
use crate::message::sequence_reset::SequenceReset;
use crate::message::test_request::TestRequest;
use crate::message::verification::verify_message;
use crate::message::verification_error::{CompIdType, MessageVerificationError};
use crate::message_utils::{is_admin, prepare_message_for_resend};
use crate::session::admin_request::AdminRequest;
pub use crate::session::info::{SessionInfo, Status};
pub use crate::session::session_handle::SessionHandle;
#[cfg(not(feature = "test-utils"))]
pub(crate) use crate::session::session_ref::InternalSessionRef;
#[cfg(feature = "test-utils")]
Expand All @@ -49,9 +52,6 @@ use hotfix_message::session_fields::{
SessionRejectReason, TEST_REQ_ID,
};

pub use crate::session::info::{SessionInfo, Status};
pub use crate::session::session_handle::SessionHandle;

const SCHEDULE_CHECK_INTERVAL: u64 = 1;

struct Session<A, I, O, S> {
Expand Down Expand Up @@ -365,7 +365,8 @@ where
}
}

self.store.increment_target_seq_number().await
self.store.increment_target_seq_number().await?;
Ok(())
}

async fn on_heartbeat(&mut self, message: &Message) -> Result<()> {
Expand All @@ -378,7 +379,8 @@ where
self.reset_peer_timer(None);
}

self.store.increment_target_seq_number().await
self.store.increment_target_seq_number().await?;
Ok(())
}

async fn on_test_request(&mut self, message: &Message) -> Result<()> {
Expand Down Expand Up @@ -515,7 +517,8 @@ where
return Ok(());
}

self.store.set_target_seq_number(end - 1).await
self.store.set_target_seq_number(end - 1).await?;
Ok(())
}

async fn handle_verification_error(&mut self, error: MessageVerificationError) -> Result<()> {
Expand Down Expand Up @@ -1129,6 +1132,7 @@ mod tests {
use super::*;
use crate::application::{InboundDecision, OutboundDecision};
use crate::message::{InboundMessage, OutboundMessage};
use crate::store::{Result as StoreResult, StoreError};
use chrono::{DateTime, Datelike, NaiveDate, NaiveTime, TimeDelta, Timelike};
use chrono_tz::Tz;
use hotfix_message::message::Message;
Expand Down Expand Up @@ -1168,11 +1172,11 @@ mod tests {

#[async_trait::async_trait]
impl MessageStore for TestStore {
async fn add(&mut self, _sequence_number: u64, _message: &[u8]) -> Result<()> {
async fn add(&mut self, _sequence_number: u64, _message: &[u8]) -> StoreResult<()> {
Ok(())
}

async fn get_slice(&self, _begin: usize, _end: usize) -> Result<Vec<Vec<u8>>> {
async fn get_slice(&self, _begin: usize, _end: usize) -> StoreResult<Vec<Vec<u8>>> {
Ok(vec![])
}

Expand All @@ -1184,25 +1188,25 @@ mod tests {
self.target_seq
}

async fn increment_sender_seq_number(&mut self) -> Result<()> {
async fn increment_sender_seq_number(&mut self) -> StoreResult<()> {
self.sender_seq += 1;
Ok(())
}

async fn increment_target_seq_number(&mut self) -> Result<()> {
async fn increment_target_seq_number(&mut self) -> StoreResult<()> {
self.target_seq += 1;
Ok(())
}

async fn set_target_seq_number(&mut self, seq_number: u64) -> Result<()> {
async fn set_target_seq_number(&mut self, seq_number: u64) -> StoreResult<()> {
self.target_seq = seq_number;
Ok(())
}

async fn reset(&mut self) -> Result<()> {
async fn reset(&mut self) -> StoreResult<()> {
self.reset_called.store(true, Ordering::SeqCst);
if self.fail_reset.load(Ordering::SeqCst) {
bail!("simulated reset failure")
return Err(StoreError::Reset("simulated reset failure".into()));
}
self.creation_time = Utc::now();
Ok(())
Expand Down
13 changes: 13 additions & 0 deletions crates/hotfix/src/session/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
use crate::store::StoreError;
use thiserror::Error;

#[derive(Debug, Error)]
pub enum SessionError {
#[error("Schedule configuration is invalid: {0}")]
InvalidSchedule(String),

#[error("store operation failed")]
Store(#[from] StoreError),
}

pub type Result<T> = std::result::Result<T, SessionError>;
3 changes: 2 additions & 1 deletion crates/hotfix/src/session_schedule.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::config::ScheduleConfig;
use crate::error::SessionError;
use crate::session::error::SessionError;
use chrono::{DateTime, Datelike, Days, NaiveDate, NaiveTime, TimeDelta, Utc, Weekday};
use chrono_tz::Tz;

Expand Down Expand Up @@ -1043,6 +1043,7 @@ mod tests {
SessionError::InvalidSchedule(msg) => {
assert!(msg.contains("Weekly sessions cannot have weekdays specified"));
}
other => panic!("unexpected error: {other}"),
}
}

Expand Down
9 changes: 5 additions & 4 deletions crates/hotfix/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
//! By default, only the [in_memory] store is included. Further message store implementations,
//! such as `mongodb` and `redb` can be enabled through feature flags.

/// Error types for store operations.
pub mod error;

/// An in-memory message store that loses its state on restart. Only use this for testing.
pub mod in_memory;

Expand All @@ -12,11 +15,9 @@ pub mod mongodb;

/// A file-based message store for persistence.
pub mod file;
#[cfg(feature = "redb")]
/// A message store using [redb](https://www.redb.org/) for persistence.
pub mod redb;

use anyhow::Result;
pub use error::*;

use chrono::DateTime;

#[async_trait::async_trait]
Expand Down
38 changes: 38 additions & 0 deletions crates/hotfix/src/store/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
//! Error types for message store operations.

use thiserror::Error;

/// A boxed error type for store errors.
pub type BoxError = Box<dyn std::error::Error + Send + Sync>;

/// Errors that can occur during message store operations.
#[derive(Debug, Error)]
pub enum StoreError {
/// Failed to persist a message to the store.
#[error("failed to persist message (seq_num: {sequence_number})")]
PersistMessage {
sequence_number: u64,
#[source]
source: BoxError,
},

/// Failed to retrieve messages from the store.
#[error("failed to retrieve messages (range: {begin}..={end})")]
RetrieveMessages {
begin: usize,
end: usize,
#[source]
source: BoxError,
},

/// Failed to update a sequence number.
#[error("failed to update sequence number")]
UpdateSequenceNumber(#[source] BoxError),

/// Failed to reset the store.
#[error("failed to reset store")]
Reset(#[source] BoxError),
}

/// A specialized Result type for store operations.
pub type Result<T> = std::result::Result<T, StoreError>;
Loading