// mz_storage_controller/persist_handles.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A tokio tasks (and support machinery) for dealing with the persist handles
11//! that the storage controller needs to hold.
12
13use std::collections::{BTreeMap, BTreeSet};
14use std::fmt::Debug;
15use std::fmt::Write;
16use std::sync::Arc;
17
18use futures::future::BoxFuture;
19use futures::stream::FuturesUnordered;
20use futures::{FutureExt, StreamExt};
21use itertools::Itertools;
22use mz_ore::tracing::OpenTelemetryContext;
23use mz_persist_client::ShardId;
24use mz_persist_client::write::WriteHandle;
25use mz_repr::{GlobalId, Timestamp};
26use mz_storage_client::client::{TableData, Update};
27use mz_storage_types::StorageDiff;
28use mz_storage_types::controller::{InvalidUpper, TxnsCodecRow};
29use mz_storage_types::sources::SourceData;
30use mz_txn_wal::txns::{Tidy, TxnsHandle};
31use timely::progress::Antichain;
32use tokio::sync::mpsc::UnboundedSender;
33use tokio::sync::oneshot;
34use tracing::{Instrument, Span, debug, info_span};
35
36use crate::StorageError;
37
38mod read_only_table_worker;
39
/// A handle for sending table-write commands to a background worker task that
/// owns the persist write handles for tables.
///
/// Cloning is cheap: all clones share the same inner command sender, and the
/// worker shuts down only once the last clone is dropped.
#[derive(Debug, Clone)]
pub struct PersistTableWriteWorker {
    // Arc-wrapped because the inner sends a shutdown command on `Drop` and so
    // must not itself be cloned (see `PersistTableWriteWorkerInner`).
    inner: Arc<PersistTableWriteWorkerInner>,
}
44
/// Commands for [PersistTableWriteWorker].
#[derive(Debug)]
enum PersistTableWriteCmd {
    /// Register write handles for the given collections, effective at the
    /// given timestamp.
    Register(
        // Timestamp at which the registration takes effect.
        Timestamp,
        // The collections and their write handles to register.
        Vec<(
            GlobalId,
            WriteHandle<SourceData, (), Timestamp, StorageDiff>,
        )>,
        // Notifies the requester once registration has completed.
        tokio::sync::oneshot::Sender<()>,
    ),
    /// Replace the handle for an existing collection with a handle for a new
    /// collection (a forget followed by a register).
    Update {
        /// Existing collection for the table.
        existing_collection: GlobalId,
        /// New collection we'll emit writes to.
        new_collection: GlobalId,
        /// Timestamp to forget the original handle at.
        forget_ts: Timestamp,
        /// Timestamp to register the new handle at.
        register_ts: Timestamp,
        /// New write handle to register.
        handle: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
        /// Notifies us when the handle has been updated.
        tx: oneshot::Sender<()>,
    },
    /// Drop the worker's handles for the given tables (no shard finalization).
    DropHandles {
        /// Timestamp at which the tables are forgotten.
        forget_ts: Timestamp,
        /// Tables that we want to drop our handle for.
        ids: Vec<GlobalId>,
        /// Notifies us when all resources have been cleaned up.
        tx: oneshot::Sender<()>,
    },
    /// Append the given updates to their tables at `write_ts`, advancing the
    /// upper to `advance_to`.
    Append {
        /// Timestamp at which the updates are written.
        write_ts: Timestamp,
        /// Timestamp to advance the tables' upper to after the write.
        advance_to: Timestamp,
        /// Per-collection updates to append.
        updates: Vec<(GlobalId, Vec<TableData>)>,
        /// Receives the result of the append.
        tx: tokio::sync::oneshot::Sender<Result<(), StorageError>>,
    },
    /// Ask the worker task to exit.
    Shutdown,
}
85
86impl PersistTableWriteCmd {
87    fn name(&self) -> &'static str {
88        match self {
89            PersistTableWriteCmd::Register(_, _, _) => "PersistTableWriteCmd::Register",
90            PersistTableWriteCmd::Update { .. } => "PersistTableWriteCmd::Update",
91            PersistTableWriteCmd::DropHandles { .. } => "PersistTableWriteCmd::DropHandle",
92            PersistTableWriteCmd::Append { .. } => "PersistTableWriteCmd::Append",
93            PersistTableWriteCmd::Shutdown => "PersistTableWriteCmd::Shutdown",
94        }
95    }
96}
97
98async fn append_work(
99    write_handles: &mut BTreeMap<GlobalId, WriteHandle<SourceData, (), Timestamp, StorageDiff>>,
100    mut commands: BTreeMap<
101        GlobalId,
102        (
103            tracing::Span,
104            Vec<Update>,
105            Antichain<Timestamp>,
106            Antichain<Timestamp>,
107        ),
108    >,
109) -> Result<(), Vec<(GlobalId, Antichain<Timestamp>)>> {
110    let futs = FuturesUnordered::new();
111
112    // We cannot iterate through the updates and then set off a persist call
113    // on the write handle because we cannot mutably borrow the write handle
114    // multiple times.
115    //
116    // Instead, we first group the update by ID above and then iterate
117    // through all available write handles and see if there are any updates
118    // for it. If yes, we send them all in one go.
119    for (id, write) in write_handles.iter_mut() {
120        if let Some((span, updates, expected_upper, new_upper)) = commands.remove(id) {
121            let updates = updates.into_iter().map(|u| {
122                (
123                    (SourceData(Ok(u.row)), ()),
124                    u.timestamp,
125                    u.diff.into_inner(),
126                )
127            });
128
129            futs.push(async move {
130                write
131                    .compare_and_append(updates.clone(), expected_upper.clone(), new_upper.clone())
132                    .instrument(span.clone())
133                    .await
134                    .expect("cannot append updates")
135                    .or_else(|upper_mismatch| Err((*id, upper_mismatch.current)))?;
136
137                Ok::<_, (GlobalId, Antichain<Timestamp>)>((*id, new_upper))
138            })
139        }
140    }
141
142    // Ensure all futures run to completion, and track status of each of them individually
143    let (_new_uppers, failed_appends): (Vec<_>, Vec<_>) = futs
144        .collect::<Vec<_>>()
145        .await
146        .into_iter()
147        .partition_result();
148
149    if failed_appends.is_empty() {
150        Ok(())
151    } else {
152        Err(failed_appends)
153    }
154}
155
156impl PersistTableWriteWorker {
157    /// Create a new read-only table worker that continually bumps the upper of
158    /// it's tables. It is expected that we only register migrated builtin
159    /// tables, that cannot yet be registered in the txns system in read-only
160    /// mode.
161    ///
162    /// This takes a [WriteHandle] for the txns shard so that it can follow the
163    /// upper and continually bump the upper of registered tables to follow the
164    /// upper of the txns shard.
165    pub(crate) fn new_read_only_mode(
166        txns_handle: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
167    ) -> Self {
168        let (tx, rx) =
169            tokio::sync::mpsc::unbounded_channel::<(tracing::Span, PersistTableWriteCmd)>();
170        mz_ore::task::spawn(
171            || "PersistTableWriteWorker",
172            read_only_table_worker::read_only_mode_table_worker(rx, txns_handle),
173        );
174        Self {
175            inner: Arc::new(PersistTableWriteWorkerInner::new(tx)),
176        }
177    }
178
179    pub(crate) fn new_txns(
180        txns: TxnsHandle<SourceData, (), Timestamp, StorageDiff, TxnsCodecRow>,
181    ) -> Self {
182        let (tx, rx) =
183            tokio::sync::mpsc::unbounded_channel::<(tracing::Span, PersistTableWriteCmd)>();
184        mz_ore::task::spawn(|| "PersistTableWriteWorker", async move {
185            let mut worker = TxnsTableWorker {
186                txns,
187                write_handles: BTreeMap::new(),
188                tidy: Tidy::default(),
189            };
190            worker.run(rx).await
191        });
192        Self {
193            inner: Arc::new(PersistTableWriteWorkerInner::new(tx)),
194        }
195    }
196
197    pub(crate) fn register(
198        &self,
199        register_ts: Timestamp,
200        ids_handles: Vec<(
201            GlobalId,
202            WriteHandle<SourceData, (), Timestamp, StorageDiff>,
203        )>,
204    ) -> tokio::sync::oneshot::Receiver<()> {
205        // We expect this to be awaited, so keep the span connected.
206        let span = info_span!("PersistTableWriteCmd::Register");
207        let (tx, rx) = tokio::sync::oneshot::channel();
208        let cmd = PersistTableWriteCmd::Register(register_ts, ids_handles, tx);
209        self.inner.send_with_span(span, cmd);
210        rx
211    }
212
213    /// Update the existing write handle associated with `id` to `write_handle`.
214    ///
215    /// Note that this should only be called when updating a write handle; to
216    /// initially associate an `id` to a write handle, use [`Self::register`].
217    ///
218    /// # Panics
219    /// - If `id` is not currently associated with any write handle.
220    #[allow(dead_code)]
221    pub(crate) fn update(
222        &self,
223        existing_collection: GlobalId,
224        new_collection: GlobalId,
225        forget_ts: Timestamp,
226        register_ts: Timestamp,
227        handle: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
228    ) -> oneshot::Receiver<()> {
229        let (tx, rx) = oneshot::channel();
230        self.send(PersistTableWriteCmd::Update {
231            existing_collection,
232            new_collection,
233            forget_ts,
234            register_ts,
235            handle,
236            tx,
237        });
238        rx
239    }
240
241    pub(crate) fn append(
242        &self,
243        write_ts: Timestamp,
244        advance_to: Timestamp,
245        updates: Vec<(GlobalId, Vec<TableData>)>,
246    ) -> tokio::sync::oneshot::Receiver<Result<(), StorageError>> {
247        let (tx, rx) = tokio::sync::oneshot::channel();
248        if updates.is_empty() {
249            tx.send(Ok(()))
250                .expect("rx has not been dropped at this point");
251            rx
252        } else {
253            self.send(PersistTableWriteCmd::Append {
254                write_ts,
255                advance_to,
256                updates,
257                tx,
258            });
259            rx
260        }
261    }
262
263    /// Drops the handles associated with `ids` from this worker.
264    ///
265    /// Note that this does not perform any other cleanup, such as finalizing
266    /// the handle's shard.
267    pub(crate) fn drop_handles(
268        &self,
269        ids: Vec<GlobalId>,
270        forget_ts: Timestamp,
271    ) -> BoxFuture<'static, ()> {
272        let (tx, rx) = oneshot::channel();
273        self.send(PersistTableWriteCmd::DropHandles { forget_ts, ids, tx });
274        Box::pin(rx.map(|_| ()))
275    }
276
277    fn send(&self, cmd: PersistTableWriteCmd) {
278        self.inner.send(cmd);
279    }
280}
281
/// State for the worker task that services table writes through the txn-wal
/// system.
struct TxnsTableWorker {
    /// Handle to the txns shard through which all table writes are routed.
    txns: TxnsHandle<SourceData, (), Timestamp, StorageDiff, TxnsCodecRow>,
    /// Maps each registered table collection to its data shard. NOTE: despite
    /// the name, this stores [ShardId]s, not write handles; the handles
    /// themselves are given to `txns` at registration time.
    write_handles: BTreeMap<GlobalId, ShardId>,
    /// Tidy work left over from earlier commits, folded into the next txn.
    tidy: Tidy,
}
287
impl TxnsTableWorker {
    /// Services commands until a `Shutdown` command arrives or all senders of
    /// `rx` have been dropped.
    async fn run(
        &mut self,
        mut rx: tokio::sync::mpsc::UnboundedReceiver<(Span, PersistTableWriteCmd)>,
    ) {
        while let Some((span, command)) = rx.recv().await {
            match command {
                PersistTableWriteCmd::Register(register_ts, ids_handles, tx) => {
                    self.register(register_ts, ids_handles)
                        .instrument(span)
                        .await;
                    // We don't care if our waiter has gone away.
                    let _ = tx.send(());
                }
                PersistTableWriteCmd::Update {
                    existing_collection,
                    new_collection,
                    forget_ts,
                    register_ts,
                    handle,
                    tx,
                } => {
                    // An update is a forget of the old collection followed by
                    // a registration of the new one, under a single span.
                    async {
                        self.drop_handles(vec![existing_collection], forget_ts)
                            .await;
                        self.register(register_ts, vec![(new_collection, handle)])
                            .await;
                    }
                    .instrument(span)
                    .await;
                    // We don't care if our waiter has gone away.
                    let _ = tx.send(());
                }
                PersistTableWriteCmd::DropHandles { forget_ts, ids, tx } => {
                    self.drop_handles(ids, forget_ts).instrument(span).await;
                    // We don't care if our waiter has gone away.
                    let _ = tx.send(());
                }
                PersistTableWriteCmd::Append {
                    write_ts,
                    advance_to,
                    updates,
                    tx,
                } => {
                    // `append` sends the result on `tx` itself.
                    self.append(write_ts, advance_to, updates, tx)
                        .instrument(span)
                        .await
                }
                PersistTableWriteCmd::Shutdown => {
                    tracing::info!("PersistTableWriteWorker shutting down via command");
                    return;
                }
            }
        }

        tracing::info!("PersistTableWriteWorker shutting down via input exhaustion");
    }

    /// Registers the given write handles with the txns system at
    /// `register_ts`, recording each collection's data shard locally.
    ///
    /// # Panics
    /// - If any of the collections already has a registered shard.
    /// - If the txns shard has already advanced past `register_ts`.
    async fn register(
        &mut self,
        register_ts: Timestamp,
        mut ids_handles: Vec<(
            GlobalId,
            WriteHandle<SourceData, (), Timestamp, StorageDiff>,
        )>,
    ) {
        // As tables evolve (e.g. columns are added) we treat the older versions as
        // "views" on the later versions. While it's not required, it's easier to reason
        // about table registration if we do it in GlobalId order.
        ids_handles.sort_unstable_by_key(|(gid, _handle)| *gid);

        for (id, write_handle) in ids_handles.iter() {
            debug!(
                "tables register {} {:.9}",
                id,
                write_handle.shard_id().to_string()
            );
            // Remember the data shard; the handle itself is handed to `txns`.
            let previous = self.write_handles.insert(*id, write_handle.shard_id());
            if previous.is_some() {
                panic!("already registered a WriteHandle for collection {:?}", id);
            }
        }

        // Registering also advances the logical upper of all shards in the txns set.
        let new_ids = ids_handles.iter().map(|(id, _)| *id).collect_vec();
        let handles = ids_handles.into_iter().map(|(_, handle)| handle);
        let res = self.txns.register(register_ts, handles).await;
        match res {
            Ok(tidy) => {
                // Stash the tidy work; it is folded into the next append txn.
                self.tidy.merge(tidy);
            }
            Err(current) => {
                panic!(
                    "cannot register {:?} at {:?} because txns is at {:?}",
                    new_ids, register_ts, current
                );
            }
        }
    }

    /// Forgets the data shards for `ids` in the txns system at `forget_ts`
    /// and removes our local id-to-shard mapping for them. Ids we don't know
    /// about are silently skipped.
    ///
    /// # Panics
    /// - If the txns shard has already advanced past `forget_ts`.
    async fn drop_handles(&mut self, ids: Vec<GlobalId>, forget_ts: Timestamp) {
        tracing::info!(?ids, "drop tables");
        let data_ids = ids
            .iter()
            // n.b. this should only remove the handle from the persist
            // worker and not take any additional action such as closing
            // the shard it's connected to because dataflows might still
            // be using it.
            .filter_map(|id| self.write_handles.remove(id))
            .collect::<BTreeSet<_>>();
        if !data_ids.is_empty() {
            match self.txns.forget(forget_ts, data_ids.clone()).await {
                Ok(tidy) => {
                    // Stash the tidy work for the next append txn.
                    self.tidy.merge(tidy);
                }
                Err(current) => {
                    panic!(
                        "cannot forget {:?} at {:?} because txns is at {:?}",
                        ids, forget_ts, current
                    );
                }
            }
        }
    }

    /// Commits `updates` to their tables at `write_ts` in a single txn and
    /// reports the outcome on `tx`.
    ///
    /// On an upper mismatch the error lists ALL registered tables with the
    /// txns shard's current upper, since a txn commit fails as a unit.
    async fn append(
        &mut self,
        write_ts: Timestamp,
        advance_to: Timestamp,
        updates: Vec<(GlobalId, Vec<TableData>)>,
        tx: tokio::sync::oneshot::Sender<Result<(), StorageError>>,
    ) {
        debug!(
            "tables append timestamp={:?} advance_to={:?} len={} ids={:?}{}",
            write_ts,
            advance_to,
            updates.iter().flat_map(|(_, x)| x).count(),
            updates
                .iter()
                .map(|(x, _)| x.to_string())
                .collect::<BTreeSet<_>>(),
            updates.iter().filter(|(_, v)| !v.is_empty()).fold(
                String::new(),
                |mut output, (k, v)| {
                    let _ = write!(output, "\n  {}: {:?}", k, v.first());
                    output
                }
            )
        );
        // TODO: txn-wal doesn't take an advance_to yet, it uses
        // timestamp.step_forward. This is the same in all cases, so just assert that
        // for now. Note that this uses the _persist_ StepForward, not the
        // TimestampManipulation one (the impls are the same) because that's what
        // txn-wal uses.
        assert_eq!(
            advance_to,
            mz_persist_types::StepForward::step_forward(&write_ts)
        );

        let mut txn = self.txns.begin();
        for (id, updates) in updates {
            let Some(data_id) = self.write_handles.get(&id) else {
                // HACK: When creating a table we get an append that includes it
                // before it's been registered. When this happens there are no
                // updates, so it's ~fine to ignore it.
                assert!(
                    updates.iter().all(|u| u.is_empty()),
                    "{}: {:?}",
                    id,
                    updates
                );
                continue;
            };
            for update in updates {
                match update {
                    TableData::Rows(updates) => {
                        for (row, diff) in updates {
                            let () = txn
                                .write(data_id, SourceData(Ok(row)), (), diff.into_inner())
                                .await;
                        }
                    }
                    TableData::Batches(batches) => {
                        for batch in batches {
                            let () = txn.write_batch(data_id, batch);
                        }
                    }
                }
            }
        }
        // Sneak in any txns shard tidying from previous commits.
        txn.tidy(std::mem::take(&mut self.tidy));
        let txn_res = txn.commit_at(&mut self.txns, write_ts).await;
        let response = match txn_res {
            Ok(apply) => {
                // TODO: Do the applying in a background task. This will be a
                // significant INSERT latency performance win.
                debug!("applying {:?}", apply);
                let tidy = apply.apply(&mut self.txns).await;
                self.tidy.merge(tidy);

                // We don't serve any reads out of this TxnsHandle, so go ahead
                // and compact as aggressively as we can (i.e. to the time we
                // just wrote).
                let () = self.txns.compact_to(write_ts).await;

                Ok(())
            }
            Err(current) => {
                // Reclaim the tidy work from the failed txn so it isn't lost.
                self.tidy.merge(txn.take_tidy());
                debug!(
                    "unable to commit txn at {:?} current={:?}",
                    write_ts, current
                );
                Err(StorageError::InvalidUppers(
                    self.write_handles
                        .keys()
                        .copied()
                        .map(|id| InvalidUpper {
                            id,
                            current_upper: Antichain::from_elem(current),
                        })
                        .collect(),
                ))
            }
        };
        // It is not an error for the other end to hang up.
        let _ = tx.send(response);
    }
}
518
/// Contains the components necessary for sending commands to a `PersistTableWriteWorker`.
///
/// When `Drop`-ed this sends a shutdown command; as such this should _never_ implement `Clone`
/// because if one clone is dropped, the other clones will be unable to send commands. If you need
/// this to be `Clone`-able, wrap it in an `Arc` or `Rc` first.
///
/// #[derive(Clone)] <-- do not do this.
///
#[derive(Debug)]
struct PersistTableWriteWorkerInner {
    /// Sending side of a channel that we can use to send commands.
    tx: UnboundedSender<(tracing::Span, PersistTableWriteCmd)>,
}
532
impl Drop for PersistTableWriteWorkerInner {
    fn drop(&mut self) {
        // Ask the worker task to exit; if the channel is already closed the
        // failed send is merely logged at trace level by `send_with_span`.
        self.send(PersistTableWriteCmd::Shutdown);
        // TODO: Can't easily block on shutdown occurring.
    }
}
539
540impl PersistTableWriteWorkerInner {
541    fn new(tx: UnboundedSender<(tracing::Span, PersistTableWriteCmd)>) -> Self {
542        PersistTableWriteWorkerInner { tx }
543    }
544
545    fn send(&self, cmd: PersistTableWriteCmd) {
546        let span =
547            info_span!(parent: None, "PersistTableWriteWorkerInner::send", otel.name = cmd.name());
548        OpenTelemetryContext::obtain().attach_as_parent_to(&span);
549        self.send_with_span(span, cmd)
550    }
551
552    fn send_with_span(&self, span: Span, cmd: PersistTableWriteCmd) {
553        match self.tx.send((span, cmd)) {
554            Ok(()) => (), // All good!
555            Err(e) => {
556                tracing::trace!("could not forward command: {:?}", e);
557            }
558        }
559    }
560}