mz_storage_controller/persist_handles.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Tokio tasks (and support machinery) for dealing with the persist handles
//! that the storage controller needs to hold.

use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;
use std::fmt::Write;
use std::sync::Arc;

use differential_dataflow::lattice::Lattice;
use futures::future::BoxFuture;
use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt};
use itertools::Itertools;
use mz_ore::tracing::OpenTelemetryContext;
use mz_persist_client::ShardId;
use mz_persist_client::write::WriteHandle;
use mz_persist_types::Codec64;
use mz_repr::{GlobalId, TimestampManipulation};
use mz_storage_client::client::{TableData, Update};
use mz_storage_types::StorageDiff;
use mz_storage_types::controller::{InvalidUpper, TxnsCodecRow};
use mz_storage_types::sources::SourceData;
use mz_txn_wal::txns::{Tidy, TxnsHandle};
use timely::order::TotalOrder;
use timely::progress::{Antichain, Timestamp};
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::oneshot;
use tracing::{Instrument, Span, debug, info_span};

use crate::{PersistEpoch, StorageError};

mod read_only_table_worker;

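/// A worker that owns the persist [WriteHandle]s for the system's tables and
/// services the commands (register, update, append, drop) sent to it over a
/// channel.
///
/// A minimal usage sketch, assuming an already-constructed txn-wal
/// [TxnsHandle] and a write handle for the table's shard (the construction of
/// both is elided here):
///
/// ```ignore
/// let worker = PersistTableWriteWorker::new_txns(txns);
///
/// // A table must be registered before it can be appended to.
/// worker.register(register_ts, vec![(table_id, write_handle)]).await?;
///
/// // The receiver resolves once the write has been committed (or rejected).
/// worker
///     .append(write_ts, advance_to, vec![(table_id, updates)])
///     .await??;
/// ```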
#[derive(Debug, Clone)]
pub struct PersistTableWriteWorker<T: Timestamp + Lattice + Codec64 + TimestampManipulation> {
    inner: Arc<PersistTableWriteWorkerInner<T>>,
}

/// Commands for [PersistTableWriteWorker].
#[derive(Debug)]
enum PersistTableWriteCmd<T: Timestamp + Lattice + Codec64> {
    Register(
        T,
        Vec<(GlobalId, WriteHandle<SourceData, (), T, StorageDiff>)>,
        tokio::sync::oneshot::Sender<()>,
    ),
    Update {
        /// Existing collection for the table.
        existing_collection: GlobalId,
        /// New collection we'll emit writes to.
        new_collection: GlobalId,
        /// Timestamp to forget the original handle at.
        forget_ts: T,
        /// Timestamp to register the new handle at.
        register_ts: T,
        /// New write handle to register.
        handle: WriteHandle<SourceData, (), T, StorageDiff>,
        /// Notifies us when the handle has been updated.
        tx: oneshot::Sender<()>,
    },
    DropHandles {
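        /// Timestamp at which to forget the table handles.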
        forget_ts: T,
        /// Tables that we want to drop our handle for.
        ids: Vec<GlobalId>,
        /// Notifies us when all resources have been cleaned up.
        tx: oneshot::Sender<()>,
    },
    Append {
        write_ts: T,
        advance_to: T,
        updates: Vec<(GlobalId, Vec<TableData>)>,
        tx: tokio::sync::oneshot::Sender<Result<(), StorageError<T>>>,
    },
    Shutdown,
}

impl<T: Timestamp + Lattice + Codec64> PersistTableWriteCmd<T> {
    fn name(&self) -> &'static str {
        match self {
            PersistTableWriteCmd::Register(_, _, _) => "PersistTableWriteCmd::Register",
            PersistTableWriteCmd::Update { .. } => "PersistTableWriteCmd::Update",
            PersistTableWriteCmd::DropHandles { .. } => "PersistTableWriteCmd::DropHandles",
            PersistTableWriteCmd::Append { .. } => "PersistTableWriteCmd::Append",
            PersistTableWriteCmd::Shutdown => "PersistTableWriteCmd::Shutdown",
        }
    }
}

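/// Performs the given updates as `compare_and_append` calls, one per write
/// handle, returning the `(collection, current upper)` pairs for any appends
/// that failed because of an upper mismatch.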
async fn append_work<T2: Timestamp + Lattice + Codec64 + Sync>(
    write_handles: &mut BTreeMap<GlobalId, WriteHandle<SourceData, (), T2, StorageDiff>>,
    mut commands: BTreeMap<
        GlobalId,
        (tracing::Span, Vec<Update<T2>>, Antichain<T2>, Antichain<T2>),
    >,
) -> Result<(), Vec<(GlobalId, Antichain<T2>)>> {
    let futs = FuturesUnordered::new();

    // We cannot iterate through the updates and then set off a persist call
    // on the write handle because we cannot mutably borrow the write handle
    // multiple times.
    //
    // Instead, we first group the updates by ID above and then iterate
    // through all available write handles and see if there are any updates
    // for each one. If yes, we send them all in one go.
    for (id, write) in write_handles.iter_mut() {
        if let Some((span, updates, expected_upper, new_upper)) = commands.remove(id) {
            let updates = updates.into_iter().map(|u| {
                (
                    (SourceData(Ok(u.row)), ()),
                    u.timestamp,
                    u.diff.into_inner(),
                )
            });

            futs.push(async move {
                write
                    .compare_and_append(updates.clone(), expected_upper.clone(), new_upper.clone())
                    .instrument(span.clone())
                    .await
                    .expect("cannot append updates")
                    .map_err(|upper_mismatch| (*id, upper_mismatch.current))?;

                Ok::<_, (GlobalId, Antichain<T2>)>((*id, new_upper))
            })
        }
    }

    // Ensure all futures run to completion, and track status of each of them individually
    let (_new_uppers, failed_appends): (Vec<_>, Vec<_>) = futs
        .collect::<Vec<_>>()
        .await
        .into_iter()
        .partition_result();

    if failed_appends.is_empty() {
        Ok(())
    } else {
        Err(failed_appends)
    }
}

impl<T: Timestamp + Lattice + Codec64 + TimestampManipulation> PersistTableWriteWorker<T> {
    /// Create a new read-only table worker that continually bumps the upper of
    /// its tables. It is expected that we only register migrated builtin
    /// tables, which cannot yet be registered in the txns system in read-only
    /// mode.
    ///
    /// This takes a [WriteHandle] for the txns shard so that the worker can
    /// follow the txns shard's upper and continually bump the upper of
    /// registered tables to match it.
    pub(crate) fn new_read_only_mode(
        txns_handle: WriteHandle<SourceData, (), T, StorageDiff>,
    ) -> Self {
        let (tx, rx) =
            tokio::sync::mpsc::unbounded_channel::<(tracing::Span, PersistTableWriteCmd<T>)>();
        mz_ore::task::spawn(
            || "PersistTableWriteWorker",
            read_only_table_worker::read_only_mode_table_worker(rx, txns_handle),
        );
        Self {
            inner: Arc::new(PersistTableWriteWorkerInner::new(tx)),
        }
    }

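    /// Create a worker that routes all table writes through the txn-wal
    /// system, via the given [TxnsHandle].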
    pub(crate) fn new_txns(
        txns: TxnsHandle<SourceData, (), T, StorageDiff, PersistEpoch, TxnsCodecRow>,
    ) -> Self {
        let (tx, rx) =
            tokio::sync::mpsc::unbounded_channel::<(tracing::Span, PersistTableWriteCmd<T>)>();
        mz_ore::task::spawn(|| "PersistTableWriteWorker", async move {
            let mut worker = TxnsTableWorker {
                txns,
                write_handles: BTreeMap::new(),
                tidy: Tidy::default(),
            };
            worker.run(rx).await
        });
        Self {
            inner: Arc::new(PersistTableWriteWorkerInner::new(tx)),
        }
    }

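    /// Registers the given write handles with the worker at `register_ts`, so
    /// that subsequent [Self::append] calls can write to them. The returned
    /// receiver resolves once registration has completed.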
    pub(crate) fn register(
        &self,
        register_ts: T,
        ids_handles: Vec<(GlobalId, WriteHandle<SourceData, (), T, StorageDiff>)>,
    ) -> tokio::sync::oneshot::Receiver<()> {
        // We expect this to be awaited, so keep the span connected.
        let span = info_span!("PersistTableWriteCmd::Register");
        let (tx, rx) = tokio::sync::oneshot::channel();
        let cmd = PersistTableWriteCmd::Register(register_ts, ids_handles, tx);
        self.inner.send_with_span(span, cmd);
        rx
    }

    /// Update the existing write handle associated with `existing_collection`
    /// to `handle`, registered under `new_collection`.
    ///
    /// Note that this should only be called when updating a write handle; to
    /// initially associate an `id` with a write handle, use [`Self::register`].
    ///
    /// # Panics
    /// - If `existing_collection` is not currently associated with any write
    ///   handle.
    #[allow(dead_code)]
    pub(crate) fn update(
        &self,
        existing_collection: GlobalId,
        new_collection: GlobalId,
        forget_ts: T,
        register_ts: T,
        handle: WriteHandle<SourceData, (), T, StorageDiff>,
    ) -> oneshot::Receiver<()> {
        let (tx, rx) = oneshot::channel();
        self.send(PersistTableWriteCmd::Update {
            existing_collection,
            new_collection,
            forget_ts,
            register_ts,
            handle,
            tx,
        });
        rx
    }

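    /// Appends `updates` to the registered tables at `write_ts`, advancing
    /// their uppers to `advance_to`. The returned receiver resolves once the
    /// write has been durably committed, or with an error if the expected
    /// uppers did not match.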
    pub(crate) fn append(
        &self,
        write_ts: T,
        advance_to: T,
        updates: Vec<(GlobalId, Vec<TableData>)>,
    ) -> tokio::sync::oneshot::Receiver<Result<(), StorageError<T>>> {
        let (tx, rx) = tokio::sync::oneshot::channel();
        if updates.is_empty() {
            tx.send(Ok(()))
                .expect("rx has not been dropped at this point");
            rx
        } else {
            self.send(PersistTableWriteCmd::Append {
                write_ts,
                advance_to,
                updates,
                tx,
            });
            rx
        }
    }

    /// Drops the handles associated with `ids` from this worker.
    ///
    /// Note that this does not perform any other cleanup, such as finalizing
    /// the handle's shard.
    pub(crate) fn drop_handles(&self, ids: Vec<GlobalId>, forget_ts: T) -> BoxFuture<'static, ()> {
        let (tx, rx) = oneshot::channel();
        self.send(PersistTableWriteCmd::DropHandles { forget_ts, ids, tx });
        Box::pin(rx.map(|_| ()))
    }

    fn send(&self, cmd: PersistTableWriteCmd<T>) {
        self.inner.send(cmd);
    }
}

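/// A worker task that applies [PersistTableWriteCmd]s by translating them into
/// operations on a txn-wal [TxnsHandle].
///
/// Note that `write_handles` maps each table's [GlobalId] to its persist
/// [ShardId]; the write handles themselves are owned by the txns system once
/// registered.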
struct TxnsTableWorker<T: Timestamp + Lattice + TotalOrder + Codec64> {
    txns: TxnsHandle<SourceData, (), T, StorageDiff, PersistEpoch, TxnsCodecRow>,
    write_handles: BTreeMap<GlobalId, ShardId>,
    tidy: Tidy,
}

impl<T: Timestamp + Lattice + Codec64 + TimestampManipulation> TxnsTableWorker<T> {
    async fn run(
        &mut self,
        mut rx: tokio::sync::mpsc::UnboundedReceiver<(Span, PersistTableWriteCmd<T>)>,
    ) {
        while let Some((span, command)) = rx.recv().await {
            match command {
                PersistTableWriteCmd::Register(register_ts, ids_handles, tx) => {
                    self.register(register_ts, ids_handles)
                        .instrument(span)
                        .await;
                    // We don't care if our waiter has gone away.
                    let _ = tx.send(());
                }
                PersistTableWriteCmd::Update {
                    existing_collection,
                    new_collection,
                    forget_ts,
                    register_ts,
                    handle,
                    tx,
                } => {
                    async {
                        self.drop_handles(vec![existing_collection], forget_ts)
                            .await;
                        self.register(register_ts, vec![(new_collection, handle)])
                            .await;
                    }
                    .instrument(span)
                    .await;
                    // We don't care if our waiter has gone away.
                    let _ = tx.send(());
                }
                PersistTableWriteCmd::DropHandles { forget_ts, ids, tx } => {
                    self.drop_handles(ids, forget_ts).instrument(span).await;
                    // We don't care if our waiter has gone away.
                    let _ = tx.send(());
                }
                PersistTableWriteCmd::Append {
                    write_ts,
                    advance_to,
                    updates,
                    tx,
                } => {
                    self.append(write_ts, advance_to, updates, tx)
                        .instrument(span)
                        .await
                }
                PersistTableWriteCmd::Shutdown => {
                    tracing::info!("PersistTableWriteWorker shutting down via command");
                    return;
                }
            }
        }

        tracing::info!("PersistTableWriteWorker shutting down via input exhaustion");
    }

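    /// Registers the given write handles with the txns shard at `register_ts`.
    ///
    /// Panics if a handle is already registered for one of the ids, or if the
    /// txns shard's upper has already advanced past `register_ts`.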
    async fn register(
        &mut self,
        register_ts: T,
        mut ids_handles: Vec<(GlobalId, WriteHandle<SourceData, (), T, StorageDiff>)>,
    ) {
        // As tables evolve (e.g. columns are added) we treat the older versions as
        // "views" on the later versions. While it's not required, it's easier to reason
        // about table registration if we do it in GlobalId order.
        ids_handles.sort_unstable_by_key(|(gid, _handle)| *gid);

        for (id, write_handle) in ids_handles.iter() {
            debug!(
                "tables register {} {:.9}",
                id,
                write_handle.shard_id().to_string()
            );
            let previous = self.write_handles.insert(*id, write_handle.shard_id());
            if previous.is_some() {
                panic!("already registered a WriteHandle for collection {:?}", id);
            }
        }

        // Registering also advances the logical upper of all shards in the txns set.
        let new_ids = ids_handles.iter().map(|(id, _)| *id).collect_vec();
        let handles = ids_handles.into_iter().map(|(_, handle)| handle);
        let res = self.txns.register(register_ts.clone(), handles).await;
        match res {
            Ok(tidy) => {
                self.tidy.merge(tidy);
            }
            Err(current) => {
                panic!(
                    "cannot register {:?} at {:?} because txns is at {:?}",
                    new_ids, register_ts, current
                );
            }
        }
    }

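    /// Forgets the txn-wal registration of the given tables at `forget_ts` and
    /// removes them from the worker's in-memory map.
    ///
    /// This does not finalize or otherwise touch the underlying shards, which
    /// might still be in use by running dataflows.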
    async fn drop_handles(&mut self, ids: Vec<GlobalId>, forget_ts: T) {
        tracing::info!(?ids, "drop tables");
        let data_ids = ids
            .iter()
            // n.b. this should only remove the handle from the persist
            // worker and not take any additional action such as closing
            // the shard it's connected to because dataflows might still
            // be using it.
            .filter_map(|id| self.write_handles.remove(id))
            .collect::<BTreeSet<_>>();
        if !data_ids.is_empty() {
            match self.txns.forget(forget_ts.clone(), data_ids.clone()).await {
                Ok(tidy) => {
                    self.tidy.merge(tidy);
                }
                Err(current) => {
                    panic!(
                        "cannot forget {:?} at {:?} because txns is at {:?}",
                        ids, forget_ts, current
                    );
                }
            }
        }
    }

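    /// Commits `updates` through txn-wal at `write_ts` and reports the result
    /// (including any upper mismatch) through `tx`.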
    async fn append(
        &mut self,
        write_ts: T,
        advance_to: T,
        updates: Vec<(GlobalId, Vec<TableData>)>,
        tx: tokio::sync::oneshot::Sender<Result<(), StorageError<T>>>,
    ) {
        debug!(
            "tables append timestamp={:?} advance_to={:?} len={} ids={:?}{}",
            write_ts,
            advance_to,
            updates.iter().flat_map(|(_, x)| x).count(),
            updates
                .iter()
                .map(|(x, _)| x.to_string())
                .collect::<BTreeSet<_>>(),
            updates.iter().filter(|(_, v)| !v.is_empty()).fold(
                String::new(),
                |mut output, (k, v)| {
                    let _ = write!(output, "\n  {}: {:?}", k, v.first());
                    output
                }
            )
        );
        // TODO: txn-wal doesn't take an advance_to yet, it uses
        // timestamp.step_forward. This is the same in all cases, so just assert that
        // for now. Note that this uses the _persist_ StepForward, not the
        // TimestampManipulation one (the impls are the same) because that's what
        // txn-wal uses.
        assert_eq!(
            advance_to,
            mz_persist_types::StepForward::step_forward(&write_ts)
        );

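        // Stage all updates in a single txn-wal transaction; nothing becomes
        // durable until `commit_at` below.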
        let mut txn = self.txns.begin();
        for (id, updates) in updates {
            let Some(data_id) = self.write_handles.get(&id) else {
                // HACK: When creating a table we get an append that includes it
                // before it's been registered. When this happens there are no
                // updates, so it's ~fine to ignore it.
                assert!(
                    updates.iter().all(|u| u.is_empty()),
                    "{}: {:?}",
                    id,
                    updates
                );
                continue;
            };
            for update in updates {
                match update {
                    TableData::Rows(updates) => {
                        for (row, diff) in updates {
                            let () = txn
                                .write(data_id, SourceData(Ok(row)), (), diff.into_inner())
                                .await;
                        }
                    }
                    TableData::Batches(batches) => {
                        for batch in batches {
                            let () = txn.write_batch(data_id, batch);
                        }
                    }
                }
            }
        }
        // Sneak in any txns shard tidying from previous commits.
        txn.tidy(std::mem::take(&mut self.tidy));
        let txn_res = txn.commit_at(&mut self.txns, write_ts.clone()).await;
        let response = match txn_res {
            Ok(apply) => {
                // TODO: Do the applying in a background task. This will be a
                // significant INSERT latency performance win.
                debug!("applying {:?}", apply);
                let tidy = apply.apply(&mut self.txns).await;
                self.tidy.merge(tidy);

                // We don't serve any reads out of this TxnsHandle, so go ahead
                // and compact as aggressively as we can (i.e. to the time we
                // just wrote).
                let () = self.txns.compact_to(write_ts).await;

                Ok(())
            }
            Err(current) => {
                self.tidy.merge(txn.take_tidy());
                debug!(
                    "unable to commit txn at {:?} current={:?}",
                    write_ts, current
                );
                Err(StorageError::InvalidUppers(
                    self.write_handles
                        .keys()
                        .copied()
                        .map(|id| InvalidUpper {
                            id,
                            current_upper: Antichain::from_elem(current.clone()),
                        })
                        .collect(),
                ))
            }
        };
        // It is not an error for the other end to hang up.
        let _ = tx.send(response);
    }
}

/// Contains the components necessary for sending commands to a `PersistTableWriteWorker`.
///
/// Sends a shutdown command when `Drop`-ed. As such, this should _never_
/// implement `Clone`: if one clone were dropped, the remaining clones would be
/// unable to send commands. If you need this to be `Clone`-able, wrap it in an
/// `Arc` or `Rc` first.
///
/// #[derive(Clone)] <-- do not do this.
///
#[derive(Debug)]
struct PersistTableWriteWorkerInner<T: Timestamp + Lattice + Codec64 + TimestampManipulation> {
    /// Sending side of a channel that we can use to send commands.
    tx: UnboundedSender<(tracing::Span, PersistTableWriteCmd<T>)>,
}

impl<T> Drop for PersistTableWriteWorkerInner<T>
where
    T: Timestamp + Lattice + Codec64 + TimestampManipulation,
{
    fn drop(&mut self) {
        self.send(PersistTableWriteCmd::Shutdown);
        // TODO: Can't easily block on shutdown occurring.
    }
}

impl<T> PersistTableWriteWorkerInner<T>
where
    T: Timestamp + Lattice + Codec64 + TimestampManipulation,
{
    fn new(tx: UnboundedSender<(tracing::Span, PersistTableWriteCmd<T>)>) -> Self {
        PersistTableWriteWorkerInner { tx }
    }

    fn send(&self, cmd: PersistTableWriteCmd<T>) {
        let span =
            info_span!(parent: None, "PersistTableWriteWorkerInner::send", otel.name = cmd.name());
        OpenTelemetryContext::obtain().attach_as_parent_to(&span);
        self.send_with_span(span, cmd)
    }

    fn send_with_span(&self, span: Span, cmd: PersistTableWriteCmd<T>) {
        match self.tx.send((span, cmd)) {
            Ok(()) => (), // All good!
            Err(e) => {
                tracing::trace!("could not forward command: {:?}", e);
            }
        }
    }
}