mz_txn_wal/
txn_read.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Interfaces for reading txn shards as well as data shards.
11
12use std::cmp::Ordering;
13use std::collections::{BTreeMap, BTreeSet};
14use std::fmt::Debug;
15use std::future::Future;
16use std::sync::Arc;
17
18use differential_dataflow::difference::Semigroup;
19use differential_dataflow::lattice::Lattice;
20use futures::Stream;
21use mz_ore::instrument;
22use mz_ore::task::AbortOnDropHandle;
23use mz_persist_client::cfg::USE_CRITICAL_SINCE_TXN;
24use mz_persist_client::critical::SinceHandle;
25use mz_persist_client::read::{Cursor, LazyPartStats, ListenEvent, ReadHandle, Since, Subscribe};
26use mz_persist_client::stats::{SnapshotPartsStats, SnapshotStats};
27use mz_persist_client::write::WriteHandle;
28use mz_persist_client::{Diagnostics, PersistClient, ShardId};
29use mz_persist_types::txn::{TxnsCodec, TxnsEntry};
30use mz_persist_types::{Codec, Codec64, Opaque, StepForward};
31use timely::order::TotalOrder;
32use timely::progress::{Antichain, Timestamp};
33use tokio::sync::{mpsc, oneshot};
34use tracing::{debug, warn};
35use uuid::Uuid;
36
37use crate::TxnsCodecDefault;
38use crate::txn_cache::{TxnsCache, TxnsCacheState};
39
/// A token exchangeable for a data shard snapshot.
///
/// Returned by [TxnsRead::data_snapshot]. The following methods first unblock
/// the read at `as_of` and then read the data shard at `as_of`:
/// - `snapshot_and_fetch`
/// - `snapshot_cursor`
/// - `snapshot_and_stream`
/// - `snapshot_parts_stats`
///
/// Unblocking may write an empty batch to the data shard. The
/// `snapshot_stats_*` methods instead estimate at `latest_write` without
/// writing.
///
/// - Invariant: `latest_write <= as_of < empty_to` (checked by `validate`)
/// - Invariant: `(latest_write, empty_to)` and `(as_of, empty_to)` have no
///   unapplied writes (which means we can do an empty CaA of those times if we
///   like).
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq))]
pub struct DataSnapshot<T> {
    /// The id of the data shard this snapshot is for.
    pub(crate) data_id: ShardId,
    /// The latest possibly unapplied write <= as_of. None if there are no
    /// writes via txns or if they're all known to be applied.
    pub(crate) latest_write: Option<T>,
    /// The as_of asked for.
    pub(crate) as_of: T,
    /// Some timestamp s.t. [as_of, empty_to) is known to be empty of
    /// unapplied writes via txns.
    pub(crate) empty_to: T,
}
60
61impl<T: Timestamp + Lattice + TotalOrder + Codec64 + Sync> DataSnapshot<T> {
62    /// Unblocks reading a snapshot at `self.as_of` by waiting for the latest
63    /// write before that time and then running an empty CaA if necessary.
64    #[instrument(level = "debug", fields(shard = %self.data_id, ts = ?self.as_of, empty_to = ?self.empty_to))]
65    pub(crate) async fn unblock_read<K, V, D>(&self, mut data_write: WriteHandle<K, V, T, D>)
66    where
67        K: Debug + Codec,
68        V: Debug + Codec,
69        D: Debug + Semigroup + Ord + Codec64 + Send + Sync,
70    {
71        debug!(
72            "unblock_read latest_write={:?} as_of={:?} for {:.9}",
73            self.latest_write,
74            self.as_of,
75            self.data_id.to_string()
76        );
77        // First block until the latest write has been applied.
78        if let Some(latest_write) = self.latest_write.as_ref() {
79            let () = data_write
80                .wait_for_upper_past(&Antichain::from_elem(latest_write.clone()))
81                .await;
82        }
83
84        // Now fill `(latest_write, as_of]` with empty updates, so we can read
85        // the shard at as_of normally. In practice, because CaA takes an
86        // exclusive upper, we actually fill `(latest_write, empty_to)`.
87        //
88        // It's quite counter-intuitive for reads to involve writes, but I think
89        // this is fine. In particular, because writing empty updates to a
90        // persist shard is a metadata-only operation. It might result in things
91        // like GC maintenance or a CRDB write, but this is also true for
92        // registering a reader. On the balance, I think this is a _much_ better
93        // set of tradeoffs than the original plan of trying to translate read
94        // timestamps to the most recent write and reading that.
95        let Some(mut data_upper) = data_write.shared_upper().into_option() else {
96            // If the upper is the empty antichain, then we've unblocked all
97            // possible reads. Return early.
98            debug!(
99                "CaA data snapshot {:.9} shard finalized",
100                self.data_id.to_string(),
101            );
102            return;
103        };
104
105        // TODO(jkosh44) We should not be writing to unregistered shards, but
106        // we haven't checked to see if this was registered at `self.empty_to`.
107        // See https://github.com/MaterializeInc/database-issues/issues/8022.
108        while data_upper < self.empty_to {
109            // It would be very bad if we accidentally filled any times <=
110            // latest_write with empty updates, so defensively assert on each
111            // iteration through the loop.
112            if let Some(latest_write) = self.latest_write.as_ref() {
113                assert!(latest_write < &data_upper);
114            }
115            assert!(self.as_of < self.empty_to);
116            let res = crate::small_caa(
117                || format!("data {:.9} unblock reads", self.data_id.to_string()),
118                &mut data_write,
119                &[],
120                data_upper.clone(),
121                self.empty_to.clone(),
122            )
123            .await;
124            match res {
125                Ok(()) => {
126                    // Persist registers writers on the first write, so politely
127                    // expire the writer we just created, but (as a performance
128                    // optimization) only if we actually wrote something.
129                    data_write.expire().await;
130                    break;
131                }
132                Err(new_data_upper) => {
133                    data_upper = new_data_upper;
134                    continue;
135                }
136            }
137        }
138    }
139
140    /// See [ReadHandle::snapshot_and_fetch].
141    pub async fn snapshot_and_fetch<K, V, D>(
142        &self,
143        data_read: &mut ReadHandle<K, V, T, D>,
144    ) -> Result<Vec<((Result<K, String>, Result<V, String>), T, D)>, Since<T>>
145    where
146        K: Debug + Codec + Ord,
147        V: Debug + Codec + Ord,
148        D: Debug + Semigroup + Ord + Codec64 + Send + Sync,
149    {
150        let data_write = WriteHandle::from_read(data_read, "unblock_read");
151        self.unblock_read(data_write).await;
152        data_read
153            .snapshot_and_fetch(Antichain::from_elem(self.as_of.clone()))
154            .await
155    }
156
157    /// See [ReadHandle::snapshot_cursor].
158    pub async fn snapshot_cursor<K, V, D>(
159        &self,
160        data_read: &mut ReadHandle<K, V, T, D>,
161        should_fetch_part: impl for<'a> Fn(Option<&'a LazyPartStats>) -> bool,
162    ) -> Result<Cursor<K, V, T, D>, Since<T>>
163    where
164        K: Debug + Codec + Ord,
165        V: Debug + Codec + Ord,
166        D: Debug + Semigroup + Ord + Codec64 + Send + Sync,
167    {
168        let data_write = WriteHandle::from_read(data_read, "unblock_read");
169        self.unblock_read(data_write).await;
170        data_read
171            .snapshot_cursor(Antichain::from_elem(self.as_of.clone()), should_fetch_part)
172            .await
173    }
174
175    /// See [ReadHandle::snapshot_and_stream].
176    pub async fn snapshot_and_stream<K, V, D>(
177        &self,
178        data_read: &mut ReadHandle<K, V, T, D>,
179    ) -> Result<
180        impl Stream<Item = ((Result<K, String>, Result<V, String>), T, D)> + use<K, V, T, D>,
181        Since<T>,
182    >
183    where
184        K: Debug + Codec + Ord,
185        V: Debug + Codec + Ord,
186        D: Debug + Semigroup + Ord + Codec64 + Send + Sync,
187    {
188        let data_write = WriteHandle::from_read(data_read, "unblock_read");
189        self.unblock_read(data_write).await;
190        data_read
191            .snapshot_and_stream(Antichain::from_elem(self.as_of.clone()))
192            .await
193    }
194
195    /// See [SinceHandle::snapshot_stats].
196    pub fn snapshot_stats_from_critical<K, V, D, O>(
197        &self,
198        data_since: &SinceHandle<K, V, T, D, O>,
199    ) -> impl Future<Output = Result<SnapshotStats, Since<T>>> + Send + 'static
200    where
201        K: Debug + Codec + Ord,
202        V: Debug + Codec + Ord,
203        D: Semigroup + Codec64 + Send + Sync,
204        O: Opaque + Codec64,
205    {
206        // This is used by the optimizer in planning to get cost statistics, so
207        // it can't use `unblock_read`. Instead use the "translated as_of"
208        // trick we invented but didn't end up using for read of the shard
209        // contents. The reason we didn't use it for that was because we'd have
210        // to deal with advancing timestamps of the updates we read, but the
211        // stats we return here don't have that issue.
212        //
213        // TODO: If we don't have a `latest_write`, then the `None` option to
214        // `snapshot_stats` is not quite correct because of pubsub races
215        // (probably marginal) and historical `as_of`s (probably less marginal
216        // but not common in mz right now). Fixing this more precisely in a
217        // performant way (i.e. no crdb queries involved) seems to require that
218        // txn-wal always keep track of the latest write, even when it's
219        // known to have been applied. `snapshot_stats` is an estimate anyway,
220        // it doesn't even attempt to account for things like consolidation, so
221        // this seems fine for now.
222        let as_of = self.latest_write.clone().map(Antichain::from_elem);
223        data_since.snapshot_stats(as_of)
224    }
225
226    /// See [ReadHandle::snapshot_stats].
227    pub fn snapshot_stats_from_leased<K, V, D>(
228        &self,
229        data_since: &ReadHandle<K, V, T, D>,
230    ) -> impl Future<Output = Result<SnapshotStats, Since<T>>> + Send + 'static
231    where
232        K: Debug + Codec + Ord,
233        V: Debug + Codec + Ord,
234        D: Ord + Semigroup + Codec64 + Send + Sync,
235    {
236        // This is used by the optimizer in planning to get cost statistics, so
237        // it can't use `unblock_read`. Instead use the "translated as_of"
238        // trick we invented but didn't end up using for read of the shard
239        // contents. The reason we didn't use it for that was because we'd have
240        // to deal with advancing timestamps of the updates we read, but the
241        // stats we return here don't have that issue.
242        //
243        // TODO: If we don't have a `latest_write`, then the `None` option to
244        // `snapshot_stats` is not quite correct because of pubsub races
245        // (probably marginal) and historical `as_of`s (probably less marginal
246        // but not common in mz right now). Fixing this more precisely in a
247        // performant way (i.e. no crdb queries involved) seems to require that
248        // txn-wal always keep track of the latest write, even when it's
249        // known to have been applied. `snapshot_stats` is an estimate anyway,
250        // it doesn't even attempt to account for things like consolidation, so
251        // this seems fine for now.
252        let as_of = self.latest_write.clone().map(Antichain::from_elem);
253        data_since.snapshot_stats(as_of)
254    }
255
256    /// See [ReadHandle::snapshot_parts_stats].
257    pub async fn snapshot_parts_stats<K, V, D>(
258        &self,
259        data_read: &ReadHandle<K, V, T, D>,
260    ) -> Result<SnapshotPartsStats, Since<T>>
261    where
262        K: Debug + Codec + Ord,
263        V: Debug + Codec + Ord,
264        D: Debug + Semigroup + Ord + Codec64 + Send + Sync,
265    {
266        let data_write = WriteHandle::from_read(data_read, "unblock_read");
267        self.unblock_read(data_write).await;
268        data_read
269            .snapshot_parts_stats(Antichain::from_elem(self.as_of.clone()))
270            .await
271    }
272
273    pub(crate) fn validate(&self) -> Result<(), String> {
274        if let Some(latest_write) = self.latest_write.as_ref() {
275            if !(latest_write <= &self.as_of) {
276                return Err(format!(
277                    "latest_write {:?} not <= as_of {:?}",
278                    self.latest_write, self.as_of
279                ));
280            }
281        }
282        if !(self.as_of < self.empty_to) {
283            return Err(format!(
284                "as_of {:?} not < empty_to {:?}",
285                self.as_of, self.empty_to
286            ));
287        }
288        Ok(())
289    }
290}
291
/// The next action to take in a data shard `Listen`.
///
/// Each variant tells the caller how to advance its view of the data shard
/// from the progress it passed in.
///
/// See [crate::txn_cache::TxnsCacheState::data_listen_next].
#[derive(Debug)]
#[cfg_attr(any(test, debug_assertions), derive(PartialEq))]
pub enum DataListenNext<T> {
    /// Read the data shard normally, until this timestamp is less_equal what
    /// has been read (i.e. the data shard itself has a write to pick up).
    ReadDataTo(T),
    /// It is known that there are no writes between the progress given to the
    /// `data_listen_next` call and this timestamp. Advance the data shard
    /// listen progress to this (exclusive) frontier.
    EmitLogicalProgress(T),
    /// The data shard listen has caught up to what has been written to the txns
    /// shard. Wait for it to progress with `update_gt` and call
    /// `data_listen_next` again.
    WaitForTxnsProgress,
}
310
/// A mapping between the physical upper of a data shard and the largest upper
/// which is known to logically have the same contents.
///
/// Said another way, `[physical_upper, logical_upper)` is known to be empty (in
/// the "definite" sense).
///
/// Data shard subscribers receive a stream of these over the channel returned
/// by `TxnsRead::data_subscribe`.
///
/// Invariant: physical_upper <= logical_upper
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct DataRemapEntry<T> {
    /// The physical upper of a data shard.
    pub physical_upper: T,
    /// An upper bound on the times known to be empty of writes via txns since
    /// `physical_upper`.
    pub logical_upper: T,
}
326
/// Keeps track of a [`DataRemapEntry`] for shard `data_id`.
#[derive(Debug)]
pub(crate) struct DataSubscribe<T> {
    /// The id of the data shard.
    pub(crate) data_id: ShardId,
    /// The initial snapshot, if reading it needs to be unblocked. The read task
    /// takes this out and spawns a task that unblocks the read.
    pub(crate) snapshot: Option<DataSnapshot<T>>,
    /// The physical and logical upper of `data_id`.
    pub(crate) remap: DataRemapEntry<T>,
}
338
/// An active subscription of [`DataRemapEntry`]s for a data shard.
#[derive(Debug)]
pub struct DataSubscription<T> {
    /// Metadata and current [`DataRemapEntry`] for the data shard.
    subscribe: DataSubscribe<T>,
    /// Sending half of the channel that delivers [`DataRemapEntry`]s to the
    /// subscriber. The read task drops the subscription once the receiving
    /// half has been closed.
    tx: mpsc::UnboundedSender<DataRemapEntry<T>>,
}
347
/// Type-erased ability to unblock reads of a [DataSnapshot].
///
/// This lets the read task unblock a data shard read (it receives a
/// `Box<dyn UnblockRead<T>>`) without knowing that shard's `K`, `V`, and `D`
/// types.
#[async_trait::async_trait]
pub(crate) trait UnblockRead<T>: Debug + Send {
    /// Consumes `self` to unblock reading `snapshot`.
    async fn unblock_read(self: Box<Self>, snapshot: DataSnapshot<T>);
}
352
353#[async_trait::async_trait]
354impl<K, V, T, D> UnblockRead<T> for WriteHandle<K, V, T, D>
355where
356    K: Debug + Codec + Send + Sync,
357    V: Debug + Codec + Send + Sync,
358    T: Timestamp + Lattice + TotalOrder + StepForward + Codec64 + Sync,
359    D: Debug + Semigroup + Ord + Codec64 + Send + Sync,
360{
361    async fn unblock_read(self: Box<Self>, snapshot: DataSnapshot<T>) {
362        snapshot.unblock_read(*self).await;
363    }
364}
365
/// A shared [TxnsCache] running in a task and communicated with over a channel.
///
/// Clones share the same command channel and background tasks.
#[derive(Debug, Clone)]
pub struct TxnsRead<T> {
    /// The txns shard being read.
    txns_id: ShardId,
    /// Sends commands to the background read task.
    tx: mpsc::UnboundedSender<TxnsReadCmd<T>>,
    /// The read task; `Arc`-shared across clones of this handle.
    _read_task: Arc<AbortOnDropHandle<()>>,
    /// The task that tails the txns shard and forwards its updates to the read
    /// task; `Arc`-shared across clones of this handle.
    _subscribe_task: Arc<AbortOnDropHandle<()>>,
}
374
375impl<T: Timestamp + Lattice + Codec64 + Sync> TxnsRead<T> {
376    /// Starts the task worker and returns a handle for communicating with it.
377    pub async fn start<C>(client: PersistClient, txns_id: ShardId) -> Self
378    where
379        T: TotalOrder + StepForward,
380        C: TxnsCodec + 'static,
381    {
382        let (tx, rx) = mpsc::unbounded_channel();
383
384        let (mut subscribe_task, cache) =
385            TxnsSubscribeTask::<T, C>::open(&client, txns_id, None, tx.clone()).await;
386
387        let mut task = TxnsReadTask {
388            rx,
389            cache,
390            pending_waits_by_ts: BTreeSet::new(),
391            pending_waits_by_id: BTreeMap::new(),
392            data_subscriptions: Vec::new(),
393        };
394
395        let read_task =
396            mz_ore::task::spawn(|| "txn-wal::read_task", async move { task.run().await });
397
398        let subscribe_task = mz_ore::task::spawn(|| "txn-wal::subscribe_task", async move {
399            subscribe_task.run().await
400        });
401
402        TxnsRead {
403            txns_id,
404            tx,
405            _read_task: Arc::new(read_task.abort_on_drop()),
406            _subscribe_task: Arc::new(subscribe_task.abort_on_drop()),
407        }
408    }
409
410    /// Returns the [ShardId] of the txns shard.
411    pub fn txns_id(&self) -> &ShardId {
412        &self.txns_id
413    }
414
415    /// See [crate::txn_cache::TxnsCacheState::data_snapshot].
416    pub async fn data_snapshot(&self, data_id: ShardId, as_of: T) -> DataSnapshot<T> {
417        self.send(|tx| TxnsReadCmd::DataSnapshot { data_id, as_of, tx })
418            .await
419    }
420
421    /// Initiate a subscription to `data_id`.
422    ///
423    /// Returns a channel that [`DataRemapEntry`]s are sent over.
424    pub(crate) async fn data_subscribe(
425        &self,
426        data_id: ShardId,
427        as_of: T,
428        unblock: Box<dyn UnblockRead<T>>,
429    ) -> mpsc::UnboundedReceiver<DataRemapEntry<T>> {
430        self.send(|tx| TxnsReadCmd::DataSubscribe {
431            data_id,
432            as_of,
433            unblock,
434            tx,
435        })
436        .await
437    }
438
439    /// See [TxnsCache::update_ge].
440    pub async fn update_ge(&self, ts: T) {
441        let wait = WaitTs::GreaterEqual(ts);
442        self.update(wait).await
443    }
444
445    /// See [TxnsCache::update_gt].
446    pub async fn update_gt(&self, ts: T) {
447        let wait = WaitTs::GreaterThan(ts);
448        self.update(wait).await
449    }
450
451    async fn update(&self, wait: WaitTs<T>) {
452        let id = Uuid::new_v4();
453        let res = self.send(|tx| TxnsReadCmd::Wait {
454            id: id.clone(),
455            ts: wait,
456            tx,
457        });
458
459        // We install a drop guard so that we can cancel the wait in case the
460        // future is cancelled/dropped.
461        let mut cancel_guard = CancelWaitOnDrop {
462            id,
463            tx: Some(self.tx.clone()),
464        };
465
466        let res = res.await;
467
468        // We don't have to cancel the wait on drop anymore.
469        cancel_guard.complete();
470
471        res
472    }
473
474    async fn send<R: std::fmt::Debug>(
475        &self,
476        cmd: impl FnOnce(oneshot::Sender<R>) -> TxnsReadCmd<T>,
477    ) -> R {
478        let (tx, rx) = oneshot::channel();
479        let req = cmd(tx);
480        let () = self.tx.send(req).expect("task unexpectedly shut down");
481        rx.await.expect("task unexpectedly shut down")
482    }
483}
484
/// Cancels an in-flight wait command when dropped, unless the given `tx` is
/// yanked before that.
struct CancelWaitOnDrop<T> {
    /// Id of the wait command to cancel.
    id: Uuid,
    /// Channel to the read task; `None` once the guard has been disarmed via
    /// `complete`.
    tx: Option<mpsc::UnboundedSender<TxnsReadCmd<T>>>,
}
491
492impl<T> CancelWaitOnDrop<T> {
493    /// Marks the wait command as complete. This guard will no longer send a
494    /// cancel command when dropped.
495    pub fn complete(&mut self) {
496        self.tx.take();
497    }
498}
499
500impl<T> Drop for CancelWaitOnDrop<T> {
501    fn drop(&mut self) {
502        let tx = match self.tx.take() {
503            Some(tx) => tx,
504            None => {
505                // No need to cancel anymore!
506                return;
507            }
508        };
509
510        let _ = tx.send(TxnsReadCmd::CancelWait {
511            id: self.id.clone(),
512        });
513    }
514}
515
/// Commands processed by the read task, in the order they are received.
#[derive(Debug)]
enum TxnsReadCmd<T> {
    /// New txns shard entries plus the frontier they are complete up to, as
    /// forwarded by the subscribe task.
    Updates {
        entries: Vec<(TxnsEntry, T, i64)>,
        frontier: T,
    },
    /// Request a [DataSnapshot] of `data_id` at `as_of`, replied on `tx`.
    DataSnapshot {
        data_id: ShardId,
        as_of: T,
        tx: oneshot::Sender<DataSnapshot<T>>,
    },
    /// Start a subscription of `data_id` from `as_of`. `unblock` is used to
    /// unblock the initial snapshot, if needed. The receiving half of the
    /// subscription channel is replied on `tx`.
    DataSubscribe {
        data_id: ShardId,
        as_of: T,
        unblock: Box<dyn UnblockRead<T>>,
        tx: oneshot::Sender<mpsc::UnboundedReceiver<DataRemapEntry<T>>>,
    },
    /// Reply on `tx` once progress satisfies `ts`. `id` allows cancelling it.
    Wait {
        id: Uuid,
        ts: WaitTs<T>,
        tx: oneshot::Sender<()>,
    },
    /// Cancel the pending `Wait` with this `id`, if it still exists.
    CancelWait {
        id: Uuid,
    },
}
542
/// A condition on the txns frontier that a waiter is blocked on.
#[derive(Debug, PartialEq, Eq, Clone)]
enum WaitTs<T> {
    /// Ready once the frontier is `>=` the timestamp.
    GreaterEqual(T),
    /// Ready once the frontier is `>` the timestamp.
    GreaterThan(T),
}
548
549// Specially made for keeping `WaitTs` in a `BTreeSet` and peeling them off in
550// the order in which they would be retired.
551//
552// [`WaitTs`] with different timestamps are ordered according to their
553// timestamps. For [`WaitTs`] with the same timestamp, we have to order
554// `GreaterEqual` before `GreaterThan`, because those can be retired
555// earlier/they are less "strict" in how far they need the frontier to advance.
556impl<T: Ord> Ord for WaitTs<T> {
557    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
558        let self_ts = match self {
559            WaitTs::GreaterEqual(ts) => ts,
560            WaitTs::GreaterThan(ts) => ts,
561        };
562        let other_ts = match other {
563            WaitTs::GreaterEqual(ts) => ts,
564            WaitTs::GreaterThan(ts) => ts,
565        };
566
567        if self_ts < other_ts {
568            Ordering::Less
569        } else if *self_ts > *other_ts {
570            Ordering::Greater
571        } else if matches!(self, WaitTs::GreaterEqual(_)) && matches!(other, WaitTs::GreaterThan(_))
572        {
573            Ordering::Less
574        } else if matches!(self, WaitTs::GreaterThan(_)) && matches!(other, WaitTs::GreaterEqual(_))
575        {
576            Ordering::Greater
577        } else {
578            Ordering::Equal
579        }
580    }
581}
582
583impl<T: Ord> PartialOrd for WaitTs<T> {
584    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
585        Some(self.cmp(other))
586    }
587}
588
589impl<T: Timestamp + Lattice> WaitTs<T> {
590    /// Returns `true` iff (sic) this [WaitTs] is ready.
591    fn is_ready(&self, frontier: &T) -> bool {
592        match &self {
593            WaitTs::GreaterEqual(ts) => {
594                if frontier >= ts {
595                    return true;
596                }
597            }
598            WaitTs::GreaterThan(ts) => {
599                if frontier > ts {
600                    return true;
601                }
602            }
603        };
604
605        false
606    }
607}
608
/// State owned by the background read task: the txns cache plus the waits and
/// data shard subscriptions it serves.
#[derive(Debug)]
struct TxnsReadTask<T> {
    /// Incoming commands, from [TxnsRead] handles and the [TxnsSubscribeTask].
    rx: mpsc::UnboundedReceiver<TxnsReadCmd<T>>,
    /// The cache that txns shard updates are pushed into.
    cache: TxnsCacheState<T>,
    /// Pending waits, ordered so that the ones that can be retired earliest
    /// come first. Kept in sync with `pending_waits_by_id`.
    pending_waits_by_ts: BTreeSet<(WaitTs<T>, Uuid)>,
    /// Pending waits by id, so they can be looked up when completed or
    /// cancelled.
    pending_waits_by_id: BTreeMap<Uuid, PendingWait<T>>,
    /// Active data shard subscriptions, advanced whenever the cache advances.
    data_subscriptions: Vec<DataSubscription<T>>,
}
617
/// A pending "wait" notification that we will complete once the frontier
/// advances far enough.
#[derive(Debug)]
struct PendingWait<T> {
    /// The condition the frontier has to satisfy.
    ts: WaitTs<T>,
    /// Where to send the notification; `None` once the wait is completed.
    tx: Option<oneshot::Sender<()>>,
}
625
626impl<T: Timestamp + Lattice + Codec64> PendingWait<T> {
627    /// Returns `true` if this [PendingWait] is completed.
628    ///
629    /// A pending wait is completed when the frontier advances far enough or
630    /// when the receiver side hangs up.
631    fn maybe_complete(&mut self, frontier: &T) -> bool {
632        if self.tx.is_none() {
633            // Already completed!
634            return true;
635        }
636
637        if self.ts.is_ready(frontier) {
638            let _ = self.tx.take().expect("known to exist").send(());
639            return true;
640        }
641
642        if let Some(tx) = self.tx.as_ref() {
643            if tx.is_closed() {
644                // Receiver dropped, so also complete.
645                self.tx.take();
646                return true;
647            }
648        }
649
650        false
651    }
652}
653
654impl<T> TxnsReadTask<T>
655where
656    T: Timestamp + Lattice + TotalOrder + StepForward + Codec64 + Sync,
657{
658    async fn run(&mut self) {
659        while let Some(cmd) = self.rx.recv().await {
660            match cmd {
661                TxnsReadCmd::Updates { entries, frontier } => {
662                    tracing::trace!(
663                        "updates from subscribe task at ({:?}): {:?}",
664                        frontier,
665                        entries
666                    );
667
668                    self.cache.push_entries(entries.clone(), frontier.clone());
669
670                    self.data_subscriptions
671                        .retain(|subscription| !subscription.tx.is_closed());
672                    for subscription in &mut self.data_subscriptions {
673                        Self::update_subscription(subscription, &self.cache);
674                    }
675
676                    // The frontier has advanced, so respond to waits and retain
677                    // those that still have to wait.
678
679                    loop {
680                        let first_wait = self.pending_waits_by_ts.first();
681
682                        let (wait_ts, id) = match first_wait {
683                            Some(wait) => wait,
684                            None => break,
685                        };
686
687                        let completed = wait_ts.is_ready(&frontier);
688
689                        if completed {
690                            let mut wait = self
691                                .pending_waits_by_id
692                                .remove(id)
693                                .expect("wait must be in map");
694
695                            let really_completed = wait.maybe_complete(&frontier);
696                            assert!(really_completed);
697
698                            self.pending_waits_by_ts.pop_first();
699                        } else {
700                            // All further wait's timestamps are higher. We're
701                            // using a `BTreeSet`, which is ordered!
702                            break;
703                        }
704                    }
705                }
706                TxnsReadCmd::DataSnapshot { data_id, as_of, tx } => {
707                    let res = self.cache.data_snapshot(data_id, as_of.clone());
708                    let _ = tx.send(res);
709                }
710                TxnsReadCmd::DataSubscribe {
711                    data_id,
712                    as_of,
713                    unblock,
714                    tx,
715                } => {
716                    let mut subscribe = self.cache.data_subscribe(data_id, as_of.clone());
717                    if let Some(snapshot) = subscribe.snapshot.take() {
718                        mz_ore::task::spawn(
719                            || "txn-wal::unblock_subscribe",
720                            unblock.unblock_read(snapshot),
721                        );
722                    }
723                    let (sub_tx, sub_rx) = mpsc::unbounded_channel();
724                    // Send the initial remap entry.
725                    sub_tx
726                        .send(subscribe.remap.clone())
727                        .expect("receiver still in scope");
728                    let mut subscription = DataSubscription {
729                        subscribe,
730                        tx: sub_tx,
731                    };
732                    // Fill the subscriber in on the updates from as_of to the current progress.
733                    Self::update_subscription(&mut subscription, &self.cache);
734                    self.data_subscriptions.push(subscription);
735                    let _ = tx.send(sub_rx);
736                }
737                TxnsReadCmd::Wait { id, ts, tx } => {
738                    let mut pending_wait = PendingWait { ts, tx: Some(tx) };
739                    let completed = pending_wait.maybe_complete(&self.cache.progress_exclusive);
740                    if !completed {
741                        let wait_ts = pending_wait.ts.clone();
742                        self.pending_waits_by_ts.insert((wait_ts, id.clone()));
743                        self.pending_waits_by_id.insert(id, pending_wait);
744                    }
745                }
746                TxnsReadCmd::CancelWait { id } => {
747                    // A waiter may have been dropped after a wait completed,
748                    // but before hearing about the completion. In that case
749                    // they will have tried to cancel an already cleaned up
750                    // wait.
751                    if let Some(pending_wait) = self.pending_waits_by_id.remove(&id) {
752                        self.pending_waits_by_ts.remove(&(pending_wait.ts, id));
753                    }
754                }
755            }
756        }
757        warn!("TxnsReadTask shutting down");
758    }
759
760    fn update_subscription(subscription: &mut DataSubscription<T>, cache: &TxnsCacheState<T>) {
761        loop {
762            match cache.data_listen_next(
763                &subscription.subscribe.data_id,
764                &subscription.subscribe.remap.logical_upper,
765            ) {
766                // The data shard got a write!
767                DataListenNext::ReadDataTo(new_upper) => {
768                    // A write means both the physical and logical upper advance.
769                    subscription.subscribe.remap.physical_upper = new_upper.clone();
770                    subscription.subscribe.remap.logical_upper = new_upper.clone();
771                }
772                // We know there are no writes in `[logical_upper,
773                // new_progress)`, so advance our output frontier.
774                DataListenNext::EmitLogicalProgress(new_progress) => {
775                    assert!(subscription.subscribe.remap.physical_upper < new_progress);
776                    assert!(subscription.subscribe.remap.logical_upper < new_progress);
777
778                    subscription.subscribe.remap.logical_upper = new_progress.clone();
779                }
780                // We've caught up to the txns upper, and we have to wait for
781                // more before updates before sending more pairs.
782                DataListenNext::WaitForTxnsProgress => break,
783            };
784            // Not an error if the receiver hung up, they just need be cleaned up at some point.
785            let _ = subscription.tx.send(subscription.subscribe.remap.clone());
786        }
787        assert_eq!(
788            cache.progress_exclusive, subscription.subscribe.remap.logical_upper,
789            "we should update the subscription up to the current progress_exclusive"
790        );
791    }
792}
793
/// Reads txn updates from a [Subscribe] and forwards them to a [TxnsReadTask]
/// when receiving a progress update.
#[derive(Debug)]
struct TxnsSubscribeTask<T, C: TxnsCodec = TxnsCodecDefault> {
    /// Subscription to the txns shard.
    txns_subscribe: Subscribe<C::Key, C::Val, T, i64>,

    /// Staged updates that we will consume and forward to the [TxnsReadTask]
    /// when we receive a progress update.
    buf: Vec<(TxnsEntry, T, i64)>,

    /// For sending updates to the main [TxnsReadTask].
    tx: mpsc::UnboundedSender<TxnsReadCmd<T>>,

    /// If Some, this cache only tracks the indicated data shard as a
    /// performance optimization. When used, only some methods (in particular,
    /// the ones necessary for the txns_progress operator) are supported.
    ///
    /// TODO: It'd be nice to make this a compile time thing. I have some ideas,
    /// but they're decently invasive, so leave it for a followup.
    only_data_id: Option<ShardId>,
}
815
816impl<T, C> TxnsSubscribeTask<T, C>
817where
818    T: Timestamp + Lattice + TotalOrder + StepForward + Codec64 + Sync,
819    C: TxnsCodec,
820{
821    /// Creates a [TxnsSubscribeTask] reading from the given txn shard that
822    /// forwards updates (entries and progress) to the given `tx`.
823    ///
824    /// This returns both the created task and a [TxnsCacheState] that can be
825    /// used to interact with the txn system and into which the updates should
826    /// be funneled.
827    ///
828    /// NOTE: We create both the [TxnsSubscribeTask] and the [TxnsCacheState] at
829    /// the same time because the cache is initialized with a `since_ts`, which
830    /// we get from the same [ReadHandle] that we use to initialize the
831    /// [Subscribe].
832    pub async fn open(
833        client: &PersistClient,
834        txns_id: ShardId,
835        only_data_id: Option<ShardId>,
836        tx: mpsc::UnboundedSender<TxnsReadCmd<T>>,
837    ) -> (Self, TxnsCacheState<T>) {
838        let (txns_key_schema, txns_val_schema) = C::schemas();
839        let txns_read: ReadHandle<<C as TxnsCodec>::Key, <C as TxnsCodec>::Val, T, i64> = client
840            .open_leased_reader(
841                txns_id,
842                Arc::new(txns_key_schema),
843                Arc::new(txns_val_schema),
844                Diagnostics {
845                    shard_name: "txns".to_owned(),
846                    handle_purpose: "read txns".to_owned(),
847                },
848                USE_CRITICAL_SINCE_TXN.get(client.dyncfgs()),
849            )
850            .await
851            .expect("txns schema shouldn't change");
852        let (state, txns_subscribe) = TxnsCacheState::init::<C>(only_data_id, txns_read).await;
853        let subscribe_task = TxnsSubscribeTask {
854            txns_subscribe,
855            buf: Vec::new(),
856            tx,
857            only_data_id,
858        };
859
860        (subscribe_task, state)
861    }
862
863    async fn run(&mut self) {
864        loop {
865            let events = self.txns_subscribe.next(None).await;
866            for event in events {
867                match event {
868                    ListenEvent::Progress(frontier) => {
869                        let frontier_ts = frontier
870                            .into_option()
871                            .expect("nothing should close the txns shard");
872                        let entries = std::mem::take(&mut self.buf);
873                        let res = self.tx.send(TxnsReadCmd::Updates {
874                            entries,
875                            frontier: frontier_ts,
876                        });
877                        if let Err(e) = res {
878                            warn!("TxnsSubscribeTask shutting down: {}", e);
879                            return;
880                        }
881                    }
882                    ListenEvent::Updates(parts) => {
883                        TxnsCache::<T, C>::fetch_parts(
884                            self.only_data_id.clone(),
885                            &mut self.txns_subscribe,
886                            parts,
887                            &mut self.buf,
888                        )
889                        .await;
890                    }
891                };
892            }
893        }
894    }
895}
896
#[cfg(test)]
mod tests {
    use super::WaitTs;

    /// Sorting [WaitTs] values orders them by timestamp and, for equal
    /// timestamps, places `GreaterEqual` ahead of `GreaterThan`.
    #[mz_ore::test]
    fn wait_ts_ord() {
        let mut sorted = [
            WaitTs::GreaterThan(3),
            WaitTs::GreaterThan(2),
            WaitTs::GreaterEqual(2),
            WaitTs::GreaterThan(1),
        ];
        sorted.sort();

        assert_eq!(
            sorted,
            [
                WaitTs::GreaterThan(1),
                WaitTs::GreaterEqual(2),
                WaitTs::GreaterThan(2),
                WaitTs::GreaterThan(3),
            ]
        );
    }
}