Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,4 @@ jobs:
timeout 30s cargo run --example simple
timeout 45s cargo run --example dynamic_add
timeout 30s cargo run --example restart_supervisor
timeout 30s cargo run --example runtime_context
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ and its version numbers follow [Semantic Versioning](https://semver.org/).
scheme (`<type>/<issue-id>-<short-kebab-description>`).
- `AGENTS.md` PR title convention (no coding-agent/tool tags) and mandatory
`git pull --ff-only origin main` before branch creation (`#32`).
- `RunnableWithContext`, `RuntimeContext`, and
`with_runtime_context(...)` so runnables can consume runtime
control/ticker context without carrying a `RuntimeGuard` field (`#21`).

### Changed
- `process_handle()` now returns `Arc<dyn ProcessControlHandler>` (cheap cloning,
Expand All @@ -42,6 +45,8 @@ and its version numbers follow [Semantic Versioning](https://semver.org/).
startup panics and `insert` must be used during setup (`#26`).
- CI now executes `simple`, `dynamic_add`, and `restart_supervisor` examples
with bounded runtimes to catch regressions in sample programs (`#37`).
- CI now also executes the `runtime_context` example to keep the
context-based runnable API covered (`#21`).
- Added `RestartSupervisor` with configurable exponential backoff to
automatically restart failed child runnables (`#19`).

Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ Cargo:
| `cargo run --example simple` | Minimal setup, two workers, graceful shutdown |
| `cargo run --example dynamic_add` | Dynamically add workers while the manager is running |
| `cargo run --example restart_supervisor` | Restart a flaky worker with exponential backoff |
| `cargo run --example runtime_context` | Use `RunnableWithContext` without storing `RuntimeGuard` |

Feel free to copy or adapt the code for your own services.

Expand Down
1 change: 1 addition & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ cargo run --example <name>
| `simple` | minimal setup: one manager, two workers, graceful shutdown |
| `dynamic_add` | add new `Runnable`s **while the manager is already running** |
| `restart_supervisor` | restart a flaky child with configurable exponential backoff |
| `runtime_context` | implement `RunnableWithContext` without manually managing `RuntimeGuard` |

Feel free to copy / adapt the code for your own services and let us know if you
run into problems.
75 changes: 75 additions & 0 deletions examples/runtime_context.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
//! Runtime context demo.
//!
//! Shows how to implement `RunnableWithContext` so a process can receive
//! runtime control messages without storing its own `RuntimeGuard`.
//!
//! Build & run:
//! ```bash
//! cargo run --example runtime_context
//! ```

use processmanager::*;
use std::{borrow::Cow, time::Duration};
use tokio::time::{interval, sleep};

struct Worker {
id: usize,
}

impl Worker {
fn new(id: usize) -> Self {
Self { id }
}
}

impl RunnableWithContext for Worker {
fn process_start_with_context(&self, ctx: RuntimeContext) -> ProcFuture<'_> {
let id = self.id;

Box::pin(async move {
let mut beat = interval(Duration::from_secs(1));

loop {
match ctx.tick(beat.tick()).await {
ProcessOperation::Next(_) => println!("worker-{id}: heartbeat"),
ProcessOperation::Control(RuntimeControlMessage::Reload) => {
println!("worker-{id}: reload");
}
ProcessOperation::Control(RuntimeControlMessage::Shutdown) => {
println!("worker-{id}: shutdown");
break;
}
ProcessOperation::Control(_) => continue,
}
}

Ok(())
})
}

fn process_name(&self) -> Cow<'static, str> {
Cow::Borrowed("ContextWorker")
}
}

#[tokio::main]
async fn main() {
let manager = ProcessManagerBuilder::default()
.pre_insert(with_runtime_context(Worker::new(0)))
.build();

let handle = manager.process_handle();

tokio::spawn(async move {
manager
.process_start()
.await
.expect("manager encountered an error");
});

sleep(Duration::from_secs(2)).await;
handle.reload().await;
sleep(Duration::from_secs(2)).await;
handle.shutdown().await;
sleep(Duration::from_millis(300)).await;
}
98 changes: 98 additions & 0 deletions src/runtime_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
/// This module defines:
/// • [`Runnable`] – abstraction for long-running async components supervised by
/// a `ProcessManager`.
/// • [`RunnableWithContext`] – convenience trait that receives a runtime
/// context and does not require implementors to manage `RuntimeGuard`.
/// • [`ProcessControlHandler`] – fire-and-forget interface for broadcasting
/// control messages.
/// • [`RuntimeControlMessage`] – set of built-in control messages understood by
Expand All @@ -13,6 +15,8 @@ use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;

use crate::{RuntimeGuard, RuntimeTicker};

/// Boxed future returned by [`Runnable::process_start`].
pub type ProcFuture<'a> = Pin<Box<dyn Future<Output = Result<(), RuntimeError>> + Send + 'a>>;

Expand Down Expand Up @@ -41,6 +45,100 @@ where
fn process_handle(&self) -> Arc<dyn ProcessControlHandler>;
}

/// Runtime context passed to [`RunnableWithContext`].
///
/// This wrapper exposes the runtime ticker and a control handle without
/// requiring each runnable to carry its own [`RuntimeGuard`].
pub struct RuntimeContext {
ticker: RuntimeTicker,
handle: Arc<dyn ProcessControlHandler>,
}

impl RuntimeContext {
fn new(ticker: RuntimeTicker, handle: Arc<dyn ProcessControlHandler>) -> Self {
Self { ticker, handle }
}

/// Race one unit of work against runtime control messages.
pub async fn tick<O, Fut>(&self, fut: Fut) -> ProcessOperation<O>
where
Fut: Future<Output = O>,
{
self.ticker.tick(fut).await
}

/// Return the control handle associated with this runtime.
pub fn handle(&self) -> Arc<dyn ProcessControlHandler> {
Arc::clone(&self.handle)
}
}

/// Convenience variant of [`Runnable`] that receives a [`RuntimeContext`].
///
/// Unlike [`Runnable`], implementors do not need to store a `RuntimeGuard` or
/// provide their own control handle. Wrap it with [`RuntimeContextRunnable`]
/// (or [`with_runtime_context`]) to use it in a `ProcessManager`.
pub trait RunnableWithContext
where
Self: Send + Sync + 'static,
{
fn process_start_with_context(&self, ctx: RuntimeContext) -> ProcFuture<'_>;

fn process_name(&self) -> Cow<'static, str> {
Cow::Borrowed(std::any::type_name::<Self>())
}
}

/// Adapter that turns a [`RunnableWithContext`] into a regular [`Runnable`].
pub struct RuntimeContextRunnable<R>
where
R: RunnableWithContext,
{
inner: R,
runtime_guard: RuntimeGuard,
}

impl<R> RuntimeContextRunnable<R>
where
R: RunnableWithContext,
{
pub fn new(inner: R) -> Self {
Self {
inner,
runtime_guard: RuntimeGuard::default(),
}
}
}

/// Wrap a [`RunnableWithContext`] for use anywhere a [`Runnable`] is expected.
pub fn with_runtime_context<R>(runnable: R) -> RuntimeContextRunnable<R>
where
R: RunnableWithContext,
{
RuntimeContextRunnable::new(runnable)
}

impl<R> Runnable for RuntimeContextRunnable<R>
where
R: RunnableWithContext,
{
fn process_start(&self) -> ProcFuture<'_> {
Box::pin(async move {
let ticker = self.runtime_guard.runtime_ticker().await;
let ctx = RuntimeContext::new(ticker, self.runtime_guard.handle());
self.inner.process_start_with_context(ctx).await
})
}

fn process_name(&self) -> Cow<'static, str> {
self.inner.process_name()
}

fn process_handle(&self) -> Arc<dyn ProcessControlHandler> {
self.runtime_guard.handle()
}
}

/// Boxed future returned by [`ProcessControlHandler`] control methods.
pub type CtrlFuture<'a> = Pin<Box<dyn Future<Output = ()> + Send + 'a>>;

Expand Down
60 changes: 59 additions & 1 deletion tests/integration.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use processmanager::{
ProcFuture, ProcessControlHandler, ProcessManager, ProcessManagerBuilder, ProcessOperation,
RestartBackoff, RestartSupervisor, Runnable, RuntimeControlMessage, RuntimeError, RuntimeGuard,
RestartBackoff, RestartSupervisor, Runnable, RunnableWithContext, RuntimeContext,
RuntimeControlMessage, RuntimeError, RuntimeGuard, with_runtime_context,
};
use std::ops::Add;
use std::sync::Arc;
Expand Down Expand Up @@ -172,6 +173,36 @@ impl FlakyController {
}
}

struct ContextController {
reloads: Arc<AtomicUsize>,
}

impl ContextController {
fn new(reloads: Arc<AtomicUsize>) -> Self {
Self { reloads }
}
}

impl RunnableWithContext for ContextController {
fn process_start_with_context(&self, ctx: RuntimeContext) -> ProcFuture<'_> {
let reloads = Arc::clone(&self.reloads);
Box::pin(async move {
loop {
match ctx.tick(tokio::time::sleep(Duration::from_secs(30))).await {
ProcessOperation::Next(_) => continue,
ProcessOperation::Control(RuntimeControlMessage::Shutdown) => break,
ProcessOperation::Control(RuntimeControlMessage::Reload) => {
reloads.fetch_add(1, Ordering::SeqCst);
}
ProcessOperation::Control(_) => continue,
}
}

Ok(())
})
}
}

impl Runnable for FlakyController {
fn process_start(&self) -> ProcFuture<'_> {
Box::pin(async {
Expand Down Expand Up @@ -300,6 +331,33 @@ async fn test_runnable() {
);
}

#[tokio::test]
async fn test_runnable_with_context_works_without_runtime_guard_field() {
let reloads = Arc::new(AtomicUsize::new(0));
let mut manager = ProcessManager::new();
manager.insert(with_runtime_context(ContextController::new(Arc::clone(
&reloads,
))));

let (tx, rx) = channel::<bool>();
let handle = manager.process_handle();
tokio::spawn(async move {
manager.process_start().await.unwrap();
tx.send(true).unwrap();
});

tokio::time::sleep(Duration::from_millis(50)).await;
handle.reload().await;
tokio::time::sleep(Duration::from_millis(50)).await;
handle.shutdown().await;

assert!(
timeout(Duration::from_secs(2), rx).await.is_ok(),
"manager did not terminate after shutdown"
);
assert_eq!(reloads.load(Ordering::SeqCst), 1);
}

#[tokio::test]
async fn test_shutdown_waits_for_child_termination() {
let mut manager = ProcessManager::new();
Expand Down
Loading