tokio/sync/broadcast.rs
1//! A multi-producer, multi-consumer broadcast queue. Each sent value is seen by
2//! all consumers.
3//!
4//! A [`Sender`] is used to broadcast values to **all** connected [`Receiver`]
5//! values. [`Sender`] handles are clone-able, allowing concurrent send and
6//! receive actions. [`Sender`] and [`Receiver`] are both `Send` and `Sync` as
7//! long as `T` is `Send`.
8//!
9//! When a value is sent, **all** [`Receiver`] handles are notified and will
10//! receive the value. The value is stored once inside the channel and cloned on
11//! demand for each receiver. Once all receivers have received a clone of the
12//! value, the value is released from the channel.
13//!
14//! A channel is created by calling [`channel`], specifying the maximum number
15//! of messages the channel can retain at any given time.
16//!
17//! New [`Receiver`] handles are created by calling [`Sender::subscribe`]. The
18//! returned [`Receiver`] will receive values sent **after** the call to
19//! `subscribe`.
20//!
21//! This channel is also suitable for the single-producer multi-consumer
22//! use-case, where a single sender broadcasts values to many receivers.
23//!
24//! ## Lagging
25//!
26//! As sent messages must be retained until **all** [`Receiver`] handles receive
27//! a clone, broadcast channels are susceptible to the "slow receiver" problem.
28//! In this case, all but one receiver are able to receive values at the rate
29//! they are sent. Because one receiver is stalled, the channel starts to fill
30//! up.
31//!
32//! This broadcast channel implementation handles this case by setting a hard
33//! upper bound on the number of values the channel may retain at any given
34//! time. This upper bound is passed to the [`channel`] function as an argument.
35//!
36//! If a value is sent when the channel is at capacity, the oldest value
37//! currently held by the channel is released. This frees up space for the new
38//! value. Any receiver that has not yet seen the released value will return
39//! [`RecvError::Lagged`] the next time [`recv`] is called.
40//!
41//! Once [`RecvError::Lagged`] is returned, the lagging receiver's position is
42//! updated to the oldest value contained by the channel. The next call to
43//! [`recv`] will return this value.
44//!
45//! This behavior enables a receiver to detect when it has lagged so far behind
46//! that data has been dropped. The caller may decide how to respond to this:
47//! either by aborting its task or by tolerating lost messages and resuming
48//! consumption of the channel.
49//!
50//! ## Closing
51//!
52//! When **all** [`Sender`] handles have been dropped, no new values may be
53//! sent. At this point, the channel is "closed". Once a receiver has received
54//! all values retained by the channel, the next call to [`recv`] will return
55//! with [`RecvError::Closed`].
56//!
57//! When a [`Receiver`] handle is dropped, any messages not read by the receiver
58//! will be marked as read. If this receiver was the only one not to have read
59//! that message, the message will be dropped at this point.
60//!
61//! [`Sender`]: crate::sync::broadcast::Sender
62//! [`Sender::subscribe`]: crate::sync::broadcast::Sender::subscribe
63//! [`Receiver`]: crate::sync::broadcast::Receiver
64//! [`channel`]: crate::sync::broadcast::channel
65//! [`RecvError::Lagged`]: crate::sync::broadcast::error::RecvError::Lagged
66//! [`RecvError::Closed`]: crate::sync::broadcast::error::RecvError::Closed
67//! [`recv`]: crate::sync::broadcast::Receiver::recv
68//!
69//! # Examples
70//!
71//! Basic usage
72//!
73//! ```
74//! use tokio::sync::broadcast;
75//!
76//! #[tokio::main]
77//! async fn main() {
78//! let (tx, mut rx1) = broadcast::channel(16);
79//! let mut rx2 = tx.subscribe();
80//!
81//! tokio::spawn(async move {
82//! assert_eq!(rx1.recv().await.unwrap(), 10);
83//! assert_eq!(rx1.recv().await.unwrap(), 20);
84//! });
85//!
86//! tokio::spawn(async move {
87//! assert_eq!(rx2.recv().await.unwrap(), 10);
88//! assert_eq!(rx2.recv().await.unwrap(), 20);
89//! });
90//!
91//! tx.send(10).unwrap();
92//! tx.send(20).unwrap();
93//! }
94//! ```
95//!
96//! Handling lag
97//!
98//! ```
99//! use tokio::sync::broadcast;
100//!
101//! #[tokio::main]
102//! async fn main() {
103//! let (tx, mut rx) = broadcast::channel(2);
104//!
105//! tx.send(10).unwrap();
106//! tx.send(20).unwrap();
107//! tx.send(30).unwrap();
108//!
109//! // The receiver lagged behind
110//! assert!(rx.recv().await.is_err());
111//!
112//! // At this point, we can abort or continue with lost messages
113//!
114//! assert_eq!(20, rx.recv().await.unwrap());
115//! assert_eq!(30, rx.recv().await.unwrap());
116//! }
117//! ```
118
119use crate::loom::cell::UnsafeCell;
120use crate::loom::sync::atomic::{AtomicBool, AtomicUsize};
121use crate::loom::sync::{Arc, Mutex, MutexGuard};
122use crate::task::coop::cooperative;
123use crate::util::linked_list::{self, GuardedLinkedList, LinkedList};
124use crate::util::WakeList;
125
126use std::fmt;
127use std::future::Future;
128use std::marker::PhantomPinned;
129use std::pin::Pin;
130use std::ptr::NonNull;
131use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release, SeqCst};
132use std::task::{ready, Context, Poll, Waker};
133
134/// Sending-half of the [`broadcast`] channel.
135///
136/// May be used from many threads. Messages can be sent with
137/// [`send`][Sender::send].
138///
139/// # Examples
140///
141/// ```
142/// use tokio::sync::broadcast;
143///
144/// #[tokio::main]
145/// async fn main() {
146/// let (tx, mut rx1) = broadcast::channel(16);
147/// let mut rx2 = tx.subscribe();
148///
149/// tokio::spawn(async move {
150/// assert_eq!(rx1.recv().await.unwrap(), 10);
151/// assert_eq!(rx1.recv().await.unwrap(), 20);
152/// });
153///
154/// tokio::spawn(async move {
155/// assert_eq!(rx2.recv().await.unwrap(), 10);
156/// assert_eq!(rx2.recv().await.unwrap(), 20);
157/// });
158///
159/// tx.send(10).unwrap();
160/// tx.send(20).unwrap();
161/// }
162/// ```
163///
164/// [`broadcast`]: crate::sync::broadcast
pub struct Sender<T> {
    /// Channel state shared with every other handle of this channel.
    /// `num_tx` inside it counts the live `Sender` handles.
    shared: Arc<Shared<T>>,
}
168
169/// A sender that does not prevent the channel from being closed.
170///
171/// If all [`Sender`] instances of a channel were dropped and only `WeakSender`
172/// instances remain, the channel is closed.
173///
174/// In order to send messages, the `WeakSender` needs to be upgraded using
175/// [`WeakSender::upgrade`], which returns `Option<Sender>`. It returns `None`
176/// if all `Sender`s have been dropped, and otherwise it returns a `Sender`.
177///
178/// [`Sender`]: Sender
179/// [`WeakSender::upgrade`]: WeakSender::upgrade
180///
181/// # Examples
182///
183/// ```
184/// use tokio::sync::broadcast::channel;
185///
186/// #[tokio::main]
187/// async fn main() {
188/// let (tx, _rx) = channel::<i32>(15);
189/// let tx_weak = tx.downgrade();
190///
191/// // Upgrading will succeed because `tx` still exists.
192/// assert!(tx_weak.upgrade().is_some());
193///
194/// // If we drop `tx`, then it will fail.
195/// drop(tx);
196/// assert!(tx_weak.clone().upgrade().is_none());
197/// }
198/// ```
pub struct WeakSender<T> {
    /// Channel state shared with the strong handles. `Sender::downgrade`
    /// accounts for this handle in `num_weak_tx`, not in `num_tx`.
    shared: Arc<Shared<T>>,
}
202
203/// Receiving-half of the [`broadcast`] channel.
204///
205/// Must not be used concurrently. Messages may be retrieved using
206/// [`recv`][Receiver::recv].
207///
208/// To turn this receiver into a `Stream`, you can use the [`BroadcastStream`]
209/// wrapper.
210///
211/// [`BroadcastStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.BroadcastStream.html
212///
213/// # Examples
214///
215/// ```
216/// use tokio::sync::broadcast;
217///
218/// #[tokio::main]
219/// async fn main() {
220/// let (tx, mut rx1) = broadcast::channel(16);
221/// let mut rx2 = tx.subscribe();
222///
223/// tokio::spawn(async move {
224/// assert_eq!(rx1.recv().await.unwrap(), 10);
225/// assert_eq!(rx1.recv().await.unwrap(), 20);
226/// });
227///
228/// tokio::spawn(async move {
229/// assert_eq!(rx2.recv().await.unwrap(), 10);
230/// assert_eq!(rx2.recv().await.unwrap(), 20);
231/// });
232///
233/// tx.send(10).unwrap();
234/// tx.send(20).unwrap();
235/// }
236/// ```
237///
238/// [`broadcast`]: crate::sync::broadcast
pub struct Receiver<T> {
    /// State shared with all receivers and senders.
    shared: Arc<Shared<T>>,

    /// Next position to read from.
    ///
    /// `channel` starts its receiver at position 0, while `new_receiver`
    /// starts at the tail position observed when subscribing.
    next: u64,
}
246
pub mod error {
    //! Broadcast error types

    use std::fmt::{self, Display, Formatter};

    /// Error returned by the [`send`] function on a [`Sender`].
    ///
    /// Sending fails only when no receiver is active, which means nobody
    /// could ever observe the message. The rejected message is carried as the
    /// payload so the caller can get it back.
    ///
    /// [`send`]: crate::sync::broadcast::Sender::send
    /// [`Sender`]: crate::sync::broadcast::Sender
    #[derive(Debug)]
    pub struct SendError<T>(pub T);

    impl<T> Display for SendError<T> {
        fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
            f.write_str("channel closed")
        }
    }

    impl<T: fmt::Debug> std::error::Error for SendError<T> {}

    /// An error returned from the [`recv`] function on a [`Receiver`].
    ///
    /// [`recv`]: crate::sync::broadcast::Receiver::recv
    /// [`Receiver`]: crate::sync::broadcast::Receiver
    #[derive(Debug, PartialEq, Eq, Clone)]
    pub enum RecvError {
        /// Every sender is gone, so no further message will ever be sent.
        Closed,

        /// The receiver fell too far behind. Receiving again yields the
        /// oldest message the channel still retains.
        ///
        /// The payload is the number of skipped messages.
        Lagged(u64),
    }

    impl Display for RecvError {
        fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
            match *self {
                Self::Closed => f.write_str("channel closed"),
                Self::Lagged(amt) => write!(f, "channel lagged by {amt}"),
            }
        }
    }

    impl std::error::Error for RecvError {}

    /// An error returned from the [`try_recv`] function on a [`Receiver`].
    ///
    /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
    /// [`Receiver`]: crate::sync::broadcast::Receiver
    #[derive(Debug, PartialEq, Eq, Clone)]
    pub enum TryRecvError {
        /// Nothing is queued right now. [`Sender`] handles are still alive,
        /// so a value may show up later.
        ///
        /// [`Sender`]: crate::sync::broadcast::Sender
        Empty,

        /// Every sender is gone, so no further message will ever be sent.
        Closed,

        /// The receiver fell too far behind. Receiving again yields the
        /// oldest message the channel still retains.
        ///
        /// The payload is the number of skipped messages.
        Lagged(u64),
    }

    impl Display for TryRecvError {
        fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
            match *self {
                Self::Empty => f.write_str("channel empty"),
                Self::Closed => f.write_str("channel closed"),
                Self::Lagged(amt) => write!(f, "channel lagged by {amt}"),
            }
        }
    }

    impl std::error::Error for TryRecvError {}
}
335
336use self::error::{RecvError, SendError, TryRecvError};
337
338use super::Notify;
339
/// Data shared between senders and receivers.
struct Shared<T> {
    /// Slots in the channel. Each slot sits behind its own mutex; the number
    /// of slots is the requested capacity rounded up to a power of two.
    buffer: Box<[Mutex<Slot<T>>]>,

    /// Mask a position -> index. Equal to `buffer.len() - 1`.
    mask: usize,

    /// Tail of the queue. Includes the rx wait list.
    tail: Mutex<Tail>,

    /// Number of outstanding Sender handles. The channel is closed when the
    /// last one is dropped.
    num_tx: AtomicUsize,

    /// Number of outstanding weak Sender handles.
    num_weak_tx: AtomicUsize,

    /// Notify when the last subscribed [`Receiver`] drops.
    notify_last_rx_drop: Notify,
}
360
/// Next position to write a value, together with the bookkeeping that is
/// protected by the same lock.
struct Tail {
    /// Next position to write to. Advanced with wrapping arithmetic by `send`.
    pos: u64,

    /// Number of active receivers.
    rx_cnt: usize,

    /// True if the channel is closed.
    ///
    /// Set by `Sender::close_channel`; reset by `new_receiver` when it
    /// subscribes while `rx_cnt` is zero.
    closed: bool,

    /// Receivers waiting for a value.
    waiters: LinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
}
375
/// Slot in the buffer.
struct Slot<T> {
    /// Remaining number of receivers that are expected to see this value.
    ///
    /// When this goes to zero, the value is released.
    ///
    /// An atomic is used as it is mutated concurrently with the slot read lock
    /// acquired. `send` initializes it to the receiver count at send time.
    rem: AtomicUsize,

    /// Uniquely identifies the `send` stored in the slot.
    ///
    /// Freshly created slots hold `index - capacity` (wrapping), a position
    /// that no `send` has produced yet.
    pos: u64,

    /// The value being broadcast.
    ///
    /// The value is set by `send` when the write lock is held. When a reader
    /// drops, `rem` is decremented. When it hits zero, the value is dropped.
    val: Option<T>,
}
395
/// An entry in the wait queue.
struct Waiter {
    /// True if queued. `Shared::notify_rx` stores `false` with `Release`
    /// ordering once it has taken the waker out of this entry.
    queued: AtomicBool,

    /// Task waiting on the broadcast channel. `Shared::notify_rx` accesses
    /// it while holding the tail lock.
    waker: Option<Waker>,

    /// Intrusive linked-list pointers.
    pointers: linked_list::Pointers<Waiter>,

    /// Should not be `Unpin`.
    _p: PhantomPinned,
}
410
411impl Waiter {
412 fn new() -> Self {
413 Self {
414 queued: AtomicBool::new(false),
415 waker: None,
416 pointers: linked_list::Pointers::new(),
417 _p: PhantomPinned,
418 }
419 }
420}
421
// Declares `Waiter::addr_of_pointers`, which maps a pointer to a `Waiter` to a
// pointer to its `pointers` field.
// NOTE(review): `generate_addr_of_methods!` is defined elsewhere in the crate;
// presumably it projects the field without creating an intermediate
// reference - confirm against the macro definition.
generate_addr_of_methods! {
    impl<> Waiter {
        unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> {
            &self.pointers
        }
    }
}
429
/// Wrapper around the lock guard of a single slot.
// NOTE(review): the methods and any `Drop` impl of this type are outside this
// part of the file; presumably it is handed out while a receiver reads the
// slot - confirm.
struct RecvGuard<'a, T> {
    /// Guard keeping the slot locked for as long as this value lives.
    slot: MutexGuard<'a, Slot<T>>,
}
433
/// Receive a value future.
struct Recv<'a, T> {
    /// Receiver being waited on. Borrowed mutably for the lifetime of the
    /// future.
    receiver: &'a mut Receiver<T>,

    /// Entry in the waiter `LinkedList`. Stored inline in the future.
    waiter: WaiterCell,
}
442
// The wrapper around `UnsafeCell` isolates the unsafe impl `Send` and `Sync`
// from `Recv`.
struct WaiterCell(UnsafeCell<Waiter>);

// NOTE(review): these impls assert that sharing the cell across threads is
// sound. `Shared::notify_rx` only touches `waker` while holding the tail lock
// and `queued` is atomic; the remaining access sites are outside this part of
// the file - confirm they follow the same discipline.
unsafe impl Send for WaiterCell {}
unsafe impl Sync for WaiterCell {}
449
/// Max number of receivers. Reserve space to lock: the two most significant
/// bits of `usize` are left unused.
const MAX_RECEIVERS: usize = usize::MAX / 4;
452
453/// Create a bounded, multi-producer, multi-consumer channel where each sent
454/// value is broadcasted to all active receivers.
455///
456/// **Note:** The actual capacity may be greater than the provided `capacity`.
457///
458/// All data sent on [`Sender`] will become available on every active
459/// [`Receiver`] in the same order as it was sent.
460///
461/// The `Sender` can be cloned to `send` to the same channel from multiple
462/// points in the process or it can be used concurrently from an `Arc`. New
463/// `Receiver` handles are created by calling [`Sender::subscribe`].
464///
465/// If all [`Receiver`] handles are dropped, the `send` method will return a
466/// [`SendError`]. Similarly, if all [`Sender`] handles are dropped, the [`recv`]
467/// method will return a [`RecvError`].
468///
469/// [`Sender`]: crate::sync::broadcast::Sender
470/// [`Sender::subscribe`]: crate::sync::broadcast::Sender::subscribe
471/// [`Receiver`]: crate::sync::broadcast::Receiver
472/// [`recv`]: crate::sync::broadcast::Receiver::recv
473/// [`SendError`]: crate::sync::broadcast::error::SendError
474/// [`RecvError`]: crate::sync::broadcast::error::RecvError
475///
476/// # Examples
477///
478/// ```
479/// use tokio::sync::broadcast;
480///
481/// #[tokio::main]
482/// async fn main() {
483/// let (tx, mut rx1) = broadcast::channel(16);
484/// let mut rx2 = tx.subscribe();
485///
486/// tokio::spawn(async move {
487/// assert_eq!(rx1.recv().await.unwrap(), 10);
488/// assert_eq!(rx1.recv().await.unwrap(), 20);
489/// });
490///
491/// tokio::spawn(async move {
492/// assert_eq!(rx2.recv().await.unwrap(), 10);
493/// assert_eq!(rx2.recv().await.unwrap(), 20);
494/// });
495///
496/// tx.send(10).unwrap();
497/// tx.send(20).unwrap();
498/// }
499/// ```
500///
501/// # Panics
502///
503/// This will panic if `capacity` is equal to `0` or larger
504/// than `usize::MAX / 2`.
505#[track_caller]
506pub fn channel<T: Clone>(capacity: usize) -> (Sender<T>, Receiver<T>) {
507 // SAFETY: In the line below we are creating one extra receiver, so there will be 1 in total.
508 let tx = unsafe { Sender::new_with_receiver_count(1, capacity) };
509 let rx = Receiver {
510 shared: tx.shared.clone(),
511 next: 0,
512 };
513 (tx, rx)
514}
515
516impl<T> Sender<T> {
517 /// Creates the sending-half of the [`broadcast`] channel.
518 ///
519 /// See the documentation of [`broadcast::channel`] for more information on this method.
520 ///
521 /// [`broadcast`]: crate::sync::broadcast
522 /// [`broadcast::channel`]: crate::sync::broadcast::channel
523 #[track_caller]
524 pub fn new(capacity: usize) -> Self {
525 // SAFETY: We don't create extra receivers, so there are 0.
526 unsafe { Self::new_with_receiver_count(0, capacity) }
527 }
528
/// Creates the sending-half of the [`broadcast`](self) channel with the provided receiver
/// count.
///
/// See the documentation of [`broadcast::channel`](self::channel) for the panics that can
/// occur when calling this function.
///
/// # Safety
///
/// The caller must ensure that the number of receivers recorded for this `Sender` is correct
/// before the channel is used. This function does not create any receivers by itself, so the
/// count should be zero unless the caller constructs the matching receivers.
540 #[track_caller]
541 unsafe fn new_with_receiver_count(receiver_count: usize, mut capacity: usize) -> Self {
542 assert!(capacity > 0, "broadcast channel capacity cannot be zero");
543 assert!(
544 capacity <= usize::MAX >> 1,
545 "broadcast channel capacity exceeded `usize::MAX / 2`"
546 );
547
548 // Round to a power of two
549 capacity = capacity.next_power_of_two();
550
551 let mut buffer = Vec::with_capacity(capacity);
552
553 for i in 0..capacity {
554 buffer.push(Mutex::new(Slot {
555 rem: AtomicUsize::new(0),
556 pos: (i as u64).wrapping_sub(capacity as u64),
557 val: None,
558 }));
559 }
560
561 let shared = Arc::new(Shared {
562 buffer: buffer.into_boxed_slice(),
563 mask: capacity - 1,
564 tail: Mutex::new(Tail {
565 pos: 0,
566 rx_cnt: receiver_count,
567 closed: false,
568 waiters: LinkedList::new(),
569 }),
570 num_tx: AtomicUsize::new(1),
571 num_weak_tx: AtomicUsize::new(0),
572 notify_last_rx_drop: Notify::new(),
573 });
574
575 Sender { shared }
576 }
577
578 /// Attempts to send a value to all active [`Receiver`] handles, returning
579 /// it back if it could not be sent.
580 ///
581 /// A successful send occurs when there is at least one active [`Receiver`]
582 /// handle. An unsuccessful send would be one where all associated
583 /// [`Receiver`] handles have already been dropped.
584 ///
585 /// # Return
586 ///
587 /// On success, the number of subscribed [`Receiver`] handles is returned.
588 /// This does not mean that this number of receivers will see the message as
589 /// a receiver may drop or lag ([see lagging](self#lagging)) before receiving
590 /// the message.
591 ///
592 /// # Note
593 ///
594 /// A return value of `Ok` **does not** mean that the sent value will be
595 /// observed by all or any of the active [`Receiver`] handles. [`Receiver`]
596 /// handles may be dropped before receiving the sent message.
597 ///
598 /// A return value of `Err` **does not** mean that future calls to `send`
599 /// will fail. New [`Receiver`] handles may be created by calling
600 /// [`subscribe`].
601 ///
602 /// [`Receiver`]: crate::sync::broadcast::Receiver
603 /// [`subscribe`]: crate::sync::broadcast::Sender::subscribe
604 ///
605 /// # Examples
606 ///
607 /// ```
608 /// use tokio::sync::broadcast;
609 ///
610 /// #[tokio::main]
611 /// async fn main() {
612 /// let (tx, mut rx1) = broadcast::channel(16);
613 /// let mut rx2 = tx.subscribe();
614 ///
615 /// tokio::spawn(async move {
616 /// assert_eq!(rx1.recv().await.unwrap(), 10);
617 /// assert_eq!(rx1.recv().await.unwrap(), 20);
618 /// });
619 ///
620 /// tokio::spawn(async move {
621 /// assert_eq!(rx2.recv().await.unwrap(), 10);
622 /// assert_eq!(rx2.recv().await.unwrap(), 20);
623 /// });
624 ///
625 /// tx.send(10).unwrap();
626 /// tx.send(20).unwrap();
627 /// }
628 /// ```
629 pub fn send(&self, value: T) -> Result<usize, SendError<T>> {
630 let mut tail = self.shared.tail.lock();
631
632 if tail.rx_cnt == 0 {
633 return Err(SendError(value));
634 }
635
636 // Position to write into
637 let pos = tail.pos;
638 let rem = tail.rx_cnt;
639 let idx = (pos & self.shared.mask as u64) as usize;
640
641 // Update the tail position
642 tail.pos = tail.pos.wrapping_add(1);
643
644 // Get the slot
645 let mut slot = self.shared.buffer[idx].lock();
646
647 // Track the position
648 slot.pos = pos;
649
650 // Set remaining receivers
651 slot.rem.with_mut(|v| *v = rem);
652
653 // Write the value
654 slot.val = Some(value);
655
656 // Release the slot lock before notifying the receivers.
657 drop(slot);
658
659 // Notify and release the mutex. This must happen after the slot lock is
660 // released, otherwise the writer lock bit could be cleared while another
661 // thread is in the critical section.
662 self.shared.notify_rx(tail);
663
664 Ok(rem)
665 }
666
667 /// Creates a new [`Receiver`] handle that will receive values sent **after**
668 /// this call to `subscribe`.
669 ///
670 /// # Examples
671 ///
672 /// ```
673 /// use tokio::sync::broadcast;
674 ///
675 /// #[tokio::main]
676 /// async fn main() {
677 /// let (tx, _rx) = broadcast::channel(16);
678 ///
679 /// // Will not be seen
680 /// tx.send(10).unwrap();
681 ///
682 /// let mut rx = tx.subscribe();
683 ///
684 /// tx.send(20).unwrap();
685 ///
686 /// let value = rx.recv().await.unwrap();
687 /// assert_eq!(20, value);
688 /// }
689 /// ```
690 pub fn subscribe(&self) -> Receiver<T> {
691 let shared = self.shared.clone();
692 new_receiver(shared)
693 }
694
695 /// Converts the `Sender` to a [`WeakSender`] that does not count
696 /// towards RAII semantics, i.e. if all `Sender` instances of the
697 /// channel were dropped and only `WeakSender` instances remain,
698 /// the channel is closed.
699 #[must_use = "Downgrade creates a WeakSender without destroying the original non-weak sender."]
700 pub fn downgrade(&self) -> WeakSender<T> {
701 self.shared.num_weak_tx.fetch_add(1, Relaxed);
702 WeakSender {
703 shared: self.shared.clone(),
704 }
705 }
706
707 /// Returns the number of queued values.
708 ///
709 /// A value is queued until it has either been seen by all receivers that were alive at the time
710 /// it was sent, or has been evicted from the queue by subsequent sends that exceeded the
711 /// queue's capacity.
712 ///
713 /// # Note
714 ///
715 /// In contrast to [`Receiver::len`], this method only reports queued values and not values that
716 /// have been evicted from the queue before being seen by all receivers.
717 ///
718 /// # Examples
719 ///
720 /// ```
721 /// use tokio::sync::broadcast;
722 ///
723 /// #[tokio::main]
724 /// async fn main() {
725 /// let (tx, mut rx1) = broadcast::channel(16);
726 /// let mut rx2 = tx.subscribe();
727 ///
728 /// tx.send(10).unwrap();
729 /// tx.send(20).unwrap();
730 /// tx.send(30).unwrap();
731 ///
732 /// assert_eq!(tx.len(), 3);
733 ///
734 /// rx1.recv().await.unwrap();
735 ///
736 /// // The len is still 3 since rx2 hasn't seen the first value yet.
737 /// assert_eq!(tx.len(), 3);
738 ///
739 /// rx2.recv().await.unwrap();
740 ///
741 /// assert_eq!(tx.len(), 2);
742 /// }
743 /// ```
744 pub fn len(&self) -> usize {
745 let tail = self.shared.tail.lock();
746
747 let base_idx = (tail.pos & self.shared.mask as u64) as usize;
748 let mut low = 0;
749 let mut high = self.shared.buffer.len();
750 while low < high {
751 let mid = low + (high - low) / 2;
752 let idx = base_idx.wrapping_add(mid) & self.shared.mask;
753 if self.shared.buffer[idx].lock().rem.load(SeqCst) == 0 {
754 low = mid + 1;
755 } else {
756 high = mid;
757 }
758 }
759
760 self.shared.buffer.len() - low
761 }
762
763 /// Returns true if there are no queued values.
764 ///
765 /// # Examples
766 ///
767 /// ```
768 /// use tokio::sync::broadcast;
769 ///
770 /// #[tokio::main]
771 /// async fn main() {
772 /// let (tx, mut rx1) = broadcast::channel(16);
773 /// let mut rx2 = tx.subscribe();
774 ///
775 /// assert!(tx.is_empty());
776 ///
777 /// tx.send(10).unwrap();
778 ///
779 /// assert!(!tx.is_empty());
780 ///
781 /// rx1.recv().await.unwrap();
782 ///
783 /// // The queue is still not empty since rx2 hasn't seen the value.
784 /// assert!(!tx.is_empty());
785 ///
786 /// rx2.recv().await.unwrap();
787 ///
788 /// assert!(tx.is_empty());
789 /// }
790 /// ```
791 pub fn is_empty(&self) -> bool {
792 let tail = self.shared.tail.lock();
793
794 let idx = (tail.pos.wrapping_sub(1) & self.shared.mask as u64) as usize;
795 self.shared.buffer[idx].lock().rem.load(SeqCst) == 0
796 }
797
798 /// Returns the number of active receivers.
799 ///
800 /// An active receiver is a [`Receiver`] handle returned from [`channel`] or
801 /// [`subscribe`]. These are the handles that will receive values sent on
802 /// this [`Sender`].
803 ///
804 /// # Note
805 ///
806 /// It is not guaranteed that a sent message will reach this number of
807 /// receivers. Active receivers may never call [`recv`] again before
808 /// dropping.
809 ///
810 /// [`recv`]: crate::sync::broadcast::Receiver::recv
811 /// [`Receiver`]: crate::sync::broadcast::Receiver
812 /// [`Sender`]: crate::sync::broadcast::Sender
813 /// [`subscribe`]: crate::sync::broadcast::Sender::subscribe
814 /// [`channel`]: crate::sync::broadcast::channel
815 ///
816 /// # Examples
817 ///
818 /// ```
819 /// use tokio::sync::broadcast;
820 ///
821 /// #[tokio::main]
822 /// async fn main() {
823 /// let (tx, _rx1) = broadcast::channel(16);
824 ///
825 /// assert_eq!(1, tx.receiver_count());
826 ///
827 /// let mut _rx2 = tx.subscribe();
828 ///
829 /// assert_eq!(2, tx.receiver_count());
830 ///
831 /// tx.send(10).unwrap();
832 /// }
833 /// ```
834 pub fn receiver_count(&self) -> usize {
835 let tail = self.shared.tail.lock();
836 tail.rx_cnt
837 }
838
839 /// Returns `true` if senders belong to the same channel.
840 ///
841 /// # Examples
842 ///
843 /// ```
844 /// use tokio::sync::broadcast;
845 ///
846 /// #[tokio::main]
847 /// async fn main() {
848 /// let (tx, _rx) = broadcast::channel::<()>(16);
849 /// let tx2 = tx.clone();
850 ///
851 /// assert!(tx.same_channel(&tx2));
852 ///
853 /// let (tx3, _rx3) = broadcast::channel::<()>(16);
854 ///
855 /// assert!(!tx3.same_channel(&tx2));
856 /// }
857 /// ```
858 pub fn same_channel(&self, other: &Self) -> bool {
859 Arc::ptr_eq(&self.shared, &other.shared)
860 }
861
/// A future which completes when the number of [`Receiver`]s subscribed to this `Sender` reaches
/// zero.
864 ///
865 /// # Examples
866 ///
867 /// ```
868 /// use futures::FutureExt;
869 /// use tokio::sync::broadcast;
870 ///
871 /// #[tokio::main]
872 /// async fn main() {
873 /// let (tx, mut rx1) = broadcast::channel::<u32>(16);
874 /// let mut rx2 = tx.subscribe();
875 ///
876 /// let _ = tx.send(10);
877 ///
878 /// assert_eq!(rx1.recv().await.unwrap(), 10);
879 /// drop(rx1);
880 /// assert!(tx.closed().now_or_never().is_none());
881 ///
882 /// assert_eq!(rx2.recv().await.unwrap(), 10);
883 /// drop(rx2);
884 /// assert!(tx.closed().now_or_never().is_some());
885 /// }
886 /// ```
887 pub async fn closed(&self) {
888 loop {
889 let notified = self.shared.notify_last_rx_drop.notified();
890
891 {
892 // Ensure the lock drops if the channel isn't closed
893 let tail = self.shared.tail.lock();
894 if tail.closed {
895 return;
896 }
897 }
898
899 notified.await;
900 }
901 }
902
903 fn close_channel(&self) {
904 let mut tail = self.shared.tail.lock();
905 tail.closed = true;
906
907 self.shared.notify_rx(tail);
908 }
909
910 /// Returns the number of [`Sender`] handles.
911 pub fn strong_count(&self) -> usize {
912 self.shared.num_tx.load(Acquire)
913 }
914
915 /// Returns the number of [`WeakSender`] handles.
916 pub fn weak_count(&self) -> usize {
917 self.shared.num_weak_tx.load(Acquire)
918 }
919}
920
921/// Create a new `Receiver` which reads starting from the tail.
922fn new_receiver<T>(shared: Arc<Shared<T>>) -> Receiver<T> {
923 let mut tail = shared.tail.lock();
924
925 assert!(tail.rx_cnt != MAX_RECEIVERS, "max receivers");
926
927 if tail.rx_cnt == 0 {
928 // Potentially need to re-open the channel, if a new receiver has been added between calls
929 // to poll(). Note that we use rx_cnt == 0 instead of is_closed since is_closed also
930 // applies if the sender has been dropped
931 tail.closed = false;
932 }
933
934 tail.rx_cnt = tail.rx_cnt.checked_add(1).expect("overflow");
935 let next = tail.pos;
936
937 drop(tail);
938
939 Receiver { shared, next }
940}
941
/// List used in `Shared::notify_rx`. It wraps a guarded linked list
/// and gates the access to it on the `Shared.tail` mutex. It also empties
/// the list on drop.
struct WaitersList<'a, T> {
    /// The waiters taken out of `Tail::waiters`, linked around a guard node.
    list: GuardedLinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
    /// Set once a pop has returned `None`; lets `Drop` skip taking the lock.
    is_empty: bool,
    /// Channel state whose `tail` mutex protects `list`.
    shared: &'a Shared<T>,
}
950
951impl<'a, T> Drop for WaitersList<'a, T> {
952 fn drop(&mut self) {
953 // If the list is not empty, we unlink all waiters from it.
954 // We do not wake the waiters to avoid double panics.
955 if !self.is_empty {
956 let _lock_guard = self.shared.tail.lock();
957 while self.list.pop_back().is_some() {}
958 }
959 }
960}
961
962impl<'a, T> WaitersList<'a, T> {
963 fn new(
964 unguarded_list: LinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
965 guard: Pin<&'a Waiter>,
966 shared: &'a Shared<T>,
967 ) -> Self {
968 let guard_ptr = NonNull::from(guard.get_ref());
969 let list = unguarded_list.into_guarded(guard_ptr);
970 WaitersList {
971 list,
972 is_empty: false,
973 shared,
974 }
975 }
976
977 /// Removes the last element from the guarded list. Modifying this list
978 /// requires an exclusive access to the main list in `Notify`.
979 fn pop_back_locked(&mut self, _tail: &mut Tail) -> Option<NonNull<Waiter>> {
980 let result = self.list.pop_back();
981 if result.is_none() {
982 // Save information about emptiness to avoid waiting for lock
983 // in the destructor.
984 self.is_empty = true;
985 }
986 result
987 }
988}
989
990impl<T> Shared<T> {
991 fn notify_rx<'a, 'b: 'a>(&'b self, mut tail: MutexGuard<'a, Tail>) {
992 // It is critical for `GuardedLinkedList` safety that the guard node is
993 // pinned in memory and is not dropped until the guarded list is dropped.
994 let guard = Waiter::new();
995 pin!(guard);
996
997 // We move all waiters to a secondary list. It uses a `GuardedLinkedList`
998 // underneath to allow every waiter to safely remove itself from it.
999 //
1000 // * This list will be still guarded by the `waiters` lock.
1001 // `NotifyWaitersList` wrapper makes sure we hold the lock to modify it.
1002 // * This wrapper will empty the list on drop. It is critical for safety
1003 // that we will not leave any list entry with a pointer to the local
1004 // guard node after this function returns / panics.
1005 let mut list = WaitersList::new(std::mem::take(&mut tail.waiters), guard.as_ref(), self);
1006
1007 let mut wakers = WakeList::new();
1008 'outer: loop {
1009 while wakers.can_push() {
1010 match list.pop_back_locked(&mut tail) {
1011 Some(waiter) => {
1012 unsafe {
1013 // Safety: accessing `waker` is safe because
1014 // the tail lock is held.
1015 if let Some(waker) = (*waiter.as_ptr()).waker.take() {
1016 wakers.push(waker);
1017 }
1018
1019 // Safety: `queued` is atomic.
1020 let queued = &(*waiter.as_ptr()).queued;
1021 // `Relaxed` suffices because the tail lock is held.
1022 assert!(queued.load(Relaxed));
1023 // `Release` is needed to synchronize with `Recv::drop`.
1024 // It is critical to set this variable **after** waker
1025 // is extracted, otherwise we may data race with `Recv::drop`.
1026 queued.store(false, Release);
1027 }
1028 }
1029 None => {
1030 break 'outer;
1031 }
1032 }
1033 }
1034
1035 // Release the lock before waking.
1036 drop(tail);
1037
1038 // Before we acquire the lock again all sorts of things can happen:
1039 // some waiters may remove themselves from the list and new waiters
1040 // may be added. This is fine since at worst we will unnecessarily
1041 // wake up waiters which will then queue themselves again.
1042
1043 wakers.wake_all();
1044
1045 // Acquire the lock again.
1046 tail = self.tail.lock();
1047 }
1048
1049 // Release the lock before waking.
1050 drop(tail);
1051
1052 wakers.wake_all();
1053 }
1054}
1055
1056impl<T> Clone for Sender<T> {
1057 fn clone(&self) -> Sender<T> {
1058 let shared = self.shared.clone();
1059 shared.num_tx.fetch_add(1, Relaxed);
1060
1061 Sender { shared }
1062 }
1063}
1064
1065impl<T> Drop for Sender<T> {
1066 fn drop(&mut self) {
1067 if 1 == self.shared.num_tx.fetch_sub(1, AcqRel) {
1068 self.close_channel();
1069 }
1070 }
1071}
1072
1073impl<T> WeakSender<T> {
1074 /// Tries to convert a `WeakSender` into a [`Sender`].
1075 ///
1076 /// This will return `Some` if there are other `Sender` instances alive and
1077 /// the channel wasn't previously dropped, otherwise `None` is returned.
1078 #[must_use]
1079 pub fn upgrade(&self) -> Option<Sender<T>> {
1080 let mut tx_count = self.shared.num_tx.load(Acquire);
1081
1082 loop {
1083 if tx_count == 0 {
1084 // channel is closed so this WeakSender can not be upgraded
1085 return None;
1086 }
1087
1088 match self
1089 .shared
1090 .num_tx
1091 .compare_exchange_weak(tx_count, tx_count + 1, Relaxed, Acquire)
1092 {
1093 Ok(_) => {
1094 return Some(Sender {
1095 shared: self.shared.clone(),
1096 })
1097 }
1098 Err(prev_count) => tx_count = prev_count,
1099 }
1100 }
1101 }
1102
1103 /// Returns the number of [`Sender`] handles.
1104 pub fn strong_count(&self) -> usize {
1105 self.shared.num_tx.load(Acquire)
1106 }
1107
1108 /// Returns the number of [`WeakSender`] handles.
1109 pub fn weak_count(&self) -> usize {
1110 self.shared.num_weak_tx.load(Acquire)
1111 }
1112}
1113
1114impl<T> Clone for WeakSender<T> {
1115 fn clone(&self) -> WeakSender<T> {
1116 let shared = self.shared.clone();
1117 shared.num_weak_tx.fetch_add(1, Relaxed);
1118
1119 Self { shared }
1120 }
1121}
1122
1123impl<T> Drop for WeakSender<T> {
1124 fn drop(&mut self) {
1125 self.shared.num_weak_tx.fetch_sub(1, AcqRel);
1126 }
1127}
1128
1129impl<T> Receiver<T> {
1130 /// Returns the number of messages that were sent into the channel and that
1131 /// this [`Receiver`] has yet to receive.
1132 ///
1133 /// If the returned value from `len` is larger than the next largest power of 2
1134 /// of the capacity of the channel any call to [`recv`] will return an
1135 /// `Err(RecvError::Lagged)` and any call to [`try_recv`] will return an
1136 /// `Err(TryRecvError::Lagged)`, e.g. if the capacity of the channel is 10,
1137 /// [`recv`] will start to return `Err(RecvError::Lagged)` once `len` returns
1138 /// values larger than 16.
1139 ///
1140 /// [`Receiver`]: crate::sync::broadcast::Receiver
1141 /// [`recv`]: crate::sync::broadcast::Receiver::recv
1142 /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
1143 ///
1144 /// # Examples
1145 ///
1146 /// ```
1147 /// use tokio::sync::broadcast;
1148 ///
1149 /// #[tokio::main]
1150 /// async fn main() {
1151 /// let (tx, mut rx1) = broadcast::channel(16);
1152 ///
1153 /// tx.send(10).unwrap();
1154 /// tx.send(20).unwrap();
1155 ///
1156 /// assert_eq!(rx1.len(), 2);
1157 /// assert_eq!(rx1.recv().await.unwrap(), 10);
1158 /// assert_eq!(rx1.len(), 1);
1159 /// assert_eq!(rx1.recv().await.unwrap(), 20);
1160 /// assert_eq!(rx1.len(), 0);
1161 /// }
1162 /// ```
1163 pub fn len(&self) -> usize {
1164 let next_send_pos = self.shared.tail.lock().pos;
1165 (next_send_pos - self.next) as usize
1166 }
1167
1168 /// Returns true if there aren't any messages in the channel that the [`Receiver`]
1169 /// has yet to receive.
1170 ///
1171 /// [`Receiver]: create::sync::broadcast::Receiver
1172 ///
1173 /// # Examples
1174 ///
1175 /// ```
1176 /// use tokio::sync::broadcast;
1177 ///
1178 /// #[tokio::main]
1179 /// async fn main() {
1180 /// let (tx, mut rx1) = broadcast::channel(16);
1181 ///
1182 /// assert!(rx1.is_empty());
1183 ///
1184 /// tx.send(10).unwrap();
1185 /// tx.send(20).unwrap();
1186 ///
1187 /// assert!(!rx1.is_empty());
1188 /// assert_eq!(rx1.recv().await.unwrap(), 10);
1189 /// assert_eq!(rx1.recv().await.unwrap(), 20);
1190 /// assert!(rx1.is_empty());
1191 /// }
1192 /// ```
1193 pub fn is_empty(&self) -> bool {
1194 self.len() == 0
1195 }
1196
1197 /// Returns `true` if receivers belong to the same channel.
1198 ///
1199 /// # Examples
1200 ///
1201 /// ```
1202 /// use tokio::sync::broadcast;
1203 ///
1204 /// #[tokio::main]
1205 /// async fn main() {
1206 /// let (tx, rx) = broadcast::channel::<()>(16);
1207 /// let rx2 = tx.subscribe();
1208 ///
1209 /// assert!(rx.same_channel(&rx2));
1210 ///
1211 /// let (_tx3, rx3) = broadcast::channel::<()>(16);
1212 ///
1213 /// assert!(!rx3.same_channel(&rx2));
1214 /// }
1215 /// ```
1216 pub fn same_channel(&self, other: &Self) -> bool {
1217 Arc::ptr_eq(&self.shared, &other.shared)
1218 }
1219
1220 /// Locks the next value if there is one.
1221 fn recv_ref(
1222 &mut self,
1223 waiter: Option<(&UnsafeCell<Waiter>, &Waker)>,
1224 ) -> Result<RecvGuard<'_, T>, TryRecvError> {
1225 let idx = (self.next & self.shared.mask as u64) as usize;
1226
1227 // The slot holding the next value to read
1228 let mut slot = self.shared.buffer[idx].lock();
1229
1230 if slot.pos != self.next {
1231 // Release the `slot` lock before attempting to acquire the `tail`
1232 // lock. This is required because `send2` acquires the tail lock
1233 // first followed by the slot lock. Acquiring the locks in reverse
1234 // order here would result in a potential deadlock: `recv_ref`
1235 // acquires the `slot` lock and attempts to acquire the `tail` lock
1236 // while `send2` acquired the `tail` lock and attempts to acquire
1237 // the slot lock.
1238 drop(slot);
1239
1240 let mut old_waker = None;
1241
1242 let mut tail = self.shared.tail.lock();
1243
1244 // Acquire slot lock again
1245 slot = self.shared.buffer[idx].lock();
1246
1247 // Make sure the position did not change. This could happen in the
1248 // unlikely event that the buffer is wrapped between dropping the
1249 // read lock and acquiring the tail lock.
1250 if slot.pos != self.next {
1251 let next_pos = slot.pos.wrapping_add(self.shared.buffer.len() as u64);
1252
1253 if next_pos == self.next {
1254 // At this point the channel is empty for *this* receiver. If
1255 // it's been closed, then that's what we return, otherwise we
1256 // set a waker and return empty.
1257 if tail.closed {
1258 return Err(TryRecvError::Closed);
1259 }
1260
1261 // Store the waker
1262 if let Some((waiter, waker)) = waiter {
1263 // Safety: called while locked.
1264 unsafe {
1265 // Only queue if not already queued
1266 waiter.with_mut(|ptr| {
1267 // If there is no waker **or** if the currently
1268 // stored waker references a **different** task,
1269 // track the tasks' waker to be notified on
1270 // receipt of a new value.
1271 match (*ptr).waker {
1272 Some(ref w) if w.will_wake(waker) => {}
1273 _ => {
1274 old_waker = std::mem::replace(
1275 &mut (*ptr).waker,
1276 Some(waker.clone()),
1277 );
1278 }
1279 }
1280
1281 // If the waiter is not already queued, enqueue it.
1282 // `Relaxed` order suffices: we have synchronized with
1283 // all writers through the tail lock that we hold.
1284 if !(*ptr).queued.load(Relaxed) {
1285 // `Relaxed` order suffices: all the readers will
1286 // synchronize with this write through the tail lock.
1287 (*ptr).queued.store(true, Relaxed);
1288 tail.waiters.push_front(NonNull::new_unchecked(&mut *ptr));
1289 }
1290 });
1291 }
1292 }
1293
1294 // Drop the old waker after releasing the locks.
1295 drop(slot);
1296 drop(tail);
1297 drop(old_waker);
1298
1299 return Err(TryRecvError::Empty);
1300 }
1301
1302 // At this point, the receiver has lagged behind the sender by
1303 // more than the channel capacity. The receiver will attempt to
1304 // catch up by skipping dropped messages and setting the
1305 // internal cursor to the **oldest** message stored by the
1306 // channel.
1307 let next = tail.pos.wrapping_sub(self.shared.buffer.len() as u64);
1308
1309 let missed = next.wrapping_sub(self.next);
1310
1311 drop(tail);
1312
1313 // The receiver is slow but no values have been missed
1314 if missed == 0 {
1315 self.next = self.next.wrapping_add(1);
1316
1317 return Ok(RecvGuard { slot });
1318 }
1319
1320 self.next = next;
1321
1322 return Err(TryRecvError::Lagged(missed));
1323 }
1324 }
1325
1326 self.next = self.next.wrapping_add(1);
1327
1328 Ok(RecvGuard { slot })
1329 }
1330
1331 /// Returns the number of [`Sender`] handles.
1332 pub fn sender_strong_count(&self) -> usize {
1333 self.shared.num_tx.load(Acquire)
1334 }
1335
1336 /// Returns the number of [`WeakSender`] handles.
1337 pub fn sender_weak_count(&self) -> usize {
1338 self.shared.num_weak_tx.load(Acquire)
1339 }
1340
1341 /// Checks if a channel is closed.
1342 ///
1343 /// This method returns `true` if the channel has been closed. The channel is closed
1344 /// when all [`Sender`] have been dropped.
1345 ///
1346 /// [`Sender`]: crate::sync::broadcast::Sender
1347 ///
1348 /// # Examples
1349 /// ```
1350 /// use tokio::sync::broadcast;
1351 ///
1352 /// #[tokio::main]
1353 /// async fn main() {
1354 /// let (tx, rx) = broadcast::channel::<()>(10);
1355 /// assert!(!rx.is_closed());
1356 ///
1357 /// drop(tx);
1358 ///
1359 /// assert!(rx.is_closed());
1360 /// }
1361 /// ```
1362 pub fn is_closed(&self) -> bool {
1363 // Channel is closed when there are no strong senders left active
1364 self.shared.num_tx.load(Acquire) == 0
1365 }
1366}
1367
1368impl<T: Clone> Receiver<T> {
1369 /// Re-subscribes to the channel starting from the current tail element.
1370 ///
1371 /// This [`Receiver`] handle will receive a clone of all values sent
1372 /// **after** it has resubscribed. This will not include elements that are
1373 /// in the queue of the current receiver. Consider the following example.
1374 ///
1375 /// # Examples
1376 ///
1377 /// ```
1378 /// use tokio::sync::broadcast;
1379 ///
1380 /// #[tokio::main]
1381 /// async fn main() {
1382 /// let (tx, mut rx) = broadcast::channel(2);
1383 ///
1384 /// tx.send(1).unwrap();
1385 /// let mut rx2 = rx.resubscribe();
1386 /// tx.send(2).unwrap();
1387 ///
1388 /// assert_eq!(rx2.recv().await.unwrap(), 2);
1389 /// assert_eq!(rx.recv().await.unwrap(), 1);
1390 /// }
1391 /// ```
1392 pub fn resubscribe(&self) -> Self {
1393 let shared = self.shared.clone();
1394 new_receiver(shared)
1395 }
1396 /// Receives the next value for this receiver.
1397 ///
1398 /// Each [`Receiver`] handle will receive a clone of all values sent
1399 /// **after** it has subscribed.
1400 ///
1401 /// `Err(RecvError::Closed)` is returned when all `Sender` halves have
1402 /// dropped, indicating that no further values can be sent on the channel.
1403 ///
1404 /// If the [`Receiver`] handle falls behind, once the channel is full, newly
1405 /// sent values will overwrite old values. At this point, a call to [`recv`]
1406 /// will return with `Err(RecvError::Lagged)` and the [`Receiver`]'s
1407 /// internal cursor is updated to point to the oldest value still held by
1408 /// the channel. A subsequent call to [`recv`] will return this value
1409 /// **unless** it has been since overwritten.
1410 ///
1411 /// # Cancel safety
1412 ///
1413 /// This method is cancel safe. If `recv` is used as the event in a
1414 /// [`tokio::select!`](crate::select) statement and some other branch
1415 /// completes first, it is guaranteed that no messages were received on this
1416 /// channel.
1417 ///
1418 /// [`Receiver`]: crate::sync::broadcast::Receiver
1419 /// [`recv`]: crate::sync::broadcast::Receiver::recv
1420 ///
1421 /// # Examples
1422 ///
1423 /// ```
1424 /// use tokio::sync::broadcast;
1425 ///
1426 /// #[tokio::main]
1427 /// async fn main() {
1428 /// let (tx, mut rx1) = broadcast::channel(16);
1429 /// let mut rx2 = tx.subscribe();
1430 ///
1431 /// tokio::spawn(async move {
1432 /// assert_eq!(rx1.recv().await.unwrap(), 10);
1433 /// assert_eq!(rx1.recv().await.unwrap(), 20);
1434 /// });
1435 ///
1436 /// tokio::spawn(async move {
1437 /// assert_eq!(rx2.recv().await.unwrap(), 10);
1438 /// assert_eq!(rx2.recv().await.unwrap(), 20);
1439 /// });
1440 ///
1441 /// tx.send(10).unwrap();
1442 /// tx.send(20).unwrap();
1443 /// }
1444 /// ```
1445 ///
1446 /// Handling lag
1447 ///
1448 /// ```
1449 /// use tokio::sync::broadcast;
1450 ///
1451 /// #[tokio::main]
1452 /// async fn main() {
1453 /// let (tx, mut rx) = broadcast::channel(2);
1454 ///
1455 /// tx.send(10).unwrap();
1456 /// tx.send(20).unwrap();
1457 /// tx.send(30).unwrap();
1458 ///
1459 /// // The receiver lagged behind
1460 /// assert!(rx.recv().await.is_err());
1461 ///
1462 /// // At this point, we can abort or continue with lost messages
1463 ///
1464 /// assert_eq!(20, rx.recv().await.unwrap());
1465 /// assert_eq!(30, rx.recv().await.unwrap());
1466 /// }
1467 /// ```
1468 pub async fn recv(&mut self) -> Result<T, RecvError> {
1469 cooperative(Recv::new(self)).await
1470 }
1471
1472 /// Attempts to return a pending value on this receiver without awaiting.
1473 ///
1474 /// This is useful for a flavor of "optimistic check" before deciding to
1475 /// await on a receiver.
1476 ///
1477 /// Compared with [`recv`], this function has three failure cases instead of two
1478 /// (one for closed, one for an empty buffer, one for a lagging receiver).
1479 ///
1480 /// `Err(TryRecvError::Closed)` is returned when all `Sender` halves have
1481 /// dropped, indicating that no further values can be sent on the channel.
1482 ///
1483 /// If the [`Receiver`] handle falls behind, once the channel is full, newly
1484 /// sent values will overwrite old values. At this point, a call to [`recv`]
1485 /// will return with `Err(TryRecvError::Lagged)` and the [`Receiver`]'s
1486 /// internal cursor is updated to point to the oldest value still held by
1487 /// the channel. A subsequent call to [`try_recv`] will return this value
1488 /// **unless** it has been since overwritten. If there are no values to
1489 /// receive, `Err(TryRecvError::Empty)` is returned.
1490 ///
1491 /// [`recv`]: crate::sync::broadcast::Receiver::recv
1492 /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
1493 /// [`Receiver`]: crate::sync::broadcast::Receiver
1494 ///
1495 /// # Examples
1496 ///
1497 /// ```
1498 /// use tokio::sync::broadcast;
1499 ///
1500 /// #[tokio::main]
1501 /// async fn main() {
1502 /// let (tx, mut rx) = broadcast::channel(16);
1503 ///
1504 /// assert!(rx.try_recv().is_err());
1505 ///
1506 /// tx.send(10).unwrap();
1507 ///
1508 /// let value = rx.try_recv().unwrap();
1509 /// assert_eq!(10, value);
1510 /// }
1511 /// ```
1512 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
1513 let guard = self.recv_ref(None)?;
1514 guard.clone_value().ok_or(TryRecvError::Closed)
1515 }
1516
1517 /// Blocking receive to call outside of asynchronous contexts.
1518 ///
1519 /// # Panics
1520 ///
1521 /// This function panics if called within an asynchronous execution
1522 /// context.
1523 ///
1524 /// # Examples
1525 /// ```
1526 /// use std::thread;
1527 /// use tokio::sync::broadcast;
1528 ///
1529 /// #[tokio::main]
1530 /// async fn main() {
1531 /// let (tx, mut rx) = broadcast::channel(16);
1532 ///
1533 /// let sync_code = thread::spawn(move || {
1534 /// assert_eq!(rx.blocking_recv(), Ok(10));
1535 /// });
1536 ///
1537 /// let _ = tx.send(10);
1538 /// sync_code.join().unwrap();
1539 /// }
1540 /// ```
1541 pub fn blocking_recv(&mut self) -> Result<T, RecvError> {
1542 crate::future::block_on(self.recv())
1543 }
1544}
1545
1546impl<T> Drop for Receiver<T> {
1547 fn drop(&mut self) {
1548 let mut tail = self.shared.tail.lock();
1549
1550 tail.rx_cnt -= 1;
1551 let until = tail.pos;
1552 let remaining_rx = tail.rx_cnt;
1553
1554 if remaining_rx == 0 {
1555 self.shared.notify_last_rx_drop.notify_waiters();
1556 tail.closed = true;
1557 }
1558
1559 drop(tail);
1560
1561 while self.next < until {
1562 match self.recv_ref(None) {
1563 Ok(_) => {}
1564 // The channel is closed
1565 Err(TryRecvError::Closed) => break,
1566 // Ignore lagging, we will catch up
1567 Err(TryRecvError::Lagged(..)) => {}
1568 // Can't be empty
1569 Err(TryRecvError::Empty) => panic!("unexpected empty broadcast channel"),
1570 }
1571 }
1572 }
1573}
1574
1575impl<'a, T> Recv<'a, T> {
1576 fn new(receiver: &'a mut Receiver<T>) -> Recv<'a, T> {
1577 Recv {
1578 receiver,
1579 waiter: WaiterCell(UnsafeCell::new(Waiter {
1580 queued: AtomicBool::new(false),
1581 waker: None,
1582 pointers: linked_list::Pointers::new(),
1583 _p: PhantomPinned,
1584 })),
1585 }
1586 }
1587
1588 /// A custom `project` implementation is used in place of `pin-project-lite`
1589 /// as a custom drop implementation is needed.
1590 fn project(self: Pin<&mut Self>) -> (&mut Receiver<T>, &UnsafeCell<Waiter>) {
1591 unsafe {
1592 // Safety: Receiver is Unpin
1593 is_unpin::<&mut Receiver<T>>();
1594
1595 let me = self.get_unchecked_mut();
1596 (me.receiver, &me.waiter.0)
1597 }
1598 }
1599}
1600
1601impl<'a, T> Future for Recv<'a, T>
1602where
1603 T: Clone,
1604{
1605 type Output = Result<T, RecvError>;
1606
1607 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
1608 ready!(crate::trace::trace_leaf(cx));
1609
1610 let (receiver, waiter) = self.project();
1611
1612 let guard = match receiver.recv_ref(Some((waiter, cx.waker()))) {
1613 Ok(value) => value,
1614 Err(TryRecvError::Empty) => return Poll::Pending,
1615 Err(TryRecvError::Lagged(n)) => return Poll::Ready(Err(RecvError::Lagged(n))),
1616 Err(TryRecvError::Closed) => return Poll::Ready(Err(RecvError::Closed)),
1617 };
1618
1619 Poll::Ready(guard.clone_value().ok_or(RecvError::Closed))
1620 }
1621}
1622
1623impl<'a, T> Drop for Recv<'a, T> {
1624 fn drop(&mut self) {
1625 // Safety: `waiter.queued` is atomic.
1626 // Acquire ordering is required to synchronize with
1627 // `Shared::notify_rx` before we drop the object.
1628 let queued = self
1629 .waiter
1630 .0
1631 .with(|ptr| unsafe { (*ptr).queued.load(Acquire) });
1632
1633 // If the waiter is queued, we need to unlink it from the waiters list.
1634 // If not, no further synchronization is required, since the waiter
1635 // is not in the list and, as such, is not shared with any other threads.
1636 if queued {
1637 // Acquire the tail lock. This is required for safety before accessing
1638 // the waiter node.
1639 let mut tail = self.receiver.shared.tail.lock();
1640
1641 // Safety: tail lock is held.
1642 // `Relaxed` order suffices because we hold the tail lock.
1643 let queued = self
1644 .waiter
1645 .0
1646 .with_mut(|ptr| unsafe { (*ptr).queued.load(Relaxed) });
1647
1648 if queued {
1649 // Remove the node
1650 //
1651 // safety: tail lock is held and the wait node is verified to be in
1652 // the list.
1653 unsafe {
1654 self.waiter.0.with_mut(|ptr| {
1655 tail.waiters.remove((&mut *ptr).into());
1656 });
1657 }
1658 }
1659 }
1660 }
1661}
1662
1663/// # Safety
1664///
1665/// `Waiter` is forced to be !Unpin.
1666unsafe impl linked_list::Link for Waiter {
1667 type Handle = NonNull<Waiter>;
1668 type Target = Waiter;
1669
1670 fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> {
1671 *handle
1672 }
1673
1674 unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
1675 ptr
1676 }
1677
1678 unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
1679 Waiter::addr_of_pointers(target)
1680 }
1681}
1682
1683impl<T> fmt::Debug for Sender<T> {
1684 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1685 write!(fmt, "broadcast::Sender")
1686 }
1687}
1688
1689impl<T> fmt::Debug for WeakSender<T> {
1690 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1691 write!(fmt, "broadcast::WeakSender")
1692 }
1693}
1694
1695impl<T> fmt::Debug for Receiver<T> {
1696 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1697 write!(fmt, "broadcast::Receiver")
1698 }
1699}
1700
1701impl<'a, T> RecvGuard<'a, T> {
1702 fn clone_value(&self) -> Option<T>
1703 where
1704 T: Clone,
1705 {
1706 self.slot.val.clone()
1707 }
1708}
1709
1710impl<'a, T> Drop for RecvGuard<'a, T> {
1711 fn drop(&mut self) {
1712 // Decrement the remaining counter
1713 if 1 == self.slot.rem.fetch_sub(1, SeqCst) {
1714 self.slot.val = None;
1715 }
1716 }
1717}
1718
/// Compile-time assertion that `T` implements `Unpin`; does nothing at runtime.
fn is_unpin<T>()
where
    T: Unpin,
{
}
1720
#[cfg(all(test, not(loom)))]
mod tests {
    use super::*;

    /// A sender built with `Sender::new` starts without receivers; the count
    /// follows every subscribe, resubscribe and drop.
    #[test]
    fn receiver_count_on_sender_constructor() {
        let tx = Sender::<i32>::new(16);
        assert_eq!(tx.receiver_count(), 0);

        let first = tx.subscribe();
        assert_eq!(tx.receiver_count(), 1);

        let second = first.resubscribe();
        assert_eq!(tx.receiver_count(), 2);

        let third = tx.subscribe();
        assert_eq!(tx.receiver_count(), 3);

        drop(third);
        drop(first);
        assert_eq!(tx.receiver_count(), 1);

        drop(second);
        assert_eq!(tx.receiver_count(), 0);
    }

    /// `channel` hands out one receiver up front, so the count starts at one.
    #[test]
    fn receiver_count_on_channel_constructor() {
        let (tx, rx) = channel::<i32>(16);
        assert_eq!(tx.receiver_count(), 1);

        let _second = rx.resubscribe();
        assert_eq!(tx.receiver_count(), 2);
    }
}