Skip to main content

mz_storage_controller/
persist_handles.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
//! Tokio tasks (and supporting machinery) for managing the persist handles
//! that the storage controller needs to hold.
12
13use std::collections::{BTreeMap, BTreeSet};
14use std::fmt::Debug;
15use std::fmt::Write;
16use std::sync::Arc;
17
18use differential_dataflow::lattice::Lattice;
19use futures::future::BoxFuture;
20use futures::stream::FuturesUnordered;
21use futures::{FutureExt, StreamExt};
22use itertools::Itertools;
23use mz_ore::tracing::OpenTelemetryContext;
24use mz_persist_client::ShardId;
25use mz_persist_client::write::WriteHandle;
26use mz_persist_types::Codec64;
27use mz_repr::{GlobalId, TimestampManipulation};
28use mz_storage_client::client::{TableData, Update};
29use mz_storage_types::StorageDiff;
30use mz_storage_types::controller::{InvalidUpper, TxnsCodecRow};
31use mz_storage_types::sources::SourceData;
32use mz_txn_wal::txns::{Tidy, TxnsHandle};
33use timely::order::TotalOrder;
34use timely::progress::{Antichain, Timestamp};
35use tokio::sync::mpsc::UnboundedSender;
36use tokio::sync::oneshot;
37use tracing::{Instrument, Span, debug, info_span};
38
39use crate::StorageError;
40
41mod read_only_table_worker;
42
43#[derive(Debug, Clone)]
44pub struct PersistTableWriteWorker<T: Timestamp + Lattice + Codec64 + TimestampManipulation> {
45    inner: Arc<PersistTableWriteWorkerInner<T>>,
46}
47
48/// Commands for [PersistTableWriteWorker].
49#[derive(Debug)]
50enum PersistTableWriteCmd<T: Timestamp + Lattice + Codec64> {
51    Register(
52        T,
53        Vec<(GlobalId, WriteHandle<SourceData, (), T, StorageDiff>)>,
54        tokio::sync::oneshot::Sender<()>,
55    ),
56    Update {
57        /// Existing collection for the table.
58        existing_collection: GlobalId,
59        /// New collection we'll emit writes to.
60        new_collection: GlobalId,
61        /// Timestamp to forget the original handle at.
62        forget_ts: T,
63        /// Timestamp to register the new handle at.
64        register_ts: T,
65        /// New write handle to register.
66        handle: WriteHandle<SourceData, (), T, StorageDiff>,
67        /// Notifies us when the handle has been updated.
68        tx: oneshot::Sender<()>,
69    },
70    DropHandles {
71        forget_ts: T,
72        /// Tables that we want to drop our handle for.
73        ids: Vec<GlobalId>,
74        /// Notifies us when all resources have been cleaned up.
75        tx: oneshot::Sender<()>,
76    },
77    Append {
78        write_ts: T,
79        advance_to: T,
80        updates: Vec<(GlobalId, Vec<TableData>)>,
81        tx: tokio::sync::oneshot::Sender<Result<(), StorageError<T>>>,
82    },
83    Shutdown,
84}
85
86impl<T: Timestamp + Lattice + Codec64> PersistTableWriteCmd<T> {
87    fn name(&self) -> &'static str {
88        match self {
89            PersistTableWriteCmd::Register(_, _, _) => "PersistTableWriteCmd::Register",
90            PersistTableWriteCmd::Update { .. } => "PersistTableWriteCmd::Update",
91            PersistTableWriteCmd::DropHandles { .. } => "PersistTableWriteCmd::DropHandle",
92            PersistTableWriteCmd::Append { .. } => "PersistTableWriteCmd::Append",
93            PersistTableWriteCmd::Shutdown => "PersistTableWriteCmd::Shutdown",
94        }
95    }
96}
97
98async fn append_work<T2: Timestamp + TotalOrder + Lattice + Codec64 + Sync>(
99    write_handles: &mut BTreeMap<GlobalId, WriteHandle<SourceData, (), T2, StorageDiff>>,
100    mut commands: BTreeMap<
101        GlobalId,
102        (tracing::Span, Vec<Update<T2>>, Antichain<T2>, Antichain<T2>),
103    >,
104) -> Result<(), Vec<(GlobalId, Antichain<T2>)>> {
105    let futs = FuturesUnordered::new();
106
107    // We cannot iterate through the updates and then set off a persist call
108    // on the write handle because we cannot mutably borrow the write handle
109    // multiple times.
110    //
111    // Instead, we first group the update by ID above and then iterate
112    // through all available write handles and see if there are any updates
113    // for it. If yes, we send them all in one go.
114    for (id, write) in write_handles.iter_mut() {
115        if let Some((span, updates, expected_upper, new_upper)) = commands.remove(id) {
116            let updates = updates.into_iter().map(|u| {
117                (
118                    (SourceData(Ok(u.row)), ()),
119                    u.timestamp,
120                    u.diff.into_inner(),
121                )
122            });
123
124            futs.push(async move {
125                write
126                    .compare_and_append(updates.clone(), expected_upper.clone(), new_upper.clone())
127                    .instrument(span.clone())
128                    .await
129                    .expect("cannot append updates")
130                    .or_else(|upper_mismatch| Err((*id, upper_mismatch.current)))?;
131
132                Ok::<_, (GlobalId, Antichain<T2>)>((*id, new_upper))
133            })
134        }
135    }
136
137    // Ensure all futures run to completion, and track status of each of them individually
138    let (_new_uppers, failed_appends): (Vec<_>, Vec<_>) = futs
139        .collect::<Vec<_>>()
140        .await
141        .into_iter()
142        .partition_result();
143
144    if failed_appends.is_empty() {
145        Ok(())
146    } else {
147        Err(failed_appends)
148    }
149}
150
151impl<T: Timestamp + Lattice + Codec64 + TimestampManipulation> PersistTableWriteWorker<T> {
152    /// Create a new read-only table worker that continually bumps the upper of
153    /// it's tables. It is expected that we only register migrated builtin
154    /// tables, that cannot yet be registered in the txns system in read-only
155    /// mode.
156    ///
157    /// This takes a [WriteHandle] for the txns shard so that it can follow the
158    /// upper and continually bump the upper of registered tables to follow the
159    /// upper of the txns shard.
160    pub(crate) fn new_read_only_mode(
161        txns_handle: WriteHandle<SourceData, (), T, StorageDiff>,
162    ) -> Self {
163        let (tx, rx) =
164            tokio::sync::mpsc::unbounded_channel::<(tracing::Span, PersistTableWriteCmd<T>)>();
165        mz_ore::task::spawn(
166            || "PersistTableWriteWorker",
167            read_only_table_worker::read_only_mode_table_worker(rx, txns_handle),
168        );
169        Self {
170            inner: Arc::new(PersistTableWriteWorkerInner::new(tx)),
171        }
172    }
173
174    pub(crate) fn new_txns(txns: TxnsHandle<SourceData, (), T, StorageDiff, TxnsCodecRow>) -> Self {
175        let (tx, rx) =
176            tokio::sync::mpsc::unbounded_channel::<(tracing::Span, PersistTableWriteCmd<T>)>();
177        mz_ore::task::spawn(|| "PersistTableWriteWorker", async move {
178            let mut worker = TxnsTableWorker {
179                txns,
180                write_handles: BTreeMap::new(),
181                tidy: Tidy::default(),
182            };
183            worker.run(rx).await
184        });
185        Self {
186            inner: Arc::new(PersistTableWriteWorkerInner::new(tx)),
187        }
188    }
189
190    pub(crate) fn register(
191        &self,
192        register_ts: T,
193        ids_handles: Vec<(GlobalId, WriteHandle<SourceData, (), T, StorageDiff>)>,
194    ) -> tokio::sync::oneshot::Receiver<()> {
195        // We expect this to be awaited, so keep the span connected.
196        let span = info_span!("PersistTableWriteCmd::Register");
197        let (tx, rx) = tokio::sync::oneshot::channel();
198        let cmd = PersistTableWriteCmd::Register(register_ts, ids_handles, tx);
199        self.inner.send_with_span(span, cmd);
200        rx
201    }
202
203    /// Update the existing write handle associated with `id` to `write_handle`.
204    ///
205    /// Note that this should only be called when updating a write handle; to
206    /// initially associate an `id` to a write handle, use [`Self::register`].
207    ///
208    /// # Panics
209    /// - If `id` is not currently associated with any write handle.
210    #[allow(dead_code)]
211    pub(crate) fn update(
212        &self,
213        existing_collection: GlobalId,
214        new_collection: GlobalId,
215        forget_ts: T,
216        register_ts: T,
217        handle: WriteHandle<SourceData, (), T, StorageDiff>,
218    ) -> oneshot::Receiver<()> {
219        let (tx, rx) = oneshot::channel();
220        self.send(PersistTableWriteCmd::Update {
221            existing_collection,
222            new_collection,
223            forget_ts,
224            register_ts,
225            handle,
226            tx,
227        });
228        rx
229    }
230
231    pub(crate) fn append(
232        &self,
233        write_ts: T,
234        advance_to: T,
235        updates: Vec<(GlobalId, Vec<TableData>)>,
236    ) -> tokio::sync::oneshot::Receiver<Result<(), StorageError<T>>> {
237        let (tx, rx) = tokio::sync::oneshot::channel();
238        if updates.is_empty() {
239            tx.send(Ok(()))
240                .expect("rx has not been dropped at this point");
241            rx
242        } else {
243            self.send(PersistTableWriteCmd::Append {
244                write_ts,
245                advance_to,
246                updates,
247                tx,
248            });
249            rx
250        }
251    }
252
253    /// Drops the handles associated with `ids` from this worker.
254    ///
255    /// Note that this does not perform any other cleanup, such as finalizing
256    /// the handle's shard.
257    pub(crate) fn drop_handles(&self, ids: Vec<GlobalId>, forget_ts: T) -> BoxFuture<'static, ()> {
258        let (tx, rx) = oneshot::channel();
259        self.send(PersistTableWriteCmd::DropHandles { forget_ts, ids, tx });
260        Box::pin(rx.map(|_| ()))
261    }
262
263    fn send(&self, cmd: PersistTableWriteCmd<T>) {
264        self.inner.send(cmd);
265    }
266}
267
268struct TxnsTableWorker<T: Timestamp + Lattice + TotalOrder + Codec64> {
269    txns: TxnsHandle<SourceData, (), T, StorageDiff, TxnsCodecRow>,
270    write_handles: BTreeMap<GlobalId, ShardId>,
271    tidy: Tidy,
272}
273
274impl<T: Timestamp + Lattice + Codec64 + TimestampManipulation> TxnsTableWorker<T> {
275    async fn run(
276        &mut self,
277        mut rx: tokio::sync::mpsc::UnboundedReceiver<(Span, PersistTableWriteCmd<T>)>,
278    ) {
279        while let Some((span, command)) = rx.recv().await {
280            match command {
281                PersistTableWriteCmd::Register(register_ts, ids_handles, tx) => {
282                    self.register(register_ts, ids_handles)
283                        .instrument(span)
284                        .await;
285                    // We don't care if our waiter has gone away.
286                    let _ = tx.send(());
287                }
288                PersistTableWriteCmd::Update {
289                    existing_collection,
290                    new_collection,
291                    forget_ts,
292                    register_ts,
293                    handle,
294                    tx,
295                } => {
296                    async {
297                        self.drop_handles(vec![existing_collection], forget_ts)
298                            .await;
299                        self.register(register_ts, vec![(new_collection, handle)])
300                            .await;
301                    }
302                    .instrument(span)
303                    .await;
304                    // We don't care if our waiter has gone away.
305                    let _ = tx.send(());
306                }
307                PersistTableWriteCmd::DropHandles { forget_ts, ids, tx } => {
308                    self.drop_handles(ids, forget_ts).instrument(span).await;
309                    // We don't care if our waiter has gone away.
310                    let _ = tx.send(());
311                }
312                PersistTableWriteCmd::Append {
313                    write_ts,
314                    advance_to,
315                    updates,
316                    tx,
317                } => {
318                    self.append(write_ts, advance_to, updates, tx)
319                        .instrument(span)
320                        .await
321                }
322                PersistTableWriteCmd::Shutdown => {
323                    tracing::info!("PersistTableWriteWorker shutting down via command");
324                    return;
325                }
326            }
327        }
328
329        tracing::info!("PersistTableWriteWorker shutting down via input exhaustion");
330    }
331
332    async fn register(
333        &mut self,
334        register_ts: T,
335        mut ids_handles: Vec<(GlobalId, WriteHandle<SourceData, (), T, StorageDiff>)>,
336    ) {
337        // As tables evolve (e.g. columns are added) we treat the older versions as
338        // "views" on the later versions. While it's not required, it's easier to reason
339        // about table registration if we do it in GlobalId order.
340        ids_handles.sort_unstable_by_key(|(gid, _handle)| *gid);
341
342        for (id, write_handle) in ids_handles.iter() {
343            debug!(
344                "tables register {} {:.9}",
345                id,
346                write_handle.shard_id().to_string()
347            );
348            let previous = self.write_handles.insert(*id, write_handle.shard_id());
349            if previous.is_some() {
350                panic!("already registered a WriteHandle for collection {:?}", id);
351            }
352        }
353
354        // Registering also advances the logical upper of all shards in the txns set.
355        let new_ids = ids_handles.iter().map(|(id, _)| *id).collect_vec();
356        let handles = ids_handles.into_iter().map(|(_, handle)| handle);
357        let res = self.txns.register(register_ts.clone(), handles).await;
358        match res {
359            Ok(tidy) => {
360                self.tidy.merge(tidy);
361            }
362            Err(current) => {
363                panic!(
364                    "cannot register {:?} at {:?} because txns is at {:?}",
365                    new_ids, register_ts, current
366                );
367            }
368        }
369    }
370
371    async fn drop_handles(&mut self, ids: Vec<GlobalId>, forget_ts: T) {
372        tracing::info!(?ids, "drop tables");
373        let data_ids = ids
374            .iter()
375            // n.b. this should only remove the handle from the persist
376            // worker and not take any additional action such as closing
377            // the shard it's connected to because dataflows might still
378            // be using it.
379            .filter_map(|id| self.write_handles.remove(id))
380            .collect::<BTreeSet<_>>();
381        if !data_ids.is_empty() {
382            match self.txns.forget(forget_ts.clone(), data_ids.clone()).await {
383                Ok(tidy) => {
384                    self.tidy.merge(tidy);
385                }
386                Err(current) => {
387                    panic!(
388                        "cannot forget {:?} at {:?} because txns is at {:?}",
389                        ids, forget_ts, current
390                    );
391                }
392            }
393        }
394    }
395
396    async fn append(
397        &mut self,
398        write_ts: T,
399        advance_to: T,
400        updates: Vec<(GlobalId, Vec<TableData>)>,
401        tx: tokio::sync::oneshot::Sender<Result<(), StorageError<T>>>,
402    ) {
403        debug!(
404            "tables append timestamp={:?} advance_to={:?} len={} ids={:?}{}",
405            write_ts,
406            advance_to,
407            updates.iter().flat_map(|(_, x)| x).count(),
408            updates
409                .iter()
410                .map(|(x, _)| x.to_string())
411                .collect::<BTreeSet<_>>(),
412            updates.iter().filter(|(_, v)| !v.is_empty()).fold(
413                String::new(),
414                |mut output, (k, v)| {
415                    let _ = write!(output, "\n  {}: {:?}", k, v.first());
416                    output
417                }
418            )
419        );
420        // TODO: txn-wal doesn't take an advance_to yet, it uses
421        // timestamp.step_forward. This is the same in all cases, so just assert that
422        // for now. Note that this uses the _persist_ StepForward, not the
423        // TimestampManipulation one (the impls are the same) because that's what
424        // txn-wal uses.
425        assert_eq!(
426            advance_to,
427            mz_persist_types::StepForward::step_forward(&write_ts)
428        );
429
430        let mut txn = self.txns.begin();
431        for (id, updates) in updates {
432            let Some(data_id) = self.write_handles.get(&id) else {
433                // HACK: When creating a table we get an append that includes it
434                // before it's been registered. When this happens there are no
435                // updates, so it's ~fine to ignore it.
436                assert!(
437                    updates.iter().all(|u| u.is_empty()),
438                    "{}: {:?}",
439                    id,
440                    updates
441                );
442                continue;
443            };
444            for update in updates {
445                match update {
446                    TableData::Rows(updates) => {
447                        for (row, diff) in updates {
448                            let () = txn
449                                .write(data_id, SourceData(Ok(row)), (), diff.into_inner())
450                                .await;
451                        }
452                    }
453                    TableData::Batches(batches) => {
454                        for batch in batches {
455                            let () = txn.write_batch(data_id, batch);
456                        }
457                    }
458                }
459            }
460        }
461        // Sneak in any txns shard tidying from previous commits.
462        txn.tidy(std::mem::take(&mut self.tidy));
463        let txn_res = txn.commit_at(&mut self.txns, write_ts.clone()).await;
464        let response = match txn_res {
465            Ok(apply) => {
466                // TODO: Do the applying in a background task. This will be a
467                // significant INSERT latency performance win.
468                debug!("applying {:?}", apply);
469                let tidy = apply.apply(&mut self.txns).await;
470                self.tidy.merge(tidy);
471
472                // We don't serve any reads out of this TxnsHandle, so go ahead
473                // and compact as aggressively as we can (i.e. to the time we
474                // just wrote).
475                let () = self.txns.compact_to(write_ts).await;
476
477                Ok(())
478            }
479            Err(current) => {
480                self.tidy.merge(txn.take_tidy());
481                debug!(
482                    "unable to commit txn at {:?} current={:?}",
483                    write_ts, current
484                );
485                Err(StorageError::InvalidUppers(
486                    self.write_handles
487                        .keys()
488                        .copied()
489                        .map(|id| InvalidUpper {
490                            id,
491                            current_upper: Antichain::from_elem(current.clone()),
492                        })
493                        .collect(),
494                ))
495            }
496        };
497        // It is not an error for the other end to hang up.
498        let _ = tx.send(response);
499    }
500}
501
502/// Contains the components necessary for sending commands to a `PersistTableWriteWorker`.
503///
504/// When `Drop`-ed sends a shutdown command, as such this should _never_ implement `Clone` because
505/// if one clone is dropped, the other clones will be unable to send commands. If you need this
506/// to be `Clone`-able, wrap it in an `Arc` or `Rc` first.
507///
508/// #[derive(Clone)] <-- do not do this.
509///
510#[derive(Debug)]
511struct PersistTableWriteWorkerInner<T: Timestamp + Lattice + Codec64 + TimestampManipulation> {
512    /// Sending side of a channel that we can use to send commands.
513    tx: UnboundedSender<(tracing::Span, PersistTableWriteCmd<T>)>,
514}
515
516impl<T> Drop for PersistTableWriteWorkerInner<T>
517where
518    T: Timestamp + Lattice + Codec64 + TimestampManipulation,
519{
520    fn drop(&mut self) {
521        self.send(PersistTableWriteCmd::Shutdown);
522        // TODO: Can't easily block on shutdown occurring.
523    }
524}
525
526impl<T> PersistTableWriteWorkerInner<T>
527where
528    T: Timestamp + Lattice + Codec64 + TimestampManipulation,
529{
530    fn new(tx: UnboundedSender<(tracing::Span, PersistTableWriteCmd<T>)>) -> Self {
531        PersistTableWriteWorkerInner { tx }
532    }
533
534    fn send(&self, cmd: PersistTableWriteCmd<T>) {
535        let span =
536            info_span!(parent: None, "PersistTableWriteWorkerInner::send", otel.name = cmd.name());
537        OpenTelemetryContext::obtain().attach_as_parent_to(&span);
538        self.send_with_span(span, cmd)
539    }
540
541    fn send_with_span(&self, span: Span, cmd: PersistTableWriteCmd<T>) {
542        match self.tx.send((span, cmd)) {
543            Ok(()) => (), // All good!
544            Err(e) => {
545                tracing::trace!("could not forward command: {:?}", e);
546            }
547        }
548    }
549}