mz_ore/channel/
trigger.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Trigger channels.
17//!
18//! Trigger channels are a very simple form of channel that communicate only one
19//! bit of information: whether the sending half of the channel is open or
20//! closed.
21//!
22//! Here's an example of using a trigger channel to trigger work on a background
23//! task.
24//!
25//! ```
26//! # tokio_test::block_on(async {
27//! use mz_ore::channel::trigger;
28//!
29//! // Create a new trigger channel.
30//! let (trigger, trigger_rx) = trigger::channel();
31//!
32//! // Spawn a task to do some background work, but only once triggered.
33//! tokio::spawn(async {
34//!     // Wait for trigger to get dropped.
35//!     trigger_rx.await;
36//!
37//!     // Do background work.
38//! });
39//!
40//! // Do some prep work.
41//!
42//! // Fire `trigger` by dropping it.
43//! drop(trigger);
44//! # })
45//! ```
46//!
47//! A trigger channel never errors. It is not an error to drop the receiver
48//! before dropping the corresponding trigger.
49//!
50//! Trigger channels can be easily simulated with [`tokio::sync::oneshot`]
51//! channels (and in fact the implementation uses oneshot channels under the
52//! hood). However, using trigger channels can result in clearer code when the
53//! additional features of oneshot channels are not required, as trigger
54//! channels have a simpler API.
55
56use std::future::Future;
57use std::pin::Pin;
58use std::task::{Context, Poll, ready};
59
60use futures::FutureExt;
61use tokio::sync::oneshot;
62use tokio::sync::oneshot::error::TryRecvError;
63
64/// The sending half of a trigger channel.
65///
66/// Dropping the trigger will cause the receiver to resolve.
67#[derive(Debug)]
68pub struct Trigger {
69    _tx: oneshot::Sender<()>,
70}
71
72impl Trigger {
73    /// Fire this [Trigger].
74    ///
75    /// NOTE: Dropping the trigger also fires it, but this method allows
76    /// call-sites to be more explicit.
77    pub fn fire(self) {
78        // Dropping the Trigger is what fires the oneshot.
79    }
80}
81
82/// The receiving half of a trigger channel.
83///
84/// Awaiting the receiver will block until the trigger is dropped.
85#[derive(Debug)]
86pub struct Receiver {
87    rx: oneshot::Receiver<()>,
88}
89
90impl Receiver {
91    /// Reports whether the channel has been triggered.
92    pub fn is_triggered(&mut self) -> bool {
93        matches!(self.rx.try_recv(), Err(TryRecvError::Closed))
94    }
95}
96
97impl Future for Receiver {
98    type Output = ();
99
100    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
101        let _ = ready!(self.rx.poll_unpin(cx));
102        Poll::Ready(())
103    }
104}
105
106/// Creates a new trigger channel.
107pub fn channel() -> (Trigger, Receiver) {
108    let (tx, rx) = oneshot::channel();
109    let trigger = Trigger { _tx: tx };
110    let trigger_rx = Receiver { rx };
111    (trigger, trigger_rx)
112}
113
#[cfg(test)]
mod tests {
    use crate::channel::trigger;

    /// Exercises a trigger channel end to end: the receiver reports
    /// "not triggered" while the trigger is alive, resolves once the trigger
    /// is dropped, and reports "triggered" afterwards.
    #[cfg_attr(miri, ignore)] // error: unsupported operation: returning ready events from epoll_wait is not yet implemented
    #[mz_ore::test(tokio::test)]
    async fn test_trigger_channel() {
        let (trigger1, mut trigger1_rx) = trigger::channel();
        let (trigger2, trigger2_rx) = trigger::channel();

        // Check the untriggered state here, while `trigger1` is certainly
        // still alive. Doing so inside the spawned task would race with the
        // `drop(trigger1)` below, and because the task's result is never
        // observed by this test, a failed assertion there would go unnoticed.
        assert!(!trigger1_rx.is_triggered());

        crate::task::spawn(|| "test_trigger_channel", async move {
            (&mut trigger1_rx).await;
            assert!(trigger1_rx.is_triggered());
            // Signal the test body that the task got this far.
            drop(trigger2);
        });

        drop(trigger1);
        trigger2_rx.await;
    }
}