Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,4 @@ jobs:
run: |
timeout 30s cargo run --example simple
timeout 45s cargo run --example dynamic_add
timeout 30s cargo run --example restart_supervisor
6 changes: 4 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@ and its version numbers follow [Semantic Versioning](https://semver.org/).
shutdown grace timeout instead of relying on a fixed 30s value (`#17`).
- `ProcessManager::add` docs now match runtime behavior: calling `add` before
startup panics and `insert` must be used during setup (`#26`).
- CI now executes `simple` and `dynamic_add` examples with bounded runtimes to
catch regressions in sample programs (`#37`).
- CI now executes `simple`, `dynamic_add`, and `restart_supervisor` examples
with bounded runtimes to catch regressions in sample programs (`#37`).
- Added `RestartSupervisor` with configurable exponential backoff to
automatically restart failed child runnables (`#19`).

### Fixed
- Active-child counter accuracy under edge conditions (spawn panics).
Expand Down
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ processes.
| Error propagation | A failing child triggers a global shutdown and returns the *first* error. |
| Auto cleanup | Optionally remove finished children to keep memory usage bounded. |
| Hierarchical composition | Managers implement `Runnable` themselves → build arbitrary process trees. |
| Built-in helpers | See [`IdleProcess`](#built-in-helpers) and [`SignalReceiver`](#built-in-helpers). |
| Built-in helpers | See [`IdleProcess`](#built-in-helpers), [`RestartSupervisor`](#built-in-helpers) and [`SignalReceiver`](#built-in-helpers). |

---

Expand Down Expand Up @@ -126,6 +126,7 @@ async fn main() {
| Helper | Purpose | Feature flag |
| ---------------- | ------------------------------------------------------------------------------------------------------------------------ | ------------ |
| `IdleProcess` | Keeps an otherwise empty manager alive until an external shutdown is requested. | — |
| `RestartSupervisor` | Wraps a child `Runnable`, restarts it after failures, and applies exponential backoff between retry attempts. | — |
| `SignalReceiver` | Listens for `SIGHUP`, `SIGINT`, `SIGTERM`, `SIGQUIT` and converts them into *shutdown / reload* control messages. | `signal` |

Enable `SignalReceiver` like this:
Expand All @@ -145,6 +146,7 @@ Cargo:
| -------------------------------------------- | ------------------------------------------------------ |
| `cargo run --example simple` | Minimal setup, two workers, graceful shutdown |
| `cargo run --example dynamic_add` | Dynamically add workers while the manager is running |
| `cargo run --example restart_supervisor` | Restart a flaky worker with exponential backoff |

Feel free to copy or adapt the code for your own services.

Expand Down
3 changes: 2 additions & 1 deletion examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ cargo run --example <name>
| -------------------------------- | ---------------------------------------------------------------------- |
| `simple` | minimal setup: one manager, two workers, graceful shutdown |
| `dynamic_add` | add new `Runnable`s **while the manager is already running** |
| `restart_supervisor` | restart a flaky child with configurable exponential backoff |

Feel free to copy / adapt the code for your own services and let us know if you
run into problems.
run into problems.
106 changes: 106 additions & 0 deletions examples/restart_supervisor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
//! Restart supervisor demo.
//!
//! Shows how `RestartSupervisor` can wrap a flaky `Runnable` and restart it
//! with exponential backoff until it succeeds.
//!
//! Build & run:
//! ```bash
//! cargo run --example restart_supervisor
//! ```

use processmanager::*;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use tokio::time::{interval, sleep};

struct FlakyWorker {
fail_until_attempt: usize,
attempts: Arc<AtomicUsize>,
guard: Arc<RuntimeGuard>,
}

impl FlakyWorker {
fn new(fail_until_attempt: usize, attempts: Arc<AtomicUsize>) -> Self {
Self {
fail_until_attempt,
attempts,
guard: Arc::new(RuntimeGuard::default()),
}
}
}

impl Runnable for FlakyWorker {
fn process_start(&self) -> ProcFuture<'_> {
let attempts = Arc::clone(&self.attempts);
let guard = Arc::clone(&self.guard);
let fail_until_attempt = self.fail_until_attempt;

Box::pin(async move {
let attempt = attempts.fetch_add(1, Ordering::SeqCst) + 1;
println!("flaky-worker: start attempt {attempt}");

if attempt <= fail_until_attempt {
return Err(RuntimeError::Internal {
message: format!("simulated failure at attempt {attempt}"),
});
}

let ticker = guard.runtime_ticker().await;
let mut beat = interval(Duration::from_millis(500));

loop {
match ticker.tick(beat.tick()).await {
ProcessOperation::Next(_) => println!("flaky-worker: heartbeat"),
ProcessOperation::Control(RuntimeControlMessage::Shutdown) => {
println!("flaky-worker: shutdown");
break;
}
ProcessOperation::Control(RuntimeControlMessage::Reload) => {
println!("flaky-worker: reload");
}
ProcessOperation::Control(_) => continue,
}
}

Ok(())
})
}

fn process_handle(&self) -> Arc<dyn ProcessControlHandler> {
self.guard.handle()
}

fn process_name(&self) -> std::borrow::Cow<'static, str> {
std::borrow::Cow::Borrowed("FlakyWorker")
}
}

#[tokio::main]
async fn main() {
let attempts = Arc::new(AtomicUsize::new(0));

let wrapped = RestartSupervisor::new(FlakyWorker::new(2, Arc::clone(&attempts))).backoff(
RestartBackoff::new(Duration::from_millis(200), Duration::from_secs(1), 2),
);

let manager = ProcessManagerBuilder::default().pre_insert(wrapped).build();
let handle = manager.process_handle();

tokio::spawn(async move {
manager
.process_start()
.await
.expect("manager encountered an error");
});

// Let the wrapped worker fail twice, restart, and then run.
sleep(Duration::from_secs(3)).await;
println!(
"main: observed {} total worker start attempts",
attempts.load(Ordering::SeqCst)
);

handle.shutdown().await;
sleep(Duration::from_millis(300)).await;
}
2 changes: 2 additions & 0 deletions src/builtin/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
mod idle;
mod restart_supervisor;
#[cfg(feature = "signal")]
mod signal_receiver;

pub use idle::IdleProcess;
pub use restart_supervisor::{RestartBackoff, RestartSupervisor};
#[cfg(feature = "signal")]
pub use signal_receiver::SignalReceiver;
182 changes: 182 additions & 0 deletions src/builtin/restart_supervisor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
//! Restart supervisor for flaky child processes.
//!
//! `RestartSupervisor` supervises one child [`Runnable`]. When the child exits
//! with an error (or panics), the supervisor restarts it after an exponential
//! backoff delay.
//!
//! This is useful for services that should auto-recover from transient
//! failures.
use std::{borrow::Cow, sync::Arc, time::Duration};

use crate::{
ProcFuture, ProcessControlHandler, ProcessOperation, Runnable, RuntimeControlMessage,
RuntimeError, RuntimeGuard,
};

/// Exponential backoff settings for [`RestartSupervisor`].
#[derive(Debug, Clone, Copy)]
pub struct RestartBackoff {
/// Delay used after the first failure.
pub initial: Duration,
/// Maximum delay cap.
pub max: Duration,
/// Multiplication factor for subsequent failures.
pub factor: u32,
}

impl RestartBackoff {
pub fn new(initial: Duration, max: Duration, factor: u32) -> Self {
Self {
initial,
max,
factor: factor.max(1),
}
}

fn delay_for_failures(self, failures: u32) -> Duration {
if failures == 0 {
return Duration::ZERO;
}

let mut delay = self.initial;
for _ in 1..failures {
delay = delay.saturating_mul(self.factor);
if delay >= self.max {
return self.max;
}
}
delay.min(self.max)
}
}

impl Default for RestartBackoff {
fn default() -> Self {
Self {
initial: Duration::from_millis(200),
max: Duration::from_secs(30),
factor: 2,
}
}
}

/// Wraps one child [`Runnable`] and restarts it after failures with backoff.
pub struct RestartSupervisor {
child: Arc<dyn Runnable>,
child_handle: Arc<dyn ProcessControlHandler>,
runtime_guard: RuntimeGuard,
backoff: RestartBackoff,
control_poll: Duration,
}

impl RestartSupervisor {
pub fn new(child: impl Runnable) -> Self {
let child: Arc<dyn Runnable> = Arc::from(Box::new(child) as Box<dyn Runnable>);
let child_handle = child.process_handle();
Self {
child,
child_handle,
runtime_guard: RuntimeGuard::default(),
backoff: RestartBackoff::default(),
control_poll: Duration::from_millis(50),
}
}

pub fn backoff(mut self, backoff: RestartBackoff) -> Self {
self.backoff = backoff;
self
}
}

impl Runnable for RestartSupervisor {
fn process_start(&self) -> ProcFuture<'_> {
let child = Arc::clone(&self.child);
let child_handle = Arc::clone(&self.child_handle);
let backoff = self.backoff;
let control_poll = self.control_poll;
let guard = self.runtime_guard.clone();
let child_name = child.process_name().to_string();

Box::pin(async move {
let ticker = guard.runtime_ticker().await;
let mut failures = 0_u32;

loop {
let mut child_task = tokio::spawn({
let child = Arc::clone(&child);
async move { child.process_start().await }
});

// Child run-loop. React to control messages while the child is alive.
let child_result = loop {
match ticker
.tick(tokio::time::timeout(control_poll, &mut child_task))
.await
{
ProcessOperation::Next(Ok(join_result)) => break join_result,
ProcessOperation::Next(Err(_timeout)) => continue,
ProcessOperation::Control(RuntimeControlMessage::Shutdown) => {
child_handle.shutdown().await;
let _ = child_task.await;
return Ok(());
}
ProcessOperation::Control(RuntimeControlMessage::Reload) => {
child_handle.reload().await;
}
ProcessOperation::Control(_) => {}
}
};

let failure = match child_result {
Ok(Ok(())) => {
// Child stopped successfully; supervisor also exits successfully.
return Ok(());
}
Ok(Err(err)) => err,
Err(join_err) => RuntimeError::Internal {
message: format!(
"restart supervisor child `{child_name}` join failure: {join_err}"
),
},
};

failures = failures.saturating_add(1);
let delay = backoff.delay_for_failures(failures);

#[cfg(feature = "tracing")]
::tracing::warn!(
child = %child_name,
failures = failures,
backoff = ?delay,
"Child failed; restarting after backoff: {failure:?}"
);
#[cfg(all(not(feature = "tracing"), feature = "log"))]
::log::warn!(
"Child {child_name} failed ({failure:?}); restarting in {delay:?} (attempt {failures})"
);
#[cfg(all(not(feature = "tracing"), not(feature = "log")))]
eprintln!(
"Child {child_name} failed ({failure:?}); restarting in {delay:?} (attempt {failures})"
);

// Backoff wait remains responsive to shutdown/reload.
match ticker.tick(tokio::time::sleep(delay)).await {
ProcessOperation::Control(RuntimeControlMessage::Shutdown) => {
return Ok(());
}
ProcessOperation::Control(RuntimeControlMessage::Reload) => {
child_handle.reload().await;
}
ProcessOperation::Control(_) | ProcessOperation::Next(_) => {}
}
}
})
}

fn process_handle(&self) -> Arc<dyn ProcessControlHandler> {
self.runtime_guard.handle()
}

fn process_name(&self) -> Cow<'static, str> {
Cow::Borrowed("RestartSupervisor")
}
}
Loading
Loading