mz_adapter/coord/
in_memory_oracle.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A timestamp oracle that relies on the [`crate::catalog::Catalog`] for persistence/durability
11//! and reserves ranges of timestamps.
12
13use std::fmt::Debug;
14
15use derivative::Derivative;
16use mz_repr::TimestampManipulation;
17use mz_timestamp_oracle::{GenericNowFn, WriteTimestamp};
18
19/// A type that provides write and read timestamps, reads observe exactly their
20/// preceding writes.
21///
22/// Specifically, all read timestamps will be greater or equal to all previously
23/// reported completed write timestamps, and strictly less than all subsequently
24/// emitted write timestamps.
25///
26/// A timeline can perform reads and writes. Reads happen at the read timestamp
27/// and writes happen at the write timestamp. After the write has completed, but
28/// before a response is sent, the read timestamp must be updated to a value
29/// greater than or equal to `self.write_ts`.
30#[derive(Derivative)]
31#[derivative(Debug)]
32pub struct InMemoryTimestampOracle<T, N>
33where
34    T: Debug,
35    N: GenericNowFn<T>,
36{
37    read_ts: T,
38    write_ts: T,
39    #[derivative(Debug = "ignore")]
40    #[allow(dead_code)]
41    next: N,
42}
43
44impl<T: TimestampManipulation, N> InMemoryTimestampOracle<T, N>
45where
46    N: GenericNowFn<T>,
47{
48    /// Create a new timeline, starting at the indicated time. `next` generates
49    /// new timestamps when invoked. The timestamps have no requirements, and
50    /// can retreat from previous invocations.
51    pub fn new(initially: T, next: N) -> Self
52    where
53        N: GenericNowFn<T>,
54    {
55        Self {
56            read_ts: initially.clone(),
57            write_ts: initially,
58            next,
59        }
60    }
61
62    /// Acquire a new timestamp for writing.
63    ///
64    /// This timestamp will be strictly greater than all prior values of
65    /// `self.read_ts()` and `self.write_ts()`.
66    #[allow(dead_code)]
67    fn write_ts(&mut self) -> WriteTimestamp<T> {
68        let mut next = self.next.now();
69        if next.less_equal(&self.write_ts) {
70            next = TimestampManipulation::step_forward(&self.write_ts);
71        }
72        assert!(self.read_ts.less_than(&next));
73        assert!(self.write_ts.less_than(&next));
74        self.write_ts = next.clone();
75        assert!(self.read_ts.less_equal(&self.write_ts));
76        let advance_to = TimestampManipulation::step_forward(&next);
77        WriteTimestamp {
78            timestamp: next,
79            advance_to,
80        }
81    }
82
83    /// Peek the current write timestamp.
84    #[allow(dead_code)]
85    fn peek_write_ts(&self) -> T {
86        self.write_ts.clone()
87    }
88
89    /// Acquire a new timestamp for reading.
90    ///
91    /// This timestamp will be greater or equal to all prior values of
92    /// `self.apply_write(write_ts)`, and strictly less than all subsequent
93    /// values of `self.write_ts()`.
94    pub(crate) fn read_ts(&self) -> T {
95        self.read_ts.clone()
96    }
97
98    /// Mark a write at `write_ts` completed.
99    ///
100    /// All subsequent values of `self.read_ts()` will be greater or equal to
101    /// `write_ts`.
102    pub(crate) fn apply_write(&mut self, write_ts: T) {
103        if self.read_ts.less_than(&write_ts) {
104            self.read_ts = write_ts;
105
106            if self.write_ts.less_than(&self.read_ts) {
107                self.write_ts = self.read_ts.clone();
108            }
109        }
110        assert!(self.read_ts.less_equal(&self.write_ts));
111    }
112}