mz_storage_controller/
rtr.rs1use std::cmp::Reverse;
13use std::collections::BinaryHeap;
14use std::collections::binary_heap::PeekMut;
15
16use differential_dataflow::lattice::Lattice;
17use mz_persist_client::read::{ListenEvent, Subscribe};
18use mz_repr::{GlobalId, Row};
19use mz_storage_types::StorageDiff;
20use mz_storage_types::configuration::StorageConfiguration;
21use mz_storage_types::sources::{
22 GenericSourceConnection, SourceConnection, SourceData, SourceTimestamp,
23};
24use mz_timely_util::antichain::AntichainExt;
25use timely::PartialOrder;
26use timely::progress::Antichain;
27use timely::progress::frontier::MutableAntichain;
28
29use crate::StorageError;
30use crate::Timestamp;
31
32pub(super) async fn real_time_recency_ts(
45 connection: GenericSourceConnection,
46 id: GlobalId,
47 config: StorageConfiguration,
48 as_of: Antichain<Timestamp>,
49 remap_subscribe: Subscribe<SourceData, (), Timestamp, StorageDiff>,
50) -> Result<Timestamp, StorageError> {
51 match connection {
52 GenericSourceConnection::Kafka(kafka) => {
53 let external_frontier = kafka
54 .fetch_write_frontier(&config)
55 .await
56 .map_err(StorageError::Generic)?;
57
58 decode_remap_data_until_geq_external_frontier(
59 id,
60 external_frontier,
61 as_of,
62 remap_subscribe,
63 )
64 .await
65 }
66 GenericSourceConnection::Postgres(pg) => {
67 let external_frontier = pg
68 .fetch_write_frontier(&config)
69 .await
70 .map_err(StorageError::Generic)?;
71
72 decode_remap_data_until_geq_external_frontier(
73 id,
74 external_frontier,
75 as_of,
76 remap_subscribe,
77 )
78 .await
79 }
80 GenericSourceConnection::MySql(my_sql) => {
81 let external_frontier = my_sql
82 .fetch_write_frontier(&config)
83 .await
84 .map_err(StorageError::Generic)?;
85
86 decode_remap_data_until_geq_external_frontier(
87 id,
88 external_frontier,
89 as_of,
90 remap_subscribe,
91 )
92 .await
93 }
94 GenericSourceConnection::SqlServer(sql_server) => {
95 let external_frontier = sql_server
96 .fetch_write_frontier(&config)
97 .await
98 .map_err(StorageError::Generic)?;
99
100 decode_remap_data_until_geq_external_frontier(
101 id,
102 external_frontier,
103 as_of,
104 remap_subscribe,
105 )
106 .await
107 }
108 s @ GenericSourceConnection::LoadGenerator(_) => unreachable!(
111 "do not try to determine RTR timestamp on {} source",
112 s.name()
113 ),
114 }
115}
116
117async fn decode_remap_data_until_geq_external_frontier<FromTime: SourceTimestamp>(
118 id: GlobalId,
119 external_frontier: timely::progress::Antichain<FromTime>,
120 as_of: Antichain<Timestamp>,
121 mut remap_subscribe: Subscribe<SourceData, (), Timestamp, StorageDiff>,
122) -> Result<Timestamp, StorageError> {
123 tracing::debug!(
124 ?id,
125 "fetched real time recency frontier: {}",
126 external_frontier.pretty()
127 );
128
129 let external_frontier = external_frontier.borrow();
130 let mut native_upper = MutableAntichain::new();
131
132 let mut remap_frontier = Antichain::from_elem(Timestamp::MIN);
133 let mut pending_remap = BinaryHeap::new();
134
135 let mut min_ts = Timestamp::MIN;
136 min_ts.advance_by(as_of.borrow());
137 let mut min_ts = Some(min_ts);
138
139 while !remap_frontier.is_empty() {
142 for event in remap_subscribe.fetch_next().await {
144 match event {
145 ListenEvent::Updates(updates) => {
146 for ((k, v), into_ts, diff) in updates {
147 let row: Row = k.0.expect("invalid binding");
148 let _v: () = v;
149
150 let from_ts: FromTime = SourceTimestamp::decode_row(&row);
151 pending_remap.push(Reverse((into_ts, from_ts, diff)));
152 }
153 }
154
155 ListenEvent::Progress(frontier) => remap_frontier = frontier,
156 }
157 }
158 if as_of.iter().any(|t| !remap_frontier.less_equal(t)) {
160 loop {
161 let binding_ts = match min_ts.take() {
165 Some(min_ts) => min_ts,
166 None => {
167 let Some(Reverse((into_ts, _, _))) = pending_remap.peek() else {
168 break;
169 };
170 if !remap_frontier.less_equal(into_ts) {
171 *into_ts
172 } else {
173 break;
174 }
175 }
176 };
177 let binding_updates = std::iter::from_fn(|| {
180 let update = pending_remap.peek_mut()?;
181 if PartialOrder::less_equal(&update.0.0, &binding_ts) {
182 let Reverse((_, from_ts, diff)) = PeekMut::pop(update);
183 Some((from_ts, diff))
184 } else {
185 None
186 }
187 });
188 native_upper.update_iter(binding_updates);
189
190 if PartialOrder::less_equal(&external_frontier, &native_upper.frontier()) {
193 tracing::trace!("real-time recency ts for {id}: {binding_ts:?}");
194 return Ok(binding_ts);
195 }
196 }
197 }
198 }
199 Err(StorageError::RtrDropFailure(id))
201}