mz_timestamp_oracle/
lib.rs1use async_trait::async_trait;
18use mz_ore::now::{EpochMillis, NowFn};
19
20pub mod batching_oracle;
21pub mod config;
22#[cfg(any(target_os = "linux", feature = "foundationdb"))]
23pub mod foundationdb_oracle;
24pub mod metrics;
25pub mod postgres_oracle;
26pub mod retry;
27
28pub use config::TimestampOracleConfig;
29
30#[derive(Debug)]
32pub struct WriteTimestamp<T = mz_repr::Timestamp> {
33 pub timestamp: T,
35 pub advance_to: T,
37}
38
39#[async_trait]
46pub trait TimestampOracle<T>: std::fmt::Debug {
47 async fn write_ts(&self) -> WriteTimestamp<T>;
52
53 async fn peek_write_ts(&self) -> T;
55
56 async fn read_ts(&self) -> T;
62
63 async fn apply_write(&self, lower_bound: T);
68}
69
70pub trait GenericNowFn<T>: Clone + Send + Sync {
76 fn now(&self) -> T;
77}
78
79impl GenericNowFn<mz_repr::Timestamp> for NowFn<EpochMillis> {
80 fn now(&self) -> mz_repr::Timestamp {
81 (self)().into()
82 }
83}
84
85impl<T: Clone + Send + Sync> GenericNowFn<T> for NowFn<T> {
86 fn now(&self) -> T {
87 (self)()
88 }
89}
90
91pub mod tests {
94 use std::sync::Arc;
95
96 use futures::Future;
97 use mz_repr::Timestamp;
98
99 use super::*;
100
101 pub async fn timestamp_oracle_impl_test<F, NewFn>(
105 mut new_fn: NewFn,
106 ) -> Result<(), anyhow::Error>
107 where
108 F: Future<Output = Arc<dyn TimestampOracle<Timestamp> + Send + Sync>>,
109 NewFn: FnMut(String, NowFn, Timestamp) -> F,
110 {
111 let timeline = uuid::Uuid::new_v4().to_string();
117 let oracle = new_fn(timeline, NowFn::from(|| 0u64), Timestamp::MIN).await;
118 assert_eq!(oracle.read_ts().await, Timestamp::MIN);
119 assert_eq!(oracle.peek_write_ts().await, Timestamp::MIN);
120
121 let timeline = uuid::Uuid::new_v4().to_string();
123 let oracle = new_fn(timeline, NowFn::from(|| 0u64), Timestamp::MAX).await;
124 assert_eq!(oracle.read_ts().await, Timestamp::MAX);
125 assert_eq!(oracle.peek_write_ts().await, Timestamp::MAX);
126
127 let timeline = uuid::Uuid::new_v4().to_string();
130 let oracle = new_fn(
131 timeline,
132 NowFn::from(|| Timestamp::MAX.step_back().expect("known to work").into()),
133 Timestamp::MIN,
134 )
135 .await;
136 assert_eq!(oracle.read_ts().await, Timestamp::MIN);
138 assert_eq!(oracle.peek_write_ts().await, Timestamp::MIN);
139 assert_eq!(
140 oracle.write_ts().await.timestamp,
141 Timestamp::MAX.step_back().expect("known to work")
142 );
143 assert_eq!(oracle.read_ts().await, Timestamp::MIN);
145 assert_eq!(
146 oracle.peek_write_ts().await,
147 Timestamp::MAX.step_back().expect("known to work")
148 );
149
150 let timeline = uuid::Uuid::new_v4().to_string();
152 let oracle = new_fn(timeline, NowFn::from(|| 0u64), Timestamp::MIN).await;
153 assert_eq!(oracle.write_ts().await.timestamp, Timestamp::from(1u64));
154 assert_eq!(oracle.write_ts().await.timestamp, Timestamp::from(2u64));
155
156 let timeline = uuid::Uuid::new_v4().to_string();
158 let oracle = new_fn(timeline, NowFn::from(|| 0u64), Timestamp::MIN).await;
159 assert_eq!(oracle.peek_write_ts().await, Timestamp::from(0u64));
160 assert_eq!(oracle.peek_write_ts().await, Timestamp::from(0u64));
161
162 let timeline = uuid::Uuid::new_v4().to_string();
167 let oracle = new_fn(timeline, NowFn::from(|| 0u64), 10u64.into()).await;
168 oracle.apply_write(5u64.into()).await;
169 assert_eq!(oracle.peek_write_ts().await, Timestamp::from(10u64));
170 assert_eq!(oracle.read_ts().await, Timestamp::from(10u64));
171
172 let timeline = uuid::Uuid::new_v4().to_string();
175 let oracle = new_fn(timeline, NowFn::from(|| 0u64), 0u64.into()).await;
176 assert_eq!(oracle.write_ts().await.timestamp, Timestamp::from(1u64));
178 assert_eq!(oracle.write_ts().await.timestamp, Timestamp::from(2u64));
179 assert_eq!(oracle.write_ts().await.timestamp, Timestamp::from(3u64));
180 assert_eq!(oracle.write_ts().await.timestamp, Timestamp::from(4u64));
181 oracle.apply_write(2u64.into()).await;
182 assert_eq!(oracle.peek_write_ts().await, Timestamp::from(4u64));
183 assert_eq!(oracle.read_ts().await, Timestamp::from(2u64));
184
185 let timeline = uuid::Uuid::new_v4().to_string();
188 let oracle = new_fn(timeline, NowFn::from(|| 0u64), 0u64.into()).await;
189 oracle.apply_write(2u64.into()).await;
190 assert_eq!(oracle.peek_write_ts().await, Timestamp::from(2u64));
191 assert_eq!(oracle.read_ts().await, Timestamp::from(2u64));
192 oracle.apply_write(4u64.into()).await;
193 assert_eq!(oracle.peek_write_ts().await, Timestamp::from(4u64));
194 assert_eq!(oracle.read_ts().await, Timestamp::from(4u64));
195
196 Ok(())
197 }
198}