Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ and its version numbers follow [Semantic Versioning](https://semver.org/).
returning; children are aborted after grace timeout (`#23`).
- `ProcessManager` reload fanout now dispatches in parallel, matching documented
behavior (`#25`).
- `RuntimeHandle` now supports forwarding arbitrary control messages, including
`RuntimeControlMessage::Custom(...)`, via `control(...)` and `custom(...)`
helpers (`#20`).

### Removed
- Unused aliases and imports producing compiler warnings.
Expand Down
26 changes: 21 additions & 5 deletions src/runtime_guard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
//! 1. [`runtime_ticker`](RuntimeGuard::runtime_ticker) – creates the sole ticker
//! and connects it to the control fan-out.
//! 2. [`handle`](RuntimeGuard::handle) – returns a cheap, clonable
//! [`ProcessControlHandler`] that broadcasts control messages.
//! [`RuntimeHandle`] that broadcasts control messages.
//! 3. [`is_running`](RuntimeGuard::is_running) /
//! [`block_until_shutdown`](RuntimeGuard::block_until_shutdown) – helpers
//! for observing runtime state in tests and demos.
Expand Down Expand Up @@ -42,7 +42,7 @@ use std::sync::Arc;

use tokio::sync::{Mutex, Notify};

use crate::{ProcessControlHandler, RuntimeControlMessage, RuntimeHandle, RuntimeTicker};
use crate::{CtrlFuture, RuntimeControlMessage, RuntimeHandle, RuntimeTicker};

#[derive(Debug, Clone)]
pub struct RuntimeGuard {
Expand Down Expand Up @@ -135,14 +135,30 @@ impl RuntimeGuard {
!closed
}

/// Obtain a clonable [`ProcessControlHandler`] that broadcasts control
/// messages to the ticker.
pub fn handle(&self) -> Arc<dyn ProcessControlHandler> {
/// Obtain a clonable [`RuntimeHandle`] that broadcasts control messages to
/// the ticker.
pub fn handle(&self) -> Arc<RuntimeHandle> {
Arc::new(RuntimeHandle::new(Arc::clone(
&self.inner.control_ch_sender,
)))
}

/// Send an arbitrary runtime control message.
pub fn control(&self, msg: RuntimeControlMessage) -> CtrlFuture<'_> {
Box::pin(async move {
let ch = self.inner.control_ch_sender.lock().await;
let _ = ch.send(msg).await;
})
}

/// Send a custom runtime control payload.
pub fn custom<T>(&self, message: T) -> CtrlFuture<'_>
where
T: std::any::Any + Send + Sync + 'static,
{
self.control(RuntimeControlMessage::Custom(Box::new(message)))
}

/// **Busy-wait** helper for tests and demos.
///
/// Polls [`is_running`](Self::is_running) once every 10 ms until it
Expand Down
28 changes: 20 additions & 8 deletions src/runtime_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
//! channel. The async [`shutdown`](ProcessControlHandler::shutdown) and
//! [`reload`](ProcessControlHandler::reload) methods enqueue the requested
//! operation and return immediately without waiting for it to be executed.
//! Use [`RuntimeHandle::control`] or [`RuntimeHandle::custom`] to inject custom
//! control messages.
//
use std::sync::Arc;

Expand All @@ -28,20 +30,30 @@ impl RuntimeHandle {
) -> Self {
Self { control_ch }
}
}

impl ProcessControlHandler for RuntimeHandle {
fn shutdown(&self) -> CtrlFuture<'_> {
/// Send an arbitrary runtime control message.
pub fn control(&self, msg: RuntimeControlMessage) -> CtrlFuture<'_> {
Box::pin(async move {
let ch = self.control_ch.lock().await;
let _ = ch.send(RuntimeControlMessage::Shutdown).await;
let _ = ch.send(msg).await;
})
}

/// Send a custom runtime control payload.
pub fn custom<T>(&self, message: T) -> CtrlFuture<'_>
where
T: std::any::Any + Send + Sync + 'static,
{
self.control(RuntimeControlMessage::Custom(Box::new(message)))
}
}

impl ProcessControlHandler for RuntimeHandle {
fn shutdown(&self) -> CtrlFuture<'_> {
self.control(RuntimeControlMessage::Shutdown)
}

fn reload(&self) -> CtrlFuture<'_> {
Box::pin(async move {
let ch = self.control_ch.lock().await;
let _ = ch.send(RuntimeControlMessage::Reload).await;
})
self.control(RuntimeControlMessage::Reload)
}
}
29 changes: 29 additions & 0 deletions tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,35 @@ async fn test_runtime_guard_shutdown_sent_before_ticker_is_not_lost() {
);
}

#[tokio::test]
async fn test_runtime_handle_custom_control_message_is_delivered() {
let guard = RuntimeGuard::default();
let handle = guard.handle();

// Send custom control message before ticker creation to verify both custom
// payload support and startup buffering behavior.
handle.custom(42_u32).await;

let ticker = guard.runtime_ticker().await;

let op = timeout(
Duration::from_secs(1),
ticker.tick(tokio::time::sleep(Duration::from_secs(5))),
)
.await
.expect("timed out waiting for queued custom message");

match op {
ProcessOperation::Control(RuntimeControlMessage::Custom(payload)) => {
let value = payload
.downcast::<u32>()
.expect("expected u32 custom payload");
assert_eq!(*value, 42_u32);
}
_ => panic!("expected custom control message"),
}
}

#[tokio::test]
async fn test_reload_dispatch_is_parallel() {
let mut manager = ProcessManager::new();
Expand Down
Loading