Skip to main content

mz_timestamp_oracle/
lib.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
9
//! An interface/trait that provides write and read timestamps such that reads
//! observe exactly their preceding writes.
//!
//! Specifically, all read timestamps will be greater than or equal to all
//! previously reported completed write timestamps, and strictly less than all
//! subsequently emitted write timestamps.
16
17use async_trait::async_trait;
18use mz_ore::now::{EpochMillis, NowFn};
19
20pub mod batching_oracle;
21pub mod config;
22#[cfg(any(target_os = "linux", feature = "foundationdb"))]
23pub mod foundationdb_oracle;
24pub mod metrics;
25pub mod postgres_oracle;
26pub mod retry;
27
28pub use config::TimestampOracleConfig;
29
30/// Timestamps used by writes in an Append command.
31#[derive(Debug)]
32pub struct WriteTimestamp<T = mz_repr::Timestamp> {
33    /// Timestamp that the write will take place on.
34    pub timestamp: T,
35    /// Timestamp to advance the appended table to.
36    pub advance_to: T,
37}
38
39/// A type that provides write and read timestamps, reads observe exactly their
40/// preceding writes.
41///
42/// Specifically, all read timestamps will be greater or equal to all previously
43/// reported completed write timestamps, and strictly less than all subsequently
44/// emitted write timestamps.
45#[async_trait]
46pub trait TimestampOracle<T>: std::fmt::Debug {
47    /// Acquire a new timestamp for writing.
48    ///
49    /// This timestamp will be strictly greater than all prior values of
50    /// `self.read_ts()` and `self.write_ts()`.
51    async fn write_ts(&self) -> WriteTimestamp<T>;
52
53    /// Peek the current write timestamp.
54    async fn peek_write_ts(&self) -> T;
55
56    /// Acquire a new timestamp for reading.
57    ///
58    /// This timestamp will be greater or equal to all prior values of
59    /// `self.apply_write(write_ts)`, and strictly less than all subsequent
60    /// values of `self.write_ts()`.
61    async fn read_ts(&self) -> T;
62
63    /// Mark a write at `write_ts` completed.
64    ///
65    /// All subsequent values of `self.read_ts()` will be greater or equal to
66    /// `write_ts`.
67    async fn apply_write(&self, lower_bound: T);
68}
69
/// A clock, in the spirit of [`NowFn`], that yields timestamps of type `T`.
///
/// Oracle operations are expressed in terms of [`mz_repr::Timestamp`]. Instead
/// of bridging between a raw [`NowFn`] and that type inside each oracle
/// implementation, implementations can be written generically against this
/// trait.
pub trait GenericNowFn<T>
where
    Self: Clone + Send + Sync,
{
    /// Returns the current time as a `T`.
    fn now(&self) -> T;
}
78
79impl GenericNowFn<mz_repr::Timestamp> for NowFn<EpochMillis> {
80    fn now(&self) -> mz_repr::Timestamp {
81        (self)().into()
82    }
83}
84
85impl<T: Clone + Send + Sync> GenericNowFn<T> for NowFn<T> {
86    fn now(&self) -> T {
87        (self)()
88    }
89}
90
91// TODO: Gate this with a `#[cfg(test)]` again once the legacy catalog impl goes
92// away.
93pub mod tests {
94    use std::sync::Arc;
95
96    use futures::Future;
97    use mz_repr::Timestamp;
98
99    use super::*;
100
101    // These test methods are meant to be used by tests for timestamp oracle
102    // implementations.
103
104    pub async fn timestamp_oracle_impl_test<F, NewFn>(
105        mut new_fn: NewFn,
106    ) -> Result<(), anyhow::Error>
107    where
108        F: Future<Output = Arc<dyn TimestampOracle<Timestamp> + Send + Sync>>,
109        NewFn: FnMut(String, NowFn, Timestamp) -> F,
110    {
111        // Normally, these could all be separate test methods but we bundle them
112        // all together so that it's easier to call this one test method from
113        // the implementation tests.
114
115        // Timestamp::MIN as initial timestamp
116        let timeline = uuid::Uuid::new_v4().to_string();
117        let oracle = new_fn(timeline, NowFn::from(|| 0u64), Timestamp::MIN).await;
118        assert_eq!(oracle.read_ts().await, Timestamp::MIN);
119        assert_eq!(oracle.peek_write_ts().await, Timestamp::MIN);
120
121        // Timestamp::MAX as initial timestamp
122        let timeline = uuid::Uuid::new_v4().to_string();
123        let oracle = new_fn(timeline, NowFn::from(|| 0u64), Timestamp::MAX).await;
124        assert_eq!(oracle.read_ts().await, Timestamp::MAX);
125        assert_eq!(oracle.peek_write_ts().await, Timestamp::MAX);
126
127        // Timestamp::MAX-1 from NowFn. We have to step back by one, otherwise
128        // `write_ts` can't determine the "advance_to" timestamp.
129        let timeline = uuid::Uuid::new_v4().to_string();
130        let oracle = new_fn(
131            timeline,
132            NowFn::from(|| Timestamp::MAX.step_back().expect("known to work").into()),
133            Timestamp::MIN,
134        )
135        .await;
136        // At first, read_ts and peek_write_ts stay where they are.
137        assert_eq!(oracle.read_ts().await, Timestamp::MIN);
138        assert_eq!(oracle.peek_write_ts().await, Timestamp::MIN);
139        assert_eq!(
140            oracle.write_ts().await.timestamp,
141            Timestamp::MAX.step_back().expect("known to work")
142        );
143        // Now peek_write_ts jump to MAX-1 but read_ts stays.
144        assert_eq!(oracle.read_ts().await, Timestamp::MIN);
145        assert_eq!(
146            oracle.peek_write_ts().await,
147            Timestamp::MAX.step_back().expect("known to work")
148        );
149
150        // Repeated write_ts calls advance the timestamp.
151        let timeline = uuid::Uuid::new_v4().to_string();
152        let oracle = new_fn(timeline, NowFn::from(|| 0u64), Timestamp::MIN).await;
153        assert_eq!(oracle.write_ts().await.timestamp, Timestamp::from(1u64));
154        assert_eq!(oracle.write_ts().await.timestamp, Timestamp::from(2u64));
155
156        // Repeated peek_write_ts calls _DON'T_ advance the timestamp.
157        let timeline = uuid::Uuid::new_v4().to_string();
158        let oracle = new_fn(timeline, NowFn::from(|| 0u64), Timestamp::MIN).await;
159        assert_eq!(oracle.peek_write_ts().await, Timestamp::from(0u64));
160        assert_eq!(oracle.peek_write_ts().await, Timestamp::from(0u64));
161
162        // Interesting scenarios around apply_write, from its rustdoc.
163        //
164        // Scenario #1:
165        // input <= r_0 <= w_0 -> r_1 = r_0 and w_1 = w_0
166        let timeline = uuid::Uuid::new_v4().to_string();
167        let oracle = new_fn(timeline, NowFn::from(|| 0u64), 10u64.into()).await;
168        oracle.apply_write(5u64.into()).await;
169        assert_eq!(oracle.peek_write_ts().await, Timestamp::from(10u64));
170        assert_eq!(oracle.read_ts().await, Timestamp::from(10u64));
171
172        // Scenario #2:
173        // r_0 <= input <= w_0 -> r_1 = input and w_1 = w_0
174        let timeline = uuid::Uuid::new_v4().to_string();
175        let oracle = new_fn(timeline, NowFn::from(|| 0u64), 0u64.into()).await;
176        // Have to bump the write_ts up manually:
177        assert_eq!(oracle.write_ts().await.timestamp, Timestamp::from(1u64));
178        assert_eq!(oracle.write_ts().await.timestamp, Timestamp::from(2u64));
179        assert_eq!(oracle.write_ts().await.timestamp, Timestamp::from(3u64));
180        assert_eq!(oracle.write_ts().await.timestamp, Timestamp::from(4u64));
181        oracle.apply_write(2u64.into()).await;
182        assert_eq!(oracle.peek_write_ts().await, Timestamp::from(4u64));
183        assert_eq!(oracle.read_ts().await, Timestamp::from(2u64));
184
185        // Scenario #3:
186        // r_0 <= w_0 <= input -> r_1 = input and w_1 = input
187        let timeline = uuid::Uuid::new_v4().to_string();
188        let oracle = new_fn(timeline, NowFn::from(|| 0u64), 0u64.into()).await;
189        oracle.apply_write(2u64.into()).await;
190        assert_eq!(oracle.peek_write_ts().await, Timestamp::from(2u64));
191        assert_eq!(oracle.read_ts().await, Timestamp::from(2u64));
192        oracle.apply_write(4u64.into()).await;
193        assert_eq!(oracle.peek_write_ts().await, Timestamp::from(4u64));
194        assert_eq!(oracle.read_ts().await, Timestamp::from(4u64));
195
196        Ok(())
197    }
198}