mz_timestamp_oracle/
batching_oracle.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A timestamp oracle that wraps a `TimestampOracle` and batches calls
11//! to it.
12
13use std::sync::Arc;
14
15use async_trait::async_trait;
16use mz_ore::cast::CastFrom;
17use tokio::sync::mpsc::UnboundedSender;
18use tokio::sync::oneshot;
19
20use crate::metrics::Metrics;
21use crate::{TimestampOracle, WriteTimestamp};
22
23/// A batching [`TimestampOracle`] backed by a [`TimestampOracle`]
24///
25/// This will only batch calls to `read_ts` because the rest of the system
26/// already naturally does batching of write-related calls via the group commit
27/// mechanism. Write-related calls are passed straight through to the backing
28/// oracle.
29///
30/// For `read_ts` calls, we have to be careful to never cache results from the
31/// backing oracle: for the timestamp to be linearized we can never return a
32/// result as of an earlier moment, but batching them up is correct because this
33/// can only make it so that we return later timestamps. Those later timestamps
34/// still fall within the duration of the `read_ts` call and so are linearized.
35pub struct BatchingTimestampOracle<T> {
36    inner: Arc<dyn TimestampOracle<T> + Send + Sync>,
37    command_tx: UnboundedSender<Command<T>>,
38}
39
40/// A command on the internal batching command stream.
41enum Command<T> {
42    ReadTs(oneshot::Sender<T>),
43}
44
45impl<T> std::fmt::Debug for BatchingTimestampOracle<T> {
46    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47        f.debug_struct("BatchingTimestampOracle").finish()
48    }
49}
50
51impl<T> BatchingTimestampOracle<T>
52where
53    T: Clone + Send + Sync + 'static,
54{
55    /// Crates a [`BatchingTimestampOracle`] that uses the given inner oracle.
56    pub fn new(metrics: Arc<Metrics>, oracle: Arc<dyn TimestampOracle<T> + Send + Sync>) -> Self {
57        let (command_tx, mut command_rx) = tokio::sync::mpsc::unbounded_channel();
58
59        let task_oracle = Arc::clone(&oracle);
60
61        mz_ore::task::spawn(|| "BatchingTimestampOracle Worker Task", async move {
62            let read_ts_metrics = &metrics.batching.read_ts;
63
64            // See comment on `BatchingTimestampOracle` for why this batching is
65            // correct.
66            while let Some(cmd) = command_rx.recv().await {
67                let mut pending_cmds = vec![cmd];
68                while let Ok(cmd) = command_rx.try_recv() {
69                    pending_cmds.push(cmd);
70                }
71
72                read_ts_metrics
73                    .ops_count
74                    .inc_by(u64::cast_from(pending_cmds.len()));
75                read_ts_metrics.batches_count.inc();
76
77                let ts = task_oracle.read_ts().await;
78                for cmd in pending_cmds {
79                    match cmd {
80                        Command::ReadTs(response_tx) => {
81                            // It's okay if the receiver drops, just means
82                            // they're not interested anymore.
83                            let _ = response_tx.send(ts.clone());
84                        }
85                    }
86                }
87            }
88
89            tracing::debug!("shutting down BatchingTimestampOracle task");
90        });
91
92        Self {
93            inner: oracle,
94            command_tx,
95        }
96    }
97}
98
99#[async_trait]
100impl<T> TimestampOracle<T> for BatchingTimestampOracle<T>
101where
102    T: Send + Sync,
103{
104    async fn write_ts(&self) -> WriteTimestamp<T> {
105        self.inner.write_ts().await
106    }
107
108    async fn peek_write_ts(&self) -> T {
109        self.inner.peek_write_ts().await
110    }
111
112    async fn read_ts(&self) -> T {
113        let (tx, rx) = oneshot::channel();
114
115        self.command_tx.send(Command::ReadTs(tx)).expect(
116            "worker task cannot stop while we still have senders for the command/request channel",
117        );
118
119        rx.await
120            .expect("worker task cannot stop while there are outstanding commands/requests")
121    }
122
123    async fn apply_write(&self, write_ts: T) {
124        self.inner.apply_write(write_ts).await
125    }
126}
127
#[cfg(test)]
mod tests {

    use mz_ore::metrics::MetricsRegistry;
    use mz_repr::Timestamp;
    use tracing::info;

    use crate::postgres_oracle::{PostgresTimestampOracle, PostgresTimestampOracleConfig};

    use super::*;

    /// Runs the shared oracle test (`crate::tests::timestamp_oracle_impl_test`)
    /// against a `BatchingTimestampOracle` layered on top of a
    /// `PostgresTimestampOracle`.
    ///
    /// The test returns `Ok(())` early (i.e. is skipped) when
    /// `PostgresTimestampOracleConfig::new_for_test()` yields no config.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
    async fn test_batching_timestamp_oracle() -> Result<(), anyhow::Error> {
        // No config means the external Postgres env var is not set; skip
        // instead of failing.
        let config = match PostgresTimestampOracleConfig::new_for_test() {
            Some(config) => config,
            None => {
                info!(
                    "{} env not set: skipping test that uses external service",
                    PostgresTimestampOracleConfig::EXTERNAL_TESTS_POSTGRES_URL
                );
                return Ok(());
            }
        };
        // One metrics instance is shared by every batching oracle the
        // factory closure below creates.
        let metrics = Arc::new(Metrics::new(&MetricsRegistry::new()));

        // The closure builds the oracle under test from the arguments the
        // shared test harness passes in.
        crate::tests::timestamp_oracle_impl_test(|timeline, now_fn, initial_ts| {
            // We use the postgres oracle as the backing oracle.
            // `open` returns a future; it is awaited inside the async block
            // below rather than here.
            let pg_oracle = PostgresTimestampOracle::open(
                config.clone(),
                timeline,
                initial_ts,
                now_fn,
                false, /* read-only */
            );

            async {
                let arced_pg_oracle: Arc<dyn TimestampOracle<Timestamp> + Send + Sync> =
                    Arc::new(pg_oracle.await);

                // Wrap the postgres oracle so `read_ts` calls go through the
                // batching worker.
                let batching_oracle =
                    BatchingTimestampOracle::new(Arc::clone(&metrics), arced_pg_oracle);

                let arced_oracle: Arc<dyn TimestampOracle<Timestamp> + Send + Sync> =
                    Arc::new(batching_oracle);

                arced_oracle
            }
        })
        .await?;

        Ok(())
    }
}