1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A timestamp oracle that relies on the [`crate::catalog::Catalog`] for persistence/durability
//! and reserves ranges of timestamps.

use std::fmt::Debug;

use derivative::Derivative;
use mz_repr::TimestampManipulation;
use mz_timestamp_oracle::{GenericNowFn, WriteTimestamp};

/// A type that provides write and read timestamps, reads observe exactly their
/// preceding writes.
///
/// Specifically, all read timestamps will be greater or equal to all previously
/// reported completed write timestamps, and strictly less than all subsequently
/// emitted write timestamps.
///
/// A timeline can perform reads and writes. Reads happen at the read timestamp
/// and writes happen at the write timestamp. After the write has completed, but
/// before a response is sent, the read timestamp must be updated to a value
/// greater than or equal to `self.write_ts`.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct InMemoryTimestampOracle<T, N>
where
    T: Debug,
    N: GenericNowFn<T>,
{
    read_ts: T,
    write_ts: T,
    #[derivative(Debug = "ignore")]
    #[allow(dead_code)]
    next: N,
}

impl<T: TimestampManipulation, N> InMemoryTimestampOracle<T, N>
where
    N: GenericNowFn<T>,
{
    /// Create a new timeline, starting at the indicated time. `next` generates
    /// new timestamps when invoked. The timestamps have no requirements, and
    /// can retreat from previous invocations.
    pub fn new(initially: T, next: N) -> Self
    where
        N: GenericNowFn<T>,
    {
        Self {
            read_ts: initially.clone(),
            write_ts: initially,
            next,
        }
    }

    /// Acquire a new timestamp for writing.
    ///
    /// This timestamp will be strictly greater than all prior values of
    /// `self.read_ts()` and `self.write_ts()`.
    #[allow(dead_code)]
    fn write_ts(&mut self) -> WriteTimestamp<T> {
        let mut next = self.next.now();
        if next.less_equal(&self.write_ts) {
            next = TimestampManipulation::step_forward(&self.write_ts);
        }
        assert!(self.read_ts.less_than(&next));
        assert!(self.write_ts.less_than(&next));
        self.write_ts = next.clone();
        assert!(self.read_ts.less_equal(&self.write_ts));
        let advance_to = TimestampManipulation::step_forward(&next);
        WriteTimestamp {
            timestamp: next,
            advance_to,
        }
    }

    /// Peek the current write timestamp.
    #[allow(dead_code)]
    fn peek_write_ts(&self) -> T {
        self.write_ts.clone()
    }

    /// Acquire a new timestamp for reading.
    ///
    /// This timestamp will be greater or equal to all prior values of
    /// `self.apply_write(write_ts)`, and strictly less than all subsequent
    /// values of `self.write_ts()`.
    pub(crate) fn read_ts(&self) -> T {
        self.read_ts.clone()
    }

    /// Mark a write at `write_ts` completed.
    ///
    /// All subsequent values of `self.read_ts()` will be greater or equal to
    /// `write_ts`.
    pub(crate) fn apply_write(&mut self, write_ts: T) {
        if self.read_ts.less_than(&write_ts) {
            self.read_ts = write_ts;

            if self.write_ts.less_than(&self.read_ts) {
                self.write_ts = self.read_ts.clone();
            }
        }
        assert!(self.read_ts.less_equal(&self.write_ts));
    }
}