Skip to main content

mz_adapter/coord/
in_memory_oracle.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A timestamp oracle that relies on the [`crate::catalog::Catalog`] for persistence/durability
11//! and reserves ranges of timestamps.
12
13use derivative::Derivative;
14use mz_ore::now::NowFn;
15use mz_repr::{Timestamp, TimestampManipulation};
16use mz_timestamp_oracle::{GenericNowFn, WriteTimestamp};
17use timely::order::PartialOrder;
18
19/// A type that provides write and read timestamps, reads observe exactly their
20/// preceding writes.
21///
22/// Specifically, all read timestamps will be greater or equal to all previously
23/// reported completed write timestamps, and strictly less than all subsequently
24/// emitted write timestamps.
25///
26/// A timeline can perform reads and writes. Reads happen at the read timestamp
27/// and writes happen at the write timestamp. After the write has completed, but
28/// before a response is sent, the read timestamp must be updated to a value
29/// greater than or equal to `self.write_ts`.
30#[derive(Derivative)]
31#[derivative(Debug)]
32pub struct InMemoryTimestampOracle {
33    read_ts: Timestamp,
34    write_ts: Timestamp,
35    #[derivative(Debug = "ignore")]
36    #[allow(dead_code)]
37    next: NowFn<Timestamp>,
38}
39
40impl InMemoryTimestampOracle {
41    /// Create a new timeline, starting at the indicated time. `next` generates
42    /// new timestamps when invoked. The timestamps have no requirements, and
43    /// can retreat from previous invocations.
44    pub fn new(initially: Timestamp, next: NowFn<Timestamp>) -> Self {
45        Self {
46            read_ts: initially.clone(),
47            write_ts: initially,
48            next,
49        }
50    }
51
52    /// Acquire a new timestamp for writing.
53    ///
54    /// This timestamp will be strictly greater than all prior values of
55    /// `self.read_ts()` and `self.write_ts()`.
56    #[allow(dead_code)]
57    fn write_ts(&mut self) -> WriteTimestamp<Timestamp> {
58        let mut next = self.next.now();
59        if next.less_equal(&self.write_ts) {
60            next = TimestampManipulation::step_forward(&self.write_ts);
61        }
62        assert!(self.read_ts.less_than(&next));
63        assert!(self.write_ts.less_than(&next));
64        self.write_ts = next.clone();
65        assert!(self.read_ts.less_equal(&self.write_ts));
66        let advance_to = TimestampManipulation::step_forward(&next);
67        WriteTimestamp {
68            timestamp: next,
69            advance_to,
70        }
71    }
72
73    /// Peek the current write timestamp.
74    #[allow(dead_code)]
75    fn peek_write_ts(&self) -> Timestamp {
76        self.write_ts.clone()
77    }
78
79    /// Acquire a new timestamp for reading.
80    ///
81    /// This timestamp will be greater or equal to all prior values of
82    /// `self.apply_write(write_ts)`, and strictly less than all subsequent
83    /// values of `self.write_ts()`.
84    pub(crate) fn read_ts(&self) -> Timestamp {
85        self.read_ts.clone()
86    }
87
88    /// Mark a write at `write_ts` completed.
89    ///
90    /// All subsequent values of `self.read_ts()` will be greater or equal to
91    /// `write_ts`.
92    pub(crate) fn apply_write(&mut self, write_ts: Timestamp) {
93        if self.read_ts.less_than(&write_ts) {
94            self.read_ts = write_ts;
95
96            if self.write_ts.less_than(&self.read_ts) {
97                self.write_ts = self.read_ts.clone();
98            }
99        }
100        assert!(self.read_ts.less_equal(&self.write_ts));
101    }
102}