mz_storage_controller/
rtr.rs1use std::cmp::Reverse;
13use std::collections::BinaryHeap;
14use std::collections::binary_heap::PeekMut;
15
16use differential_dataflow::lattice::Lattice;
17use mz_ore::now::EpochMillis;
18use mz_persist_client::read::{ListenEvent, Subscribe};
19use mz_persist_types::Codec64;
20use mz_repr::{GlobalId, Row};
21use mz_storage_types::StorageDiff;
22use mz_storage_types::configuration::StorageConfiguration;
23use mz_storage_types::sources::{
24    GenericSourceConnection, SourceConnection, SourceData, SourceTimestamp,
25};
26use mz_timely_util::antichain::AntichainExt;
27use timely::PartialOrder;
28use timely::order::TotalOrder;
29use timely::progress::Antichain;
30use timely::progress::frontier::MutableAntichain;
31
32use crate::StorageError;
33use crate::Timestamp;
34
35pub(super) async fn real_time_recency_ts<
48    T: Timestamp + Lattice + TotalOrder + Codec64 + From<EpochMillis> + Sync,
49>(
50    connection: GenericSourceConnection,
51    id: GlobalId,
52    config: StorageConfiguration,
53    as_of: Antichain<T>,
54    remap_subscribe: Subscribe<SourceData, (), T, StorageDiff>,
55) -> Result<T, StorageError<T>> {
56    match connection {
57        GenericSourceConnection::Kafka(kafka) => {
58            let external_frontier = kafka
59                .fetch_write_frontier(&config)
60                .await
61                .map_err(StorageError::Generic)?;
62
63            decode_remap_data_until_geq_external_frontier(
64                id,
65                external_frontier,
66                as_of,
67                remap_subscribe,
68            )
69            .await
70        }
71        GenericSourceConnection::Postgres(pg) => {
72            let external_frontier = pg
73                .fetch_write_frontier(&config)
74                .await
75                .map_err(StorageError::Generic)?;
76
77            decode_remap_data_until_geq_external_frontier(
78                id,
79                external_frontier,
80                as_of,
81                remap_subscribe,
82            )
83            .await
84        }
85        GenericSourceConnection::MySql(my_sql) => {
86            let external_frontier = my_sql
87                .fetch_write_frontier(&config)
88                .await
89                .map_err(StorageError::Generic)?;
90
91            decode_remap_data_until_geq_external_frontier(
92                id,
93                external_frontier,
94                as_of,
95                remap_subscribe,
96            )
97            .await
98        }
99        GenericSourceConnection::SqlServer(sql_server) => {
100            let external_frontier = sql_server
101                .fetch_write_frontier(&config)
102                .await
103                .map_err(StorageError::Generic)?;
104
105            decode_remap_data_until_geq_external_frontier(
106                id,
107                external_frontier,
108                as_of,
109                remap_subscribe,
110            )
111            .await
112        }
113        s @ GenericSourceConnection::LoadGenerator(_) => unreachable!(
116            "do not try to determine RTR timestamp on {} source",
117            s.name()
118        ),
119    }
120}
121
122async fn decode_remap_data_until_geq_external_frontier<
123    FromTime: SourceTimestamp,
124    T: Timestamp + Lattice + TotalOrder + Codec64 + From<EpochMillis> + Sync,
125>(
126    id: GlobalId,
127    external_frontier: timely::progress::Antichain<FromTime>,
128    as_of: Antichain<T>,
129    mut remap_subscribe: Subscribe<SourceData, (), T, StorageDiff>,
130) -> Result<T, StorageError<T>> {
131    tracing::debug!(
132        ?id,
133        "fetched real time recency frontier: {}",
134        external_frontier.pretty()
135    );
136
137    let external_frontier = external_frontier.borrow();
138    let mut native_upper = MutableAntichain::new();
139
140    let mut remap_frontier = Antichain::from_elem(T::minimum());
141    let mut pending_remap = BinaryHeap::new();
142
143    let mut min_ts = T::minimum();
144    min_ts.advance_by(as_of.borrow());
145    let mut min_ts = Some(min_ts);
146
147    while !remap_frontier.is_empty() {
150        for event in remap_subscribe.fetch_next().await {
152            match event {
153                ListenEvent::Updates(updates) => {
154                    for ((k, v), into_ts, diff) in updates {
155                        let row: Row = k.expect("invalid binding").0.expect("invalid binding");
156                        let _v: () = v.expect("invalid binding");
157
158                        let from_ts: FromTime = SourceTimestamp::decode_row(&row);
159                        pending_remap.push(Reverse((into_ts, from_ts, diff)));
160                    }
161                }
162
163                ListenEvent::Progress(frontier) => remap_frontier = frontier,
164            }
165        }
166        if as_of.iter().any(|t| !remap_frontier.less_equal(t)) {
168            loop {
169                let binding_ts = match min_ts.take() {
173                    Some(min_ts) => min_ts,
174                    None => {
175                        let Some(Reverse((into_ts, _, _))) = pending_remap.peek() else {
176                            break;
177                        };
178                        if !remap_frontier.less_equal(into_ts) {
179                            into_ts.clone()
180                        } else {
181                            break;
182                        }
183                    }
184                };
185                let binding_updates = std::iter::from_fn(|| {
188                    let update = pending_remap.peek_mut()?;
189                    if PartialOrder::less_equal(&update.0.0, &binding_ts) {
190                        let Reverse((_, from_ts, diff)) = PeekMut::pop(update);
191                        Some((from_ts, diff))
192                    } else {
193                        None
194                    }
195                });
196                native_upper.update_iter(binding_updates);
197
198                if PartialOrder::less_equal(&external_frontier, &native_upper.frontier()) {
201                    tracing::trace!("real-time recency ts for {id}: {binding_ts:?}");
202                    return Ok(binding_ts);
203                }
204            }
205        }
206    }
207    Err(StorageError::RtrDropFailure(id))
209}