From 6e641554c4657f7dc771807f86b84c25218fddc4 Mon Sep 17 00:00:00 2001 From: nabil salah Date: Wed, 11 Dec 2024 17:08:07 +0200 Subject: [PATCH 1/4] feat: graceful shutdown Signed-off-by: nabil salah --- src/events/mod.rs | 39 +++++++++++++++++-------- src/relay/federation/mod.rs | 57 ++++++++++++++++++++++--------------- src/relay/mod.rs | 35 +++++++++++++++-------- 3 files changed, 84 insertions(+), 47 deletions(-) diff --git a/src/events/mod.rs b/src/events/mod.rs index 8a69542..e46db99 100644 --- a/src/events/mod.rs +++ b/src/events/mod.rs @@ -5,6 +5,7 @@ use anyhow::Result; use futures::StreamExt; use log; use subxt::{OnlineClient, PolkadotConfig}; +use tokio::select; #[derive(Clone)] pub struct Listener @@ -62,21 +63,35 @@ where pub async fn listen(&mut self, got_hit: &mut bool) -> Result<()> { loop { - // always flush in case some blocks were finalized before reconnecting - if let Err(err) = self.cache.flush().await { - log::error!("failed to flush redis cache {}", err); - tokio::time::sleep(Duration::from_millis(500)).await; - continue; - } - if let Err(err) = self.handle_events().await { - log::error!("error listening to events {}", err); - if let Some(subxt::Error::Rpc(_)) = err.downcast_ref::() { - self.api = Self::connect(&mut self.substrate_urls).await?; + select! { + _ = tokio::signal::ctrl_c() => { + log::info!("shutting down listener gracefully"); + if let Err(err) = self.cache.flush().await { + log::error!("failed to flush redis cache {}", err); + tokio::time::sleep(Duration::from_millis(500)).await; + continue; + } + break; + }, + result = self.cache.flush() => { + // always flush in case some blocks were finalized before reconnecting + if let Err(err) = result { + log::error!("failed to flush redis cache {}", err); + tokio::time::sleep(Duration::from_millis(500)).await; + continue; + } + if let Err(err) = self.handle_events().await { + log::error!("error listening to events {}", err); + if let Some(subxt::Error::Rpc(_)) = err.downcast_ref::() { + self.api = Self::connect(&mut self.substrate_urls).await?; + } + } else { + *got_hit = true + } } - } else { - *got_hit = true } } + Ok(()) } async fn handle_events(&self) -> Result<()> { diff --git a/src/relay/federation/mod.rs b/src/relay/federation/mod.rs index 87cd11a..4461db0 100644 --- a/src/relay/federation/mod.rs +++ b/src/relay/federation/mod.rs @@ -6,10 +6,11 @@ use crate::twin::TwinDB; use anyhow::Result; use bb8_redis::{ bb8::{Pool, RunError}, - redis::{cmd, RedisError}, + redis::{aio::ConnectionLike, cmd, RedisError}, RedisConnectionManager, }; use prometheus::{IntCounterVec, Opts, Registry}; +use tokio::select; use workers::WorkerPool; mod router; @@ -127,28 +128,38 @@ where let mut workers = self.workers; loop { - let mut con = match self.pool.get().await { - Ok(con) => con, - Err(err) => { - log::error!("could not get redis connection from pool, {}", err); - continue; - } - }; - let worker_handler = workers.get().await; - let (_, msg): (String, Vec) = match cmd("BRPOP") - .arg(FEDERATION_QUEUE) - .arg(0.0) - .query_async(&mut *con) - .await - { - Ok(msg) => msg, - Err(err) => { - log::error!("could not get message from redis {}", err); - continue; - } - }; - if let Err(err) = worker_handler.send(msg) { - log::error!("failed to send job to worker: {}", err); + select! { + _ = tokio::signal::ctrl_c() => { + log::info!("shutting down fedartor gracefully"); + workers.close().await; + break; + }, + result = self.pool.get() => { + let mut con = match result { + Ok(con) => con, + Err(err) => { + log::error!("could not get redis connection from pool, {}", err); + continue; + } + }; + let worker_handler = workers.get().await; + let (_, msg): (String, Vec) = match cmd("BRPOP") + .arg(FEDERATION_QUEUE) + .arg(0.0) + .query_async(&mut *con) + .await + { + Ok(msg) => msg, + Err(err) => { + log::error!("could not get message from redis {}", err); + continue; + } + }; + if let Err(err) = worker_handler.send(msg) { + log::error!("failed to send job to worker: {}", err); + } + + }, } } } diff --git a/src/relay/mod.rs b/src/relay/mod.rs index 15db835..db81a38 100644 --- a/src/relay/mod.rs +++ b/src/relay/mod.rs @@ -5,6 +5,8 @@ use hyper::server::conn::Http; use hyper_tungstenite::tungstenite::error::ProtocolError; use tokio::net::TcpListener; use tokio::net::ToSocketAddrs; +use tokio::select; +use tokio::signal; mod api; mod federation; @@ -63,19 +65,28 @@ where self.limiter, )); loop { - let (tcp_stream, _) = tcp_listener.accept().await?; - let http = http.clone(); - tokio::task::spawn(async move { - if let Err(http_err) = Http::new() - .http1_keep_alive(true) - .serve_connection(tcp_stream, http) - .with_upgrades() - .await - { - eprintln!("Error while serving HTTP connection: {}", http_err); - } - }); + select! { + _ = signal::ctrl_c() => { + log::info!("shutting down relay gracefully"); + break; + }, + result = tcp_listener.accept() => { + let (tcp_stream, _) = result?; + let http = http.clone(); + tokio::task::spawn(async move { + if let Err(http_err) = Http::new() + .http1_keep_alive(true) + .serve_connection(tcp_stream, http) + .with_upgrades() + .await + { + eprintln!("Error while serving HTTP connection: {}", http_err); + } + }); + }, + } } + Ok(()) } } From 755699bbefbeee944b2feabbdf8578a372819d54 Mon Sep 17 00:00:00 2001 From: nabil salah Date: Wed, 11 Dec 2024 17:45:23 +0200 Subject: [PATCH 2/4] refactor: listener graceful shutdown Signed-off-by: nabil salah --- src/events/mod.rs | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/src/events/mod.rs b/src/events/mod.rs index e46db99..edb5e05 100644 --- a/src/events/mod.rs +++ b/src/events/mod.rs @@ -63,24 +63,19 @@ where pub async fn listen(&mut self, got_hit: &mut bool) -> Result<()> { loop { + // always flush in case some blocks were finalized before reconnecting + if let Err(err) = self.cache.flush().await { + log::error!("failed to flush redis cache {}", err); + tokio::time::sleep(Duration::from_millis(500)).await; + continue; + } select! { _ = tokio::signal::ctrl_c() => { log::info!("shutting down listener gracefully"); - if let Err(err) = self.cache.flush().await { - log::error!("failed to flush redis cache {}", err); - tokio::time::sleep(Duration::from_millis(500)).await; - continue; - } break; }, - result = self.cache.flush() => { - // always flush in case some blocks were finalized before reconnecting + result = self.handle_events() => { if let Err(err) = result { - log::error!("failed to flush redis cache {}", err); - tokio::time::sleep(Duration::from_millis(500)).await; - continue; - } - if let Err(err) = self.handle_events().await { log::error!("error listening to events {}", err); if let Some(subxt::Error::Rpc(_)) = err.downcast_ref::() { self.api = Self::connect(&mut self.substrate_urls).await?; From 4271cb49bd4929f6d428fbfd9f624ef20a736e18 Mon Sep 17 00:00:00 2001 From: nabil salah Date: Thu, 12 Dec 2024 12:59:43 +0200 Subject: [PATCH 3/4] fix: redis flushing on shutdown Signed-off-by: nabil salah --- src/events/mod.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/events/mod.rs b/src/events/mod.rs index edb5e05..df2f884 100644 --- a/src/events/mod.rs +++ b/src/events/mod.rs @@ -72,6 +72,11 @@ where select! { _ = tokio::signal::ctrl_c() => { log::info!("shutting down listener gracefully"); + if let Err(err) = self.cache.flush().await { + log::error!("failed to flush redis cache {}", err); + }else { + log::info!("Succesful flush of redis cache"); + } break; }, result = self.handle_events() => { From 224176e029b467ea54d4c6749d5f8272146fbd3e Mon Sep 17 00:00:00 2001 From: nabil salah Date: Thu, 9 Jan 2025 17:31:43 +0200 Subject: [PATCH 4/4] fix: giving time for threads to make it's shutdown Signed-off-by: nabil salah --- Cargo.lock | 21 ++++++++++----- Cargo.toml | 1 + src/bins/rmb-relay.rs | 53 +++++++++++++++++++++++++----------- src/events/mod.rs | 11 +++++--- src/relay/federation/mod.rs | 54 +++++++++++++++++++++---------------- src/relay/mod.rs | 39 ++++++++++++++++++++------- 6 files changed, 122 insertions(+), 57 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0b234be..b2e7756 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -860,7 +860,7 @@ dependencies = [ "memchr", "pin-project-lite", "tokio", - "tokio-util 0.7.12", + "tokio-util 0.7.13", ] [[package]] @@ -1869,7 +1869,7 @@ dependencies = [ "indexmap 2.6.0", "slab", "tokio", - "tokio-util 0.7.12", + "tokio-util 0.7.13", "tracing", ] @@ -1912,6 +1912,12 @@ dependencies = [ "ahash 0.8.11", ] +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" + [[package]] name = "hashbrown" version = "0.15.0" @@ -2374,7 +2380,7 @@ dependencies = [ "thiserror", "tokio", "tokio-rustls", - "tokio-util 0.7.12", + "tokio-util 0.7.13", "tracing", "webpki-roots", ] @@ -3678,7 +3684,7 @@ dependencies = [ "pin-project-lite", "ryu", "tokio", - "tokio-util 0.7.12", + "tokio-util 0.7.13", "url", ] @@ -3883,6 +3889,7 @@ dependencies = [ "tokio-retry", "tokio-stream", "tokio-tungstenite", + "tokio-util 0.7.13", "ttl_cache", "url", "uuid", @@ -5754,14 +5761,16 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.12" +version = "0.7.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61e7c3654c13bcd040d4a03abee2c75b1d14a37b423cf5a813ceae1cc903ec6a" +checksum = "d7fcaa8d55a2bdd6b83ace262b016eca0d79ee02818c5c1bcdf0305114081078" dependencies = [ "bytes", "futures-core", "futures-io", "futures-sink", + "futures-util", + "hashbrown 0.14.5", "pin-project-lite", "tokio", ] diff --git a/Cargo.toml b/Cargo.toml index ab06921..ef51590 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,7 @@ simple_logger = "2.1" thiserror = "1.0.31" tokio = { version = "1", features = ["full"] } tokio-retry = "0.3" +tokio-util = { version = "0.7", features = ["rt"] } uuid = { version = "1.1.0", features = ["v4"] } jsonrpsee-core = "0.14.0" mime = "0.3" diff --git a/src/bins/rmb-relay.rs b/src/bins/rmb-relay.rs index f95bfb0..9895c60 100644 --- a/src/bins/rmb-relay.rs +++ b/src/bins/rmb-relay.rs @@ -11,7 +11,10 @@ use rmb::relay::{ limiter::{FixedWindowOptions, Limiters}, }; use rmb::twin::SubstrateTwinDB; -use tokio::sync::oneshot; +use std::process::ExitCode; +use tokio::signal; +use tokio_util::sync::CancellationToken; +use tokio_util::task::TaskTracker; /// A peer requires only which rely to connect to, and /// which identity (mnemonics) @@ -93,7 +96,7 @@ fn set_limits() -> Result<()> { Ok(()) } -async fn app(args: Args, tx: oneshot::Sender<()>) -> Result<()> { +async fn app(args: Args, cloned_token: CancellationToken, tracker: &TaskTracker) -> Result<()> { if args.workers == 0 { anyhow::bail!("number of workers cannot be zero"); } @@ -171,16 +174,18 @@ async fn app(args: Args, tx: oneshot::Sender<()>) -> Result<()> { .await .unwrap(); + let relay_cancellation_token = cloned_token.clone(); let mut l = events::Listener::new(args.substrate, redis_cache).await?; - tokio::spawn(async move { + tracker.spawn(async move { let max_retries = 9; // max wait is 2^9 = 512 seconds ( 5 minutes ) let mut attempt = 0; let mut backoff = Duration::from_secs(1); let mut got_hit = false; loop { + let listener_cancellation_token = cloned_token.clone(); match l - .listen(&mut got_hit) + .listen(&mut got_hit, listener_cancellation_token) .await .context("failed to listen to chain events") { @@ -195,7 +200,8 @@ async fn app(args: Args, tx: oneshot::Sender<()>) -> Result<()> { attempt += 1; if attempt > max_retries { log::error!("Listener failed after {} attempts: {:?}", attempt - 1, e); - let _ = tx.send(()); + let max_attempt_token = cloned_token.clone(); + max_attempt_token.cancel(); break; } log::warn!( @@ -211,28 +217,45 @@ async fn app(args: Args, tx: oneshot::Sender<()>) -> Result<()> { } }); - r.start(&args.listen).await.unwrap(); + tracker.close(); + r.start(&args.listen, relay_cancellation_token) + .await + .unwrap(); + Ok(()) } #[tokio::main] -async fn main() { +async fn main() -> ExitCode { let args = Args::parse(); - let (tx, rx) = oneshot::channel(); - let app_handle = tokio::spawn(async move { - if let Err(e) = app(args, tx).await { + let token = CancellationToken::new(); + let cloned_token = token.clone(); + let tracker = TaskTracker::new(); + let cloned_tracker = tracker.clone(); + let app_handle = tracker.spawn(async move { + if let Err(e) = app(args, cloned_token, &cloned_tracker).await { eprintln!("{:#}", e); - std::process::exit(1); + return ExitCode::FAILURE; } + ExitCode::SUCCESS }); + tracker.close(); tokio::select! { - _ = app_handle => { + status = app_handle => { log::info!("Application is closing successfully."); + token.cancel(); + tracker.wait().await; + match status { + Ok(code) => code, + Err(_) => ExitCode::FAILURE + } } - _ = rx => { - log::error!("Listener shutdown signal received. Exiting application."); - std::process::exit(1); + _ = signal::ctrl_c() => { + log::info!("Ctrl-C received. Exiting application."); + token.cancel(); + tracker.wait().await; + ExitCode::SUCCESS } } } diff --git a/src/events/mod.rs b/src/events/mod.rs index df2f884..812fefd 100644 --- a/src/events/mod.rs +++ b/src/events/mod.rs @@ -6,6 +6,7 @@ use futures::StreamExt; use log; use subxt::{OnlineClient, PolkadotConfig}; use tokio::select; +use tokio_util::sync::CancellationToken; #[derive(Clone)] pub struct Listener @@ -61,7 +62,11 @@ where anyhow::bail!("failed to connect to substrate using the provided urls") } - pub async fn listen(&mut self, got_hit: &mut bool) -> Result<()> { + pub async fn listen( + &mut self, + got_hit: &mut bool, + listener_cancellation_token: CancellationToken, + ) -> Result<()> { loop { // always flush in case some blocks were finalized before reconnecting if let Err(err) = self.cache.flush().await { @@ -70,10 +75,10 @@ where continue; } select! { - _ = tokio::signal::ctrl_c() => { + _ = listener_cancellation_token.cancelled() => { log::info!("shutting down listener gracefully"); if let Err(err) = self.cache.flush().await { - log::error!("failed to flush redis cache {}", err); + log::info!("failed to flush redis cache {}", err); }else { log::info!("Succesful flush of redis cache"); } diff --git a/src/relay/federation/mod.rs b/src/relay/federation/mod.rs index 4461db0..284b387 100644 --- a/src/relay/federation/mod.rs +++ b/src/relay/federation/mod.rs @@ -6,11 +6,12 @@ use crate::twin::TwinDB; use anyhow::Result; use bb8_redis::{ bb8::{Pool, RunError}, - redis::{aio::ConnectionLike, cmd, RedisError}, + redis::{cmd, RedisError}, RedisConnectionManager, }; use prometheus::{IntCounterVec, Opts, Registry}; use tokio::select; +use tokio_util::{sync::CancellationToken, task::TaskTracker}; use workers::WorkerPool; mod router; @@ -115,40 +116,46 @@ where } /// start the federation router - pub fn start(self) -> Federator { + pub fn start( + self, + federator_cancellation_token: CancellationToken, + tracker: &TaskTracker, + ) -> Federator { let federator = Federator { pool: self.pool.clone(), }; - tokio::spawn(self.run()); + tracker.spawn(self.run(federator_cancellation_token)); federator } - async fn run(self) { + async fn run(self, federator_cancellation_token: CancellationToken) { let mut workers = self.workers; loop { + let mut con = match self.pool.get().await { + Ok(con) => con, + Err(err) => { + log::error!("could not get redis connection from pool, {}", err); + continue; + } + }; + let worker_handler = workers.get().await; select! { - _ = tokio::signal::ctrl_c() => { + _ = federator_cancellation_token.cancelled() => { log::info!("shutting down fedartor gracefully"); - workers.close().await; + //workers.close().await; + log::info!("shutting down fedartor gracefully end"); break; }, - result = self.pool.get() => { - let mut con = match result { - Ok(con) => con, - Err(err) => { - log::error!("could not get redis connection from pool, {}", err); - continue; - } - }; - let worker_handler = workers.get().await; - let (_, msg): (String, Vec) = match cmd("BRPOP") - .arg(FEDERATION_QUEUE) - .arg(0.0) - .query_async(&mut *con) - .await - { + result = async { + cmd("BRPOP") + .arg(FEDERATION_QUEUE) + .arg(0.0) + .query_async(&mut *con).await + + } => { + let (_, msg): (String, Vec) = match result { Ok(msg) => msg, Err(err) => { log::error!("could not get message from redis {}", err); @@ -158,7 +165,6 @@ where if let Err(err) = worker_handler.send(msg) { log::error!("failed to send job to worker: {}", err); } - }, } } @@ -215,7 +221,9 @@ mod test { .build(sink, db, ranker) .unwrap(); - let federator = federation.start(); + let token = CancellationToken::new(); + let tracker = TaskTracker::new(); + let federator = federation.start(token, &tracker); let mock = server.mock(|when, then| { when.method(POST).path("/"); diff --git a/src/relay/mod.rs b/src/relay/mod.rs index db81a38..a73f828 100644 --- a/src/relay/mod.rs +++ b/src/relay/mod.rs @@ -6,7 +6,8 @@ use hyper_tungstenite::tungstenite::error::ProtocolError; use tokio::net::TcpListener; use tokio::net::ToSocketAddrs; use tokio::select; -use tokio::signal; +use tokio_util::sync::CancellationToken; +use tokio_util::task::TaskTracker; mod api; mod federation; @@ -54,9 +55,17 @@ where }) } - pub async fn start(self, address: A) -> Result<()> { + pub async fn start( + self, + address: A, + relay_cancellation_token: CancellationToken, + ) -> Result<()> { + let tracker = TaskTracker::new(); let tcp_listener = TcpListener::bind(address).await?; - let federator = self.federation.start(); + let federator_cancellation_token = relay_cancellation_token.clone(); + let federator = self + .federation + .start(federator_cancellation_token, &tracker); let http = api::HttpService::new(api::AppData::new( self.domain, self.switch, @@ -64,28 +73,38 @@ where federator, self.limiter, )); + tracker.close(); loop { select! { - _ = signal::ctrl_c() => { + _ = relay_cancellation_token.cancelled() => { log::info!("shutting down relay gracefully"); + tracker.close(); break; }, result = tcp_listener.accept() => { let (tcp_stream, _) = result?; let http = http.clone(); - tokio::task::spawn(async move { - if let Err(http_err) = Http::new() + let stream_cancellation_token = relay_cancellation_token.clone(); + tracker.spawn(async move { + select! { + _ = stream_cancellation_token.cancelled() => { + log::info!("shutting down connection gracefully"); + }, + result = Http::new() .http1_keep_alive(true) .serve_connection(tcp_stream, http) - .with_upgrades() - .await - { - eprintln!("Error while serving HTTP connection: {}", http_err); + .with_upgrades() => { + if let Err(http_err) = result { + eprintln!("Error while serving HTTP connection: {}", http_err); + } + }, + } }); }, } } + tracker.wait().await; Ok(()) } }