From f1166a859f39e4355d0559ec31e8701340ee3955 Mon Sep 17 00:00:00 2001 From: tison Date: Wed, 13 May 2026 09:06:51 +0800 Subject: [PATCH 1/5] fix: tighten oneshot memory ordering --- mea/src/oneshot/mod.rs | 171 ++++++++++++++++++++++++++------------- mea/src/oneshot/tests.rs | 108 +++++++++++++++++++++++++ 2 files changed, 223 insertions(+), 56 deletions(-) diff --git a/mea/src/oneshot/mod.rs b/mea/src/oneshot/mod.rs index d6d2908..00cb250 100644 --- a/mea/src/oneshot/mod.rs +++ b/mea/src/oneshot/mod.rs @@ -114,10 +114,6 @@ unsafe impl Sync for Sender {} #[inline(always)] fn sender_wake_up_receiver(channel: &Channel, state: u8) { - // ORDERING: Synchronizes with writing waker to memory, and prevents the - // taking of the waker from being ordered before this operation. - fence(Ordering::Acquire); - // Take the waker, but critically do not awake it. If we awake it now, the // receiving thread could still observe the AWAKING state and re-await, meaning // that after we change to the MESSAGE state, it would remain waiting indefinitely @@ -125,7 +121,8 @@ fn sender_wake_up_receiver(channel: &Channel, state: u8) { // // SAFETY: at this point we are in the AWAKING state, and the receiving thread // does not access the waker while in this state, nor does it free the channel - // allocation in this state. + // allocation in this state. The caller's acquire ordering establishes a happens-before + // relationship with the writing of the waker. let waker = unsafe { channel.take_waker() }; // ORDERING: this ordering serves two-fold: it synchronizes with the acquire load @@ -171,11 +168,11 @@ impl Sender { // * RECEIVING + 1 = AWAKING // * DISCONNECTED + 1 = EMPTY (invalid), however this state is never observed // - // ORDERING: we use release ordering to ensure writing the message is visible to the - // receiving thread. The EMPTY and DISCONNECTED branches do not observe any shared state, - // and thus we do not need an acquire ordering. The RECEIVING branch manages synchronization - // independent of this operation. - match channel.state.fetch_add(1, Ordering::Release) { + // ORDERING: we need release ordering to allow the receiver to synchronize with our write + // of the message and with our final write of the state, in the case where the receiver + // becomes responsible for freeing the channel. We need acquire ordering in the RECEIVING + // and DISCONNECTED branches, as explained further down. + match channel.state.fetch_add(1, Ordering::AcqRel) { // The receiver is alive and has not started waiting. Send done. EMPTY => Ok(()), // The receiver is waiting. Wake it up so it can return the message. @@ -185,8 +182,10 @@ impl Sender { } // The receiver was already dropped. The error is responsible for freeing the channel. // - // SAFETY: since the receiver disconnected it will no longer access `channel_ptr`, so - // we can transfer exclusive ownership of the channel's resources to the error. + // SAFETY: The acquire ordering above synchronizes with the receiver's write of the + // DISCONNECTED state. Since the receiver disconnected it will no longer access + // `channel_ptr`, so we can transfer exclusive ownership of the channel's resources to + // the error. // Moreover, since we just placed the message in the channel, the channel contains a // valid message. DISCONNECTED => Err(SendError { channel_ptr }), @@ -230,11 +229,13 @@ impl Drop for Sender { // * RECEIVING ^ 001 = AWAKING // * DISCONNECTED ^ 001 = EMPTY (invalid), but this state is never observed // - // ORDERING: we need not release ordering here since there are no modifications we - // need to make visible to other thread, and the Err(RECEIVING) branch handles - // synchronization independent of this fetch_xor - match channel.state.fetch_xor(0b001, Ordering::Relaxed) { - // The receiver has not started waiting, nor is it dropped. + // ORDERING: Release is required so that in the states where the receiver becomes + // responsible for deallocating the channel, they can synchronize with this final state + // write from us. Acquire is required by the branches below to synchronize with writes from + // the receiver. + match channel.state.fetch_xor(0b001, Ordering::AcqRel) { + // The receiver is not waiting, nor is it dropped. The receiver is responsible for + // deallocating the channel. EMPTY => {} // The receiver is waiting. Wake it up so it can detect that the channel disconnected. RECEIVING => sender_wake_up_receiver(channel, DISCONNECTED), @@ -243,7 +244,8 @@ impl Drop for Sender { // SAFETY: when the receiver switches the state to DISCONNECTED they have received // the message or will no longer be trying to receive the message, and have // observed that the sender is still alive, meaning that we are responsible for - // freeing the channel allocation. + // freeing the channel allocation. The acquire ordering above synchronizes with + // the receiver's final write of the state. unsafe { dealloc(self.channel_ptr) }; } state => unreachable!("unexpected channel state: {}", state), @@ -335,10 +337,9 @@ impl Receiver { // SAFETY: The channel will not be freed while this method is still running. let channel = unsafe { self.channel_ptr.as_ref() }; - // ORDERING: we use acquire ordering to synchronize with the store of the message. - match channel.state.load(Ordering::Acquire) { - EMPTY => Err(TryRecvError::Empty), - DISCONNECTED => Err(TryRecvError::Disconnected), + // ORDERING: Relaxed is fine since the only branch that needs synchronization is MESSAGE, + // and that branch has its own synchronization. + match channel.state.load(Ordering::Relaxed) { MESSAGE => { // It is okay to break up the load and store since once we are in the MESSAGE state, // the sender no longer modifies the state @@ -347,9 +348,14 @@ impl Receiver { // we need not make any side effects visible to it. channel.state.store(DISCONNECTED, Ordering::Relaxed); - // SAFETY: we are in the MESSAGE state so the message is present + // ORDERING: Synchronize with the sender's write of the message. + fence(Ordering::Acquire); + + // SAFETY: we are in the MESSAGE state so the message is present and synchronized. Ok(unsafe { channel.take_message() }) } + EMPTY => Err(TryRecvError::Empty), + DISCONNECTED => Err(TryRecvError::Disconnected), state => unreachable!("unexpected channel state: {}", state), } } @@ -361,17 +367,29 @@ impl Drop for Receiver { // left deallocating the channel allocation to us. let channel = unsafe { self.channel_ptr.as_ref() }; - // Set the channel state to disconnected and read what state the receiver was in. - match channel.state.swap(DISCONNECTED, Ordering::Acquire) { - // The sender has not sent anything, nor is it dropped. + // Set the channel state to disconnected and read what state the channel was in. + // + // ORDERING: Release is required so that in the states where the sender becomes responsible + // for deallocating the channel, they can synchronize with this final state write from us. + // Acquire is required by the branches below to synchronize with writes from the sender. + match channel.state.swap(DISCONNECTED, Ordering::AcqRel) { + // The sender has not sent anything, nor is it dropped. The sender is responsible for + // deallocating the channel. EMPTY => {} // The sender already sent something. We must drop it, and free the channel. MESSAGE => { + // SAFETY: The MESSAGE state plus acquire ordering guarantees the sender has + // written a message and that it has a happens-before relationship with this drop. unsafe { channel.drop_message() }; + + // SAFETY: The acquire ordering above synchronizes with the sender's final write + // of the state, so we can safely deallocate the channel. unsafe { dealloc(self.channel_ptr) }; } // The sender was already dropped. We are responsible for freeing the channel. DISCONNECTED => { + // SAFETY: The acquire ordering above synchronizes with the sender's final write + // of the state, so we can safely deallocate the channel. unsafe { dealloc(self.channel_ptr) }; } // NOTE: the receiver, unless transformed into a future, will never see the @@ -399,7 +417,8 @@ fn recv_awaken(channel: &Channel) -> Poll> { loop { hint::spin_loop(); - // ORDERING: The load above has already synchronized with writing message. + // ORDERING: The MESSAGE branch below uses a dedicated fence to synchronize with the + // sender. Until then we only need to observe the state change. match channel.state.load(Ordering::Relaxed) { AWAKING => {} DISCONNECTED => break Poll::Ready(Err(RecvError::Disconnected)), @@ -407,7 +426,11 @@ fn recv_awaken(channel: &Channel) -> Poll> { // ORDERING: the sender has been dropped, so this update only // needs to be visible to us. channel.state.store(DISCONNECTED, Ordering::Relaxed); - // SAFETY: We observed the MESSAGE state. + + // ORDERING: Synchronize with the sender's write of the message and final state. + fence(Ordering::Acquire); + + // SAFETY: We observed the MESSAGE state and synchronized with the sender. break Poll::Ready(Ok(unsafe { channel.take_message() })); } state => unreachable!("unexpected channel state: {}", state), @@ -425,8 +448,9 @@ impl Future for Recv { // channel to us, so `self.channel` is valid let channel = unsafe { self.channel_ptr.as_ref() }; - // ORDERING: we use acquire ordering to synchronize with the store of the message. - match channel.state.load(Ordering::Acquire) { + // ORDERING: Relaxed is fine since the branches that need synchronization use dedicated + // fences. + match channel.state.load(Ordering::Relaxed) { // The sender is alive but has not sent anything yet. EMPTY => { let waker = cx.waker().clone(); @@ -438,6 +462,11 @@ impl Future for Recv { // ORDERING: the sender has been dropped so this update only needs to be // visible to us. channel.state.store(DISCONNECTED, Ordering::Relaxed); + + // ORDERING: Synchronize with the sender's write of the message and final state. + fence(Ordering::Acquire); + + // SAFETY: we are in the MESSAGE state and have synchronized with the sender. Poll::Ready(Ok(unsafe { channel.take_message() })) } // We were polled again while waiting for the sender. Replace the waker with the new @@ -471,11 +500,15 @@ impl Future for Recv { // We take the message and mark the channel disconnected. // The sender has already taken the waker. Err(MESSAGE) => { - // ORDERING: Synchronize with writing message. This branch is - // unlikely to be taken. - channel.state.swap(DISCONNECTED, Ordering::Acquire); + // ORDERING: the sender has been dropped so this update only needs to be + // visible to us. + channel.state.store(DISCONNECTED, Ordering::Relaxed); - // SAFETY: The state tells us the sender has initialized the message. + // ORDERING: Synchronize with the sender's write of the message. + fence(Ordering::Acquire); + + // SAFETY: The state tells us the sender has initialized the message, and + // the fence above synchronizes with that write. Poll::Ready(Ok(unsafe { channel.take_message() })) } // The sender is currently waking us up. @@ -503,22 +536,43 @@ impl Drop for Recv { // left deallocating the channel allocation to us. let channel = unsafe { self.channel_ptr.as_ref() }; - // Set the channel state to disconnected and read what state the receiver was in. - match channel.state.swap(DISCONNECTED, Ordering::Acquire) { - // The sender has not sent anything, nor is it dropped. + // If this receiver was previously polled, but was not polled to completion, then the + // channel is in the RECEIVING state and has a waker written. We must move away from the + // RECEIVING state before dropping the waker, otherwise we could race with the sender + // taking the same waker and deallocating the channel. + if channel.state.load(Ordering::Relaxed) == RECEIVING + && channel + .state + .compare_exchange(RECEIVING, EMPTY, Ordering::Relaxed, Ordering::Relaxed) + .is_ok() + { + // SAFETY: The RECEIVING state guarantees we have written a waker. + unsafe { channel.drop_waker() }; + } + + // Set the channel state to disconnected and read what state the channel was in. + // + // ORDERING: Release is required so that in the states where the sender becomes responsible + // for deallocating the channel, they can synchronize with this final state write from us. + // Acquire is required by the branches below to synchronize with writes from the sender. + match channel.state.swap(DISCONNECTED, Ordering::AcqRel) { + // The sender has not sent anything, nor is it dropped. The sender is responsible for + // deallocating the channel. EMPTY => {} // The sender already sent something. We must drop it, and free the channel. MESSAGE => { + // SAFETY: The MESSAGE state plus acquire ordering guarantees the sender has + // written a message and that it has a happens-before relationship with this drop. unsafe { channel.drop_message() }; + + // SAFETY: The acquire ordering above synchronizes with the sender's final write + // of the state, so we can safely deallocate the channel. unsafe { dealloc(self.channel_ptr) }; } - // The receiver has been polled. We must drop the waker. - RECEIVING => { - unsafe { channel.drop_waker() }; - } // The sender was already dropped. We are responsible for freeing the channel. DISCONNECTED => { - // SAFETY: see safety comment at top of function. + // SAFETY: The acquire ordering above synchronizes with the sender's final write + // of the state, so we can safely deallocate the channel. unsafe { dealloc(self.channel_ptr) }; } // This receiver was previously polled, so the channel was in the RECEIVING state. @@ -534,13 +588,20 @@ impl Drop for Recv { AWAKING => {} DISCONNECTED => break, MESSAGE => { - // SAFETY: we are in the message state so the message is initialized. + // SAFETY: The AcqRel swap above already synchronized with the sender's + // message write. unsafe { channel.drop_message() }; break; } state => unreachable!("unexpected channel state: {}", state), } } + // ORDERING: Synchronize with the sender's final write of the state. This is a + // separate fence so the busy loop can use relaxed loads. + fence(Ordering::Acquire); + + // SAFETY: The acquire fence above establishes a happens-before relationship with + // the sender's final state write. unsafe { dealloc(self.channel_ptr) }; } state => unreachable!("unexpected channel state: {}", state), @@ -629,29 +690,27 @@ impl Channel { // The sender sent the message while we prepared to await. // We take the message and mark the channel disconnected. Err(MESSAGE) => { - // ORDERING: Synchronize with writing message. This branch is unlikely to be - // taken, so it is likely more efficient to use a fence here - // instead of AcqRel ordering on the compare_exchange - // operation. - fence(Ordering::Acquire); - - // SAFETY: we started in the EMPTY state and the sender switched us to the - // MESSAGE state. This means that it did not take the waker, so we're - // responsible for dropping it. + // SAFETY: We wrote a waker above. The sender cannot have observed the RECEIVING + // state, so it has not accessed the waker. We must drop it. unsafe { self.drop_waker() }; // ORDERING: sender does not exist, so this update only needs to be visible to // us. self.state.store(DISCONNECTED, Ordering::Relaxed); - // SAFETY: The MESSAGE state tells us there is a correctly initialized message. + // ORDERING: Synchronize with writing message. This branch is unlikely to be + // taken, so it is likely more efficient to use a fence here instead of AcqRel + // ordering on the compare_exchange operation. + fence(Ordering::Acquire); + + // SAFETY: The MESSAGE state tells us there is a correctly initialized message, + // and the fence above synchronizes with that write. Poll::Ready(Ok(unsafe { self.take_message() })) } // The sender was dropped before sending anything while we prepared to await. Err(DISCONNECTED) => { - // SAFETY: we started in the EMPTY state and the sender switched us to the - // DISCONNECTED state. This means that it did not take the waker, so we are - // responsible for dropping it. + // SAFETY: We wrote a waker above. The sender cannot have observed the RECEIVING + // state, so it has not accessed the waker. We must drop it. unsafe { self.drop_waker() }; Poll::Ready(Err(RecvError::Disconnected)) } diff --git a/mea/src/oneshot/tests.rs b/mea/src/oneshot/tests.rs index af394c9..4f60835 100644 --- a/mea/src/oneshot/tests.rs +++ b/mea/src/oneshot/tests.rs @@ -14,6 +14,7 @@ use std::future::Future; use std::future::IntoFuture; +use std::hint::spin_loop; use std::mem; use std::pin::Pin; use std::sync::Arc; @@ -324,3 +325,110 @@ fn async_receiver_has_message() { assert!(sender.send(19i128).is_ok()); assert!(receiver.has_message()); } + +#[test] +fn concurrent_send_and_try_recv_to_completion() { + let (sender, receiver) = oneshot::channel::(); + + let receiver_thread = spawn_named("receiver", move || { + loop { + match receiver.try_recv() { + Ok(999) => break, + Ok(value) => panic!("unexpected value: {value}"), + Err(TryRecvError::Empty) => spin_loop(), + Err(TryRecvError::Disconnected) => panic!("unexpected disconnect"), + } + } + }); + + let sender_thread = spawn_named("sender", move || { + sender.send(999).unwrap(); + }); + + receiver_thread.join().unwrap(); + sender_thread.join().unwrap(); +} + +#[test] +fn concurrent_drop_sender_and_try_recv_to_completion() { + let (sender, receiver) = oneshot::channel::(); + + let receiver_thread = spawn_named("receiver", move || { + loop { + match receiver.try_recv() { + Ok(value) => panic!("unexpected value: {value}"), + Err(TryRecvError::Empty) => spin_loop(), + Err(TryRecvError::Disconnected) => break, + } + } + }); + + let sender_thread = spawn_named("sender", move || { + drop(sender); + }); + + receiver_thread.join().unwrap(); + sender_thread.join().unwrap(); +} + +#[test] +fn concurrent_send_and_poll_to_completion() { + let (sender, receiver) = oneshot::channel::(); + + let receiver_thread = spawn_named("receiver", move || { + let mut receiver = receiver.into_future(); + let (waker, _waker_handle) = waker(); + let mut context = Context::from_waker(&waker); + + loop { + match Pin::new(&mut receiver).poll(&mut context) { + Poll::Ready(Ok(999)) => break, + Poll::Ready(result) => panic!("unexpected result: {result:?}"), + Poll::Pending => spin_loop(), + } + } + }); + + let sender_thread = spawn_named("sender", move || { + sender.send(999).unwrap(); + }); + + receiver_thread.join().unwrap(); + sender_thread.join().unwrap(); +} + +#[test] +fn concurrent_drop_sender_and_poll_to_completion() { + let (sender, receiver) = oneshot::channel::(); + + let receiver_thread = spawn_named("receiver", move || { + let mut receiver = receiver.into_future(); + let (waker, _waker_handle) = waker(); + let mut context = Context::from_waker(&waker); + + loop { + match Pin::new(&mut receiver).poll(&mut context) { + Poll::Ready(Err(oneshot::RecvError::Disconnected)) => break, + Poll::Ready(result) => panic!("unexpected result: {result:?}"), + Poll::Pending => spin_loop(), + } + } + }); + + let sender_thread = spawn_named("sender", move || { + drop(sender); + }); + + receiver_thread.join().unwrap(); + sender_thread.join().unwrap(); +} + +fn spawn_named(name: &str, f: F) -> std::thread::JoinHandle<()> +where + F: FnOnce() + Send + 'static, +{ + std::thread::Builder::new() + .name(name.to_string()) + .spawn(f) + .unwrap() +} From fdf1981098e514afab0c1ca65f893b485b719b06 Mon Sep 17 00:00:00 2001 From: orthur2 Date: Fri, 22 May 2026 07:13:19 +0200 Subject: [PATCH 2/5] fix(oneshot): avoid pending receiver races Use acquire ordering before dropping stored wakers in Recv::poll and Recv::drop. When Recv::drop observes AWAKING, wait for the sender to finish the transition instead of writing DISCONNECTED first. Also bound the spin-loop tests and add coverage for pending receiver drop and cross-thread waker replacement. --- mea/src/oneshot/mod.rs | 134 ++++++++++++++++++--------------------- mea/src/oneshot/tests.rs | 120 ++++++++++++++++++++++++++++------- 2 files changed, 158 insertions(+), 96 deletions(-) diff --git a/mea/src/oneshot/mod.rs b/mea/src/oneshot/mod.rs index 00cb250..599c328 100644 --- a/mea/src/oneshot/mod.rs +++ b/mea/src/oneshot/mod.rs @@ -472,13 +472,12 @@ impl Future for Recv { // We were polled again while waiting for the sender. Replace the waker with the new // one. RECEIVING => { - // ORDERING: We use relaxed ordering on both success and failure since we have not - // written anything above that must be released, and the individual match arms - // handle any additional synchronization. + // ORDERING: Success synchronizes with the previous write_waker call before we + // drop the stored waker. Failure does not access the stored waker. match channel.state.compare_exchange( RECEIVING, EMPTY, - Ordering::Relaxed, + Ordering::Acquire, Ordering::Relaxed, ) { // We successfully changed the state back to EMPTY. @@ -488,8 +487,9 @@ impl Future for Recv { Ok(_) => { let waker = cx.waker().clone(); - // SAFETY: We wrote the waker in a previous call to poll. We do not need - // a memory barrier since the previous write here was by ourselves. + // SAFETY: The successful exchange makes the state EMPTY, so the sender + // cannot take the stored waker. The acquire ordering synchronizes with the + // waker write. unsafe { channel.drop_waker() }; // SAFETY: We can not be in the forbidden states, and no waker in the @@ -536,75 +536,63 @@ impl Drop for Recv { // left deallocating the channel allocation to us. let channel = unsafe { self.channel_ptr.as_ref() }; - // If this receiver was previously polled, but was not polled to completion, then the - // channel is in the RECEIVING state and has a waker written. We must move away from the - // RECEIVING state before dropping the waker, otherwise we could race with the sender - // taking the same waker and deallocating the channel. - if channel.state.load(Ordering::Relaxed) == RECEIVING - && channel - .state - .compare_exchange(RECEIVING, EMPTY, Ordering::Relaxed, Ordering::Relaxed) - .is_ok() - { - // SAFETY: The RECEIVING state guarantees we have written a waker. - unsafe { channel.drop_waker() }; - } - - // Set the channel state to disconnected and read what state the channel was in. - // - // ORDERING: Release is required so that in the states where the sender becomes responsible - // for deallocating the channel, they can synchronize with this final state write from us. - // Acquire is required by the branches below to synchronize with writes from the sender. - match channel.state.swap(DISCONNECTED, Ordering::AcqRel) { - // The sender has not sent anything, nor is it dropped. The sender is responsible for - // deallocating the channel. - EMPTY => {} - // The sender already sent something. We must drop it, and free the channel. - MESSAGE => { - // SAFETY: The MESSAGE state plus acquire ordering guarantees the sender has - // written a message and that it has a happens-before relationship with this drop. - unsafe { channel.drop_message() }; - - // SAFETY: The acquire ordering above synchronizes with the sender's final write - // of the state, so we can safely deallocate the channel. - unsafe { dealloc(self.channel_ptr) }; - } - // The sender was already dropped. We are responsible for freeing the channel. - DISCONNECTED => { - // SAFETY: The acquire ordering above synchronizes with the sender's final write - // of the state, so we can safely deallocate the channel. - unsafe { dealloc(self.channel_ptr) }; - } - // This receiver was previously polled, so the channel was in the RECEIVING state. - // But the sender has observed the RECEIVING state and is currently reading the waker - // to wake us up. We need to loop here until we observe the MESSAGE or DISCONNECTED - // state. We busy loop here since we know the sender is done very soon. - AWAKING => { - loop { - hint::spin_loop(); - - // ORDERING: The swap above has already synchronized with writing message. - match channel.state.load(Ordering::Relaxed) { - AWAKING => {} - DISCONNECTED => break, - MESSAGE => { - // SAFETY: The AcqRel swap above already synchronized with the sender's - // message write. - unsafe { channel.drop_message() }; - break; - } - state => unreachable!("unexpected channel state: {}", state), + loop { + // ORDERING: MESSAGE and DISCONNECTED synchronize with the sender's state writes. + match channel.state.load(Ordering::Acquire) { + // The sender has not sent anything, nor is it dropped. Mark the receiver as + // dropped; the sender is responsible for deallocating the channel. + EMPTY => { + if channel + .state + .compare_exchange(EMPTY, DISCONNECTED, Ordering::Release, Ordering::Relaxed) + .is_ok() + { + break; } } - // ORDERING: Synchronize with the sender's final write of the state. This is a - // separate fence so the busy loop can use relaxed loads. - fence(Ordering::Acquire); - - // SAFETY: The acquire fence above establishes a happens-before relationship with - // the sender's final state write. - unsafe { dealloc(self.channel_ptr) }; + // The sender already sent something. We must drop it, and free the channel. + MESSAGE => { + // SAFETY: The MESSAGE state plus acquire ordering guarantees the sender has + // written a message and that it has a happens-before relationship with this + // drop. + unsafe { channel.drop_message() }; + + // SAFETY: The acquire load above synchronizes with the sender's final write of + // the state, so we can safely deallocate the channel. + unsafe { dealloc(self.channel_ptr) }; + break; + } + // This receiver was previously polled, but was not polled to completion. Move away + // from RECEIVING before dropping the waker so the sender cannot take the same + // waker. + RECEIVING => { + if channel + .state + .compare_exchange(RECEIVING, EMPTY, Ordering::Acquire, Ordering::Relaxed) + .is_ok() + { + // SAFETY: The successful exchange makes the state EMPTY, so the sender + // cannot take the stored waker. The acquire ordering synchronizes with the + // waker write. + unsafe { channel.drop_waker() }; + } + } + // The sender has observed RECEIVING and is taking the waker. Wait until it stores + // MESSAGE or DISCONNECTED. + AWAKING => { + hint::spin_loop(); + } + // The sender was already dropped, or this future was previously polled to + // completion. We are responsible for freeing the channel. + DISCONNECTED => { + // SAFETY: When DISCONNECTED comes from the sender, the acquire load + // synchronizes with the sender's state write. When it comes from our own + // completed poll, the message has already been taken. + unsafe { dealloc(self.channel_ptr) }; + break; + } + state => unreachable!("unexpected channel state: {}", state), } - state => unreachable!("unexpected channel state: {}", state), } } } @@ -666,7 +654,7 @@ impl Channel { /// # Safety /// /// * The `waker` field must not have a waker stored when calling this method. - /// * The `state` must not be in the RECEIVING state when calling this method. + /// * The `state` must not be in the RECEIVING or AWAKING state when calling this method. unsafe fn write_waker(&self, waker: Waker) -> Poll> { // Write the waker instance to the channel. // diff --git a/mea/src/oneshot/tests.rs b/mea/src/oneshot/tests.rs index 4f60835..bb4c980 100644 --- a/mea/src/oneshot/tests.rs +++ b/mea/src/oneshot/tests.rs @@ -27,6 +27,7 @@ use std::task::RawWaker; use std::task::RawWakerVTable; use std::task::Waker; use std::time::Duration; +use std::time::Instant; use crate::oneshot; use crate::oneshot::TryRecvError; @@ -286,6 +287,59 @@ fn poll_with_different_wakers() { assert_eq!(waker_handle2.wake_count(), 1); } +#[test] +fn poll_with_different_wakers_across_threads() { + let (sender, receiver) = oneshot::channel::(); + let mut receiver = receiver.into_future(); + + let (waker1, waker_handle1) = waker(); + let mut context1 = Context::from_waker(&waker1); + + assert_eq!(Pin::new(&mut receiver).poll(&mut context1), Poll::Pending); + assert_eq!(waker_handle1.clone_count(), 1); + assert_eq!(waker_handle1.drop_count(), 0); + assert_eq!(waker_handle1.wake_count(), 0); + + let receiver_thread = spawn_named("receiver", move || { + let (waker2, waker_handle2) = waker(); + let mut context2 = Context::from_waker(&waker2); + + assert_eq!(Pin::new(&mut receiver).poll(&mut context2), Poll::Pending); + assert_eq!(waker_handle2.clone_count(), 1); + assert_eq!(waker_handle2.drop_count(), 0); + assert_eq!(waker_handle2.wake_count(), 0); + + drop(receiver); + assert_eq!(waker_handle2.drop_count(), 1); + }); + + receiver_thread.join().unwrap(); + assert_eq!(waker_handle1.drop_count(), 1); + assert!(sender.is_closed()); +} + +#[test] +fn drop_pending_receiver_closes_channel_and_drops_waker() { + let (sender, receiver) = oneshot::channel::(); + let mut receiver = receiver.into_future(); + + let (waker, waker_handle) = waker(); + let mut context = Context::from_waker(&waker); + + assert_eq!(Pin::new(&mut receiver).poll(&mut context), Poll::Pending); + assert_eq!(waker_handle.clone_count(), 1); + assert_eq!(waker_handle.drop_count(), 0); + assert_eq!(waker_handle.wake_count(), 0); + + drop(receiver); + assert_eq!(waker_handle.drop_count(), 1); + assert_eq!(waker_handle.wake_count(), 0); + assert!(sender.is_closed()); + + let error = sender.send(1234).unwrap_err(); + assert_eq!(*error.as_inner(), 1234); +} + #[test] fn poll_then_drop_receiver_during_send() { let (sender, receiver) = oneshot::channel::(); @@ -331,14 +385,12 @@ fn concurrent_send_and_try_recv_to_completion() { let (sender, receiver) = oneshot::channel::(); let receiver_thread = spawn_named("receiver", move || { - loop { - match receiver.try_recv() { - Ok(999) => break, - Ok(value) => panic!("unexpected value: {value}"), - Err(TryRecvError::Empty) => spin_loop(), - Err(TryRecvError::Disconnected) => panic!("unexpected disconnect"), - } - } + spin_until("message from sender", || match receiver.try_recv() { + Ok(999) => Some(()), + Ok(value) => panic!("unexpected value: {value}"), + Err(TryRecvError::Empty) => None, + Err(TryRecvError::Disconnected) => panic!("unexpected disconnect"), + }); }); let sender_thread = spawn_named("sender", move || { @@ -354,13 +406,11 @@ fn concurrent_drop_sender_and_try_recv_to_completion() { let (sender, receiver) = oneshot::channel::(); let receiver_thread = spawn_named("receiver", move || { - loop { - match receiver.try_recv() { - Ok(value) => panic!("unexpected value: {value}"), - Err(TryRecvError::Empty) => spin_loop(), - Err(TryRecvError::Disconnected) => break, - } - } + spin_until("sender disconnect", || match receiver.try_recv() { + Ok(value) => panic!("unexpected value: {value}"), + Err(TryRecvError::Empty) => None, + Err(TryRecvError::Disconnected) => Some(()), + }); }); let sender_thread = spawn_named("sender", move || { @@ -380,13 +430,13 @@ fn concurrent_send_and_poll_to_completion() { let (waker, _waker_handle) = waker(); let mut context = Context::from_waker(&waker); - loop { + spin_until("poll ready with message", || { match Pin::new(&mut receiver).poll(&mut context) { - Poll::Ready(Ok(999)) => break, + Poll::Ready(Ok(999)) => Some(()), Poll::Ready(result) => panic!("unexpected result: {result:?}"), - Poll::Pending => spin_loop(), + Poll::Pending => None, } - } + }); }); let sender_thread = spawn_named("sender", move || { @@ -406,13 +456,13 @@ fn concurrent_drop_sender_and_poll_to_completion() { let (waker, _waker_handle) = waker(); let mut context = Context::from_waker(&waker); - loop { + spin_until("poll ready with disconnect", || { match Pin::new(&mut receiver).poll(&mut context) { - Poll::Ready(Err(oneshot::RecvError::Disconnected)) => break, + Poll::Ready(Err(oneshot::RecvError::Disconnected)) => Some(()), Poll::Ready(result) => panic!("unexpected result: {result:?}"), - Poll::Pending => spin_loop(), + Poll::Pending => None, } - } + }); }); let sender_thread = spawn_named("sender", move || { @@ -432,3 +482,27 @@ where .spawn(f) .unwrap() } + +fn spin_until(label: &str, mut f: F) +where + F: FnMut() -> Option<()>, +{ + let deadline = Instant::now() + Duration::from_secs(5); + let mut spins = 0usize; + + loop { + if f().is_some() { + break; + } + + assert!(Instant::now() < deadline, "timed out waiting for {label}"); + + if spins % 64 == 0 { + std::thread::yield_now(); + } else { + spin_loop(); + } + + spins += 1; + } +} From 930a6a603b07eeba78c1966ffe967e856c94b4aa Mon Sep 17 00:00:00 2001 From: tison Date: Sat, 23 May 2026 11:16:52 +0800 Subject: [PATCH 3/5] fixup style and comments Signed-off-by: tison --- mea/src/mpsc/bounded.rs | 8 ++++---- mea/src/mpsc/error.rs | 38 +++++++++++++++++++------------------- mea/src/mpsc/unbounded.rs | 9 ++++----- mea/src/oneshot/mod.rs | 17 +++++++++-------- 4 files changed, 36 insertions(+), 36 deletions(-) diff --git a/mea/src/mpsc/bounded.rs b/mea/src/mpsc/bounded.rs index b6ffcb3..1e89588 100644 --- a/mea/src/mpsc/bounded.rs +++ b/mea/src/mpsc/bounded.rs @@ -85,8 +85,8 @@ impl Clone for BoundedSender { } impl fmt::Debug for BoundedSender { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("BoundedSender").finish_non_exhaustive() + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("BoundedSender").finish_non_exhaustive() } } @@ -227,8 +227,8 @@ pub struct BoundedReceiver { unsafe impl Sync for BoundedReceiver {} impl fmt::Debug for BoundedReceiver { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("BoundedReceiver").finish_non_exhaustive() + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("BoundedReceiver").finish_non_exhaustive() } } diff --git a/mea/src/mpsc/error.rs b/mea/src/mpsc/error.rs index f3fb9c9..ee104b8 100644 --- a/mea/src/mpsc/error.rs +++ b/mea/src/mpsc/error.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::any::type_name; use std::fmt; /// An error returned when trying to send on a closed channel. @@ -49,13 +50,13 @@ impl SendError { impl fmt::Display for SendError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - "sending on a closed channel".fmt(f) + f.write_str("sending on a closed channel") } } impl fmt::Debug for SendError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "SendError<{}>(..)", stringify!(T)) + write!(f, "SendError<{}>(..)", type_name::()) } } @@ -88,21 +89,20 @@ impl TrySendError { } impl fmt::Display for TrySendError { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - TrySendError::Full(_) => "sending on a full channel".fmt(fmt), - TrySendError::Disconnected(_) => "sending on a closed channel".fmt(fmt), - } + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(match self { + TrySendError::Full(_) => "sending on a full channel", + TrySendError::Disconnected(_) => "sending on a closed channel", + }) } } impl fmt::Debug for TrySendError { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let ty = type_name::(); match self { - TrySendError::Full(_) => write!(fmt, "TrySendError<{}>::Full(..)", stringify!(T)), - TrySendError::Disconnected(_) => { - write!(fmt, "TrySendError<{}>::Disconnected(..)", stringify!(T)) - } + TrySendError::Full(_) => write!(f, "TrySendError<{ty}>::Full(..)"), + TrySendError::Disconnected(_) => write!(f, "TrySendError<{ty}>::Disconnected(..)"), } } } @@ -117,8 +117,8 @@ pub enum RecvError { } impl fmt::Display for RecvError { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - "receiving on a closed channel".fmt(fmt) + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("receiving on a closed channel") } } @@ -135,11 +135,11 @@ pub enum TryRecvError { } impl fmt::Display for TryRecvError { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - match *self { - TryRecvError::Empty => "receiving on an empty channel".fmt(fmt), - TryRecvError::Disconnected => "receiving on a closed channel".fmt(fmt), - } + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(match self { + TryRecvError::Empty => "receiving on an empty channel", + TryRecvError::Disconnected => "receiving on a closed channel", + }) } } diff --git a/mea/src/mpsc/unbounded.rs b/mea/src/mpsc/unbounded.rs index 9f7d944..df23b87 100644 --- a/mea/src/mpsc/unbounded.rs +++ b/mea/src/mpsc/unbounded.rs @@ -79,8 +79,8 @@ impl Clone for UnboundedSender { } impl fmt::Debug for UnboundedSender { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("UnboundedSender").finish_non_exhaustive() + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("UnboundedSender").finish_non_exhaustive() } } @@ -139,9 +139,8 @@ pub struct UnboundedReceiver { unsafe impl Sync for UnboundedReceiver {} impl fmt::Debug for UnboundedReceiver { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("UnboundedReceiver") - .finish_non_exhaustive() + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("UnboundedReceiver").finish_non_exhaustive() } } diff --git a/mea/src/oneshot/mod.rs b/mea/src/oneshot/mod.rs index 599c328..d5b1001 100644 --- a/mea/src/oneshot/mod.rs +++ b/mea/src/oneshot/mod.rs @@ -72,6 +72,7 @@ //! # } //! ``` +use std::any::type_name; use std::cell::UnsafeCell; use std::fmt; use std::future::Future; @@ -418,7 +419,7 @@ fn recv_awaken(channel: &Channel) -> Poll> { hint::spin_loop(); // ORDERING: The MESSAGE branch below uses a dedicated fence to synchronize with the - // sender. Until then we only need to observe the state change. + // sender. Until then, we only need to observe the state change. match channel.state.load(Ordering::Relaxed) { AWAKING => {} DISCONNECTED => break Poll::Ready(Err(RecvError::Disconnected)), @@ -784,13 +785,13 @@ impl Drop for SendError { impl fmt::Display for SendError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - "sending on a closed channel".fmt(f) + f.write_str("sending on a closed channel") } } impl fmt::Debug for SendError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "SendError<{}>(..)", core::any::type_name::()) + write!(f, "SendError<{}>(..)", type_name::()) } } @@ -808,10 +809,10 @@ pub enum TryRecvError { impl fmt::Display for TryRecvError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - TryRecvError::Empty => write!(f, "receiving on an empty channel"), - TryRecvError::Disconnected => write!(f, "receiving on a closed channel"), - } + f.write_str(match self { + TryRecvError::Empty => "receiving on an empty channel", + TryRecvError::Disconnected => "receiving on a closed channel", + }) } } @@ -830,7 +831,7 @@ pub enum RecvError { impl fmt::Display for RecvError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "receiving on a closed channel") + f.write_str("receiving on a closed channel") } } From ed1749757f75ca319f04d6b4010b783653cf367c Mon Sep 17 00:00:00 2001 From: orthur2 Date: Sat, 23 May 2026 10:32:37 +0200 Subject: [PATCH 4/5] docs(oneshot): clarify receiver drop ownership --- mea/src/oneshot/mod.rs | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/mea/src/oneshot/mod.rs b/mea/src/oneshot/mod.rs index d5b1001..b0b98e1 100644 --- a/mea/src/oneshot/mod.rs +++ b/mea/src/oneshot/mod.rs @@ -424,8 +424,8 @@ fn recv_awaken(channel: &Channel) -> Poll> { AWAKING => {} DISCONNECTED => break Poll::Ready(Err(RecvError::Disconnected)), MESSAGE => { - // ORDERING: the sender has been dropped, so this update only - // needs to be visible to us. + // ORDERING: after publishing MESSAGE, the sender no longer uses the channel, so + // this state update only needs to be visible to this receiver. channel.state.store(DISCONNECTED, Ordering::Relaxed); // ORDERING: Synchronize with the sender's write of the message and final state. @@ -460,8 +460,8 @@ impl Future for Recv { } // The sender sent the message. MESSAGE => { - // ORDERING: the sender has been dropped so this update only needs to be - // visible to us. + // ORDERING: after publishing MESSAGE, the sender no longer uses the channel, so + // this state update only needs to be visible to this receiver. channel.state.store(DISCONNECTED, Ordering::Relaxed); // ORDERING: Synchronize with the sender's write of the message and final state. @@ -481,10 +481,7 @@ impl Future for Recv { Ordering::Acquire, Ordering::Relaxed, ) { - // We successfully changed the state back to EMPTY. - // - // This is the most likely branch to be taken, which is why we do not use any - // memory barriers in the compare_exchange above. + // The state is EMPTY again. Ok(_) => { let waker = cx.waker().clone(); @@ -501,8 +498,8 @@ impl Future for Recv { // We take the message and mark the channel disconnected. // The sender has already taken the waker. Err(MESSAGE) => { - // ORDERING: the sender has been dropped so this update only needs to be - // visible to us. + // ORDERING: after publishing MESSAGE, the sender no longer uses the + // channel, so this state update only needs to be visible to this receiver. channel.state.store(DISCONNECTED, Ordering::Relaxed); // ORDERING: Synchronize with the sender's write of the message. @@ -566,6 +563,13 @@ impl Drop for Recv { // This receiver was previously polled, but was not polled to completion. Move away // from RECEIVING before dropping the waker so the sender cannot take the same // waker. + // + // A successful exchange creates a short EMPTY window before the next iteration can + // mark DISCONNECTED. This branch owns and drops the stored waker first. A sender + // that observes EMPTY does not touch the waker. It either stores MESSAGE and + // leaves the message and allocation to this loop, or stores DISCONNECTED and + // leaves the allocation to this loop. If this loop marks DISCONNECTED first, the + // sender observes DISCONNECTED and owns any send error cleanup. RECEIVING => { if channel .state From adc70f9c30ad3d7a6991bca8250b9f8ca5cc2cab Mon Sep 17 00:00:00 2001 From: tison Date: Sun, 24 May 2026 15:03:52 +0800 Subject: [PATCH 5/5] fixup style and comments Signed-off-by: tison --- mea/src/oneshot/mod.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/mea/src/oneshot/mod.rs b/mea/src/oneshot/mod.rs index b0b98e1..5e43107 100644 --- a/mea/src/oneshot/mod.rs +++ b/mea/src/oneshot/mod.rs @@ -126,11 +126,11 @@ fn sender_wake_up_receiver(channel: &Channel, state: u8) { // relationship with the writing of the waker. let waker = unsafe { channel.take_waker() }; - // ORDERING: this ordering serves two-fold: it synchronizes with the acquire load - // in the receiving thread, ensuring that both our read of the waker and write of - // the message happen-before the taking of the message and freeing of the channel. - // Furthermore, we need acquire ordering to ensure awaking the receiver - // happens after the channel state is updated. + // ORDERING: this ordering serves two-fold: it synchronizes with the receiver's + // acquire fence after it observes this state, ensuring that both our read of the + // waker and write of the message happen-before the taking of the message and + // freeing of the channel. Furthermore, we need acquire ordering to ensure awaking + // the receiver happens after the channel state is updated. channel.state.swap(state, Ordering::AcqRel); // Note: it is possible that between the store above and this statement that