From c4697cf052e66b0761de2102e31b5fda996d0228 Mon Sep 17 00:00:00 2001 From: Marc Riegel Date: Tue, 7 Apr 2026 14:35:20 +0200 Subject: [PATCH] feat: add runtime context runnable API Closes #21 --- .github/workflows/ci.yml | 1 + CHANGELOG.md | 5 ++ README.md | 1 + examples/README.md | 1 + examples/runtime_context.rs | 75 ++++++++++++++++++++++++++++ src/runtime_process.rs | 98 +++++++++++++++++++++++++++++++++++++ tests/integration.rs | 60 ++++++++++++++++++++++- 7 files changed, 240 insertions(+), 1 deletion(-) create mode 100644 examples/runtime_context.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4591b0d..2214d20 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -55,3 +55,4 @@ jobs: timeout 30s cargo run --example simple timeout 45s cargo run --example dynamic_add timeout 30s cargo run --example restart_supervisor + timeout 30s cargo run --example runtime_context diff --git a/CHANGELOG.md b/CHANGELOG.md index 1b13be9..0a0a09c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,9 @@ and its version numbers follow [Semantic Versioning](https://semver.org/). scheme (`/-`). - `AGENTS.md` PR title convention (no coding-agent/tool tags) and mandatory `git pull --ff-only origin main` before branch creation (`#32`). +- `RunnableWithContext`, `RuntimeContext`, and + `with_runtime_context(...)` so runnables can consume runtime + control/ticker context without carrying a `RuntimeGuard` field (`#21`). ### Changed - `process_handle()` now returns `Arc` (cheap cloning, @@ -42,6 +45,8 @@ and its version numbers follow [Semantic Versioning](https://semver.org/). startup panics and `insert` must be used during setup (`#26`). - CI now executes `simple`, `dynamic_add`, and `restart_supervisor` examples with bounded runtimes to catch regressions in sample programs (`#37`). +- CI now also executes the `runtime_context` example to keep the + context-based runnable API covered (`#21`). - Added `RestartSupervisor` with configurable exponential backoff to automatically restart failed child runnables (`#19`). diff --git a/README.md b/README.md index f096cb5..21896d3 100644 --- a/README.md +++ b/README.md @@ -147,6 +147,7 @@ Cargo: | `cargo run --example simple` | Minimal setup, two workers, graceful shutdown | | `cargo run --example dynamic_add` | Dynamically add workers while the manager is running | | `cargo run --example restart_supervisor` | Restart a flaky worker with exponential backoff | +| `cargo run --example runtime_context` | Use `RunnableWithContext` without storing `RuntimeGuard` | Feel free to copy or adapt the code for your own services. diff --git a/examples/README.md b/examples/README.md index cbd8491..e07d5ae 100644 --- a/examples/README.md +++ b/examples/README.md @@ -14,6 +14,7 @@ cargo run --example | `simple` | minimal setup: one manager, two workers, graceful shutdown | | `dynamic_add` | add new `Runnable`s **while the manager is already running** | | `restart_supervisor` | restart a flaky child with configurable exponential backoff | +| `runtime_context` | implement `RunnableWithContext` without manually managing `RuntimeGuard` | Feel free to copy / adapt the code for your own services and let us know if you run into problems. diff --git a/examples/runtime_context.rs b/examples/runtime_context.rs new file mode 100644 index 0000000..868b42c --- /dev/null +++ b/examples/runtime_context.rs @@ -0,0 +1,75 @@ +//! Runtime context demo. +//! +//! Shows how to implement `RunnableWithContext` so a process can receive +//! runtime control messages without storing its own `RuntimeGuard`. +//! +//! Build & run: +//! ```bash +//! cargo run --example runtime_context +//! ``` + +use processmanager::*; +use std::{borrow::Cow, time::Duration}; +use tokio::time::{interval, sleep}; + +struct Worker { + id: usize, +} + +impl Worker { + fn new(id: usize) -> Self { + Self { id } + } +} + +impl RunnableWithContext for Worker { + fn process_start_with_context(&self, ctx: RuntimeContext) -> ProcFuture<'_> { + let id = self.id; + + Box::pin(async move { + let mut beat = interval(Duration::from_secs(1)); + + loop { + match ctx.tick(beat.tick()).await { + ProcessOperation::Next(_) => println!("worker-{id}: heartbeat"), + ProcessOperation::Control(RuntimeControlMessage::Reload) => { + println!("worker-{id}: reload"); + } + ProcessOperation::Control(RuntimeControlMessage::Shutdown) => { + println!("worker-{id}: shutdown"); + break; + } + ProcessOperation::Control(_) => continue, + } + } + + Ok(()) + }) + } + + fn process_name(&self) -> Cow<'static, str> { + Cow::Borrowed("ContextWorker") + } +} + +#[tokio::main] +async fn main() { + let manager = ProcessManagerBuilder::default() + .pre_insert(with_runtime_context(Worker::new(0))) + .build(); + + let handle = manager.process_handle(); + + tokio::spawn(async move { + manager + .process_start() + .await + .expect("manager encountered an error"); + }); + + sleep(Duration::from_secs(2)).await; + handle.reload().await; + sleep(Duration::from_secs(2)).await; + handle.shutdown().await; + sleep(Duration::from_millis(300)).await; +} diff --git a/src/runtime_process.rs b/src/runtime_process.rs index 5c089a5..481e5c2 100644 --- a/src/runtime_process.rs +++ b/src/runtime_process.rs @@ -3,6 +3,8 @@ /// This module defines: /// • [`Runnable`] – abstraction for long-running async components supervised by /// a `ProcessManager`. +/// • [`RunnableWithContext`] – convenience trait that receives a runtime +/// context and does not require implementors to manage `RuntimeGuard`. /// • [`ProcessControlHandler`] – fire-and-forget interface for broadcasting /// control messages. /// • [`RuntimeControlMessage`] – set of built-in control messages understood by @@ -13,6 +15,8 @@ use std::future::Future; use std::pin::Pin; use std::sync::Arc; +use crate::{RuntimeGuard, RuntimeTicker}; + /// Boxed future returned by [`Runnable::process_start`]. pub type ProcFuture<'a> = Pin> + Send + 'a>>; @@ -41,6 +45,100 @@ where fn process_handle(&self) -> Arc; } +/// Runtime context passed to [`RunnableWithContext`]. +/// +/// This wrapper exposes the runtime ticker and a control handle without +/// requiring each runnable to carry its own [`RuntimeGuard`]. +pub struct RuntimeContext { + ticker: RuntimeTicker, + handle: Arc, +} + +impl RuntimeContext { + fn new(ticker: RuntimeTicker, handle: Arc) -> Self { + Self { ticker, handle } + } + + /// Race one unit of work against runtime control messages. + pub async fn tick(&self, fut: Fut) -> ProcessOperation + where + Fut: Future, + { + self.ticker.tick(fut).await + } + + /// Return the control handle associated with this runtime. + pub fn handle(&self) -> Arc { + Arc::clone(&self.handle) + } +} + +/// Convenience variant of [`Runnable`] that receives a [`RuntimeContext`]. +/// +/// Unlike [`Runnable`], implementors do not need to store a `RuntimeGuard` or +/// provide their own control handle. Wrap it with [`RuntimeContextRunnable`] +/// (or [`with_runtime_context`]) to use it in a `ProcessManager`. +pub trait RunnableWithContext +where + Self: Send + Sync + 'static, +{ + fn process_start_with_context(&self, ctx: RuntimeContext) -> ProcFuture<'_>; + + fn process_name(&self) -> Cow<'static, str> { + Cow::Borrowed(std::any::type_name::()) + } +} + +/// Adapter that turns a [`RunnableWithContext`] into a regular [`Runnable`]. +pub struct RuntimeContextRunnable +where + R: RunnableWithContext, +{ + inner: R, + runtime_guard: RuntimeGuard, +} + +impl RuntimeContextRunnable +where + R: RunnableWithContext, +{ + pub fn new(inner: R) -> Self { + Self { + inner, + runtime_guard: RuntimeGuard::default(), + } + } +} + +/// Wrap a [`RunnableWithContext`] for use anywhere a [`Runnable`] is expected. +pub fn with_runtime_context(runnable: R) -> RuntimeContextRunnable +where + R: RunnableWithContext, +{ + RuntimeContextRunnable::new(runnable) +} + +impl Runnable for RuntimeContextRunnable +where + R: RunnableWithContext, +{ + fn process_start(&self) -> ProcFuture<'_> { + Box::pin(async move { + let ticker = self.runtime_guard.runtime_ticker().await; + let ctx = RuntimeContext::new(ticker, self.runtime_guard.handle()); + self.inner.process_start_with_context(ctx).await + }) + } + + fn process_name(&self) -> Cow<'static, str> { + self.inner.process_name() + } + + fn process_handle(&self) -> Arc { + self.runtime_guard.handle() + } +} + /// Boxed future returned by [`ProcessControlHandler`] control methods. pub type CtrlFuture<'a> = Pin + Send + 'a>>; diff --git a/tests/integration.rs b/tests/integration.rs index 15d7e14..1a73cd2 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -1,6 +1,7 @@ use processmanager::{ ProcFuture, ProcessControlHandler, ProcessManager, ProcessManagerBuilder, ProcessOperation, - RestartBackoff, RestartSupervisor, Runnable, RuntimeControlMessage, RuntimeError, RuntimeGuard, + RestartBackoff, RestartSupervisor, Runnable, RunnableWithContext, RuntimeContext, + RuntimeControlMessage, RuntimeError, RuntimeGuard, with_runtime_context, }; use std::ops::Add; use std::sync::Arc; @@ -172,6 +173,36 @@ impl FlakyController { } } +struct ContextController { + reloads: Arc, +} + +impl ContextController { + fn new(reloads: Arc) -> Self { + Self { reloads } + } +} + +impl RunnableWithContext for ContextController { + fn process_start_with_context(&self, ctx: RuntimeContext) -> ProcFuture<'_> { + let reloads = Arc::clone(&self.reloads); + Box::pin(async move { + loop { + match ctx.tick(tokio::time::sleep(Duration::from_secs(30))).await { + ProcessOperation::Next(_) => continue, + ProcessOperation::Control(RuntimeControlMessage::Shutdown) => break, + ProcessOperation::Control(RuntimeControlMessage::Reload) => { + reloads.fetch_add(1, Ordering::SeqCst); + } + ProcessOperation::Control(_) => continue, + } + } + + Ok(()) + }) + } +} + impl Runnable for FlakyController { fn process_start(&self) -> ProcFuture<'_> { Box::pin(async { @@ -300,6 +331,33 @@ async fn test_runnable() { ); } +#[tokio::test] +async fn test_runnable_with_context_works_without_runtime_guard_field() { + let reloads = Arc::new(AtomicUsize::new(0)); + let mut manager = ProcessManager::new(); + manager.insert(with_runtime_context(ContextController::new(Arc::clone( + &reloads, + )))); + + let (tx, rx) = channel::(); + let handle = manager.process_handle(); + tokio::spawn(async move { + manager.process_start().await.unwrap(); + tx.send(true).unwrap(); + }); + + tokio::time::sleep(Duration::from_millis(50)).await; + handle.reload().await; + tokio::time::sleep(Duration::from_millis(50)).await; + handle.shutdown().await; + + assert!( + timeout(Duration::from_secs(2), rx).await.is_ok(), + "manager did not terminate after shutdown" + ); + assert_eq!(reloads.load(Ordering::SeqCst), 1); +} + #[tokio::test] async fn test_shutdown_waits_for_child_termination() { let mut manager = ProcessManager::new();