diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f24ed57..4591b0d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -54,3 +54,4 @@ jobs: run: | timeout 30s cargo run --example simple timeout 45s cargo run --example dynamic_add + timeout 30s cargo run --example restart_supervisor diff --git a/CHANGELOG.md b/CHANGELOG.md index 0d398fb..1b13be9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -40,8 +40,10 @@ and its version numbers follow [Semantic Versioning](https://semver.org/). shutdown grace timeout instead of relying on a fixed 30s value (`#17`). - `ProcessManager::add` docs now match runtime behavior: calling `add` before startup panics and `insert` must be used during setup (`#26`). -- CI now executes `simple` and `dynamic_add` examples with bounded runtimes to - catch regressions in sample programs (`#37`). +- CI now executes `simple`, `dynamic_add`, and `restart_supervisor` examples + with bounded runtimes to catch regressions in sample programs (`#37`). +- Added `RestartSupervisor` with configurable exponential backoff to + automatically restart failed child runnables (`#19`). ### Fixed - Active-child counter accuracy under edge conditions (spawn panics). diff --git a/README.md b/README.md index b47ce20..f096cb5 100644 --- a/README.md +++ b/README.md @@ -38,7 +38,7 @@ processes. | Error propagation | A failing child triggers a global shutdown and returns the *first* error. | | Auto cleanup | Optionally remove finished children to keep memory usage bounded. | | Hierarchical composition | Managers implement `Runnable` themselves → build arbitrary process trees. | -| Built-in helpers | See [`IdleProcess`](#built-in-helpers) and [`SignalReceiver`](#built-in-helpers). | +| Built-in helpers | See [`IdleProcess`](#built-in-helpers), [`RestartSupervisor`](#built-in-helpers) and [`SignalReceiver`](#built-in-helpers). | --- @@ -126,6 +126,7 @@ async fn main() { | Helper | Purpose | Feature flag | | ---------------- | ------------------------------------------------------------------------------------------------------------------------ | ------------ | | `IdleProcess` | Keeps an otherwise empty manager alive until an external shutdown is requested. | — | +| `RestartSupervisor` | Wraps a child `Runnable`, restarts it after failures, and applies exponential backoff between retry attempts. | — | | `SignalReceiver` | Listens for `SIGHUP`, `SIGINT`, `SIGTERM`, `SIGQUIT` and converts them into *shutdown / reload* control messages. | `signal` | Enable `SignalReceiver` like this: @@ -145,6 +146,7 @@ Cargo: | -------------------------------------------- | ------------------------------------------------------ | | `cargo run --example simple` | Minimal setup, two workers, graceful shutdown | | `cargo run --example dynamic_add` | Dynamically add workers while the manager is running | +| `cargo run --example restart_supervisor` | Restart a flaky worker with exponential backoff | Feel free to copy or adapt the code for your own services. diff --git a/examples/README.md b/examples/README.md index 46a78bc..cbd8491 100644 --- a/examples/README.md +++ b/examples/README.md @@ -13,6 +13,7 @@ cargo run --example | -------------------------------- | ---------------------------------------------------------------------- | | `simple` | minimal setup: one manager, two workers, graceful shutdown | | `dynamic_add` | add new `Runnable`s **while the manager is already running** | +| `restart_supervisor` | restart a flaky child with configurable exponential backoff | Feel free to copy / adapt the code for your own services and let us know if you -run into problems. \ No newline at end of file +run into problems. diff --git a/examples/restart_supervisor.rs b/examples/restart_supervisor.rs new file mode 100644 index 0000000..b0d44d8 --- /dev/null +++ b/examples/restart_supervisor.rs @@ -0,0 +1,106 @@ +//! Restart supervisor demo. +//! +//! Shows how `RestartSupervisor` can wrap a flaky `Runnable` and restart it +//! with exponential backoff until it succeeds. +//! +//! Build & run: +//! ```bash +//! cargo run --example restart_supervisor +//! ``` + +use processmanager::*; +use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::time::Duration; +use tokio::time::{interval, sleep}; + +struct FlakyWorker { + fail_until_attempt: usize, + attempts: Arc, + guard: Arc, +} + +impl FlakyWorker { + fn new(fail_until_attempt: usize, attempts: Arc) -> Self { + Self { + fail_until_attempt, + attempts, + guard: Arc::new(RuntimeGuard::default()), + } + } +} + +impl Runnable for FlakyWorker { + fn process_start(&self) -> ProcFuture<'_> { + let attempts = Arc::clone(&self.attempts); + let guard = Arc::clone(&self.guard); + let fail_until_attempt = self.fail_until_attempt; + + Box::pin(async move { + let attempt = attempts.fetch_add(1, Ordering::SeqCst) + 1; + println!("flaky-worker: start attempt {attempt}"); + + if attempt <= fail_until_attempt { + return Err(RuntimeError::Internal { + message: format!("simulated failure at attempt {attempt}"), + }); + } + + let ticker = guard.runtime_ticker().await; + let mut beat = interval(Duration::from_millis(500)); + + loop { + match ticker.tick(beat.tick()).await { + ProcessOperation::Next(_) => println!("flaky-worker: heartbeat"), + ProcessOperation::Control(RuntimeControlMessage::Shutdown) => { + println!("flaky-worker: shutdown"); + break; + } + ProcessOperation::Control(RuntimeControlMessage::Reload) => { + println!("flaky-worker: reload"); + } + ProcessOperation::Control(_) => continue, + } + } + + Ok(()) + }) + } + + fn process_handle(&self) -> Arc { + self.guard.handle() + } + + fn process_name(&self) -> std::borrow::Cow<'static, str> { + std::borrow::Cow::Borrowed("FlakyWorker") + } +} + +#[tokio::main] +async fn main() { + let attempts = Arc::new(AtomicUsize::new(0)); + + let wrapped = RestartSupervisor::new(FlakyWorker::new(2, Arc::clone(&attempts))).backoff( + RestartBackoff::new(Duration::from_millis(200), Duration::from_secs(1), 2), + ); + + let manager = ProcessManagerBuilder::default().pre_insert(wrapped).build(); + let handle = manager.process_handle(); + + tokio::spawn(async move { + manager + .process_start() + .await + .expect("manager encountered an error"); + }); + + // Let the wrapped worker fail twice, restart, and then run. + sleep(Duration::from_secs(3)).await; + println!( + "main: observed {} total worker start attempts", + attempts.load(Ordering::SeqCst) + ); + + handle.shutdown().await; + sleep(Duration::from_millis(300)).await; +} diff --git a/src/builtin/mod.rs b/src/builtin/mod.rs index da92747..594c6ba 100644 --- a/src/builtin/mod.rs +++ b/src/builtin/mod.rs @@ -1,7 +1,9 @@ mod idle; +mod restart_supervisor; #[cfg(feature = "signal")] mod signal_receiver; pub use idle::IdleProcess; +pub use restart_supervisor::{RestartBackoff, RestartSupervisor}; #[cfg(feature = "signal")] pub use signal_receiver::SignalReceiver; diff --git a/src/builtin/restart_supervisor.rs b/src/builtin/restart_supervisor.rs new file mode 100644 index 0000000..54704cd --- /dev/null +++ b/src/builtin/restart_supervisor.rs @@ -0,0 +1,182 @@ +//! Restart supervisor for flaky child processes. +//! +//! `RestartSupervisor` supervises one child [`Runnable`]. When the child exits +//! with an error (or panics), the supervisor restarts it after an exponential +//! backoff delay. +//! +//! This is useful for services that should auto-recover from transient +//! failures. +use std::{borrow::Cow, sync::Arc, time::Duration}; + +use crate::{ + ProcFuture, ProcessControlHandler, ProcessOperation, Runnable, RuntimeControlMessage, + RuntimeError, RuntimeGuard, +}; + +/// Exponential backoff settings for [`RestartSupervisor`]. +#[derive(Debug, Clone, Copy)] +pub struct RestartBackoff { + /// Delay used after the first failure. + pub initial: Duration, + /// Maximum delay cap. + pub max: Duration, + /// Multiplication factor for subsequent failures. + pub factor: u32, +} + +impl RestartBackoff { + pub fn new(initial: Duration, max: Duration, factor: u32) -> Self { + Self { + initial, + max, + factor: factor.max(1), + } + } + + fn delay_for_failures(self, failures: u32) -> Duration { + if failures == 0 { + return Duration::ZERO; + } + + let mut delay = self.initial; + for _ in 1..failures { + delay = delay.saturating_mul(self.factor); + if delay >= self.max { + return self.max; + } + } + delay.min(self.max) + } +} + +impl Default for RestartBackoff { + fn default() -> Self { + Self { + initial: Duration::from_millis(200), + max: Duration::from_secs(30), + factor: 2, + } + } +} + +/// Wraps one child [`Runnable`] and restarts it after failures with backoff. +pub struct RestartSupervisor { + child: Arc, + child_handle: Arc, + runtime_guard: RuntimeGuard, + backoff: RestartBackoff, + control_poll: Duration, +} + +impl RestartSupervisor { + pub fn new(child: impl Runnable) -> Self { + let child: Arc = Arc::from(Box::new(child) as Box); + let child_handle = child.process_handle(); + Self { + child, + child_handle, + runtime_guard: RuntimeGuard::default(), + backoff: RestartBackoff::default(), + control_poll: Duration::from_millis(50), + } + } + + pub fn backoff(mut self, backoff: RestartBackoff) -> Self { + self.backoff = backoff; + self + } +} + +impl Runnable for RestartSupervisor { + fn process_start(&self) -> ProcFuture<'_> { + let child = Arc::clone(&self.child); + let child_handle = Arc::clone(&self.child_handle); + let backoff = self.backoff; + let control_poll = self.control_poll; + let guard = self.runtime_guard.clone(); + let child_name = child.process_name().to_string(); + + Box::pin(async move { + let ticker = guard.runtime_ticker().await; + let mut failures = 0_u32; + + loop { + let mut child_task = tokio::spawn({ + let child = Arc::clone(&child); + async move { child.process_start().await } + }); + + // Child run-loop. React to control messages while the child is alive. + let child_result = loop { + match ticker + .tick(tokio::time::timeout(control_poll, &mut child_task)) + .await + { + ProcessOperation::Next(Ok(join_result)) => break join_result, + ProcessOperation::Next(Err(_timeout)) => continue, + ProcessOperation::Control(RuntimeControlMessage::Shutdown) => { + child_handle.shutdown().await; + let _ = child_task.await; + return Ok(()); + } + ProcessOperation::Control(RuntimeControlMessage::Reload) => { + child_handle.reload().await; + } + ProcessOperation::Control(_) => {} + } + }; + + let failure = match child_result { + Ok(Ok(())) => { + // Child stopped successfully; supervisor also exits successfully. + return Ok(()); + } + Ok(Err(err)) => err, + Err(join_err) => RuntimeError::Internal { + message: format!( + "restart supervisor child `{child_name}` join failure: {join_err}" + ), + }, + }; + + failures = failures.saturating_add(1); + let delay = backoff.delay_for_failures(failures); + + #[cfg(feature = "tracing")] + ::tracing::warn!( + child = %child_name, + failures = failures, + backoff = ?delay, + "Child failed; restarting after backoff: {failure:?}" + ); + #[cfg(all(not(feature = "tracing"), feature = "log"))] + ::log::warn!( + "Child {child_name} failed ({failure:?}); restarting in {delay:?} (attempt {failures})" + ); + #[cfg(all(not(feature = "tracing"), not(feature = "log")))] + eprintln!( + "Child {child_name} failed ({failure:?}); restarting in {delay:?} (attempt {failures})" + ); + + // Backoff wait remains responsive to shutdown/reload. + match ticker.tick(tokio::time::sleep(delay)).await { + ProcessOperation::Control(RuntimeControlMessage::Shutdown) => { + return Ok(()); + } + ProcessOperation::Control(RuntimeControlMessage::Reload) => { + child_handle.reload().await; + } + ProcessOperation::Control(_) | ProcessOperation::Next(_) => {} + } + } + }) + } + + fn process_handle(&self) -> Arc { + self.runtime_guard.handle() + } + + fn process_name(&self) -> Cow<'static, str> { + Cow::Borrowed("RestartSupervisor") + } +} diff --git a/tests/integration.rs b/tests/integration.rs index 154e578..15d7e14 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -1,9 +1,10 @@ use processmanager::{ ProcFuture, ProcessControlHandler, ProcessManager, ProcessManagerBuilder, ProcessOperation, - Runnable, RuntimeControlMessage, RuntimeError, RuntimeGuard, + RestartBackoff, RestartSupervisor, Runnable, RuntimeControlMessage, RuntimeError, RuntimeGuard, }; use std::ops::Add; use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::time::Duration; use tokio::sync::oneshot::channel; use tokio::time::timeout; @@ -157,6 +158,75 @@ struct IgnoreShutdownController { runtime_guard: RuntimeGuard, } +struct FlakyController { + fail_until_attempt: usize, + attempts: Arc, +} + +impl FlakyController { + fn new(failures: usize, attempts: Arc) -> Self { + Self { + fail_until_attempt: failures, + attempts, + } + } +} + +impl Runnable for FlakyController { + fn process_start(&self) -> ProcFuture<'_> { + Box::pin(async { + let attempt = self.attempts.fetch_add(1, Ordering::SeqCst) + 1; + if attempt <= self.fail_until_attempt { + return Err(RuntimeError::Internal { + message: format!("intentional failure on attempt {attempt}"), + }); + } + Ok(()) + }) + } + + fn process_handle(&self) -> Arc { + Arc::new(StubControlHandle) + } +} + +struct AlwaysFailController { + attempts: Arc, +} + +impl AlwaysFailController { + fn new(attempts: Arc) -> Self { + Self { attempts } + } +} + +impl Runnable for AlwaysFailController { + fn process_start(&self) -> ProcFuture<'_> { + Box::pin(async { + self.attempts.fetch_add(1, Ordering::SeqCst); + Err(RuntimeError::Internal { + message: "intentional permanent failure".to_string(), + }) + }) + } + + fn process_handle(&self) -> Arc { + Arc::new(StubControlHandle) + } +} + +struct StubControlHandle; + +impl ProcessControlHandler for StubControlHandle { + fn shutdown(&self) -> processmanager::CtrlFuture<'_> { + Box::pin(async {}) + } + + fn reload(&self) -> processmanager::CtrlFuture<'_> { + Box::pin(async {}) + } +} + impl Runnable for IgnoreShutdownController { fn process_start(&self) -> ProcFuture<'_> { Box::pin(async { @@ -420,6 +490,61 @@ async fn test_reload_dispatch_is_parallel() { ); } +#[tokio::test] +async fn test_restart_supervisor_restarts_failed_child_with_backoff() { + let attempts = Arc::new(AtomicUsize::new(0)); + let wrapper = RestartSupervisor::new(FlakyController::new(2, Arc::clone(&attempts))).backoff( + RestartBackoff::new(Duration::from_millis(60), Duration::from_millis(200), 2), + ); + + let started = tokio::time::Instant::now(); + wrapper.process_start().await.unwrap(); + let elapsed = started.elapsed(); + + assert_eq!(attempts.load(Ordering::SeqCst), 3); + assert!( + elapsed >= Duration::from_millis(150), + "expected visible backoff delay before successful restart, elapsed={elapsed:?}" + ); +} + +#[tokio::test] +async fn test_restart_supervisor_shutdown_interrupts_backoff() { + let attempts = Arc::new(AtomicUsize::new(0)); + let wrapper = Arc::new( + RestartSupervisor::new(AlwaysFailController::new(Arc::clone(&attempts))).backoff( + RestartBackoff::new(Duration::from_secs(5), Duration::from_secs(5), 2), + ), + ); + let handle = wrapper.process_handle(); + + let run = tokio::spawn({ + let wrapper = Arc::clone(&wrapper); + async move { wrapper.process_start().await.unwrap() } + }); + + // Wait for at least one failed attempt so the wrapper is in backoff. + for _ in 0..20 { + if attempts.load(Ordering::SeqCst) > 0 { + break; + } + tokio::time::sleep(Duration::from_millis(20)).await; + } + assert!(attempts.load(Ordering::SeqCst) > 0); + + let started = tokio::time::Instant::now(); + handle.shutdown().await; + timeout(Duration::from_secs(1), run) + .await + .expect("restart wrapper did not stop promptly after shutdown") + .expect("restart wrapper task failed"); + let elapsed = started.elapsed(); + assert!( + elapsed < Duration::from_secs(1), + "shutdown should interrupt backoff promptly, elapsed={elapsed:?}" + ); +} + #[tokio::test] async fn test_shutdown_grace_period_is_configurable() { let manager = ProcessManagerBuilder::default()