mz_adapter/coord/in_memory_oracle.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A timestamp oracle that relies on the [`crate::catalog::Catalog`] for persistence/durability
11//! and reserves ranges of timestamps.
12
13use derivative::Derivative;
14use mz_ore::now::NowFn;
15use mz_repr::{Timestamp, TimestampManipulation};
16use mz_timestamp_oracle::{GenericNowFn, WriteTimestamp};
17use timely::order::PartialOrder;
18
19/// A type that provides write and read timestamps, reads observe exactly their
20/// preceding writes.
21///
22/// Specifically, all read timestamps will be greater or equal to all previously
23/// reported completed write timestamps, and strictly less than all subsequently
24/// emitted write timestamps.
25///
26/// A timeline can perform reads and writes. Reads happen at the read timestamp
27/// and writes happen at the write timestamp. After the write has completed, but
28/// before a response is sent, the read timestamp must be updated to a value
29/// greater than or equal to `self.write_ts`.
30#[derive(Derivative)]
31#[derivative(Debug)]
32pub struct InMemoryTimestampOracle {
33 read_ts: Timestamp,
34 write_ts: Timestamp,
35 #[derivative(Debug = "ignore")]
36 #[allow(dead_code)]
37 next: NowFn<Timestamp>,
38}
39
40impl InMemoryTimestampOracle {
41 /// Create a new timeline, starting at the indicated time. `next` generates
42 /// new timestamps when invoked. The timestamps have no requirements, and
43 /// can retreat from previous invocations.
44 pub fn new(initially: Timestamp, next: NowFn<Timestamp>) -> Self {
45 Self {
46 read_ts: initially.clone(),
47 write_ts: initially,
48 next,
49 }
50 }
51
52 /// Acquire a new timestamp for writing.
53 ///
54 /// This timestamp will be strictly greater than all prior values of
55 /// `self.read_ts()` and `self.write_ts()`.
56 #[allow(dead_code)]
57 fn write_ts(&mut self) -> WriteTimestamp<Timestamp> {
58 let mut next = self.next.now();
59 if next.less_equal(&self.write_ts) {
60 next = TimestampManipulation::step_forward(&self.write_ts);
61 }
62 assert!(self.read_ts.less_than(&next));
63 assert!(self.write_ts.less_than(&next));
64 self.write_ts = next.clone();
65 assert!(self.read_ts.less_equal(&self.write_ts));
66 let advance_to = TimestampManipulation::step_forward(&next);
67 WriteTimestamp {
68 timestamp: next,
69 advance_to,
70 }
71 }
72
73 /// Peek the current write timestamp.
74 #[allow(dead_code)]
75 fn peek_write_ts(&self) -> Timestamp {
76 self.write_ts.clone()
77 }
78
79 /// Acquire a new timestamp for reading.
80 ///
81 /// This timestamp will be greater or equal to all prior values of
82 /// `self.apply_write(write_ts)`, and strictly less than all subsequent
83 /// values of `self.write_ts()`.
84 pub(crate) fn read_ts(&self) -> Timestamp {
85 self.read_ts.clone()
86 }
87
88 /// Mark a write at `write_ts` completed.
89 ///
90 /// All subsequent values of `self.read_ts()` will be greater or equal to
91 /// `write_ts`.
92 pub(crate) fn apply_write(&mut self, write_ts: Timestamp) {
93 if self.read_ts.less_than(&write_ts) {
94 self.read_ts = write_ts;
95
96 if self.write_ts.less_than(&self.read_ts) {
97 self.write_ts = self.read_ts.clone();
98 }
99 }
100 assert!(self.read_ts.less_equal(&self.write_ts));
101 }
102}