// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Implementation of real-time recency.
1112use std::cmp::Reverse;
13use std::collections::BinaryHeap;
14use std::collections::binary_heap::PeekMut;
1516use differential_dataflow::lattice::Lattice;
17use mz_ore::now::EpochMillis;
18use mz_persist_client::read::{ListenEvent, Subscribe};
19use mz_persist_types::Codec64;
20use mz_repr::{GlobalId, Row};
21use mz_storage_types::StorageDiff;
22use mz_storage_types::configuration::StorageConfiguration;
23use mz_storage_types::sources::{
24 GenericSourceConnection, SourceConnection, SourceData, SourceTimestamp,
25};
26use mz_timely_util::antichain::AntichainExt;
27use timely::PartialOrder;
28use timely::order::TotalOrder;
29use timely::progress::Antichain;
30use timely::progress::frontier::MutableAntichain;
3132use crate::StorageError;
33use crate::Timestamp;
3435/// Determine the "real-time recency" timestamp for `self`.
36///
37/// Real-time recency is defined as the minimum value of `T` that `id` can
38/// be queried at to return all data visible in the upstream system the
39/// query was issued. In this case, "the upstream system" is the external
40/// system which `self` connects to.
41///
42/// # Panics
43/// - If `self` is a [`GenericSourceConnection::LoadGenerator`]. Load
44/// generator sources do not yet (or might never) support real-time
45/// recency. You can avoid this panic by choosing to not call this
46/// function on load generator sources.
47pub(super) async fn real_time_recency_ts<
48 T: Timestamp + Lattice + TotalOrder + Codec64 + From<EpochMillis> + Sync,
49>(
50 connection: GenericSourceConnection,
51 id: GlobalId,
52 config: StorageConfiguration,
53 as_of: Antichain<T>,
54 remap_subscribe: Subscribe<SourceData, (), T, StorageDiff>,
55) -> Result<T, StorageError<T>> {
56match connection {
57 GenericSourceConnection::Kafka(kafka) => {
58let external_frontier = kafka
59 .fetch_write_frontier(&config)
60 .await
61.map_err(StorageError::Generic)?;
6263 decode_remap_data_until_geq_external_frontier(
64 id,
65 external_frontier,
66 as_of,
67 remap_subscribe,
68 )
69 .await
70}
71 GenericSourceConnection::Postgres(pg) => {
72let external_frontier = pg
73 .fetch_write_frontier(&config)
74 .await
75.map_err(StorageError::Generic)?;
7677 decode_remap_data_until_geq_external_frontier(
78 id,
79 external_frontier,
80 as_of,
81 remap_subscribe,
82 )
83 .await
84}
85 GenericSourceConnection::MySql(my_sql) => {
86let external_frontier = my_sql
87 .fetch_write_frontier(&config)
88 .await
89.map_err(StorageError::Generic)?;
9091 decode_remap_data_until_geq_external_frontier(
92 id,
93 external_frontier,
94 as_of,
95 remap_subscribe,
96 )
97 .await
98}
99 GenericSourceConnection::SqlServer(sql_server) => {
100let external_frontier = sql_server
101 .fetch_write_frontier(&config)
102 .await
103.map_err(StorageError::Generic)?;
104105 decode_remap_data_until_geq_external_frontier(
106 id,
107 external_frontier,
108 as_of,
109 remap_subscribe,
110 )
111 .await
112}
113// Load generator sources have no "external system" to reach out to,
114 // so it's unclear what RTR would mean for them.
115s @ GenericSourceConnection::LoadGenerator(_) => unreachable!(
116"do not try to determine RTR timestamp on {} source",
117 s.name()
118 ),
119 }
120}
121122async fn decode_remap_data_until_geq_external_frontier<
123 FromTime: SourceTimestamp,
124 T: Timestamp + Lattice + TotalOrder + Codec64 + From<EpochMillis> + Sync,
125>(
126 id: GlobalId,
127 external_frontier: timely::progress::Antichain<FromTime>,
128 as_of: Antichain<T>,
129mut remap_subscribe: Subscribe<SourceData, (), T, StorageDiff>,
130) -> Result<T, StorageError<T>> {
131tracing::debug!(
132?id,
133"fetched real time recency frontier: {}",
134 external_frontier.pretty()
135 );
136137let external_frontier = external_frontier.borrow();
138let mut native_upper = MutableAntichain::new();
139140let mut remap_frontier = Antichain::from_elem(T::minimum());
141let mut pending_remap = BinaryHeap::new();
142143let mut min_ts = T::minimum();
144 min_ts.advance_by(as_of.borrow());
145let mut min_ts = Some(min_ts);
146147// First we must read the snapshot so that we can accumulate the minimum timestamp
148 // correctly.
149while !remap_frontier.is_empty() {
150// Receive binding updates
151for event in remap_subscribe.fetch_next().await {
152match event {
153 ListenEvent::Updates(updates) => {
154for ((k, v), into_ts, diff) in updates {
155let row: Row = k.expect("invalid binding").0.expect("invalid binding");
156let _v: () = v.expect("invalid binding");
157158let from_ts: FromTime = SourceTimestamp::decode_row(&row);
159 pending_remap.push(Reverse((into_ts, from_ts, diff)));
160 }
161 }
162163 ListenEvent::Progress(frontier) => remap_frontier = frontier,
164 }
165 }
166// Only process the data if we can correctly accumulate timestamps beyond the as_of
167if as_of.iter().any(|t| !remap_frontier.less_equal(t)) {
168loop {
169// The next timestamp that must be considered is either the minimum timestamp,
170 // if we haven't yet checked it, or the next timestamp of the pending remap
171 // updates that is not beyond the frontier.
172let binding_ts = match min_ts.take() {
173Some(min_ts) => min_ts,
174None => {
175let Some(Reverse((into_ts, _, _))) = pending_remap.peek() else {
176break;
177 };
178if !remap_frontier.less_equal(into_ts) {
179 into_ts.clone()
180 } else {
181break;
182 }
183 }
184 };
185// Create an iterator with all the updates that are less than or equal to
186 // binding_ts
187let binding_updates = std::iter::from_fn(|| {
188let update = pending_remap.peek_mut()?;
189if PartialOrder::less_equal(&update.0.0, &binding_ts) {
190let Reverse((_, from_ts, diff)) = PeekMut::pop(update);
191Some((from_ts, diff))
192 } else {
193None
194}
195 });
196 native_upper.update_iter(binding_updates);
197198// Check if at binding_ts the source contained all of the data visible in the
199 // external system. If so, we're done.
200if PartialOrder::less_equal(&external_frontier, &native_upper.frontier()) {
201tracing::trace!("real-time recency ts for {id}: {binding_ts:?}");
202return Ok(binding_ts);
203 }
204 }
205 }
206 }
207// Collection dropped before we ingested external frontier.
208Err(StorageError::RtrDropFailure(id))
209}