async_broadcast/lib.rs
1//! Async broadcast channel
2//!
3//! An async multi-producer multi-consumer broadcast channel, where each consumer gets a clone of every
4//! message sent on the channel. For obvious reasons, the channel can only be used to broadcast types
5//! that implement [`Clone`].
6//!
7//! A channel has the [`Sender`] and [`Receiver`] side. Both sides are cloneable and can be shared
8//! among multiple threads.
9//!
10//! When all `Sender`s or all `Receiver`s are dropped, the channel becomes closed. When a channel is
11//! closed, no more messages can be sent, but remaining messages can still be received.
12//!
13//! The channel can also be closed manually by calling [`Sender::close()`] or [`Receiver::close()`].
14//!
15//! ## Examples
16//!
17//! ```rust
18//! use async_broadcast::{broadcast, TryRecvError};
19//! use futures_lite::{future::block_on, stream::StreamExt};
20//!
21//! block_on(async move {
22//! let (s1, mut r1) = broadcast(2);
23//! let s2 = s1.clone();
24//! let mut r2 = r1.clone();
25//!
26//! // Send 2 messages from two different senders.
27//! s1.broadcast(7).await.unwrap();
28//! s2.broadcast(8).await.unwrap();
29//!
30//! // Channel is now at capacity so sending more messages will result in an error.
31//! assert!(s2.try_broadcast(9).unwrap_err().is_full());
32//! assert!(s1.try_broadcast(10).unwrap_err().is_full());
33//!
34//! // We can use the `recv` method or the `Stream` implementation to receive messages.
35//! assert_eq!(r1.next().await.unwrap(), 7);
36//! assert_eq!(r1.recv().await.unwrap(), 8);
37//! assert_eq!(r2.next().await.unwrap(), 7);
38//! assert_eq!(r2.recv().await.unwrap(), 8);
39//!
40//! // All receivers got all messages, so the channel is now empty.
41//! assert_eq!(r1.try_recv(), Err(TryRecvError::Empty));
42//! assert_eq!(r2.try_recv(), Err(TryRecvError::Empty));
43//!
44//! // Drop both senders, which closes the channel.
45//! drop(s1);
46//! drop(s2);
47//!
48//! assert_eq!(r1.try_recv(), Err(TryRecvError::Closed));
49//! assert_eq!(r2.try_recv(), Err(TryRecvError::Closed));
50//! })
51//! ```
52//!
53//! ## Difference with `async-channel`
54//!
55//! This crate is similar to [`async-channel`] in that they both provide an MPMC channel. The
56//! main difference is that in `async-channel`, each message sent on the channel is only received
57//! by one of the receivers. `async-broadcast` on the other hand, delivers each message to every
58//! receiver (IOW broadcast) by cloning it for each receiver.
59//!
60//! [`async-channel`]: https://crates.io/crates/async-channel
61//!
62//! ## Difference with other broadcast crates
63//!
64//! * [`broadcaster`]: The main difference would be that `broadcaster` doesn't have a sender and
65//! receiver split and both sides use clones of the same BroadcastChannel instance. The messages
66//! are sent to all channel clones. While this can work for many cases, the lack of
67//! sender and receiver split, means that often times, you'll find yourself having to drain the
68//! channel on the sending side yourself.
69//!
70//! * [`postage`]: this crate provides a [broadcast API][pba] similar to `async_broadcast`. However,
71//! it:
72//! - (at the time of this writing) duplicates [futures] API, which isn't ideal.
73//! - Does not support overflow mode nor has the concept of inactive receivers, so a slow or
74//! inactive receiver blocking the whole channel is not a solvable problem.
75//! - Provides all kinds of channels, which is generally good but if you just need a broadcast
76//! channel, `async_broadcast` is probably a better choice.
77//!
78//! * [`tokio::sync`]: Tokio's `sync` module provides a [broadcast channel][tbc] API. The differences
79//! here are:
80//! - While this implementation does provide [overflow mode][tom], it is the default behavior and not
81//! opt-in.
82//! - There is no equivalent of inactive receivers.
83//! - While it's possible to build tokio with only the `sync` module, it comes with other APIs that
84//! you may not need.
85//!
86//! [`broadcaster`]: https://crates.io/crates/broadcaster
87//! [`postage`]: https://crates.io/crates/postage
88//! [pba]: https://docs.rs/postage/0.4.1/postage/broadcast/fn.channel.html
89//! [futures]: https://crates.io/crates/futures
90//! [`tokio::sync`]: https://docs.rs/tokio/1.6.0/tokio/sync
91//! [tbc]: https://docs.rs/tokio/1.6.0/tokio/sync/broadcast/index.html
92//! [tom]: https://docs.rs/tokio/1.6.0/tokio/sync/broadcast/index.html#lagging
93//!
94#![forbid(unsafe_code)]
95#![deny(missing_debug_implementations, nonstandard_style, rust_2018_idioms)]
96#![warn(rustdoc::missing_doc_code_examples, unreachable_pub)]
97#![doc(
98 html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
99)]
100#![doc(
101 html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
102)]
103
// Only compiled under `cfg(doctest)`: pulls `../README.md` in through `doc_comment::doctest!` so
// that the code samples in the README are built and run together with the other doctests.
#[cfg(doctest)]
mod doctests {
    doc_comment::doctest!("../README.md");
}
108
109use std::collections::VecDeque;
110use std::convert::TryInto;
111use std::error;
112use std::fmt;
113use std::future::Future;
114use std::marker::PhantomPinned;
115use std::pin::Pin;
116use std::sync::{Arc, RwLock};
117use std::task::{Context, Poll};
118
119use event_listener::{Event, EventListener};
120use event_listener_strategy::{easy_wrapper, EventListenerFuture};
121use futures_core::{ready, stream::Stream};
122use pin_project_lite::pin_project;
123
124/// Create a new broadcast channel.
125///
126/// The created channel has space to hold at most `cap` messages at a time.
127///
128/// # Panics
129///
130/// Capacity must be a positive number. If `cap` is zero, this function will panic.
131///
132/// # Examples
133///
134/// ```
135/// # futures_lite::future::block_on(async {
136/// use async_broadcast::{broadcast, TryRecvError, TrySendError};
137///
138/// let (s, mut r1) = broadcast(1);
139/// let mut r2 = r1.clone();
140///
141/// assert_eq!(s.broadcast(10).await, Ok(None));
142/// assert_eq!(s.try_broadcast(20), Err(TrySendError::Full(20)));
143///
144/// assert_eq!(r1.recv().await, Ok(10));
145/// assert_eq!(r2.recv().await, Ok(10));
146/// assert_eq!(r1.try_recv(), Err(TryRecvError::Empty));
147/// assert_eq!(r2.try_recv(), Err(TryRecvError::Empty));
148/// # });
149/// ```
150pub fn broadcast<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
151 assert!(cap > 0, "capacity cannot be zero");
152
153 let inner = Arc::new(RwLock::new(Inner {
154 queue: VecDeque::with_capacity(cap),
155 capacity: cap,
156 overflow: false,
157 await_active: true,
158 receiver_count: 1,
159 inactive_receiver_count: 0,
160 sender_count: 1,
161 head_pos: 0,
162 is_closed: false,
163 send_ops: Event::new(),
164 recv_ops: Event::new(),
165 }));
166
167 let s = Sender {
168 inner: inner.clone(),
169 };
170 let r = Receiver {
171 inner,
172 pos: 0,
173 listener: None,
174 };
175
176 (s, r)
177}
178
/// Channel state shared by all senders and receivers of one channel (it is created behind an
/// `Arc<RwLock<..>>` in `broadcast`).
#[derive(Debug)]
struct Inner<T> {
    /// Buffered messages, oldest at the front. Each message is stored together with the number
    /// of receivers that still have to read it.
    queue: VecDeque<(T, usize)>,
    // We assign the same capacity to the queue but that's just specifying the minimum capacity and
    // the actual capacity could be anything. Hence the need to keep track of our own set capacity.
    capacity: usize,
    /// Number of active receivers.
    receiver_count: usize,
    /// Number of inactive receivers.
    inactive_receiver_count: usize,
    /// Number of senders; the channel is closed when it drops to zero.
    sender_count: usize,
    /// Send sequence number of the front of the queue
    head_pos: u64,
    /// Whether a broadcast on a full channel evicts the oldest message instead of failing.
    overflow: bool,
    /// Whether senders wait for active receivers (see `Sender::set_await_active`).
    await_active: bool,

    /// Set once by `close()`; never reset by the code in this block.
    is_closed: bool,

    /// Send operations waiting while the channel is full.
    send_ops: Event,

    /// Receive operations waiting while the channel is empty and not closed.
    recv_ops: Event,
}
201
202impl<T> Inner<T> {
203 /// Try receiving at the given position, returning either the element or a reference to it.
204 ///
205 /// Result is used here instead of Cow because we don't have a Clone bound on T.
206 fn try_recv_at(&mut self, pos: &mut u64) -> Result<Result<T, &T>, TryRecvError> {
207 let i = match pos.checked_sub(self.head_pos) {
208 Some(i) => i
209 .try_into()
210 .expect("Head position more than usize::MAX behind a receiver"),
211 None => {
212 let count = self.head_pos - *pos;
213 *pos = self.head_pos;
214 return Err(TryRecvError::Overflowed(count));
215 }
216 };
217
218 let last_waiter;
219 if let Some((_elt, waiters)) = self.queue.get_mut(i) {
220 *pos += 1;
221 *waiters -= 1;
222 last_waiter = *waiters == 0;
223 } else {
224 debug_assert_eq!(i, self.queue.len());
225 if self.is_closed {
226 return Err(TryRecvError::Closed);
227 } else {
228 return Err(TryRecvError::Empty);
229 }
230 }
231
232 // If we read from the front of the queue and this is the last receiver reading it
233 // we can pop the queue instead of cloning the message
234 if last_waiter {
235 // Only the first element of the queue should have 0 waiters
236 assert_eq!(i, 0);
237
238 // Remove the element from the queue, adjust space, and notify senders
239 let elt = self.queue.pop_front().unwrap().0;
240 self.head_pos += 1;
241 if !self.overflow {
242 // Notify 1 awaiting senders that there is now room. If there is still room in the
243 // queue, the notified operation will notify another awaiting sender.
244 self.send_ops.notify(1);
245 }
246
247 Ok(Ok(elt))
248 } else {
249 Ok(Err(&self.queue[i].0))
250 }
251 }
252
253 /// Closes the channel and notifies all waiting operations.
254 ///
255 /// Returns `true` if this call has closed the channel and it was not closed already.
256 fn close(&mut self) -> bool {
257 if self.is_closed {
258 return false;
259 }
260
261 self.is_closed = true;
262 // Notify all waiting senders and receivers.
263 self.send_ops.notify(usize::MAX);
264 self.recv_ops.notify(usize::MAX);
265
266 true
267 }
268
269 /// Set the channel capacity.
270 ///
271 /// There are times when you need to change the channel's capacity after creating it. If the
272 /// `new_cap` is less than the number of messages in the channel, the oldest messages will be
273 /// dropped to shrink the channel.
274 fn set_capacity(&mut self, new_cap: usize) {
275 self.capacity = new_cap;
276 if new_cap > self.queue.capacity() {
277 let diff = new_cap - self.queue.capacity();
278 self.queue.reserve(diff);
279 }
280
281 // Ensure queue doesn't have more than `new_cap` messages.
282 if new_cap < self.queue.len() {
283 let diff = self.queue.len() - new_cap;
284 self.queue.drain(0..diff);
285 self.head_pos += diff as u64;
286 }
287 }
288
289 /// Close the channel if there aren't any receivers present anymore
290 fn close_channel(&mut self) {
291 if self.receiver_count == 0 && self.inactive_receiver_count == 0 {
292 self.close();
293 }
294 }
295}
296
/// The sending side of the broadcast channel.
///
/// Senders can be cloned and shared among threads. When all senders associated with a channel are
/// dropped, the channel becomes closed.
///
/// The channel can also be closed manually by calling [`Sender::close()`].
#[derive(Debug)]
pub struct Sender<T> {
    // Channel state shared with every other `Sender` and `Receiver` of the same channel.
    inner: Arc<RwLock<Inner<T>>>,
}
307
308impl<T> Sender<T> {
309 /// Returns the channel capacity.
310 ///
311 /// # Examples
312 ///
313 /// ```
314 /// use async_broadcast::broadcast;
315 ///
316 /// let (s, r) = broadcast::<i32>(5);
317 /// assert_eq!(s.capacity(), 5);
318 /// ```
319 pub fn capacity(&self) -> usize {
320 self.inner.read().unwrap().capacity
321 }
322
323 /// Set the channel capacity.
324 ///
325 /// There are times when you need to change the channel's capacity after creating it. If the
326 /// `new_cap` is less than the number of messages in the channel, the oldest messages will be
327 /// dropped to shrink the channel.
328 ///
329 /// # Examples
330 ///
331 /// ```
332 /// use async_broadcast::{broadcast, TrySendError, TryRecvError};
333 ///
334 /// let (mut s, mut r) = broadcast::<i32>(3);
335 /// assert_eq!(s.capacity(), 3);
336 /// s.try_broadcast(1).unwrap();
337 /// s.try_broadcast(2).unwrap();
338 /// s.try_broadcast(3).unwrap();
339 ///
340 /// s.set_capacity(1);
341 /// assert_eq!(s.capacity(), 1);
342 /// assert_eq!(r.try_recv(), Err(TryRecvError::Overflowed(2)));
343 /// assert_eq!(r.try_recv().unwrap(), 3);
344 /// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
345 /// s.try_broadcast(1).unwrap();
346 /// assert_eq!(s.try_broadcast(2), Err(TrySendError::Full(2)));
347 ///
348 /// s.set_capacity(2);
349 /// assert_eq!(s.capacity(), 2);
350 /// s.try_broadcast(2).unwrap();
351 /// assert_eq!(s.try_broadcast(2), Err(TrySendError::Full(2)));
352 /// ```
353 pub fn set_capacity(&mut self, new_cap: usize) {
354 self.inner.write().unwrap().set_capacity(new_cap);
355 }
356
357 /// If overflow mode is enabled on this channel.
358 ///
359 /// # Examples
360 ///
361 /// ```
362 /// use async_broadcast::broadcast;
363 ///
364 /// let (s, r) = broadcast::<i32>(5);
365 /// assert!(!s.overflow());
366 /// ```
367 pub fn overflow(&self) -> bool {
368 self.inner.read().unwrap().overflow
369 }
370
371 /// Set overflow mode on the channel.
372 ///
373 /// When overflow mode is set, broadcasting to the channel will succeed even if the channel is
374 /// full. It achieves that by removing the oldest message from the channel.
375 ///
376 /// # Examples
377 ///
378 /// ```
379 /// use async_broadcast::{broadcast, TrySendError, TryRecvError};
380 ///
381 /// let (mut s, mut r) = broadcast::<i32>(2);
382 /// s.try_broadcast(1).unwrap();
383 /// s.try_broadcast(2).unwrap();
384 /// assert_eq!(s.try_broadcast(3), Err(TrySendError::Full(3)));
385 /// s.set_overflow(true);
386 /// assert_eq!(s.try_broadcast(3).unwrap(), Some(1));
387 /// assert_eq!(s.try_broadcast(4).unwrap(), Some(2));
388 ///
389 /// assert_eq!(r.try_recv(), Err(TryRecvError::Overflowed(2)));
390 /// assert_eq!(r.try_recv().unwrap(), 3);
391 /// assert_eq!(r.try_recv().unwrap(), 4);
392 /// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
393 /// ```
394 pub fn set_overflow(&mut self, overflow: bool) {
395 self.inner.write().unwrap().overflow = overflow;
396 }
397
398 /// If sender will wait for active receivers.
399 ///
400 /// If set to `false`, [`Send`] will resolve immediately with a [`SendError`]. Defaults to
401 /// `true`.
402 ///
403 /// # Examples
404 ///
405 /// ```
406 /// use async_broadcast::broadcast;
407 ///
408 /// let (s, _) = broadcast::<i32>(5);
409 /// assert!(s.await_active());
410 /// ```
411 pub fn await_active(&self) -> bool {
412 self.inner.read().unwrap().await_active
413 }
414
415 /// Specify if sender will wait for active receivers.
416 ///
417 /// If set to `false`, [`Send`] will resolve immediately with a [`SendError`]. Defaults to
418 /// `true`.
419 ///
420 /// # Examples
421 ///
422 /// ```
423 /// # futures_lite::future::block_on(async {
424 /// use async_broadcast::broadcast;
425 ///
426 /// let (mut s, mut r) = broadcast::<i32>(2);
427 /// s.broadcast(1).await.unwrap();
428 ///
429 /// let _ = r.deactivate();
430 /// s.set_await_active(false);
431 /// assert!(s.broadcast(2).await.is_err());
432 /// # });
433 /// ```
434 pub fn set_await_active(&mut self, await_active: bool) {
435 self.inner.write().unwrap().await_active = await_active;
436 }
437
438 /// Closes the channel.
439 ///
440 /// Returns `true` if this call has closed the channel and it was not closed already.
441 ///
442 /// The remaining messages can still be received.
443 ///
444 /// # Examples
445 ///
446 /// ```
447 /// # futures_lite::future::block_on(async {
448 /// use async_broadcast::{broadcast, RecvError};
449 ///
450 /// let (s, mut r) = broadcast(1);
451 /// s.broadcast(1).await.unwrap();
452 /// assert!(s.close());
453 ///
454 /// assert_eq!(r.recv().await.unwrap(), 1);
455 /// assert_eq!(r.recv().await, Err(RecvError::Closed));
456 /// # });
457 /// ```
458 pub fn close(&self) -> bool {
459 self.inner.write().unwrap().close()
460 }
461
462 /// Returns `true` if the channel is closed.
463 ///
464 /// # Examples
465 ///
466 /// ```
467 /// # futures_lite::future::block_on(async {
468 /// use async_broadcast::{broadcast, RecvError};
469 ///
470 /// let (s, r) = broadcast::<()>(1);
471 /// assert!(!s.is_closed());
472 ///
473 /// drop(r);
474 /// assert!(s.is_closed());
475 /// # });
476 /// ```
477 pub fn is_closed(&self) -> bool {
478 self.inner.read().unwrap().is_closed
479 }
480
481 /// Returns `true` if the channel is empty.
482 ///
483 /// # Examples
484 ///
485 /// ```
486 /// # futures_lite::future::block_on(async {
487 /// use async_broadcast::broadcast;
488 ///
489 /// let (s, r) = broadcast(1);
490 ///
491 /// assert!(s.is_empty());
492 /// s.broadcast(1).await;
493 /// assert!(!s.is_empty());
494 /// # });
495 /// ```
496 pub fn is_empty(&self) -> bool {
497 self.inner.read().unwrap().queue.is_empty()
498 }
499
500 /// Returns `true` if the channel is full.
501 ///
502 /// # Examples
503 ///
504 /// ```
505 /// # futures_lite::future::block_on(async {
506 /// use async_broadcast::broadcast;
507 ///
508 /// let (s, r) = broadcast(1);
509 ///
510 /// assert!(!s.is_full());
511 /// s.broadcast(1).await;
512 /// assert!(s.is_full());
513 /// # });
514 /// ```
515 pub fn is_full(&self) -> bool {
516 let inner = self.inner.read().unwrap();
517
518 inner.queue.len() == inner.capacity
519 }
520
521 /// Returns the number of messages in the channel.
522 ///
523 /// # Examples
524 ///
525 /// ```
526 /// # futures_lite::future::block_on(async {
527 /// use async_broadcast::broadcast;
528 ///
529 /// let (s, r) = broadcast(2);
530 /// assert_eq!(s.len(), 0);
531 ///
532 /// s.broadcast(1).await;
533 /// s.broadcast(2).await;
534 /// assert_eq!(s.len(), 2);
535 /// # });
536 /// ```
537 pub fn len(&self) -> usize {
538 self.inner.read().unwrap().queue.len()
539 }
540
541 /// Returns the number of receivers for the channel.
542 ///
543 /// This does not include inactive receivers. Use [`Sender::inactive_receiver_count`] if you
544 /// are interested in that.
545 ///
546 /// # Examples
547 ///
548 /// ```
549 /// use async_broadcast::broadcast;
550 ///
551 /// let (s, r) = broadcast::<()>(1);
552 /// assert_eq!(s.receiver_count(), 1);
553 /// let r = r.deactivate();
554 /// assert_eq!(s.receiver_count(), 0);
555 ///
556 /// let r2 = r.activate_cloned();
557 /// assert_eq!(r.receiver_count(), 1);
558 /// assert_eq!(r.inactive_receiver_count(), 1);
559 /// ```
560 pub fn receiver_count(&self) -> usize {
561 self.inner.read().unwrap().receiver_count
562 }
563
564 /// Returns the number of inactive receivers for the channel.
565 ///
566 /// # Examples
567 ///
568 /// ```
569 /// use async_broadcast::broadcast;
570 ///
571 /// let (s, r) = broadcast::<()>(1);
572 /// assert_eq!(s.receiver_count(), 1);
573 /// let r = r.deactivate();
574 /// assert_eq!(s.receiver_count(), 0);
575 ///
576 /// let r2 = r.activate_cloned();
577 /// assert_eq!(r.receiver_count(), 1);
578 /// assert_eq!(r.inactive_receiver_count(), 1);
579 /// ```
580 pub fn inactive_receiver_count(&self) -> usize {
581 self.inner.read().unwrap().inactive_receiver_count
582 }
583
584 /// Returns the number of senders for the channel.
585 ///
586 /// # Examples
587 ///
588 /// ```
589 /// # futures_lite::future::block_on(async {
590 /// use async_broadcast::broadcast;
591 ///
592 /// let (s, r) = broadcast::<()>(1);
593 /// assert_eq!(s.sender_count(), 1);
594 ///
595 /// let s2 = s.clone();
596 /// assert_eq!(s.sender_count(), 2);
597 /// # });
598 /// ```
599 pub fn sender_count(&self) -> usize {
600 self.inner.read().unwrap().sender_count
601 }
602
603 /// Produce a new Receiver for this channel.
604 ///
605 /// The new receiver starts with zero messages available. This will not re-open the channel if
606 /// it was closed due to all receivers being dropped.
607 ///
608 /// # Examples
609 ///
610 /// ```
611 /// # futures_lite::future::block_on(async {
612 /// use async_broadcast::{broadcast, RecvError};
613 ///
614 /// let (s, mut r1) = broadcast(2);
615 ///
616 /// assert_eq!(s.broadcast(1).await, Ok(None));
617 ///
618 /// let mut r2 = s.new_receiver();
619 ///
620 /// assert_eq!(s.broadcast(2).await, Ok(None));
621 /// drop(s);
622 ///
623 /// assert_eq!(r1.recv().await, Ok(1));
624 /// assert_eq!(r1.recv().await, Ok(2));
625 /// assert_eq!(r1.recv().await, Err(RecvError::Closed));
626 ///
627 /// assert_eq!(r2.recv().await, Ok(2));
628 /// assert_eq!(r2.recv().await, Err(RecvError::Closed));
629 /// # });
630 /// ```
631 pub fn new_receiver(&self) -> Receiver<T> {
632 let mut inner = self.inner.write().unwrap();
633 inner.receiver_count += 1;
634 Receiver {
635 inner: self.inner.clone(),
636 pos: inner.head_pos + inner.queue.len() as u64,
637 listener: None,
638 }
639 }
640}
641
642impl<T: Clone> Sender<T> {
643 /// Broadcasts a message on the channel.
644 ///
645 /// If the channel is full, this method waits until there is space for a message unless:
646 ///
647 /// 1. overflow mode (set through [`Sender::set_overflow`]) is enabled, in which case it removes
648 /// the oldest message from the channel to make room for the new message. The removed message
649 /// is returned to the caller.
650 /// 2. this behavior is disabled using [`Sender::set_await_active`], in which case, it returns
651 /// [`SendError`] immediately.
652 ///
653 /// If the channel is closed, this method returns an error.
654 ///
655 /// The future returned by this function is pinned to the heap. If the future being `Unpin` is
656 /// not important to you, or if you just `.await` this future, use the [`broadcast_direct`] method
657 /// instead.
658 ///
659 /// # Examples
660 ///
661 /// ```
662 /// # futures_lite::future::block_on(async {
663 /// use async_broadcast::{broadcast, SendError};
664 ///
665 /// let (s, r) = broadcast(1);
666 ///
667 /// assert_eq!(s.broadcast(1).await, Ok(None));
668 /// drop(r);
669 /// assert_eq!(s.broadcast(2).await, Err(SendError(2)));
670 /// # });
671 /// ```
672 pub fn broadcast(&self, msg: T) -> Pin<Box<Send<'_, T>>> {
673 Box::pin(self.broadcast_direct(msg))
674 }
675
676 /// Broadcasts a message on the channel without pinning the future to the heap.
677 ///
678 /// The future returned by this method is not `Unpin` and must be pinned before use. This is
679 /// the desired behavior if you just `.await` on the future. For other uses cases, use the
680 /// [`broadcast`] method instead.
681 ///
682 /// # Examples
683 ///
684 /// ```
685 /// # futures_lite::future::block_on(async {
686 /// use async_broadcast::{broadcast, SendError};
687 ///
688 /// let (s, r) = broadcast(1);
689 ///
690 /// assert_eq!(s.broadcast_direct(1).await, Ok(None));
691 /// drop(r);
692 /// assert_eq!(s.broadcast_direct(2).await, Err(SendError(2)));
693 /// # });
694 /// ```
695 pub fn broadcast_direct(&self, msg: T) -> Send<'_, T> {
696 Send::_new(SendInner {
697 sender: self,
698 listener: None,
699 msg: Some(msg),
700 _pin: PhantomPinned,
701 })
702 }
703
704 /// Attempts to broadcast a message on the channel.
705 ///
706 /// If the channel is full, this method returns an error unless overflow mode (set through
707 /// [`Sender::set_overflow`]) is enabled. If the overflow mode is enabled, it removes the
708 /// oldest message from the channel to make room for the new message. The removed message
709 /// is returned to the caller.
710 ///
711 /// If the channel is closed, this method returns an error.
712 ///
713 /// # Examples
714 ///
715 /// ```
716 /// use async_broadcast::{broadcast, TrySendError};
717 ///
718 /// let (s, r) = broadcast(1);
719 ///
720 /// assert_eq!(s.try_broadcast(1), Ok(None));
721 /// assert_eq!(s.try_broadcast(2), Err(TrySendError::Full(2)));
722 ///
723 /// drop(r);
724 /// assert_eq!(s.try_broadcast(3), Err(TrySendError::Closed(3)));
725 /// ```
726 pub fn try_broadcast(&self, msg: T) -> Result<Option<T>, TrySendError<T>> {
727 let mut ret = None;
728 let mut inner = self.inner.write().unwrap();
729
730 if inner.is_closed {
731 return Err(TrySendError::Closed(msg));
732 } else if inner.receiver_count == 0 {
733 assert!(inner.inactive_receiver_count != 0);
734
735 return Err(TrySendError::Inactive(msg));
736 } else if inner.queue.len() == inner.capacity {
737 if inner.overflow {
738 // Make room by popping a message.
739 ret = inner.queue.pop_front().map(|(m, _)| m);
740 } else {
741 return Err(TrySendError::Full(msg));
742 }
743 }
744 let receiver_count = inner.receiver_count;
745 inner.queue.push_back((msg, receiver_count));
746 if ret.is_some() {
747 inner.head_pos += 1;
748 }
749
750 // Notify all awaiting receive operations.
751 inner.recv_ops.notify(usize::MAX);
752
753 Ok(ret)
754 }
755}
756
757impl<T> Drop for Sender<T> {
758 fn drop(&mut self) {
759 let mut inner = self.inner.write().unwrap();
760
761 inner.sender_count -= 1;
762
763 if inner.sender_count == 0 {
764 inner.close();
765 }
766 }
767}
768
769impl<T> Clone for Sender<T> {
770 fn clone(&self) -> Self {
771 self.inner.write().unwrap().sender_count += 1;
772
773 Sender {
774 inner: self.inner.clone(),
775 }
776 }
777}
778
/// The receiving side of a channel.
///
/// Receivers can be cloned and shared among threads. When all (active) receivers associated with a
/// channel are dropped, the channel becomes closed. You can deactivate a receiver using
/// [`Receiver::deactivate`] if you would like the channel to remain open without keeping active
/// receivers around.
#[derive(Debug)]
pub struct Receiver<T> {
    // Channel state shared with every other `Sender` and `Receiver` of the same channel.
    inner: Arc<RwLock<Inner<T>>>,
    // Position of this receiver as a send sequence number (same numbering as `Inner::head_pos`).
    // `broadcast` starts it at 0; `Sender::new_receiver` starts it just past the newest queued
    // message.
    pos: u64,

    /// Listens for a send or close event to unblock this stream.
    listener: Option<EventListener>,
}
793
794impl<T> Receiver<T> {
795 /// Returns the channel capacity.
796 ///
797 /// # Examples
798 ///
799 /// ```
800 /// use async_broadcast::broadcast;
801 ///
802 /// let (_s, r) = broadcast::<i32>(5);
803 /// assert_eq!(r.capacity(), 5);
804 /// ```
805 pub fn capacity(&self) -> usize {
806 self.inner.read().unwrap().capacity
807 }
808
809 /// Set the channel capacity.
810 ///
811 /// There are times when you need to change the channel's capacity after creating it. If the
812 /// `new_cap` is less than the number of messages in the channel, the oldest messages will be
813 /// dropped to shrink the channel.
814 ///
815 /// # Examples
816 ///
817 /// ```
818 /// use async_broadcast::{broadcast, TrySendError, TryRecvError};
819 ///
820 /// let (s, mut r) = broadcast::<i32>(3);
821 /// assert_eq!(r.capacity(), 3);
822 /// s.try_broadcast(1).unwrap();
823 /// s.try_broadcast(2).unwrap();
824 /// s.try_broadcast(3).unwrap();
825 ///
826 /// r.set_capacity(1);
827 /// assert_eq!(r.capacity(), 1);
828 /// assert_eq!(r.try_recv(), Err(TryRecvError::Overflowed(2)));
829 /// assert_eq!(r.try_recv().unwrap(), 3);
830 /// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
831 /// s.try_broadcast(1).unwrap();
832 /// assert_eq!(s.try_broadcast(2), Err(TrySendError::Full(2)));
833 ///
834 /// r.set_capacity(2);
835 /// assert_eq!(r.capacity(), 2);
836 /// s.try_broadcast(2).unwrap();
837 /// assert_eq!(s.try_broadcast(2), Err(TrySendError::Full(2)));
838 /// ```
839 pub fn set_capacity(&mut self, new_cap: usize) {
840 self.inner.write().unwrap().set_capacity(new_cap);
841 }
842
843 /// If overflow mode is enabled on this channel.
844 ///
845 /// # Examples
846 ///
847 /// ```
848 /// use async_broadcast::broadcast;
849 ///
850 /// let (_s, r) = broadcast::<i32>(5);
851 /// assert!(!r.overflow());
852 /// ```
853 pub fn overflow(&self) -> bool {
854 self.inner.read().unwrap().overflow
855 }
856
857 /// Set overflow mode on the channel.
858 ///
859 /// When overflow mode is set, broadcasting to the channel will succeed even if the channel is
860 /// full. It achieves that by removing the oldest message from the channel.
861 ///
862 /// # Examples
863 ///
864 /// ```
865 /// use async_broadcast::{broadcast, TrySendError, TryRecvError};
866 ///
867 /// let (s, mut r) = broadcast::<i32>(2);
868 /// s.try_broadcast(1).unwrap();
869 /// s.try_broadcast(2).unwrap();
870 /// assert_eq!(s.try_broadcast(3), Err(TrySendError::Full(3)));
871 /// r.set_overflow(true);
872 /// assert_eq!(s.try_broadcast(3).unwrap(), Some(1));
873 /// assert_eq!(s.try_broadcast(4).unwrap(), Some(2));
874 ///
875 /// assert_eq!(r.try_recv(), Err(TryRecvError::Overflowed(2)));
876 /// assert_eq!(r.try_recv().unwrap(), 3);
877 /// assert_eq!(r.try_recv().unwrap(), 4);
878 /// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
879 /// ```
880 pub fn set_overflow(&mut self, overflow: bool) {
881 self.inner.write().unwrap().overflow = overflow;
882 }
883
884 /// If sender will wait for active receivers.
885 ///
886 /// If set to `false`, [`Send`] will resolve immediately with a [`SendError`]. Defaults to
887 /// `true`.
888 ///
889 /// # Examples
890 ///
891 /// ```
892 /// use async_broadcast::broadcast;
893 ///
894 /// let (_, r) = broadcast::<i32>(5);
895 /// assert!(r.await_active());
896 /// ```
897 pub fn await_active(&self) -> bool {
898 self.inner.read().unwrap().await_active
899 }
900
901 /// Specify if sender will wait for active receivers.
902 ///
903 /// If set to `false`, [`Send`] will resolve immediately with a [`SendError`]. Defaults to
904 /// `true`.
905 ///
906 /// # Examples
907 ///
908 /// ```
909 /// # futures_lite::future::block_on(async {
910 /// use async_broadcast::broadcast;
911 ///
912 /// let (s, mut r) = broadcast::<i32>(2);
913 /// s.broadcast(1).await.unwrap();
914 ///
915 /// r.set_await_active(false);
916 /// let _ = r.deactivate();
917 /// assert!(s.broadcast(2).await.is_err());
918 /// # });
919 /// ```
920 pub fn set_await_active(&mut self, await_active: bool) {
921 self.inner.write().unwrap().await_active = await_active;
922 }
923
924 /// Closes the channel.
925 ///
926 /// Returns `true` if this call has closed the channel and it was not closed already.
927 ///
928 /// The remaining messages can still be received.
929 ///
930 /// # Examples
931 ///
932 /// ```
933 /// # futures_lite::future::block_on(async {
934 /// use async_broadcast::{broadcast, RecvError};
935 ///
936 /// let (s, mut r) = broadcast(1);
937 /// s.broadcast(1).await.unwrap();
938 /// assert!(s.close());
939 ///
940 /// assert_eq!(r.recv().await.unwrap(), 1);
941 /// assert_eq!(r.recv().await, Err(RecvError::Closed));
942 /// # });
943 /// ```
944 pub fn close(&self) -> bool {
945 self.inner.write().unwrap().close()
946 }
947
948 /// Returns `true` if the channel is closed.
949 ///
950 /// # Examples
951 ///
952 /// ```
953 /// # futures_lite::future::block_on(async {
954 /// use async_broadcast::{broadcast, RecvError};
955 ///
956 /// let (s, r) = broadcast::<()>(1);
957 /// assert!(!s.is_closed());
958 ///
959 /// drop(r);
960 /// assert!(s.is_closed());
961 /// # });
962 /// ```
963 pub fn is_closed(&self) -> bool {
964 self.inner.read().unwrap().is_closed
965 }
966
967 /// Returns `true` if the channel is empty.
968 ///
969 /// # Examples
970 ///
971 /// ```
972 /// # futures_lite::future::block_on(async {
973 /// use async_broadcast::broadcast;
974 ///
975 /// let (s, r) = broadcast(1);
976 ///
977 /// assert!(s.is_empty());
978 /// s.broadcast(1).await;
979 /// assert!(!s.is_empty());
980 /// # });
981 /// ```
982 pub fn is_empty(&self) -> bool {
983 self.inner.read().unwrap().queue.is_empty()
984 }
985
986 /// Returns `true` if the channel is full.
987 ///
988 /// # Examples
989 ///
990 /// ```
991 /// # futures_lite::future::block_on(async {
992 /// use async_broadcast::broadcast;
993 ///
994 /// let (s, r) = broadcast(1);
995 ///
996 /// assert!(!s.is_full());
997 /// s.broadcast(1).await;
998 /// assert!(s.is_full());
999 /// # });
1000 /// ```
1001 pub fn is_full(&self) -> bool {
1002 let inner = self.inner.read().unwrap();
1003
1004 inner.queue.len() == inner.capacity
1005 }
1006
1007 /// Returns the number of messages in the channel.
1008 ///
1009 /// # Examples
1010 ///
1011 /// ```
1012 /// # futures_lite::future::block_on(async {
1013 /// use async_broadcast::broadcast;
1014 ///
1015 /// let (s, r) = broadcast(2);
1016 /// assert_eq!(s.len(), 0);
1017 ///
1018 /// s.broadcast(1).await;
1019 /// s.broadcast(2).await;
1020 /// assert_eq!(s.len(), 2);
1021 /// # });
1022 /// ```
1023 pub fn len(&self) -> usize {
1024 self.inner.read().unwrap().queue.len()
1025 }
1026
1027 /// Returns the number of receivers for the channel.
1028 ///
1029 /// This does not include inactive receivers. Use [`Receiver::inactive_receiver_count`] if you
1030 /// are interested in that.
1031 ///
1032 /// # Examples
1033 ///
1034 /// ```
1035 /// use async_broadcast::broadcast;
1036 ///
1037 /// let (s, r) = broadcast::<()>(1);
1038 /// assert_eq!(s.receiver_count(), 1);
1039 /// let r = r.deactivate();
1040 /// assert_eq!(s.receiver_count(), 0);
1041 ///
1042 /// let r2 = r.activate_cloned();
1043 /// assert_eq!(r.receiver_count(), 1);
1044 /// assert_eq!(r.inactive_receiver_count(), 1);
1045 /// ```
1046 pub fn receiver_count(&self) -> usize {
1047 self.inner.read().unwrap().receiver_count
1048 }
1049
1050 /// Returns the number of inactive receivers for the channel.
1051 ///
1052 /// # Examples
1053 ///
1054 /// ```
1055 /// use async_broadcast::broadcast;
1056 ///
1057 /// let (s, r) = broadcast::<()>(1);
1058 /// assert_eq!(s.receiver_count(), 1);
1059 /// let r = r.deactivate();
1060 /// assert_eq!(s.receiver_count(), 0);
1061 ///
1062 /// let r2 = r.activate_cloned();
1063 /// assert_eq!(r.receiver_count(), 1);
1064 /// assert_eq!(r.inactive_receiver_count(), 1);
1065 /// ```
1066 pub fn inactive_receiver_count(&self) -> usize {
1067 self.inner.read().unwrap().inactive_receiver_count
1068 }
1069
1070 /// Returns the number of senders for the channel.
1071 ///
1072 /// # Examples
1073 ///
1074 /// ```
1075 /// # futures_lite::future::block_on(async {
1076 /// use async_broadcast::broadcast;
1077 ///
1078 /// let (s, r) = broadcast::<()>(1);
1079 /// assert_eq!(s.sender_count(), 1);
1080 ///
1081 /// let s2 = s.clone();
1082 /// assert_eq!(s.sender_count(), 2);
1083 /// # });
1084 /// ```
1085 pub fn sender_count(&self) -> usize {
1086 self.inner.read().unwrap().sender_count
1087 }
1088
1089 /// Downgrade to a [`InactiveReceiver`].
1090 ///
1091 /// An inactive receiver is one that can not and does not receive any messages. Its only purpose
1092 /// is to keep the associated channel open even when there are no (active) receivers. An inactive
1093 /// receiver can be upgraded into a [`Receiver`] using [`InactiveReceiver::activate`] or
1094 /// [`InactiveReceiver::activate_cloned`].
1095 ///
1096 /// [`Sender::try_broadcast`] will return [`TrySendError::Inactive`] if only inactive
1097 /// receivers exist for the associated channel and [`Sender::broadcast`] will wait until an
1098 /// active receiver is available.
1099 ///
1100 /// # Examples
1101 ///
1102 /// ```
1103 /// # futures_lite::future::block_on(async {
1104 /// use async_broadcast::{broadcast, TrySendError};
1105 ///
1106 /// let (s, r) = broadcast(1);
1107 /// let inactive = r.deactivate();
1108 /// assert_eq!(s.try_broadcast(10), Err(TrySendError::Inactive(10)));
1109 ///
1110 /// let mut r = inactive.activate();
1111 /// assert_eq!(s.broadcast(10).await, Ok(None));
1112 /// assert_eq!(r.recv().await, Ok(10));
1113 /// # });
1114 /// ```
1115 pub fn deactivate(self) -> InactiveReceiver<T> {
1116 // Drop::drop impl of Receiver will take care of `receiver_count`.
1117 self.inner.write().unwrap().inactive_receiver_count += 1;
1118
1119 InactiveReceiver {
1120 inner: self.inner.clone(),
1121 }
1122 }
1123}
1124
1125impl<T: Clone> Receiver<T> {
1126 /// Receives a message from the channel.
1127 ///
1128 /// If the channel is empty, this method waits until there is a message.
1129 ///
1130 /// If the channel is closed, this method receives a message or returns an error if there are
1131 /// no more messages.
1132 ///
1133 /// If this receiver has missed a message (only possible if overflow mode is enabled), then
1134 /// this method returns an error and readjusts its cursor to point to the first available
1135 /// message.
1136 ///
1137 /// The future returned by this function is pinned to the heap. If the future being `Unpin` is
1138 /// not important to you, or if you just `.await` this future, use the [`recv_direct`] method
1139 /// instead.
1140 ///
1141 /// # Examples
1142 ///
1143 /// ```
1144 /// # futures_lite::future::block_on(async {
1145 /// use async_broadcast::{broadcast, RecvError};
1146 ///
1147 /// let (s, mut r1) = broadcast(1);
1148 /// let mut r2 = r1.clone();
1149 ///
1150 /// assert_eq!(s.broadcast(1).await, Ok(None));
1151 /// drop(s);
1152 ///
1153 /// assert_eq!(r1.recv().await, Ok(1));
1154 /// assert_eq!(r1.recv().await, Err(RecvError::Closed));
1155 /// assert_eq!(r2.recv().await, Ok(1));
1156 /// assert_eq!(r2.recv().await, Err(RecvError::Closed));
1157 /// # });
1158 /// ```
1159 pub fn recv(&mut self) -> Pin<Box<Recv<'_, T>>> {
1160 Box::pin(self.recv_direct())
1161 }
1162
1163 /// Receives a message from the channel without pinning the future to the heap.
1164 ///
1165 /// The future returned by this method is not `Unpin` and must be pinned before use. This is
1166 /// the desired behavior if you just `.await` on the future. For other use cases, use the
1167 /// [`recv`] method instead.
1168 ///
1169 /// # Examples
1170 ///
1171 /// ```
1172 /// # futures_lite::future::block_on(async {
1173 /// use async_broadcast::{broadcast, RecvError};
1174 ///
1175 /// let (s, mut r1) = broadcast(1);
1176 /// let mut r2 = r1.clone();
1177 ///
1178 /// assert_eq!(s.broadcast(1).await, Ok(None));
1179 /// drop(s);
1180 ///
1181 /// assert_eq!(r1.recv_direct().await, Ok(1));
1182 /// assert_eq!(r1.recv_direct().await, Err(RecvError::Closed));
1183 /// assert_eq!(r2.recv_direct().await, Ok(1));
1184 /// assert_eq!(r2.recv_direct().await, Err(RecvError::Closed));
1185 /// # });
1186 /// ```
1187 pub fn recv_direct(&mut self) -> Recv<'_, T> {
1188 Recv::_new(RecvInner {
1189 receiver: self,
1190 listener: None,
1191 _pin: PhantomPinned,
1192 })
1193 }
1194
1195 /// Attempts to receive a message from the channel.
1196 ///
1197 /// If the channel is empty or closed, this method returns an error.
1198 ///
1199 /// If this receiver has missed a message (only possible if overflow mode is enabled), then
1200 /// this method returns an error and readjusts its cursor to point to the first available
1201 /// message.
1202 ///
1203 /// # Examples
1204 ///
1205 /// ```
1206 /// # futures_lite::future::block_on(async {
1207 /// use async_broadcast::{broadcast, TryRecvError};
1208 ///
1209 /// let (s, mut r1) = broadcast(1);
1210 /// let mut r2 = r1.clone();
1211 /// assert_eq!(s.broadcast(1).await, Ok(None));
1212 ///
1213 /// assert_eq!(r1.try_recv(), Ok(1));
1214 /// assert_eq!(r1.try_recv(), Err(TryRecvError::Empty));
1215 /// assert_eq!(r2.try_recv(), Ok(1));
1216 /// assert_eq!(r2.try_recv(), Err(TryRecvError::Empty));
1217 ///
1218 /// drop(s);
1219 /// assert_eq!(r1.try_recv(), Err(TryRecvError::Closed));
1220 /// assert_eq!(r2.try_recv(), Err(TryRecvError::Closed));
1221 /// # });
1222 /// ```
1223 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
1224 self.inner
1225 .write()
1226 .unwrap()
1227 .try_recv_at(&mut self.pos)
1228 .map(|cow| cow.unwrap_or_else(T::clone))
1229 }
1230
1231 /// Produce a new Sender for this channel.
1232 ///
1233 /// This will not re-open the channel if it was closed due to all senders being dropped.
1234 ///
1235 /// # Examples
1236 ///
1237 /// ```
1238 /// # futures_lite::future::block_on(async {
1239 /// use async_broadcast::{broadcast, RecvError};
1240 ///
1241 /// let (s1, mut r) = broadcast(2);
1242 ///
1243 /// assert_eq!(s1.broadcast(1).await, Ok(None));
1244 ///
1245 /// let mut s2 = r.new_sender();
1246 ///
1247 /// assert_eq!(s2.broadcast(2).await, Ok(None));
1248 /// drop(s1);
1249 /// drop(s2);
1250 ///
1251 /// assert_eq!(r.recv().await, Ok(1));
1252 /// assert_eq!(r.recv().await, Ok(2));
1253 /// assert_eq!(r.recv().await, Err(RecvError::Closed));
1254 /// # });
1255 /// ```
1256 pub fn new_sender(&self) -> Sender<T> {
1257 self.inner.write().unwrap().sender_count += 1;
1258
1259 Sender {
1260 inner: self.inner.clone(),
1261 }
1262 }
1263
1264 /// Produce a new Receiver for this channel.
1265 ///
1266 /// Unlike [`Receiver::clone`], this method creates a new receiver that starts with zero
1267 /// messages available. This is slightly faster than a real clone.
1268 ///
1269 /// # Examples
1270 ///
1271 /// ```
1272 /// # futures_lite::future::block_on(async {
1273 /// use async_broadcast::{broadcast, RecvError};
1274 ///
1275 /// let (s, mut r1) = broadcast(2);
1276 ///
1277 /// assert_eq!(s.broadcast(1).await, Ok(None));
1278 ///
1279 /// let mut r2 = r1.new_receiver();
1280 ///
1281 /// assert_eq!(s.broadcast(2).await, Ok(None));
1282 /// drop(s);
1283 ///
1284 /// assert_eq!(r1.recv().await, Ok(1));
1285 /// assert_eq!(r1.recv().await, Ok(2));
1286 /// assert_eq!(r1.recv().await, Err(RecvError::Closed));
1287 ///
1288 /// assert_eq!(r2.recv().await, Ok(2));
1289 /// assert_eq!(r2.recv().await, Err(RecvError::Closed));
1290 /// # });
1291 /// ```
1292 pub fn new_receiver(&self) -> Self {
1293 let mut inner = self.inner.write().unwrap();
1294 inner.receiver_count += 1;
1295 Receiver {
1296 inner: self.inner.clone(),
1297 pos: inner.head_pos + inner.queue.len() as u64,
1298 listener: None,
1299 }
1300 }
1301
1302 /// A low level poll method that is similar to [`Receiver::recv()`] or
1303 /// [`Receiver::recv_direct()`], and can be useful for building stream implementations which
1304 /// use a [`Receiver`] under the hood and want to know if the stream has overflowed.
1305 ///
1306 /// Prefer to use [`Receiver::recv()`] or [`Receiver::recv_direct()`] when otherwise possible.
1307 ///
1308 /// # Errors
1309 ///
1310 /// If the number of messages that have been sent has overflowed the channel capacity, a
1311 /// [`RecvError::Overflowed`] variant is returned containing the number of items that
1312 /// overflowed and were lost.
1313 ///
1314 /// # Examples
1315 ///
1316 /// This example shows how the [`Receiver::poll_recv`] method can be used to allow a custom
1317 /// stream implementation to internally make use of a [`Receiver`]. This example implementation
1318 /// differs from the stream implementation of [`Receiver`] because it returns an error if
1319 /// the channel capacity overflows, which the built in [`Receiver`] stream doesn't do.
1320 ///
1321 /// ```
1322 /// use futures_core::Stream;
1323 /// use async_broadcast::{Receiver, RecvError};
1324 /// use std::{pin::Pin, task::{Poll, Context}};
1325 ///
1326 /// struct MyStream(Receiver<i32>);
1327 ///
1328 /// impl futures_core::Stream for MyStream {
1329 /// type Item = Result<i32, RecvError>;
1330 /// fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1331 /// Pin::new(&mut self.0).poll_recv(cx)
1332 /// }
1333 /// }
1334 /// ```
1335 pub fn poll_recv(
1336 mut self: Pin<&mut Self>,
1337 cx: &mut Context<'_>,
1338 ) -> Poll<Option<Result<T, RecvError>>> {
1339 loop {
1340 // If this stream is listening for events, first wait for a notification.
1341 if let Some(listener) = self.listener.as_mut() {
1342 ready!(Pin::new(listener).poll(cx));
1343 self.listener = None;
1344 }
1345
1346 loop {
1347 // Attempt to receive a message.
1348 match self.try_recv() {
1349 Ok(msg) => {
1350 // The stream is not blocked on an event - drop the listener.
1351 self.listener = None;
1352 return Poll::Ready(Some(Ok(msg)));
1353 }
1354 Err(TryRecvError::Closed) => {
1355 // The stream is not blocked on an event - drop the listener.
1356 self.listener = None;
1357 return Poll::Ready(None);
1358 }
1359 Err(TryRecvError::Overflowed(n)) => {
1360 // The stream is not blocked on an event - drop the listener.
1361 self.listener = None;
1362 return Poll::Ready(Some(Err(RecvError::Overflowed(n))));
1363 }
1364 Err(TryRecvError::Empty) => {}
1365 }
1366
1367 // Receiving failed - now start listening for notifications or wait for one.
1368 match self.listener.as_mut() {
1369 None => {
1370 // Start listening and then try receiving again.
1371 self.listener = {
1372 let inner = self.inner.write().unwrap();
1373 Some(inner.recv_ops.listen())
1374 };
1375 }
1376 Some(_) => {
1377 // Go back to the outer loop to poll the listener.
1378 break;
1379 }
1380 }
1381 }
1382 }
1383 }
1384}
1385
1386impl<T> Drop for Receiver<T> {
1387 fn drop(&mut self) {
1388 let mut inner = self.inner.write().unwrap();
1389
1390 // Remove ourself from each item's counter
1391 loop {
1392 match inner.try_recv_at(&mut self.pos) {
1393 Ok(_) => continue,
1394 Err(TryRecvError::Overflowed(_)) => continue,
1395 Err(TryRecvError::Closed) => break,
1396 Err(TryRecvError::Empty) => break,
1397 }
1398 }
1399
1400 inner.receiver_count -= 1;
1401
1402 inner.close_channel();
1403 }
1404}
1405
1406impl<T> Clone for Receiver<T> {
1407 /// Produce a clone of this Receiver that has the same messages queued.
1408 ///
1409 /// # Examples
1410 ///
1411 /// ```
1412 /// # futures_lite::future::block_on(async {
1413 /// use async_broadcast::{broadcast, RecvError};
1414 ///
1415 /// let (s, mut r1) = broadcast(1);
1416 ///
1417 /// assert_eq!(s.broadcast(1).await, Ok(None));
1418 /// drop(s);
1419 ///
1420 /// let mut r2 = r1.clone();
1421 ///
1422 /// assert_eq!(r1.recv().await, Ok(1));
1423 /// assert_eq!(r1.recv().await, Err(RecvError::Closed));
1424 /// assert_eq!(r2.recv().await, Ok(1));
1425 /// assert_eq!(r2.recv().await, Err(RecvError::Closed));
1426 /// # });
1427 /// ```
1428 fn clone(&self) -> Self {
1429 let mut inner = self.inner.write().unwrap();
1430 inner.receiver_count += 1;
1431 // increment the waiter count on all items not yet received by this object
1432 let n = self.pos.saturating_sub(inner.head_pos) as usize;
1433 for (_elt, waiters) in inner.queue.iter_mut().skip(n) {
1434 *waiters += 1;
1435 }
1436 Receiver {
1437 inner: self.inner.clone(),
1438 pos: self.pos,
1439 listener: None,
1440 }
1441 }
1442}
1443
1444impl<T: Clone> Stream for Receiver<T> {
1445 type Item = T;
1446
1447 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1448 loop {
1449 match ready!(self.as_mut().poll_recv(cx)) {
1450 Some(Ok(val)) => return Poll::Ready(Some(val)),
1451 // If overflowed, we expect future operations to succeed so try again.
1452 Some(Err(RecvError::Overflowed(_))) => continue,
1453 // RecvError::Closed should never appear here, but handle it anyway.
1454 None | Some(Err(RecvError::Closed)) => return Poll::Ready(None),
1455 }
1456 }
1457 }
1458}
1459
1460impl<T: Clone> futures_core::stream::FusedStream for Receiver<T> {
1461 fn is_terminated(&self) -> bool {
1462 let inner = self.inner.read().unwrap();
1463
1464 inner.is_closed && inner.queue.is_empty()
1465 }
1466}
1467
/// The error produced when [`Sender::broadcast()`] cannot deliver a message.
///
/// It is returned because the channel is closed, or because no active receivers were present
/// while `await_active` was set to `false` (see [`Sender::set_await_active`] for details).
/// The message that could not be sent is carried in the public field.
#[derive(PartialEq, Eq, Clone, Copy)]
pub struct SendError<T>(pub T);

impl<T> SendError<T> {
    /// Consumes the error and gives back the message that couldn't be sent.
    pub fn into_inner(self) -> T {
        let SendError(message) = self;
        message
    }
}

impl<T> error::Error for SendError<T> {}

impl<T> fmt::Debug for SendError<T> {
    // Written by hand so that `T` does not have to implement `Debug`; the payload is elided.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("SendError(..)")
    }
}

impl<T> fmt::Display for SendError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("sending into a closed channel")
    }
}
1495
/// The error produced when [`Sender::try_broadcast()`] cannot deliver a message.
///
/// Every variant carries the message that could not be sent.
#[derive(PartialEq, Eq, Clone, Copy)]
pub enum TrySendError<T> {
    /// The channel is full but not closed.
    Full(T),

    /// The channel is closed.
    Closed(T),

    /// There are currently no active receivers, only inactive ones.
    Inactive(T),
}

impl<T> TrySendError<T> {
    /// Consumes the error and gives back the message that couldn't be sent.
    pub fn into_inner(self) -> T {
        match self {
            TrySendError::Full(message)
            | TrySendError::Closed(message)
            | TrySendError::Inactive(message) => message,
        }
    }

    /// Returns `true` if the channel is full but not closed.
    pub fn is_full(&self) -> bool {
        matches!(self, TrySendError::Full(_))
    }

    /// Returns `true` if the channel is closed.
    pub fn is_closed(&self) -> bool {
        matches!(self, TrySendError::Closed(_))
    }

    /// Returns `true` if there are currently no active receivers, only inactive ones.
    pub fn is_disconnected(&self) -> bool {
        matches!(self, TrySendError::Inactive(_))
    }
}

impl<T> error::Error for TrySendError<T> {}

impl<T> fmt::Debug for TrySendError<T> {
    // Written by hand so that `T` does not have to implement `Debug`; the payload is elided.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            TrySendError::Full(..) => "Full(..)",
            TrySendError::Closed(..) => "Closed(..)",
            TrySendError::Inactive(..) => "Inactive(..)",
        };

        f.write_str(label)
    }
}

impl<T> fmt::Display for TrySendError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let description = match self {
            TrySendError::Full(..) => "sending into a full channel",
            TrySendError::Closed(..) => "sending into a closed channel",
            TrySendError::Inactive(..) => "sending into the void (no active receivers)",
        };

        f.write_str(description)
    }
}
1565
/// The error produced when [`Receiver::recv()`] cannot yield a message.
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub enum RecvError {
    /// The channel has overflowed since the last element was seen. Future recv operations will
    /// succeed, but some messages have been skipped.
    ///
    /// Contains the number of messages missed.
    Overflowed(u64),

    /// The channel is empty and closed.
    Closed,
}

impl error::Error for RecvError {}

impl fmt::Display for RecvError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match *self {
            RecvError::Overflowed(missed) => write!(f, "receiving skipped {} messages", missed),
            RecvError::Closed => f.write_str("receiving from an empty and closed channel"),
        }
    }
}
1589
/// The error produced when [`Receiver::try_recv()`] cannot yield a message.
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub enum TryRecvError {
    /// The channel has overflowed since the last element was seen. Future recv operations will
    /// succeed, but some messages have been skipped.
    ///
    /// Contains the number of messages missed.
    Overflowed(u64),

    /// The channel is empty but not closed.
    Empty,

    /// The channel is empty and closed.
    Closed,
}

impl TryRecvError {
    /// Returns `true` if the channel is empty but not closed.
    pub fn is_empty(&self) -> bool {
        matches!(self, TryRecvError::Empty)
    }

    /// Returns `true` if the channel is empty and closed.
    pub fn is_closed(&self) -> bool {
        matches!(self, TryRecvError::Closed)
    }

    /// Returns `true` if this error indicates the receiver missed messages.
    pub fn is_overflowed(&self) -> bool {
        matches!(self, TryRecvError::Overflowed(_))
    }
}

impl error::Error for TryRecvError {}

impl fmt::Display for TryRecvError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            TryRecvError::Overflowed(lost) => {
                write!(f, "receiving operation observed {} lost messages", lost)
            }
            TryRecvError::Empty => f.write_str("receiving from an empty channel"),
            TryRecvError::Closed => f.write_str("receiving from an empty and closed channel"),
        }
    }
}
1646
easy_wrapper! {
    /// A future returned by [`Sender::broadcast()`].
    // Public wrapper around `SendInner`, which holds the actual polling logic. The output
    // type after `=>` matches `SendInner`'s `EventListenerFuture::Output`.
    #[derive(Debug)]
    #[must_use = "futures do nothing unless .awaited"]
    pub struct Send<'a, T: Clone>(SendInner<'a, T> => Result<Option<T>, SendError<T>>);
    // NOTE(review): presumably names the crate-private blocking entry point generated by
    // `easy_wrapper!` — confirm against the event-listener-strategy documentation.
    pub(crate) wait();
}
1654
pin_project! {
    // State of an in-flight broadcast; polled through its `EventListenerFuture` impl.
    #[derive(Debug)]
    struct SendInner<'a, T> {
        // Sender the message is broadcast through.
        sender: &'a Sender<T>,
        // Listener on the channel's `send_ops` event; `Some` only while waiting for a
        // notification before retrying.
        listener: Option<EventListener>,
        // Message still to be delivered. It is taken for every attempt and put back when the
        // attempt has to be retried.
        msg: Option<T>,

        // Keeping this type `!Unpin` enables future optimizations.
        #[pin]
        _pin: PhantomPinned
    }
}
1667
1668impl<'a, T: Clone> EventListenerFuture for SendInner<'a, T> {
1669 type Output = Result<Option<T>, SendError<T>>;
1670
1671 fn poll_with_strategy<'x, S: event_listener_strategy::Strategy<'x>>(
1672 self: Pin<&mut Self>,
1673 strategy: &mut S,
1674 context: &mut S::Context,
1675 ) -> Poll<Self::Output> {
1676 let this = self.project();
1677
1678 loop {
1679 let msg = this.msg.take().unwrap();
1680 let inner = &this.sender.inner;
1681
1682 // Attempt to send a message.
1683 match this.sender.try_broadcast(msg) {
1684 Ok(msg) => {
1685 let inner = inner.write().unwrap();
1686
1687 if inner.queue.len() < inner.capacity {
1688 // Not full still, so notify the next awaiting sender.
1689 inner.send_ops.notify(1);
1690 }
1691
1692 return Poll::Ready(Ok(msg));
1693 }
1694 Err(TrySendError::Closed(msg)) => return Poll::Ready(Err(SendError(msg))),
1695 Err(TrySendError::Full(m)) => *this.msg = Some(m),
1696 Err(TrySendError::Inactive(m)) if inner.read().unwrap().await_active => {
1697 *this.msg = Some(m)
1698 }
1699 Err(TrySendError::Inactive(m)) => return Poll::Ready(Err(SendError(m))),
1700 }
1701
1702 // Sending failed - now start listening for notifications or wait for one.
1703 match &this.listener {
1704 None => {
1705 // Start listening and then try sending again.
1706 let inner = inner.write().unwrap();
1707 *this.listener = Some(inner.send_ops.listen());
1708 }
1709 Some(_) => {
1710 // Wait for a notification.
1711 ready!(strategy.poll(this.listener, context));
1712 *this.listener = None;
1713 }
1714 }
1715 }
1716 }
1717}
1718
easy_wrapper! {
    /// A future returned by [`Receiver::recv()`].
    // Public wrapper around `RecvInner`, which holds the actual polling logic. The output
    // type after `=>` matches `RecvInner`'s `EventListenerFuture::Output`.
    #[derive(Debug)]
    #[must_use = "futures do nothing unless .awaited"]
    pub struct Recv<'a, T: Clone>(RecvInner<'a, T> => Result<T, RecvError>);
    // NOTE(review): presumably names the crate-private blocking entry point generated by
    // `easy_wrapper!` — confirm against the event-listener-strategy documentation.
    pub(crate) wait();
}
1726
pin_project! {
    // State of an in-flight receive; polled through its `EventListenerFuture` impl.
    #[derive(Debug)]
    struct RecvInner<'a, T> {
        // Receiver whose cursor is advanced when a message is received.
        receiver: &'a mut Receiver<T>,
        // Listener on the channel's `recv_ops` event; `Some` only while waiting for a
        // notification before retrying.
        listener: Option<EventListener>,

        // Keeping this type `!Unpin` enables future optimizations.
        #[pin]
        _pin: PhantomPinned
    }
}
1738
1739impl<'a, T: Clone> EventListenerFuture for RecvInner<'a, T> {
1740 type Output = Result<T, RecvError>;
1741
1742 fn poll_with_strategy<'x, S: event_listener_strategy::Strategy<'x>>(
1743 self: Pin<&mut Self>,
1744 strategy: &mut S,
1745 context: &mut S::Context,
1746 ) -> Poll<Self::Output> {
1747 let this = self.project();
1748
1749 loop {
1750 // Attempt to receive a message.
1751 match this.receiver.try_recv() {
1752 Ok(msg) => return Poll::Ready(Ok(msg)),
1753 Err(TryRecvError::Closed) => return Poll::Ready(Err(RecvError::Closed)),
1754 Err(TryRecvError::Overflowed(n)) => {
1755 return Poll::Ready(Err(RecvError::Overflowed(n)));
1756 }
1757 Err(TryRecvError::Empty) => {}
1758 }
1759
1760 // Receiving failed - now start listening for notifications or wait for one.
1761 match &this.listener {
1762 None => {
1763 // Start listening and then try receiving again.
1764 *this.listener = {
1765 let inner = this.receiver.inner.write().unwrap();
1766 Some(inner.recv_ops.listen())
1767 };
1768 }
1769 Some(_) => {
1770 // Wait for a notification.
1771 ready!(strategy.poll(this.listener, context));
1772 *this.listener = None;
1773 }
1774 }
1775 }
1776 }
1777}
1778
/// An inactive receiver.
///
/// An inactive receiver is a receiver that is unable to receive messages. It's only useful for
/// keeping a channel open even when no associated active receivers exist.
///
/// One is obtained from [`Receiver::deactivate`] and can be turned back into an active
/// [`Receiver`] with [`InactiveReceiver::activate`] or [`InactiveReceiver::activate_cloned`].
#[derive(Debug)]
pub struct InactiveReceiver<T> {
    // Shared channel state, reference-counted and guarded by a read-write lock.
    inner: Arc<RwLock<Inner<T>>>,
}
1787
1788impl<T> InactiveReceiver<T> {
1789 /// Convert to an active [`Receiver`].
1790 ///
1791 /// Consumes `self`. Use [`InactiveReceiver::activate_cloned`] if you want to keep `self`.
1792 ///
1793 /// # Examples
1794 ///
1795 /// ```
1796 /// use async_broadcast::{broadcast, TrySendError};
1797 ///
1798 /// let (s, r) = broadcast(1);
1799 /// let inactive = r.deactivate();
1800 /// assert_eq!(s.try_broadcast(10), Err(TrySendError::Inactive(10)));
1801 ///
1802 /// let mut r = inactive.activate();
1803 /// assert_eq!(s.try_broadcast(10), Ok(None));
1804 /// assert_eq!(r.try_recv(), Ok(10));
1805 /// ```
1806 pub fn activate(self) -> Receiver<T> {
1807 self.activate_cloned()
1808 }
1809
1810 /// Create an active [`Receiver`] for the associated channel.
1811 ///
1812 /// # Examples
1813 ///
1814 /// ```
1815 /// use async_broadcast::{broadcast, TrySendError};
1816 ///
1817 /// let (s, r) = broadcast(1);
1818 /// let inactive = r.deactivate();
1819 /// assert_eq!(s.try_broadcast(10), Err(TrySendError::Inactive(10)));
1820 ///
1821 /// let mut r = inactive.activate_cloned();
1822 /// assert_eq!(s.try_broadcast(10), Ok(None));
1823 /// assert_eq!(r.try_recv(), Ok(10));
1824 /// ```
1825 pub fn activate_cloned(&self) -> Receiver<T> {
1826 let mut inner = self.inner.write().unwrap();
1827 inner.receiver_count += 1;
1828
1829 if inner.receiver_count == 1 {
1830 // Notify 1 awaiting senders that there is now a receiver. If there is still room in the
1831 // queue, the notified operation will notify another awaiting sender.
1832 inner.send_ops.notify(1);
1833 }
1834
1835 Receiver {
1836 inner: self.inner.clone(),
1837 pos: inner.head_pos + inner.queue.len() as u64,
1838 listener: None,
1839 }
1840 }
1841
1842 /// Returns the channel capacity.
1843 ///
1844 /// See [`Receiver::capacity`] documentation for examples.
1845 pub fn capacity(&self) -> usize {
1846 self.inner.read().unwrap().capacity
1847 }
1848
1849 /// Set the channel capacity.
1850 ///
1851 /// There are times when you need to change the channel's capacity after creating it. If the
1852 /// `new_cap` is less than the number of messages in the channel, the oldest messages will be
1853 /// dropped to shrink the channel.
1854 ///
1855 /// See [`Receiver::set_capacity`] documentation for examples.
1856 pub fn set_capacity(&mut self, new_cap: usize) {
1857 self.inner.write().unwrap().set_capacity(new_cap);
1858 }
1859
1860 /// If overflow mode is enabled on this channel.
1861 ///
1862 /// See [`Receiver::overflow`] documentation for examples.
1863 pub fn overflow(&self) -> bool {
1864 self.inner.read().unwrap().overflow
1865 }
1866
1867 /// Set overflow mode on the channel.
1868 ///
1869 /// When overflow mode is set, broadcasting to the channel will succeed even if the channel is
1870 /// full. It achieves that by removing the oldest message from the channel.
1871 ///
1872 /// See [`Receiver::set_overflow`] documentation for examples.
1873 pub fn set_overflow(&mut self, overflow: bool) {
1874 self.inner.write().unwrap().overflow = overflow;
1875 }
1876
1877 /// If sender will wait for active receivers.
1878 ///
1879 /// If set to `false`, [`Send`] will resolve immediately with a [`SendError`]. Defaults to
1880 /// `true`.
1881 ///
1882 /// # Examples
1883 ///
1884 /// ```
1885 /// use async_broadcast::broadcast;
1886 ///
1887 /// let (_, r) = broadcast::<i32>(5);
1888 /// let r = r.deactivate();
1889 /// assert!(r.await_active());
1890 /// ```
1891 pub fn await_active(&self) -> bool {
1892 self.inner.read().unwrap().await_active
1893 }
1894
1895 /// Specify if sender will wait for active receivers.
1896 ///
1897 /// If set to `false`, [`Send`] will resolve immediately with a [`SendError`]. Defaults to
1898 /// `true`.
1899 ///
1900 /// # Examples
1901 ///
1902 /// ```
1903 /// # futures_lite::future::block_on(async {
1904 /// use async_broadcast::broadcast;
1905 ///
1906 /// let (s, r) = broadcast::<i32>(2);
1907 /// s.broadcast(1).await.unwrap();
1908 ///
1909 /// let mut r = r.deactivate();
1910 /// r.set_await_active(false);
1911 /// assert!(s.broadcast(2).await.is_err());
1912 /// # });
1913 /// ```
1914 pub fn set_await_active(&mut self, await_active: bool) {
1915 self.inner.write().unwrap().await_active = await_active;
1916 }
1917
1918 /// Closes the channel.
1919 ///
1920 /// Returns `true` if this call has closed the channel and it was not closed already.
1921 ///
1922 /// The remaining messages can still be received.
1923 ///
1924 /// See [`Receiver::close`] documentation for examples.
1925 pub fn close(&self) -> bool {
1926 self.inner.write().unwrap().close()
1927 }
1928
1929 /// Returns `true` if the channel is closed.
1930 ///
1931 /// See [`Receiver::is_closed`] documentation for examples.
1932 pub fn is_closed(&self) -> bool {
1933 self.inner.read().unwrap().is_closed
1934 }
1935
1936 /// Returns `true` if the channel is empty.
1937 ///
1938 /// See [`Receiver::is_empty`] documentation for examples.
1939 pub fn is_empty(&self) -> bool {
1940 self.inner.read().unwrap().queue.is_empty()
1941 }
1942
1943 /// Returns `true` if the channel is full.
1944 ///
1945 /// See [`Receiver::is_full`] documentation for examples.
1946 pub fn is_full(&self) -> bool {
1947 let inner = self.inner.read().unwrap();
1948
1949 inner.queue.len() == inner.capacity
1950 }
1951
1952 /// Returns the number of messages in the channel.
1953 ///
1954 /// See [`Receiver::len`] documentation for examples.
1955 pub fn len(&self) -> usize {
1956 self.inner.read().unwrap().queue.len()
1957 }
1958
1959 /// Returns the number of receivers for the channel.
1960 ///
1961 /// This does not include inactive receivers. Use [`InactiveReceiver::inactive_receiver_count`]
1962 /// if you're interested in that.
1963 ///
1964 /// # Examples
1965 ///
1966 /// ```
1967 /// use async_broadcast::broadcast;
1968 ///
1969 /// let (s, r) = broadcast::<()>(1);
1970 /// assert_eq!(s.receiver_count(), 1);
1971 /// let r = r.deactivate();
1972 /// assert_eq!(s.receiver_count(), 0);
1973 ///
1974 /// let r2 = r.activate_cloned();
1975 /// assert_eq!(r.receiver_count(), 1);
1976 /// assert_eq!(r.inactive_receiver_count(), 1);
1977 /// ```
1978 pub fn receiver_count(&self) -> usize {
1979 self.inner.read().unwrap().receiver_count
1980 }
1981
1982 /// Returns the number of inactive receivers for the channel.
1983 ///
1984 /// # Examples
1985 ///
1986 /// ```
1987 /// use async_broadcast::broadcast;
1988 ///
1989 /// let (s, r) = broadcast::<()>(1);
1990 /// assert_eq!(s.receiver_count(), 1);
1991 /// let r = r.deactivate();
1992 /// assert_eq!(s.receiver_count(), 0);
1993 ///
1994 /// let r2 = r.activate_cloned();
1995 /// assert_eq!(r.receiver_count(), 1);
1996 /// assert_eq!(r.inactive_receiver_count(), 1);
1997 /// ```
1998 pub fn inactive_receiver_count(&self) -> usize {
1999 self.inner.read().unwrap().inactive_receiver_count
2000 }
2001
2002 /// Returns the number of senders for the channel.
2003 ///
2004 /// See [`Receiver::sender_count`] documentation for examples.
2005 pub fn sender_count(&self) -> usize {
2006 self.inner.read().unwrap().sender_count
2007 }
2008}
2009
2010impl<T> Clone for InactiveReceiver<T> {
2011 fn clone(&self) -> Self {
2012 self.inner.write().unwrap().inactive_receiver_count += 1;
2013
2014 InactiveReceiver {
2015 inner: self.inner.clone(),
2016 }
2017 }
2018}
2019
2020impl<T> Drop for InactiveReceiver<T> {
2021 fn drop(&mut self) {
2022 let mut inner = self.inner.write().unwrap();
2023
2024 inner.inactive_receiver_count -= 1;
2025 inner.close_channel();
2026 }
2027}