mz_adapter/coord/in_memory_oracle.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
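//! An in-memory timestamp oracle.
//!
//! [`InMemoryTimestampOracle`] keeps its read and write timestamps only in its own fields.
//! NOTE(review): this header used to describe an oracle that relies on the
//! [`crate::catalog::Catalog`] for persistence/durability and reserves ranges of timestamps;
//! nothing in this file does either, so confirm where (if anywhere) durability is provided.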
12
13use std::fmt::Debug;
14
15use derivative::Derivative;
16use mz_repr::TimestampManipulation;
17use mz_timestamp_oracle::{GenericNowFn, WriteTimestamp};
18
19/// A type that provides write and read timestamps, reads observe exactly their
20/// preceding writes.
21///
22/// Specifically, all read timestamps will be greater or equal to all previously
23/// reported completed write timestamps, and strictly less than all subsequently
24/// emitted write timestamps.
25///
26/// A timeline can perform reads and writes. Reads happen at the read timestamp
27/// and writes happen at the write timestamp. After the write has completed, but
28/// before a response is sent, the read timestamp must be updated to a value
29/// greater than or equal to `self.write_ts`.
30#[derive(Derivative)]
31#[derivative(Debug)]
32pub struct InMemoryTimestampOracle<T, N>
33where
34 T: Debug,
35 N: GenericNowFn<T>,
36{
37 read_ts: T,
38 write_ts: T,
39 #[derivative(Debug = "ignore")]
40 #[allow(dead_code)]
41 next: N,
42}
43
44impl<T: TimestampManipulation, N> InMemoryTimestampOracle<T, N>
45where
46 N: GenericNowFn<T>,
47{
48 /// Create a new timeline, starting at the indicated time. `next` generates
49 /// new timestamps when invoked. The timestamps have no requirements, and
50 /// can retreat from previous invocations.
51 pub fn new(initially: T, next: N) -> Self
52 where
53 N: GenericNowFn<T>,
54 {
55 Self {
56 read_ts: initially.clone(),
57 write_ts: initially,
58 next,
59 }
60 }
61
62 /// Acquire a new timestamp for writing.
63 ///
64 /// This timestamp will be strictly greater than all prior values of
65 /// `self.read_ts()` and `self.write_ts()`.
66 #[allow(dead_code)]
67 fn write_ts(&mut self) -> WriteTimestamp<T> {
68 let mut next = self.next.now();
69 if next.less_equal(&self.write_ts) {
70 next = TimestampManipulation::step_forward(&self.write_ts);
71 }
72 assert!(self.read_ts.less_than(&next));
73 assert!(self.write_ts.less_than(&next));
74 self.write_ts = next.clone();
75 assert!(self.read_ts.less_equal(&self.write_ts));
76 let advance_to = TimestampManipulation::step_forward(&next);
77 WriteTimestamp {
78 timestamp: next,
79 advance_to,
80 }
81 }
82
83 /// Peek the current write timestamp.
84 #[allow(dead_code)]
85 fn peek_write_ts(&self) -> T {
86 self.write_ts.clone()
87 }
88
89 /// Acquire a new timestamp for reading.
90 ///
91 /// This timestamp will be greater or equal to all prior values of
92 /// `self.apply_write(write_ts)`, and strictly less than all subsequent
93 /// values of `self.write_ts()`.
94 pub(crate) fn read_ts(&self) -> T {
95 self.read_ts.clone()
96 }
97
98 /// Mark a write at `write_ts` completed.
99 ///
100 /// All subsequent values of `self.read_ts()` will be greater or equal to
101 /// `write_ts`.
102 pub(crate) fn apply_write(&mut self, write_ts: T) {
103 if self.read_ts.less_than(&write_ts) {
104 self.read_ts = write_ts;
105
106 if self.write_ts.less_than(&self.read_ts) {
107 self.write_ts = self.read_ts.clone();
108 }
109 }
110 assert!(self.read_ts.less_equal(&self.write_ts));
111 }
112}