mz_storage_controller/persist_handles/
read_only_table_worker.rs1use std::collections::{BTreeMap, BTreeSet, VecDeque};
14use std::ops::ControlFlow;
15
16use differential_dataflow::lattice::Lattice;
17use futures::FutureExt;
18use mz_persist_client::write::WriteHandle;
19use mz_persist_types::Codec64;
20use mz_repr::{GlobalId, TimestampManipulation};
21use mz_storage_client::client::{TableData, Update};
22use mz_storage_types::StorageDiff;
23use mz_storage_types::controller::InvalidUpper;
24use mz_storage_types::sources::SourceData;
25use timely::PartialOrder;
26use timely::progress::{Antichain, Timestamp};
27use tracing::Span;
28
29use crate::StorageError;
30use crate::persist_handles::{PersistTableWriteCmd, append_work};
31
32pub(crate) async fn read_only_mode_table_worker<
51 T: Timestamp + Lattice + Codec64 + TimestampManipulation,
52>(
53 mut rx: tokio::sync::mpsc::UnboundedReceiver<(Span, PersistTableWriteCmd<T>)>,
54 txns_handle: WriteHandle<SourceData, (), T, StorageDiff>,
55) {
56 let mut write_handles =
57 BTreeMap::<GlobalId, WriteHandle<SourceData, (), T, StorageDiff>>::new();
58
59 let gen_upper_future = |mut handle: WriteHandle<SourceData, (), T, StorageDiff>| {
60 let fut = async move {
61 let current_upper = handle.shared_upper();
62 handle.wait_for_upper_past(¤t_upper).await;
63 let new_upper = handle.shared_upper();
64 (handle, new_upper)
65 };
66
67 fut.boxed()
68 };
69
70 let mut txns_upper_future = {
71 let txns_upper_future = gen_upper_future(txns_handle);
72 txns_upper_future
73 };
74
75 let shutdown_reason = loop {
76 tokio::select! {
77 (handle, upper) = &mut txns_upper_future => {
78 tracing::debug!("new upper from txns shard: {:?}, advancing upper of migrated builtin tables", upper);
79 advance_uppers(&mut write_handles, upper).await;
80
81 let fut = gen_upper_future(handle);
82 txns_upper_future = fut;
83 }
84 cmd = rx.recv() => {
85 let Some(cmd) = cmd else {
86 break "command rx closed".to_string();
87 };
88
89 let mut commands = VecDeque::new();
94 commands.push_back(cmd);
95 while let Ok(cmd) = rx.try_recv() {
96 commands.push_back(cmd);
97 }
98
99 let result = handle_commands(&mut write_handles, commands).await;
100
101 match result {
102 ControlFlow::Continue(_) => {
103 continue;
104 }
105 ControlFlow::Break(msg) => {
106 break msg;
107 }
108 }
109
110 }
111 }
112 };
113
114 tracing::info!(%shutdown_reason, "PersistTableWriteWorker shutting down");
115}
116
117async fn handle_commands<T>(
119 write_handles: &mut BTreeMap<GlobalId, WriteHandle<SourceData, (), T, StorageDiff>>,
120 mut commands: VecDeque<(Span, PersistTableWriteCmd<T>)>,
121) -> ControlFlow<String>
122where
123 T: Timestamp + Lattice + Codec64 + TimestampManipulation,
124{
125 let mut shutdown = false;
126
127 let mut all_updates = BTreeMap::default();
129 let mut all_responses = Vec::default();
130
131 while let Some((span, command)) = commands.pop_front() {
132 match command {
133 PersistTableWriteCmd::Register(_register_ts, ids_handles, tx) => {
134 for (id, write_handle) in ids_handles {
135 assert!(id.is_system(), "trying to register non-system id {id}");
137
138 let previous = write_handles.insert(id, write_handle);
139 if previous.is_some() {
140 panic!("already registered a WriteHandle for collection {:?}", id);
141 }
142 }
143 let _ = tx.send(());
145 }
146 PersistTableWriteCmd::Update {
147 existing_collection,
148 new_collection,
149 handle,
150 forget_ts: _,
151 register_ts: _,
152 tx,
153 } => {
154 write_handles.remove(&existing_collection);
155 write_handles.insert(new_collection, handle).expect(
156 "PersistTableWriteCmd::Update only valid for updating extant write handles",
157 );
158 let _ = tx.send(());
160 }
161 PersistTableWriteCmd::DropHandles {
162 forget_ts: _,
163 ids,
164 tx,
165 } => {
166 for id in ids {
173 write_handles.remove(&id);
174 }
175 let _ = tx.send(());
177 }
178 PersistTableWriteCmd::Append {
179 write_ts,
180 advance_to,
181 updates,
182 tx,
183 } => {
184 let mut ids = BTreeSet::new();
185 for (id, updates_no_ts) in updates {
186 ids.insert(id);
187 let (old_span, updates, _expected_upper, old_new_upper) =
188 all_updates.entry(id).or_insert_with(|| {
189 (
190 span.clone(),
191 Vec::default(),
192 Antichain::from_elem(write_ts.clone()),
193 Antichain::from_elem(T::minimum()),
194 )
195 });
196
197 if old_span.id() != span.id() {
198 old_span.follows_from(span.id());
204 }
205 let updates_with_ts = updates_no_ts.into_iter().flat_map(|x| match x {
206 TableData::Rows(rows) => {
207 let iter = rows.into_iter().map(|(row, diff)| Update {
208 row,
209 timestamp: write_ts.clone(),
210 diff,
211 });
212 itertools::Either::Left(iter)
213 }
214 TableData::Batches(_) => {
215 mz_ore::soft_panic_or_log!(
217 "handle Batches of updates in the ReadOnlyTableWorker"
218 );
219 itertools::Either::Right(std::iter::empty())
220 }
221 });
222 updates.extend(updates_with_ts);
223 old_new_upper.join_assign(&Antichain::from_elem(advance_to.clone()));
224 }
225 all_responses.push((ids, tx));
226 }
227 PersistTableWriteCmd::Shutdown => shutdown = true,
228 }
229 }
230
231 let result = append_work(write_handles, all_updates).await;
232
233 for (ids, response) in all_responses {
234 let result = match &result {
235 Err(bad_ids) => {
236 let filtered: Vec<_> = bad_ids
237 .iter()
238 .filter(|(id, _)| ids.contains(id))
239 .cloned()
240 .map(|(id, current_upper)| InvalidUpper { id, current_upper })
241 .collect();
242 if filtered.is_empty() {
243 Ok(())
244 } else {
245 Err(StorageError::InvalidUppers(filtered))
246 }
247 }
248 Ok(()) => Ok(()),
249 };
250 let _ = response.send(result);
252 }
253
254 if shutdown {
255 ControlFlow::Break("received a shutdown command".to_string())
256 } else {
257 ControlFlow::Continue(())
258 }
259}
260
261async fn advance_uppers<T>(
264 write_handles: &mut BTreeMap<GlobalId, WriteHandle<SourceData, (), T, StorageDiff>>,
265 upper: Antichain<T>,
266) where
267 T: Timestamp + Lattice + Codec64 + TimestampManipulation,
268{
269 let mut all_updates = BTreeMap::default();
270
271 for (id, write_handle) in write_handles.iter_mut() {
272 assert!(id.is_system(), "trying to register non-system id {id}");
275
276 let expected_upper = write_handle.fetch_recent_upper().await.to_owned();
280
281 if expected_upper.elements() == &[T::minimum()] {
283 continue;
284 }
285
286 if PartialOrder::less_equal(&upper, &expected_upper) {
287 continue;
290 }
291
292 all_updates.insert(
293 *id,
294 (Span::none(), Vec::new(), expected_upper, upper.clone()),
295 );
296 }
297
298 let result = append_work(write_handles, all_updates).await;
299 tracing::debug!(?result, "advanced upper of migrated builtin tables");
300}