Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 2 additions & 7 deletions crates/hotfix-web/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Other

- release ([#268](https://github.com/Validus-Risk-Management/hotfix/pull/268))

## [0.1.5](https://github.com/Validus-Risk-Management/hotfix/compare/hotfix-web-v0.1.4...hotfix-web-v0.1.5) - 2026-01-20

### Other

- updated the following local packages: hotfix, hotfix-web-ui

## [0.1.4](https://github.com/Validus-Risk-Management/hotfix/compare/hotfix-web-v0.1.3...hotfix-web-v0.1.4) - 2025-12-09
Expand All @@ -41,7 +35,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- support reconnects in shutdowns initiated via CLI tool ([#250](https://github.com/Validus-Risk-Management/hotfix/pull/250))
- support reconnects in shutdowns initiated via CLI
tool ([#250](https://github.com/Validus-Risk-Management/hotfix/pull/250))

## [0.1.0](https://github.com/Validus-Risk-Management/hotfix/releases/tag/hotfix-web-v0.1.0) - 2025-11-26

Expand Down
3 changes: 3 additions & 0 deletions crates/hotfix/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,8 @@ uuid = { workspace = true, features = ["v4"] }
[dev-dependencies]
hotfix-message = { version = "0.2.8", path = "../hotfix-message", features = ["fix44", "utils-chrono"] }

rcgen = "0.13"
rustls = { workspace = true, features = ["ring"] }
tempfile = "3"
testcontainers = { workspace = true }
tokio = { workspace = true, features = ["test-util"] }
3 changes: 3 additions & 0 deletions crates/hotfix/clippy.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
allow-expect-in-tests = true
allow-panic-in-tests = true
allow-unwrap-in-tests = true
67 changes: 63 additions & 4 deletions crates/hotfix/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ pub struct Config {

impl Config {
/// Load a [Config] from a `toml` file.
pub fn load_from_path<P: AsRef<Path>>(path: P) -> Self {
let config_str = fs::read_to_string(path).expect("to be able to load config");
toml::from_str::<Self>(&config_str).expect("to be able to parse config")
pub fn load_from_path<P: AsRef<Path>>(path: P) -> Result<Self, ConfigError> {
let config_str = fs::read_to_string(path)?;
let config = toml::from_str::<Self>(&config_str)?;
Ok(config)
}
}

Expand Down Expand Up @@ -115,10 +116,22 @@ pub struct SessionConfig {
pub schedule: Option<ScheduleConfig>,
}

/// Errors that may occur when loading configuration.
#[derive(Debug, thiserror::Error)]
pub enum ConfigError {
#[error("failed to read config file")]
Io(#[from] std::io::Error),

#[error("failed to parse config")]
Parse(#[from] toml::de::Error),
}

#[cfg(test)]
mod tests {
use crate::config::{Config, TlsConfig};
use crate::config::{Config, ConfigError, TlsConfig};
use chrono::{NaiveTime, Weekday};
use std::io::Write;
use tempfile::NamedTempFile;

#[test]
fn test_simple_config() {
Expand Down Expand Up @@ -425,4 +438,50 @@ end_day = "Friday"
let session_config = config.sessions.first().unwrap();
assert_eq!(session_config.reconnect_interval, 15);
}

#[test]
fn test_load_from_path_success() {
let config_contents = r#"
[[sessions]]
begin_string = "FIX.4.4"
sender_comp_id = "sender"
target_comp_id = "target"
connection_host = "127.0.0.1"
connection_port = 9876
heartbeat_interval = 30
"#;

let mut temp_file = NamedTempFile::new().unwrap();
temp_file.write_all(config_contents.as_bytes()).unwrap();

let config = Config::load_from_path(temp_file.path()).unwrap();
assert_eq!(config.sessions.len(), 1);

let session = config.sessions.first().unwrap();
assert_eq!(session.begin_string, "FIX.4.4");
assert_eq!(session.sender_comp_id, "sender");
assert_eq!(session.target_comp_id, "target");
assert_eq!(session.connection_host, "127.0.0.1");
assert_eq!(session.connection_port, 9876);
assert_eq!(session.heartbeat_interval, 30);
}

#[test]
fn test_load_from_path_missing_file() {
let result = Config::load_from_path("/nonexistent/path/to/config.toml");
assert!(result.is_err());
assert!(matches!(result.unwrap_err(), ConfigError::Io(_)));
}

#[test]
fn test_load_from_path_invalid_toml() {
let invalid_toml = "this is not valid toml {{{{";

let mut temp_file = NamedTempFile::new().unwrap();
temp_file.write_all(invalid_toml.as_bytes()).unwrap();

let result = Config::load_from_path(temp_file.path());
assert!(result.is_err());
assert!(matches!(result.unwrap_err(), ConfigError::Parse(_)));
}
}
29 changes: 24 additions & 5 deletions crates/hotfix/src/initiator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,21 @@ async fn establish_connection<Outbound: OutboundMessage>(
completion_tx: watch::Sender<bool>,
) {
loop {
session_ref.await_active_session_time().await;
if session_ref.await_active_session_time().await.is_err() {
warn!("session task terminated when checking active session time");
break;
}

match connect(&config, session_ref.clone()).await {
Ok(conn) => {
session_ref.register_writer(conn.get_writer()).await;
if session_ref
.register_writer(conn.get_writer())
.await
.is_err()
{
warn!("session task terminated when trying to register writer");
break;
};
conn.run_until_disconnect().await;
warn!("session connection dropped, attempting to reconnect");
}
Expand All @@ -108,9 +118,18 @@ async fn establish_connection<Outbound: OutboundMessage>(
}
};

if !session_ref.should_reconnect().await {
warn!("session indicated we shouldn't reconnect");
break;
match session_ref.should_reconnect().await {
Ok(false) => {
warn!("session indicated we shouldn't reconnect");
break;
}
Ok(true) => {
debug!("session indicated we should reconnect");
}
Err(_) => {
warn!("session task terminated when making decision to reconnect");
break;
}
}
let reconnect_interval = config.reconnect_interval;
debug!("waiting for {reconnect_interval} seconds before attempting to reconnect");
Expand Down
5 changes: 5 additions & 0 deletions crates/hotfix/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@
//!
//! Check out the [examples](https://github.com/Validus-Risk-Management/hotfix/tree/main/examples)
//! to get started.

#![deny(clippy::expect_used)]
#![deny(clippy::panic)]
#![deny(clippy::unwrap_used)]

pub mod application;
pub mod config;
pub(crate) mod error;
Expand Down
6 changes: 5 additions & 1 deletion crates/hotfix/src/message/reject.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,11 @@ impl OutboundMessage for Reject {
impl InboundMessage for Reject {
fn parse(message: &Message) -> Self {
Self {
ref_seq_num: message.get(REF_SEQ_NUM).unwrap(),
// TODO: how do we handle errors in parsing messages?
#[allow(clippy::expect_used)]
ref_seq_num: message
.get(REF_SEQ_NUM)
.expect("ref_seq_num should be present"),
ref_tag_id: message.get(REF_TAG_ID).ok(),
ref_msg_type: message.get(REF_MSG_TYPE).ok(),
session_reject_reason: message.get(SESSION_REJECT_REASON).ok(),
Expand Down
5 changes: 4 additions & 1 deletion crates/hotfix/src/message/sequence_reset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ impl OutboundMessage for SequenceReset {
fn write(&self, msg: &mut Message) {
msg.set(GAP_FILL_FLAG, self.gap_fill);
msg.set(NEW_SEQ_NO, self.new_seq_no);
let sending_time: Timestamp = msg.header().get(SENDING_TIME).unwrap();
#[allow(clippy::expect_used)]
let sending_time: Timestamp = msg.header().get(SENDING_TIME).expect(
"sending time should always be present due to previously having validated message",
);
msg.header_mut().set(ORIG_SENDING_TIME, sending_time);
msg.header_mut().set(POSS_DUP_FLAG, true);
}
Expand Down
54 changes: 37 additions & 17 deletions crates/hotfix/src/session/session_ref.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use anyhow::Result;
use thiserror::Error;
use tokio::sync::{mpsc, oneshot};
use tracing::debug;

Expand Down Expand Up @@ -42,44 +43,63 @@ impl<Outbound: OutboundMessage> InternalSessionRef<Outbound> {
})
}

pub async fn register_writer(&self, writer: WriterRef) {
pub async fn register_writer(&self, writer: WriterRef) -> Result<(), SessionGone> {
self.event_sender
.send(SessionEvent::Connected(writer))
.await
.expect("be able to register writer");
.await?;

Ok(())
}

pub async fn new_fix_message_received(&self, msg: RawFixMessage) {
pub async fn new_fix_message_received(&self, msg: RawFixMessage) -> Result<(), SessionGone> {
self.event_sender
.send(SessionEvent::FixMessageReceived(msg))
.await
.expect("be able to receive message");
.await?;

Ok(())
}

pub async fn disconnect(&self, reason: String) {
pub async fn disconnect(&self, reason: String) -> Result<(), SessionGone> {
self.event_sender
.send(SessionEvent::Disconnected(reason))
.await
.expect("be able to send disconnect");
.await?;

Ok(())
}

pub async fn should_reconnect(&self) -> bool {
pub async fn should_reconnect(&self) -> Result<bool, SessionGone> {
let (sender, receiver) = oneshot::channel();
self.event_sender
.send(SessionEvent::ShouldReconnect(sender))
.await
.unwrap();
receiver.await.expect("to receive a response")
.await?;
Ok(receiver.await?)
}

pub async fn await_active_session_time(&self) {
pub async fn await_active_session_time(&self) -> Result<(), SessionGone> {
debug!("awaiting active session time");
let (sender, receiver) = oneshot::channel::<AwaitingActiveSessionResponse>();
self.event_sender
.send(SessionEvent::AwaitingActiveSession(sender))
.await
.unwrap();
receiver.await.expect("to receive a response");
.await?;
receiver.await?;

debug!("resuming connection as session is active");
Ok(())
}
}

#[derive(Debug, Error)]
#[error("session task terminated")]
pub struct SessionGone(String);

impl From<mpsc::error::SendError<SessionEvent>> for SessionGone {
fn from(err: mpsc::error::SendError<SessionEvent>) -> Self {
Self(err.to_string())
}
}

impl From<oneshot::error::RecvError> for SessionGone {
fn from(err: oneshot::error::RecvError) -> Self {
Self(err.to_string())
}
}
Loading