mz_storage_controller/persist_handles/read_only_table_worker.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Tokio tasks (and support machinery) for dealing with the persist handles
//! that the storage controller needs to hold.

use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::ops::ControlFlow;

use differential_dataflow::lattice::Lattice;
use futures::FutureExt;
use mz_persist_client::write::WriteHandle;
use mz_persist_types::Codec64;
use mz_repr::{GlobalId, TimestampManipulation};
use mz_storage_client::client::{TableData, Update};
use mz_storage_types::StorageDiff;
use mz_storage_types::controller::InvalidUpper;
use mz_storage_types::sources::SourceData;
use timely::PartialOrder;
use timely::progress::{Antichain, Timestamp};
use tracing::Span;

use crate::StorageError;
use crate::persist_handles::{PersistTableWriteCmd, append_work};

/// Handles table updates in read-only mode.
///
/// In read-only mode, we write to tables outside of the txn-wal system. This
/// is a gross hack, but it is a quick fix to allow us to perform migrations of
/// the built-in tables in the new generation during a deployment. We need to
/// write to the new shards for migrated built-in tables so that dataflows that
/// depend on those tables can catch up, but we don't want to register them
/// into the existing txn-wal shard, as that would mutate the state of the old
/// generation while it's still running. We could instead create a new txn
/// shard in the new generation for *just* system catalog tables, but then we'd
/// have to do a complicated dance to move the system catalog tables back to
/// the original txn shard during promotion, without ever losing track of a
/// shard or registering it in two txn shards simultaneously.
///
/// This code is a nearly line-for-line reintroduction of the code that managed
/// writing to tables before the txn-wal system. This code can (again) be
/// deleted when we switch to using native persist schema migrations to perform
/// migrations of built-in tables.
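///
/// # Example
///
/// A minimal sketch of how a caller might wire this worker up; the channel
/// plumbing and spawning shown here are illustrative, not the controller's
/// actual setup:
///
/// ```ignore
/// let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
/// // `txns_handle` is a `WriteHandle` on the txn-wal shard, obtained elsewhere.
/// tokio::spawn(read_only_mode_table_worker(rx, txns_handle));
/// // Commands are paired with the tracing span they originate from.
/// tx.send((tracing::Span::current(), cmd)).expect("worker is running");
/// ```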
pub(crate) async fn read_only_mode_table_worker<
    T: Timestamp + Lattice + Codec64 + TimestampManipulation,
>(
    mut rx: tokio::sync::mpsc::UnboundedReceiver<(Span, PersistTableWriteCmd<T>)>,
    txns_handle: WriteHandle<SourceData, (), T, StorageDiff>,
) {
    let mut write_handles =
        BTreeMap::<GlobalId, WriteHandle<SourceData, (), T, StorageDiff>>::new();

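    // Helper: turn a write handle into a future that resolves once the
    // shard's upper has advanced past its current value, yielding the handle
    // back together with the newly observed upper.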
    let gen_upper_future = |mut handle: WriteHandle<SourceData, (), T, StorageDiff>| {
        let fut = async move {
            let current_upper = handle.shared_upper();
            handle.wait_for_upper_past(&current_upper).await;
            let new_upper = handle.shared_upper();
            (handle, new_upper)
        };

        fut.boxed()
    };

    let mut txns_upper_future = gen_upper_future(txns_handle);

    let shutdown_reason = loop {
        tokio::select! {
            (handle, upper) = &mut txns_upper_future => {
                tracing::debug!("new upper from txns shard: {:?}, advancing upper of migrated builtin tables", upper);
                advance_uppers(&mut write_handles, upper).await;

                let fut = gen_upper_future(handle);
                txns_upper_future = fut;
            }
            cmd = rx.recv() => {
                let Some(cmd) = cmd else {
                    break "command rx closed".to_string();
                };

                // Peel off all available commands.
                // We do this in case we can consolidate commands.
                // It would be surprising to receive multiple concurrent `Append` commands,
                // but we might receive multiple *empty* `Append` commands.
                let mut commands = VecDeque::new();
                commands.push_back(cmd);
                while let Ok(cmd) = rx.try_recv() {
                    commands.push_back(cmd);
                }

                let result = handle_commands(&mut write_handles, commands).await;

                match result {
                    ControlFlow::Continue(_) => {
                        continue;
                    }
                    ControlFlow::Break(msg) => {
                        break msg;
                    }
                }
            }
        }
    };

    tracing::info!(%shutdown_reason, "PersistTableWriteWorker shutting down");
}

/// Handles the given batch of commands, consolidating all `Append` commands
/// into a single `append_work` call. Returns `ControlFlow::Break` with a
/// reason if the worker should shut down.
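///
/// For illustration, an `Append` command as consumed below looks roughly like
/// this (field names taken from the match arms in this function; the
/// surrounding values are assumptions):
///
/// ```ignore
/// let (tx, rx) = tokio::sync::oneshot::channel();
/// let cmd = PersistTableWriteCmd::Append {
///     write_ts,
///     advance_to,
///     updates: vec![(table_id, vec![TableData::Rows(rows)])],
///     tx,
/// };
/// ```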
async fn handle_commands<T>(
    write_handles: &mut BTreeMap<GlobalId, WriteHandle<SourceData, (), T, StorageDiff>>,
    mut commands: VecDeque<(Span, PersistTableWriteCmd<T>)>,
) -> ControlFlow<String>
where
    T: Timestamp + Lattice + Codec64 + TimestampManipulation,
{
    let mut shutdown = false;

    // Updates accumulated across all `Append` commands, keyed by collection,
    // along with the response channels to notify once the writes land.
    let mut all_updates = BTreeMap::default();
    let mut all_responses = Vec::default();

    while let Some((span, command)) = commands.pop_front() {
        match command {
            PersistTableWriteCmd::Register(_register_ts, ids_handles, tx) => {
                for (id, write_handle) in ids_handles {
                    // As of today, we can only migrate builtin (system) tables.
                    assert!(id.is_system(), "trying to register non-system id {id}");

                    let previous = write_handles.insert(id, write_handle);
                    if previous.is_some() {
                        panic!("already registered a WriteHandle for collection {:?}", id);
                    }
                }
                // We don't care if our waiter has gone away.
                let _ = tx.send(());
            }
            PersistTableWriteCmd::Update {
                existing_collection,
                new_collection,
                handle,
                forget_ts: _,
                register_ts: _,
                tx,
            } => {
                // Note: asserting on the result of the `insert` below would
                // panic whenever `new_collection` is genuinely new, so instead
                // assert that the collection being replaced was actually
                // registered.
                let previous = write_handles.remove(&existing_collection);
                assert!(
                    previous.is_some(),
                    "PersistTableWriteCmd::Update only valid for updating extant write handles"
                );
                write_handles.insert(new_collection, handle);
                // We don't care if our waiter has gone away.
                let _ = tx.send(());
            }
            PersistTableWriteCmd::DropHandles {
                forget_ts: _,
                ids,
                tx,
            } => {
                // n.b. this should only remove the handle from the persist
                // worker and not take any additional action, such as closing
                // the shard it's connected to, because dataflows might still
                // be using it.
                for id in ids {
                    write_handles.remove(&id);
                }
                // We don't care if our waiter has gone away.
                let _ = tx.send(());
            }
            PersistTableWriteCmd::Append {
                write_ts,
                advance_to,
                updates,
                tx,
            } => {
                let mut ids = BTreeSet::new();
                for (id, updates_no_ts) in updates {
                    ids.insert(id);
                    // Per collection, we accumulate the originating span, the
                    // timestamped updates, the expected upper, and the new
                    // upper to advance to.
                    let (old_span, updates, _expected_upper, old_new_upper) =
                        all_updates.entry(id).or_insert_with(|| {
                            (
                                span.clone(),
                                Vec::default(),
                                Antichain::from_elem(write_ts.clone()),
                                Antichain::from_elem(T::minimum()),
                            )
                        });

                    if old_span.id() != span.id() {
                        // Link in any spans for `Append` operations that we
                        // lump together by doing this. This is not ideal,
                        // because we only have a true tracing history for
                        // the "first" span that we process, but it's better
                        // than nothing.
                        old_span.follows_from(span.id());
                    }
                    let updates_with_ts = updates_no_ts.into_iter().flat_map(|x| match x {
                        TableData::Rows(rows) => {
                            let iter = rows.into_iter().map(|(row, diff)| Update {
                                row,
                                timestamp: write_ts.clone(),
                                diff,
                            });
                            itertools::Either::Left(iter)
                        }
                        TableData::Batches(_) => {
                            // TODO(cf1): Handle Batches of updates in ReadOnlyTableWorker.
                            mz_ore::soft_panic_or_log!(
                                "handle Batches of updates in the ReadOnlyTableWorker"
                            );
                            itertools::Either::Right(std::iter::empty())
                        }
                    });
                    updates.extend(updates_with_ts);
                    old_new_upper.join_assign(&Antichain::from_elem(advance_to.clone()));
                }
                all_responses.push((ids, tx));
            }
            PersistTableWriteCmd::Shutdown => shutdown = true,
        }
    }

    let result = append_work(write_handles, all_updates).await;

    for (ids, response) in all_responses {
        let result = match &result {
            Err(bad_ids) => {
                let filtered: Vec<_> = bad_ids
                    .iter()
                    .filter(|(id, _)| ids.contains(id))
                    .cloned()
                    .map(|(id, current_upper)| InvalidUpper { id, current_upper })
                    .collect();
                if filtered.is_empty() {
                    Ok(())
                } else {
                    Err(StorageError::InvalidUppers(filtered))
                }
            }
            Ok(()) => Ok(()),
        };
        // It is not an error for the other end to hang up.
        let _ = response.send(result);
    }

    if shutdown {
        ControlFlow::Break("received a shutdown command".to_string())
    } else {
        ControlFlow::Continue(())
    }
}

/// Advances the upper of all registered tables (which are only the migrated
/// builtin tables) to the given `upper`.
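///
/// This works by issuing, per table, an empty batch via `append_work` that
/// moves the shard's upper from its current value to the txns shard's upper.
/// A sketch of the equivalent single-handle operation (assuming a
/// `WriteHandle` named `handle` and uppers `expected_upper`/`new_upper`):
///
/// ```ignore
/// let empty: &[((SourceData, ()), T, StorageDiff)] = &[];
/// handle
///     .compare_and_append(empty, expected_upper, new_upper)
///     .await
///     .expect("invalid usage")
///     .expect("unexpected upper");
/// ```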
async fn advance_uppers<T>(
    write_handles: &mut BTreeMap<GlobalId, WriteHandle<SourceData, (), T, StorageDiff>>,
    upper: Antichain<T>,
) where
    T: Timestamp + Lattice + Codec64 + TimestampManipulation,
{
    let mut all_updates = BTreeMap::default();

    for (id, write_handle) in write_handles.iter_mut() {
        // As of today, we can only migrate builtin (system) tables, and so only
        // expect to register those in this read-only table worker.
        assert!(id.is_system(), "encountered non-system id {id}");

        // This business of continually advancing the upper is expensive, but
        // we're a) only doing it when in read-only mode, and b) only doing it
        // for each migrated builtin table, of which there usually aren't many.
        let expected_upper = write_handle.fetch_recent_upper().await.to_owned();

        // Avoid advancing the upper until the coordinator has a chance to
        // back-fill the shard.
        if expected_upper.elements() == &[T::minimum()] {
            continue;
        }

        if PartialOrder::less_equal(&upper, &expected_upper) {
            // Nothing to do, and append_work doesn't like being called with a
            // new upper that is less_equal the current upper.
            continue;
        }

        all_updates.insert(
            *id,
            (Span::none(), Vec::new(), expected_upper, upper.clone()),
        );
    }

    let result = append_work(write_handles, all_updates).await;
    tracing::debug!(?result, "advanced upper of migrated builtin tables");
}