tokio/sync/oneshot.rs
1#![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))]
2
3//! A one-shot channel is used for sending a single message between
4//! asynchronous tasks. The [`channel`] function is used to create a
5//! [`Sender`] and [`Receiver`] handle pair that form the channel.
6//!
7//! The `Sender` handle is used by the producer to send the value.
8//! The `Receiver` handle is used by the consumer to receive the value.
9//!
10//! Each handle can be used on separate tasks.
11//!
12//! Since the `send` method is not async, it can be used anywhere. This includes
13//! sending between two runtimes, and using it from non-async code.
14//!
15//! If the [`Receiver`] is closed before receiving a message which has already
16//! been sent, the message will remain in the channel until the receiver is
17//! dropped, at which point the message will be dropped immediately.
18//!
19//! # Examples
20//!
21//! ```
22//! use tokio::sync::oneshot;
23//!
24//! # #[tokio::main(flavor = "current_thread")]
25//! # async fn main() {
26//! let (tx, rx) = oneshot::channel();
27//!
28//! tokio::spawn(async move {
29//! if let Err(_) = tx.send(3) {
30//! println!("the receiver dropped");
31//! }
32//! });
33//!
34//! match rx.await {
35//! Ok(v) => println!("got = {:?}", v),
36//! Err(_) => println!("the sender dropped"),
37//! }
38//! # }
39//! ```
40//!
41//! If the sender is dropped without sending, the receiver will fail with
42//! [`error::RecvError`]:
43//!
44//! ```
45//! use tokio::sync::oneshot;
46//!
47//! # #[tokio::main(flavor = "current_thread")]
48//! # async fn main() {
49//! let (tx, rx) = oneshot::channel::<u32>();
50//!
51//! tokio::spawn(async move {
52//! drop(tx);
53//! });
54//!
55//! match rx.await {
56//! Ok(_) => panic!("This doesn't happen"),
57//! Err(_) => println!("the sender dropped"),
58//! }
59//! # }
60//! ```
61//!
62//! To use a `oneshot` channel in a `tokio::select!` loop, add `&mut` in front of
63//! the channel.
64//!
65//! ```
66//! use tokio::sync::oneshot;
67//! use tokio::time::{interval, sleep, Duration};
68//!
69//! # #[tokio::main(flavor = "current_thread")]
70//! # async fn _doc() {}
71//! # #[tokio::main(flavor = "current_thread", start_paused = true)]
72//! # async fn main() {
73//! let (send, mut recv) = oneshot::channel();
74//! let mut interval = interval(Duration::from_millis(100));
75//!
76//! # let handle =
77//! tokio::spawn(async move {
78//! sleep(Duration::from_secs(1)).await;
79//! send.send("shut down").unwrap();
80//! });
81//!
82//! loop {
83//! tokio::select! {
84//! _ = interval.tick() => println!("Another 100ms"),
85//! msg = &mut recv => {
86//! println!("Got message: {}", msg.unwrap());
87//! break;
88//! }
89//! }
90//! }
91//! # handle.await.unwrap();
92//! # }
93//! ```
94//!
95//! To use a `Sender` from a destructor, put it in an [`Option`] and call
96//! [`Option::take`].
97//!
98//! ```
99//! use tokio::sync::oneshot;
100//!
101//! struct SendOnDrop {
102//! sender: Option<oneshot::Sender<&'static str>>,
103//! }
104//! impl Drop for SendOnDrop {
105//! fn drop(&mut self) {
106//! if let Some(sender) = self.sender.take() {
107//! // Using `let _ =` to ignore send errors.
108//! let _ = sender.send("I got dropped!");
109//! }
110//! }
111//! }
112//!
113//! # #[tokio::main(flavor = "current_thread")]
114//! # async fn _doc() {}
115//! # #[tokio::main(flavor = "current_thread")]
116//! # async fn main() {
117//! let (send, recv) = oneshot::channel();
118//!
119//! let send_on_drop = SendOnDrop { sender: Some(send) };
120//! drop(send_on_drop);
121//!
122//! assert_eq!(recv.await, Ok("I got dropped!"));
123//! # }
124//! ```
125
126use crate::loom::cell::UnsafeCell;
127use crate::loom::sync::atomic::AtomicUsize;
128use crate::loom::sync::Arc;
129#[cfg(all(tokio_unstable, feature = "tracing"))]
130use crate::util::trace;
131
132use std::fmt;
133use std::future::Future;
134use std::mem::MaybeUninit;
135use std::pin::Pin;
136use std::sync::atomic::Ordering::{self, AcqRel, Acquire};
137use std::task::Poll::{Pending, Ready};
138use std::task::{ready, Context, Poll, Waker};
139
140/// Sends a value to the associated [`Receiver`].
141///
142/// A pair of both a [`Sender`] and a [`Receiver`] are created by the
143/// [`channel`](fn@channel) function.
144///
145/// # Examples
146///
147/// ```
148/// use tokio::sync::oneshot;
149///
150/// # #[tokio::main(flavor = "current_thread")]
151/// # async fn main() {
152/// let (tx, rx) = oneshot::channel();
153///
154/// tokio::spawn(async move {
155/// if let Err(_) = tx.send(3) {
156/// println!("the receiver dropped");
157/// }
158/// });
159///
160/// match rx.await {
161/// Ok(v) => println!("got = {:?}", v),
162/// Err(_) => println!("the sender dropped"),
163/// }
164/// # }
165/// ```
166///
167/// If the sender is dropped without sending, the receiver will fail with
168/// [`error::RecvError`]:
169///
170/// ```
171/// use tokio::sync::oneshot;
172///
173/// # #[tokio::main(flavor = "current_thread")]
174/// # async fn main() {
175/// let (tx, rx) = oneshot::channel::<u32>();
176///
177/// tokio::spawn(async move {
178/// drop(tx);
179/// });
180///
181/// match rx.await {
182/// Ok(_) => panic!("This doesn't happen"),
183/// Err(_) => println!("the sender dropped"),
184/// }
185/// # }
186/// ```
187///
188/// To use a `Sender` from a destructor, put it in an [`Option`] and call
189/// [`Option::take`].
190///
191/// ```
192/// use tokio::sync::oneshot;
193///
194/// struct SendOnDrop {
195/// sender: Option<oneshot::Sender<&'static str>>,
196/// }
197/// impl Drop for SendOnDrop {
198/// fn drop(&mut self) {
199/// if let Some(sender) = self.sender.take() {
200/// // Using `let _ =` to ignore send errors.
201/// let _ = sender.send("I got dropped!");
202/// }
203/// }
204/// }
205///
206/// # #[tokio::main(flavor = "current_thread")]
207/// # async fn _doc() {}
208/// # #[tokio::main(flavor = "current_thread")]
209/// # async fn main() {
210/// let (send, recv) = oneshot::channel();
211///
212/// let send_on_drop = SendOnDrop { sender: Some(send) };
213/// drop(send_on_drop);
214///
215/// assert_eq!(recv.await, Ok("I got dropped!"));
216/// # }
217/// ```
218///
219/// [`Option`]: std::option::Option
220/// [`Option::take`]: std::option::Option::take
221#[derive(Debug)]
222pub struct Sender<T> {
223 inner: Option<Arc<Inner<T>>>,
224 #[cfg(all(tokio_unstable, feature = "tracing"))]
225 resource_span: tracing::Span,
226}
227
228/// Receives a value from the associated [`Sender`].
229///
230/// A pair of both a [`Sender`] and a [`Receiver`] are created by the
231/// [`channel`](fn@channel) function.
232///
233/// This channel has no `recv` method because the receiver itself implements the
234/// [`Future`] trait. To receive a `Result<T, `[`error::RecvError`]`>`, `.await` the `Receiver` object directly.
235///
236/// The `poll` method on the `Future` trait is allowed to spuriously return
237/// `Poll::Pending` even if the message has been sent. If such a spurious
238/// failure happens, then the caller will be woken when the spurious failure has
239/// been resolved so that the caller can attempt to receive the message again.
240/// Note that receiving such a wakeup does not guarantee that the next call will
241/// succeed — it could fail with another spurious failure. (A spurious failure
242/// does not mean that the message is lost. It is just delayed.)
243///
244/// [`Future`]: trait@std::future::Future
245///
246/// # Examples
247///
248/// ```
249/// use tokio::sync::oneshot;
250///
251/// # #[tokio::main(flavor = "current_thread")]
252/// # async fn main() {
253/// let (tx, rx) = oneshot::channel();
254///
255/// tokio::spawn(async move {
256/// if let Err(_) = tx.send(3) {
257/// println!("the receiver dropped");
258/// }
259/// });
260///
261/// match rx.await {
262/// Ok(v) => println!("got = {:?}", v),
263/// Err(_) => println!("the sender dropped"),
264/// }
265/// # }
266/// ```
267///
268/// If the sender is dropped without sending, the receiver will fail with
269/// [`error::RecvError`]:
270///
271/// ```
272/// use tokio::sync::oneshot;
273///
274/// # #[tokio::main(flavor = "current_thread")]
275/// # async fn main() {
276/// let (tx, rx) = oneshot::channel::<u32>();
277///
278/// tokio::spawn(async move {
279/// drop(tx);
280/// });
281///
282/// match rx.await {
283/// Ok(_) => panic!("This doesn't happen"),
284/// Err(_) => println!("the sender dropped"),
285/// }
286/// # }
287/// ```
288///
289/// To use a `Receiver` in a `tokio::select!` loop, add `&mut` in front of the
290/// channel.
291///
292/// ```
293/// use tokio::sync::oneshot;
294/// use tokio::time::{interval, sleep, Duration};
295///
296/// # #[tokio::main(flavor = "current_thread")]
297/// # async fn _doc() {}
298/// # #[tokio::main(flavor = "current_thread", start_paused = true)]
299/// # async fn main() {
300/// let (send, mut recv) = oneshot::channel();
301/// let mut interval = interval(Duration::from_millis(100));
302///
303/// # let handle =
304/// tokio::spawn(async move {
305/// sleep(Duration::from_secs(1)).await;
306/// send.send("shut down").unwrap();
307/// });
308///
309/// loop {
310/// tokio::select! {
311/// _ = interval.tick() => println!("Another 100ms"),
312/// msg = &mut recv => {
313/// println!("Got message: {}", msg.unwrap());
314/// break;
315/// }
316/// }
317/// }
318/// # handle.await.unwrap();
319/// # }
320/// ```
321#[derive(Debug)]
322pub struct Receiver<T> {
323 inner: Option<Arc<Inner<T>>>,
324 #[cfg(all(tokio_unstable, feature = "tracing"))]
325 resource_span: tracing::Span,
326 #[cfg(all(tokio_unstable, feature = "tracing"))]
327 async_op_span: tracing::Span,
328 #[cfg(all(tokio_unstable, feature = "tracing"))]
329 async_op_poll_span: tracing::Span,
330}
331
332pub mod error {
333 //! `Oneshot` error types.
334
335 use std::fmt;
336
337 /// Error returned by the `Future` implementation for `Receiver`.
338 ///
339 /// This error is returned by the receiver when the sender is dropped without sending.
340 #[derive(Debug, Eq, PartialEq, Clone)]
341 pub struct RecvError(pub(super) ());
342
343 /// Error returned by the `try_recv` function on `Receiver`.
344 #[derive(Debug, Eq, PartialEq, Clone)]
345 pub enum TryRecvError {
346 /// The send half of the channel has not yet sent a value.
347 Empty,
348
349 /// The send half of the channel was dropped without sending a value.
350 Closed,
351 }
352
353 // ===== impl RecvError =====
354
355 impl fmt::Display for RecvError {
356 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
357 write!(fmt, "channel closed")
358 }
359 }
360
361 impl std::error::Error for RecvError {}
362
363 // ===== impl TryRecvError =====
364
365 impl fmt::Display for TryRecvError {
366 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
367 match self {
368 TryRecvError::Empty => write!(fmt, "channel empty"),
369 TryRecvError::Closed => write!(fmt, "channel closed"),
370 }
371 }
372 }
373
374 impl std::error::Error for TryRecvError {}
375}
376
377use self::error::*;
378
379struct Inner<T> {
380 /// Manages the state of the inner cell.
381 state: AtomicUsize,
382
383 /// The value. This is set by `Sender` and read by `Receiver`. The state of
384 /// the cell is tracked by `state`.
385 value: UnsafeCell<Option<T>>,
386
387 /// The task to notify when the receiver drops without consuming the value.
388 ///
389 /// ## Safety
390 ///
391 /// The `TX_TASK_SET` bit in the `state` field is set if this field is
392 /// initialized. If that bit is unset, this field may be uninitialized.
393 tx_task: Task,
394
395 /// The task to notify when the value is sent.
396 ///
397 /// ## Safety
398 ///
399 /// The `RX_TASK_SET` bit in the `state` field is set if this field is
400 /// initialized. If that bit is unset, this field may be uninitialized.
401 rx_task: Task,
402}
403
404struct Task(UnsafeCell<MaybeUninit<Waker>>);
405
406impl Task {
407 /// # Safety
408 ///
409 /// The caller must do the necessary synchronization to ensure that
410 /// the [`Self::0`] contains the valid [`Waker`] during the call.
411 unsafe fn will_wake(&self, cx: &mut Context<'_>) -> bool {
412 unsafe { self.with_task(|w| w.will_wake(cx.waker())) }
413 }
414
415 /// # Safety
416 ///
417 /// The caller must do the necessary synchronization to ensure that
418 /// the [`Self::0`] contains the valid [`Waker`] during the call.
419 unsafe fn with_task<F, R>(&self, f: F) -> R
420 where
421 F: FnOnce(&Waker) -> R,
422 {
423 self.0.with(|ptr| {
424 let waker: *const Waker = unsafe { (*ptr).as_ptr() };
425 f(unsafe { &*waker })
426 })
427 }
428
429 /// # Safety
430 ///
431 /// The caller must do the necessary synchronization to ensure that
432 /// the [`Self::0`] contains the valid [`Waker`] during the call.
433 unsafe fn drop_task(&self) {
434 self.0.with_mut(|ptr| {
435 let ptr: *mut Waker = unsafe { (*ptr).as_mut_ptr() };
436 unsafe {
437 ptr.drop_in_place();
438 }
439 });
440 }
441
442 /// # Safety
443 ///
444 /// The caller must do the necessary synchronization to ensure that
445 /// the [`Self::0`] contains the valid [`Waker`] during the call.
446 unsafe fn set_task(&self, cx: &mut Context<'_>) {
447 self.0.with_mut(|ptr| {
448 let ptr: *mut Waker = unsafe { (*ptr).as_mut_ptr() };
449 unsafe {
450 ptr.write(cx.waker().clone());
451 }
452 });
453 }
454}
455
456#[derive(Clone, Copy)]
457struct State(usize);
458
459/// Creates a new one-shot channel for sending single values across asynchronous
460/// tasks.
461///
462/// The function returns separate "send" and "receive" handles. The `Sender`
463/// handle is used by the producer to send the value. The `Receiver` handle is
464/// used by the consumer to receive the value.
465///
466/// Each handle can be used on separate tasks.
467///
468/// # Examples
469///
470/// ```
471/// use tokio::sync::oneshot;
472///
473/// # #[tokio::main(flavor = "current_thread")]
474/// # async fn main() {
475/// let (tx, rx) = oneshot::channel();
476///
477/// tokio::spawn(async move {
478/// if let Err(_) = tx.send(3) {
479/// println!("the receiver dropped");
480/// }
481/// });
482///
483/// match rx.await {
484/// Ok(v) => println!("got = {:?}", v),
485/// Err(_) => println!("the sender dropped"),
486/// }
487/// # }
488/// ```
489#[track_caller]
490pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
491 #[cfg(all(tokio_unstable, feature = "tracing"))]
492 let resource_span = {
493 let location = std::panic::Location::caller();
494
495 let resource_span = tracing::trace_span!(
496 parent: None,
497 "runtime.resource",
498 concrete_type = "Sender|Receiver",
499 kind = "Sync",
500 loc.file = location.file(),
501 loc.line = location.line(),
502 loc.col = location.column(),
503 );
504
505 resource_span.in_scope(|| {
506 tracing::trace!(
507 target: "runtime::resource::state_update",
508 tx_dropped = false,
509 tx_dropped.op = "override",
510 )
511 });
512
513 resource_span.in_scope(|| {
514 tracing::trace!(
515 target: "runtime::resource::state_update",
516 rx_dropped = false,
517 rx_dropped.op = "override",
518 )
519 });
520
521 resource_span.in_scope(|| {
522 tracing::trace!(
523 target: "runtime::resource::state_update",
524 value_sent = false,
525 value_sent.op = "override",
526 )
527 });
528
529 resource_span.in_scope(|| {
530 tracing::trace!(
531 target: "runtime::resource::state_update",
532 value_received = false,
533 value_received.op = "override",
534 )
535 });
536
537 resource_span
538 };
539
540 let inner = Arc::new(Inner {
541 state: AtomicUsize::new(State::new().as_usize()),
542 value: UnsafeCell::new(None),
543 tx_task: Task(UnsafeCell::new(MaybeUninit::uninit())),
544 rx_task: Task(UnsafeCell::new(MaybeUninit::uninit())),
545 });
546
547 let tx = Sender {
548 inner: Some(inner.clone()),
549 #[cfg(all(tokio_unstable, feature = "tracing"))]
550 resource_span: resource_span.clone(),
551 };
552
553 #[cfg(all(tokio_unstable, feature = "tracing"))]
554 let async_op_span = resource_span
555 .in_scope(|| tracing::trace_span!("runtime.resource.async_op", source = "Receiver::await"));
556
557 #[cfg(all(tokio_unstable, feature = "tracing"))]
558 let async_op_poll_span =
559 async_op_span.in_scope(|| tracing::trace_span!("runtime.resource.async_op.poll"));
560
561 let rx = Receiver {
562 inner: Some(inner),
563 #[cfg(all(tokio_unstable, feature = "tracing"))]
564 resource_span,
565 #[cfg(all(tokio_unstable, feature = "tracing"))]
566 async_op_span,
567 #[cfg(all(tokio_unstable, feature = "tracing"))]
568 async_op_poll_span,
569 };
570
571 (tx, rx)
572}
573
574impl<T> Sender<T> {
575 /// Attempts to send a value on this channel, returning it back if it could
576 /// not be sent.
577 ///
578 /// This method consumes `self` as only one value may ever be sent on a `oneshot`
579 /// channel. It is not marked async because sending a message to an `oneshot`
580 /// channel never requires any form of waiting. Because of this, the `send`
581 /// method can be used in both synchronous and asynchronous code without
582 /// problems.
583 ///
584 /// A successful send occurs when it is determined that the other end of the
585 /// channel has not hung up already. An unsuccessful send would be one where
586 /// the corresponding receiver has already been deallocated. Note that a
587 /// return value of `Err` means that the data will never be received, but
588 /// a return value of `Ok` does *not* mean that the data will be received.
589 /// It is possible for the corresponding receiver to hang up immediately
590 /// after this function returns `Ok`.
591 ///
592 /// # Examples
593 ///
594 /// Send a value to another task
595 ///
596 /// ```
597 /// use tokio::sync::oneshot;
598 ///
599 /// # #[tokio::main(flavor = "current_thread")]
600 /// # async fn main() {
601 /// let (tx, rx) = oneshot::channel();
602 ///
603 /// tokio::spawn(async move {
604 /// if let Err(_) = tx.send(3) {
605 /// println!("the receiver dropped");
606 /// }
607 /// });
608 ///
609 /// match rx.await {
610 /// Ok(v) => println!("got = {:?}", v),
611 /// Err(_) => println!("the sender dropped"),
612 /// }
613 /// # }
614 /// ```
615 pub fn send(mut self, t: T) -> Result<(), T> {
616 let inner = self.inner.take().unwrap();
617
618 inner.value.with_mut(|ptr| unsafe {
619 // SAFETY: The receiver will not access the `UnsafeCell` unless the
620 // channel has been marked as "complete" (the `VALUE_SENT` state bit
621 // is set).
622 // That bit is only set by the sender later on in this method, and
623 // calling this method consumes `self`. Therefore, if it was possible to
624 // call this method, we know that the `VALUE_SENT` bit is unset, and
625 // the receiver is not currently accessing the `UnsafeCell`.
626 *ptr = Some(t);
627 });
628
629 if !inner.complete() {
630 unsafe {
631 // SAFETY: The receiver will not access the `UnsafeCell` unless
632 // the channel has been marked as "complete". Calling
633 // `complete()` will return true if this bit is set, and false
634 // if it is not set. Thus, if `complete()` returned false, it is
635 // safe for us to access the value, because we know that the
636 // receiver will not.
637 return Err(inner.consume_value().unwrap());
638 }
639 }
640
641 #[cfg(all(tokio_unstable, feature = "tracing"))]
642 self.resource_span.in_scope(|| {
643 tracing::trace!(
644 target: "runtime::resource::state_update",
645 value_sent = true,
646 value_sent.op = "override",
647 )
648 });
649
650 Ok(())
651 }
652
653 /// Waits for the associated [`Receiver`] handle to close.
654 ///
655 /// A [`Receiver`] is closed by either calling [`close`] explicitly or the
656 /// [`Receiver`] value is dropped.
657 ///
658 /// This function is useful when paired with `select!` to abort a
659 /// computation when the receiver is no longer interested in the result.
660 ///
661 /// # Return
662 ///
663 /// Returns a `Future` which must be awaited on.
664 ///
665 /// [`Receiver`]: Receiver
666 /// [`close`]: Receiver::close
667 ///
668 /// # Examples
669 ///
670 /// Basic usage
671 ///
672 /// ```
673 /// use tokio::sync::oneshot;
674 ///
675 /// # #[tokio::main(flavor = "current_thread")]
676 /// # async fn main() {
677 /// let (mut tx, rx) = oneshot::channel::<()>();
678 ///
679 /// tokio::spawn(async move {
680 /// drop(rx);
681 /// });
682 ///
683 /// tx.closed().await;
684 /// println!("the receiver dropped");
685 /// # }
686 /// ```
687 ///
688 /// Paired with select
689 ///
690 /// ```
691 /// use tokio::sync::oneshot;
692 /// use tokio::time::{self, Duration};
693 ///
694 /// async fn compute() -> String {
695 /// // Complex computation returning a `String`
696 /// # "hello".to_string()
697 /// }
698 ///
699 /// # #[tokio::main(flavor = "current_thread")]
700 /// # async fn main() {
701 /// let (mut tx, rx) = oneshot::channel();
702 ///
703 /// tokio::spawn(async move {
704 /// tokio::select! {
705 /// _ = tx.closed() => {
706 /// // The receiver dropped, no need to do any further work
707 /// }
708 /// value = compute() => {
709 /// // The send can fail if the channel was closed at the exact same
710 /// // time as when compute() finished, so just ignore the failure.
711 /// let _ = tx.send(value);
712 /// }
713 /// }
714 /// });
715 ///
716 /// // Wait for up to 10 seconds
717 /// let _ = time::timeout(Duration::from_secs(10), rx).await;
718 /// # }
719 /// ```
720 pub async fn closed(&mut self) {
721 use std::future::poll_fn;
722
723 #[cfg(all(tokio_unstable, feature = "tracing"))]
724 let resource_span = self.resource_span.clone();
725 #[cfg(all(tokio_unstable, feature = "tracing"))]
726 let closed = trace::async_op(
727 || poll_fn(|cx| self.poll_closed(cx)),
728 resource_span,
729 "Sender::closed",
730 "poll_closed",
731 false,
732 );
733 #[cfg(not(all(tokio_unstable, feature = "tracing")))]
734 let closed = poll_fn(|cx| self.poll_closed(cx));
735
736 closed.await;
737 }
738
739 /// Returns `true` if the associated [`Receiver`] handle has been dropped.
740 ///
741 /// A [`Receiver`] is closed by either calling [`close`] explicitly or the
742 /// [`Receiver`] value is dropped.
743 ///
744 /// If `true` is returned, a call to `send` will always result in an error.
745 ///
746 /// [`Receiver`]: Receiver
747 /// [`close`]: Receiver::close
748 ///
749 /// # Examples
750 ///
751 /// ```
752 /// use tokio::sync::oneshot;
753 ///
754 /// # #[tokio::main(flavor = "current_thread")]
755 /// # async fn main() {
756 /// let (tx, rx) = oneshot::channel();
757 ///
758 /// assert!(!tx.is_closed());
759 ///
760 /// drop(rx);
761 ///
762 /// assert!(tx.is_closed());
763 /// assert!(tx.send("never received").is_err());
764 /// # }
765 /// ```
766 pub fn is_closed(&self) -> bool {
767 let inner = self.inner.as_ref().unwrap();
768
769 let state = State::load(&inner.state, Acquire);
770 state.is_closed()
771 }
772
773 /// Checks whether the `oneshot` channel has been closed, and if not, schedules the
774 /// `Waker` in the provided `Context` to receive a notification when the channel is
775 /// closed.
776 ///
777 /// A [`Receiver`] is closed by either calling [`close`] explicitly, or when the
778 /// [`Receiver`] value is dropped.
779 ///
780 /// Note that on multiple calls to poll, only the `Waker` from the `Context` passed
781 /// to the most recent call will be scheduled to receive a wakeup.
782 ///
783 /// [`Receiver`]: struct@crate::sync::oneshot::Receiver
784 /// [`close`]: fn@crate::sync::oneshot::Receiver::close
785 ///
786 /// # Return value
787 ///
788 /// This function returns:
789 ///
790 /// * `Poll::Pending` if the channel is still open.
791 /// * `Poll::Ready(())` if the channel is closed.
792 ///
793 /// # Examples
794 ///
795 /// ```
796 /// use tokio::sync::oneshot;
797 ///
798 /// use std::future::poll_fn;
799 ///
800 /// # #[tokio::main(flavor = "current_thread")]
801 /// # async fn main() {
802 /// let (mut tx, mut rx) = oneshot::channel::<()>();
803 ///
804 /// tokio::spawn(async move {
805 /// rx.close();
806 /// });
807 ///
808 /// poll_fn(|cx| tx.poll_closed(cx)).await;
809 ///
810 /// println!("the receiver dropped");
811 /// # }
812 /// ```
813 pub fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()> {
814 ready!(crate::trace::trace_leaf(cx));
815
816 // Keep track of task budget
817 let coop = ready!(crate::task::coop::poll_proceed(cx));
818
819 let inner = self.inner.as_ref().unwrap();
820
821 let mut state = State::load(&inner.state, Acquire);
822
823 if state.is_closed() {
824 coop.made_progress();
825 return Ready(());
826 }
827
828 if state.is_tx_task_set() {
829 let will_notify = unsafe { inner.tx_task.will_wake(cx) };
830
831 if !will_notify {
832 state = State::unset_tx_task(&inner.state);
833
834 if state.is_closed() {
835 // Set the flag again so that the waker is released in drop
836 State::set_tx_task(&inner.state);
837 coop.made_progress();
838 return Ready(());
839 } else {
840 unsafe { inner.tx_task.drop_task() };
841 }
842 }
843 }
844
845 if !state.is_tx_task_set() {
846 // Attempt to set the task
847 unsafe {
848 inner.tx_task.set_task(cx);
849 }
850
851 // Update the state
852 state = State::set_tx_task(&inner.state);
853
854 if state.is_closed() {
855 coop.made_progress();
856 return Ready(());
857 }
858 }
859
860 Pending
861 }
862}
863
864impl<T> Drop for Sender<T> {
865 fn drop(&mut self) {
866 if let Some(inner) = self.inner.as_ref() {
867 inner.complete();
868 #[cfg(all(tokio_unstable, feature = "tracing"))]
869 self.resource_span.in_scope(|| {
870 tracing::trace!(
871 target: "runtime::resource::state_update",
872 tx_dropped = true,
873 tx_dropped.op = "override",
874 )
875 });
876 }
877 }
878}
879
880impl<T> Receiver<T> {
881 /// Prevents the associated [`Sender`] handle from sending a value.
882 ///
883 /// Any `send` operation which happens after calling `close` is guaranteed
884 /// to fail. After calling `close`, [`try_recv`] should be called to
885 /// receive a value if one was sent **before** the call to `close`
886 /// completed.
887 ///
888 /// This function is useful to perform a graceful shutdown and ensure that a
889 /// value will not be sent into the channel and never received.
890 ///
891 /// `close` is no-op if a message is already received or the channel
892 /// is already closed.
893 ///
894 /// [`Sender`]: Sender
895 /// [`try_recv`]: Receiver::try_recv
896 ///
897 /// # Examples
898 ///
899 /// Prevent a value from being sent
900 ///
901 /// ```
902 /// use tokio::sync::oneshot;
903 /// use tokio::sync::oneshot::error::TryRecvError;
904 ///
905 /// # #[tokio::main(flavor = "current_thread")]
906 /// # async fn main() {
907 /// let (tx, mut rx) = oneshot::channel();
908 ///
909 /// assert!(!tx.is_closed());
910 ///
911 /// rx.close();
912 ///
913 /// assert!(tx.is_closed());
914 /// assert!(tx.send("never received").is_err());
915 ///
916 /// match rx.try_recv() {
917 /// Err(TryRecvError::Closed) => {}
918 /// _ => unreachable!(),
919 /// }
920 /// # }
921 /// ```
922 ///
923 /// Receive a value sent **before** calling `close`
924 ///
925 /// ```
926 /// use tokio::sync::oneshot;
927 ///
928 /// # #[tokio::main(flavor = "current_thread")]
929 /// # async fn main() {
930 /// let (tx, mut rx) = oneshot::channel();
931 ///
932 /// assert!(tx.send("will receive").is_ok());
933 ///
934 /// rx.close();
935 ///
936 /// let msg = rx.try_recv().unwrap();
937 /// assert_eq!(msg, "will receive");
938 /// # }
939 /// ```
940 pub fn close(&mut self) {
941 if let Some(inner) = self.inner.as_ref() {
942 inner.close();
943 #[cfg(all(tokio_unstable, feature = "tracing"))]
944 self.resource_span.in_scope(|| {
945 tracing::trace!(
946 target: "runtime::resource::state_update",
947 rx_dropped = true,
948 rx_dropped.op = "override",
949 )
950 });
951 }
952 }
953
954 /// Checks if this receiver is terminated.
955 ///
956 /// This function returns true if this receiver has already yielded a [`Poll::Ready`] result.
957 /// If so, this receiver should no longer be polled.
958 ///
959 /// # Examples
960 ///
961 /// Sending a value and polling it.
962 ///
963 /// ```
964 /// use tokio::sync::oneshot;
965 ///
966 /// use std::task::Poll;
967 ///
968 /// # #[tokio::main(flavor = "current_thread")]
969 /// # async fn main() {
970 /// let (tx, mut rx) = oneshot::channel();
971 ///
972 /// // A receiver is not terminated when it is initialized.
973 /// assert!(!rx.is_terminated());
974 ///
975 /// // A receiver is not terminated it is polled and is still pending.
976 /// let poll = futures::poll!(&mut rx);
977 /// assert_eq!(poll, Poll::Pending);
978 /// assert!(!rx.is_terminated());
979 ///
980 /// // A receiver is not terminated if a value has been sent, but not yet read.
981 /// tx.send(0).unwrap();
982 /// assert!(!rx.is_terminated());
983 ///
984 /// // A receiver *is* terminated after it has been polled and yielded a value.
985 /// assert_eq!((&mut rx).await, Ok(0));
986 /// assert!(rx.is_terminated());
987 /// # }
988 /// ```
989 ///
990 /// Dropping the sender.
991 ///
992 /// ```
993 /// use tokio::sync::oneshot;
994 ///
995 /// # #[tokio::main(flavor = "current_thread")]
996 /// # async fn main() {
997 /// let (tx, mut rx) = oneshot::channel::<()>();
998 ///
999 /// // A receiver is not immediately terminated when the sender is dropped.
1000 /// drop(tx);
1001 /// assert!(!rx.is_terminated());
1002 ///
1003 /// // A receiver *is* terminated after it has been polled and yielded an error.
1004 /// let _ = (&mut rx).await.unwrap_err();
1005 /// assert!(rx.is_terminated());
1006 /// # }
1007 /// ```
1008 pub fn is_terminated(&self) -> bool {
1009 self.inner.is_none()
1010 }
1011
1012 /// Checks if a channel is empty.
1013 ///
1014 /// This method returns `true` if the channel has no messages.
1015 ///
1016 /// It is not necessarily safe to poll an empty receiver, which may have
1017 /// already yielded a value. Use [`is_terminated()`][Self::is_terminated]
1018 /// to check whether or not a receiver can be safely polled, instead.
1019 ///
1020 /// # Examples
1021 ///
1022 /// Sending a value.
1023 ///
1024 /// ```
1025 /// use tokio::sync::oneshot;
1026 ///
1027 /// # #[tokio::main(flavor = "current_thread")]
1028 /// # async fn main() {
1029 /// let (tx, mut rx) = oneshot::channel();
1030 /// assert!(rx.is_empty());
1031 ///
1032 /// tx.send(0).unwrap();
1033 /// assert!(!rx.is_empty());
1034 ///
1035 /// let _ = (&mut rx).await;
1036 /// assert!(rx.is_empty());
1037 /// # }
1038 /// ```
1039 ///
1040 /// Dropping the sender.
1041 ///
1042 /// ```
1043 /// use tokio::sync::oneshot;
1044 ///
1045 /// # #[tokio::main(flavor = "current_thread")]
1046 /// # async fn main() {
1047 /// let (tx, mut rx) = oneshot::channel::<()>();
1048 ///
1049 /// // A channel is empty if the sender is dropped.
1050 /// drop(tx);
1051 /// assert!(rx.is_empty());
1052 ///
1053 /// // A closed channel still yields an error, however.
1054 /// (&mut rx).await.expect_err("should yield an error");
1055 /// assert!(rx.is_empty());
1056 /// # }
1057 /// ```
1058 ///
1059 /// Terminated channels are empty.
1060 ///
1061 /// ```should_panic,ignore-wasm
1062 /// use tokio::sync::oneshot;
1063 ///
1064 /// #[tokio::main]
1065 /// async fn main() {
1066 /// let (tx, mut rx) = oneshot::channel();
1067 /// tx.send(0).unwrap();
1068 /// let _ = (&mut rx).await;
1069 ///
1070 /// // NB: an empty channel is not necessarily safe to poll!
1071 /// assert!(rx.is_empty());
1072 /// let _ = (&mut rx).await;
1073 /// }
1074 /// ```
1075 pub fn is_empty(&self) -> bool {
1076 let Some(inner) = self.inner.as_ref() else {
1077 // The channel has already terminated.
1078 return true;
1079 };
1080
1081 let state = State::load(&inner.state, Acquire);
1082 if state.is_complete() {
1083 // SAFETY: If `state.is_complete()` returns true, then the
1084 // `VALUE_SENT` bit has been set and the sender side of the
1085 // channel will no longer attempt to access the inner
1086 // `UnsafeCell`. Therefore, it is now safe for us to access the
1087 // cell.
1088 //
1089 // The channel is empty if it does not have a value.
1090 unsafe { !inner.has_value() }
1091 } else {
1092 // The receiver closed the channel or no value has been sent yet.
1093 true
1094 }
1095 }
1096
1097 /// Attempts to receive a value.
1098 ///
1099 /// If a pending value exists in the channel, it is returned. If no value
1100 /// has been sent, the current task **will not** be registered for
1101 /// future notification.
1102 ///
1103 /// This function is useful to call from outside the context of an
1104 /// asynchronous task.
1105 ///
1106 /// Note that unlike the `poll` method, the `try_recv` method cannot fail
1107 /// spuriously. Any send or close event that happens before this call to
1108 /// `try_recv` will be correctly returned to the caller.
1109 ///
1110 /// # Return
1111 ///
1112 /// - `Ok(T)` if a value is pending in the channel.
1113 /// - `Err(TryRecvError::Empty)` if no value has been sent yet.
1114 /// - `Err(TryRecvError::Closed)` if the sender has dropped without sending
1115 /// a value, or if the message has already been received.
1116 ///
1117 /// # Examples
1118 ///
1119 /// `try_recv` before a value is sent, then after.
1120 ///
1121 /// ```
1122 /// use tokio::sync::oneshot;
1123 /// use tokio::sync::oneshot::error::TryRecvError;
1124 ///
1125 /// # #[tokio::main(flavor = "current_thread")]
1126 /// # async fn main() {
1127 /// let (tx, mut rx) = oneshot::channel();
1128 ///
1129 /// match rx.try_recv() {
1130 /// // The channel is currently empty
1131 /// Err(TryRecvError::Empty) => {}
1132 /// _ => unreachable!(),
1133 /// }
1134 ///
1135 /// // Send a value
1136 /// tx.send("hello").unwrap();
1137 ///
1138 /// match rx.try_recv() {
1139 /// Ok(value) => assert_eq!(value, "hello"),
1140 /// _ => unreachable!(),
1141 /// }
1142 /// # }
1143 /// ```
1144 ///
1145 /// `try_recv` when the sender dropped before sending a value
1146 ///
1147 /// ```
1148 /// use tokio::sync::oneshot;
1149 /// use tokio::sync::oneshot::error::TryRecvError;
1150 ///
1151 /// # #[tokio::main(flavor = "current_thread")]
1152 /// # async fn main() {
1153 /// let (tx, mut rx) = oneshot::channel::<()>();
1154 ///
1155 /// drop(tx);
1156 ///
1157 /// match rx.try_recv() {
1158 /// // The channel will never receive a value.
1159 /// Err(TryRecvError::Closed) => {}
1160 /// _ => unreachable!(),
1161 /// }
1162 /// # }
1163 /// ```
1164 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
1165 let result = if let Some(inner) = self.inner.as_ref() {
1166 let state = State::load(&inner.state, Acquire);
1167
1168 if state.is_complete() {
1169 // SAFETY: If `state.is_complete()` returns true, then the
1170 // `VALUE_SENT` bit has been set and the sender side of the
1171 // channel will no longer attempt to access the inner
1172 // `UnsafeCell`. Therefore, it is now safe for us to access the
1173 // cell.
1174 match unsafe { inner.consume_value() } {
1175 Some(value) => {
1176 #[cfg(all(tokio_unstable, feature = "tracing"))]
1177 self.resource_span.in_scope(|| {
1178 tracing::trace!(
1179 target: "runtime::resource::state_update",
1180 value_received = true,
1181 value_received.op = "override",
1182 )
1183 });
1184 Ok(value)
1185 }
1186 None => Err(TryRecvError::Closed),
1187 }
1188 } else if state.is_closed() {
1189 Err(TryRecvError::Closed)
1190 } else {
1191 // Not ready, this does not clear `inner`
1192 return Err(TryRecvError::Empty);
1193 }
1194 } else {
1195 Err(TryRecvError::Closed)
1196 };
1197
1198 self.inner = None;
1199 result
1200 }
1201
1202 /// Blocking receive to call outside of asynchronous contexts.
1203 ///
1204 /// # Panics
1205 ///
1206 /// This function panics if called within an asynchronous execution
1207 /// context.
1208 ///
1209 /// # Examples
1210 ///
1211 /// ```
1212 /// # #[cfg(not(target_family = "wasm"))]
1213 /// # {
1214 /// use std::thread;
1215 /// use tokio::sync::oneshot;
1216 ///
1217 /// #[tokio::main]
1218 /// async fn main() {
1219 /// let (tx, rx) = oneshot::channel::<u8>();
1220 ///
1221 /// let sync_code = thread::spawn(move || {
1222 /// assert_eq!(Ok(10), rx.blocking_recv());
1223 /// });
1224 ///
1225 /// let _ = tx.send(10);
1226 /// sync_code.join().unwrap();
1227 /// }
1228 /// # }
1229 /// ```
1230 #[track_caller]
1231 #[cfg(feature = "sync")]
1232 #[cfg_attr(docsrs, doc(alias = "recv_blocking"))]
1233 pub fn blocking_recv(self) -> Result<T, RecvError> {
1234 crate::future::block_on(self)
1235 }
1236}
1237
1238impl<T> Drop for Receiver<T> {
1239 fn drop(&mut self) {
1240 if let Some(inner) = self.inner.as_ref() {
1241 let state = inner.close();
1242
1243 if state.is_complete() {
1244 // SAFETY: we have ensured that the `VALUE_SENT` bit has been set,
1245 // so only the receiver can access the value.
1246 drop(unsafe { inner.consume_value() });
1247 }
1248
1249 #[cfg(all(tokio_unstable, feature = "tracing"))]
1250 self.resource_span.in_scope(|| {
1251 tracing::trace!(
1252 target: "runtime::resource::state_update",
1253 rx_dropped = true,
1254 rx_dropped.op = "override",
1255 )
1256 });
1257 }
1258 }
1259}
1260
1261impl<T> Future for Receiver<T> {
1262 type Output = Result<T, RecvError>;
1263
1264 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1265 // If `inner` is `None`, then `poll()` has already completed.
1266 #[cfg(all(tokio_unstable, feature = "tracing"))]
1267 let _res_span = self.resource_span.clone().entered();
1268 #[cfg(all(tokio_unstable, feature = "tracing"))]
1269 let _ao_span = self.async_op_span.clone().entered();
1270 #[cfg(all(tokio_unstable, feature = "tracing"))]
1271 let _ao_poll_span = self.async_op_poll_span.clone().entered();
1272
1273 let ret = if let Some(inner) = self.as_ref().get_ref().inner.as_ref() {
1274 #[cfg(all(tokio_unstable, feature = "tracing"))]
1275 let res = ready!(trace_poll_op!("poll_recv", inner.poll_recv(cx))).map_err(Into::into);
1276
1277 #[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
1278 let res = ready!(inner.poll_recv(cx)).map_err(Into::into);
1279
1280 res
1281 } else {
1282 panic!("called after complete");
1283 };
1284
1285 self.inner = None;
1286 Ready(ret)
1287 }
1288}
1289
1290impl<T> Inner<T> {
1291 fn complete(&self) -> bool {
1292 let prev = State::set_complete(&self.state);
1293
1294 if prev.is_closed() {
1295 return false;
1296 }
1297
1298 if prev.is_rx_task_set() {
1299 // TODO: Consume waker?
1300 unsafe {
1301 self.rx_task.with_task(Waker::wake_by_ref);
1302 }
1303 }
1304
1305 true
1306 }
1307
1308 fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
1309 ready!(crate::trace::trace_leaf(cx));
1310 // Keep track of task budget
1311 let coop = ready!(crate::task::coop::poll_proceed(cx));
1312
1313 // Load the state
1314 let mut state = State::load(&self.state, Acquire);
1315
1316 if state.is_complete() {
1317 coop.made_progress();
1318 match unsafe { self.consume_value() } {
1319 Some(value) => Ready(Ok(value)),
1320 None => Ready(Err(RecvError(()))),
1321 }
1322 } else if state.is_closed() {
1323 coop.made_progress();
1324 Ready(Err(RecvError(())))
1325 } else {
1326 if state.is_rx_task_set() {
1327 let will_notify = unsafe { self.rx_task.will_wake(cx) };
1328
1329 // Check if the task is still the same
1330 if !will_notify {
1331 // Unset the task
1332 state = State::unset_rx_task(&self.state);
1333 if state.is_complete() {
1334 // Set the flag again so that the waker is released in drop
1335 State::set_rx_task(&self.state);
1336
1337 coop.made_progress();
1338 // SAFETY: If `state.is_complete()` returns true, then the
1339 // `VALUE_SENT` bit has been set and the sender side of the
1340 // channel will no longer attempt to access the inner
1341 // `UnsafeCell`. Therefore, it is now safe for us to access the
1342 // cell.
1343 return match unsafe { self.consume_value() } {
1344 Some(value) => Ready(Ok(value)),
1345 None => Ready(Err(RecvError(()))),
1346 };
1347 } else {
1348 unsafe { self.rx_task.drop_task() };
1349 }
1350 }
1351 }
1352
1353 if !state.is_rx_task_set() {
1354 // Attempt to set the task
1355 unsafe {
1356 self.rx_task.set_task(cx);
1357 }
1358
1359 // Update the state
1360 state = State::set_rx_task(&self.state);
1361
1362 if state.is_complete() {
1363 coop.made_progress();
1364 match unsafe { self.consume_value() } {
1365 Some(value) => Ready(Ok(value)),
1366 None => Ready(Err(RecvError(()))),
1367 }
1368 } else {
1369 Pending
1370 }
1371 } else {
1372 Pending
1373 }
1374 }
1375 }
1376
1377 /// Called by `Receiver` to indicate that the value will never be received.
1378 fn close(&self) -> State {
1379 let prev = State::set_closed(&self.state);
1380
1381 if prev.is_tx_task_set() && !prev.is_complete() {
1382 unsafe {
1383 self.tx_task.with_task(Waker::wake_by_ref);
1384 }
1385 }
1386
1387 prev
1388 }
1389
1390 /// Consumes the value. This function does not check `state`.
1391 ///
1392 /// # Safety
1393 ///
1394 /// Calling this method concurrently on multiple threads will result in a
1395 /// data race. The `VALUE_SENT` state bit is used to ensure that only the
1396 /// sender *or* the receiver will call this method at a given point in time.
1397 /// If `VALUE_SENT` is not set, then only the sender may call this method;
1398 /// if it is set, then only the receiver may call this method.
1399 unsafe fn consume_value(&self) -> Option<T> {
1400 self.value.with_mut(|ptr| unsafe { (*ptr).take() })
1401 }
1402
1403 /// Returns true if there is a value. This function does not check `state`.
1404 ///
1405 /// # Safety
1406 ///
1407 /// Calling this method concurrently on multiple threads will result in a
1408 /// data race. The `VALUE_SENT` state bit is used to ensure that only the
1409 /// sender *or* the receiver will call this method at a given point in time.
1410 /// If `VALUE_SENT` is not set, then only the sender may call this method;
1411 /// if it is set, then only the receiver may call this method.
1412 unsafe fn has_value(&self) -> bool {
1413 self.value.with(|ptr| unsafe { (*ptr).is_some() })
1414 }
1415}
1416
1417unsafe impl<T: Send> Send for Inner<T> {}
1418unsafe impl<T: Send> Sync for Inner<T> {}
1419
1420fn mut_load(this: &mut AtomicUsize) -> usize {
1421 this.with_mut(|v| *v)
1422}
1423
1424impl<T> Drop for Inner<T> {
1425 fn drop(&mut self) {
1426 let state = State(mut_load(&mut self.state));
1427
1428 if state.is_rx_task_set() {
1429 unsafe {
1430 self.rx_task.drop_task();
1431 }
1432 }
1433
1434 if state.is_tx_task_set() {
1435 unsafe {
1436 self.tx_task.drop_task();
1437 }
1438 }
1439
1440 // SAFETY: we have `&mut self`, and therefore we have
1441 // exclusive access to the value.
1442 unsafe {
1443 // Note: the assertion holds because if the value has been sent by sender,
1444 // we must ensure that the value must have been consumed by the receiver before
1445 // dropping the `Inner`.
1446 debug_assert!(self.consume_value().is_none());
1447 }
1448 }
1449}
1450
1451impl<T: fmt::Debug> fmt::Debug for Inner<T> {
1452 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1453 use std::sync::atomic::Ordering::Relaxed;
1454
1455 fmt.debug_struct("Inner")
1456 .field("state", &State::load(&self.state, Relaxed))
1457 .finish()
1458 }
1459}
1460
1461/// Indicates that a waker for the receiving task has been set.
1462///
1463/// # Safety
1464///
1465/// If this bit is not set, the `rx_task` field may be uninitialized.
1466const RX_TASK_SET: usize = 0b00001;
1467/// Indicates that a value has been stored in the channel's inner `UnsafeCell`.
1468///
1469/// # Safety
1470///
1471/// This bit controls which side of the channel is permitted to access the
1472/// `UnsafeCell`. If it is set, the `UnsafeCell` may ONLY be accessed by the
1473/// receiver. If this bit is NOT set, the `UnsafeCell` may ONLY be accessed by
1474/// the sender.
1475const VALUE_SENT: usize = 0b00010;
1476const CLOSED: usize = 0b00100;
1477
1478/// Indicates that a waker for the sending task has been set.
1479///
1480/// # Safety
1481///
1482/// If this bit is not set, the `tx_task` field may be uninitialized.
1483const TX_TASK_SET: usize = 0b01000;
1484
1485impl State {
1486 fn new() -> State {
1487 State(0)
1488 }
1489
1490 fn is_complete(self) -> bool {
1491 self.0 & VALUE_SENT == VALUE_SENT
1492 }
1493
1494 fn set_complete(cell: &AtomicUsize) -> State {
1495 // This method is a compare-and-swap loop rather than a fetch-or like
1496 // other `set_$WHATEVER` methods on `State`. This is because we must
1497 // check if the state has been closed before setting the `VALUE_SENT`
1498 // bit.
1499 //
1500 // We don't want to set both the `VALUE_SENT` bit if the `CLOSED`
1501 // bit is already set, because `VALUE_SENT` will tell the receiver that
1502 // it's okay to access the inner `UnsafeCell`. Immediately after calling
1503 // `set_complete`, if the channel was closed, the sender will _also_
1504 // access the `UnsafeCell` to take the value back out, so if a
1505 // `poll_recv` or `try_recv` call is occurring concurrently, both
1506 // threads may try to access the `UnsafeCell` if we were to set the
1507 // `VALUE_SENT` bit on a closed channel.
1508 let mut state = cell.load(Ordering::Relaxed);
1509 loop {
1510 if State(state).is_closed() {
1511 break;
1512 }
1513 // TODO: This could be `Release`, followed by an `Acquire` fence *if*
1514 // the `RX_TASK_SET` flag is set. However, `loom` does not support
1515 // fences yet.
1516 match cell.compare_exchange_weak(
1517 state,
1518 state | VALUE_SENT,
1519 Ordering::AcqRel,
1520 Ordering::Acquire,
1521 ) {
1522 Ok(_) => break,
1523 Err(actual) => state = actual,
1524 }
1525 }
1526 State(state)
1527 }
1528
1529 fn is_rx_task_set(self) -> bool {
1530 self.0 & RX_TASK_SET == RX_TASK_SET
1531 }
1532
1533 fn set_rx_task(cell: &AtomicUsize) -> State {
1534 let val = cell.fetch_or(RX_TASK_SET, AcqRel);
1535 State(val | RX_TASK_SET)
1536 }
1537
1538 fn unset_rx_task(cell: &AtomicUsize) -> State {
1539 let val = cell.fetch_and(!RX_TASK_SET, AcqRel);
1540 State(val & !RX_TASK_SET)
1541 }
1542
1543 fn is_closed(self) -> bool {
1544 self.0 & CLOSED == CLOSED
1545 }
1546
1547 fn set_closed(cell: &AtomicUsize) -> State {
1548 // Acquire because we want all later writes (attempting to poll) to be
1549 // ordered after this.
1550 let val = cell.fetch_or(CLOSED, Acquire);
1551 State(val)
1552 }
1553
1554 fn set_tx_task(cell: &AtomicUsize) -> State {
1555 let val = cell.fetch_or(TX_TASK_SET, AcqRel);
1556 State(val | TX_TASK_SET)
1557 }
1558
1559 fn unset_tx_task(cell: &AtomicUsize) -> State {
1560 let val = cell.fetch_and(!TX_TASK_SET, AcqRel);
1561 State(val & !TX_TASK_SET)
1562 }
1563
1564 fn is_tx_task_set(self) -> bool {
1565 self.0 & TX_TASK_SET == TX_TASK_SET
1566 }
1567
1568 fn as_usize(self) -> usize {
1569 self.0
1570 }
1571
1572 fn load(cell: &AtomicUsize, order: Ordering) -> State {
1573 let val = cell.load(order);
1574 State(val)
1575 }
1576}
1577
1578impl fmt::Debug for State {
1579 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1580 fmt.debug_struct("State")
1581 .field("is_complete", &self.is_complete())
1582 .field("is_closed", &self.is_closed())
1583 .field("is_rx_task_set", &self.is_rx_task_set())
1584 .field("is_tx_task_set", &self.is_tx_task_set())
1585 .finish()
1586 }
1587}