diff --git a/.cargo/config.toml b/.cargo/config.toml index 299ded4740..0193216538 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -1,5 +1,13 @@ -# cargo build --all --target=aarch64-unknown-linux-gnu +# Note: we use `[target.'cfg(all())']` instead of `[build]` to specify rustflags common +# to all targets, because `build.rustflags` is mutually exclusive with `target.xxx.rustflags`, +# and the latter overrides the former instead of being merged together. + +[target.'cfg(all())'] +# Needed for tokio-console support +rustflags = ["--cfg", "tokio_unstable"] + [target.aarch64-unknown-linux-gnu] linker = "aarch64-linux-gnu-gcc" + [target.x86_64-pc-windows-msvc] rustflags = ["-C", "target-feature=+crt-static"] diff --git a/Cargo.lock b/Cargo.lock index 64551f15ec..1d304ac4f1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -330,7 +330,7 @@ dependencies = [ "api-server-common", "api-web-server", "async-trait", - "axum", + "axum 0.7.9", "chainstate", "chainstate-test-framework", "common", @@ -359,7 +359,7 @@ version = "1.2.0" dependencies = [ "api-server-common", "async-trait", - "axum", + "axum 0.7.9", "clap", "common", "crypto", @@ -631,7 +631,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f" dependencies = [ "async-trait", - "axum-core", + "axum-core 0.4.5", "bytes", "futures-util", "http", @@ -640,7 +640,7 @@ dependencies = [ "hyper", "hyper-util", "itoa", - "matchit", + "matchit 0.7.3", "memchr", "mime", "percent-encoding", @@ -658,6 +658,31 @@ dependencies = [ "tracing", ] +[[package]] +name = "axum" +version = "0.8.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b52af3cb4058c895d37317bb27508dccc8e5f2d39454016b297bf4a400597b8" +dependencies = [ + "axum-core 0.5.6", + "bytes", + "futures-util", + "http", + "http-body", + "http-body-util", + "itoa", + "matchit 0.8.4", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "serde_core", + "sync_wrapper", + "tower", + "tower-layer", + "tower-service", +] + [[package]] name = "axum-core" version = "0.4.5" @@ -679,6 +704,24 @@ dependencies = [ "tracing", ] +[[package]] +name = "axum-core" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08c78f31d7b1291f7ee735c1c6780ccde7785daae9a9206026862dab7d8792d1" +dependencies = [ + "bytes", + "futures-core", + "http", + "http-body", + "http-body-util", + "mime", + "pin-project-lite", + "sync_wrapper", + "tower-layer", + "tower-service", +] + [[package]] name = "base58ck" version = "0.1.0" @@ -1671,6 +1714,46 @@ dependencies = [ "utxo", ] +[[package]] +name = "console-api" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8599749b6667e2f0c910c1d0dff6901163ff698a52d5a39720f61b5be4b20d3" +dependencies = [ + "futures-core", + "prost", + "prost-types", + "tonic", + "tonic-prost", + "tracing-core", +] + +[[package]] +name = "console-subscriber" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb4915b7d8dd960457a1b6c380114c2944f728e7c65294ab247ae6b6f1f37592" +dependencies = [ + "console-api", + "crossbeam-channel", + "crossbeam-utils", + "futures-task", + "hdrhistogram", + "humantime", + "hyper-util", + "prost", + "prost-types", + "serde", + "serde_json", + "thread_local", + "tokio", + "tokio-stream", + "tonic", + "tracing", + "tracing-core", + "tracing-subscriber", +] + [[package]] name = "const_format" version = "0.2.35" @@ -3256,6 +3339,19 @@ dependencies = [ "winapi", ] +[[package]] +name = "hdrhistogram" +version = "7.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "765c9198f173dd59ce26ff9f95ef0aafd0a0fe01fb9d72841bc5066a4c06511d" +dependencies = [ + "base64 0.21.7", + "byteorder", + "flate2", + "nom", + "num-traits", +] + [[package]] name = "heck" version = "0.4.1" @@ -3477,6 +3573,19 @@ dependencies = [ "tower-service", ] +[[package]] +name = "hyper-timeout" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0" +dependencies = [ + "hyper", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", +] + [[package]] name = "hyper-tls" version = "0.6.0" @@ -4382,9 +4491,9 @@ dependencies = [ name = "logging" version = "1.2.0" dependencies = [ + "console-subscriber", "log", "thiserror 1.0.69", - "tokio", "tracing", "tracing-subscriber", ] @@ -4484,6 +4593,12 @@ version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" +[[package]] +name = "matchit" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3" + [[package]] name = "md-5" version = "0.10.6" @@ -4615,6 +4730,12 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + [[package]] name = "miniz_oxide" version = "0.8.9" @@ -5045,6 +5166,16 @@ dependencies = [ "utils-networking", ] +[[package]] +name = "nom" +version = "7.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" +dependencies = [ + "memchr", + "minimal-lexical", +] + [[package]] name = "nu-ansi-term" version = "0.50.3" @@ -6229,6 +6360,38 @@ dependencies = [ "unarray", ] +[[package]] +name = "prost" +version = "0.14.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2ea70524a2f82d518bce41317d0fae74151505651af45faf1ffbd6fd33f0568" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-derive" +version = "0.14.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "27c6023962132f4b30eb4c172c91ce92d933da334c59c23cddee82358ddafb0b" +dependencies = [ + "anyhow", + "itertools 0.14.0", + "proc-macro2", + "quote", + "syn 2.0.106", +] + +[[package]] +name = "prost-types" +version = "0.14.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8991c4cbdb8bc5b11f0b074ffe286c30e523de90fee5ba8132f1399f23cb3dd7" +dependencies = [ + "prost", +] + [[package]] name = "protobuf" version = "3.7.2" @@ -8561,6 +8724,46 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5d99f8c9a7727884afe522e9bd5edbfc91a3312b36a77b5fb8926e4c31a41801" +[[package]] +name = "tonic" +version = "0.14.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a286e33f82f8a1ee2df63f4fa35c0becf4a85a0cb03091a15fd7bf0b402dc94a" +dependencies = [ + "async-trait", + "axum 0.8.8", + "base64 0.22.1", + "bytes", + "h2", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-timeout", + "hyper-util", + "percent-encoding", + "pin-project", + "socket2 0.6.1", + "sync_wrapper", + "tokio", + "tokio-stream", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tonic-prost" +version = "0.14.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6c55a2d6a14174563de34409c9f92ff981d006f56da9c6ecd40d9d4a31500b0" +dependencies = [ + "bytes", + "prost", + "tonic", +] + [[package]] name = "tower" version = "0.5.2" @@ -8569,9 +8772,12 @@ checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9" dependencies = [ "futures-core", "futures-util", + "indexmap 2.11.4", "pin-project-lite", + "slab", "sync_wrapper", "tokio", + "tokio-util", "tower-layer", "tower-service", "tracing", diff --git a/Cargo.toml b/Cargo.toml index ce49fac580..e4eab96fe7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -152,10 +152,11 @@ cfg-if = "1.0" chacha20poly1305 = "0.10" chrono = "0.4" clap = "4.5" -csv = "1.3" -ctor = "0.2" +console-subscriber = "0.5" criterion = "0.5" crossterm = "0.28" +csv = "1.3" +ctor = "0.2" derive_more = { version = "1.0", features = ["full"] } directories = "5.0" humantime = "2.1" @@ -313,6 +314,7 @@ overflow-checks = true opt-level = 2 [features] +tokio-console = [] trezor = [] default = ["trezor"] diff --git a/blockprod/src/detail/job_manager/mod.rs b/blockprod/src/detail/job_manager/mod.rs index 891e785d11..9ec4289775 100644 --- a/blockprod/src/detail/job_manager/mod.rs +++ b/blockprod/src/detail/job_manager/mod.rs @@ -17,8 +17,12 @@ mod jobs_container; use std::sync::Arc; -use crate::detail::CustomId; use async_trait::async_trait; +use tokio::sync::{ + mpsc::{self, unbounded_channel, UnboundedReceiver, UnboundedSender}, + oneshot, +}; + use chainstate::{ChainstateEvent, ChainstateHandle}; use common::{ chain::{block::timestamp::BlockTimestamp, GenBlock}, @@ -26,11 +30,9 @@ use common::{ }; use logging::log; use serialization::{Decode, Encode}; -use tokio::sync::{ - mpsc::{self, unbounded_channel, UnboundedReceiver, UnboundedSender}, - oneshot, -}; -use utils::{ensure, tap_log::TapLog}; +use utils::{ensure, tap_log::TapLog, tokio_spawn}; + +use crate::detail::CustomId; #[derive(thiserror::Error, Debug, PartialEq, Eq)] pub enum JobManagerError { @@ -239,31 +241,34 @@ impl JobManager { mut stop_job_receiver: UnboundedReceiver<(Option, oneshot::Sender)>, mut shutdown_receiver: UnboundedReceiver>, ) { - tokio::spawn(async move { - let mut jobs = jobs_container::JobsContainer::default(); + tokio_spawn( + async move { + let mut jobs = jobs_container::JobsContainer::default(); - loop { - tokio::select! { - event = get_job_count_receiver.recv() - => event_then(event, |result_sender| jobs.handle_job_count(result_sender)), + loop { + tokio::select! { + event = get_job_count_receiver.recv() + => event_then(event, |result_sender| jobs.handle_job_count(result_sender)), - tip_id = chainstate_receiver.recv() - => event_then(tip_id, |id| jobs.handle_chainstate_event(id)), + tip_id = chainstate_receiver.recv() + => event_then(tip_id, |id| jobs.handle_chainstate_event(id)), - event = new_job_receiver.recv() - => event_then(event, |job| jobs.handle_add_job(job)), + event = new_job_receiver.recv() + => event_then(event, |job| jobs.handle_add_job(job)), - event = last_used_block_timestamp_receiver.recv() - => event_then(event, |ev| jobs.handle_update_last_used_block_timestamp(ev)), + event = last_used_block_timestamp_receiver.recv() + => event_then(event, |ev| jobs.handle_update_last_used_block_timestamp(ev)), - event = stop_job_receiver.recv() - => event_then(event, |ev| jobs.handle_stop_job(ev)), + event = stop_job_receiver.recv() + => event_then(event, |ev| jobs.handle_stop_job(ev)), - event = shutdown_receiver.recv() - => return event_then(event, |result_sender| jobs.handle_shutdown(result_sender)), + event = shutdown_receiver.recv() + => return event_then(event, |result_sender| jobs.handle_shutdown(result_sender)), + } } - } - }); + }, + "Blockprod job mgr", + ); } fn subscribe_to_chainstate(&self, chainstate_sender: UnboundedSender>) { @@ -272,24 +277,27 @@ impl JobManager { None => return, }; - tokio::spawn(async move { - chainstate_handle - .call_mut(|this| { - let subscribe_func = - Arc::new( - move |chainstate_event: ChainstateEvent| match chainstate_event { - ChainstateEvent::NewTip(block_id, _) => { - _ = chainstate_sender.send(block_id.into()).log_err_pfx( - "Chainstate subscriber failed to send new tip", - ); - } - }, - ); - - this.subscribe_to_subsystem_events(subscribe_func); - }) - .await - }); + tokio_spawn( + async move { + chainstate_handle + .call_mut(|this| { + let subscribe_func = + Arc::new( + move |chainstate_event: ChainstateEvent| match chainstate_event { + ChainstateEvent::NewTip(block_id, _) => { + _ = chainstate_sender.send(block_id.into()).log_err_pfx( + "Chainstate subscriber failed to send new tip", + ); + } + }, + ); + + this.subscribe_to_subsystem_events(subscribe_func); + }) + .await + }, + "Blockprod subscribe to CS", + ); } #[allow(dead_code)] @@ -397,7 +405,7 @@ impl Drop for JobManager { return; } - tokio::spawn(result_receiver); + tokio_spawn(result_receiver, "Blockprod job mgr drop result receiver"); } } diff --git a/blockprod/src/detail/timestamp_searcher/mod.rs b/blockprod/src/detail/timestamp_searcher/mod.rs index 2241da40b4..c548e1ab12 100644 --- a/blockprod/src/detail/timestamp_searcher/mod.rs +++ b/blockprod/src/detail/timestamp_searcher/mod.rs @@ -38,7 +38,7 @@ use crypto::vrf::{VRFPrivateKey, VRFPublicKey}; use logging::log; use randomness::{CryptoRng, Rng}; use serialization::{Decode, Encode}; -use utils::{ensure, once_destructor::OnceDestructor}; +use utils::{ensure, once_destructor::OnceDestructor, tokio_spawn_blocking}; use crate::{ detail::utils::{ @@ -289,9 +289,10 @@ pub async fn find_timestamps_for_staking( secret_input_data: PoSTimestampSearchInputData, search_data: TimestampSearchData, ) -> Result>, BlockProductionError> { - let task_join_result = tokio::task::spawn_blocking({ - move || find_timestamps_for_staking_impl(&search_data, &secret_input_data) - }) + let task_join_result = tokio_spawn_blocking( + move || find_timestamps_for_staking_impl(&search_data, &secret_input_data), + "find_timestamps_for_staking", + ) .await; match task_join_result { diff --git a/dns-server/src/dns_server/mod.rs b/dns-server/src/dns_server/mod.rs index 6ae183cd97..5a30d08531 100644 --- a/dns-server/src/dns_server/mod.rs +++ b/dns-server/src/dns_server/mod.rs @@ -21,7 +21,6 @@ use std::{ sync::{Arc, Mutex}, }; -use common::{chain::ChainConfig, primitives::per_thousand::PerThousand}; use futures::never::Never; use hickory_client::{ proto::rr::{LowerName, RrKey}, @@ -40,10 +39,12 @@ use hickory_server::{ ServerFuture, }; use itertools::Itertools; +use tokio::{net::UdpSocket, sync::mpsc}; + +use common::{chain::ChainConfig, primitives::per_thousand::PerThousand}; use logging::log; use randomness::{make_pseudo_rng, Rng, SliceRandom}; -use tokio::{net::UdpSocket, sync::mpsc}; -use utils::{atomics::RelaxedAtomicU32, make_config_setting}; +use utils::{atomics::RelaxedAtomicU32, make_config_setting, tokio_spawn}; use crate::{ config::DnsServerConfig, crawler_p2p::crawler::address_data::SoftwareInfo, @@ -126,11 +127,14 @@ impl DnsServer { mut cmd_rx, } = self; - tokio::spawn(async move { - while let Some(command) = cmd_rx.recv().await { - handle_command(&auth, command); - } - }); + tokio_spawn( + async move { + while let Some(command) = cmd_rx.recv().await { + handle_command(&auth, command); + } + }, + "Cmd handling loop", + ); server.block_until_done().await?; diff --git a/dns-server/src/main.rs b/dns-server/src/main.rs index c4a8f4ec20..ffac9a6831 100644 --- a/dns-server/src/main.rs +++ b/dns-server/src/main.rs @@ -17,18 +17,21 @@ use std::sync::Arc; use clap::Parser; use futures::never::Never; -use logging::log; use tokio::sync::{mpsc, oneshot}; use common::{primitives::user_agent::UserAgent, time_getter::TimeGetter}; use config::DnsServerRunOptions; use crawler_p2p::crawler_manager::{CrawlerManager, CrawlerManagerConfig}; +use logging::log; use p2p::{ config::{NodeType, P2pConfig}, net::NetworkingService, }; -use utils::atomics::SeqCstAtomicBool; -use utils::default_data_dir::{default_data_dir_for_chain, prepare_data_dir}; +use utils::{ + atomics::SeqCstAtomicBool, + default_data_dir::{default_data_dir_for_chain, prepare_data_dir}, + tokio_spawn, +}; use crate::{ crawler_p2p::{crawler::CrawlerConfig, crawler_manager::storage::open_storage}, @@ -170,8 +173,9 @@ async fn run(options: DnsServerRunOptions) -> anyhow::Result { let server = dns_server::DnsServer::new(config, chain_config, dns_server_cmd_rx).await?; // Spawn for better parallelism - let crawler_manager_task = tokio::spawn(async move { crawler_manager.run().await }); - let server_task = tokio::spawn(server.run()); + let crawler_manager_task = + tokio_spawn(async move { crawler_manager.run().await }, "Crawler mgr"); + let server_task = tokio_spawn(server.run(), "Server"); tokio::select! { res = crawler_manager_task => { diff --git a/logging/Cargo.toml b/logging/Cargo.toml index f9514a9d6c..04dd2c7031 100644 --- a/logging/Cargo.toml +++ b/logging/Cargo.toml @@ -8,8 +8,11 @@ rust-version.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +console-subscriber = { workspace = true, optional = true } log.workspace = true thiserror.workspace = true -tokio = { workspace = true, default-features = false, features = ["rt", "sync"] } tracing.workspace = true tracing-subscriber.workspace = true + +[features] +tokio-console = ["dep:console-subscriber"] diff --git a/logging/src/lib.rs b/logging/src/lib.rs index 3d768d9f69..8cda313c02 100644 --- a/logging/src/lib.rs +++ b/logging/src/lib.rs @@ -14,7 +14,6 @@ // limitations under the License. mod log_style; -mod tracing_utils; mod utils; use std::{ @@ -31,7 +30,6 @@ use log_style::{get_log_style_from_env, LogStyleParseError}; pub use log; pub use log_style::{LogStyle, TextColoring}; -pub use tracing_utils::{spawn_in_current_span, spawn_in_span}; pub use utils::{get_from_env, GetFromEnvError, ValueOrEnvVar}; /// Send log output to the terminal. @@ -100,10 +98,12 @@ pub fn init_logging_generic( let mut errors = Vec::new(); let main_layer = make_layer(main_writer_settings, &mut errors); let aux_layer = aux_writer_settings.map(|settings| make_layer(settings, &mut errors)); + let tokio_console_layer = make_tokio_console_layer(); Registry::default() .with(main_layer) .with(aux_layer) + .with(tokio_console_layer) // This basically calls tracing::subscriber::set_global_default on self and then // initializes a 'log' compatibility layer, so that 'log' macros continue to work // (this requires the "tracing-log" feature to be enabled, but it is enabled by default). @@ -120,6 +120,22 @@ pub fn no_writer_settings() -> Option() -> Option + Send + Sync>> +where + S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>, +{ + // Note: `spawn` calls `Builder::with_default_env`, which reads config values (such as the bind + // address) from a number of env vars, see: + // https://github.com/tokio-rs/console/blob/console-subscriber-v0.5.0/console-subscriber/src/builder.rs#L314-L321 + // Also note: if we ever decide to enable tokio-console support permanently (and switch it on/off via command + // line arguments), then this env-based configuration should better be disabled. + #[cfg(feature = "tokio-console")] + return Some(console_subscriber::spawn().boxed()); + + #[cfg(not(feature = "tokio-console"))] + return None; +} + fn make_layer( writer_settings: WriterSettings, errors: &mut Vec, diff --git a/logging/src/tracing_utils.rs b/logging/src/tracing_utils.rs deleted file mode 100644 index 8990cc5be0..0000000000 --- a/logging/src/tracing_utils.rs +++ /dev/null @@ -1,35 +0,0 @@ -// Copyright (c) 2021-2023 RBB S.r.l -// opensource@mintlayer.org -// SPDX-License-Identifier: MIT -// Licensed under the MIT License; -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://github.com/mintlayer/mintlayer-core/blob/master/LICENSE -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::future::Future; - -use tokio::task::JoinHandle; -use tracing::{Instrument, Span}; - -pub fn spawn_in_current_span(future: F) -> JoinHandle -where - F: Future + Send + 'static, - F::Output: Send + 'static, -{ - tokio::spawn(future.in_current_span()) -} - -pub fn spawn_in_span(future: F, span: Span) -> JoinHandle -where - F: Future + Send + 'static, - F::Output: Send + 'static, -{ - tokio::spawn(future.instrument(span)) -} diff --git a/networking/src/transport/impls/stream_adapter/wrapped_transport/tests.rs b/networking/src/transport/impls/stream_adapter/wrapped_transport/tests.rs index c010d527e5..89aaf5dc47 100644 --- a/networking/src/transport/impls/stream_adapter/wrapped_transport/tests.rs +++ b/networking/src/transport/impls/stream_adapter/wrapped_transport/tests.rs @@ -30,6 +30,7 @@ use test_utils::{ random::{gen_random_bytes, Seed}, BasicTestTimeGetter, }; +use utils::tokio_spawn_in_current_tracing_span; use crate::{ test_helpers::{TestTransportChannel, TestTransportMaker, TestTransportTcp}, @@ -271,11 +272,14 @@ async fn pending_handshakes() { let mut server = transport.bind(vec![TestTransportTcp::make_address()]).await.unwrap(); let local_addr = server.local_addresses().unwrap(); - let join_handle = logging::spawn_in_current_span(async move { - loop { - _ = server.accept().await; - } - }); + let join_handle = tokio_spawn_in_current_tracing_span( + async move { + loop { + _ = server.accept().await; + } + }, + "", + ); // Connect MAX_CONCURRENT_HANDSHAKES amount of idle clients let mut sockets = futures::stream::iter(0..MAX_CONCURRENT_HANDSHAKES) @@ -316,11 +320,14 @@ async fn handshake_timeout() { let mut server = transport.bind(vec![TestTransportTcp::make_address()]).await.unwrap(); let local_addr = server.local_addresses().unwrap(); - let join_handle = logging::spawn_in_current_span(async move { - loop { - _ = server.accept().await; - } - }); + let join_handle = tokio_spawn_in_current_tracing_span( + async move { + loop { + _ = server.accept().await; + } + }, + "", + ); let mut bad_client = tokio::net::TcpStream::connect(local_addr[0]).await.unwrap(); for _ in 0..30 { diff --git a/node-gui/backend/src/backend_impl.rs b/node-gui/backend/src/backend_impl.rs index d2d1f0709d..62d3997272 100644 --- a/node-gui/backend/src/backend_impl.rs +++ b/node-gui/backend/src/backend_impl.rs @@ -31,6 +31,7 @@ use logging::log; use node_comm::rpc_client::ColdWalletClient; use node_lib::node_controller::NodeController; use serialization::hex_encoded::HexEncoded; +use utils::tokio_spawn; use wallet::{account::transaction_list::TransactionList, wallet::Error, WalletError}; use wallet_cli_commands::{ get_repl_command, parse_input, CommandHandler, ConsoleCommand, ManageableWalletCommand, @@ -448,14 +449,17 @@ impl Backend { return Err(BackendError::MultipleTrezorDevicesFound(found_devices)) } } - tokio::spawn(forward_events( - wallet_events, - wallet_service - .handle() - .subscribe() - .await - .map_err(|e| BackendError::WalletError(e.to_string()))?, - )); + tokio_spawn( + forward_events( + wallet_events, + wallet_service + .handle() + .subscribe() + .await + .map_err(|e| BackendError::WalletError(e.to_string()))?, + ), + "Forward events (create_wallet)", + ); let command_handler = CommandHandler::new( ControllerConfig { in_top_x_mb: IN_TOP_X_MB, @@ -669,14 +673,17 @@ impl Backend { return Err(BackendError::MultipleTrezorDevicesFound(found_devices)) } } - tokio::spawn(forward_events( - wallet_events, - wallet_service - .handle() - .subscribe() - .await - .map_err(|e| BackendError::WalletError(e.to_string()))?, - )); + tokio_spawn( + forward_events( + wallet_events, + wallet_service + .handle() + .subscribe() + .await + .map_err(|e| BackendError::WalletError(e.to_string()))?, + ), + "Forward events (open_wallet)", + ); let command_handler = CommandHandler::new( ControllerConfig { in_top_x_mb: IN_TOP_X_MB, diff --git a/node-gui/backend/src/lib.rs b/node-gui/backend/src/lib.rs index da4450f4be..5409a1b977 100644 --- a/node-gui/backend/src/lib.rs +++ b/node-gui/backend/src/lib.rs @@ -35,6 +35,7 @@ use common::{ }; use logging::log; use node_lib::OptionsWithResolvedCommand; +use utils::tokio_spawn; use crate::{chainstate_event_handler::ChainstateEventHandler, p2p_event_handler::P2pEventHandler}; @@ -150,7 +151,8 @@ pub async fn node_initialize( let controller = node.controller().clone(); - let manager_join_handle = tokio::spawn(async move { node.main().await }); + let manager_join_handle = + tokio_spawn(async move { node.main().await }, "Node subsystem mgr"); // Subscribe to chainstate before getting the current chain_info! let chainstate_event_handler = @@ -172,16 +174,19 @@ pub async fn node_initialize( manager_join_handle, ); - tokio::spawn(async move { - backend_impl::run( - backend, - request_rx, - wallet_updated_rx, - chainstate_event_handler, - p2p_event_handler, - ) - .await; - }); + tokio_spawn( + async move { + backend_impl::run( + backend, + request_rx, + wallet_updated_rx, + chainstate_event_handler, + p2p_event_handler, + ) + .await; + }, + "NodeGUI backend", + ); (chain_config, chain_info) } WalletMode::Cold => spawn_cold_backend( @@ -228,7 +233,7 @@ fn spawn_cold_backend( is_initial_block_download: false, }; - let manager_join_handle = tokio::spawn(async move {}); + let manager_join_handle = tokio_spawn(async move {}, "Fake node subsystem mgr"); let backend = backend_impl::Backend::new_cold( chain_config.clone(), @@ -238,9 +243,12 @@ fn spawn_cold_backend( manager_join_handle, ); - tokio::spawn(async move { - backend_impl::run_cold(backend, request_rx, wallet_updated_rx).await; - }); + tokio_spawn( + async move { + backend_impl::run_cold(backend, request_rx, wallet_updated_rx).await; + }, + "NodeGUI cold backend", + ); Ok((chain_config, chain_info)) } diff --git a/node-lib/src/lib.rs b/node-lib/src/lib.rs index 1250332c78..a76c3f9217 100644 --- a/node-lib/src/lib.rs +++ b/node-lib/src/lib.rs @@ -41,10 +41,6 @@ pub fn default_rpc_config(chain_config: &ChainConfig) -> RpcConfigFile { ) } -pub fn init_logging(_opts: &Options) { - logging::init_logging() -} - #[cfg(test)] mod tests { #[ctor::ctor] diff --git a/node-lib/src/rpc.rs b/node-lib/src/rpc.rs index 461f66a78b..6e3cbc0884 100644 --- a/node-lib/src/rpc.rs +++ b/node-lib/src/rpc.rs @@ -20,6 +20,7 @@ use std::{sync::Arc, time::Duration}; use chainstate_launcher::ChainConfig; use rpc::{description::Described, handle_result, RpcResult}; use subsystem::ShutdownTrigger; +use utils::tokio_spawn; /// RPC methods controlling the node. #[rpc::describe] @@ -63,10 +64,13 @@ impl NodeRpcServer for NodeRpc { // TODO: This is supposedly fixed in jsonrpsee 0.17.1: https://github.com/paritytech/jsonrpsee/releases/tag/v0.17.1 // See if we can remove this workaround since we're using that version now. let shutdown_trigger = self.shutdown_trigger.clone(); - tokio::spawn(async move { - tokio::time::sleep(Duration::from_millis(100)).await; - shutdown_trigger.initiate(); - }); + tokio_spawn( + async move { + tokio::time::sleep(Duration::from_millis(100)).await; + shutdown_trigger.initiate(); + }, + "NodeRpc shutdown trigger", + ); Ok(()) } diff --git a/p2p/backend-test-suite/src/ban.rs b/p2p/backend-test-suite/src/ban.rs index 2798d6be03..a1c8d81076 100644 --- a/p2p/backend-test-suite/src/ban.rs +++ b/p2p/backend-test-suite/src/ban.rs @@ -35,7 +35,7 @@ use p2p::{ test_helpers::{connect_and_accept_services, test_p2p_config}, PeerManagerEvent, }; -use utils::atomics::SeqCstAtomicBool; +use utils::{atomics::SeqCstAtomicBool, tokio_spawn_in_current_tracing_span}; tests![invalid_pubsub_block,]; @@ -117,37 +117,40 @@ where ) .unwrap(); - let sync1_handle = logging::spawn_in_current_span(async move { sync1.run().await }); + let sync1_handle = tokio_spawn_in_current_tracing_span(async move { sync1.run().await }, ""); // spawn `sync2` into background and spam an orphan block on the network - logging::spawn_in_current_span(async move { - let (peer, mut block_sync_msg_receiver) = match sync2.poll_next().await.unwrap() { - SyncingEvent::Connected { - peer_id, - common_services: _, - protocol_version: _, - block_sync_msg_receiver, - transaction_sync_msg_receiver: _, - } => (peer_id, block_sync_msg_receiver), - e => panic!("Unexpected event type: {e:?}"), - }; - match block_sync_msg_receiver.recv().await.unwrap() { - BlockSyncMessage::HeaderListRequest(_) => {} - e => panic!("Unexpected event type: {e:?}"), - }; - messaging_handle_2 - .send_block_sync_message( - peer, - BlockSyncMessage::HeaderList(HeaderList::new(Vec::new())), - ) - .unwrap(); - messaging_handle_2 - .send_block_sync_message( - peer, - BlockSyncMessage::HeaderList(HeaderList::new(vec![block.header().clone()])), - ) - .unwrap(); - }); + tokio_spawn_in_current_tracing_span( + async move { + let (peer, mut block_sync_msg_receiver) = match sync2.poll_next().await.unwrap() { + SyncingEvent::Connected { + peer_id, + common_services: _, + protocol_version: _, + block_sync_msg_receiver, + transaction_sync_msg_receiver: _, + } => (peer_id, block_sync_msg_receiver), + e => panic!("Unexpected event type: {e:?}"), + }; + match block_sync_msg_receiver.recv().await.unwrap() { + BlockSyncMessage::HeaderListRequest(_) => {} + e => panic!("Unexpected event type: {e:?}"), + }; + messaging_handle_2 + .send_block_sync_message( + peer, + BlockSyncMessage::HeaderList(HeaderList::new(Vec::new())), + ) + .unwrap(); + messaging_handle_2 + .send_block_sync_message( + peer, + BlockSyncMessage::HeaderList(HeaderList::new(vec![block.header().clone()])), + ) + .unwrap(); + }, + "", + ); match peer_mgr_event_receiver.recv().await { Some(PeerManagerEvent::AdjustPeerScore { diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index 2274d87e58..6538d4c853 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -43,8 +43,7 @@ use tokio::{ task::JoinHandle, }; -use ::utils::atomics::SeqCstAtomicBool; -use ::utils::ensure; +use ::utils::{atomics::SeqCstAtomicBool, ensure, tokio_spawn_in_current_tracing_span}; use common::{ chain::{config::ChainType, ChainConfig}, primitives::BlockHeight, @@ -158,19 +157,22 @@ where peerdb_storage, )?; let shutdown_ = Arc::clone(&shutdown); - let peer_manager_task = logging::spawn_in_current_span(async move { - match peer_manager.run().await { - Ok(never) => match never {}, - // The channel can be closed during the shutdown process. - Err(P2pError::ChannelClosed) if shutdown_.load() => { - log::info!("Peer manager is shut down"); + let peer_manager_task = tokio_spawn_in_current_tracing_span( + async move { + match peer_manager.run().await { + Ok(never) => match never {}, + // The channel can be closed during the shutdown process. + Err(P2pError::ChannelClosed) if shutdown_.load() => { + log::info!("Peer manager is shut down"); + } + Err(e) => { + shutdown_.store(true); + log::error!("Peer manager failed: {e:?}"); + } } - Err(e) => { - shutdown_.store(true); - log::error!("Peer manager failed: {e:?}"); - } - } - }); + }, + "P2p peer mgr", + ); let sync_manager = sync::SyncManager::::new( chain_config, @@ -183,19 +185,22 @@ where time_getter, ); let shutdown_ = Arc::clone(&shutdown); - let sync_manager_task = logging::spawn_in_current_span(async move { - match sync_manager.run().await { - Ok(never) => match never {}, - // The channel can be closed during the shutdown process. - Err(P2pError::ChannelClosed) if shutdown_.load() => { - log::info!("Sync manager is shut down"); + let sync_manager_task = tokio_spawn_in_current_tracing_span( + async move { + match sync_manager.run().await { + Ok(never) => match never {}, + // The channel can be closed during the shutdown process. + Err(P2pError::ChannelClosed) if shutdown_.load() => { + log::info!("Sync manager is shut down"); + } + Err(e) => { + shutdown_.store(true); + log::error!("Sync manager failed: {e:?}"); + } } - Err(e) => { - shutdown_.store(true); - log::error!("Sync manager failed: {e:?}"); - } - } - }); + }, + "P2p sync mgr", + ); Ok(Self { peer_mgr_event_sender, diff --git a/p2p/src/net/default_backend/backend.rs b/p2p/src/net/default_backend/backend.rs index 320a6d91c7..852d1a04ab 100644 --- a/p2p/src/net/default_backend/backend.rs +++ b/p2p/src/net/default_backend/backend.rs @@ -37,7 +37,7 @@ use p2p_types::socket_address::SocketAddress; use randomness::{make_pseudo_rng, Rng}; use utils::{ atomics::SeqCstAtomicBool, eventhandler::EventsController, set_flag::SetFlag, - shallow_clone::ShallowClone, + shallow_clone::ShallowClone, tokio_spawn_in_current_tracing_span, }; use crate::{ @@ -404,13 +404,16 @@ where self.time_getter.shallow_clone(), ); let shutdown = Arc::clone(&self.shutdown); - let handle = logging::spawn_in_current_span(async move { - match peer.run().await { - Ok(()) => {} - Err(P2pError::ChannelClosed) if shutdown.load() => {} - Err(e) => log::error!("Peer {peer_id} failed: {e}"), - } - }); + let handle = tokio_spawn_in_current_tracing_span( + async move { + match peer.run().await { + Ok(()) => {} + Err(P2pError::ChannelClosed) if shutdown.load() => {} + Err(e) => log::error!("Peer {peer_id} failed: {e}"), + } + }, + &format!("Peer[id={peer_id}]"), + ); self.pending_peers.insert( peer_id, diff --git a/p2p/src/net/default_backend/default_networking_service.rs b/p2p/src/net/default_backend/default_networking_service.rs index 234bb53cd4..5eb88aa9aa 100644 --- a/p2p/src/net/default_backend/default_networking_service.rs +++ b/p2p/src/net/default_backend/default_networking_service.rs @@ -25,7 +25,7 @@ use common::time_getter::TimeGetter; use logging::log; use networking::transport::{TransportListener, TransportSocket}; use p2p_types::socket_address::SocketAddress; -use utils::atomics::SeqCstAtomicBool; +use utils::{atomics::SeqCstAtomicBool, tokio_spawn_in_tracing_span}; use crate::{ error::P2pError, @@ -96,7 +96,7 @@ impl DefaultNetworkingService { subscribers_receiver, protocol_version, ); - let backend_task = logging::spawn_in_span( + let backend_task = tokio_spawn_in_tracing_span( async move { match backend.run().await { Ok(never) => match never {}, @@ -110,6 +110,7 @@ impl DefaultNetworkingService { } }, tracing_span, + "P2p backend", ); Ok(( diff --git a/p2p/src/net/default_backend/peer.rs b/p2p/src/net/default_backend/peer.rs index 14ef4f379e..5cac02ec25 100644 --- a/p2p/src/net/default_backend/peer.rs +++ b/p2p/src/net/default_backend/peer.rs @@ -557,7 +557,7 @@ mod tests { assert_matches, mock_time_getter::{mocked_time_getter_milliseconds, mocked_time_getter_seconds}, }; - use utils::atomics::SeqCstAtomicU64; + use utils::{atomics::SeqCstAtomicU64, tokio_spawn_in_current_tracing_span}; use super::*; use crate::{ @@ -631,10 +631,13 @@ mod tests { time_getter, ); - let handle = logging::spawn_in_current_span(async move { - peer.handshake().await.unwrap(); - peer - }); + let handle = tokio_spawn_in_current_tracing_span( + async move { + peer.handshake().await.unwrap(); + peer + }, + "", + ); let mut socket2 = BufferedTranscoder::new(socket2, Some(*p2p_config.protocol_config.max_message_size)); @@ -717,10 +720,13 @@ mod tests { time_getter, ); - let handle = logging::spawn_in_current_span(async move { - peer.handshake().await.unwrap(); - peer - }); + let handle = tokio_spawn_in_current_tracing_span( + async move { + peer.handshake().await.unwrap(); + peer + }, + "", + ); let mut socket2 = BufferedTranscoder::new(socket2, Some(*p2p_config.protocol_config.max_message_size)); @@ -798,7 +804,7 @@ mod tests { time_getter, ); - let handle = logging::spawn_in_current_span(async move { peer.handshake().await }); + let handle = tokio_spawn_in_current_tracing_span(async move { peer.handshake().await }, ""); let mut socket2 = BufferedTranscoder::new(socket2, Some(*p2p_config.protocol_config.max_message_size)); @@ -866,7 +872,7 @@ mod tests { time_getter, ); - let handle = logging::spawn_in_current_span(async move { peer.handshake().await }); + let handle = tokio_spawn_in_current_tracing_span(async move { peer.handshake().await }, ""); let mut socket2 = BufferedTranscoder::new(socket2, Some(*p2p_config.protocol_config.max_message_size)); @@ -987,7 +993,8 @@ mod tests { peer_time_getter, ); - let handle = logging::spawn_in_current_span(async move { peer.run_handshake().await }); + let handle = + tokio_spawn_in_current_tracing_span(async move { peer.run_handshake().await }, ""); // Advance both peer clocks and tokio time by given delay in 200ms increments to simulate // the flow of time. Doing this in one step makes the test result sensitive to the runtime diff --git a/p2p/src/net/default_backend/tests.rs b/p2p/src/net/default_backend/tests.rs index 1287a4511f..f47cb89759 100644 --- a/p2p/src/net/default_backend/tests.rs +++ b/p2p/src/net/default_backend/tests.rs @@ -35,7 +35,7 @@ use networking::{ }; use p2p_test_utils::run_with_timeout; use test_utils::assert_matches_return_val; -use utils::atomics::SeqCstAtomicBool; +use utils::{atomics::SeqCstAtomicBool, tokio_spawn_in_current_tracing_span}; use crate::{ config::NodeType, @@ -445,18 +445,21 @@ where let transport = A::make_transport(); let mut listener = transport.bind(vec![A::make_address()]).await.unwrap(); let addr = listener.local_addresses().unwrap(); - let _peer_socket_join_handle = logging::spawn_in_current_span(async move { - let (mut peer_socket, _address) = listener.accept().await.unwrap(); - let _ = peer_socket.write_all(b"invalid message").await; - // Return the socket to make sure it lives to the end of the test. - // This is mainly needed in the case of MpscChannelTransport, where a connection - // is just a pair of tokio::io::DuplexStream's, one of which is held inside - // the socket. Once the socket is dropped, any attempts to write to the stream - // on the other side will fail immediately. I.e. the handshake will likely fail - // when the node tries to send Hello, so it won't even see the "invalid message", - // which it's supposed to receive instead of HelloAck. - peer_socket - }); + let _peer_socket_join_handle = tokio_spawn_in_current_tracing_span( + async move { + let (mut peer_socket, _address) = listener.accept().await.unwrap(); + let _ = peer_socket.write_all(b"invalid message").await; + // Return the socket to make sure it lives to the end of the test. + // This is mainly needed in the case of MpscChannelTransport, where a connection + // is just a pair of tokio::io::DuplexStream's, one of which is held inside + // the socket. Once the socket is dropped, any attempts to write to the stream + // on the other side will fail immediately. I.e. the handshake will likely fail + // when the node tries to send Hello, so it won't even see the "invalid message", + // which it's supposed to receive instead of HelloAck. + peer_socket + }, + "", + ); let config = Arc::new(common::chain::config::create_unit_test_config()); let p2p_config = Arc::new(test_p2p_config()); diff --git a/p2p/src/peer_manager/tests/addresses.rs b/p2p/src/peer_manager/tests/addresses.rs index 0e50fce86a..395b5ca801 100644 --- a/p2p/src/peer_manager/tests/addresses.rs +++ b/p2p/src/peer_manager/tests/addresses.rs @@ -44,6 +44,7 @@ use test_utils::{ random::{make_seedable_rng, Seed}, BasicTestTimeGetter, }; +use utils::tokio_spawn_in_current_tracing_span; use crate::{ config::{NodeType, P2pConfig}, @@ -434,7 +435,10 @@ async fn resend_own_addresses(#[case] seed: Seed) { assert_eq!(pm.peers.len(), peer_count); let (started_sender, started_receiver) = oneshot_nofail::channel(); - logging::spawn_in_current_span(async move { pm.run_internal(Some(started_sender)).await }); + tokio_spawn_in_current_tracing_span( + async move { pm.run_internal(Some(started_sender)).await }, + "", + ); started_receiver.await.unwrap(); // Flush all pending messages @@ -510,11 +514,14 @@ async fn connect_to_predefined_address_if_dns_seed_is_empty(#[case] seed: Seed) ) .unwrap(); - let peer_mgr_join_handle = logging::spawn_in_current_span(async move { - let mut peer_mgr = peer_mgr; - let _ = peer_mgr.run_internal(None).await; - peer_mgr - }); + let peer_mgr_join_handle = tokio_spawn_in_current_tracing_span( + async move { + let mut peer_mgr = peer_mgr; + let _ = peer_mgr.run_internal(None).await; + peer_mgr + }, + "", + ); // Connection to predefined_peer_address is requested let cmd = @@ -577,11 +584,14 @@ async fn dont_connect_to_predefined_address_if_dns_seed_is_non_empty(#[case] see ) .unwrap(); - let peer_mgr_join_handle = logging::spawn_in_current_span(async move { - let mut peer_mgr = peer_mgr; - let _ = peer_mgr.run_internal(None).await; - peer_mgr - }); + let peer_mgr_join_handle = tokio_spawn_in_current_tracing_span( + async move { + let mut peer_mgr = peer_mgr; + let _ = peer_mgr.run_internal(None).await; + peer_mgr + }, + "", + ); // Connection to seeded_peer_address is requested let cmd = @@ -647,11 +657,14 @@ async fn connect_to_predefined_address_if_dns_seed_returned_bogus_address(#[case ) .unwrap(); - let peer_mgr_join_handle = logging::spawn_in_current_span(async move { - let mut peer_mgr = peer_mgr; - let _ = peer_mgr.run_internal(None).await; - peer_mgr - }); + let peer_mgr_join_handle = tokio_spawn_in_current_tracing_span( + async move { + let mut peer_mgr = peer_mgr; + let _ = peer_mgr.run_internal(None).await; + peer_mgr + }, + "", + ); // Connection to seeded_peer_address is requested; make it fail. let cmd = @@ -748,11 +761,14 @@ async fn dont_use_dns_seed_if_connections_exist(#[case] seed: Seed) { peer_mgr.peerdb.peer_discovered(existing_address); - let peer_mgr_join_handle = logging::spawn_in_current_span(async move { - let mut peer_mgr = peer_mgr; - let _ = peer_mgr.run_internal(None).await; - peer_mgr - }); + let peer_mgr_join_handle = tokio_spawn_in_current_tracing_span( + async move { + let mut peer_mgr = peer_mgr; + let _ = peer_mgr.run_internal(None).await; + peer_mgr + }, + "", + ); // Connection to existing_address is requested let cmd = diff --git a/p2p/src/peer_manager/tests/ban.rs b/p2p/src/peer_manager/tests/ban.rs index edfd7bfeb2..8d04330405 100644 --- a/p2p/src/peer_manager/tests/ban.rs +++ b/p2p/src/peer_manager/tests/ban.rs @@ -24,6 +24,7 @@ use test_utils::{ random::{make_seedable_rng, Seed}, BasicTestTimeGetter, }; +use utils::tokio_spawn_in_current_tracing_span; use crate::{ ban_config::BanConfig, @@ -90,11 +91,14 @@ async fn dont_auto_ban_connected_peer(#[case] seed: Seed) { time_getter.get_time_getter(), ); - let peer_mgr_join_handle = logging::spawn_in_current_span(async move { - let mut peer_mgr = peer_mgr; - let _ = peer_mgr.run_internal(None).await; - peer_mgr - }); + let peer_mgr_join_handle = tokio_spawn_in_current_tracing_span( + async move { + let mut peer_mgr = peer_mgr; + let _ = peer_mgr.run_internal(None).await; + peer_mgr + }, + "", + ); let peer_addr = TestAddressMaker::new_random_address(&mut rng).into(); let peer_id = inbound_block_relay_peer_accepted_by_backend( @@ -155,11 +159,14 @@ async fn disconnect_manually_banned_peer(#[case] seed: Seed) { time_getter.get_time_getter(), ); - let peer_mgr_join_handle = logging::spawn_in_current_span(async move { - let mut peer_mgr = peer_mgr; - let _ = peer_mgr.run_internal(None).await; - peer_mgr - }); + let peer_mgr_join_handle = tokio_spawn_in_current_tracing_span( + async move { + let mut peer_mgr = peer_mgr; + let _ = peer_mgr.run_internal(None).await; + peer_mgr + }, + "", + ); let peer_addr = TestAddressMaker::new_random_address(&mut rng).into(); let peer_id = inbound_block_relay_peer_accepted_by_backend( @@ -251,11 +258,14 @@ async fn reject_incoming_connection_from_banned_peer(#[case] seed: Seed) { peer_mgr.ban(banned_addr.as_bannable(), ban_duration); - let peer_mgr_join_handle = logging::spawn_in_current_span(async move { - let mut peer_mgr = peer_mgr; - let _ = peer_mgr.run_internal(None).await; - peer_mgr - }); + let peer_mgr_join_handle = tokio_spawn_in_current_tracing_span( + async move { + let mut peer_mgr = peer_mgr; + let _ = peer_mgr.run_internal(None).await; + peer_mgr + }, + "", + ); // Connection from the banned peer is rejected. let peer1_id = inbound_block_relay_peer_accepted_by_backend( @@ -345,11 +355,14 @@ async fn no_outgoing_connection_to_banned_peer(#[case] seed: Seed) { peer_mgr.ban(banned_addr.as_bannable(), ban_duration); - let peer_mgr_join_handle = logging::spawn_in_current_span(async move { - let mut peer_mgr = peer_mgr; - let _ = peer_mgr.run_internal(None).await; - peer_mgr - }); + let peer_mgr_join_handle = tokio_spawn_in_current_tracing_span( + async move { + let mut peer_mgr = peer_mgr; + let _ = peer_mgr.run_internal(None).await; + peer_mgr + }, + "", + ); // Connection to the normal peer is established. let cmd = expect_recv!(cmd_receiver); @@ -426,11 +439,14 @@ async fn banned_address_is_not_announced(#[case] seed: Seed) { peer_mgr.ban(banned_addr.as_bannable(), ban_duration); - let peer_mgr_join_handle = logging::spawn_in_current_span(async move { - let mut peer_mgr = peer_mgr; - let _ = peer_mgr.run_internal(None).await; - peer_mgr - }); + let peer_mgr_join_handle = tokio_spawn_in_current_tracing_span( + async move { + let mut peer_mgr = peer_mgr; + let _ = peer_mgr.run_internal(None).await; + peer_mgr + }, + "", + ); let peer1_id = inbound_full_relay_peer_accepted_by_backend( &conn_event_sender, @@ -550,11 +566,14 @@ async fn banned_address_not_in_addr_response(#[case] seed: Seed) { peer_mgr.ban(banned_addr.as_bannable(), ban_duration); - let peer_mgr_join_handle = logging::spawn_in_current_span(async move { - let mut peer_mgr = peer_mgr; - let _ = peer_mgr.run_internal(None).await; - peer_mgr - }); + let peer_mgr_join_handle = tokio_spawn_in_current_tracing_span( + async move { + let mut peer_mgr = peer_mgr; + let _ = peer_mgr.run_internal(None).await; + peer_mgr + }, + "", + ); let peer_id = inbound_full_relay_peer_accepted_by_backend( &conn_event_sender, diff --git a/p2p/src/peer_manager/tests/connections.rs b/p2p/src/peer_manager/tests/connections.rs index c6627886fc..0f6c7f6d44 100644 --- a/p2p/src/peer_manager/tests/connections.rs +++ b/p2p/src/peer_manager/tests/connections.rs @@ -43,7 +43,7 @@ use test_utils::{ random::{make_seedable_rng, Seed}, BasicTestTimeGetter, }; -use utils::atomics::SeqCstAtomicBool; +use utils::{atomics::SeqCstAtomicBool, tokio_spawn_in_current_tracing_span}; use utils_networking::IpOrSocketAddress; use crate::{ @@ -196,7 +196,7 @@ where // run the first peer manager in the background and poll events from the peer manager // that tries to connect to the first manager - logging::spawn_in_current_span(async move { pm1.run().await }); + tokio_spawn_in_current_tracing_span(async move { pm1.run().await }, ""); let event = get_connectivity_event::(&mut pm2.peer_connectivity_handle).await; match event { @@ -309,11 +309,14 @@ where let addr = pm2.peer_connectivity_handle.local_addresses()[0]; - logging::spawn_in_current_span(async move { - loop { - assert!(pm2.peer_connectivity_handle.poll_next().await.is_ok()); - } - }); + tokio_spawn_in_current_tracing_span( + async move { + loop { + assert!(pm2.peer_connectivity_handle.poll_next().await.is_ok()); + } + }, + "", + ); // "discover" the other networking service pm1.peerdb.peer_discovered(addr); @@ -662,7 +665,7 @@ where // run the first peer manager in the background and poll events from the peer manager // that tries to connect to the first manager - logging::spawn_in_current_span(async move { pm1.run().await }); + tokio_spawn_in_current_tracing_span(async move { pm1.run().await }, ""); let event = get_connectivity_event::(&mut pm2.peer_connectivity_handle).await; if let Ok(net::types::ConnectivityEvent::ConnectionClosed { peer_id }) = event { @@ -888,13 +891,14 @@ async fn connection_timeout_rpc_notified( ) .unwrap(); - logging::spawn_in_current_span( + tokio_spawn_in_current_tracing_span( // Rust 1.92 thinks that the unwrap call here is unreachable, even though the function // returns a normal error. #[allow(unreachable_code)] async move { peer_manager.run().await.unwrap(); }, + "", ); let (response_sender, response_receiver) = oneshot_nofail::channel(); @@ -1698,11 +1702,14 @@ async fn feeler_connections_test_impl(seed: Seed) { let peerdb_tried_addresses = tried_addr_table_as_set(&peer_mgr.peerdb); assert!(peerdb_tried_addresses.is_empty()); - let peer_mgr_join_handle = logging::spawn_in_current_span(async move { - let mut peer_mgr = peer_mgr; - let _ = peer_mgr.run_internal(None).await; - peer_mgr - }); + let peer_mgr_join_handle = tokio_spawn_in_current_tracing_span( + async move { + let mut peer_mgr = peer_mgr; + let _ = peer_mgr.run_internal(None).await; + peer_mgr + }, + "", + ); let mut successful_conn_addresses = BTreeSet::new(); let mut unsuccessful_conn_addresses = BTreeSet::new(); @@ -2023,11 +2030,14 @@ async fn reject_connection_to_existing_ip(#[case] seed: Seed) { SocketAddress::new(socket_addr) }; - let peer_mgr_join_handle = logging::spawn_in_current_span(async move { - let mut peer_mgr = peer_mgr; - let _ = peer_mgr.run_internal(None).await; - peer_mgr - }); + let peer_mgr_join_handle = tokio_spawn_in_current_tracing_span( + async move { + let mut peer_mgr = peer_mgr; + let _ = peer_mgr.run_internal(None).await; + peer_mgr + }, + "", + ); // Accept an inbound connection. let peer1_id = inbound_block_relay_peer_accepted_by_backend( @@ -2163,11 +2173,14 @@ async fn feeler_connection_to_ip_address_of_inbound_peer(#[case] seed: Seed) { SocketAddress::new(peer_addr) }; - let peer_mgr_join_handle = logging::spawn_in_current_span(async move { - let mut peer_mgr = peer_mgr; - let _ = peer_mgr.run_internal(None).await; - peer_mgr - }); + let peer_mgr_join_handle = tokio_spawn_in_current_tracing_span( + async move { + let mut peer_mgr = peer_mgr; + let _ = peer_mgr.run_internal(None).await; + peer_mgr + }, + "", + ); // Accept an inbound connection. let inbound_peer_id = inbound_block_relay_peer_accepted_by_backend( diff --git a/p2p/src/peer_manager/tests/discouragement.rs b/p2p/src/peer_manager/tests/discouragement.rs index a4d4917b2f..ae95f07a18 100644 --- a/p2p/src/peer_manager/tests/discouragement.rs +++ b/p2p/src/peer_manager/tests/discouragement.rs @@ -24,6 +24,7 @@ use test_utils::{ random::{make_seedable_rng, Seed}, BasicTestTimeGetter, }; +use utils::tokio_spawn_in_current_tracing_span; use crate::{ ban_config::BanConfig, @@ -90,11 +91,14 @@ async fn discourage_connected_peer(#[case] seed: Seed) { time_getter.get_time_getter(), ); - let peer_mgr_join_handle = logging::spawn_in_current_span(async move { - let mut peer_mgr = peer_mgr; - let _ = peer_mgr.run_internal(None).await; - peer_mgr - }); + let peer_mgr_join_handle = tokio_spawn_in_current_tracing_span( + async move { + let mut peer_mgr = peer_mgr; + let _ = peer_mgr.run_internal(None).await; + peer_mgr + }, + "", + ); let peer_addr = TestAddressMaker::new_random_address(&mut rng).into(); let peer_id = inbound_block_relay_peer_accepted_by_backend( @@ -218,11 +222,14 @@ async fn dont_reject_incoming_connection_from_discouraged_peer_if_limit_not_reac peer_mgr.discourage(peer_addr.as_bannable()); - let peer_mgr_join_handle = logging::spawn_in_current_span(async move { - let mut peer_mgr = peer_mgr; - let _ = peer_mgr.run_internal(None).await; - peer_mgr - }); + let peer_mgr_join_handle = tokio_spawn_in_current_tracing_span( + async move { + let mut peer_mgr = peer_mgr; + let _ = peer_mgr.run_internal(None).await; + peer_mgr + }, + "", + ); // Connection from the discouraged peer is accepted. let peer_id = inbound_block_relay_peer_accepted_by_backend( @@ -295,11 +302,14 @@ async fn reject_incoming_connection_from_discouraged_peer_if_limit_reached(#[cas peer_mgr.discourage(discouraged_addr.as_bannable()); - let peer_mgr_join_handle = logging::spawn_in_current_span(async move { - let mut peer_mgr = peer_mgr; - let _ = peer_mgr.run_internal(None).await; - peer_mgr - }); + let peer_mgr_join_handle = tokio_spawn_in_current_tracing_span( + async move { + let mut peer_mgr = peer_mgr; + let _ = peer_mgr.run_internal(None).await; + peer_mgr + }, + "", + ); // Connection from a normal peer is accepted. let normal_peer1_id = inbound_block_relay_peer_accepted_by_backend( @@ -416,11 +426,14 @@ async fn no_outgoing_connection_to_discouraged_peer(#[case] seed: Seed) { peer_mgr.discourage(discouraged_addr.as_bannable()); - let peer_mgr_join_handle = logging::spawn_in_current_span(async move { - let mut peer_mgr = peer_mgr; - let _ = peer_mgr.run_internal(None).await; - peer_mgr - }); + let peer_mgr_join_handle = tokio_spawn_in_current_tracing_span( + async move { + let mut peer_mgr = peer_mgr; + let _ = peer_mgr.run_internal(None).await; + peer_mgr + }, + "", + ); // Connection to the normal peer is established. let cmd = expect_recv!(cmd_receiver); @@ -496,11 +509,14 @@ async fn discouraged_address_is_not_announced(#[case] seed: Seed) { peer_mgr.discourage(discouraged_addr.as_bannable()); - let peer_mgr_join_handle = logging::spawn_in_current_span(async move { - let mut peer_mgr = peer_mgr; - let _ = peer_mgr.run_internal(None).await; - peer_mgr - }); + let peer_mgr_join_handle = tokio_spawn_in_current_tracing_span( + async move { + let mut peer_mgr = peer_mgr; + let _ = peer_mgr.run_internal(None).await; + peer_mgr + }, + "", + ); let peer1_id = inbound_full_relay_peer_accepted_by_backend( &conn_event_sender, @@ -619,11 +635,14 @@ async fn discouraged_address_not_in_addr_response(#[case] seed: Seed) { peer_mgr.discourage(discouraged_addr.as_bannable()); - let peer_mgr_join_handle = logging::spawn_in_current_span(async move { - let mut peer_mgr = peer_mgr; - let _ = peer_mgr.run_internal(None).await; - peer_mgr - }); + let peer_mgr_join_handle = tokio_spawn_in_current_tracing_span( + async move { + let mut peer_mgr = peer_mgr; + let _ = peer_mgr.run_internal(None).await; + peer_mgr + }, + "", + ); let peer_id = inbound_full_relay_peer_accepted_by_backend( &conn_event_sender, diff --git a/p2p/src/peer_manager/tests/eviction.rs b/p2p/src/peer_manager/tests/eviction.rs index da9de7c539..72ca2292a9 100644 --- a/p2p/src/peer_manager/tests/eviction.rs +++ b/p2p/src/peer_manager/tests/eviction.rs @@ -29,6 +29,7 @@ use test_utils::{ random::{make_seedable_rng, Seed}, BasicTestTimeGetter, }; +use utils::tokio_spawn_in_current_tracing_span; use crate::{ config::P2pConfig, @@ -170,11 +171,14 @@ mod dont_evict_if_blocks_in_flight { for addr in &addresses { peer_mgr.peerdb.peer_discovered(*addr); } - let peer_mgr_join_handle = logging::spawn_in_current_span(async move { - let mut peer_mgr = peer_mgr; - let _ = peer_mgr.run_internal(None).await; - peer_mgr - }); + let peer_mgr_join_handle = tokio_spawn_in_current_tracing_span( + async move { + let mut peer_mgr = peer_mgr; + let _ = peer_mgr.run_internal(None).await; + peer_mgr + }, + "", + ); let mut peer_ids = Vec::new(); diff --git a/p2p/src/peer_manager/tests/mod.rs b/p2p/src/peer_manager/tests/mod.rs index 5711f79a1b..b93f37c8f8 100644 --- a/p2p/src/peer_manager/tests/mod.rs +++ b/p2p/src/peer_manager/tests/mod.rs @@ -34,7 +34,7 @@ use tokio::{ time::timeout, }; -use ::utils::atomics::SeqCstAtomicBool; +use ::utils::{atomics::SeqCstAtomicBool, tokio_spawn_in_current_tracing_span}; use common::{chain::ChainConfig, time_getter::TimeGetter}; use networking::transport::TcpTransportSocket; use p2p_test_utils::expect_recv; @@ -209,13 +209,14 @@ where { let (peer_manager, peer_mgr_event_sender, shutdown_sender, subscribers_sender) = make_peer_manager_custom::(transport, addr, chain_config, p2p_config, time_getter).await; - logging::spawn_in_current_span( + tokio_spawn_in_current_tracing_span( // Rust 1.92 thinks that the unwrap call here is unreachable, even though the function // returns a normal error. #[allow(unreachable_code)] async move { peer_manager.run().await.unwrap(); }, + "", ); (peer_mgr_event_sender, shutdown_sender, subscribers_sender) } diff --git a/p2p/src/peer_manager/tests/ping.rs b/p2p/src/peer_manager/tests/ping.rs index e1d00137f2..ce7e73853d 100644 --- a/p2p/src/peer_manager/tests/ping.rs +++ b/p2p/src/peer_manager/tests/ping.rs @@ -20,6 +20,7 @@ use networking::test_helpers::{TestTransportMaker, TestTransportTcp}; use networking::transport::TcpTransportSocket; use p2p_test_utils::expect_recv; use test_utils::{assert_matches, assert_matches_return_val, BasicTestTimeGetter}; +use utils::tokio_spawn_in_current_tracing_span; use crate::{ config::{NodeType, P2pConfig}, @@ -92,9 +93,12 @@ async fn ping_timeout() { ) .unwrap(); - logging::spawn_in_current_span(async move { - let _ = peer_manager.run().await; - }); + tokio_spawn_in_current_tracing_span( + async move { + let _ = peer_manager.run().await; + }, + "", + ); // Notify about new inbound connection conn_event_sender diff --git a/p2p/src/sync/mod.rs b/p2p/src/sync/mod.rs index 4517c8255f..2051833c3e 100644 --- a/p2p/src/sync/mod.rs +++ b/p2p/src/sync/mod.rs @@ -30,6 +30,7 @@ use tokio::{ sync::mpsc::{self, Receiver, UnboundedReceiver, UnboundedSender}, task::JoinSet, }; +use tracing::Instrument; use common::{ chain::{config::ChainConfig, Block, Transaction}, @@ -38,8 +39,7 @@ use common::{ }; use logging::log; use mempool::{event::TransactionProcessed, tx_origin::TxOrigin, MempoolHandle}; -use tracing::Instrument; -use utils::{sync::Arc, tap_log::TapLog}; +use utils::{sync::Arc, tap_log::TapLog, tokio_spawn_in_join_set}; use crate::{ config::P2pConfig, @@ -207,11 +207,13 @@ where self.time_getter.clone(), ); - peer_tasks.spawn( + tokio_spawn_in_join_set( + &mut peer_tasks, async move { mgr.run().await; } .in_current_span(), + &format!("Peer[id={peer_id}] block sync mgr"), ); peer_local_event_senders.push(local_event_sender); @@ -231,11 +233,13 @@ where self.observer.clone(), ); - peer_tasks.spawn( + tokio_spawn_in_join_set( + &mut peer_tasks, async move { mgr.run().await; } .in_current_span(), + &format!("Peer[id={peer_id}] tx sync mgr"), ); peer_local_event_senders.push(local_event_sender); diff --git a/p2p/src/sync/tests/ban_scores.rs b/p2p/src/sync/tests/ban_scores.rs index 2200eadbd5..7be2e4e2ee 100644 --- a/p2p/src/sync/tests/ban_scores.rs +++ b/p2p/src/sync/tests/ban_scores.rs @@ -29,6 +29,7 @@ use mempool::error::MempoolPolicyError; use p2p_test_utils::create_n_blocks; use randomness::Rng; use test_utils::random::{make_seedable_rng, Seed}; +use utils::tokio_spawn_in_current_tracing_span; use crate::{ error::{P2pError, PeerError, ProtocolError}, @@ -58,21 +59,24 @@ fn ban_scores() { #[tokio::test] async fn peer_handle_result() { let (peer_mgr_event_sender, mut peer_mgr_event_receiver) = unbounded_channel(); - logging::spawn_in_current_span(async move { - while let Some(event) = peer_mgr_event_receiver.recv().await { - match event { - crate::PeerManagerEvent::AdjustPeerScore { - peer_id: _, - adjust_by: _, - reason: _, - response_sender, - } => { - response_sender.send(Ok(())); + tokio_spawn_in_current_tracing_span( + async move { + while let Some(event) = peer_mgr_event_receiver.recv().await { + match event { + crate::PeerManagerEvent::AdjustPeerScore { + peer_id: _, + adjust_by: _, + reason: _, + response_sender, + } => { + response_sender.send(Ok(())); + } + e => unreachable!("Unexpected event: {e:?}"), } - e => unreachable!("Unexpected event: {e:?}"), } - } - }); + }, + "", + ); // Test that the non-fatal errors are converted to Ok for err in [ diff --git a/p2p/src/sync/tests/helpers/mod.rs b/p2p/src/sync/tests/helpers/mod.rs index 39766a9e38..931095058c 100644 --- a/p2p/src/sync/tests/helpers/mod.rs +++ b/p2p/src/sync/tests/helpers/mod.rs @@ -52,7 +52,7 @@ use p2p_types::{bannable_address::BannableAddress, socket_address::SocketAddress use randomness::Rng; use subsystem::{ManagerJoinHandle, ShutdownTrigger}; use test_utils::random::Seed; -use utils::atomics::SeqCstAtomicBool; +use utils::{atomics::SeqCstAtomicBool, tokio_spawn_in_current_tracing_span}; use utils_networking::IpOrSocketAddress; use crate::{ @@ -145,10 +145,13 @@ impl TestNode { let sync_manager_chainstate_handle = sync_manager.chainstate().clone(); let (error_sender, error_receiver) = mpsc::unbounded_channel(); - let sync_manager_handle = logging::spawn_in_current_span(async move { - let e = sync_manager.run().await.unwrap_err(); - let _ = error_sender.send(e); - }); + let sync_manager_handle = tokio_spawn_in_current_tracing_span( + async move { + let e = sync_manager.run().await.unwrap_err(); + let _ = error_sender.send(e); + }, + "", + ); let new_tip_receiver = subscribe_to_new_tip(&sync_manager_chainstate_handle).await.unwrap(); let tx_processed_receiver = subscribe_to_tx_processed(&mempool_handle).await.unwrap(); diff --git a/p2p/src/tests/helpers/test_node.rs b/p2p/src/tests/helpers/test_node.rs index 228954ebb0..179f03f2ca 100644 --- a/p2p/src/tests/helpers/test_node.rs +++ b/p2p/src/tests/helpers/test_node.rs @@ -39,7 +39,7 @@ use p2p_test_utils::SHORT_TIMEOUT; use p2p_types::{p2p_event::P2pEventHandler, socket_address::SocketAddress}; use storage_inmemory::InMemory; use subsystem::ShutdownTrigger; -use utils::atomics::SeqCstAtomicBool; +use utils::{atomics::SeqCstAtomicBool, tokio_spawn_in_tracing_span}; use utils_networking::IpOrSocketAddress; use crate::{ @@ -191,7 +191,7 @@ where Box::new(TestDnsSeed::new(dns_seed_addresses.clone())), ) .unwrap(); - let peer_mgr_join_handle = logging::spawn_in_span( + let peer_mgr_join_handle = tokio_spawn_in_tracing_span( async move { let mut peer_mgr = peer_mgr; let err = match peer_mgr.run_without_consuming_self().await { @@ -202,6 +202,7 @@ where (peer_mgr, err) }, tracing_span.clone(), + "", ); let sync_mgr = SyncManager::>::new( @@ -214,7 +215,7 @@ where peer_mgr_event_sender.clone(), time_getter.get_time_getter(), ); - let sync_mgr_join_handle = logging::spawn_in_span( + let sync_mgr_join_handle = tokio_spawn_in_tracing_span( async move { match sync_mgr.run().await { Err(err) => err, @@ -222,6 +223,7 @@ where } }, tracing_span.clone(), + "", ); TestNode { diff --git a/p2p/test-utils/src/lib.rs b/p2p/test-utils/src/lib.rs index 35f0a753e8..15707acdcd 100644 --- a/p2p/test-utils/src/lib.rs +++ b/p2p/test-utils/src/lib.rs @@ -95,7 +95,7 @@ pub fn start_subsystems_generic( ); let mempool = manager.add_custom_subsystem("p2p-test-mempool", |handle| mempool.init(handle)); - let manager_handle = manager.main_in_task_in_span(tracing_span); + let manager_handle = manager.main_in_task_in_tracing_span(tracing_span); (chainstate, mempool, shutdown_trigger, manager_handle) } diff --git a/subsystem/src/manager/manager_impl.rs b/subsystem/src/manager/manager_impl.rs index a1b33d5ad1..7e748539fc 100644 --- a/subsystem/src/manager/manager_impl.rs +++ b/subsystem/src/manager/manager_impl.rs @@ -22,7 +22,10 @@ use tokio::{ }; use logging::log; -use utils::{const_value::ConstValue, shallow_clone::ShallowClone}; +use utils::{ + const_value::ConstValue, shallow_clone::ShallowClone, tokio_spawn_in_current_tracing_span, + tokio_spawn_in_tracing_span, +}; use crate::{task, Handle, ManagerConfig, SubmitOnlyHandle, Subsystem}; @@ -150,7 +153,11 @@ impl Manager { let subsystems: Vec<_> = self .subsystems .into_iter() - .map(|s| s.map_task(logging::spawn_in_current_span)) + .map(|subsys_data| { + subsys_data.map_task(|fut, subsys_full_name| { + tokio_spawn_in_current_tracing_span(fut, subsys_full_name) + }) + }) .collect(); // Signal the manager is shut down so it does not wait for itself @@ -173,11 +180,12 @@ impl Manager { /// Runs the application in a separate task. /// /// This method should always be used instead of spawning a task manually because it prevents - /// an incorrect usage. The returned handle must be joined to ensure a proper subsystems + /// incorrect usage. The returned handle must be joined to ensure a proper subsystems /// shutdown. pub fn main_in_task(self) -> ManagerJoinHandle { - let handle = Some(logging::spawn_in_current_span( + let handle = Some(tokio_spawn_in_current_tracing_span( async move { self.main().await }, + "Subsystem mgr", )); ManagerJoinHandle { handle } } @@ -186,10 +194,11 @@ impl Manager { /// /// This does the same as `main_in_task` but uses the specified tracing span instead of /// the current one. - pub fn main_in_task_in_span(self, tracing_span: tracing::Span) -> ManagerJoinHandle { - let handle = Some(logging::spawn_in_span( + pub fn main_in_task_in_tracing_span(self, tracing_span: tracing::Span) -> ManagerJoinHandle { + let handle = Some(tokio_spawn_in_tracing_span( async move { self.main().await }, tracing_span, + "Subsystem mgr", )); ManagerJoinHandle { handle } } @@ -203,16 +212,17 @@ struct SubsystemData { } impl SubsystemData { - fn map_task(self, f: impl FnOnce(T) -> U) -> SubsystemData { + fn map_task(self, f: impl FnOnce(T, /*full_name*/ &str) -> U) -> SubsystemData { let Self { full_name, shutdown_tx, task, } = self; + let task = f(task, &full_name); SubsystemData { full_name, shutdown_tx, - task: f(task), + task, } } } diff --git a/subsystem/src/task.rs b/subsystem/src/task.rs index 36f994f047..4fdcc067cd 100644 --- a/subsystem/src/task.rs +++ b/subsystem/src/task.rs @@ -22,7 +22,7 @@ use tokio::{ use tracing::Instrument; use logging::log; -use utils::{once_destructor::OnceDestructor, sync::Arc}; +use utils::{once_destructor::OnceDestructor, sync::Arc, tokio_spawn_in_join_set}; use crate::{calls::Action, SubmitOnlyHandle, Subsystem}; @@ -106,9 +106,13 @@ pub async fn subsystem( }, Action::Ref(call) => { let subsys = Arc::clone(&subsys); - worker_tasks.spawn(async move { - call(subsys.read().await.interface_ref()).await - }.in_current_span()); + tokio_spawn_in_join_set( + &mut worker_tasks, + async move { + call(subsys.read().await.interface_ref()).await + }.in_current_span(), + &format!("{full_name}'s Action::Ref"), + ); }, } } diff --git a/supply-chain/README.md b/supply-chain/README.md index a20d921d8f..6548991958 100644 --- a/supply-chain/README.md +++ b/supply-chain/README.md @@ -103,6 +103,10 @@ will be automatically removed by `cargo vet`, while `cargo vet --locked` will co | Joe Birr-Pixton | ctz | The creator and lead maintainer of the widely used `rustls` crate. | | Dirkjan Ochtman | djc | One of the maintainers of `rustls`. | | Daniel McCarney | cpu | One of the maintainers of `rustls`. He's also a member of `Go`. | + | Lucio Franco | LucioFranco | Member of `tokio-rs`, `tower-rs` and `hyperium`, original author of `tonic`, maintains other crates, e.g. `tower` and `prost`. | + | Jonas Platte | jplatte | Member of `tokio-rs` and `tower-rs`, maintainer of `axum`. | + | Eliza Weisman | hawkw | Member of `tokio-rs` and `tower-rs`, creator/maintainer of `tracing` crates and crates related to `tokio-console`. | + | Jon Gjengset | jonhoo | Educator, author of "Rust for Rustaceans", author and maintainer of the `hdrhistogram` crate used by `console-subscriber`. | - We also trust the crates that we've forked. diff --git a/supply-chain/audits.toml b/supply-chain/audits.toml index 0e53d6b317..8ef7440944 100644 --- a/supply-chain/audits.toml +++ b/supply-chain/audits.toml @@ -141,6 +141,18 @@ user-id = 3618 # David Tolnay (dtolnay) start = "2019-07-23" end = "2026-10-15" +[[trusted.axum]] +criteria = "safe-to-deploy" +user-id = 6913 # Jonas Platte (jplatte) +start = "2024-09-20" +end = "2027-02-04" + +[[trusted.axum-core]] +criteria = "safe-to-deploy" +user-id = 6913 # Jonas Platte (jplatte) +start = "2024-09-20" +end = "2027-02-04" + [[trusted.base58ck]] criteria = "safe-to-deploy" user-id = 1588 # Andrew Poelstra (apoelstra) @@ -357,6 +369,18 @@ user-id = 33035 # Taiki Endo (taiki-e) start = "2022-07-17" end = "2026-10-15" +[[trusted.console-api]] +criteria = "safe-to-deploy" +user-id = 1249 # Eliza Weisman (hawkw) +start = "2021-12-16" +end = "2027-02-04" + +[[trusted.console-subscriber]] +criteria = "safe-to-deploy" +user-id = 1249 # Eliza Weisman (hawkw) +start = "2021-12-16" +end = "2027-02-04" + [[trusted.cpufeatures]] criteria = "safe-to-deploy" user-id = 267 # Tony Arcieri (tarcieri) @@ -585,6 +609,12 @@ user-id = 4556 # Dirkjan Ochtman (djc) start = "2025-10-26" end = "2026-12-08" +[[trusted.hdrhistogram]] +criteria = "safe-to-deploy" +user-id = 3989 # Jon Gjengset (jonhoo) +start = "2019-06-01" +end = "2027-02-04" + [[trusted.hex-conservative]] criteria = "safe-to-deploy" user-id = 1588 # Andrew Poelstra (apoelstra) @@ -1101,6 +1131,24 @@ user-id = 3618 # David Tolnay (dtolnay) start = "2019-04-23" end = "2026-10-15" +[[trusted.prost]] +criteria = "safe-to-deploy" +user-id = 3959 # Lucio Franco (LucioFranco) +start = "2021-07-08" +end = "2027-02-04" + +[[trusted.prost-derive]] +criteria = "safe-to-deploy" +user-id = 3959 # Lucio Franco (LucioFranco) +start = "2021-07-08" +end = "2027-02-04" + +[[trusted.prost-types]] +criteria = "safe-to-deploy" +user-id = 3959 # Lucio Franco (LucioFranco) +start = "2021-07-08" +end = "2027-02-04" + [[trusted.quote]] criteria = "safe-to-deploy" user-id = 3618 # David Tolnay (dtolnay) @@ -1431,6 +1479,18 @@ user-id = 10 start = "2019-03-02" end = "2026-10-15" +[[trusted.tokio]] +criteria = "safe-to-deploy" +user-id = 1249 # Eliza Weisman (hawkw) +start = "2020-04-04" +end = "2027-02-04" + +[[trusted.tokio]] +criteria = "safe-to-deploy" +user-id = 3959 # Lucio Franco (LucioFranco) +start = "2019-10-01" +end = "2027-02-04" + [[trusted.tokio]] criteria = "safe-to-deploy" user-id = 6741 # Alice Ryhl (Darksonn) @@ -1443,6 +1503,12 @@ user-id = 10 start = "2019-04-24" end = "2026-10-15" +[[trusted.tokio-macros]] +criteria = "safe-to-deploy" +user-id = 3959 # Lucio Franco (LucioFranco) +start = "2019-10-01" +end = "2027-02-04" + [[trusted.tokio-macros]] criteria = "safe-to-deploy" user-id = 6741 # Alice Ryhl (Darksonn) @@ -1509,6 +1575,18 @@ user-id = 6743 # Ed Page (epage) start = "2025-04-25" end = "2026-10-15" +[[trusted.tonic]] +criteria = "safe-to-deploy" +user-id = 3959 # Lucio Franco (LucioFranco) +start = "2019-10-02" +end = "2027-02-04" + +[[trusted.tonic-prost]] +criteria = "safe-to-deploy" +user-id = 3959 # Lucio Franco (LucioFranco) +start = "2025-07-28" +end = "2027-02-04" + [[trusted.tower]] criteria = "safe-to-deploy" user-id = 10 @@ -1527,12 +1605,36 @@ user-id = 359 # Sean McArthur (seanmonstar) start = "2024-09-23" end = "2026-10-15" +[[trusted.tower-http]] +criteria = "safe-to-deploy" +user-id = 3959 # Lucio Franco (LucioFranco) +start = "2023-09-01" +end = "2027-02-04" + [[trusted.tower-layer]] criteria = "safe-to-deploy" user-id = 10 start = "2019-04-27" end = "2026-10-15" +[[trusted.tower-layer]] +criteria = "safe-to-deploy" +user-id = 3959 # Lucio Franco (LucioFranco) +start = "2019-09-11" +end = "2027-02-04" + +[[trusted.tower-service]] +criteria = "safe-to-deploy" +user-id = 3959 # Lucio Franco (LucioFranco) +start = "2019-08-20" +end = "2027-02-04" + +[[trusted.tracing-serde]] +criteria = "safe-to-deploy" +user-id = 1249 # Eliza Weisman (hawkw) +start = "2019-06-27" +end = "2027-02-04" + [[trusted.unicode-ident]] criteria = "safe-to-deploy" user-id = 3618 # David Tolnay (dtolnay) diff --git a/supply-chain/config.toml b/supply-chain/config.toml index b87143333e..0ed20c998f 100644 --- a/supply-chain/config.toml +++ b/supply-chain/config.toml @@ -110,14 +110,6 @@ criteria = "safe-to-deploy" version = "4.7.1" criteria = "safe-to-deploy" -[[exemptions.axum]] -version = "0.7.9" -criteria = "safe-to-deploy" - -[[exemptions.axum-core]] -version = "0.4.5" -criteria = "safe-to-deploy" - [[exemptions.bigdecimal]] version = "0.4.9" criteria = "safe-to-deploy" @@ -538,6 +530,10 @@ criteria = "safe-to-deploy" version = "0.2.1" criteria = "safe-to-deploy" +[[exemptions.hyper-timeout]] +version = "0.5.2" +criteria = "safe-to-deploy" + [[exemptions.iced_aw]] version = "0.12.2@git:def1db9aac1e58a47e0c3127d4d4e95d724ca8ad" criteria = "safe-to-deploy" @@ -658,6 +654,10 @@ criteria = "safe-to-deploy" version = "0.7.3" criteria = "safe-to-deploy" +[[exemptions.matchit]] +version = "0.8.4" +criteria = "safe-to-deploy" + [[exemptions.memmap2]] version = "0.9.8" criteria = "safe-to-deploy" @@ -674,6 +674,10 @@ criteria = "safe-to-deploy" version = "3.0.0" criteria = "safe-to-deploy" +[[exemptions.minimal-lexical]] +version = "0.2.1" +criteria = "safe-to-deploy" + [[exemptions.mockall]] version = "0.13.1" criteria = "safe-to-deploy" @@ -1254,14 +1258,6 @@ criteria = "safe-to-deploy" version = "0.5.2" criteria = "safe-to-deploy" -[[exemptions.tower-layer]] -version = "0.3.3" -criteria = "safe-to-deploy" - -[[exemptions.tower-service]] -version = "0.3.3" -criteria = "safe-to-deploy" - [[exemptions.tracing-serde]] version = "0.2.0" criteria = "safe-to-deploy" diff --git a/supply-chain/imports.lock b/supply-chain/imports.lock index 1584c5b2ab..e02fa6670c 100644 --- a/supply-chain/imports.lock +++ b/supply-chain/imports.lock @@ -134,6 +134,34 @@ user-id = 3618 user-login = "dtolnay" user-name = "David Tolnay" +[[publisher.axum]] +version = "0.7.9" +when = "2024-11-16" +user-id = 6913 +user-login = "jplatte" +user-name = "Jonas Platte" + +[[publisher.axum]] +version = "0.8.8" +when = "2025-12-20" +user-id = 6913 +user-login = "jplatte" +user-name = "Jonas Platte" + +[[publisher.axum-core]] +version = "0.4.5" +when = "2024-09-27" +user-id = 6913 +user-login = "jplatte" +user-name = "Jonas Platte" + +[[publisher.axum-core]] +version = "0.5.6" +when = "2025-12-27" +user-id = 6913 +user-login = "jplatte" +user-name = "Jonas Platte" + [[publisher.base58ck]] version = "0.1.0" when = "2024-04-02" @@ -371,6 +399,20 @@ user-id = 6743 user-login = "epage" user-name = "Ed Page" +[[publisher.console-api]] +version = "0.9.0" +when = "2025-10-30" +user-id = 1249 +user-login = "hawkw" +user-name = "Eliza Weisman" + +[[publisher.console-subscriber]] +version = "0.5.0" +when = "2025-10-30" +user-id = 1249 +user-login = "hawkw" +user-name = "Eliza Weisman" + [[publisher.core-foundation]] version = "0.9.3" when = "2022-02-07" @@ -664,6 +706,13 @@ user-id = 2915 user-login = "Amanieu" user-name = "Amanieu d'Antras" +[[publisher.hdrhistogram]] +version = "7.5.4" +when = "2023-11-18" +user-id = 3989 +user-login = "jonhoo" +user-name = "Jon Gjengset" + [[publisher.hex-conservative]] version = "0.1.2" when = "2024-05-14" @@ -1213,6 +1262,27 @@ user-id = 3618 user-login = "dtolnay" user-name = "David Tolnay" +[[publisher.prost]] +version = "0.14.3" +when = "2026-01-10" +user-id = 3959 +user-login = "LucioFranco" +user-name = "Lucio Franco" + +[[publisher.prost-derive]] +version = "0.14.3" +when = "2026-01-10" +user-id = 3959 +user-login = "LucioFranco" +user-name = "Lucio Franco" + +[[publisher.prost-types]] +version = "0.14.3" +when = "2026-01-10" +user-id = 3959 +user-login = "LucioFranco" +user-name = "Lucio Franco" + [[publisher.quote]] version = "1.0.41" when = "2025-09-29" @@ -1640,6 +1710,20 @@ user-id = 6743 user-login = "epage" user-name = "Ed Page" +[[publisher.tonic]] +version = "0.14.3" +when = "2026-01-28" +user-id = 3959 +user-login = "LucioFranco" +user-name = "Lucio Franco" + +[[publisher.tonic-prost]] +version = "0.14.3" +when = "2026-01-28" +user-id = 3959 +user-login = "LucioFranco" +user-name = "Lucio Franco" + [[publisher.tower]] version = "0.5.2" when = "2024-12-11" @@ -1654,6 +1738,20 @@ user-id = 359 user-login = "seanmonstar" user-name = "Sean McArthur" +[[publisher.tower-layer]] +version = "0.3.3" +when = "2024-08-13" +user-id = 3959 +user-login = "LucioFranco" +user-name = "Lucio Franco" + +[[publisher.tower-service]] +version = "0.3.3" +when = "2024-08-13" +user-id = 3959 +user-login = "LucioFranco" +user-name = "Lucio Franco" + [[publisher.unicode-ident]] version = "1.0.19" when = "2025-09-10" @@ -3529,6 +3627,15 @@ describe in the review doc. """ aggregated-from = "https://chromium.googlesource.com/chromium/src/+/main/third_party/rust/chromium_crates_io/supply-chain/audits.toml?format=TEXT" +[[audits.google.audits.nom]] +who = "danakj@chromium.org" +criteria = "safe-to-deploy" +version = "7.1.3" +notes = """ +Reviewed in https://chromium-review.googlesource.com/c/chromium/src/+/5046153 +""" +aggregated-from = "https://chromium.googlesource.com/chromium/src/+/main/third_party/rust/chromium_crates_io/supply-chain/audits.toml?format=TEXT" + [[audits.google.audits.num-integer]] who = "Manish Goregaokar " criteria = "safe-to-deploy" diff --git a/utils/Cargo.toml b/utils/Cargo.toml index f8117656b5..74dfaf5a31 100644 --- a/utils/Cargo.toml +++ b/utils/Cargo.toml @@ -23,6 +23,8 @@ probabilistic-collections.workspace = true qrcodegen.workspace = true slave-pool.workspace = true thiserror.workspace = true +tokio = { workspace = true, default-features = false, features = ["macros", "rt", "sync", "tracing"] } +tracing.workspace = true zeroize.workspace = true [dev-dependencies] @@ -33,12 +35,13 @@ regex.workspace = true rstest.workspace = true static_assertions.workspace = true tempfile.workspace = true -tracing.workspace = true -tokio = { workspace = true, features = ["macros"] } [target.'cfg(loom)'.dependencies] loom.workspace = true +[features] +tokio-console = [] + [[bench]] name = "benches" harness = false diff --git a/utils/src/lib.rs b/utils/src/lib.rs index a5baf56463..3f546147d8 100644 --- a/utils/src/lib.rs +++ b/utils/src/lib.rs @@ -50,6 +50,12 @@ pub mod workspace_path; mod concurrency_impl; pub use concurrency_impl::*; +mod tokio_utils; +pub use tokio_utils::{ + tokio_spawn, tokio_spawn_blocking, tokio_spawn_in_current_tracing_span, + tokio_spawn_in_join_set, tokio_spawn_in_tracing_span, +}; + pub use log_error::log_error; // The internals of the `log_error` macro will refer to other parts of the repository as diff --git a/utils/src/tokio_utils.rs b/utils/src/tokio_utils.rs new file mode 100644 index 0000000000..881b8cfc87 --- /dev/null +++ b/utils/src/tokio_utils.rs @@ -0,0 +1,122 @@ +// Copyright (c) 2021-2026 RBB S.r.l +// opensource@mintlayer.org +// SPDX-License-Identifier: MIT +// Licensed under the MIT License; +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://github.com/mintlayer/mintlayer-core/blob/master/LICENSE +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Wrappers for tokio task-launching functions that also accept a "task name", which will be +//! visible in tokio-console when it's attached to the app (requires the corresponding +//! compile-time feature). + +use std::future::Future; + +use tokio::task::{AbortHandle, JoinHandle, JoinSet}; +use tracing::{Instrument, Span}; + +// Note: +// - "track_caller" is needed for tokio-console to be able to correctly display the location, +// otherwise it'll always be "utils/src/tokio_utils.rs". +// - It'd be better if enabling tokio-console was done via a command-line option, without the +// need to rebuild the app. But this would require to always use the currently unstable builder +// APIs, which means that we'd have to return proper Results from the "tokio_spawn_" functions +// and make sure the caller code can handle potential errors (because even though most functions +// can't return an error now, they may start doing so in future versions of Tokio, given that +// the API is considered unstable), which is not always easy to do. + +#[track_caller] +pub fn tokio_spawn_in_current_tracing_span( + future: Fut, + task_name: &str, +) -> JoinHandle +where + Fut: Future + Send + 'static, + Fut::Output: Send + 'static, +{ + tokio_spawn(future.in_current_span(), task_name) +} + +#[track_caller] +pub fn tokio_spawn_in_tracing_span( + future: Fut, + span: Span, + task_name: &str, +) -> JoinHandle +where + Fut: Future + Send + 'static, + Fut::Output: Send + 'static, +{ + tokio_spawn(future.instrument(span), task_name) +} + +#[track_caller] +#[allow(unused_variables)] +pub fn tokio_spawn(future: Fut, task_name: &str) -> JoinHandle +where + Fut: Future + Send + 'static, + Fut::Output: Send + 'static, +{ + #[cfg(feature = "tokio-console")] + return tokio::task::Builder::new() + .name(task_name) + .spawn(future) + // Note: as of tokio 1.49, this cannot fail + .expect("task::Builder::spawn failed"); + + #[cfg(not(feature = "tokio-console"))] + return tokio::spawn(future); +} + +#[track_caller] +#[allow(unused_variables)] +pub fn tokio_spawn_blocking(func: Func, task_name: &str) -> JoinHandle +where + Func: FnOnce() -> Output + Send + 'static, + Output: Send + 'static, +{ + // Note: this actually behaves slightly differently compared to the normal `task::spawn_blocking`: + // the latter returns a bogus handle when the internal `SpawnError::ShuttingDown` occurs + // (see https://github.com/tokio-rs/tokio/blob/tokio-1.49.0/tokio/src/runtime/blocking/pool.rs#L320-L327), + // while the builder method will return an actual error in this case. + // Since the "tokio-console" feature is for debugging purposes only, it's ok to panic when + // the task is being spawned during shutdown. + #[cfg(feature = "tokio-console")] + return tokio::task::Builder::new() + .name(task_name) + .spawn_blocking(func) + .expect("task::Builder::spawn_blocking failed"); + + #[cfg(not(feature = "tokio-console"))] + return tokio::task::spawn_blocking(func); +} + +#[track_caller] +#[allow(unused_variables)] +pub fn tokio_spawn_in_join_set( + set: &mut JoinSet, + future: Fut, + task_name: &str, +) -> AbortHandle +where + Fut: Future + Send + 'static, + Fut::Output: Send + 'static, +{ + #[cfg(feature = "tokio-console")] + return set + .build_task() + .name(task_name) + .spawn(future) + // Note: as of tokio 1.49, this cannot fail + .expect("join_set::Builder::spawn failed"); + + #[cfg(not(feature = "tokio-console"))] + return set.spawn(future); +} diff --git a/wallet/wallet-rpc-lib/src/service/worker.rs b/wallet/wallet-rpc-lib/src/service/worker.rs index ba7f3c09cf..5456b07020 100644 --- a/wallet/wallet-rpc-lib/src/service/worker.rs +++ b/wallet/wallet-rpc-lib/src/service/worker.rs @@ -15,22 +15,20 @@ use std::{ops::ControlFlow, path::PathBuf, sync::Arc}; -use common::chain::ChainConfig; use futures::{future::BoxFuture, never::Never}; use tokio::{sync::mpsc, task::JoinHandle}; +use common::chain::ChainConfig; use logging::log; +use utils::tokio_spawn; use utils_networking::broadcaster::Broadcaster; use wallet_controller::types::{ CreatedWallet, OpenedWallet, WalletCreationOptions, WalletTypeArgs, }; use wallet_controller::{ControllerError, NodeInterface}; -use wallet_types::scan_blockchain::ScanBlockchain; -use wallet_types::wallet_type::WalletType; - -use crate::types::RpcError; +use wallet_types::{scan_blockchain::ScanBlockchain, wallet_type::WalletType}; -use crate::Event; +use crate::{types::RpcError, Event}; use super::WalletServiceEvents; @@ -303,6 +301,6 @@ where wallet_events, ); - tokio::spawn(worker.event_loop()) + tokio_spawn(worker.event_loop(), "Wallet worker event loop") } }