Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,4 @@ jobs:
timeout 45s cargo run --example dynamic_add
timeout 30s cargo run --example restart_supervisor
timeout 30s cargo run --example runtime_context
timeout 30s cargo run --example axum
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ and its version numbers follow [Semantic Versioning](https://semver.org/).
- `RunnableWithContext`, `RuntimeContext`, and
`with_runtime_context(...)` so runnables can consume runtime
control/ticker context without carrying a `RuntimeGuard` field (`#21`).
- New `axum` example showing how to run a web server under `ProcessManager`
with graceful shutdown and reload handling (`#11`).

### Changed
- `process_handle()` now returns `Arc<dyn ProcessControlHandler>` (cheap cloning,
Expand All @@ -49,6 +51,8 @@ and its version numbers follow [Semantic Versioning](https://semver.org/).
with bounded runtimes to catch regressions in sample programs (`#37`).
- CI now also executes the `runtime_context` example to keep the
context-based runnable API covered (`#21`).
- CI now executes the `axum` example to keep web-framework integration sample
code validated (`#11`).
- Added `RestartSupervisor` with configurable exponential backoff to
automatically restart failed child runnables (`#19`).

Expand Down
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,6 @@ tokio = { version = "1", features = [
log = { version = "0.4", optional = true }
tracing = { version = "0.1", optional = true }
once_cell = "1"

[dev-dependencies]
axum = "0.8"
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ Cargo:
| `cargo run --example dynamic_add` | Dynamically add workers while the manager is running |
| `cargo run --example restart_supervisor` | Restart a flaky worker with exponential backoff |
| `cargo run --example runtime_context` | Use `RunnableWithContext` without storing `RuntimeGuard` |
| `cargo run --example axum` | Manage an `axum` web server with reload/shutdown hooks |

Feel free to copy or adapt the code for your own services.

Expand Down
1 change: 1 addition & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ cargo run --example <name>
| `dynamic_add` | add new `Runnable`s **while the manager is already running** |
| `restart_supervisor` | restart a flaky child with configurable exponential backoff |
| `runtime_context` | implement `RunnableWithContext` without manually managing `RuntimeGuard` |
| `axum` | run an `axum` HTTP server as a managed runnable with graceful shutdown |

Feel free to copy / adapt the code for your own services and let us know if you
run into problems.
138 changes: 138 additions & 0 deletions examples/axum.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
//! Axum integration example.
//!
//! Demonstrates running an `axum` HTTP server under `ProcessManager` using
//! `RunnableWithContext` so no `RuntimeGuard` field is needed in the server
//! runnable.
//!
//! Build & run:
//! ```bash
//! cargo run --example axum
//! ```

use axum::{Router, routing::get};
use processmanager::*;
use std::{borrow::Cow, time::Duration};
use tokio::sync::oneshot;
use tokio::time::sleep;

struct AxumServer;

impl RunnableWithContext for AxumServer {
fn process_start_with_context(&self, ctx: RuntimeContext) -> ProcFuture<'_> {
Box::pin(async move {
let app = Router::new()
.route("/", get(root))
.route("/healthz", get(healthz));

let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
.await
.map_err(|err| RuntimeError::Internal {
message: format!("failed to bind axum listener: {err}"),
})?;

let local_addr = listener
.local_addr()
.map_err(|err| RuntimeError::Internal {
message: format!("failed to read local address: {err}"),
})?;

println!("axum server listening on http://{local_addr}");

let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
let mut shutdown_tx = Some(shutdown_tx);

let mut server_task = tokio::spawn(async move {
axum::serve(listener, app)
.with_graceful_shutdown(async move {
let _ = shutdown_rx.await;
})
.await
});

loop {
match ctx
.tick(tokio::time::timeout(
Duration::from_millis(200),
&mut server_task,
))
.await
{
ProcessOperation::Next(Ok(join_result)) => match join_result {
Ok(Ok(())) => return Ok(()),
Ok(Err(err)) => {
return Err(RuntimeError::Internal {
message: format!("axum server stopped with error: {err}"),
});
}
Err(join_err) => {
return Err(RuntimeError::Internal {
message: format!("axum server task join error: {join_err}"),
});
}
},
ProcessOperation::Next(Err(_timeout)) => continue,
ProcessOperation::Control(RuntimeControlMessage::Shutdown) => {
if let Some(tx) = shutdown_tx.take() {
let _ = tx.send(());
}

match server_task.await {
Ok(Ok(())) => return Ok(()),
Ok(Err(err)) => {
return Err(RuntimeError::Internal {
message: format!("axum shutdown failed: {err}"),
});
}
Err(join_err) => {
return Err(RuntimeError::Internal {
message: format!("axum shutdown join error: {join_err}"),
});
}
}
}
ProcessOperation::Control(RuntimeControlMessage::Reload) => {
println!("axum server: reload signal received");
}
ProcessOperation::Control(_) => {}
}
}
})
}

fn process_name(&self) -> Cow<'static, str> {
Cow::Borrowed("AxumServer")
}
}

async fn root() -> &'static str {
"hello from processmanager + axum"
}

async fn healthz() -> &'static str {
"ok"
}

#[tokio::main]
async fn main() {
let manager = ProcessManagerBuilder::default()
.name("axum-demo")
.pre_insert(with_runtime_context(AxumServer))
.build();

let handle = manager.process_handle();

tokio::spawn(async move {
manager
.process_start()
.await
.expect("manager encountered an error");
});

sleep(Duration::from_secs(2)).await;
handle.reload().await;

sleep(Duration::from_secs(2)).await;
handle.shutdown().await;

sleep(Duration::from_millis(300)).await;
}
Loading