1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
910//! An interface/trait that provides write and read timestamps, reads observe
11//! exactly their preceding writes.
12//!
13//! Specifically, all read timestamps will be greater or equal to all previously
14//! reported completed write timestamps, and strictly less than all subsequently
15//! emitted write timestamps.
1617use async_trait::async_trait;
18use mz_ore::now::{EpochMillis, NowFn};
1920pub mod batching_oracle;
21pub mod metrics;
22pub mod postgres_oracle;
23pub mod retry;
2425/// Timestamps used by writes in an Append command.
26#[derive(Debug)]
27pub struct WriteTimestamp<T = mz_repr::Timestamp> {
28/// Timestamp that the write will take place on.
29pub timestamp: T,
30/// Timestamp to advance the appended table to.
31pub advance_to: T,
32}
3334/// A type that provides write and read timestamps, reads observe exactly their
35/// preceding writes.
36///
37/// Specifically, all read timestamps will be greater or equal to all previously
38/// reported completed write timestamps, and strictly less than all subsequently
39/// emitted write timestamps.
40#[async_trait]
41pub trait TimestampOracle<T>: std::fmt::Debug {
42/// Acquire a new timestamp for writing.
43 ///
44 /// This timestamp will be strictly greater than all prior values of
45 /// `self.read_ts()` and `self.write_ts()`.
46async fn write_ts(&self) -> WriteTimestamp<T>;
4748/// Peek the current write timestamp.
49async fn peek_write_ts(&self) -> T;
5051/// Acquire a new timestamp for reading.
52 ///
53 /// This timestamp will be greater or equal to all prior values of
54 /// `self.apply_write(write_ts)`, and strictly less than all subsequent
55 /// values of `self.write_ts()`.
56async fn read_ts(&self) -> T;
5758/// Mark a write at `write_ts` completed.
59 ///
60 /// All subsequent values of `self.read_ts()` will be greater or equal to
61 /// `write_ts`.
62async fn apply_write(&self, lower_bound: T);
63}
/// A source of "now" readings, in the spirit of [`NowFn`], that is generic
/// over the timestamp type it yields.
///
/// The oracle operations are expressed in terms of [`mz_repr::Timestamp`].
/// Instead of bridging between clock readings and that type inside each oracle
/// implementation, implementations can be written against this trait, which
/// makes the conversion slightly easier.
pub trait GenericNowFn<T>: Send + Sync + Clone {
    /// Returns the current time, expressed as a `T`.
    fn now(&self) -> T;
}
7374impl GenericNowFn<mz_repr::Timestamp> for NowFn<EpochMillis> {
75fn now(&self) -> mz_repr::Timestamp {
76 (self)().into()
77 }
78}
7980impl<T: Clone + Send + Sync> GenericNowFn<T> for NowFn<T> {
81fn now(&self) -> T {
82 (self)()
83 }
84}
8586// TODO: Gate this with a `#[cfg(test)]` again once the legacy catalog impl goes
87// away.
88pub mod tests {
89use std::sync::Arc;
9091use futures::Future;
92use mz_repr::Timestamp;
9394use super::*;
9596// These test methods are meant to be used by tests for timestamp oracle
97 // implementations.
9899pub async fn timestamp_oracle_impl_test<F, NewFn>(
100mut new_fn: NewFn,
101 ) -> Result<(), anyhow::Error>
102where
103F: Future<Output = Arc<dyn TimestampOracle<Timestamp> + Send + Sync>>,
104 NewFn: FnMut(String, NowFn, Timestamp) -> F,
105 {
106// Normally, these could all be separate test methods but we bundle them
107 // all together so that it's easier to call this one test method from
108 // the implementation tests.
109110 // Timestamp::MIN as initial timestamp
111let timeline = uuid::Uuid::new_v4().to_string();
112let oracle = new_fn(timeline, NowFn::from(|| 0u64), Timestamp::MIN).await;
113assert_eq!(oracle.read_ts().await, Timestamp::MIN);
114assert_eq!(oracle.peek_write_ts().await, Timestamp::MIN);
115116// Timestamp::MAX as initial timestamp
117let timeline = uuid::Uuid::new_v4().to_string();
118let oracle = new_fn(timeline, NowFn::from(|| 0u64), Timestamp::MAX).await;
119assert_eq!(oracle.read_ts().await, Timestamp::MAX);
120assert_eq!(oracle.peek_write_ts().await, Timestamp::MAX);
121122// Timestamp::MAX-1 from NowFn. We have to step back by one, otherwise
123 // `write_ts` can't determine the "advance_to" timestamp.
124let timeline = uuid::Uuid::new_v4().to_string();
125let oracle = new_fn(
126 timeline,
127 NowFn::from(|| Timestamp::MAX.step_back().expect("known to work").into()),
128 Timestamp::MIN,
129 )
130 .await;
131// At first, read_ts and peek_write_ts stay where they are.
132assert_eq!(oracle.read_ts().await, Timestamp::MIN);
133assert_eq!(oracle.peek_write_ts().await, Timestamp::MIN);
134assert_eq!(
135 oracle.write_ts().await.timestamp,
136 Timestamp::MAX.step_back().expect("known to work")
137 );
138// Now peek_write_ts jump to MAX-1 but read_ts stays.
139assert_eq!(oracle.read_ts().await, Timestamp::MIN);
140assert_eq!(
141 oracle.peek_write_ts().await,
142 Timestamp::MAX.step_back().expect("known to work")
143 );
144145// Repeated write_ts calls advance the timestamp.
146let timeline = uuid::Uuid::new_v4().to_string();
147let oracle = new_fn(timeline, NowFn::from(|| 0u64), Timestamp::MIN).await;
148assert_eq!(oracle.write_ts().await.timestamp, Timestamp::from(1u64));
149assert_eq!(oracle.write_ts().await.timestamp, Timestamp::from(2u64));
150151// Repeated peek_write_ts calls _DON'T_ advance the timestamp.
152let timeline = uuid::Uuid::new_v4().to_string();
153let oracle = new_fn(timeline, NowFn::from(|| 0u64), Timestamp::MIN).await;
154assert_eq!(oracle.peek_write_ts().await, Timestamp::from(0u64));
155assert_eq!(oracle.peek_write_ts().await, Timestamp::from(0u64));
156157// Interesting scenarios around apply_write, from its rustdoc.
158 //
159 // Scenario #1:
160 // input <= r_0 <= w_0 -> r_1 = r_0 and w_1 = w_0
161let timeline = uuid::Uuid::new_v4().to_string();
162let oracle = new_fn(timeline, NowFn::from(|| 0u64), 10u64.into()).await;
163 oracle.apply_write(5u64.into()).await;
164assert_eq!(oracle.peek_write_ts().await, Timestamp::from(10u64));
165assert_eq!(oracle.read_ts().await, Timestamp::from(10u64));
166167// Scenario #2:
168 // r_0 <= input <= w_0 -> r_1 = input and w_1 = w_0
169let timeline = uuid::Uuid::new_v4().to_string();
170let oracle = new_fn(timeline, NowFn::from(|| 0u64), 0u64.into()).await;
171// Have to bump the write_ts up manually:
172assert_eq!(oracle.write_ts().await.timestamp, Timestamp::from(1u64));
173assert_eq!(oracle.write_ts().await.timestamp, Timestamp::from(2u64));
174assert_eq!(oracle.write_ts().await.timestamp, Timestamp::from(3u64));
175assert_eq!(oracle.write_ts().await.timestamp, Timestamp::from(4u64));
176 oracle.apply_write(2u64.into()).await;
177assert_eq!(oracle.peek_write_ts().await, Timestamp::from(4u64));
178assert_eq!(oracle.read_ts().await, Timestamp::from(2u64));
179180// Scenario #3:
181 // r_0 <= w_0 <= input -> r_1 = input and w_1 = input
182let timeline = uuid::Uuid::new_v4().to_string();
183let oracle = new_fn(timeline, NowFn::from(|| 0u64), 0u64.into()).await;
184 oracle.apply_write(2u64.into()).await;
185assert_eq!(oracle.peek_write_ts().await, Timestamp::from(2u64));
186assert_eq!(oracle.read_ts().await, Timestamp::from(2u64));
187 oracle.apply_write(4u64.into()).await;
188assert_eq!(oracle.peek_write_ts().await, Timestamp::from(4u64));
189assert_eq!(oracle.read_ts().await, Timestamp::from(4u64));
190191Ok(())
192 }
193}