diff --git a/CHANGELOG.md b/CHANGELOG.md index e96d252..0d398fb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -57,6 +57,8 @@ and its version numbers follow [Semantic Versioning](https://semver.org/). helpers (`#20`). - `RuntimeControlMessage::Custom(...)` is now clone-safe and no longer panics when cloned (`#27`). +- `RuntimeGuard` now supports reliable restart cycles (`start, shutdown, start`) + by retrying control-message delivery across ticker generations (`#18`). ### Removed - Unused aliases and imports producing compiler warnings. diff --git a/src/runtime_guard.rs b/src/runtime_guard.rs index 507c8b9..81e76a3 100644 --- a/src/runtime_guard.rs +++ b/src/runtime_guard.rs @@ -64,7 +64,9 @@ async fn wait_for_ticker_sender( // Register interest before checking state to avoid missing notifications. let notified = ticker_ready.notified(); - if let Some(sender) = sender_slot.lock().await.clone() { + if let Some(sender) = sender_slot.lock().await.clone() + && !sender.is_closed() + { return sender; } @@ -91,12 +93,22 @@ impl RuntimeGuard { // the (single) ticker once it has been created. tokio::spawn(async move { while let Some(msg) = receiver.recv().await { - let ticker = - wait_for_ticker_sender(Arc::clone(&fanout_sender), Arc::clone(&fanout_ready)) - .await; - - if ticker.send(msg).await.is_err() { - break; // ticker dropped + let mut pending = msg; + loop { + let ticker = wait_for_ticker_sender( + Arc::clone(&fanout_sender), + Arc::clone(&fanout_ready), + ) + .await; + + match ticker.send(pending).await { + Ok(_) => break, + Err(err) => { + // Ticker may have dropped between readiness check and send. + // Keep the message and retry once a new ticker is available. + pending = err.0; + } + } } } }); diff --git a/tests/integration.rs b/tests/integration.rs index 96359b2..154e578 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -284,6 +284,44 @@ async fn test_runtime_guard_shutdown_sent_before_ticker_is_not_lost() { ); } +#[tokio::test] +async fn test_runtime_guard_supports_restart_after_ticker_drop() { + let guard = RuntimeGuard::default(); + let handle = guard.handle(); + + // First start/stop cycle. + let first = guard.runtime_ticker().await; + handle.shutdown().await; + let first_op = timeout( + Duration::from_secs(1), + first.tick(tokio::time::sleep(Duration::from_secs(5))), + ) + .await + .expect("timed out waiting for first shutdown"); + assert!(matches!( + first_op, + ProcessOperation::Control(RuntimeControlMessage::Shutdown) + )); + drop(first); + + // Send control while no ticker is alive. This must not kill fanout and + // must be delivered when the next ticker starts. + handle.shutdown().await; + + // Second start/stop cycle using the same RuntimeGuard. + let second = guard.runtime_ticker().await; + let second_op = timeout( + Duration::from_secs(1), + second.tick(tokio::time::sleep(Duration::from_secs(5))), + ) + .await + .expect("timed out waiting for second shutdown"); + assert!(matches!( + second_op, + ProcessOperation::Control(RuntimeControlMessage::Shutdown) + )); +} + #[tokio::test] async fn test_runtime_handle_custom_control_message_is_delivered() { let guard = RuntimeGuard::default(); @@ -313,6 +351,27 @@ async fn test_runtime_handle_custom_control_message_is_delivered() { } } +#[tokio::test] +async fn test_runnable_can_restart_start_shutdown_start() { + let controller = Arc::new(ExampleController::default()); + let handle = controller.process_handle(); + + for _ in 0..2 { + let runnable = Arc::clone(&controller); + let join = tokio::spawn(async move { + runnable.process_start().await.unwrap(); + }); + + tokio::time::sleep(Duration::from_millis(50)).await; + handle.shutdown().await; + + timeout(Duration::from_secs(2), join) + .await + .expect("timed out waiting for runnable shutdown") + .expect("runnable task failed"); + } +} + #[test] fn test_runtime_control_message_custom_clone_is_safe() { let msg = RuntimeControlMessage::Custom(Arc::new(7_u8));