tokio/sync/
oneshot.rs

1#![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))]
2
3//! A one-shot channel is used for sending a single message between
4//! asynchronous tasks. The [`channel`] function is used to create a
5//! [`Sender`] and [`Receiver`] handle pair that form the channel.
6//!
7//! The `Sender` handle is used by the producer to send the value.
8//! The `Receiver` handle is used by the consumer to receive the value.
9//!
10//! Each handle can be used on separate tasks.
11//!
12//! Since the `send` method is not async, it can be used anywhere. This includes
13//! sending between two runtimes, and using it from non-async code.
14//!
15//! If the [`Receiver`] is closed before receiving a message which has already
16//! been sent, the message will remain in the channel until the receiver is
17//! dropped, at which point the message will be dropped immediately.
18//!
19//! # Examples
20//!
21//! ```
22//! use tokio::sync::oneshot;
23//!
24//! # #[tokio::main(flavor = "current_thread")]
25//! # async fn main() {
26//! let (tx, rx) = oneshot::channel();
27//!
28//! tokio::spawn(async move {
29//!     if let Err(_) = tx.send(3) {
30//!         println!("the receiver dropped");
31//!     }
32//! });
33//!
34//! match rx.await {
35//!     Ok(v) => println!("got = {:?}", v),
36//!     Err(_) => println!("the sender dropped"),
37//! }
38//! # }
39//! ```
40//!
41//! If the sender is dropped without sending, the receiver will fail with
42//! [`error::RecvError`]:
43//!
44//! ```
45//! use tokio::sync::oneshot;
46//!
47//! # #[tokio::main(flavor = "current_thread")]
48//! # async fn main() {
49//! let (tx, rx) = oneshot::channel::<u32>();
50//!
51//! tokio::spawn(async move {
52//!     drop(tx);
53//! });
54//!
55//! match rx.await {
56//!     Ok(_) => panic!("This doesn't happen"),
57//!     Err(_) => println!("the sender dropped"),
58//! }
59//! # }
60//! ```
61//!
62//! To use a `oneshot` channel in a `tokio::select!` loop, add `&mut` in front of
63//! the channel.
64//!
65//! ```
66//! use tokio::sync::oneshot;
67//! use tokio::time::{interval, sleep, Duration};
68//!
69//! # #[tokio::main(flavor = "current_thread")]
70//! # async fn _doc() {}
71//! # #[tokio::main(flavor = "current_thread", start_paused = true)]
72//! # async fn main() {
73//! let (send, mut recv) = oneshot::channel();
74//! let mut interval = interval(Duration::from_millis(100));
75//!
76//! # let handle =
77//! tokio::spawn(async move {
78//!     sleep(Duration::from_secs(1)).await;
79//!     send.send("shut down").unwrap();
80//! });
81//!
82//! loop {
83//!     tokio::select! {
84//!         _ = interval.tick() => println!("Another 100ms"),
85//!         msg = &mut recv => {
86//!             println!("Got message: {}", msg.unwrap());
87//!             break;
88//!         }
89//!     }
90//! }
91//! # handle.await.unwrap();
92//! # }
93//! ```
94//!
95//! To use a `Sender` from a destructor, put it in an [`Option`] and call
96//! [`Option::take`].
97//!
98//! ```
99//! use tokio::sync::oneshot;
100//!
101//! struct SendOnDrop {
102//!     sender: Option<oneshot::Sender<&'static str>>,
103//! }
104//! impl Drop for SendOnDrop {
105//!     fn drop(&mut self) {
106//!         if let Some(sender) = self.sender.take() {
107//!             // Using `let _ =` to ignore send errors.
108//!             let _ = sender.send("I got dropped!");
109//!         }
110//!     }
111//! }
112//!
113//! # #[tokio::main(flavor = "current_thread")]
114//! # async fn _doc() {}
115//! # #[tokio::main(flavor = "current_thread")]
116//! # async fn main() {
117//! let (send, recv) = oneshot::channel();
118//!
119//! let send_on_drop = SendOnDrop { sender: Some(send) };
120//! drop(send_on_drop);
121//!
122//! assert_eq!(recv.await, Ok("I got dropped!"));
123//! # }
124//! ```
125
126use crate::loom::cell::UnsafeCell;
127use crate::loom::sync::atomic::AtomicUsize;
128use crate::loom::sync::Arc;
129#[cfg(all(tokio_unstable, feature = "tracing"))]
130use crate::util::trace;
131
132use std::fmt;
133use std::future::Future;
134use std::mem::MaybeUninit;
135use std::pin::Pin;
136use std::sync::atomic::Ordering::{self, AcqRel, Acquire};
137use std::task::Poll::{Pending, Ready};
138use std::task::{ready, Context, Poll, Waker};
139
140/// Sends a value to the associated [`Receiver`].
141///
142/// A pair of both a [`Sender`] and a [`Receiver`]  are created by the
143/// [`channel`](fn@channel) function.
144///
145/// # Examples
146///
147/// ```
148/// use tokio::sync::oneshot;
149///
150/// # #[tokio::main(flavor = "current_thread")]
151/// # async fn main() {
152/// let (tx, rx) = oneshot::channel();
153///
154/// tokio::spawn(async move {
155///     if let Err(_) = tx.send(3) {
156///         println!("the receiver dropped");
157///     }
158/// });
159///
160/// match rx.await {
161///     Ok(v) => println!("got = {:?}", v),
162///     Err(_) => println!("the sender dropped"),
163/// }
164/// # }
165/// ```
166///
167/// If the sender is dropped without sending, the receiver will fail with
168/// [`error::RecvError`]:
169///
170/// ```
171/// use tokio::sync::oneshot;
172///
173/// # #[tokio::main(flavor = "current_thread")]
174/// # async fn main() {
175/// let (tx, rx) = oneshot::channel::<u32>();
176///
177/// tokio::spawn(async move {
178///     drop(tx);
179/// });
180///
181/// match rx.await {
182///     Ok(_) => panic!("This doesn't happen"),
183///     Err(_) => println!("the sender dropped"),
184/// }
185/// # }
186/// ```
187///
188/// To use a `Sender` from a destructor, put it in an [`Option`] and call
189/// [`Option::take`].
190///
191/// ```
192/// use tokio::sync::oneshot;
193///
194/// struct SendOnDrop {
195///     sender: Option<oneshot::Sender<&'static str>>,
196/// }
197/// impl Drop for SendOnDrop {
198///     fn drop(&mut self) {
199///         if let Some(sender) = self.sender.take() {
200///             // Using `let _ =` to ignore send errors.
201///             let _ = sender.send("I got dropped!");
202///         }
203///     }
204/// }
205///
206/// # #[tokio::main(flavor = "current_thread")]
207/// # async fn _doc() {}
208/// # #[tokio::main(flavor = "current_thread")]
209/// # async fn main() {
210/// let (send, recv) = oneshot::channel();
211///
212/// let send_on_drop = SendOnDrop { sender: Some(send) };
213/// drop(send_on_drop);
214///
215/// assert_eq!(recv.await, Ok("I got dropped!"));
216/// # }
217/// ```
218///
219/// [`Option`]: std::option::Option
220/// [`Option::take`]: std::option::Option::take
221#[derive(Debug)]
222pub struct Sender<T> {
223    inner: Option<Arc<Inner<T>>>,
224    #[cfg(all(tokio_unstable, feature = "tracing"))]
225    resource_span: tracing::Span,
226}
227
228/// Receives a value from the associated [`Sender`].
229///
230/// A pair of both a [`Sender`] and a [`Receiver`]  are created by the
231/// [`channel`](fn@channel) function.
232///
233/// This channel has no `recv` method because the receiver itself implements the
234/// [`Future`] trait. To receive a `Result<T, `[`error::RecvError`]`>`, `.await` the `Receiver` object directly.
235///
236/// The `poll` method on the `Future` trait is allowed to spuriously return
237/// `Poll::Pending` even if the message has been sent. If such a spurious
238/// failure happens, then the caller will be woken when the spurious failure has
239/// been resolved so that the caller can attempt to receive the message again.
240/// Note that receiving such a wakeup does not guarantee that the next call will
241/// succeed — it could fail with another spurious failure. (A spurious failure
242/// does not mean that the message is lost. It is just delayed.)
243///
244/// [`Future`]: trait@std::future::Future
245///
246/// # Examples
247///
248/// ```
249/// use tokio::sync::oneshot;
250///
251/// # #[tokio::main(flavor = "current_thread")]
252/// # async fn main() {
253/// let (tx, rx) = oneshot::channel();
254///
255/// tokio::spawn(async move {
256///     if let Err(_) = tx.send(3) {
257///         println!("the receiver dropped");
258///     }
259/// });
260///
261/// match rx.await {
262///     Ok(v) => println!("got = {:?}", v),
263///     Err(_) => println!("the sender dropped"),
264/// }
265/// # }
266/// ```
267///
268/// If the sender is dropped without sending, the receiver will fail with
269/// [`error::RecvError`]:
270///
271/// ```
272/// use tokio::sync::oneshot;
273///
274/// # #[tokio::main(flavor = "current_thread")]
275/// # async fn main() {
276/// let (tx, rx) = oneshot::channel::<u32>();
277///
278/// tokio::spawn(async move {
279///     drop(tx);
280/// });
281///
282/// match rx.await {
283///     Ok(_) => panic!("This doesn't happen"),
284///     Err(_) => println!("the sender dropped"),
285/// }
286/// # }
287/// ```
288///
289/// To use a `Receiver` in a `tokio::select!` loop, add `&mut` in front of the
290/// channel.
291///
292/// ```
293/// use tokio::sync::oneshot;
294/// use tokio::time::{interval, sleep, Duration};
295///
296/// # #[tokio::main(flavor = "current_thread")]
297/// # async fn _doc() {}
298/// # #[tokio::main(flavor = "current_thread", start_paused = true)]
299/// # async fn main() {
300/// let (send, mut recv) = oneshot::channel();
301/// let mut interval = interval(Duration::from_millis(100));
302///
303/// # let handle =
304/// tokio::spawn(async move {
305///     sleep(Duration::from_secs(1)).await;
306///     send.send("shut down").unwrap();
307/// });
308///
309/// loop {
310///     tokio::select! {
311///         _ = interval.tick() => println!("Another 100ms"),
312///         msg = &mut recv => {
313///             println!("Got message: {}", msg.unwrap());
314///             break;
315///         }
316///     }
317/// }
318/// # handle.await.unwrap();
319/// # }
320/// ```
321#[derive(Debug)]
322pub struct Receiver<T> {
323    inner: Option<Arc<Inner<T>>>,
324    #[cfg(all(tokio_unstable, feature = "tracing"))]
325    resource_span: tracing::Span,
326    #[cfg(all(tokio_unstable, feature = "tracing"))]
327    async_op_span: tracing::Span,
328    #[cfg(all(tokio_unstable, feature = "tracing"))]
329    async_op_poll_span: tracing::Span,
330}
331
332pub mod error {
333    //! `Oneshot` error types.
334
335    use std::fmt;
336
337    /// Error returned by the `Future` implementation for `Receiver`.
338    ///
339    /// This error is returned by the receiver when the sender is dropped without sending.
340    #[derive(Debug, Eq, PartialEq, Clone)]
341    pub struct RecvError(pub(super) ());
342
343    /// Error returned by the `try_recv` function on `Receiver`.
344    #[derive(Debug, Eq, PartialEq, Clone)]
345    pub enum TryRecvError {
346        /// The send half of the channel has not yet sent a value.
347        Empty,
348
349        /// The send half of the channel was dropped without sending a value.
350        Closed,
351    }
352
353    // ===== impl RecvError =====
354
355    impl fmt::Display for RecvError {
356        fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
357            write!(fmt, "channel closed")
358        }
359    }
360
361    impl std::error::Error for RecvError {}
362
363    // ===== impl TryRecvError =====
364
365    impl fmt::Display for TryRecvError {
366        fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
367            match self {
368                TryRecvError::Empty => write!(fmt, "channel empty"),
369                TryRecvError::Closed => write!(fmt, "channel closed"),
370            }
371        }
372    }
373
374    impl std::error::Error for TryRecvError {}
375}
376
377use self::error::*;
378
379struct Inner<T> {
380    /// Manages the state of the inner cell.
381    state: AtomicUsize,
382
383    /// The value. This is set by `Sender` and read by `Receiver`. The state of
384    /// the cell is tracked by `state`.
385    value: UnsafeCell<Option<T>>,
386
387    /// The task to notify when the receiver drops without consuming the value.
388    ///
389    /// ## Safety
390    ///
391    /// The `TX_TASK_SET` bit in the `state` field is set if this field is
392    /// initialized. If that bit is unset, this field may be uninitialized.
393    tx_task: Task,
394
395    /// The task to notify when the value is sent.
396    ///
397    /// ## Safety
398    ///
399    /// The `RX_TASK_SET` bit in the `state` field is set if this field is
400    /// initialized. If that bit is unset, this field may be uninitialized.
401    rx_task: Task,
402}
403
404struct Task(UnsafeCell<MaybeUninit<Waker>>);
405
406impl Task {
407    /// # Safety
408    ///
409    /// The caller must do the necessary synchronization to ensure that
410    /// the [`Self::0`] contains the valid [`Waker`] during the call.
411    unsafe fn will_wake(&self, cx: &mut Context<'_>) -> bool {
412        unsafe { self.with_task(|w| w.will_wake(cx.waker())) }
413    }
414
415    /// # Safety
416    ///
417    /// The caller must do the necessary synchronization to ensure that
418    /// the [`Self::0`] contains the valid [`Waker`] during the call.
419    unsafe fn with_task<F, R>(&self, f: F) -> R
420    where
421        F: FnOnce(&Waker) -> R,
422    {
423        self.0.with(|ptr| {
424            let waker: *const Waker = unsafe { (*ptr).as_ptr() };
425            f(unsafe { &*waker })
426        })
427    }
428
429    /// # Safety
430    ///
431    /// The caller must do the necessary synchronization to ensure that
432    /// the [`Self::0`] contains the valid [`Waker`] during the call.
433    unsafe fn drop_task(&self) {
434        self.0.with_mut(|ptr| {
435            let ptr: *mut Waker = unsafe { (*ptr).as_mut_ptr() };
436            unsafe {
437                ptr.drop_in_place();
438            }
439        });
440    }
441
442    /// # Safety
443    ///
444    /// The caller must do the necessary synchronization to ensure that
445    /// the [`Self::0`] contains the valid [`Waker`] during the call.
446    unsafe fn set_task(&self, cx: &mut Context<'_>) {
447        self.0.with_mut(|ptr| {
448            let ptr: *mut Waker = unsafe { (*ptr).as_mut_ptr() };
449            unsafe {
450                ptr.write(cx.waker().clone());
451            }
452        });
453    }
454}
455
456#[derive(Clone, Copy)]
457struct State(usize);
458
459/// Creates a new one-shot channel for sending single values across asynchronous
460/// tasks.
461///
462/// The function returns separate "send" and "receive" handles. The `Sender`
463/// handle is used by the producer to send the value. The `Receiver` handle is
464/// used by the consumer to receive the value.
465///
466/// Each handle can be used on separate tasks.
467///
468/// # Examples
469///
470/// ```
471/// use tokio::sync::oneshot;
472///
473/// # #[tokio::main(flavor = "current_thread")]
474/// # async fn main() {
475/// let (tx, rx) = oneshot::channel();
476///
477/// tokio::spawn(async move {
478///     if let Err(_) = tx.send(3) {
479///         println!("the receiver dropped");
480///     }
481/// });
482///
483/// match rx.await {
484///     Ok(v) => println!("got = {:?}", v),
485///     Err(_) => println!("the sender dropped"),
486/// }
487/// # }
488/// ```
489#[track_caller]
490pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
491    #[cfg(all(tokio_unstable, feature = "tracing"))]
492    let resource_span = {
493        let location = std::panic::Location::caller();
494
495        let resource_span = tracing::trace_span!(
496            parent: None,
497            "runtime.resource",
498            concrete_type = "Sender|Receiver",
499            kind = "Sync",
500            loc.file = location.file(),
501            loc.line = location.line(),
502            loc.col = location.column(),
503        );
504
505        resource_span.in_scope(|| {
506            tracing::trace!(
507            target: "runtime::resource::state_update",
508            tx_dropped = false,
509            tx_dropped.op = "override",
510            )
511        });
512
513        resource_span.in_scope(|| {
514            tracing::trace!(
515            target: "runtime::resource::state_update",
516            rx_dropped = false,
517            rx_dropped.op = "override",
518            )
519        });
520
521        resource_span.in_scope(|| {
522            tracing::trace!(
523            target: "runtime::resource::state_update",
524            value_sent = false,
525            value_sent.op = "override",
526            )
527        });
528
529        resource_span.in_scope(|| {
530            tracing::trace!(
531            target: "runtime::resource::state_update",
532            value_received = false,
533            value_received.op = "override",
534            )
535        });
536
537        resource_span
538    };
539
540    let inner = Arc::new(Inner {
541        state: AtomicUsize::new(State::new().as_usize()),
542        value: UnsafeCell::new(None),
543        tx_task: Task(UnsafeCell::new(MaybeUninit::uninit())),
544        rx_task: Task(UnsafeCell::new(MaybeUninit::uninit())),
545    });
546
547    let tx = Sender {
548        inner: Some(inner.clone()),
549        #[cfg(all(tokio_unstable, feature = "tracing"))]
550        resource_span: resource_span.clone(),
551    };
552
553    #[cfg(all(tokio_unstable, feature = "tracing"))]
554    let async_op_span = resource_span
555        .in_scope(|| tracing::trace_span!("runtime.resource.async_op", source = "Receiver::await"));
556
557    #[cfg(all(tokio_unstable, feature = "tracing"))]
558    let async_op_poll_span =
559        async_op_span.in_scope(|| tracing::trace_span!("runtime.resource.async_op.poll"));
560
561    let rx = Receiver {
562        inner: Some(inner),
563        #[cfg(all(tokio_unstable, feature = "tracing"))]
564        resource_span,
565        #[cfg(all(tokio_unstable, feature = "tracing"))]
566        async_op_span,
567        #[cfg(all(tokio_unstable, feature = "tracing"))]
568        async_op_poll_span,
569    };
570
571    (tx, rx)
572}
573
574impl<T> Sender<T> {
575    /// Attempts to send a value on this channel, returning it back if it could
576    /// not be sent.
577    ///
578    /// This method consumes `self` as only one value may ever be sent on a `oneshot`
579    /// channel. It is not marked async because sending a message to an `oneshot`
580    /// channel never requires any form of waiting.  Because of this, the `send`
581    /// method can be used in both synchronous and asynchronous code without
582    /// problems.
583    ///
584    /// A successful send occurs when it is determined that the other end of the
585    /// channel has not hung up already. An unsuccessful send would be one where
586    /// the corresponding receiver has already been deallocated. Note that a
587    /// return value of `Err` means that the data will never be received, but
588    /// a return value of `Ok` does *not* mean that the data will be received.
589    /// It is possible for the corresponding receiver to hang up immediately
590    /// after this function returns `Ok`.
591    ///
592    /// # Examples
593    ///
594    /// Send a value to another task
595    ///
596    /// ```
597    /// use tokio::sync::oneshot;
598    ///
599    /// # #[tokio::main(flavor = "current_thread")]
600    /// # async fn main() {
601    /// let (tx, rx) = oneshot::channel();
602    ///
603    /// tokio::spawn(async move {
604    ///     if let Err(_) = tx.send(3) {
605    ///         println!("the receiver dropped");
606    ///     }
607    /// });
608    ///
609    /// match rx.await {
610    ///     Ok(v) => println!("got = {:?}", v),
611    ///     Err(_) => println!("the sender dropped"),
612    /// }
613    /// # }
614    /// ```
615    pub fn send(mut self, t: T) -> Result<(), T> {
616        let inner = self.inner.take().unwrap();
617
618        inner.value.with_mut(|ptr| unsafe {
619            // SAFETY: The receiver will not access the `UnsafeCell` unless the
620            // channel has been marked as "complete" (the `VALUE_SENT` state bit
621            // is set).
622            // That bit is only set by the sender later on in this method, and
623            // calling this method consumes `self`. Therefore, if it was possible to
624            // call this method, we know that the `VALUE_SENT` bit is unset, and
625            // the receiver is not currently accessing the `UnsafeCell`.
626            *ptr = Some(t);
627        });
628
629        if !inner.complete() {
630            unsafe {
631                // SAFETY: The receiver will not access the `UnsafeCell` unless
632                // the channel has been marked as "complete". Calling
633                // `complete()` will return true if this bit is set, and false
634                // if it is not set. Thus, if `complete()` returned false, it is
635                // safe for us to access the value, because we know that the
636                // receiver will not.
637                return Err(inner.consume_value().unwrap());
638            }
639        }
640
641        #[cfg(all(tokio_unstable, feature = "tracing"))]
642        self.resource_span.in_scope(|| {
643            tracing::trace!(
644            target: "runtime::resource::state_update",
645            value_sent = true,
646            value_sent.op = "override",
647            )
648        });
649
650        Ok(())
651    }
652
653    /// Waits for the associated [`Receiver`] handle to close.
654    ///
655    /// A [`Receiver`] is closed by either calling [`close`] explicitly or the
656    /// [`Receiver`] value is dropped.
657    ///
658    /// This function is useful when paired with `select!` to abort a
659    /// computation when the receiver is no longer interested in the result.
660    ///
661    /// # Return
662    ///
663    /// Returns a `Future` which must be awaited on.
664    ///
665    /// [`Receiver`]: Receiver
666    /// [`close`]: Receiver::close
667    ///
668    /// # Examples
669    ///
670    /// Basic usage
671    ///
672    /// ```
673    /// use tokio::sync::oneshot;
674    ///
675    /// # #[tokio::main(flavor = "current_thread")]
676    /// # async fn main() {
677    /// let (mut tx, rx) = oneshot::channel::<()>();
678    ///
679    /// tokio::spawn(async move {
680    ///     drop(rx);
681    /// });
682    ///
683    /// tx.closed().await;
684    /// println!("the receiver dropped");
685    /// # }
686    /// ```
687    ///
688    /// Paired with select
689    ///
690    /// ```
691    /// use tokio::sync::oneshot;
692    /// use tokio::time::{self, Duration};
693    ///
694    /// async fn compute() -> String {
695    ///     // Complex computation returning a `String`
696    /// # "hello".to_string()
697    /// }
698    ///
699    /// # #[tokio::main(flavor = "current_thread")]
700    /// # async fn main() {
701    /// let (mut tx, rx) = oneshot::channel();
702    ///
703    /// tokio::spawn(async move {
704    ///     tokio::select! {
705    ///         _ = tx.closed() => {
706    ///             // The receiver dropped, no need to do any further work
707    ///         }
708    ///         value = compute() => {
709    ///             // The send can fail if the channel was closed at the exact same
710    ///             // time as when compute() finished, so just ignore the failure.
711    ///             let _ = tx.send(value);
712    ///         }
713    ///     }
714    /// });
715    ///
716    /// // Wait for up to 10 seconds
717    /// let _ = time::timeout(Duration::from_secs(10), rx).await;
718    /// # }
719    /// ```
720    pub async fn closed(&mut self) {
721        use std::future::poll_fn;
722
723        #[cfg(all(tokio_unstable, feature = "tracing"))]
724        let resource_span = self.resource_span.clone();
725        #[cfg(all(tokio_unstable, feature = "tracing"))]
726        let closed = trace::async_op(
727            || poll_fn(|cx| self.poll_closed(cx)),
728            resource_span,
729            "Sender::closed",
730            "poll_closed",
731            false,
732        );
733        #[cfg(not(all(tokio_unstable, feature = "tracing")))]
734        let closed = poll_fn(|cx| self.poll_closed(cx));
735
736        closed.await;
737    }
738
739    /// Returns `true` if the associated [`Receiver`] handle has been dropped.
740    ///
741    /// A [`Receiver`] is closed by either calling [`close`] explicitly or the
742    /// [`Receiver`] value is dropped.
743    ///
744    /// If `true` is returned, a call to `send` will always result in an error.
745    ///
746    /// [`Receiver`]: Receiver
747    /// [`close`]: Receiver::close
748    ///
749    /// # Examples
750    ///
751    /// ```
752    /// use tokio::sync::oneshot;
753    ///
754    /// # #[tokio::main(flavor = "current_thread")]
755    /// # async fn main() {
756    /// let (tx, rx) = oneshot::channel();
757    ///
758    /// assert!(!tx.is_closed());
759    ///
760    /// drop(rx);
761    ///
762    /// assert!(tx.is_closed());
763    /// assert!(tx.send("never received").is_err());
764    /// # }
765    /// ```
766    pub fn is_closed(&self) -> bool {
767        let inner = self.inner.as_ref().unwrap();
768
769        let state = State::load(&inner.state, Acquire);
770        state.is_closed()
771    }
772
773    /// Checks whether the `oneshot` channel has been closed, and if not, schedules the
774    /// `Waker` in the provided `Context` to receive a notification when the channel is
775    /// closed.
776    ///
777    /// A [`Receiver`] is closed by either calling [`close`] explicitly, or when the
778    /// [`Receiver`] value is dropped.
779    ///
780    /// Note that on multiple calls to poll, only the `Waker` from the `Context` passed
781    /// to the most recent call will be scheduled to receive a wakeup.
782    ///
783    /// [`Receiver`]: struct@crate::sync::oneshot::Receiver
784    /// [`close`]: fn@crate::sync::oneshot::Receiver::close
785    ///
786    /// # Return value
787    ///
788    /// This function returns:
789    ///
790    ///  * `Poll::Pending` if the channel is still open.
791    ///  * `Poll::Ready(())` if the channel is closed.
792    ///
793    /// # Examples
794    ///
795    /// ```
796    /// use tokio::sync::oneshot;
797    ///
798    /// use std::future::poll_fn;
799    ///
800    /// # #[tokio::main(flavor = "current_thread")]
801    /// # async fn main() {
802    /// let (mut tx, mut rx) = oneshot::channel::<()>();
803    ///
804    /// tokio::spawn(async move {
805    ///     rx.close();
806    /// });
807    ///
808    /// poll_fn(|cx| tx.poll_closed(cx)).await;
809    ///
810    /// println!("the receiver dropped");
811    /// # }
812    /// ```
813    pub fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()> {
814        ready!(crate::trace::trace_leaf(cx));
815
816        // Keep track of task budget
817        let coop = ready!(crate::task::coop::poll_proceed(cx));
818
819        let inner = self.inner.as_ref().unwrap();
820
821        let mut state = State::load(&inner.state, Acquire);
822
823        if state.is_closed() {
824            coop.made_progress();
825            return Ready(());
826        }
827
828        if state.is_tx_task_set() {
829            let will_notify = unsafe { inner.tx_task.will_wake(cx) };
830
831            if !will_notify {
832                state = State::unset_tx_task(&inner.state);
833
834                if state.is_closed() {
835                    // Set the flag again so that the waker is released in drop
836                    State::set_tx_task(&inner.state);
837                    coop.made_progress();
838                    return Ready(());
839                } else {
840                    unsafe { inner.tx_task.drop_task() };
841                }
842            }
843        }
844
845        if !state.is_tx_task_set() {
846            // Attempt to set the task
847            unsafe {
848                inner.tx_task.set_task(cx);
849            }
850
851            // Update the state
852            state = State::set_tx_task(&inner.state);
853
854            if state.is_closed() {
855                coop.made_progress();
856                return Ready(());
857            }
858        }
859
860        Pending
861    }
862}
863
864impl<T> Drop for Sender<T> {
865    fn drop(&mut self) {
866        if let Some(inner) = self.inner.as_ref() {
867            inner.complete();
868            #[cfg(all(tokio_unstable, feature = "tracing"))]
869            self.resource_span.in_scope(|| {
870                tracing::trace!(
871                target: "runtime::resource::state_update",
872                tx_dropped = true,
873                tx_dropped.op = "override",
874                )
875            });
876        }
877    }
878}
879
880impl<T> Receiver<T> {
881    /// Prevents the associated [`Sender`] handle from sending a value.
882    ///
883    /// Any `send` operation which happens after calling `close` is guaranteed
884    /// to fail. After calling `close`, [`try_recv`] should be called to
885    /// receive a value if one was sent **before** the call to `close`
886    /// completed.
887    ///
888    /// This function is useful to perform a graceful shutdown and ensure that a
889    /// value will not be sent into the channel and never received.
890    ///
891    /// `close` is no-op if a message is already received or the channel
892    /// is already closed.
893    ///
894    /// [`Sender`]: Sender
895    /// [`try_recv`]: Receiver::try_recv
896    ///
897    /// # Examples
898    ///
899    /// Prevent a value from being sent
900    ///
901    /// ```
902    /// use tokio::sync::oneshot;
903    /// use tokio::sync::oneshot::error::TryRecvError;
904    ///
905    /// # #[tokio::main(flavor = "current_thread")]
906    /// # async fn main() {
907    /// let (tx, mut rx) = oneshot::channel();
908    ///
909    /// assert!(!tx.is_closed());
910    ///
911    /// rx.close();
912    ///
913    /// assert!(tx.is_closed());
914    /// assert!(tx.send("never received").is_err());
915    ///
916    /// match rx.try_recv() {
917    ///     Err(TryRecvError::Closed) => {}
918    ///     _ => unreachable!(),
919    /// }
920    /// # }
921    /// ```
922    ///
923    /// Receive a value sent **before** calling `close`
924    ///
925    /// ```
926    /// use tokio::sync::oneshot;
927    ///
928    /// # #[tokio::main(flavor = "current_thread")]
929    /// # async fn main() {
930    /// let (tx, mut rx) = oneshot::channel();
931    ///
932    /// assert!(tx.send("will receive").is_ok());
933    ///
934    /// rx.close();
935    ///
936    /// let msg = rx.try_recv().unwrap();
937    /// assert_eq!(msg, "will receive");
938    /// # }
939    /// ```
940    pub fn close(&mut self) {
941        if let Some(inner) = self.inner.as_ref() {
942            inner.close();
943            #[cfg(all(tokio_unstable, feature = "tracing"))]
944            self.resource_span.in_scope(|| {
945                tracing::trace!(
946                target: "runtime::resource::state_update",
947                rx_dropped = true,
948                rx_dropped.op = "override",
949                )
950            });
951        }
952    }
953
954    /// Checks if this receiver is terminated.
955    ///
956    /// This function returns true if this receiver has already yielded a [`Poll::Ready`] result.
957    /// If so, this receiver should no longer be polled.
958    ///
959    /// # Examples
960    ///
961    /// Sending a value and polling it.
962    ///
963    /// ```
964    /// use tokio::sync::oneshot;
965    ///
966    /// use std::task::Poll;
967    ///
968    /// # #[tokio::main(flavor = "current_thread")]
969    /// # async fn main() {
970    /// let (tx, mut rx) = oneshot::channel();
971    ///
972    /// // A receiver is not terminated when it is initialized.
973    /// assert!(!rx.is_terminated());
974    ///
975    /// // A receiver is not terminated it is polled and is still pending.
976    /// let poll = futures::poll!(&mut rx);
977    /// assert_eq!(poll, Poll::Pending);
978    /// assert!(!rx.is_terminated());
979    ///
980    /// // A receiver is not terminated if a value has been sent, but not yet read.
981    /// tx.send(0).unwrap();
982    /// assert!(!rx.is_terminated());
983    ///
984    /// // A receiver *is* terminated after it has been polled and yielded a value.
985    /// assert_eq!((&mut rx).await, Ok(0));
986    /// assert!(rx.is_terminated());
987    /// # }
988    /// ```
989    ///
990    /// Dropping the sender.
991    ///
992    /// ```
993    /// use tokio::sync::oneshot;
994    ///
995    /// # #[tokio::main(flavor = "current_thread")]
996    /// # async fn main() {
997    /// let (tx, mut rx) = oneshot::channel::<()>();
998    ///
999    /// // A receiver is not immediately terminated when the sender is dropped.
1000    /// drop(tx);
1001    /// assert!(!rx.is_terminated());
1002    ///
1003    /// // A receiver *is* terminated after it has been polled and yielded an error.
1004    /// let _ = (&mut rx).await.unwrap_err();
1005    /// assert!(rx.is_terminated());
1006    /// # }
1007    /// ```
1008    pub fn is_terminated(&self) -> bool {
1009        self.inner.is_none()
1010    }
1011
1012    /// Checks if a channel is empty.
1013    ///
1014    /// This method returns `true` if the channel has no messages.
1015    ///
1016    /// It is not necessarily safe to poll an empty receiver, which may have
1017    /// already yielded a value. Use [`is_terminated()`][Self::is_terminated]
1018    /// to check whether or not a receiver can be safely polled, instead.
1019    ///
1020    /// # Examples
1021    ///
1022    /// Sending a value.
1023    ///
1024    /// ```
1025    /// use tokio::sync::oneshot;
1026    ///
1027    /// # #[tokio::main(flavor = "current_thread")]
1028    /// # async fn main() {
1029    /// let (tx, mut rx) = oneshot::channel();
1030    /// assert!(rx.is_empty());
1031    ///
1032    /// tx.send(0).unwrap();
1033    /// assert!(!rx.is_empty());
1034    ///
1035    /// let _ = (&mut rx).await;
1036    /// assert!(rx.is_empty());
1037    /// # }
1038    /// ```
1039    ///
1040    /// Dropping the sender.
1041    ///
1042    /// ```
1043    /// use tokio::sync::oneshot;
1044    ///
1045    /// # #[tokio::main(flavor = "current_thread")]
1046    /// # async fn main() {
1047    /// let (tx, mut rx) = oneshot::channel::<()>();
1048    ///
1049    /// // A channel is empty if the sender is dropped.
1050    /// drop(tx);
1051    /// assert!(rx.is_empty());
1052    ///
1053    /// // A closed channel still yields an error, however.
1054    /// (&mut rx).await.expect_err("should yield an error");
1055    /// assert!(rx.is_empty());
1056    /// # }
1057    /// ```
1058    ///
1059    /// Terminated channels are empty.
1060    ///
1061    /// ```should_panic,ignore-wasm
1062    /// use tokio::sync::oneshot;
1063    ///
1064    /// #[tokio::main]
1065    /// async fn main() {
1066    ///     let (tx, mut rx) = oneshot::channel();
1067    ///     tx.send(0).unwrap();
1068    ///     let _ = (&mut rx).await;
1069    ///
1070    ///     // NB: an empty channel is not necessarily safe to poll!
1071    ///     assert!(rx.is_empty());
1072    ///     let _ = (&mut rx).await;
1073    /// }
1074    /// ```
1075    pub fn is_empty(&self) -> bool {
1076        let Some(inner) = self.inner.as_ref() else {
1077            // The channel has already terminated.
1078            return true;
1079        };
1080
1081        let state = State::load(&inner.state, Acquire);
1082        if state.is_complete() {
1083            // SAFETY: If `state.is_complete()` returns true, then the
1084            // `VALUE_SENT` bit has been set and the sender side of the
1085            // channel will no longer attempt to access the inner
1086            // `UnsafeCell`. Therefore, it is now safe for us to access the
1087            // cell.
1088            //
1089            // The channel is empty if it does not have a value.
1090            unsafe { !inner.has_value() }
1091        } else {
1092            // The receiver closed the channel or no value has been sent yet.
1093            true
1094        }
1095    }
1096
1097    /// Attempts to receive a value.
1098    ///
1099    /// If a pending value exists in the channel, it is returned. If no value
1100    /// has been sent, the current task **will not** be registered for
1101    /// future notification.
1102    ///
1103    /// This function is useful to call from outside the context of an
1104    /// asynchronous task.
1105    ///
1106    /// Note that unlike the `poll` method, the `try_recv` method cannot fail
1107    /// spuriously. Any send or close event that happens before this call to
1108    /// `try_recv` will be correctly returned to the caller.
1109    ///
1110    /// # Return
1111    ///
1112    /// - `Ok(T)` if a value is pending in the channel.
1113    /// - `Err(TryRecvError::Empty)` if no value has been sent yet.
1114    /// - `Err(TryRecvError::Closed)` if the sender has dropped without sending
1115    ///   a value, or if the message has already been received.
1116    ///
1117    /// # Examples
1118    ///
1119    /// `try_recv` before a value is sent, then after.
1120    ///
1121    /// ```
1122    /// use tokio::sync::oneshot;
1123    /// use tokio::sync::oneshot::error::TryRecvError;
1124    ///
1125    /// # #[tokio::main(flavor = "current_thread")]
1126    /// # async fn main() {
1127    /// let (tx, mut rx) = oneshot::channel();
1128    ///
1129    /// match rx.try_recv() {
1130    ///     // The channel is currently empty
1131    ///     Err(TryRecvError::Empty) => {}
1132    ///     _ => unreachable!(),
1133    /// }
1134    ///
1135    /// // Send a value
1136    /// tx.send("hello").unwrap();
1137    ///
1138    /// match rx.try_recv() {
1139    ///      Ok(value) => assert_eq!(value, "hello"),
1140    ///      _ => unreachable!(),
1141    /// }
1142    /// # }
1143    /// ```
1144    ///
1145    /// `try_recv` when the sender dropped before sending a value
1146    ///
1147    /// ```
1148    /// use tokio::sync::oneshot;
1149    /// use tokio::sync::oneshot::error::TryRecvError;
1150    ///
1151    /// # #[tokio::main(flavor = "current_thread")]
1152    /// # async fn main() {
1153    /// let (tx, mut rx) = oneshot::channel::<()>();
1154    ///
1155    /// drop(tx);
1156    ///
1157    /// match rx.try_recv() {
1158    ///     // The channel will never receive a value.
1159    ///     Err(TryRecvError::Closed) => {}
1160    ///     _ => unreachable!(),
1161    /// }
1162    /// # }
1163    /// ```
1164    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
1165        let result = if let Some(inner) = self.inner.as_ref() {
1166            let state = State::load(&inner.state, Acquire);
1167
1168            if state.is_complete() {
1169                // SAFETY: If `state.is_complete()` returns true, then the
1170                // `VALUE_SENT` bit has been set and the sender side of the
1171                // channel will no longer attempt to access the inner
1172                // `UnsafeCell`. Therefore, it is now safe for us to access the
1173                // cell.
1174                match unsafe { inner.consume_value() } {
1175                    Some(value) => {
1176                        #[cfg(all(tokio_unstable, feature = "tracing"))]
1177                        self.resource_span.in_scope(|| {
1178                            tracing::trace!(
1179                            target: "runtime::resource::state_update",
1180                            value_received = true,
1181                            value_received.op = "override",
1182                            )
1183                        });
1184                        Ok(value)
1185                    }
1186                    None => Err(TryRecvError::Closed),
1187                }
1188            } else if state.is_closed() {
1189                Err(TryRecvError::Closed)
1190            } else {
1191                // Not ready, this does not clear `inner`
1192                return Err(TryRecvError::Empty);
1193            }
1194        } else {
1195            Err(TryRecvError::Closed)
1196        };
1197
1198        self.inner = None;
1199        result
1200    }
1201
1202    /// Blocking receive to call outside of asynchronous contexts.
1203    ///
1204    /// # Panics
1205    ///
1206    /// This function panics if called within an asynchronous execution
1207    /// context.
1208    ///
1209    /// # Examples
1210    ///
1211    /// ```
1212    /// # #[cfg(not(target_family = "wasm"))]
1213    /// # {
1214    /// use std::thread;
1215    /// use tokio::sync::oneshot;
1216    ///
1217    /// #[tokio::main]
1218    /// async fn main() {
1219    ///     let (tx, rx) = oneshot::channel::<u8>();
1220    ///
1221    ///     let sync_code = thread::spawn(move || {
1222    ///         assert_eq!(Ok(10), rx.blocking_recv());
1223    ///     });
1224    ///
1225    ///     let _ = tx.send(10);
1226    ///     sync_code.join().unwrap();
1227    /// }
1228    /// # }
1229    /// ```
1230    #[track_caller]
1231    #[cfg(feature = "sync")]
1232    #[cfg_attr(docsrs, doc(alias = "recv_blocking"))]
1233    pub fn blocking_recv(self) -> Result<T, RecvError> {
1234        crate::future::block_on(self)
1235    }
1236}
1237
1238impl<T> Drop for Receiver<T> {
1239    fn drop(&mut self) {
1240        if let Some(inner) = self.inner.as_ref() {
1241            let state = inner.close();
1242
1243            if state.is_complete() {
1244                // SAFETY: we have ensured that the `VALUE_SENT` bit has been set,
1245                // so only the receiver can access the value.
1246                drop(unsafe { inner.consume_value() });
1247            }
1248
1249            #[cfg(all(tokio_unstable, feature = "tracing"))]
1250            self.resource_span.in_scope(|| {
1251                tracing::trace!(
1252                target: "runtime::resource::state_update",
1253                rx_dropped = true,
1254                rx_dropped.op = "override",
1255                )
1256            });
1257        }
1258    }
1259}
1260
1261impl<T> Future for Receiver<T> {
1262    type Output = Result<T, RecvError>;
1263
1264    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1265        // If `inner` is `None`, then `poll()` has already completed.
1266        #[cfg(all(tokio_unstable, feature = "tracing"))]
1267        let _res_span = self.resource_span.clone().entered();
1268        #[cfg(all(tokio_unstable, feature = "tracing"))]
1269        let _ao_span = self.async_op_span.clone().entered();
1270        #[cfg(all(tokio_unstable, feature = "tracing"))]
1271        let _ao_poll_span = self.async_op_poll_span.clone().entered();
1272
1273        let ret = if let Some(inner) = self.as_ref().get_ref().inner.as_ref() {
1274            #[cfg(all(tokio_unstable, feature = "tracing"))]
1275            let res = ready!(trace_poll_op!("poll_recv", inner.poll_recv(cx))).map_err(Into::into);
1276
1277            #[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
1278            let res = ready!(inner.poll_recv(cx)).map_err(Into::into);
1279
1280            res
1281        } else {
1282            panic!("called after complete");
1283        };
1284
1285        self.inner = None;
1286        Ready(ret)
1287    }
1288}
1289
1290impl<T> Inner<T> {
1291    fn complete(&self) -> bool {
1292        let prev = State::set_complete(&self.state);
1293
1294        if prev.is_closed() {
1295            return false;
1296        }
1297
1298        if prev.is_rx_task_set() {
1299            // TODO: Consume waker?
1300            unsafe {
1301                self.rx_task.with_task(Waker::wake_by_ref);
1302            }
1303        }
1304
1305        true
1306    }
1307
1308    fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
1309        ready!(crate::trace::trace_leaf(cx));
1310        // Keep track of task budget
1311        let coop = ready!(crate::task::coop::poll_proceed(cx));
1312
1313        // Load the state
1314        let mut state = State::load(&self.state, Acquire);
1315
1316        if state.is_complete() {
1317            coop.made_progress();
1318            match unsafe { self.consume_value() } {
1319                Some(value) => Ready(Ok(value)),
1320                None => Ready(Err(RecvError(()))),
1321            }
1322        } else if state.is_closed() {
1323            coop.made_progress();
1324            Ready(Err(RecvError(())))
1325        } else {
1326            if state.is_rx_task_set() {
1327                let will_notify = unsafe { self.rx_task.will_wake(cx) };
1328
1329                // Check if the task is still the same
1330                if !will_notify {
1331                    // Unset the task
1332                    state = State::unset_rx_task(&self.state);
1333                    if state.is_complete() {
1334                        // Set the flag again so that the waker is released in drop
1335                        State::set_rx_task(&self.state);
1336
1337                        coop.made_progress();
1338                        // SAFETY: If `state.is_complete()` returns true, then the
1339                        // `VALUE_SENT` bit has been set and the sender side of the
1340                        // channel will no longer attempt to access the inner
1341                        // `UnsafeCell`. Therefore, it is now safe for us to access the
1342                        // cell.
1343                        return match unsafe { self.consume_value() } {
1344                            Some(value) => Ready(Ok(value)),
1345                            None => Ready(Err(RecvError(()))),
1346                        };
1347                    } else {
1348                        unsafe { self.rx_task.drop_task() };
1349                    }
1350                }
1351            }
1352
1353            if !state.is_rx_task_set() {
1354                // Attempt to set the task
1355                unsafe {
1356                    self.rx_task.set_task(cx);
1357                }
1358
1359                // Update the state
1360                state = State::set_rx_task(&self.state);
1361
1362                if state.is_complete() {
1363                    coop.made_progress();
1364                    match unsafe { self.consume_value() } {
1365                        Some(value) => Ready(Ok(value)),
1366                        None => Ready(Err(RecvError(()))),
1367                    }
1368                } else {
1369                    Pending
1370                }
1371            } else {
1372                Pending
1373            }
1374        }
1375    }
1376
1377    /// Called by `Receiver` to indicate that the value will never be received.
1378    fn close(&self) -> State {
1379        let prev = State::set_closed(&self.state);
1380
1381        if prev.is_tx_task_set() && !prev.is_complete() {
1382            unsafe {
1383                self.tx_task.with_task(Waker::wake_by_ref);
1384            }
1385        }
1386
1387        prev
1388    }
1389
1390    /// Consumes the value. This function does not check `state`.
1391    ///
1392    /// # Safety
1393    ///
1394    /// Calling this method concurrently on multiple threads will result in a
1395    /// data race. The `VALUE_SENT` state bit is used to ensure that only the
1396    /// sender *or* the receiver will call this method at a given point in time.
1397    /// If `VALUE_SENT` is not set, then only the sender may call this method;
1398    /// if it is set, then only the receiver may call this method.
1399    unsafe fn consume_value(&self) -> Option<T> {
1400        self.value.with_mut(|ptr| unsafe { (*ptr).take() })
1401    }
1402
1403    /// Returns true if there is a value. This function does not check `state`.
1404    ///
1405    /// # Safety
1406    ///
1407    /// Calling this method concurrently on multiple threads will result in a
1408    /// data race. The `VALUE_SENT` state bit is used to ensure that only the
1409    /// sender *or* the receiver will call this method at a given point in time.
1410    /// If `VALUE_SENT` is not set, then only the sender may call this method;
1411    /// if it is set, then only the receiver may call this method.
1412    unsafe fn has_value(&self) -> bool {
1413        self.value.with(|ptr| unsafe { (*ptr).is_some() })
1414    }
1415}
1416
1417unsafe impl<T: Send> Send for Inner<T> {}
1418unsafe impl<T: Send> Sync for Inner<T> {}
1419
1420fn mut_load(this: &mut AtomicUsize) -> usize {
1421    this.with_mut(|v| *v)
1422}
1423
1424impl<T> Drop for Inner<T> {
1425    fn drop(&mut self) {
1426        let state = State(mut_load(&mut self.state));
1427
1428        if state.is_rx_task_set() {
1429            unsafe {
1430                self.rx_task.drop_task();
1431            }
1432        }
1433
1434        if state.is_tx_task_set() {
1435            unsafe {
1436                self.tx_task.drop_task();
1437            }
1438        }
1439
1440        // SAFETY: we have `&mut self`, and therefore we have
1441        // exclusive access to the value.
1442        unsafe {
1443            // Note: the assertion holds because if the value has been sent by sender,
1444            // we must ensure that the value must have been consumed by the receiver before
1445            // dropping the `Inner`.
1446            debug_assert!(self.consume_value().is_none());
1447        }
1448    }
1449}
1450
1451impl<T: fmt::Debug> fmt::Debug for Inner<T> {
1452    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1453        use std::sync::atomic::Ordering::Relaxed;
1454
1455        fmt.debug_struct("Inner")
1456            .field("state", &State::load(&self.state, Relaxed))
1457            .finish()
1458    }
1459}
1460
1461/// Indicates that a waker for the receiving task has been set.
1462///
1463/// # Safety
1464///
1465/// If this bit is not set, the `rx_task` field may be uninitialized.
1466const RX_TASK_SET: usize = 0b00001;
1467/// Indicates that a value has been stored in the channel's inner `UnsafeCell`.
1468///
1469/// # Safety
1470///
1471/// This bit controls which side of the channel is permitted to access the
1472/// `UnsafeCell`. If it is set, the `UnsafeCell` may ONLY be accessed by the
1473/// receiver. If this bit is NOT set, the `UnsafeCell` may ONLY be accessed by
1474/// the sender.
1475const VALUE_SENT: usize = 0b00010;
1476const CLOSED: usize = 0b00100;
1477
1478/// Indicates that a waker for the sending task has been set.
1479///
1480/// # Safety
1481///
1482/// If this bit is not set, the `tx_task` field may be uninitialized.
1483const TX_TASK_SET: usize = 0b01000;
1484
1485impl State {
1486    fn new() -> State {
1487        State(0)
1488    }
1489
1490    fn is_complete(self) -> bool {
1491        self.0 & VALUE_SENT == VALUE_SENT
1492    }
1493
1494    fn set_complete(cell: &AtomicUsize) -> State {
1495        // This method is a compare-and-swap loop rather than a fetch-or like
1496        // other `set_$WHATEVER` methods on `State`. This is because we must
1497        // check if the state has been closed before setting the `VALUE_SENT`
1498        // bit.
1499        //
1500        // We don't want to set both the `VALUE_SENT` bit if the `CLOSED`
1501        // bit is already set, because `VALUE_SENT` will tell the receiver that
1502        // it's okay to access the inner `UnsafeCell`. Immediately after calling
1503        // `set_complete`, if the channel was closed, the sender will _also_
1504        // access the `UnsafeCell` to take the value back out, so if a
1505        // `poll_recv` or `try_recv` call is occurring concurrently, both
1506        // threads may try to access the `UnsafeCell` if we were to set the
1507        // `VALUE_SENT` bit on a closed channel.
1508        let mut state = cell.load(Ordering::Relaxed);
1509        loop {
1510            if State(state).is_closed() {
1511                break;
1512            }
1513            // TODO: This could be `Release`, followed by an `Acquire` fence *if*
1514            // the `RX_TASK_SET` flag is set. However, `loom` does not support
1515            // fences yet.
1516            match cell.compare_exchange_weak(
1517                state,
1518                state | VALUE_SENT,
1519                Ordering::AcqRel,
1520                Ordering::Acquire,
1521            ) {
1522                Ok(_) => break,
1523                Err(actual) => state = actual,
1524            }
1525        }
1526        State(state)
1527    }
1528
1529    fn is_rx_task_set(self) -> bool {
1530        self.0 & RX_TASK_SET == RX_TASK_SET
1531    }
1532
1533    fn set_rx_task(cell: &AtomicUsize) -> State {
1534        let val = cell.fetch_or(RX_TASK_SET, AcqRel);
1535        State(val | RX_TASK_SET)
1536    }
1537
1538    fn unset_rx_task(cell: &AtomicUsize) -> State {
1539        let val = cell.fetch_and(!RX_TASK_SET, AcqRel);
1540        State(val & !RX_TASK_SET)
1541    }
1542
1543    fn is_closed(self) -> bool {
1544        self.0 & CLOSED == CLOSED
1545    }
1546
1547    fn set_closed(cell: &AtomicUsize) -> State {
1548        // Acquire because we want all later writes (attempting to poll) to be
1549        // ordered after this.
1550        let val = cell.fetch_or(CLOSED, Acquire);
1551        State(val)
1552    }
1553
1554    fn set_tx_task(cell: &AtomicUsize) -> State {
1555        let val = cell.fetch_or(TX_TASK_SET, AcqRel);
1556        State(val | TX_TASK_SET)
1557    }
1558
1559    fn unset_tx_task(cell: &AtomicUsize) -> State {
1560        let val = cell.fetch_and(!TX_TASK_SET, AcqRel);
1561        State(val & !TX_TASK_SET)
1562    }
1563
1564    fn is_tx_task_set(self) -> bool {
1565        self.0 & TX_TASK_SET == TX_TASK_SET
1566    }
1567
1568    fn as_usize(self) -> usize {
1569        self.0
1570    }
1571
1572    fn load(cell: &AtomicUsize, order: Ordering) -> State {
1573        let val = cell.load(order);
1574        State(val)
1575    }
1576}
1577
1578impl fmt::Debug for State {
1579    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1580        fmt.debug_struct("State")
1581            .field("is_complete", &self.is_complete())
1582            .field("is_closed", &self.is_closed())
1583            .field("is_rx_task_set", &self.is_rx_task_set())
1584            .field("is_tx_task_set", &self.is_tx_task_set())
1585            .finish()
1586    }
1587}