mz_timestamp_oracle/
batching_oracle.rs1use std::sync::Arc;
14
15use async_trait::async_trait;
16use mz_ore::cast::CastFrom;
17use tokio::sync::mpsc::UnboundedSender;
18use tokio::sync::oneshot;
19
20use crate::metrics::Metrics;
21use crate::{TimestampOracle, WriteTimestamp};
22
23pub struct BatchingTimestampOracle<T> {
36 inner: Arc<dyn TimestampOracle<T> + Send + Sync>,
37 command_tx: UnboundedSender<Command<T>>,
38}
39
40enum Command<T> {
42 ReadTs(oneshot::Sender<T>),
43}
44
45impl<T> std::fmt::Debug for BatchingTimestampOracle<T> {
46 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47 f.debug_struct("BatchingTimestampOracle").finish()
48 }
49}
50
51impl<T> BatchingTimestampOracle<T>
52where
53 T: Clone + Send + Sync + 'static,
54{
55 pub fn new(metrics: Arc<Metrics>, oracle: Arc<dyn TimestampOracle<T> + Send + Sync>) -> Self {
57 let (command_tx, mut command_rx) = tokio::sync::mpsc::unbounded_channel();
58
59 let task_oracle = Arc::clone(&oracle);
60
61 mz_ore::task::spawn(|| "BatchingTimestampOracle Worker Task", async move {
62 let read_ts_metrics = &metrics.batching.read_ts;
63
64 while let Some(cmd) = command_rx.recv().await {
67 let mut pending_cmds = vec![cmd];
68 while let Ok(cmd) = command_rx.try_recv() {
69 pending_cmds.push(cmd);
70 }
71
72 read_ts_metrics
73 .ops_count
74 .inc_by(u64::cast_from(pending_cmds.len()));
75 read_ts_metrics.batches_count.inc();
76
77 let ts = task_oracle.read_ts().await;
78 for cmd in pending_cmds {
79 match cmd {
80 Command::ReadTs(response_tx) => {
81 let _ = response_tx.send(ts.clone());
84 }
85 }
86 }
87 }
88
89 tracing::debug!("shutting down BatchingTimestampOracle task");
90 });
91
92 Self {
93 inner: oracle,
94 command_tx,
95 }
96 }
97}
98
99#[async_trait]
100impl<T> TimestampOracle<T> for BatchingTimestampOracle<T>
101where
102 T: Send + Sync,
103{
104 async fn write_ts(&self) -> WriteTimestamp<T> {
105 self.inner.write_ts().await
106 }
107
108 async fn peek_write_ts(&self) -> T {
109 self.inner.peek_write_ts().await
110 }
111
112 async fn read_ts(&self) -> T {
113 let (tx, rx) = oneshot::channel();
114
115 self.command_tx.send(Command::ReadTs(tx)).expect(
116 "worker task cannot stop while we still have senders for the command/request channel",
117 );
118
119 rx.await
120 .expect("worker task cannot stop while there are outstanding commands/requests")
121 }
122
123 async fn apply_write(&self, write_ts: T) {
124 self.inner.apply_write(write_ts).await
125 }
126}
127
128#[cfg(test)]
129mod tests {
130
131 use mz_ore::metrics::MetricsRegistry;
132 use mz_repr::Timestamp;
133 use tracing::info;
134
135 use crate::postgres_oracle::{PostgresTimestampOracle, PostgresTimestampOracleConfig};
136
137 use super::*;
138
139 #[mz_ore::test(tokio::test)]
140 #[cfg_attr(miri, ignore)] async fn test_batching_timestamp_oracle() -> Result<(), anyhow::Error> {
142 let config = match PostgresTimestampOracleConfig::new_for_test() {
143 Some(config) => config,
144 None => {
145 info!(
146 "{} env not set: skipping test that uses external service",
147 PostgresTimestampOracleConfig::EXTERNAL_TESTS_POSTGRES_URL
148 );
149 return Ok(());
150 }
151 };
152 let metrics = Arc::new(Metrics::new(&MetricsRegistry::new()));
153
154 crate::tests::timestamp_oracle_impl_test(|timeline, now_fn, initial_ts| {
155 let pg_oracle = PostgresTimestampOracle::open(
157 config.clone(),
158 timeline,
159 initial_ts,
160 now_fn,
161 false, );
163
164 async {
165 let arced_pg_oracle: Arc<dyn TimestampOracle<Timestamp> + Send + Sync> =
166 Arc::new(pg_oracle.await);
167
168 let batching_oracle =
169 BatchingTimestampOracle::new(Arc::clone(&metrics), arced_pg_oracle);
170
171 let arced_oracle: Arc<dyn TimestampOracle<Timestamp> + Send + Sync> =
172 Arc::new(batching_oracle);
173
174 arced_oracle
175 }
176 })
177 .await?;
178
179 Ok(())
180 }
181}