Skip to main content

mz_storage_controller/
rtr.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Implementation of real-time recency.
11
12use std::cmp::Reverse;
13use std::collections::BinaryHeap;
14use std::collections::binary_heap::PeekMut;
15
16use differential_dataflow::lattice::Lattice;
17use mz_persist_client::read::{ListenEvent, Subscribe};
18use mz_repr::{GlobalId, Row};
19use mz_storage_types::StorageDiff;
20use mz_storage_types::configuration::StorageConfiguration;
21use mz_storage_types::sources::{
22    GenericSourceConnection, SourceConnection, SourceData, SourceTimestamp,
23};
24use mz_timely_util::antichain::AntichainExt;
25use timely::PartialOrder;
26use timely::progress::Antichain;
27use timely::progress::frontier::MutableAntichain;
28
29use crate::StorageError;
30use crate::Timestamp;
31
32/// Determine the "real-time recency" timestamp for `self`.
33///
34/// Real-time recency is defined as the minimum value of `T` that `id` can
35/// be queried at to return all data visible in the upstream system the
36/// query was issued. In this case, "the upstream system" is the external
37/// system which `self` connects to.
38///
39/// # Panics
40/// - If `self` is a [`GenericSourceConnection::LoadGenerator`]. Load
41///   generator sources do not yet (or might never) support real-time
42///   recency. You can avoid this panic by choosing to not call this
43///   function on load generator sources.
44pub(super) async fn real_time_recency_ts(
45    connection: GenericSourceConnection,
46    id: GlobalId,
47    config: StorageConfiguration,
48    as_of: Antichain<Timestamp>,
49    remap_subscribe: Subscribe<SourceData, (), Timestamp, StorageDiff>,
50) -> Result<Timestamp, StorageError> {
51    match connection {
52        GenericSourceConnection::Kafka(kafka) => {
53            let external_frontier = kafka
54                .fetch_write_frontier(&config)
55                .await
56                .map_err(StorageError::Generic)?;
57
58            decode_remap_data_until_geq_external_frontier(
59                id,
60                external_frontier,
61                as_of,
62                remap_subscribe,
63            )
64            .await
65        }
66        GenericSourceConnection::Postgres(pg) => {
67            let external_frontier = pg
68                .fetch_write_frontier(&config)
69                .await
70                .map_err(StorageError::Generic)?;
71
72            decode_remap_data_until_geq_external_frontier(
73                id,
74                external_frontier,
75                as_of,
76                remap_subscribe,
77            )
78            .await
79        }
80        GenericSourceConnection::MySql(my_sql) => {
81            let external_frontier = my_sql
82                .fetch_write_frontier(&config)
83                .await
84                .map_err(StorageError::Generic)?;
85
86            decode_remap_data_until_geq_external_frontier(
87                id,
88                external_frontier,
89                as_of,
90                remap_subscribe,
91            )
92            .await
93        }
94        GenericSourceConnection::SqlServer(sql_server) => {
95            let external_frontier = sql_server
96                .fetch_write_frontier(&config)
97                .await
98                .map_err(StorageError::Generic)?;
99
100            decode_remap_data_until_geq_external_frontier(
101                id,
102                external_frontier,
103                as_of,
104                remap_subscribe,
105            )
106            .await
107        }
108        // Load generator sources have no "external system" to reach out to,
109        // so it's unclear what RTR would mean for them.
110        s @ GenericSourceConnection::LoadGenerator(_) => unreachable!(
111            "do not try to determine RTR timestamp on {} source",
112            s.name()
113        ),
114    }
115}
116
117async fn decode_remap_data_until_geq_external_frontier<FromTime: SourceTimestamp>(
118    id: GlobalId,
119    external_frontier: timely::progress::Antichain<FromTime>,
120    as_of: Antichain<Timestamp>,
121    mut remap_subscribe: Subscribe<SourceData, (), Timestamp, StorageDiff>,
122) -> Result<Timestamp, StorageError> {
123    tracing::debug!(
124        ?id,
125        "fetched real time recency frontier: {}",
126        external_frontier.pretty()
127    );
128
129    let external_frontier = external_frontier.borrow();
130    let mut native_upper = MutableAntichain::new();
131
132    let mut remap_frontier = Antichain::from_elem(Timestamp::MIN);
133    let mut pending_remap = BinaryHeap::new();
134
135    let mut min_ts = Timestamp::MIN;
136    min_ts.advance_by(as_of.borrow());
137    let mut min_ts = Some(min_ts);
138
139    // First we must read the snapshot so that we can accumulate the minimum timestamp
140    // correctly.
141    while !remap_frontier.is_empty() {
142        // Receive binding updates
143        for event in remap_subscribe.fetch_next().await {
144            match event {
145                ListenEvent::Updates(updates) => {
146                    for ((k, v), into_ts, diff) in updates {
147                        let row: Row = k.0.expect("invalid binding");
148                        let _v: () = v;
149
150                        let from_ts: FromTime = SourceTimestamp::decode_row(&row);
151                        pending_remap.push(Reverse((into_ts, from_ts, diff)));
152                    }
153                }
154
155                ListenEvent::Progress(frontier) => remap_frontier = frontier,
156            }
157        }
158        // Only process the data if we can correctly accumulate timestamps beyond the as_of
159        if as_of.iter().any(|t| !remap_frontier.less_equal(t)) {
160            loop {
161                // The next timestamp that must be considered is either the minimum timestamp,
162                // if we haven't yet checked it, or the next timestamp of the pending remap
163                // updates that is not beyond the frontier.
164                let binding_ts = match min_ts.take() {
165                    Some(min_ts) => min_ts,
166                    None => {
167                        let Some(Reverse((into_ts, _, _))) = pending_remap.peek() else {
168                            break;
169                        };
170                        if !remap_frontier.less_equal(into_ts) {
171                            *into_ts
172                        } else {
173                            break;
174                        }
175                    }
176                };
177                // Create an iterator with all the updates that are less than or equal to
178                // binding_ts
179                let binding_updates = std::iter::from_fn(|| {
180                    let update = pending_remap.peek_mut()?;
181                    if PartialOrder::less_equal(&update.0.0, &binding_ts) {
182                        let Reverse((_, from_ts, diff)) = PeekMut::pop(update);
183                        Some((from_ts, diff))
184                    } else {
185                        None
186                    }
187                });
188                native_upper.update_iter(binding_updates);
189
190                // Check if at binding_ts the source contained all of the data visible in the
191                // external system. If so, we're done.
192                if PartialOrder::less_equal(&external_frontier, &native_upper.frontier()) {
193                    tracing::trace!("real-time recency ts for {id}: {binding_ts:?}");
194                    return Ok(binding_ts);
195                }
196            }
197        }
198    }
199    // Collection dropped before we ingested external frontier.
200    Err(StorageError::RtrDropFailure(id))
201}