1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
910//! A timestamp oracle that wraps a `TimestampOracle` and batches calls
11//! to it.
1213use std::sync::Arc;
1415use async_trait::async_trait;
16use mz_ore::cast::CastFrom;
17use tokio::sync::mpsc::UnboundedSender;
18use tokio::sync::oneshot;
1920use crate::metrics::Metrics;
21use crate::{TimestampOracle, WriteTimestamp};
2223/// A batching [`TimestampOracle`] backed by a [`TimestampOracle`]
24///
25/// This will only batch calls to `read_ts` because the rest of the system
26/// already naturally does batching of write-related calls via the group commit
27/// mechanism. Write-related calls are passed straight through to the backing
28/// oracle.
29///
30/// For `read_ts` calls, we have to be careful to never cache results from the
31/// backing oracle: for the timestamp to be linearized we can never return a
32/// result as of an earlier moment, but batching them up is correct because this
33/// can only make it so that we return later timestamps. Those later timestamps
34/// still fall within the duration of the `read_ts` call and so are linearized.
35pub struct BatchingTimestampOracle<T> {
36 inner: Arc<dyn TimestampOracle<T> + Send + Sync>,
37 command_tx: UnboundedSender<Command<T>>,
38}
3940/// A command on the internal batching command stream.
41enum Command<T> {
42 ReadTs(oneshot::Sender<T>),
43}
4445impl<T> std::fmt::Debug for BatchingTimestampOracle<T> {
46fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47 f.debug_struct("BatchingTimestampOracle").finish()
48 }
49}
5051impl<T> BatchingTimestampOracle<T>
52where
53T: Clone + Send + Sync + 'static,
54{
55/// Crates a [`BatchingTimestampOracle`] that uses the given inner oracle.
56pub fn new(metrics: Arc<Metrics>, oracle: Arc<dyn TimestampOracle<T> + Send + Sync>) -> Self {
57let (command_tx, mut command_rx) = tokio::sync::mpsc::unbounded_channel();
5859let task_oracle = Arc::clone(&oracle);
6061 mz_ore::task::spawn(|| "BatchingTimestampOracle Worker Task", async move {
62let read_ts_metrics = &metrics.batching.read_ts;
6364// See comment on `BatchingTimestampOracle` for why this batching is
65 // correct.
66while let Some(cmd) = command_rx.recv().await {
67let mut pending_cmds = vec![cmd];
68while let Ok(cmd) = command_rx.try_recv() {
69 pending_cmds.push(cmd);
70 }
7172 read_ts_metrics
73 .ops_count
74 .inc_by(u64::cast_from(pending_cmds.len()));
75 read_ts_metrics.batches_count.inc();
7677let ts = task_oracle.read_ts().await;
78for cmd in pending_cmds {
79match cmd {
80 Command::ReadTs(response_tx) => {
81// It's okay if the receiver drops, just means
82 // they're not interested anymore.
83let _ = response_tx.send(ts.clone());
84 }
85 }
86 }
87 }
8889tracing::debug!("shutting down BatchingTimestampOracle task");
90 });
9192Self {
93 inner: oracle,
94 command_tx,
95 }
96 }
97}
9899#[async_trait]
100impl<T> TimestampOracle<T> for BatchingTimestampOracle<T>
101where
102T: Send + Sync,
103{
104async fn write_ts(&self) -> WriteTimestamp<T> {
105self.inner.write_ts().await
106}
107108async fn peek_write_ts(&self) -> T {
109self.inner.peek_write_ts().await
110}
111112async fn read_ts(&self) -> T {
113let (tx, rx) = oneshot::channel();
114115self.command_tx.send(Command::ReadTs(tx)).expect(
116"worker task cannot stop while we still have senders for the command/request channel",
117 );
118119 rx.await
120.expect("worker task cannot stop while there are outstanding commands/requests")
121 }
122123async fn apply_write(&self, write_ts: T) {
124self.inner.apply_write(write_ts).await
125}
126}
#[cfg(test)]
mod tests {

    use mz_ore::metrics::MetricsRegistry;
    use mz_repr::Timestamp;
    use tracing::info;

    use crate::postgres_oracle::{PostgresTimestampOracle, PostgresTimestampOracleConfig};

    use super::*;

    // Runs the shared oracle test-suite against a `BatchingTimestampOracle`
    // that wraps a Postgres-backed oracle. Returns early with `Ok(())` when
    // no test configuration is available.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
    async fn test_batching_timestamp_oracle() -> Result<(), anyhow::Error> {
        type DynOracle = Arc<dyn TimestampOracle<Timestamp> + Send + Sync>;

        let Some(config) = PostgresTimestampOracleConfig::new_for_test() else {
            info!(
                "{} env not set: skipping test that uses external service",
                PostgresTimestampOracleConfig::EXTERNAL_TESTS_POSTGRES_URL
            );
            return Ok(());
        };
        let metrics = Arc::new(Metrics::new(&MetricsRegistry::new()));

        crate::tests::timestamp_oracle_impl_test(|timeline, now_fn, initial_ts| {
            // The postgres oracle serves as the backing oracle.
            let opening = PostgresTimestampOracle::open(
                config.clone(),
                timeline,
                initial_ts,
                now_fn,
                false, /* read-only */
            );

            async {
                let backing: DynOracle = Arc::new(opening.await);

                // Wrap the backing oracle and hand it back type-erased.
                let batching: DynOracle =
                    Arc::new(BatchingTimestampOracle::new(Arc::clone(&metrics), backing));

                batching
            }
        })
        .await?;

        Ok(())
    }
}