diff --git a/CHANGELOG.md b/CHANGELOG.md index 253de11..a41bfd6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -52,6 +52,9 @@ and its version numbers follow [Semantic Versioning](https://semver.org/). returning; children are aborted after grace timeout (`#23`). - `ProcessManager` reload fanout now dispatches in parallel, matching documented behavior (`#25`). +- `RuntimeHandle` now supports forwarding arbitrary control messages, including + `RuntimeControlMessage::Custom(...)`, via `control(...)` and `custom(...)` + helpers (`#20`). ### Removed - Unused aliases and imports producing compiler warnings. diff --git a/src/runtime_guard.rs b/src/runtime_guard.rs index 2c441e4..aa81be1 100644 --- a/src/runtime_guard.rs +++ b/src/runtime_guard.rs @@ -9,7 +9,7 @@ //! 1. [`runtime_ticker`](RuntimeGuard::runtime_ticker) – creates the sole ticker //! and connects it to the control fan-out. //! 2. [`handle`](RuntimeGuard::handle) – returns a cheap, clonable -//! [`ProcessControlHandler`] that broadcasts control messages. +//! [`RuntimeHandle`] that broadcasts control messages. //! 3. [`is_running`](RuntimeGuard::is_running) / //! [`block_until_shutdown`](RuntimeGuard::block_until_shutdown) – helpers //! for observing runtime state in tests and demos. @@ -42,7 +42,7 @@ use std::sync::Arc; use tokio::sync::{Mutex, Notify}; -use crate::{ProcessControlHandler, RuntimeControlMessage, RuntimeHandle, RuntimeTicker}; +use crate::{CtrlFuture, RuntimeControlMessage, RuntimeHandle, RuntimeTicker}; #[derive(Debug, Clone)] pub struct RuntimeGuard { @@ -135,14 +135,30 @@ impl RuntimeGuard { !closed } - /// Obtain a clonable [`ProcessControlHandler`] that broadcasts control - /// messages to the ticker. - pub fn handle(&self) -> Arc { + /// Obtain a clonable [`RuntimeHandle`] that broadcasts control messages to + /// the ticker. + pub fn handle(&self) -> Arc { Arc::new(RuntimeHandle::new(Arc::clone( &self.inner.control_ch_sender, ))) } + /// Send an arbitrary runtime control message. + pub fn control(&self, msg: RuntimeControlMessage) -> CtrlFuture<'_> { + Box::pin(async move { + let ch = self.inner.control_ch_sender.lock().await; + let _ = ch.send(msg).await; + }) + } + + /// Send a custom runtime control payload. + pub fn custom(&self, message: T) -> CtrlFuture<'_> + where + T: std::any::Any + Send + Sync + 'static, + { + self.control(RuntimeControlMessage::Custom(Box::new(message))) + } + /// **Busy-wait** helper for tests and demos. /// /// Polls [`is_running`](Self::is_running) once every 10 ms until it diff --git a/src/runtime_handle.rs b/src/runtime_handle.rs index 303bd1b..6eae41e 100644 --- a/src/runtime_handle.rs +++ b/src/runtime_handle.rs @@ -5,6 +5,8 @@ //! channel. The async [`shutdown`](ProcessControlHandler::shutdown) and //! [`reload`](ProcessControlHandler::reload) methods enqueue the requested //! operation and return immediately without waiting for it to be executed. +//! Use [`RuntimeHandle::control`] or [`RuntimeHandle::custom`] to inject custom +//! control messages. // use std::sync::Arc; @@ -28,20 +30,30 @@ impl RuntimeHandle { ) -> Self { Self { control_ch } } -} -impl ProcessControlHandler for RuntimeHandle { - fn shutdown(&self) -> CtrlFuture<'_> { + /// Send an arbitrary runtime control message. + pub fn control(&self, msg: RuntimeControlMessage) -> CtrlFuture<'_> { Box::pin(async move { let ch = self.control_ch.lock().await; - let _ = ch.send(RuntimeControlMessage::Shutdown).await; + let _ = ch.send(msg).await; }) } + /// Send a custom runtime control payload. + pub fn custom(&self, message: T) -> CtrlFuture<'_> + where + T: std::any::Any + Send + Sync + 'static, + { + self.control(RuntimeControlMessage::Custom(Box::new(message))) + } +} + +impl ProcessControlHandler for RuntimeHandle { + fn shutdown(&self) -> CtrlFuture<'_> { + self.control(RuntimeControlMessage::Shutdown) + } + fn reload(&self) -> CtrlFuture<'_> { - Box::pin(async move { - let ch = self.control_ch.lock().await; - let _ = ch.send(RuntimeControlMessage::Reload).await; - }) + self.control(RuntimeControlMessage::Reload) } } diff --git a/tests/integration.rs b/tests/integration.rs index 15b8697..5123778 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -284,6 +284,35 @@ async fn test_runtime_guard_shutdown_sent_before_ticker_is_not_lost() { ); } +#[tokio::test] +async fn test_runtime_handle_custom_control_message_is_delivered() { + let guard = RuntimeGuard::default(); + let handle = guard.handle(); + + // Send custom control message before ticker creation to verify both custom + // payload support and startup buffering behavior. + handle.custom(42_u32).await; + + let ticker = guard.runtime_ticker().await; + + let op = timeout( + Duration::from_secs(1), + ticker.tick(tokio::time::sleep(Duration::from_secs(5))), + ) + .await + .expect("timed out waiting for queued custom message"); + + match op { + ProcessOperation::Control(RuntimeControlMessage::Custom(payload)) => { + let value = payload + .downcast::() + .expect("expected u32 custom payload"); + assert_eq!(*value, 42_u32); + } + _ => panic!("expected custom control message"), + } +} + #[tokio::test] async fn test_reload_dispatch_is_parallel() { let mut manager = ProcessManager::new();