// mz_storage_controller/persist_handles/read_only_table_worker.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
//! Tokio tasks (and support machinery) for dealing with the persist handles
//! that the storage controller needs to hold.
12
13use std::collections::{BTreeMap, BTreeSet, VecDeque};
14use std::ops::ControlFlow;
15
16use differential_dataflow::lattice::Lattice;
17use futures::FutureExt;
18use mz_persist_client::write::WriteHandle;
19use mz_repr::{GlobalId, Timestamp};
20use mz_storage_client::client::{TableData, Update};
21use mz_storage_types::StorageDiff;
22use mz_storage_types::controller::InvalidUpper;
23use mz_storage_types::sources::SourceData;
24use timely::PartialOrder;
25use timely::progress::Antichain;
26use tracing::Span;
27
28use crate::StorageError;
29use crate::persist_handles::{PersistTableWriteCmd, append_work};
30
/// Handles table updates in read only mode.
///
/// In read only mode, we write to tables outside of the txn-wal system. This is
/// a gross hack, but it is a quick fix to allow us to perform migrations of the
/// built-in tables in the new generation during a deployment. We need to write
/// to the new shards for migrated built-in tables so that dataflows that depend
/// on those tables can catch up, but we don't want to register them into the
/// existing txn-wal shard, as that would mutate the state of the old generation
/// while it's still running. We could instead create a new txn shard in the new
/// generation for *just* system catalog tables, but then we'd have to do a
/// complicated dance to move the system catalog tables back to the original txn
/// shard during promotion, without ever losing track of a shard or registering
/// it in two txn shards simultaneously.
///
/// This code is a nearly line-for-line reintroduction of the code that managed
/// writing to tables before the txn-wal system. This code can (again) be
/// deleted when we switch to using native persist schema migrations to perform
/// migrations of built-in tables.
pub(crate) async fn read_only_mode_table_worker(
    mut rx: tokio::sync::mpsc::UnboundedReceiver<(Span, PersistTableWriteCmd)>,
    txns_handle: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
) {
    // Write handles for the tables registered with this worker, keyed by
    // collection id. Populated/depopulated by `Register`/`Update`/`DropHandles`
    // commands in `handle_commands`.
    let mut write_handles =
        BTreeMap::<GlobalId, WriteHandle<SourceData, (), Timestamp, StorageDiff>>::new();

    // Builds a future that waits until the given shard's upper advances past
    // its current value, then yields the handle back together with the new
    // upper. The handle is moved into (and returned from) the future so it can
    // be reused to build the next wait.
    let gen_upper_future = |mut handle: WriteHandle<SourceData, (), Timestamp, StorageDiff>| {
        let fut = async move {
            let current_upper = handle.shared_upper();
            handle.wait_for_upper_past(&current_upper).await;
            let new_upper = handle.shared_upper();
            (handle, new_upper)
        };

        fut.boxed()
    };

    // Watch the txns shard's upper; every time it advances we mirror that
    // advancement onto the registered (migrated builtin) table shards below.
    let mut txns_upper_future = {
        let txns_upper_future = gen_upper_future(txns_handle);
        txns_upper_future
    };

    // Main loop: race "txns upper advanced" against "new command received".
    // Breaks with a human-readable shutdown reason.
    let shutdown_reason = loop {
        tokio::select! {
            (handle, upper) = &mut txns_upper_future => {
                tracing::debug!("new upper from txns shard: {:?}, advancing upper of migrated builtin tables", upper);
                advance_uppers(&mut write_handles, upper).await;

                // Re-arm the wait with the handle the completed future gave back.
                let fut = gen_upper_future(handle);
                txns_upper_future = fut;
            }
            cmd = rx.recv() => {
                let Some(cmd) = cmd else {
                    // All senders dropped; nothing more will arrive.
                    break "command rx closed".to_string();
                };

                // Peel off all available commands.
                // We do this in case we can consolidate commands.
                // It would be surprising to receive multiple concurrent `Append` commands,
                // but we might receive multiple *empty* `Append` commands.
                let mut commands = VecDeque::new();
                commands.push_back(cmd);
                while let Ok(cmd) = rx.try_recv() {
                    commands.push_back(cmd);
                }

                let result = handle_commands(&mut write_handles, commands).await;

                match result {
                    ControlFlow::Continue(_) => {
                        continue;
                    }
                    ControlFlow::Break(msg) => {
                        // A `Shutdown` command was processed.
                        break msg;
                    }
                }

            }
        }
    };

    tracing::info!(%shutdown_reason, "PersistTableWriteWorker shutting down");
}
113
/// Handles the given commands.
///
/// Processes the whole batch of commands, accumulating all `Append` updates
/// into a single `append_work` call, then fans the (per-id filtered) result
/// back out to each `Append`'s response channel. Returns
/// `ControlFlow::Break(reason)` if a `Shutdown` command was seen, otherwise
/// `ControlFlow::Continue(())`.
async fn handle_commands(
    write_handles: &mut BTreeMap<GlobalId, WriteHandle<SourceData, (), Timestamp, StorageDiff>>,
    mut commands: VecDeque<(Span, PersistTableWriteCmd)>,
) -> ControlFlow<String> {
    // Set by `Shutdown`; we still process the remaining commands and flush the
    // accumulated appends before reporting the break to the caller.
    let mut shutdown = false;

    // Accumulated updates and upper frontier.
    // Keyed by collection id: (span, updates, expected upper, new upper).
    let mut all_updates = BTreeMap::default();
    // Response channels for every `Append`, with the set of ids it touched.
    let mut all_responses = Vec::default();

    while let Some((span, command)) = commands.pop_front() {
        match command {
            PersistTableWriteCmd::Register(_register_ts, ids_handles, tx) => {
                for (id, write_handle) in ids_handles {
                    // As of today, we can only migrate builtin (system) tables.
                    assert!(id.is_system(), "trying to register non-system id {id}");

                    let previous = write_handles.insert(id, write_handle);
                    if previous.is_some() {
                        panic!("already registered a WriteHandle for collection {:?}", id);
                    }
                }
                // We don't care if our waiter has gone away.
                let _ = tx.send(());
            }
            PersistTableWriteCmd::Update {
                existing_collection,
                new_collection,
                handle,
                forget_ts: _,
                register_ts: _,
                tx,
            } => {
                write_handles.remove(&existing_collection);
                // NOTE(review): `insert` returns the *previous* value, so this
                // `expect` asserts that a handle was already registered under
                // `new_collection` (not under `existing_collection`) — confirm
                // that is the intended invariant and not an inverted check.
                write_handles.insert(new_collection, handle).expect(
                    "PersistTableWriteCmd::Update only valid for updating extant write handles",
                );
                // We don't care if our waiter has gone away.
                let _ = tx.send(());
            }
            PersistTableWriteCmd::DropHandles {
                forget_ts: _,
                ids,
                tx,
            } => {
                // n.b. this should only remove the
                // handle from the persist worker and
                // not take any additional action such
                // as closing the shard it's connected
                // to because dataflows might still be
                // using it.
                for id in ids {
                    write_handles.remove(&id);
                }
                // We don't care if our waiter has gone away.
                let _ = tx.send(());
            }
            PersistTableWriteCmd::Append {
                write_ts,
                advance_to,
                updates,
                tx,
            } => {
                // Ids touched by this particular `Append`, so the response can
                // be filtered to only the errors this caller cares about.
                let mut ids = BTreeSet::new();
                for (id, updates_no_ts) in updates {
                    ids.insert(id);
                    let (old_span, updates, _expected_upper, old_new_upper) =
                        all_updates.entry(id).or_insert_with(|| {
                            (
                                span.clone(),
                                Vec::default(),
                                Antichain::from_elem(write_ts),
                                Antichain::from_elem(Timestamp::MIN),
                            )
                        });

                    if old_span.id() != span.id() {
                        // Link in any spans for `Append` operations that we
                        // lump together by doing this. This is not ideal,
                        // because we only have a true tracing history for
                        // the "first" span that we process, but it's better
                        // than nothing.
                        old_span.follows_from(span.id());
                    }
                    // Stamp each row with this command's write timestamp.
                    let updates_with_ts = updates_no_ts.into_iter().flat_map(|x| match x {
                        TableData::Rows(rows) => {
                            let iter = rows.into_iter().map(|(row, diff)| Update {
                                row,
                                timestamp: write_ts,
                                diff,
                            });
                            itertools::Either::Left(iter)
                        }
                        TableData::Batches(_) => {
                            // TODO(cf1): Handle Batches of updates in ReadOnlyTableWorker.
                            mz_ore::soft_panic_or_log!(
                                "handle Batches of updates in the ReadOnlyTableWorker"
                            );
                            itertools::Either::Right(std::iter::empty())
                        }
                    });
                    updates.extend(updates_with_ts);
                    // Track the furthest `advance_to` seen for this id.
                    old_new_upper.join_assign(&Antichain::from_elem(advance_to));
                }
                all_responses.push((ids, tx));
            }
            PersistTableWriteCmd::Shutdown => shutdown = true,
        }
    }

    // One combined write for everything accumulated above.
    let result = append_work(write_handles, all_updates).await;

    for (ids, response) in all_responses {
        let result = match &result {
            Err(bad_ids) => {
                // Report only the upper mismatches for ids this caller appended to.
                let filtered: Vec<_> = bad_ids
                    .iter()
                    .filter(|(id, _)| ids.contains(id))
                    .cloned()
                    .map(|(id, current_upper)| InvalidUpper { id, current_upper })
                    .collect();
                if filtered.is_empty() {
                    Ok(())
                } else {
                    Err(StorageError::InvalidUppers(filtered))
                }
            }
            Ok(()) => Ok(()),
        };
        // It is not an error for the other end to hang up.
        let _ = response.send(result);
    }

    if shutdown {
        ControlFlow::Break("received a shutdown command".to_string())
    } else {
        ControlFlow::Continue(())
    }
}
254
255/// Advances the upper of all registered tables (which are only the migrated
256/// builtin tables) to the given `upper`.
257async fn advance_uppers(
258    write_handles: &mut BTreeMap<GlobalId, WriteHandle<SourceData, (), Timestamp, StorageDiff>>,
259    upper: Antichain<Timestamp>,
260) {
261    let mut all_updates = BTreeMap::default();
262
263    for (id, write_handle) in write_handles.iter_mut() {
264        // As of today, we can only migrate builtin (system) tables, and so only
265        // expect to register those in this read-only table worker.
266        assert!(id.is_system(), "trying to register non-system id {id}");
267
268        // This business of continually advancing the upper is expensive, but
269        // we're a) only doing it when in read-only mode, and b) only doing it
270        // for each migrated builtin table, of which there usually aren't many.
271        let expected_upper = write_handle.fetch_recent_upper().await.to_owned();
272
273        // Avoid advancing the upper until the coordinator has a chance to back-fill the shard.
274        if expected_upper.elements() == &[Timestamp::MIN] {
275            continue;
276        }
277
278        if PartialOrder::less_equal(&upper, &expected_upper) {
279            // Nothing to do, and append_work doesn't like being called with a
280            // new upper that is less_equal the current upper.
281            continue;
282        }
283
284        all_updates.insert(
285            *id,
286            (Span::none(), Vec::new(), expected_upper, upper.clone()),
287        );
288    }
289
290    let result = append_work(write_handles, all_updates).await;
291    tracing::debug!(?result, "advanced upper of migrated builtin tables");
292}