mz_ore/channel/trigger.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Trigger channels.
17//!
18//! Trigger channels are a very simple form of channel that communicate only one
19//! bit of information: whether the sending half of the channel is open or
20//! closed.
21//!
22//! Here's an example of using a trigger channel to trigger work on a background
23//! task.
24//!
25//! ```
26//! # tokio_test::block_on(async {
27//! use mz_ore::channel::trigger;
28//!
29//! // Create a new trigger channel.
30//! let (trigger, trigger_rx) = trigger::channel();
31//!
32//! // Spawn a task to do some background work, but only once triggered.
33//! tokio::spawn(async {
34//! // Wait for trigger to get dropped.
35//! trigger_rx.await;
36//!
37//! // Do background work.
38//! });
39//!
40//! // Do some prep work.
41//!
42//! // Fire `trigger` by dropping it.
43//! drop(trigger);
44//! # })
45//! ```
46//!
47//! A trigger channel never errors. It is not an error to drop the receiver
48//! before dropping the corresponding trigger.
49//!
50//! Trigger channels can be easily simulated with [`tokio::sync::oneshot`]
51//! channels (and in fact the implementation uses oneshot channels under the
52//! hood). However, using trigger channels can result in clearer code when the
53//! additional features of oneshot channels are not required, as trigger
54//! channels have a simpler API.
55
56use std::future::Future;
57use std::pin::Pin;
58use std::task::{Context, Poll, ready};
59
60use futures::FutureExt;
61use tokio::sync::oneshot;
62use tokio::sync::oneshot::error::TryRecvError;
63
64/// The sending half of a trigger channel.
65///
66/// Dropping the trigger will cause the receiver to resolve.
67#[derive(Debug)]
68pub struct Trigger {
69 _tx: oneshot::Sender<()>,
70}
71
72impl Trigger {
73 /// Fire this [Trigger].
74 ///
75 /// NOTE: Dropping the trigger also fires it, but this method allows
76 /// call-sites to be more explicit.
77 pub fn fire(self) {
78 // Dropping the Trigger is what fires the oneshot.
79 }
80}
81
82/// The receiving half of a trigger channel.
83///
84/// Awaiting the receiver will block until the trigger is dropped.
85#[derive(Debug)]
86pub struct Receiver {
87 rx: oneshot::Receiver<()>,
88}
89
90impl Receiver {
91 /// Reports whether the channel has been triggered.
92 pub fn is_triggered(&mut self) -> bool {
93 matches!(self.rx.try_recv(), Err(TryRecvError::Closed))
94 }
95}
96
97impl Future for Receiver {
98 type Output = ();
99
100 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
101 let _ = ready!(self.rx.poll_unpin(cx));
102 Poll::Ready(())
103 }
104}
105
106/// Creates a new trigger channel.
107pub fn channel() -> (Trigger, Receiver) {
108 let (tx, rx) = oneshot::channel();
109 let trigger = Trigger { _tx: tx };
110 let trigger_rx = Receiver { rx };
111 (trigger, trigger_rx)
112}
113
114#[cfg(test)]
115mod tests {
116 use crate::channel::trigger;
117
118 #[cfg_attr(miri, ignore)] // error: unsupported operation: returning ready events from epoll_wait is not yet implemented
119 #[mz_ore::test(tokio::test)]
120 async fn test_trigger_channel() {
121 let (trigger1, mut trigger1_rx) = trigger::channel();
122 let (trigger2, trigger2_rx) = trigger::channel();
123
124 crate::task::spawn(|| "test_trigger_channel", async move {
125 assert!(!trigger1_rx.is_triggered());
126 (&mut trigger1_rx).await;
127 assert!(trigger1_rx.is_triggered());
128 drop(trigger2);
129 });
130
131 drop(trigger1);
132 trigger2_rx.await;
133 }
134}