Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ and its version numbers follow [Semantic Versioning](https://semver.org/).
helpers (`#20`).
- `RuntimeControlMessage::Custom(...)` is now clone-safe and no longer panics
when cloned (`#27`).
- `RuntimeGuard` now supports reliable restart cycles (`start, shutdown, start`)
by retrying control-message delivery across ticker generations (`#18`).

### Removed
- Unused aliases and imports producing compiler warnings.
Expand Down
26 changes: 19 additions & 7 deletions src/runtime_guard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ async fn wait_for_ticker_sender(
// Register interest before checking state to avoid missing notifications.
let notified = ticker_ready.notified();

if let Some(sender) = sender_slot.lock().await.clone() {
if let Some(sender) = sender_slot.lock().await.clone()
&& !sender.is_closed()
{
return sender;
}

Expand All @@ -91,12 +93,22 @@ impl RuntimeGuard {
// the (single) ticker once it has been created.
tokio::spawn(async move {
while let Some(msg) = receiver.recv().await {
let ticker =
wait_for_ticker_sender(Arc::clone(&fanout_sender), Arc::clone(&fanout_ready))
.await;

if ticker.send(msg).await.is_err() {
break; // ticker dropped
let mut pending = msg;
loop {
let ticker = wait_for_ticker_sender(
Arc::clone(&fanout_sender),
Arc::clone(&fanout_ready),
)
.await;

match ticker.send(pending).await {
Ok(_) => break,
Err(err) => {
// Ticker may have dropped between readiness check and send.
// Keep the message and retry once a new ticker is available.
pending = err.0;
}
}
}
}
});
Expand Down
59 changes: 59 additions & 0 deletions tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,44 @@ async fn test_runtime_guard_shutdown_sent_before_ticker_is_not_lost() {
);
}

#[tokio::test]
async fn test_runtime_guard_supports_restart_after_ticker_drop() {
let guard = RuntimeGuard::default();
let handle = guard.handle();

// First start/stop cycle.
let first = guard.runtime_ticker().await;
handle.shutdown().await;
let first_op = timeout(
Duration::from_secs(1),
first.tick(tokio::time::sleep(Duration::from_secs(5))),
)
.await
.expect("timed out waiting for first shutdown");
assert!(matches!(
first_op,
ProcessOperation::Control(RuntimeControlMessage::Shutdown)
));
drop(first);

// Send control while no ticker is alive. This must not kill fanout and
// must be delivered when the next ticker starts.
handle.shutdown().await;

// Second start/stop cycle using the same RuntimeGuard.
let second = guard.runtime_ticker().await;
let second_op = timeout(
Duration::from_secs(1),
second.tick(tokio::time::sleep(Duration::from_secs(5))),
)
.await
.expect("timed out waiting for second shutdown");
assert!(matches!(
second_op,
ProcessOperation::Control(RuntimeControlMessage::Shutdown)
));
}

#[tokio::test]
async fn test_runtime_handle_custom_control_message_is_delivered() {
let guard = RuntimeGuard::default();
Expand Down Expand Up @@ -313,6 +351,27 @@ async fn test_runtime_handle_custom_control_message_is_delivered() {
}
}

#[tokio::test]
async fn test_runnable_can_restart_start_shutdown_start() {
let controller = Arc::new(ExampleController::default());
let handle = controller.process_handle();

for _ in 0..2 {
let runnable = Arc::clone(&controller);
let join = tokio::spawn(async move {
runnable.process_start().await.unwrap();
});

tokio::time::sleep(Duration::from_millis(50)).await;
handle.shutdown().await;

timeout(Duration::from_secs(2), join)
.await
.expect("timed out waiting for runnable shutdown")
.expect("runnable task failed");
}
}

#[test]
fn test_runtime_control_message_custom_clone_is_safe() {
let msg = RuntimeControlMessage::Custom(Arc::new(7_u8));
Expand Down
Loading