mz_storage_controller/
rtr.rs1use std::cmp::Reverse;
13use std::collections::BinaryHeap;
14use std::collections::binary_heap::PeekMut;
15
16use differential_dataflow::lattice::Lattice;
17use mz_ore::now::EpochMillis;
18use mz_persist_client::read::{ListenEvent, Subscribe};
19use mz_persist_types::Codec64;
20use mz_repr::{GlobalId, Row};
21use mz_storage_types::StorageDiff;
22use mz_storage_types::configuration::StorageConfiguration;
23use mz_storage_types::sources::{
24 GenericSourceConnection, SourceConnection, SourceData, SourceTimestamp,
25};
26use mz_timely_util::antichain::AntichainExt;
27use timely::PartialOrder;
28use timely::order::TotalOrder;
29use timely::progress::Antichain;
30use timely::progress::frontier::MutableAntichain;
31
32use crate::StorageError;
33use crate::Timestamp;
34
35pub(super) async fn real_time_recency_ts<
48 T: Timestamp + Lattice + TotalOrder + Codec64 + From<EpochMillis> + Sync,
49>(
50 connection: GenericSourceConnection,
51 id: GlobalId,
52 config: StorageConfiguration,
53 as_of: Antichain<T>,
54 remap_subscribe: Subscribe<SourceData, (), T, StorageDiff>,
55) -> Result<T, StorageError<T>> {
56 match connection {
57 GenericSourceConnection::Kafka(kafka) => {
58 let external_frontier = kafka
59 .fetch_write_frontier(&config)
60 .await
61 .map_err(StorageError::Generic)?;
62
63 decode_remap_data_until_geq_external_frontier(
64 id,
65 external_frontier,
66 as_of,
67 remap_subscribe,
68 )
69 .await
70 }
71 GenericSourceConnection::Postgres(pg) => {
72 let external_frontier = pg
73 .fetch_write_frontier(&config)
74 .await
75 .map_err(StorageError::Generic)?;
76
77 decode_remap_data_until_geq_external_frontier(
78 id,
79 external_frontier,
80 as_of,
81 remap_subscribe,
82 )
83 .await
84 }
85 GenericSourceConnection::MySql(my_sql) => {
86 let external_frontier = my_sql
87 .fetch_write_frontier(&config)
88 .await
89 .map_err(StorageError::Generic)?;
90
91 decode_remap_data_until_geq_external_frontier(
92 id,
93 external_frontier,
94 as_of,
95 remap_subscribe,
96 )
97 .await
98 }
99 GenericSourceConnection::SqlServer(sql_server) => {
100 let external_frontier = sql_server
101 .fetch_write_frontier(&config)
102 .await
103 .map_err(StorageError::Generic)?;
104
105 decode_remap_data_until_geq_external_frontier(
106 id,
107 external_frontier,
108 as_of,
109 remap_subscribe,
110 )
111 .await
112 }
113 s @ GenericSourceConnection::LoadGenerator(_) => unreachable!(
116 "do not try to determine RTR timestamp on {} source",
117 s.name()
118 ),
119 }
120}
121
122async fn decode_remap_data_until_geq_external_frontier<
123 FromTime: SourceTimestamp,
124 T: Timestamp + Lattice + TotalOrder + Codec64 + From<EpochMillis> + Sync,
125>(
126 id: GlobalId,
127 external_frontier: timely::progress::Antichain<FromTime>,
128 as_of: Antichain<T>,
129 mut remap_subscribe: Subscribe<SourceData, (), T, StorageDiff>,
130) -> Result<T, StorageError<T>> {
131 tracing::debug!(
132 ?id,
133 "fetched real time recency frontier: {}",
134 external_frontier.pretty()
135 );
136
137 let external_frontier = external_frontier.borrow();
138 let mut native_upper = MutableAntichain::new();
139
140 let mut remap_frontier = Antichain::from_elem(T::minimum());
141 let mut pending_remap = BinaryHeap::new();
142
143 let mut min_ts = T::minimum();
144 min_ts.advance_by(as_of.borrow());
145 let mut min_ts = Some(min_ts);
146
147 while !remap_frontier.is_empty() {
150 for event in remap_subscribe.fetch_next().await {
152 match event {
153 ListenEvent::Updates(updates) => {
154 for ((k, v), into_ts, diff) in updates {
155 let row: Row = k.expect("invalid binding").0.expect("invalid binding");
156 let _v: () = v.expect("invalid binding");
157
158 let from_ts: FromTime = SourceTimestamp::decode_row(&row);
159 pending_remap.push(Reverse((into_ts, from_ts, diff)));
160 }
161 }
162
163 ListenEvent::Progress(frontier) => remap_frontier = frontier,
164 }
165 }
166 if as_of.iter().any(|t| !remap_frontier.less_equal(t)) {
168 loop {
169 let binding_ts = match min_ts.take() {
173 Some(min_ts) => min_ts,
174 None => {
175 let Some(Reverse((into_ts, _, _))) = pending_remap.peek() else {
176 break;
177 };
178 if !remap_frontier.less_equal(into_ts) {
179 into_ts.clone()
180 } else {
181 break;
182 }
183 }
184 };
185 let binding_updates = std::iter::from_fn(|| {
188 let update = pending_remap.peek_mut()?;
189 if PartialOrder::less_equal(&update.0.0, &binding_ts) {
190 let Reverse((_, from_ts, diff)) = PeekMut::pop(update);
191 Some((from_ts, diff))
192 } else {
193 None
194 }
195 });
196 native_upper.update_iter(binding_updates);
197
198 if PartialOrder::less_equal(&external_frontier, &native_upper.frontier()) {
201 tracing::trace!("real-time recency ts for {id}: {binding_ts:?}");
202 return Ok(binding_ts);
203 }
204 }
205 }
206 }
207 Err(StorageError::RtrDropFailure(id))
209}