mz_timestamp_oracle/
lib.rs1use async_trait::async_trait;
18use mz_ore::now::{EpochMillis, NowFn};
19
20pub mod batching_oracle;
21pub mod metrics;
22pub mod postgres_oracle;
23pub mod retry;
24
25#[derive(Debug)]
27pub struct WriteTimestamp<T = mz_repr::Timestamp> {
28 pub timestamp: T,
30 pub advance_to: T,
32}
33
34#[async_trait]
41pub trait TimestampOracle<T>: std::fmt::Debug {
42 async fn write_ts(&self) -> WriteTimestamp<T>;
47
48 async fn peek_write_ts(&self) -> T;
50
51 async fn read_ts(&self) -> T;
57
58 async fn apply_write(&self, lower_bound: T);
63}
64
/// A clock that reports the current time as a value of type `T`.
///
/// Implementors must be `Clone`, `Send`, and `Sync`.
pub trait GenericNowFn<T>: Clone + Send + Sync {
    /// Returns the current time according to this clock.
    fn now(&self) -> T;
}
73
74impl GenericNowFn<mz_repr::Timestamp> for NowFn<EpochMillis> {
75 fn now(&self) -> mz_repr::Timestamp {
76 (self)().into()
77 }
78}
79
80impl<T: Clone + Send + Sync> GenericNowFn<T> for NowFn<T> {
81 fn now(&self) -> T {
82 (self)()
83 }
84}
85
86pub mod tests {
89 use std::sync::Arc;
90
91 use futures::Future;
92 use mz_repr::Timestamp;
93
94 use super::*;
95
96 pub async fn timestamp_oracle_impl_test<F, NewFn>(
100 mut new_fn: NewFn,
101 ) -> Result<(), anyhow::Error>
102 where
103 F: Future<Output = Arc<dyn TimestampOracle<Timestamp> + Send + Sync>>,
104 NewFn: FnMut(String, NowFn, Timestamp) -> F,
105 {
106 let timeline = uuid::Uuid::new_v4().to_string();
112 let oracle = new_fn(timeline, NowFn::from(|| 0u64), Timestamp::MIN).await;
113 assert_eq!(oracle.read_ts().await, Timestamp::MIN);
114 assert_eq!(oracle.peek_write_ts().await, Timestamp::MIN);
115
116 let timeline = uuid::Uuid::new_v4().to_string();
118 let oracle = new_fn(timeline, NowFn::from(|| 0u64), Timestamp::MAX).await;
119 assert_eq!(oracle.read_ts().await, Timestamp::MAX);
120 assert_eq!(oracle.peek_write_ts().await, Timestamp::MAX);
121
122 let timeline = uuid::Uuid::new_v4().to_string();
125 let oracle = new_fn(
126 timeline,
127 NowFn::from(|| Timestamp::MAX.step_back().expect("known to work").into()),
128 Timestamp::MIN,
129 )
130 .await;
131 assert_eq!(oracle.read_ts().await, Timestamp::MIN);
133 assert_eq!(oracle.peek_write_ts().await, Timestamp::MIN);
134 assert_eq!(
135 oracle.write_ts().await.timestamp,
136 Timestamp::MAX.step_back().expect("known to work")
137 );
138 assert_eq!(oracle.read_ts().await, Timestamp::MIN);
140 assert_eq!(
141 oracle.peek_write_ts().await,
142 Timestamp::MAX.step_back().expect("known to work")
143 );
144
145 let timeline = uuid::Uuid::new_v4().to_string();
147 let oracle = new_fn(timeline, NowFn::from(|| 0u64), Timestamp::MIN).await;
148 assert_eq!(oracle.write_ts().await.timestamp, Timestamp::from(1u64));
149 assert_eq!(oracle.write_ts().await.timestamp, Timestamp::from(2u64));
150
151 let timeline = uuid::Uuid::new_v4().to_string();
153 let oracle = new_fn(timeline, NowFn::from(|| 0u64), Timestamp::MIN).await;
154 assert_eq!(oracle.peek_write_ts().await, Timestamp::from(0u64));
155 assert_eq!(oracle.peek_write_ts().await, Timestamp::from(0u64));
156
157 let timeline = uuid::Uuid::new_v4().to_string();
162 let oracle = new_fn(timeline, NowFn::from(|| 0u64), 10u64.into()).await;
163 oracle.apply_write(5u64.into()).await;
164 assert_eq!(oracle.peek_write_ts().await, Timestamp::from(10u64));
165 assert_eq!(oracle.read_ts().await, Timestamp::from(10u64));
166
167 let timeline = uuid::Uuid::new_v4().to_string();
170 let oracle = new_fn(timeline, NowFn::from(|| 0u64), 0u64.into()).await;
171 assert_eq!(oracle.write_ts().await.timestamp, Timestamp::from(1u64));
173 assert_eq!(oracle.write_ts().await.timestamp, Timestamp::from(2u64));
174 assert_eq!(oracle.write_ts().await.timestamp, Timestamp::from(3u64));
175 assert_eq!(oracle.write_ts().await.timestamp, Timestamp::from(4u64));
176 oracle.apply_write(2u64.into()).await;
177 assert_eq!(oracle.peek_write_ts().await, Timestamp::from(4u64));
178 assert_eq!(oracle.read_ts().await, Timestamp::from(2u64));
179
180 let timeline = uuid::Uuid::new_v4().to_string();
183 let oracle = new_fn(timeline, NowFn::from(|| 0u64), 0u64.into()).await;
184 oracle.apply_write(2u64.into()).await;
185 assert_eq!(oracle.peek_write_ts().await, Timestamp::from(2u64));
186 assert_eq!(oracle.read_ts().await, Timestamp::from(2u64));
187 oracle.apply_write(4u64.into()).await;
188 assert_eq!(oracle.peek_write_ts().await, Timestamp::from(4u64));
189 assert_eq!(oracle.read_ts().await, Timestamp::from(4u64));
190
191 Ok(())
192 }
193}