mz_timestamp_oracle/
lib.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
//! An interface/trait that provides write and read timestamps, such that reads
//! observe exactly their preceding writes.
//!
//! Specifically, all read timestamps will be greater than or equal to all
//! previously reported completed write timestamps, and strictly less than all
//! subsequently emitted write timestamps.
16
17use async_trait::async_trait;
18use mz_ore::now::{EpochMillis, NowFn};
19
20pub mod batching_oracle;
21pub mod metrics;
22pub mod postgres_oracle;
23pub mod retry;
24
25/// Timestamps used by writes in an Append command.
26#[derive(Debug)]
27pub struct WriteTimestamp<T = mz_repr::Timestamp> {
28    /// Timestamp that the write will take place on.
29    pub timestamp: T,
30    /// Timestamp to advance the appended table to.
31    pub advance_to: T,
32}
33
34/// A type that provides write and read timestamps, reads observe exactly their
35/// preceding writes.
36///
37/// Specifically, all read timestamps will be greater or equal to all previously
38/// reported completed write timestamps, and strictly less than all subsequently
39/// emitted write timestamps.
40#[async_trait]
41pub trait TimestampOracle<T>: std::fmt::Debug {
42    /// Acquire a new timestamp for writing.
43    ///
44    /// This timestamp will be strictly greater than all prior values of
45    /// `self.read_ts()` and `self.write_ts()`.
46    async fn write_ts(&self) -> WriteTimestamp<T>;
47
48    /// Peek the current write timestamp.
49    async fn peek_write_ts(&self) -> T;
50
51    /// Acquire a new timestamp for reading.
52    ///
53    /// This timestamp will be greater or equal to all prior values of
54    /// `self.apply_write(write_ts)`, and strictly less than all subsequent
55    /// values of `self.write_ts()`.
56    async fn read_ts(&self) -> T;
57
58    /// Mark a write at `write_ts` completed.
59    ///
60    /// All subsequent values of `self.read_ts()` will be greater or equal to
61    /// `write_ts`.
62    async fn apply_write(&self, lower_bound: T);
63}
64
/// A source of "now" that is generic over the timestamp type it produces.
///
/// The oracle operations are expressed in terms of [`mz_repr::Timestamp`],
/// whereas a [`NowFn`] produces its own native type. Bridging the two could be
/// done inside each oracle implementation; this trait just makes that bridging
/// slightly easier.
pub trait GenericNowFn<T>: Send + Sync + Clone {
    /// Returns the current time as a `T`.
    fn now(&self) -> T;
}
73
74impl GenericNowFn<mz_repr::Timestamp> for NowFn<EpochMillis> {
75    fn now(&self) -> mz_repr::Timestamp {
76        (self)().into()
77    }
78}
79
80impl<T: Clone + Send + Sync> GenericNowFn<T> for NowFn<T> {
81    fn now(&self) -> T {
82        (self)()
83    }
84}
85
86// TODO: Gate this with a `#[cfg(test)]` again once the legacy catalog impl goes
87// away.
88pub mod tests {
89    use std::sync::Arc;
90
91    use futures::Future;
92    use mz_repr::Timestamp;
93
94    use super::*;
95
96    // These test methods are meant to be used by tests for timestamp oracle
97    // implementations.
98
99    pub async fn timestamp_oracle_impl_test<F, NewFn>(
100        mut new_fn: NewFn,
101    ) -> Result<(), anyhow::Error>
102    where
103        F: Future<Output = Arc<dyn TimestampOracle<Timestamp> + Send + Sync>>,
104        NewFn: FnMut(String, NowFn, Timestamp) -> F,
105    {
106        // Normally, these could all be separate test methods but we bundle them
107        // all together so that it's easier to call this one test method from
108        // the implementation tests.
109
110        // Timestamp::MIN as initial timestamp
111        let timeline = uuid::Uuid::new_v4().to_string();
112        let oracle = new_fn(timeline, NowFn::from(|| 0u64), Timestamp::MIN).await;
113        assert_eq!(oracle.read_ts().await, Timestamp::MIN);
114        assert_eq!(oracle.peek_write_ts().await, Timestamp::MIN);
115
116        // Timestamp::MAX as initial timestamp
117        let timeline = uuid::Uuid::new_v4().to_string();
118        let oracle = new_fn(timeline, NowFn::from(|| 0u64), Timestamp::MAX).await;
119        assert_eq!(oracle.read_ts().await, Timestamp::MAX);
120        assert_eq!(oracle.peek_write_ts().await, Timestamp::MAX);
121
122        // Timestamp::MAX-1 from NowFn. We have to step back by one, otherwise
123        // `write_ts` can't determine the "advance_to" timestamp.
124        let timeline = uuid::Uuid::new_v4().to_string();
125        let oracle = new_fn(
126            timeline,
127            NowFn::from(|| Timestamp::MAX.step_back().expect("known to work").into()),
128            Timestamp::MIN,
129        )
130        .await;
131        // At first, read_ts and peek_write_ts stay where they are.
132        assert_eq!(oracle.read_ts().await, Timestamp::MIN);
133        assert_eq!(oracle.peek_write_ts().await, Timestamp::MIN);
134        assert_eq!(
135            oracle.write_ts().await.timestamp,
136            Timestamp::MAX.step_back().expect("known to work")
137        );
138        // Now peek_write_ts jump to MAX-1 but read_ts stays.
139        assert_eq!(oracle.read_ts().await, Timestamp::MIN);
140        assert_eq!(
141            oracle.peek_write_ts().await,
142            Timestamp::MAX.step_back().expect("known to work")
143        );
144
145        // Repeated write_ts calls advance the timestamp.
146        let timeline = uuid::Uuid::new_v4().to_string();
147        let oracle = new_fn(timeline, NowFn::from(|| 0u64), Timestamp::MIN).await;
148        assert_eq!(oracle.write_ts().await.timestamp, Timestamp::from(1u64));
149        assert_eq!(oracle.write_ts().await.timestamp, Timestamp::from(2u64));
150
151        // Repeated peek_write_ts calls _DON'T_ advance the timestamp.
152        let timeline = uuid::Uuid::new_v4().to_string();
153        let oracle = new_fn(timeline, NowFn::from(|| 0u64), Timestamp::MIN).await;
154        assert_eq!(oracle.peek_write_ts().await, Timestamp::from(0u64));
155        assert_eq!(oracle.peek_write_ts().await, Timestamp::from(0u64));
156
157        // Interesting scenarios around apply_write, from its rustdoc.
158        //
159        // Scenario #1:
160        // input <= r_0 <= w_0 -> r_1 = r_0 and w_1 = w_0
161        let timeline = uuid::Uuid::new_v4().to_string();
162        let oracle = new_fn(timeline, NowFn::from(|| 0u64), 10u64.into()).await;
163        oracle.apply_write(5u64.into()).await;
164        assert_eq!(oracle.peek_write_ts().await, Timestamp::from(10u64));
165        assert_eq!(oracle.read_ts().await, Timestamp::from(10u64));
166
167        // Scenario #2:
168        // r_0 <= input <= w_0 -> r_1 = input and w_1 = w_0
169        let timeline = uuid::Uuid::new_v4().to_string();
170        let oracle = new_fn(timeline, NowFn::from(|| 0u64), 0u64.into()).await;
171        // Have to bump the write_ts up manually:
172        assert_eq!(oracle.write_ts().await.timestamp, Timestamp::from(1u64));
173        assert_eq!(oracle.write_ts().await.timestamp, Timestamp::from(2u64));
174        assert_eq!(oracle.write_ts().await.timestamp, Timestamp::from(3u64));
175        assert_eq!(oracle.write_ts().await.timestamp, Timestamp::from(4u64));
176        oracle.apply_write(2u64.into()).await;
177        assert_eq!(oracle.peek_write_ts().await, Timestamp::from(4u64));
178        assert_eq!(oracle.read_ts().await, Timestamp::from(2u64));
179
180        // Scenario #3:
181        // r_0 <= w_0 <= input -> r_1 = input and w_1 = input
182        let timeline = uuid::Uuid::new_v4().to_string();
183        let oracle = new_fn(timeline, NowFn::from(|| 0u64), 0u64.into()).await;
184        oracle.apply_write(2u64.into()).await;
185        assert_eq!(oracle.peek_write_ts().await, Timestamp::from(2u64));
186        assert_eq!(oracle.read_ts().await, Timestamp::from(2u64));
187        oracle.apply_write(4u64.into()).await;
188        assert_eq!(oracle.peek_write_ts().await, Timestamp::from(4u64));
189        assert_eq!(oracle.read_ts().await, Timestamp::from(4u64));
190
191        Ok(())
192    }
193}