async_broadcast/
lib.rs

1//! Async broadcast channel
2//!
3//! An async multi-producer multi-consumer broadcast channel, where each consumer gets a clone of every
4//! message sent on the channel. For obvious reasons, the channel can only be used to broadcast types
5//! that implement [`Clone`].
6//!
7//! A channel has the [`Sender`] and [`Receiver`] side. Both sides are cloneable and can be shared
8//! among multiple threads.
9//!
10//! When all `Sender`s or all `Receiver`s are dropped, the channel becomes closed. When a channel is
11//! closed, no more messages can be sent, but remaining messages can still be received.
12//!
13//! The channel can also be closed manually by calling [`Sender::close()`] or [`Receiver::close()`].
14//!
15//! ## Examples
16//!
17//! ```rust
18//! use async_broadcast::{broadcast, TryRecvError};
19//! use futures_lite::{future::block_on, stream::StreamExt};
20//!
21//! block_on(async move {
22//!     let (s1, mut r1) = broadcast(2);
23//!     let s2 = s1.clone();
24//!     let mut r2 = r1.clone();
25//!
26//!     // Send 2 messages from two different senders.
27//!     s1.broadcast(7).await.unwrap();
28//!     s2.broadcast(8).await.unwrap();
29//!
30//!     // Channel is now at capacity so sending more messages will result in an error.
31//!     assert!(s2.try_broadcast(9).unwrap_err().is_full());
32//!     assert!(s1.try_broadcast(10).unwrap_err().is_full());
33//!
34//!     // We can use either the `Stream` implementation (`next`) or the `recv` method to receive messages.
35//!     assert_eq!(r1.next().await.unwrap(), 7);
36//!     assert_eq!(r1.recv().await.unwrap(), 8);
37//!     assert_eq!(r2.next().await.unwrap(), 7);
38//!     assert_eq!(r2.recv().await.unwrap(), 8);
39//!
40//!     // All receivers got all messages so the channel is now empty.
41//!     assert_eq!(r1.try_recv(), Err(TryRecvError::Empty));
42//!     assert_eq!(r2.try_recv(), Err(TryRecvError::Empty));
43//!
44//!     // Drop both senders, which closes the channel.
45//!     drop(s1);
46//!     drop(s2);
47//!
48//!     assert_eq!(r1.try_recv(), Err(TryRecvError::Closed));
49//!     assert_eq!(r2.try_recv(), Err(TryRecvError::Closed));
50//! })
51//! ```
52//!
53//! ## Difference with `async-channel`
54//!
55//! This crate is similar to [`async-channel`] in that they both provide an MPMC channel but the
56//! main difference being that in `async-channel`, each message sent on the channel is only received
57//! by one of the receivers. `async-broadcast` on the other hand, delivers each message to every
58//! receiver (IOW broadcast) by cloning it for each receiver.
59//!
60//! [`async-channel`]: https://crates.io/crates/async-channel
61//!
62//! ## Difference with other broadcast crates
63//!
64//! * [`broadcaster`]: The main difference would be that `broadcaster` doesn't have a sender and
65//!   receiver split and both sides use clones of the same BroadcastChannel instance. The messages
66//!   are sent to all channel clones. While this can work for many cases, the lack of
67//!   sender and receiver split, means that often times, you'll find yourself having to drain the
68//!   channel on the sending side yourself.
69//!
70//! * [`postage`]: this crate provides a [broadcast API][pba] similar to `async_broadcast`. However,
71//!   it:
72//!   - (at the time of this writing) duplicates [futures] API, which isn't ideal.
73//!   - Does not support overflow mode nor has the concept of inactive receivers, so a slow or
74//!     inactive receiver blocking the whole channel is not a solvable problem.
75//!   - Provides all kinds of channels, which is generally good but if you just need a broadcast
76//!     channel, `async_broadcast` is probably a better choice.
77//!
78//! * [`tokio::sync`]: Tokio's `sync` module provides a [broadcast channel][tbc] API. The differences
79//!    here are:
80//!   - While this implementation does provide [overflow mode][tom], it is the default behavior and not
81//!     opt-in.
82//!   - There is no equivalent of inactive receivers.
83//!   - While it's possible to build tokio with only the `sync` module, it comes with other APIs that
84//!     you may not need.
85//!
86//! [`broadcaster`]: https://crates.io/crates/broadcaster
87//! [`postage`]: https://crates.io/crates/postage
88//! [pba]: https://docs.rs/postage/0.4.1/postage/broadcast/fn.channel.html
89//! [futures]: https://crates.io/crates/futures
90//! [`tokio::sync`]: https://docs.rs/tokio/1.6.0/tokio/sync
91//! [tbc]: https://docs.rs/tokio/1.6.0/tokio/sync/broadcast/index.html
92//! [tom]: https://docs.rs/tokio/1.6.0/tokio/sync/broadcast/index.html#lagging
93//!
94#![forbid(unsafe_code)]
95#![deny(missing_debug_implementations, nonstandard_style, rust_2018_idioms)]
96#![warn(rustdoc::missing_doc_code_examples, unreachable_pub)]
97#![doc(
98    html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
99)]
100#![doc(
101    html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
102)]
103
// Compiled only under `cfg(doctest)`, so this module is absent from normal builds.
104#[cfg(doctest)]
105mod doctests {
    // Hands `../README.md` to `doc_comment::doctest!` — presumably so the README's code blocks
    // are run as doctests; confirm against the `doc_comment` crate documentation.
106    doc_comment::doctest!("../README.md");
107}
108
109use std::collections::VecDeque;
110use std::convert::TryInto;
111use std::error;
112use std::fmt;
113use std::future::Future;
114use std::marker::PhantomPinned;
115use std::pin::Pin;
116use std::sync::{Arc, RwLock};
117use std::task::{Context, Poll};
118
119use event_listener::{Event, EventListener};
120use event_listener_strategy::{easy_wrapper, EventListenerFuture};
121use futures_core::{ready, stream::Stream};
122use pin_project_lite::pin_project;
123
124/// Create a new broadcast channel.
125///
126/// The created channel has space to hold at most `cap` messages at a time.
127///
128/// # Panics
129///
130/// Capacity must be a positive number. If `cap` is zero, this function will panic.
131///
132/// # Examples
133///
134/// ```
135/// # futures_lite::future::block_on(async {
136/// use async_broadcast::{broadcast, TryRecvError, TrySendError};
137///
138/// let (s, mut r1) = broadcast(1);
139/// let mut r2 = r1.clone();
140///
141/// assert_eq!(s.broadcast(10).await, Ok(None));
142/// assert_eq!(s.try_broadcast(20), Err(TrySendError::Full(20)));
143///
144/// assert_eq!(r1.recv().await, Ok(10));
145/// assert_eq!(r2.recv().await, Ok(10));
146/// assert_eq!(r1.try_recv(), Err(TryRecvError::Empty));
147/// assert_eq!(r2.try_recv(), Err(TryRecvError::Empty));
148/// # });
149/// ```
150pub fn broadcast<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
151    assert!(cap > 0, "capacity cannot be zero");
152
153    let inner = Arc::new(RwLock::new(Inner {
154        queue: VecDeque::with_capacity(cap),
155        capacity: cap,
156        overflow: false,
157        await_active: true,
158        receiver_count: 1,
159        inactive_receiver_count: 0,
160        sender_count: 1,
161        head_pos: 0,
162        is_closed: false,
163        send_ops: Event::new(),
164        recv_ops: Event::new(),
165    }));
166
167    let s = Sender {
168        inner: inner.clone(),
169    };
170    let r = Receiver {
171        inner,
172        pos: 0,
173        listener: None,
174    };
175
176    (s, r)
177}
178
// Channel state shared by all senders and receivers of one channel (each handle holds it
// behind an `Arc<RwLock<..>>`).
179#[derive(Debug)]
180struct Inner<T> {
    /// Buffered messages, each paired with the number of receivers that still have to read it.
181    queue: VecDeque<(T, usize)>,
182    // We assign the same capacity to the queue but that's just specifying the minimum capacity and
183    // the actual capacity could be anything. Hence the need to keep track of our own set capacity.
184    capacity: usize,
    /// Number of active receivers (inactive ones are counted separately below).
185    receiver_count: usize,
    /// Number of inactive receivers.
186    inactive_receiver_count: usize,
    /// Number of live senders; dropping the last one closes the channel.
187    sender_count: usize,
188    /// Send sequence number of the front of the queue
189    head_pos: u64,
    /// When set, broadcasting to a full channel evicts the oldest message instead of failing.
190    overflow: bool,
    /// Whether senders wait for active receivers (see `Sender::set_await_active`).
191    await_active: bool,
192
    /// Set by `close()`; once set, broadcasting is refused.
193    is_closed: bool,
194
195    /// Send operations waiting while the channel is full.
196    send_ops: Event,
197
198    /// Receive operations waiting while the channel is empty and not closed.
199    recv_ops: Event,
200}
201
202impl<T> Inner<T> {
203    /// Try receiving at the given position, returning either the element or a reference to it.
204    ///
205    /// Result is used here instead of Cow because we don't have a Clone bound on T.
206    fn try_recv_at(&mut self, pos: &mut u64) -> Result<Result<T, &T>, TryRecvError> {
207        let i = match pos.checked_sub(self.head_pos) {
208            Some(i) => i
209                .try_into()
210                .expect("Head position more than usize::MAX behind a receiver"),
211            None => {
212                let count = self.head_pos - *pos;
213                *pos = self.head_pos;
214                return Err(TryRecvError::Overflowed(count));
215            }
216        };
217
218        let last_waiter;
219        if let Some((_elt, waiters)) = self.queue.get_mut(i) {
220            *pos += 1;
221            *waiters -= 1;
222            last_waiter = *waiters == 0;
223        } else {
224            debug_assert_eq!(i, self.queue.len());
225            if self.is_closed {
226                return Err(TryRecvError::Closed);
227            } else {
228                return Err(TryRecvError::Empty);
229            }
230        }
231
232        // If we read from the front of the queue and this is the last receiver reading it
233        // we can pop the queue instead of cloning the message
234        if last_waiter {
235            // Only the first element of the queue should have 0 waiters
236            assert_eq!(i, 0);
237
238            // Remove the element from the queue, adjust space, and notify senders
239            let elt = self.queue.pop_front().unwrap().0;
240            self.head_pos += 1;
241            if !self.overflow {
242                // Notify 1 awaiting senders that there is now room. If there is still room in the
243                // queue, the notified operation will notify another awaiting sender.
244                self.send_ops.notify(1);
245            }
246
247            Ok(Ok(elt))
248        } else {
249            Ok(Err(&self.queue[i].0))
250        }
251    }
252
253    /// Closes the channel and notifies all waiting operations.
254    ///
255    /// Returns `true` if this call has closed the channel and it was not closed already.
256    fn close(&mut self) -> bool {
257        if self.is_closed {
258            return false;
259        }
260
261        self.is_closed = true;
262        // Notify all waiting senders and receivers.
263        self.send_ops.notify(usize::MAX);
264        self.recv_ops.notify(usize::MAX);
265
266        true
267    }
268
269    /// Set the channel capacity.
270    ///
271    /// There are times when you need to change the channel's capacity after creating it. If the
272    /// `new_cap` is less than the number of messages in the channel, the oldest messages will be
273    /// dropped to shrink the channel.
274    fn set_capacity(&mut self, new_cap: usize) {
275        self.capacity = new_cap;
276        if new_cap > self.queue.capacity() {
277            let diff = new_cap - self.queue.capacity();
278            self.queue.reserve(diff);
279        }
280
281        // Ensure queue doesn't have more than `new_cap` messages.
282        if new_cap < self.queue.len() {
283            let diff = self.queue.len() - new_cap;
284            self.queue.drain(0..diff);
285            self.head_pos += diff as u64;
286        }
287    }
288
289    /// Close the channel if there aren't any receivers present anymore
290    fn close_channel(&mut self) {
291        if self.receiver_count == 0 && self.inactive_receiver_count == 0 {
292            self.close();
293        }
294    }
295}
296
297/// The sending side of the broadcast channel.
298///
299/// Senders can be cloned and shared among threads. When all senders associated with a channel are
300/// dropped, the channel becomes closed.
301///
302/// The channel can also be closed manually by calling [`Sender::close()`].
303#[derive(Debug)]
304pub struct Sender<T> {
    // Channel state shared with every other handle of this channel; all access goes through
    // the lock.
305    inner: Arc<RwLock<Inner<T>>>,
306}
307
308impl<T> Sender<T> {
309    /// Returns the channel capacity.
310    ///
311    /// # Examples
312    ///
313    /// ```
314    /// use async_broadcast::broadcast;
315    ///
316    /// let (s, r) = broadcast::<i32>(5);
317    /// assert_eq!(s.capacity(), 5);
318    /// ```
319    pub fn capacity(&self) -> usize {
320        self.inner.read().unwrap().capacity
321    }
322
323    /// Set the channel capacity.
324    ///
325    /// There are times when you need to change the channel's capacity after creating it. If the
326    /// `new_cap` is less than the number of messages in the channel, the oldest messages will be
327    /// dropped to shrink the channel.
328    ///
329    /// # Examples
330    ///
331    /// ```
332    /// use async_broadcast::{broadcast, TrySendError, TryRecvError};
333    ///
334    /// let (mut s, mut r) = broadcast::<i32>(3);
335    /// assert_eq!(s.capacity(), 3);
336    /// s.try_broadcast(1).unwrap();
337    /// s.try_broadcast(2).unwrap();
338    /// s.try_broadcast(3).unwrap();
339    ///
340    /// s.set_capacity(1);
341    /// assert_eq!(s.capacity(), 1);
342    /// assert_eq!(r.try_recv(), Err(TryRecvError::Overflowed(2)));
343    /// assert_eq!(r.try_recv().unwrap(), 3);
344    /// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
345    /// s.try_broadcast(1).unwrap();
346    /// assert_eq!(s.try_broadcast(2), Err(TrySendError::Full(2)));
347    ///
348    /// s.set_capacity(2);
349    /// assert_eq!(s.capacity(), 2);
350    /// s.try_broadcast(2).unwrap();
351    /// assert_eq!(s.try_broadcast(2), Err(TrySendError::Full(2)));
352    /// ```
353    pub fn set_capacity(&mut self, new_cap: usize) {
354        self.inner.write().unwrap().set_capacity(new_cap);
355    }
356
357    /// If overflow mode is enabled on this channel.
358    ///
359    /// # Examples
360    ///
361    /// ```
362    /// use async_broadcast::broadcast;
363    ///
364    /// let (s, r) = broadcast::<i32>(5);
365    /// assert!(!s.overflow());
366    /// ```
367    pub fn overflow(&self) -> bool {
368        self.inner.read().unwrap().overflow
369    }
370
371    /// Set overflow mode on the channel.
372    ///
373    /// When overflow mode is set, broadcasting to the channel will succeed even if the channel is
374    /// full. It achieves that by removing the oldest message from the channel.
375    ///
376    /// # Examples
377    ///
378    /// ```
379    /// use async_broadcast::{broadcast, TrySendError, TryRecvError};
380    ///
381    /// let (mut s, mut r) = broadcast::<i32>(2);
382    /// s.try_broadcast(1).unwrap();
383    /// s.try_broadcast(2).unwrap();
384    /// assert_eq!(s.try_broadcast(3), Err(TrySendError::Full(3)));
385    /// s.set_overflow(true);
386    /// assert_eq!(s.try_broadcast(3).unwrap(), Some(1));
387    /// assert_eq!(s.try_broadcast(4).unwrap(), Some(2));
388    ///
389    /// assert_eq!(r.try_recv(), Err(TryRecvError::Overflowed(2)));
390    /// assert_eq!(r.try_recv().unwrap(), 3);
391    /// assert_eq!(r.try_recv().unwrap(), 4);
392    /// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
393    /// ```
394    pub fn set_overflow(&mut self, overflow: bool) {
395        self.inner.write().unwrap().overflow = overflow;
396    }
397
398    /// If sender will wait for active receivers.
399    ///
400    /// If set to `false`, [`Send`] will resolve immediately with a [`SendError`]. Defaults to
401    /// `true`.
402    ///
403    /// # Examples
404    ///
405    /// ```
406    /// use async_broadcast::broadcast;
407    ///
408    /// let (s, _) = broadcast::<i32>(5);
409    /// assert!(s.await_active());
410    /// ```
411    pub fn await_active(&self) -> bool {
412        self.inner.read().unwrap().await_active
413    }
414
415    /// Specify if sender will wait for active receivers.
416    ///
417    /// If set to `false`, [`Send`] will resolve immediately with a [`SendError`]. Defaults to
418    /// `true`.
419    ///
420    /// # Examples
421    ///
422    /// ```
423    /// # futures_lite::future::block_on(async {
424    /// use async_broadcast::broadcast;
425    ///
426    /// let (mut s, mut r) = broadcast::<i32>(2);
427    /// s.broadcast(1).await.unwrap();
428    ///
429    /// let _ = r.deactivate();
430    /// s.set_await_active(false);
431    /// assert!(s.broadcast(2).await.is_err());
432    /// # });
433    /// ```
434    pub fn set_await_active(&mut self, await_active: bool) {
435        self.inner.write().unwrap().await_active = await_active;
436    }
437
438    /// Closes the channel.
439    ///
440    /// Returns `true` if this call has closed the channel and it was not closed already.
441    ///
442    /// The remaining messages can still be received.
443    ///
444    /// # Examples
445    ///
446    /// ```
447    /// # futures_lite::future::block_on(async {
448    /// use async_broadcast::{broadcast, RecvError};
449    ///
450    /// let (s, mut r) = broadcast(1);
451    /// s.broadcast(1).await.unwrap();
452    /// assert!(s.close());
453    ///
454    /// assert_eq!(r.recv().await.unwrap(), 1);
455    /// assert_eq!(r.recv().await, Err(RecvError::Closed));
456    /// # });
457    /// ```
458    pub fn close(&self) -> bool {
459        self.inner.write().unwrap().close()
460    }
461
462    /// Returns `true` if the channel is closed.
463    ///
464    /// # Examples
465    ///
466    /// ```
467    /// # futures_lite::future::block_on(async {
468    /// use async_broadcast::{broadcast, RecvError};
469    ///
470    /// let (s, r) = broadcast::<()>(1);
471    /// assert!(!s.is_closed());
472    ///
473    /// drop(r);
474    /// assert!(s.is_closed());
475    /// # });
476    /// ```
477    pub fn is_closed(&self) -> bool {
478        self.inner.read().unwrap().is_closed
479    }
480
481    /// Returns `true` if the channel is empty.
482    ///
483    /// # Examples
484    ///
485    /// ```
486    /// # futures_lite::future::block_on(async {
487    /// use async_broadcast::broadcast;
488    ///
489    /// let (s, r) = broadcast(1);
490    ///
491    /// assert!(s.is_empty());
492    /// s.broadcast(1).await;
493    /// assert!(!s.is_empty());
494    /// # });
495    /// ```
496    pub fn is_empty(&self) -> bool {
497        self.inner.read().unwrap().queue.is_empty()
498    }
499
500    /// Returns `true` if the channel is full.
501    ///
502    /// # Examples
503    ///
504    /// ```
505    /// # futures_lite::future::block_on(async {
506    /// use async_broadcast::broadcast;
507    ///
508    /// let (s, r) = broadcast(1);
509    ///
510    /// assert!(!s.is_full());
511    /// s.broadcast(1).await;
512    /// assert!(s.is_full());
513    /// # });
514    /// ```
515    pub fn is_full(&self) -> bool {
516        let inner = self.inner.read().unwrap();
517
518        inner.queue.len() == inner.capacity
519    }
520
521    /// Returns the number of messages in the channel.
522    ///
523    /// # Examples
524    ///
525    /// ```
526    /// # futures_lite::future::block_on(async {
527    /// use async_broadcast::broadcast;
528    ///
529    /// let (s, r) = broadcast(2);
530    /// assert_eq!(s.len(), 0);
531    ///
532    /// s.broadcast(1).await;
533    /// s.broadcast(2).await;
534    /// assert_eq!(s.len(), 2);
535    /// # });
536    /// ```
537    pub fn len(&self) -> usize {
538        self.inner.read().unwrap().queue.len()
539    }
540
541    /// Returns the number of receivers for the channel.
542    ///
543    /// This does not include inactive receivers. Use [`Sender::inactive_receiver_count`] if you
544    /// are interested in that.
545    ///
546    /// # Examples
547    ///
548    /// ```
549    /// use async_broadcast::broadcast;
550    ///
551    /// let (s, r) = broadcast::<()>(1);
552    /// assert_eq!(s.receiver_count(), 1);
553    /// let r = r.deactivate();
554    /// assert_eq!(s.receiver_count(), 0);
555    ///
556    /// let r2 = r.activate_cloned();
557    /// assert_eq!(r.receiver_count(), 1);
558    /// assert_eq!(r.inactive_receiver_count(), 1);
559    /// ```
560    pub fn receiver_count(&self) -> usize {
561        self.inner.read().unwrap().receiver_count
562    }
563
564    /// Returns the number of inactive receivers for the channel.
565    ///
566    /// # Examples
567    ///
568    /// ```
569    /// use async_broadcast::broadcast;
570    ///
571    /// let (s, r) = broadcast::<()>(1);
572    /// assert_eq!(s.receiver_count(), 1);
573    /// let r = r.deactivate();
574    /// assert_eq!(s.receiver_count(), 0);
575    ///
576    /// let r2 = r.activate_cloned();
577    /// assert_eq!(r.receiver_count(), 1);
578    /// assert_eq!(r.inactive_receiver_count(), 1);
579    /// ```
580    pub fn inactive_receiver_count(&self) -> usize {
581        self.inner.read().unwrap().inactive_receiver_count
582    }
583
584    /// Returns the number of senders for the channel.
585    ///
586    /// # Examples
587    ///
588    /// ```
589    /// # futures_lite::future::block_on(async {
590    /// use async_broadcast::broadcast;
591    ///
592    /// let (s, r) = broadcast::<()>(1);
593    /// assert_eq!(s.sender_count(), 1);
594    ///
595    /// let s2 = s.clone();
596    /// assert_eq!(s.sender_count(), 2);
597    /// # });
598    /// ```
599    pub fn sender_count(&self) -> usize {
600        self.inner.read().unwrap().sender_count
601    }
602
603    /// Produce a new Receiver for this channel.
604    ///
605    /// The new receiver starts with zero messages available.  This will not re-open the channel if
606    /// it was closed due to all receivers being dropped.
607    ///
608    /// # Examples
609    ///
610    /// ```
611    /// # futures_lite::future::block_on(async {
612    /// use async_broadcast::{broadcast, RecvError};
613    ///
614    /// let (s, mut r1) = broadcast(2);
615    ///
616    /// assert_eq!(s.broadcast(1).await, Ok(None));
617    ///
618    /// let mut r2 = s.new_receiver();
619    ///
620    /// assert_eq!(s.broadcast(2).await, Ok(None));
621    /// drop(s);
622    ///
623    /// assert_eq!(r1.recv().await, Ok(1));
624    /// assert_eq!(r1.recv().await, Ok(2));
625    /// assert_eq!(r1.recv().await, Err(RecvError::Closed));
626    ///
627    /// assert_eq!(r2.recv().await, Ok(2));
628    /// assert_eq!(r2.recv().await, Err(RecvError::Closed));
629    /// # });
630    /// ```
631    pub fn new_receiver(&self) -> Receiver<T> {
632        let mut inner = self.inner.write().unwrap();
633        inner.receiver_count += 1;
634        Receiver {
635            inner: self.inner.clone(),
636            pos: inner.head_pos + inner.queue.len() as u64,
637            listener: None,
638        }
639    }
640}
641
642impl<T: Clone> Sender<T> {
643    /// Broadcasts a message on the channel.
644    ///
645    /// If the channel is full, this method waits until there is space for a message unless:
646    ///
647    /// 1. overflow mode (set through [`Sender::set_overflow`]) is enabled, in which case it removes
648    ///    the oldest message from the channel to make room for the new message. The removed message
649    ///    is returned to the caller.
650    /// 2. this behavior is disabled using [`Sender::set_await_active`], in which case, it returns
651    ///    [`SendError`] immediately.
652    ///
653    /// If the channel is closed, this method returns an error.
654    ///
655    /// The future returned by this function is pinned to the heap. If the future being `Unpin` is
656    /// not important to you, or if you just `.await` this future, use the [`Sender::broadcast_direct`] method
657    /// instead.
658    ///
659    /// # Examples
660    ///
661    /// ```
662    /// # futures_lite::future::block_on(async {
663    /// use async_broadcast::{broadcast, SendError};
664    ///
665    /// let (s, r) = broadcast(1);
666    ///
667    /// assert_eq!(s.broadcast(1).await, Ok(None));
668    /// drop(r);
669    /// assert_eq!(s.broadcast(2).await, Err(SendError(2)));
670    /// # });
671    /// ```
672    pub fn broadcast(&self, msg: T) -> Pin<Box<Send<'_, T>>> {
673        Box::pin(self.broadcast_direct(msg))
674    }
675
676    /// Broadcasts a message on the channel without pinning the future to the heap.
677    ///
678    /// The future returned by this method is not `Unpin` and must be pinned before use. This is
679    /// the desired behavior if you just `.await` on the future. For other use cases, use the
680    /// [`Sender::broadcast`] method instead.
681    ///
682    /// # Examples
683    ///
684    /// ```
685    /// # futures_lite::future::block_on(async {
686    /// use async_broadcast::{broadcast, SendError};
687    ///
688    /// let (s, r) = broadcast(1);
689    ///
690    /// assert_eq!(s.broadcast_direct(1).await, Ok(None));
691    /// drop(r);
692    /// assert_eq!(s.broadcast_direct(2).await, Err(SendError(2)));
693    /// # });
694    /// ```
695    pub fn broadcast_direct(&self, msg: T) -> Send<'_, T> {
696        Send::_new(SendInner {
697            sender: self,
698            listener: None,
699            msg: Some(msg),
700            _pin: PhantomPinned,
701        })
702    }
703
704    /// Attempts to broadcast a message on the channel.
705    ///
706    /// If the channel is full, this method returns an error unless overflow mode (set through
707    /// [`Sender::set_overflow`]) is enabled. If the overflow mode is enabled, it removes the
708    /// oldest message from the channel to make room for the new message. The removed message
709    /// is returned to the caller.
710    ///
711    /// If the channel is closed, this method returns an error.
712    ///
713    /// # Examples
714    ///
715    /// ```
716    /// use async_broadcast::{broadcast, TrySendError};
717    ///
718    /// let (s, r) = broadcast(1);
719    ///
720    /// assert_eq!(s.try_broadcast(1), Ok(None));
721    /// assert_eq!(s.try_broadcast(2), Err(TrySendError::Full(2)));
722    ///
723    /// drop(r);
724    /// assert_eq!(s.try_broadcast(3), Err(TrySendError::Closed(3)));
725    /// ```
726    pub fn try_broadcast(&self, msg: T) -> Result<Option<T>, TrySendError<T>> {
727        let mut ret = None;
728        let mut inner = self.inner.write().unwrap();
729
730        if inner.is_closed {
731            return Err(TrySendError::Closed(msg));
732        } else if inner.receiver_count == 0 {
733            assert!(inner.inactive_receiver_count != 0);
734
735            return Err(TrySendError::Inactive(msg));
736        } else if inner.queue.len() == inner.capacity {
737            if inner.overflow {
738                // Make room by popping a message.
739                ret = inner.queue.pop_front().map(|(m, _)| m);
740            } else {
741                return Err(TrySendError::Full(msg));
742            }
743        }
744        let receiver_count = inner.receiver_count;
745        inner.queue.push_back((msg, receiver_count));
746        if ret.is_some() {
747            inner.head_pos += 1;
748        }
749
750        // Notify all awaiting receive operations.
751        inner.recv_ops.notify(usize::MAX);
752
753        Ok(ret)
754    }
755}
756
757impl<T> Drop for Sender<T> {
758    fn drop(&mut self) {
759        let mut inner = self.inner.write().unwrap();
760
761        inner.sender_count -= 1;
762
763        if inner.sender_count == 0 {
764            inner.close();
765        }
766    }
767}
768
769impl<T> Clone for Sender<T> {
770    fn clone(&self) -> Self {
771        self.inner.write().unwrap().sender_count += 1;
772
773        Sender {
774            inner: self.inner.clone(),
775        }
776    }
777}
778
779/// The receiving side of a channel.
780///
781/// Receivers can be cloned and shared among threads. When all (active) receivers associated with a
782/// channel are dropped, the channel becomes closed. You can deactivate a receiver using
783/// [`Receiver::deactivate`] if you would like the channel to remain open without keeping active
784/// receivers around.
785#[derive(Debug)]
786pub struct Receiver<T> {
    // Channel state shared with every other handle of this channel; all access goes through
    // the lock.
787    inner: Arc<RwLock<Inner<T>>>,
    // Sequence number this receiver is positioned at (0 for the first receiver, one past the
    // newest queued message for `Sender::new_receiver`) — presumably the `pos` handed to
    // `Inner::try_recv_at`; confirm against the receive path.
788    pos: u64,
789
790    /// Listens for a send or close event to unblock this stream.
791    listener: Option<EventListener>,
792}
793
794impl<T> Receiver<T> {
795    /// Returns the channel capacity.
796    ///
797    /// # Examples
798    ///
799    /// ```
800    /// use async_broadcast::broadcast;
801    ///
802    /// let (_s, r) = broadcast::<i32>(5);
803    /// assert_eq!(r.capacity(), 5);
804    /// ```
805    pub fn capacity(&self) -> usize {
806        self.inner.read().unwrap().capacity
807    }
808
809    /// Set the channel capacity.
810    ///
811    /// There are times when you need to change the channel's capacity after creating it. If the
812    /// `new_cap` is less than the number of messages in the channel, the oldest messages will be
813    /// dropped to shrink the channel.
814    ///
815    /// # Examples
816    ///
817    /// ```
818    /// use async_broadcast::{broadcast, TrySendError, TryRecvError};
819    ///
820    /// let (s, mut r) = broadcast::<i32>(3);
821    /// assert_eq!(r.capacity(), 3);
822    /// s.try_broadcast(1).unwrap();
823    /// s.try_broadcast(2).unwrap();
824    /// s.try_broadcast(3).unwrap();
825    ///
826    /// r.set_capacity(1);
827    /// assert_eq!(r.capacity(), 1);
828    /// assert_eq!(r.try_recv(), Err(TryRecvError::Overflowed(2)));
829    /// assert_eq!(r.try_recv().unwrap(), 3);
830    /// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
831    /// s.try_broadcast(1).unwrap();
832    /// assert_eq!(s.try_broadcast(2), Err(TrySendError::Full(2)));
833    ///
834    /// r.set_capacity(2);
835    /// assert_eq!(r.capacity(), 2);
836    /// s.try_broadcast(2).unwrap();
837    /// assert_eq!(s.try_broadcast(2), Err(TrySendError::Full(2)));
838    /// ```
839    pub fn set_capacity(&mut self, new_cap: usize) {
840        self.inner.write().unwrap().set_capacity(new_cap);
841    }
842
843    /// If overflow mode is enabled on this channel.
844    ///
845    /// # Examples
846    ///
847    /// ```
848    /// use async_broadcast::broadcast;
849    ///
850    /// let (_s, r) = broadcast::<i32>(5);
851    /// assert!(!r.overflow());
852    /// ```
853    pub fn overflow(&self) -> bool {
854        self.inner.read().unwrap().overflow
855    }
856
857    /// Set overflow mode on the channel.
858    ///
859    /// When overflow mode is set, broadcasting to the channel will succeed even if the channel is
860    /// full. It achieves that by removing the oldest message from the channel.
861    ///
862    /// # Examples
863    ///
864    /// ```
865    /// use async_broadcast::{broadcast, TrySendError, TryRecvError};
866    ///
867    /// let (s, mut r) = broadcast::<i32>(2);
868    /// s.try_broadcast(1).unwrap();
869    /// s.try_broadcast(2).unwrap();
870    /// assert_eq!(s.try_broadcast(3), Err(TrySendError::Full(3)));
871    /// r.set_overflow(true);
872    /// assert_eq!(s.try_broadcast(3).unwrap(), Some(1));
873    /// assert_eq!(s.try_broadcast(4).unwrap(), Some(2));
874    ///
875    /// assert_eq!(r.try_recv(), Err(TryRecvError::Overflowed(2)));
876    /// assert_eq!(r.try_recv().unwrap(), 3);
877    /// assert_eq!(r.try_recv().unwrap(), 4);
878    /// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
879    /// ```
880    pub fn set_overflow(&mut self, overflow: bool) {
881        self.inner.write().unwrap().overflow = overflow;
882    }
883
884    /// If sender will wait for active receivers.
885    ///
886    /// If set to `false`, [`Send`] will resolve immediately with a [`SendError`]. Defaults to
887    /// `true`.
888    ///
889    /// # Examples
890    ///
891    /// ```
892    /// use async_broadcast::broadcast;
893    ///
894    /// let (_, r) = broadcast::<i32>(5);
895    /// assert!(r.await_active());
896    /// ```
897    pub fn await_active(&self) -> bool {
898        self.inner.read().unwrap().await_active
899    }
900
901    /// Specify if sender will wait for active receivers.
902    ///
903    /// If set to `false`, [`Send`] will resolve immediately with a [`SendError`]. Defaults to
904    /// `true`.
905    ///
906    /// # Examples
907    ///
908    /// ```
909    /// # futures_lite::future::block_on(async {
910    /// use async_broadcast::broadcast;
911    ///
912    /// let (s, mut r) = broadcast::<i32>(2);
913    /// s.broadcast(1).await.unwrap();
914    ///
915    /// r.set_await_active(false);
916    /// let _ = r.deactivate();
917    /// assert!(s.broadcast(2).await.is_err());
918    /// # });
919    /// ```
920    pub fn set_await_active(&mut self, await_active: bool) {
921        self.inner.write().unwrap().await_active = await_active;
922    }
923
924    /// Closes the channel.
925    ///
926    /// Returns `true` if this call has closed the channel and it was not closed already.
927    ///
928    /// The remaining messages can still be received.
929    ///
930    /// # Examples
931    ///
932    /// ```
933    /// # futures_lite::future::block_on(async {
934    /// use async_broadcast::{broadcast, RecvError};
935    ///
936    /// let (s, mut r) = broadcast(1);
937    /// s.broadcast(1).await.unwrap();
938    /// assert!(s.close());
939    ///
940    /// assert_eq!(r.recv().await.unwrap(), 1);
941    /// assert_eq!(r.recv().await, Err(RecvError::Closed));
942    /// # });
943    /// ```
944    pub fn close(&self) -> bool {
945        self.inner.write().unwrap().close()
946    }
947
948    /// Returns `true` if the channel is closed.
949    ///
950    /// # Examples
951    ///
952    /// ```
953    /// # futures_lite::future::block_on(async {
954    /// use async_broadcast::{broadcast, RecvError};
955    ///
956    /// let (s, r) = broadcast::<()>(1);
957    /// assert!(!s.is_closed());
958    ///
959    /// drop(r);
960    /// assert!(s.is_closed());
961    /// # });
962    /// ```
963    pub fn is_closed(&self) -> bool {
964        self.inner.read().unwrap().is_closed
965    }
966
967    /// Returns `true` if the channel is empty.
968    ///
969    /// # Examples
970    ///
971    /// ```
972    /// # futures_lite::future::block_on(async {
973    /// use async_broadcast::broadcast;
974    ///
975    /// let (s, r) = broadcast(1);
976    ///
977    /// assert!(s.is_empty());
978    /// s.broadcast(1).await;
979    /// assert!(!s.is_empty());
980    /// # });
981    /// ```
982    pub fn is_empty(&self) -> bool {
983        self.inner.read().unwrap().queue.is_empty()
984    }
985
986    /// Returns `true` if the channel is full.
987    ///
988    /// # Examples
989    ///
990    /// ```
991    /// # futures_lite::future::block_on(async {
992    /// use async_broadcast::broadcast;
993    ///
994    /// let (s, r) = broadcast(1);
995    ///
996    /// assert!(!s.is_full());
997    /// s.broadcast(1).await;
998    /// assert!(s.is_full());
999    /// # });
1000    /// ```
1001    pub fn is_full(&self) -> bool {
1002        let inner = self.inner.read().unwrap();
1003
1004        inner.queue.len() == inner.capacity
1005    }
1006
1007    /// Returns the number of messages in the channel.
1008    ///
1009    /// # Examples
1010    ///
1011    /// ```
1012    /// # futures_lite::future::block_on(async {
1013    /// use async_broadcast::broadcast;
1014    ///
1015    /// let (s, r) = broadcast(2);
1016    /// assert_eq!(s.len(), 0);
1017    ///
1018    /// s.broadcast(1).await;
1019    /// s.broadcast(2).await;
1020    /// assert_eq!(s.len(), 2);
1021    /// # });
1022    /// ```
1023    pub fn len(&self) -> usize {
1024        self.inner.read().unwrap().queue.len()
1025    }
1026
1027    /// Returns the number of receivers for the channel.
1028    ///
1029    /// This does not include inactive receivers. Use [`Receiver::inactive_receiver_count`] if you
1030    /// are interested in that.
1031    ///
1032    /// # Examples
1033    ///
1034    /// ```
1035    /// use async_broadcast::broadcast;
1036    ///
1037    /// let (s, r) = broadcast::<()>(1);
1038    /// assert_eq!(s.receiver_count(), 1);
1039    /// let r = r.deactivate();
1040    /// assert_eq!(s.receiver_count(), 0);
1041    ///
1042    /// let r2 = r.activate_cloned();
1043    /// assert_eq!(r.receiver_count(), 1);
1044    /// assert_eq!(r.inactive_receiver_count(), 1);
1045    /// ```
1046    pub fn receiver_count(&self) -> usize {
1047        self.inner.read().unwrap().receiver_count
1048    }
1049
1050    /// Returns the number of inactive receivers for the channel.
1051    ///
1052    /// # Examples
1053    ///
1054    /// ```
1055    /// use async_broadcast::broadcast;
1056    ///
1057    /// let (s, r) = broadcast::<()>(1);
1058    /// assert_eq!(s.receiver_count(), 1);
1059    /// let r = r.deactivate();
1060    /// assert_eq!(s.receiver_count(), 0);
1061    ///
1062    /// let r2 = r.activate_cloned();
1063    /// assert_eq!(r.receiver_count(), 1);
1064    /// assert_eq!(r.inactive_receiver_count(), 1);
1065    /// ```
1066    pub fn inactive_receiver_count(&self) -> usize {
1067        self.inner.read().unwrap().inactive_receiver_count
1068    }
1069
1070    /// Returns the number of senders for the channel.
1071    ///
1072    /// # Examples
1073    ///
1074    /// ```
1075    /// # futures_lite::future::block_on(async {
1076    /// use async_broadcast::broadcast;
1077    ///
1078    /// let (s, r) = broadcast::<()>(1);
1079    /// assert_eq!(s.sender_count(), 1);
1080    ///
1081    /// let s2 = s.clone();
1082    /// assert_eq!(s.sender_count(), 2);
1083    /// # });
1084    /// ```
1085    pub fn sender_count(&self) -> usize {
1086        self.inner.read().unwrap().sender_count
1087    }
1088
1089    /// Downgrade to a [`InactiveReceiver`].
1090    ///
    /// An inactive receiver is one that cannot and does not receive any messages. Its only purpose
    /// is to keep the associated channel open even when there are no (active) receivers. An
    /// inactive receiver can be upgraded into a [`Receiver`] using [`InactiveReceiver::activate`]
    /// or [`InactiveReceiver::activate_cloned`].
1095    ///
    /// [`Sender::try_broadcast`] will return [`TrySendError::Inactive`] if only inactive
    /// receivers exist for the associated channel and [`Sender::broadcast`] will wait until an
    /// active receiver is available.
1099    ///
1100    /// # Examples
1101    ///
1102    /// ```
1103    /// # futures_lite::future::block_on(async {
1104    /// use async_broadcast::{broadcast, TrySendError};
1105    ///
1106    /// let (s, r) = broadcast(1);
1107    /// let inactive = r.deactivate();
1108    /// assert_eq!(s.try_broadcast(10), Err(TrySendError::Inactive(10)));
1109    ///
1110    /// let mut r = inactive.activate();
1111    /// assert_eq!(s.broadcast(10).await, Ok(None));
1112    /// assert_eq!(r.recv().await, Ok(10));
1113    /// # });
1114    /// ```
1115    pub fn deactivate(self) -> InactiveReceiver<T> {
1116        // Drop::drop impl of Receiver will take care of `receiver_count`.
1117        self.inner.write().unwrap().inactive_receiver_count += 1;
1118
1119        InactiveReceiver {
1120            inner: self.inner.clone(),
1121        }
1122    }
1123}
1124
1125impl<T: Clone> Receiver<T> {
1126    /// Receives a message from the channel.
1127    ///
1128    /// If the channel is empty, this method waits until there is a message.
1129    ///
1130    /// If the channel is closed, this method receives a message or returns an error if there are
1131    /// no more messages.
1132    ///
1133    /// If this receiver has missed a message (only possible if overflow mode is enabled), then
1134    /// this method returns an error and readjusts its cursor to point to the first available
1135    /// message.
1136    ///
    /// The future returned by this function is pinned to the heap. If the future being `Unpin` is
    /// not important to you, or if you just `.await` this future, use the
    /// [`Receiver::recv_direct`] method instead.
1140    ///
1141    /// # Examples
1142    ///
1143    /// ```
1144    /// # futures_lite::future::block_on(async {
1145    /// use async_broadcast::{broadcast, RecvError};
1146    ///
1147    /// let (s, mut r1) = broadcast(1);
1148    /// let mut r2 = r1.clone();
1149    ///
1150    /// assert_eq!(s.broadcast(1).await, Ok(None));
1151    /// drop(s);
1152    ///
1153    /// assert_eq!(r1.recv().await, Ok(1));
1154    /// assert_eq!(r1.recv().await, Err(RecvError::Closed));
1155    /// assert_eq!(r2.recv().await, Ok(1));
1156    /// assert_eq!(r2.recv().await, Err(RecvError::Closed));
1157    /// # });
1158    /// ```
1159    pub fn recv(&mut self) -> Pin<Box<Recv<'_, T>>> {
1160        Box::pin(self.recv_direct())
1161    }
1162
1163    /// Receives a message from the channel without pinning the future to the heap.
1164    ///
    /// The future returned by this method is not `Unpin` and must be pinned before use. This is
    /// the desired behavior if you just `.await` on the future. For other use cases, use the
    /// [`Receiver::recv`] method instead.
1168    ///
1169    /// # Examples
1170    ///
1171    /// ```
1172    /// # futures_lite::future::block_on(async {
1173    /// use async_broadcast::{broadcast, RecvError};
1174    ///
1175    /// let (s, mut r1) = broadcast(1);
1176    /// let mut r2 = r1.clone();
1177    ///
1178    /// assert_eq!(s.broadcast(1).await, Ok(None));
1179    /// drop(s);
1180    ///
1181    /// assert_eq!(r1.recv_direct().await, Ok(1));
1182    /// assert_eq!(r1.recv_direct().await, Err(RecvError::Closed));
1183    /// assert_eq!(r2.recv_direct().await, Ok(1));
1184    /// assert_eq!(r2.recv_direct().await, Err(RecvError::Closed));
1185    /// # });
1186    /// ```
1187    pub fn recv_direct(&mut self) -> Recv<'_, T> {
1188        Recv::_new(RecvInner {
1189            receiver: self,
1190            listener: None,
1191            _pin: PhantomPinned,
1192        })
1193    }
1194
1195    /// Attempts to receive a message from the channel.
1196    ///
1197    /// If the channel is empty or closed, this method returns an error.
1198    ///
1199    /// If this receiver has missed a message (only possible if overflow mode is enabled), then
1200    /// this method returns an error and readjusts its cursor to point to the first available
1201    /// message.
1202    ///
1203    /// # Examples
1204    ///
1205    /// ```
1206    /// # futures_lite::future::block_on(async {
1207    /// use async_broadcast::{broadcast, TryRecvError};
1208    ///
1209    /// let (s, mut r1) = broadcast(1);
1210    /// let mut r2 = r1.clone();
1211    /// assert_eq!(s.broadcast(1).await, Ok(None));
1212    ///
1213    /// assert_eq!(r1.try_recv(), Ok(1));
1214    /// assert_eq!(r1.try_recv(), Err(TryRecvError::Empty));
1215    /// assert_eq!(r2.try_recv(), Ok(1));
1216    /// assert_eq!(r2.try_recv(), Err(TryRecvError::Empty));
1217    ///
1218    /// drop(s);
1219    /// assert_eq!(r1.try_recv(), Err(TryRecvError::Closed));
1220    /// assert_eq!(r2.try_recv(), Err(TryRecvError::Closed));
1221    /// # });
1222    /// ```
1223    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
1224        self.inner
1225            .write()
1226            .unwrap()
1227            .try_recv_at(&mut self.pos)
1228            .map(|cow| cow.unwrap_or_else(T::clone))
1229    }
1230
1231    /// Produce a new Sender for this channel.
1232    ///
1233    /// This will not re-open the channel if it was closed due to all senders being dropped.
1234    ///
1235    /// # Examples
1236    ///
1237    /// ```
1238    /// # futures_lite::future::block_on(async {
1239    /// use async_broadcast::{broadcast, RecvError};
1240    ///
1241    /// let (s1, mut r) = broadcast(2);
1242    ///
1243    /// assert_eq!(s1.broadcast(1).await, Ok(None));
1244    ///
1245    /// let mut s2 = r.new_sender();
1246    ///
1247    /// assert_eq!(s2.broadcast(2).await, Ok(None));
1248    /// drop(s1);
1249    /// drop(s2);
1250    ///
1251    /// assert_eq!(r.recv().await, Ok(1));
1252    /// assert_eq!(r.recv().await, Ok(2));
1253    /// assert_eq!(r.recv().await, Err(RecvError::Closed));
1254    /// # });
1255    /// ```
1256    pub fn new_sender(&self) -> Sender<T> {
1257        self.inner.write().unwrap().sender_count += 1;
1258
1259        Sender {
1260            inner: self.inner.clone(),
1261        }
1262    }
1263
1264    /// Produce a new Receiver for this channel.
1265    ///
1266    /// Unlike [`Receiver::clone`], this method creates a new receiver that starts with zero
1267    /// messages available.  This is slightly faster than a real clone.
1268    ///
1269    /// # Examples
1270    ///
1271    /// ```
1272    /// # futures_lite::future::block_on(async {
1273    /// use async_broadcast::{broadcast, RecvError};
1274    ///
1275    /// let (s, mut r1) = broadcast(2);
1276    ///
1277    /// assert_eq!(s.broadcast(1).await, Ok(None));
1278    ///
1279    /// let mut r2 = r1.new_receiver();
1280    ///
1281    /// assert_eq!(s.broadcast(2).await, Ok(None));
1282    /// drop(s);
1283    ///
1284    /// assert_eq!(r1.recv().await, Ok(1));
1285    /// assert_eq!(r1.recv().await, Ok(2));
1286    /// assert_eq!(r1.recv().await, Err(RecvError::Closed));
1287    ///
1288    /// assert_eq!(r2.recv().await, Ok(2));
1289    /// assert_eq!(r2.recv().await, Err(RecvError::Closed));
1290    /// # });
1291    /// ```
1292    pub fn new_receiver(&self) -> Self {
1293        let mut inner = self.inner.write().unwrap();
1294        inner.receiver_count += 1;
1295        Receiver {
1296            inner: self.inner.clone(),
1297            pos: inner.head_pos + inner.queue.len() as u64,
1298            listener: None,
1299        }
1300    }
1301
1302    /// A low level poll method that is similar to [`Receiver::recv()`] or
1303    /// [`Receiver::recv_direct()`], and can be useful for building stream implementations which
1304    /// use a [`Receiver`] under the hood and want to know if the stream has overflowed.
1305    ///
1306    /// Prefer to use [`Receiver::recv()`] or [`Receiver::recv_direct()`] when otherwise possible.
1307    ///
1308    /// # Errors
1309    ///
1310    /// If the number of messages that have been sent has overflowed the channel capacity, a
1311    /// [`RecvError::Overflowed`] variant is returned containing the number of items that
1312    /// overflowed and were lost.
1313    ///
1314    /// # Examples
1315    ///
1316    /// This example shows how the [`Receiver::poll_recv`] method can be used to allow a custom
1317    /// stream implementation to internally make use of a [`Receiver`]. This example implementation
1318    /// differs from the stream implementation of [`Receiver`] because it returns an error if
1319    /// the channel capacity overflows, which the built in [`Receiver`] stream doesn't do.
1320    ///
1321    /// ```
1322    /// use futures_core::Stream;
1323    /// use async_broadcast::{Receiver, RecvError};
1324    /// use std::{pin::Pin, task::{Poll, Context}};
1325    ///
1326    /// struct MyStream(Receiver<i32>);
1327    ///
1328    /// impl futures_core::Stream for MyStream {
1329    ///     type Item = Result<i32, RecvError>;
1330    ///     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1331    ///         Pin::new(&mut self.0).poll_recv(cx)
1332    ///     }
1333    /// }
1334    /// ```
    pub fn poll_recv(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<T, RecvError>>> {
        // Outcomes:
        // - `Ready(Some(Ok(msg)))`: a message was received;
        // - `Ready(Some(Err(RecvError::Overflowed(n))))`: `try_recv` reported an overflow;
        // - `Ready(None)`: `try_recv` reported the channel as closed;
        // - `Pending` (via `ready!`): the stored listener has not been notified yet.
        loop {
            // If this stream is listening for events, first wait for a notification.
            if let Some(listener) = self.listener.as_mut() {
                // `ready!` returns `Poll::Pending` from this function while the listener is
                // still waiting.
                ready!(Pin::new(listener).poll(cx));
                // The listener has fired; clear it so the inner loop registers a fresh one if
                // the channel turns out to be empty again.
                self.listener = None;
            }

            loop {
                // Attempt to receive a message.
                match self.try_recv() {
                    Ok(msg) => {
                        // The stream is not blocked on an event - drop the listener.
                        self.listener = None;
                        return Poll::Ready(Some(Ok(msg)));
                    }
                    Err(TryRecvError::Closed) => {
                        // The stream is not blocked on an event - drop the listener.
                        self.listener = None;
                        // End of stream is reported as `None` rather than `RecvError::Closed`.
                        return Poll::Ready(None);
                    }
                    Err(TryRecvError::Overflowed(n)) => {
                        // The stream is not blocked on an event - drop the listener.
                        self.listener = None;
                        return Poll::Ready(Some(Err(RecvError::Overflowed(n))));
                    }
                    // Nothing to receive right now: fall through to the listener handling.
                    Err(TryRecvError::Empty) => {}
                }

                // Receiving failed - now start listening for notifications or wait for one.
                match self.listener.as_mut() {
                    None => {
                        // Start listening and then try receiving again.
                        // NOTE(review): the retry after registering presumably exists to catch a
                        // message sent between the failed `try_recv` and `listen()` - confirm.
                        self.listener = {
                            let inner = self.inner.write().unwrap();
                            Some(inner.recv_ops.listen())
                        };
                    }
                    Some(_) => {
                        // Go back to the outer loop to poll the listener.
                        break;
                    }
                }
            }
        }
    }
1384}
1385
1386impl<T> Drop for Receiver<T> {
1387    fn drop(&mut self) {
1388        let mut inner = self.inner.write().unwrap();
1389
1390        // Remove ourself from each item's counter
1391        loop {
1392            match inner.try_recv_at(&mut self.pos) {
1393                Ok(_) => continue,
1394                Err(TryRecvError::Overflowed(_)) => continue,
1395                Err(TryRecvError::Closed) => break,
1396                Err(TryRecvError::Empty) => break,
1397            }
1398        }
1399
1400        inner.receiver_count -= 1;
1401
1402        inner.close_channel();
1403    }
1404}
1405
1406impl<T> Clone for Receiver<T> {
1407    /// Produce a clone of this Receiver that has the same messages queued.
1408    ///
1409    /// # Examples
1410    ///
1411    /// ```
1412    /// # futures_lite::future::block_on(async {
1413    /// use async_broadcast::{broadcast, RecvError};
1414    ///
1415    /// let (s, mut r1) = broadcast(1);
1416    ///
1417    /// assert_eq!(s.broadcast(1).await, Ok(None));
1418    /// drop(s);
1419    ///
1420    /// let mut r2 = r1.clone();
1421    ///
1422    /// assert_eq!(r1.recv().await, Ok(1));
1423    /// assert_eq!(r1.recv().await, Err(RecvError::Closed));
1424    /// assert_eq!(r2.recv().await, Ok(1));
1425    /// assert_eq!(r2.recv().await, Err(RecvError::Closed));
1426    /// # });
1427    /// ```
1428    fn clone(&self) -> Self {
1429        let mut inner = self.inner.write().unwrap();
1430        inner.receiver_count += 1;
1431        // increment the waiter count on all items not yet received by this object
1432        let n = self.pos.saturating_sub(inner.head_pos) as usize;
1433        for (_elt, waiters) in inner.queue.iter_mut().skip(n) {
1434            *waiters += 1;
1435        }
1436        Receiver {
1437            inner: self.inner.clone(),
1438            pos: self.pos,
1439            listener: None,
1440        }
1441    }
1442}
1443
1444impl<T: Clone> Stream for Receiver<T> {
1445    type Item = T;
1446
1447    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1448        loop {
1449            match ready!(self.as_mut().poll_recv(cx)) {
1450                Some(Ok(val)) => return Poll::Ready(Some(val)),
1451                // If overflowed, we expect future operations to succeed so try again.
1452                Some(Err(RecvError::Overflowed(_))) => continue,
1453                // RecvError::Closed should never appear here, but handle it anyway.
1454                None | Some(Err(RecvError::Closed)) => return Poll::Ready(None),
1455            }
1456        }
1457    }
1458}
1459
1460impl<T: Clone> futures_core::stream::FusedStream for Receiver<T> {
1461    fn is_terminated(&self) -> bool {
1462        let inner = self.inner.read().unwrap();
1463
1464        inner.is_closed && inner.queue.is_empty()
1465    }
1466}
1467
/// The error produced when [`Sender::broadcast()`] fails.
///
/// It is returned when the channel is closed, or when no active receivers were present while
/// `await_active` was set to `false` (see [`Sender::set_await_active`] for details). The message
/// that could not be sent is carried in the public field.
#[derive(PartialEq, Eq, Clone, Copy)]
pub struct SendError<T>(pub T);

impl<T> SendError<T> {
    /// Consumes the error and hands back the message that could not be sent.
    pub fn into_inner(self) -> T {
        let SendError(msg) = self;
        msg
    }
}

impl<T> error::Error for SendError<T> {}

impl<T> fmt::Debug for SendError<T> {
    /// Prints a fixed placeholder instead of the payload, so `T` does not have to be `Debug`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("SendError(..)")
    }
}

impl<T> fmt::Display for SendError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("sending into a closed channel")
    }
}
1495
/// The error produced when [`Sender::try_broadcast()`] fails.
///
/// Every variant carries the message that could not be sent.
#[derive(PartialEq, Eq, Clone, Copy)]
pub enum TrySendError<T> {
    /// The channel is full but not closed.
    Full(T),

    /// The channel is closed.
    Closed(T),

    /// Only inactive receivers exist at the moment; there is no active receiver.
    Inactive(T),
}

impl<T> TrySendError<T> {
    /// Consumes the error and hands back the message that could not be sent.
    pub fn into_inner(self) -> T {
        match self {
            Self::Full(msg) | Self::Closed(msg) | Self::Inactive(msg) => msg,
        }
    }

    /// Returns `true` if the channel is full but not closed.
    pub fn is_full(&self) -> bool {
        matches!(self, Self::Full(_))
    }

    /// Returns `true` if the channel is closed.
    pub fn is_closed(&self) -> bool {
        matches!(self, Self::Closed(_))
    }

    /// Returns `true` if there are currently no active receivers, only inactive ones.
    pub fn is_disconnected(&self) -> bool {
        matches!(self, Self::Inactive(_))
    }
}

impl<T> error::Error for TrySendError<T> {}

impl<T> fmt::Debug for TrySendError<T> {
    /// Prints the variant name with a placeholder payload, so `T` does not have to be `Debug`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::Full(_) => "Full(..)",
            Self::Closed(_) => "Closed(..)",
            Self::Inactive(_) => "Inactive(..)",
        };
        f.write_str(label)
    }
}

impl<T> fmt::Display for TrySendError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text = match self {
            Self::Full(_) => "sending into a full channel",
            Self::Closed(_) => "sending into a closed channel",
            Self::Inactive(_) => "sending into the void (no active receivers)",
        };
        f.write_str(text)
    }
}
1565
/// The error produced when [`Receiver::recv()`] fails.
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub enum RecvError {
    /// The channel has overflowed since the last element was seen. Future recv operations will
    /// succeed, but some messages have been skipped.
    ///
    /// Contains the number of messages missed.
    Overflowed(u64),

    /// The channel is empty and closed.
    Closed,
}

impl error::Error for RecvError {}

impl fmt::Display for RecvError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match *self {
            RecvError::Closed => f.write_str("receiving from an empty and closed channel"),
            RecvError::Overflowed(missed) => {
                write!(f, "receiving skipped {} messages", missed)
            }
        }
    }
}
1589
/// The error produced when [`Receiver::try_recv()`] fails.
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub enum TryRecvError {
    /// The channel has overflowed since the last element was seen. Future recv operations will
    /// succeed, but some messages have been skipped.
    Overflowed(u64),

    /// The channel is empty but not closed.
    Empty,

    /// The channel is empty and closed.
    Closed,
}

impl TryRecvError {
    /// Returns `true` if the channel is empty but not closed.
    pub fn is_empty(&self) -> bool {
        matches!(self, Self::Empty)
    }

    /// Returns `true` if the channel is empty and closed.
    pub fn is_closed(&self) -> bool {
        matches!(self, Self::Closed)
    }

    /// Returns `true` if this error indicates the receiver missed messages.
    pub fn is_overflowed(&self) -> bool {
        matches!(self, Self::Overflowed(_))
    }
}

impl error::Error for TryRecvError {}

impl fmt::Display for TryRecvError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Overflowed(lost) => {
                write!(f, "receiving operation observed {} lost messages", lost)
            }
            Self::Empty => f.write_str("receiving from an empty channel"),
            Self::Closed => f.write_str("receiving from an empty and closed channel"),
        }
    }
}
1646
// Declares the public `Send` future as a wrapper around `SendInner`; the type after `=>` is the
// future's output.
// NOTE(review): `easy_wrapper!` is not defined in this part of the file (presumably provided by
// the `event-listener-strategy` crate) - confirm what exactly it generates.
easy_wrapper! {
    /// A future returned by [`Sender::broadcast()`].
    #[derive(Debug)]
    #[must_use = "futures do nothing unless .awaited"]
    pub struct Send<'a, T: Clone>(SendInner<'a, T> => Result<Option<T>, SendError<T>>);
    // NOTE(review): presumably requests a crate-private blocking `wait()` method - confirm.
    pub(crate) wait();
}
1654
pin_project! {
    // State of an in-flight send; it is driven by the `EventListenerFuture` impl for
    // `SendInner`.
    #[derive(Debug)]
    struct SendInner<'a, T> {
        // The sender the message is broadcast through.
        sender: &'a Sender<T>,
        // Listener on the channel's `send_ops` event, registered after a send attempt could not
        // complete; `None` while not listening.
        listener: Option<EventListener>,
        // The message still to be sent. It is taken out for every attempt and put back when the
        // attempt has to be retried.
        msg: Option<T>,

        // Keeping this type `!Unpin` enables future optimizations.
        #[pin]
        _pin: PhantomPinned
    }
}
1667
impl<'a, T: Clone> EventListenerFuture for SendInner<'a, T> {
    type Output = Result<Option<T>, SendError<T>>;

    // Drives the send: try to broadcast, and while that cannot complete, listen on the channel's
    // `send_ops` event and retry after a notification.
    //
    // Resolves to `Ok` with whatever `try_broadcast` returned on success, or to
    // `Err(SendError(msg))` when the channel is closed or when there are no active receivers
    // and `await_active` is `false`.
    fn poll_with_strategy<'x, S: event_listener_strategy::Strategy<'x>>(
        self: Pin<&mut Self>,
        strategy: &mut S,
        context: &mut S::Context,
    ) -> Poll<Self::Output> {
        let this = self.project();

        loop {
            // Take the message out for this attempt; every retry path below puts it back first.
            // Once the future has resolved, `msg` stays `None`, so polling it again would panic
            // on this `unwrap`.
            let msg = this.msg.take().unwrap();
            let inner = &this.sender.inner;

            // Attempt to send a message.
            match this.sender.try_broadcast(msg) {
                Ok(msg) => {
                    let inner = inner.write().unwrap();

                    if inner.queue.len() < inner.capacity {
                        // Not full still, so notify the next awaiting sender.
                        inner.send_ops.notify(1);
                    }

                    return Poll::Ready(Ok(msg));
                }
                Err(TrySendError::Closed(msg)) => return Poll::Ready(Err(SendError(msg))),
                // Channel full: keep the message and wait for a notification below.
                Err(TrySendError::Full(m)) => *this.msg = Some(m),
                // No active receivers: wait for one only if `await_active` is set...
                Err(TrySendError::Inactive(m)) if inner.read().unwrap().await_active => {
                    *this.msg = Some(m)
                }
                // ...otherwise fail right away, handing the message back to the caller.
                Err(TrySendError::Inactive(m)) => return Poll::Ready(Err(SendError(m))),
            }

            // Sending failed - now start listening for notifications or wait for one.
            match &this.listener {
                None => {
                    // Start listening and then try sending again.
                    let inner = inner.write().unwrap();
                    *this.listener = Some(inner.send_ops.listen());
                }
                Some(_) => {
                    // Wait for a notification.
                    // `ready!` returns `Poll::Pending` from this function until the listener
                    // has been notified; afterwards it is cleared and the send is retried.
                    ready!(strategy.poll(this.listener, context));
                    *this.listener = None;
                }
            }
        }
    }
}
1718
// Declares the public `Recv` future as a wrapper around `RecvInner`; the type after `=>` is the
// future's output. `Receiver::recv_direct` constructs it through `Recv::_new`.
// NOTE(review): `easy_wrapper!` is not defined in this part of the file (presumably provided by
// the `event-listener-strategy` crate) - confirm what exactly it generates.
easy_wrapper! {
    /// A future returned by [`Receiver::recv()`].
    #[derive(Debug)]
    #[must_use = "futures do nothing unless .awaited"]
    pub struct Recv<'a, T: Clone>(RecvInner<'a, T> => Result<T, RecvError>);
    // NOTE(review): presumably requests a crate-private blocking `wait()` method - confirm.
    pub(crate) wait();
}
1726
pin_project! {
    // State of an in-flight receive; it is driven by the `EventListenerFuture` impl for
    // `RecvInner`.
    #[derive(Debug)]
    struct RecvInner<'a, T> {
        // The receiver being read from, borrowed mutably for the lifetime of the future.
        receiver: &'a mut Receiver<T>,
        // Listener on the channel's `recv_ops` event, registered after a receive attempt found
        // the channel empty; `None` while not listening.
        listener: Option<EventListener>,

        // Keeping this type `!Unpin` enables future optimizations.
        #[pin]
        _pin: PhantomPinned
    }
}
1738
impl<'a, T: Clone> EventListenerFuture for RecvInner<'a, T> {
    type Output = Result<T, RecvError>;

    // Drives the receive: try to receive, and while the channel is empty, listen on the
    // channel's `recv_ops` event and retry after a notification.
    //
    // Resolves to `Ok(msg)` when a message was received, `Err(RecvError::Closed)` when
    // `try_recv` reports the channel as closed, and `Err(RecvError::Overflowed(n))` when it
    // reports an overflow.
    fn poll_with_strategy<'x, S: event_listener_strategy::Strategy<'x>>(
        self: Pin<&mut Self>,
        strategy: &mut S,
        context: &mut S::Context,
    ) -> Poll<Self::Output> {
        let this = self.project();

        loop {
            // Attempt to receive a message.
            match this.receiver.try_recv() {
                Ok(msg) => return Poll::Ready(Ok(msg)),
                Err(TryRecvError::Closed) => return Poll::Ready(Err(RecvError::Closed)),
                Err(TryRecvError::Overflowed(n)) => {
                    return Poll::Ready(Err(RecvError::Overflowed(n)));
                }
                // Nothing available yet: fall through to the listener handling.
                Err(TryRecvError::Empty) => {}
            }

            // Receiving failed - now start listening for notifications or wait for one.
            match &this.listener {
                None => {
                    // Start listening and then try receiving again.
                    *this.listener = {
                        let inner = this.receiver.inner.write().unwrap();
                        Some(inner.recv_ops.listen())
                    };
                }
                Some(_) => {
                    // Wait for a notification.
                    // `ready!` returns `Poll::Pending` from this function until the listener
                    // has been notified; afterwards it is cleared and the receive is retried.
                    ready!(strategy.poll(this.listener, context));
                    *this.listener = None;
                }
            }
        }
    }
}
1778
/// An inactive receiver.
///
/// An inactive receiver is a receiver that is unable to receive messages. It's only useful for
/// keeping a channel open even when no associated active receivers exist.
#[derive(Debug)]
pub struct InactiveReceiver<T> {
    // Shared channel state; `activate_cloned` hands a clone of this `Arc` to every `Receiver`
    // it creates.
    inner: Arc<RwLock<Inner<T>>>,
}
1787
1788impl<T> InactiveReceiver<T> {
    /// Convert to an active [`Receiver`].
1790    ///
1791    /// Consumes `self`. Use [`InactiveReceiver::activate_cloned`] if you want to keep `self`.
1792    ///
1793    /// # Examples
1794    ///
1795    /// ```
1796    /// use async_broadcast::{broadcast, TrySendError};
1797    ///
1798    /// let (s, r) = broadcast(1);
1799    /// let inactive = r.deactivate();
1800    /// assert_eq!(s.try_broadcast(10), Err(TrySendError::Inactive(10)));
1801    ///
1802    /// let mut r = inactive.activate();
1803    /// assert_eq!(s.try_broadcast(10), Ok(None));
1804    /// assert_eq!(r.try_recv(), Ok(10));
1805    /// ```
1806    pub fn activate(self) -> Receiver<T> {
1807        self.activate_cloned()
1808    }
1809
1810    /// Create an activate [`Receiver`] for the associated channel.
1811    ///
1812    /// # Examples
1813    ///
1814    /// ```
1815    /// use async_broadcast::{broadcast, TrySendError};
1816    ///
1817    /// let (s, r) = broadcast(1);
1818    /// let inactive = r.deactivate();
1819    /// assert_eq!(s.try_broadcast(10), Err(TrySendError::Inactive(10)));
1820    ///
1821    /// let mut r = inactive.activate_cloned();
1822    /// assert_eq!(s.try_broadcast(10), Ok(None));
1823    /// assert_eq!(r.try_recv(), Ok(10));
1824    /// ```
1825    pub fn activate_cloned(&self) -> Receiver<T> {
1826        let mut inner = self.inner.write().unwrap();
1827        inner.receiver_count += 1;
1828
1829        if inner.receiver_count == 1 {
1830            // Notify 1 awaiting senders that there is now a receiver. If there is still room in the
1831            // queue, the notified operation will notify another awaiting sender.
1832            inner.send_ops.notify(1);
1833        }
1834
1835        Receiver {
1836            inner: self.inner.clone(),
1837            pos: inner.head_pos + inner.queue.len() as u64,
1838            listener: None,
1839        }
1840    }
1841
1842    /// Returns the channel capacity.
1843    ///
1844    /// See [`Receiver::capacity`] documentation for examples.
1845    pub fn capacity(&self) -> usize {
1846        self.inner.read().unwrap().capacity
1847    }
1848
1849    /// Set the channel capacity.
1850    ///
1851    /// There are times when you need to change the channel's capacity after creating it. If the
1852    /// `new_cap` is less than the number of messages in the channel, the oldest messages will be
1853    /// dropped to shrink the channel.
1854    ///
1855    /// See [`Receiver::set_capacity`] documentation for examples.
1856    pub fn set_capacity(&mut self, new_cap: usize) {
1857        self.inner.write().unwrap().set_capacity(new_cap);
1858    }
1859
1860    /// If overflow mode is enabled on this channel.
1861    ///
1862    /// See [`Receiver::overflow`] documentation for examples.
1863    pub fn overflow(&self) -> bool {
1864        self.inner.read().unwrap().overflow
1865    }
1866
1867    /// Set overflow mode on the channel.
1868    ///
1869    /// When overflow mode is set, broadcasting to the channel will succeed even if the channel is
1870    /// full. It achieves that by removing the oldest message from the channel.
1871    ///
1872    /// See [`Receiver::set_overflow`] documentation for examples.
1873    pub fn set_overflow(&mut self, overflow: bool) {
1874        self.inner.write().unwrap().overflow = overflow;
1875    }
1876
1877    /// If sender will wait for active receivers.
1878    ///
1879    /// If set to `false`, [`Send`] will resolve immediately with a [`SendError`]. Defaults to
1880    /// `true`.
1881    ///
1882    /// # Examples
1883    ///
1884    /// ```
1885    /// use async_broadcast::broadcast;
1886    ///
1887    /// let (_, r) = broadcast::<i32>(5);
1888    /// let r = r.deactivate();
1889    /// assert!(r.await_active());
1890    /// ```
1891    pub fn await_active(&self) -> bool {
1892        self.inner.read().unwrap().await_active
1893    }
1894
1895    /// Specify if sender will wait for active receivers.
1896    ///
1897    /// If set to `false`, [`Send`] will resolve immediately with a [`SendError`]. Defaults to
1898    /// `true`.
1899    ///
1900    /// # Examples
1901    ///
1902    /// ```
1903    /// # futures_lite::future::block_on(async {
1904    /// use async_broadcast::broadcast;
1905    ///
1906    /// let (s, r) = broadcast::<i32>(2);
1907    /// s.broadcast(1).await.unwrap();
1908    ///
1909    /// let mut r = r.deactivate();
1910    /// r.set_await_active(false);
1911    /// assert!(s.broadcast(2).await.is_err());
1912    /// # });
1913    /// ```
1914    pub fn set_await_active(&mut self, await_active: bool) {
1915        self.inner.write().unwrap().await_active = await_active;
1916    }
1917
1918    /// Closes the channel.
1919    ///
1920    /// Returns `true` if this call has closed the channel and it was not closed already.
1921    ///
1922    /// The remaining messages can still be received.
1923    ///
1924    /// See [`Receiver::close`] documentation for examples.
1925    pub fn close(&self) -> bool {
1926        self.inner.write().unwrap().close()
1927    }
1928
1929    /// Returns `true` if the channel is closed.
1930    ///
1931    /// See [`Receiver::is_closed`] documentation for examples.
1932    pub fn is_closed(&self) -> bool {
1933        self.inner.read().unwrap().is_closed
1934    }
1935
1936    /// Returns `true` if the channel is empty.
1937    ///
1938    /// See [`Receiver::is_empty`] documentation for examples.
1939    pub fn is_empty(&self) -> bool {
1940        self.inner.read().unwrap().queue.is_empty()
1941    }
1942
1943    /// Returns `true` if the channel is full.
1944    ///
1945    /// See [`Receiver::is_full`] documentation for examples.
1946    pub fn is_full(&self) -> bool {
1947        let inner = self.inner.read().unwrap();
1948
1949        inner.queue.len() == inner.capacity
1950    }
1951
1952    /// Returns the number of messages in the channel.
1953    ///
1954    /// See [`Receiver::len`] documentation for examples.
1955    pub fn len(&self) -> usize {
1956        self.inner.read().unwrap().queue.len()
1957    }
1958
1959    /// Returns the number of receivers for the channel.
1960    ///
1961    /// This does not include inactive receivers. Use [`InactiveReceiver::inactive_receiver_count`]
1962    /// if you're interested in that.
1963    ///
1964    /// # Examples
1965    ///
1966    /// ```
1967    /// use async_broadcast::broadcast;
1968    ///
1969    /// let (s, r) = broadcast::<()>(1);
1970    /// assert_eq!(s.receiver_count(), 1);
1971    /// let r = r.deactivate();
1972    /// assert_eq!(s.receiver_count(), 0);
1973    ///
1974    /// let r2 = r.activate_cloned();
1975    /// assert_eq!(r.receiver_count(), 1);
1976    /// assert_eq!(r.inactive_receiver_count(), 1);
1977    /// ```
1978    pub fn receiver_count(&self) -> usize {
1979        self.inner.read().unwrap().receiver_count
1980    }
1981
1982    /// Returns the number of inactive receivers for the channel.
1983    ///
1984    /// # Examples
1985    ///
1986    /// ```
1987    /// use async_broadcast::broadcast;
1988    ///
1989    /// let (s, r) = broadcast::<()>(1);
1990    /// assert_eq!(s.receiver_count(), 1);
1991    /// let r = r.deactivate();
1992    /// assert_eq!(s.receiver_count(), 0);
1993    ///
1994    /// let r2 = r.activate_cloned();
1995    /// assert_eq!(r.receiver_count(), 1);
1996    /// assert_eq!(r.inactive_receiver_count(), 1);
1997    /// ```
1998    pub fn inactive_receiver_count(&self) -> usize {
1999        self.inner.read().unwrap().inactive_receiver_count
2000    }
2001
2002    /// Returns the number of senders for the channel.
2003    ///
2004    /// See [`Receiver::sender_count`] documentation for examples.
2005    pub fn sender_count(&self) -> usize {
2006        self.inner.read().unwrap().sender_count
2007    }
2008}
2009
2010impl<T> Clone for InactiveReceiver<T> {
2011    fn clone(&self) -> Self {
2012        self.inner.write().unwrap().inactive_receiver_count += 1;
2013
2014        InactiveReceiver {
2015            inner: self.inner.clone(),
2016        }
2017    }
2018}
2019
2020impl<T> Drop for InactiveReceiver<T> {
2021    fn drop(&mut self) {
2022        let mut inner = self.inner.write().unwrap();
2023
2024        inner.inactive_receiver_count -= 1;
2025        inner.close_channel();
2026    }
2027}