mz_storage_controller/persist_handles/read_only_table_worker.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A tokio tasks (and support machinery) for dealing with the persist handles
11//! that the storage controller needs to hold.
12
13use std::collections::{BTreeMap, BTreeSet, VecDeque};
14use std::ops::ControlFlow;
15
16use differential_dataflow::lattice::Lattice;
17use futures::FutureExt;
18use mz_persist_client::write::WriteHandle;
19use mz_repr::{GlobalId, Timestamp};
20use mz_storage_client::client::{TableData, Update};
21use mz_storage_types::StorageDiff;
22use mz_storage_types::controller::InvalidUpper;
23use mz_storage_types::sources::SourceData;
24use timely::PartialOrder;
25use timely::progress::Antichain;
26use tracing::Span;
27
28use crate::StorageError;
29use crate::persist_handles::{PersistTableWriteCmd, append_work};
30
31/// Handles table updates in read only mode.
32///
33/// In read only mode, we write to tables outside of the txn-wal system. This is
34/// a gross hack, but it is a quick fix to allow us to perform migrations of the
35/// built-in tables in the new generation during a deployment. We need to write
36/// to the new shards for migrated built-in tables so that dataflows that depend
37/// on those tables can catch up, but we don't want to register them into the
38/// existing txn-wal shard, as that would mutate the state of the old generation
39/// while it's still running. We could instead create a new txn shard in the new
40/// generation for *just* system catalog tables, but then we'd have to do a
41/// complicated dance to move the system catalog tables back to the original txn
42/// shard during promotion, without ever losing track of a shard or registering
43/// it in two txn shards simultaneously.
44///
45/// This code is a nearly line-for-line reintroduction of the code that managed
46/// writing to tables before the txn-wal system. This code can (again) be
47/// deleted when we switch to using native persist schema migrations to perform
/// migrations of built-in tables.
49pub(crate) async fn read_only_mode_table_worker(
50 mut rx: tokio::sync::mpsc::UnboundedReceiver<(Span, PersistTableWriteCmd)>,
51 txns_handle: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
52) {
53 let mut write_handles =
54 BTreeMap::<GlobalId, WriteHandle<SourceData, (), Timestamp, StorageDiff>>::new();
55
56 let gen_upper_future = |mut handle: WriteHandle<SourceData, (), Timestamp, StorageDiff>| {
57 let fut = async move {
58 let current_upper = handle.shared_upper();
59 handle.wait_for_upper_past(¤t_upper).await;
60 let new_upper = handle.shared_upper();
61 (handle, new_upper)
62 };
63
64 fut.boxed()
65 };
66
67 let mut txns_upper_future = {
68 let txns_upper_future = gen_upper_future(txns_handle);
69 txns_upper_future
70 };
71
72 let shutdown_reason = loop {
73 tokio::select! {
74 (handle, upper) = &mut txns_upper_future => {
75 tracing::debug!("new upper from txns shard: {:?}, advancing upper of migrated builtin tables", upper);
76 advance_uppers(&mut write_handles, upper).await;
77
78 let fut = gen_upper_future(handle);
79 txns_upper_future = fut;
80 }
81 cmd = rx.recv() => {
82 let Some(cmd) = cmd else {
83 break "command rx closed".to_string();
84 };
85
86 // Peel off all available commands.
87 // We do this in case we can consolidate commands.
88 // It would be surprising to receive multiple concurrent `Append` commands,
89 // but we might receive multiple *empty* `Append` commands.
90 let mut commands = VecDeque::new();
91 commands.push_back(cmd);
92 while let Ok(cmd) = rx.try_recv() {
93 commands.push_back(cmd);
94 }
95
96 let result = handle_commands(&mut write_handles, commands).await;
97
98 match result {
99 ControlFlow::Continue(_) => {
100 continue;
101 }
102 ControlFlow::Break(msg) => {
103 break msg;
104 }
105 }
106
107 }
108 }
109 };
110
111 tracing::info!(%shutdown_reason, "PersistTableWriteWorker shutting down");
112}
113
114/// Handles the given commands.
115async fn handle_commands(
116 write_handles: &mut BTreeMap<GlobalId, WriteHandle<SourceData, (), Timestamp, StorageDiff>>,
117 mut commands: VecDeque<(Span, PersistTableWriteCmd)>,
118) -> ControlFlow<String> {
119 let mut shutdown = false;
120
121 // Accumulated updates and upper frontier.
122 let mut all_updates = BTreeMap::default();
123 let mut all_responses = Vec::default();
124
125 while let Some((span, command)) = commands.pop_front() {
126 match command {
127 PersistTableWriteCmd::Register(_register_ts, ids_handles, tx) => {
128 for (id, write_handle) in ids_handles {
129 // As of today, we can only migrate builtin (system) tables.
130 assert!(id.is_system(), "trying to register non-system id {id}");
131
132 let previous = write_handles.insert(id, write_handle);
133 if previous.is_some() {
134 panic!("already registered a WriteHandle for collection {:?}", id);
135 }
136 }
137 // We don't care if our waiter has gone away.
138 let _ = tx.send(());
139 }
140 PersistTableWriteCmd::Update {
141 existing_collection,
142 new_collection,
143 handle,
144 forget_ts: _,
145 register_ts: _,
146 tx,
147 } => {
148 write_handles.remove(&existing_collection);
149 write_handles.insert(new_collection, handle).expect(
150 "PersistTableWriteCmd::Update only valid for updating extant write handles",
151 );
152 // We don't care if our waiter has gone away.
153 let _ = tx.send(());
154 }
155 PersistTableWriteCmd::DropHandles {
156 forget_ts: _,
157 ids,
158 tx,
159 } => {
160 // n.b. this should only remove the
161 // handle from the persist worker and
162 // not take any additional action such
163 // as closing the shard it's connected
164 // to because dataflows might still be
165 // using it.
166 for id in ids {
167 write_handles.remove(&id);
168 }
169 // We don't care if our waiter has gone away.
170 let _ = tx.send(());
171 }
172 PersistTableWriteCmd::Append {
173 write_ts,
174 advance_to,
175 updates,
176 tx,
177 } => {
178 let mut ids = BTreeSet::new();
179 for (id, updates_no_ts) in updates {
180 ids.insert(id);
181 let (old_span, updates, _expected_upper, old_new_upper) =
182 all_updates.entry(id).or_insert_with(|| {
183 (
184 span.clone(),
185 Vec::default(),
186 Antichain::from_elem(write_ts),
187 Antichain::from_elem(Timestamp::MIN),
188 )
189 });
190
191 if old_span.id() != span.id() {
192 // Link in any spans for `Append` operations that we
193 // lump together by doing this. This is not ideal,
194 // because we only have a true tracing history for
195 // the "first" span that we process, but it's better
196 // than nothing.
197 old_span.follows_from(span.id());
198 }
199 let updates_with_ts = updates_no_ts.into_iter().flat_map(|x| match x {
200 TableData::Rows(rows) => {
201 let iter = rows.into_iter().map(|(row, diff)| Update {
202 row,
203 timestamp: write_ts,
204 diff,
205 });
206 itertools::Either::Left(iter)
207 }
208 TableData::Batches(_) => {
209 // TODO(cf1): Handle Batches of updates in ReadOnlyTableWorker.
210 mz_ore::soft_panic_or_log!(
211 "handle Batches of updates in the ReadOnlyTableWorker"
212 );
213 itertools::Either::Right(std::iter::empty())
214 }
215 });
216 updates.extend(updates_with_ts);
217 old_new_upper.join_assign(&Antichain::from_elem(advance_to));
218 }
219 all_responses.push((ids, tx));
220 }
221 PersistTableWriteCmd::Shutdown => shutdown = true,
222 }
223 }
224
225 let result = append_work(write_handles, all_updates).await;
226
227 for (ids, response) in all_responses {
228 let result = match &result {
229 Err(bad_ids) => {
230 let filtered: Vec<_> = bad_ids
231 .iter()
232 .filter(|(id, _)| ids.contains(id))
233 .cloned()
234 .map(|(id, current_upper)| InvalidUpper { id, current_upper })
235 .collect();
236 if filtered.is_empty() {
237 Ok(())
238 } else {
239 Err(StorageError::InvalidUppers(filtered))
240 }
241 }
242 Ok(()) => Ok(()),
243 };
244 // It is not an error for the other end to hang up.
245 let _ = response.send(result);
246 }
247
248 if shutdown {
249 ControlFlow::Break("received a shutdown command".to_string())
250 } else {
251 ControlFlow::Continue(())
252 }
253}
254
255/// Advances the upper of all registered tables (which are only the migrated
256/// builtin tables) to the given `upper`.
257async fn advance_uppers(
258 write_handles: &mut BTreeMap<GlobalId, WriteHandle<SourceData, (), Timestamp, StorageDiff>>,
259 upper: Antichain<Timestamp>,
260) {
261 let mut all_updates = BTreeMap::default();
262
263 for (id, write_handle) in write_handles.iter_mut() {
264 // As of today, we can only migrate builtin (system) tables, and so only
265 // expect to register those in this read-only table worker.
266 assert!(id.is_system(), "trying to register non-system id {id}");
267
268 // This business of continually advancing the upper is expensive, but
269 // we're a) only doing it when in read-only mode, and b) only doing it
270 // for each migrated builtin table, of which there usually aren't many.
271 let expected_upper = write_handle.fetch_recent_upper().await.to_owned();
272
273 // Avoid advancing the upper until the coordinator has a chance to back-fill the shard.
274 if expected_upper.elements() == &[Timestamp::MIN] {
275 continue;
276 }
277
278 if PartialOrder::less_equal(&upper, &expected_upper) {
279 // Nothing to do, and append_work doesn't like being called with a
280 // new upper that is less_equal the current upper.
281 continue;
282 }
283
284 all_updates.insert(
285 *id,
286 (Span::none(), Vec::new(), expected_upper, upper.clone()),
287 );
288 }
289
290 let result = append_work(write_handles, all_updates).await;
291 tracing::debug!(?result, "advanced upper of migrated builtin tables");
292}