mz_txn_wal/
txn_cache.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A cache of the txn shard contents.
11
12use std::cmp::{max, min};
13use std::collections::{BTreeMap, VecDeque};
14use std::fmt::Debug;
15use std::ops::{Deref, DerefMut};
16use std::sync::Arc;
17
18use differential_dataflow::hashable::Hashable;
19use differential_dataflow::lattice::Lattice;
20use itertools::Itertools;
21use mz_ore::cast::CastFrom;
22use mz_ore::collections::HashMap;
23use mz_ore::instrument;
24use mz_persist_client::cfg::USE_CRITICAL_SINCE_TXN;
25use mz_persist_client::fetch::LeasedBatchPart;
26use mz_persist_client::metrics::encode_ts_metric;
27use mz_persist_client::read::{ListenEvent, ReadHandle, Subscribe};
28use mz_persist_client::write::WriteHandle;
29use mz_persist_client::{Diagnostics, PersistClient, ShardId};
30use mz_persist_types::txn::{TxnsCodec, TxnsEntry};
31use mz_persist_types::{Codec64, StepForward};
32use timely::order::TotalOrder;
33use timely::progress::{Antichain, Timestamp};
34use tracing::debug;
35
36use crate::TxnsCodecDefault;
37use crate::metrics::Metrics;
38use crate::txn_read::{DataListenNext, DataRemapEntry, DataSnapshot, DataSubscribe};
39
40/// A cache of the txn shard contents, optimized for various in-memory
41/// operations.
42///
43/// # Implementation Details
44///
45/// Reads of data shards are almost as straightforward as writes. A data shard
46/// may be read normally, using snapshots, subscriptions, shard_source, etc,
47/// through the most recent non-empty write. However, the upper of the txns
48/// shard (and thus the logical upper of the data shard) may be arbitrarily far
49/// ahead of the physical upper of the data shard. As a result, we do the
50/// following:
51///
52/// - To take a snapshot of a data shard, the `as_of` is passed through
53///   unchanged if the timestamp of that shard's latest non-empty write is past
54///   it. Otherwise, we know the times between them have no writes and can fill
55///   them with empty updates. Concretely, to read a snapshot as of `T`:
56///   - We read the txns shard contents up through and including `T`, blocking
57///     until the upper passes `T` if necessary.
58///   - We then find, for the requested data shard, the latest non-empty write
59///     at a timestamp `T' <= T`.
60///   - We wait for `T'` to be applied by watching the data shard upper.
61///   - We `compare_and_append` empty updates for `(T', T]`, which is known by
62///     the txn system to not have writes for this shard (otherwise we'd have
63///     picked a different `T'`).
64///   - We read the snapshot at `T` as normal.
/// - To iterate a listen on a data shard: when writes haven't been read yet,
///   they are passed through unchanged; otherwise, if the txns shard indicates
///   that there are ranges of empty time, progress is returned; otherwise,
///   progress of the txns shard will indicate when new information is
///   available.
70///
71/// Note that all of the above can be determined solely by information in the
72/// txns shard. In particular, non-empty writes are indicated by updates with
73/// positive diffs.
74///
75/// Also note that the above is structured such that it is possible to write a
76/// timely operator with the data shard as an input, passing on all payloads
77/// unchanged and simply manipulating capabilities in response to data and txns
78/// shard progress. See [crate::operator::txns_progress].
#[derive(Debug)]
pub struct TxnsCacheState<T> {
    /// The id of the txns shard this cache was built from.
    txns_id: ShardId,
    /// The since of the txn_shard when this cache was initialized.
    /// Some writes with a timestamp less than this may have been applied and
    /// tidied, so this cache has no way of learning about them.
    ///
    /// Invariant: never changes.
    pub(crate) init_ts: T,
    /// The contents of this cache are updated up to, but not including, this time.
    pub(crate) progress_exclusive: T,

    /// The internal "batch id" that will be assigned to the next batch added
    /// to `unapplied_batches`.
    next_batch_id: usize,
    /// The batches needing application as of the current progress.
    ///
    /// This is indexed by a "batch id" that is internal to this object because
    /// timestamps are not unique.
    ///
    /// Invariant: Values are sorted by timestamp.
    pub(crate) unapplied_batches: BTreeMap<usize, (ShardId, Vec<u8>, T)>,
    /// An index into `unapplied_batches` keyed by the serialized batch.
    batch_idx: HashMap<Vec<u8>, usize>,
    /// The times at which each data shard has been written.
    ///
    /// Invariant: Contains all unapplied writes and registers.
    /// Invariant: Contains the latest write and registration >= init_ts for all shards.
    pub(crate) datas: BTreeMap<ShardId, DataTimes<T>>,
    /// The registers and forgets needing application as of the current progress.
    ///
    /// Invariant: Values are sorted by timestamp.
    pub(crate) unapplied_registers: VecDeque<(ShardId, T)>,

    /// If Some, this cache only tracks the indicated data shard as a
    /// performance optimization. When used, only some methods (in particular,
    /// the ones necessary for the txns_progress operator) are supported.
    ///
    /// TODO: It'd be nice to make this a compile time thing. I have some ideas,
    /// but they're decently invasive, so leave it for a followup.
    only_data_id: Option<ShardId>,
}
119
/// A self-updating [TxnsCacheState].
#[derive(Debug)]
pub struct TxnsCache<T, C: TxnsCodec = TxnsCodecDefault> {
    /// A subscribe over the txn shard.
    pub(crate) txns_subscribe: Subscribe<C::Key, C::Val, T, i64>,
    /// Pending updates for timestamps that haven't closed.
    pub(crate) buf: Vec<(TxnsEntry, T, i64)>,
    /// The cached txns shard contents that this wrapper keeps up to date.
    state: TxnsCacheState<T>,
}
129
130impl<T: Timestamp + Lattice + TotalOrder + StepForward + Codec64 + Sync> TxnsCacheState<T> {
131    /// Creates a new empty [`TxnsCacheState`].
132    ///
133    /// `init_ts` must be == the critical handle's since of the txn shard.
134    fn new(txns_id: ShardId, init_ts: T, only_data_id: Option<ShardId>) -> Self {
135        TxnsCacheState {
136            txns_id,
137            init_ts,
138            progress_exclusive: T::minimum(),
139            next_batch_id: 0,
140            unapplied_batches: BTreeMap::new(),
141            batch_idx: HashMap::new(),
142            datas: BTreeMap::new(),
143            unapplied_registers: VecDeque::new(),
144            only_data_id,
145        }
146    }
147
148    /// Creates and initializes a new [`TxnsCacheState`].
149    ///
150    /// `txns_read` is a [`ReadHandle`] on the txn shard.
151    pub(crate) async fn init<C: TxnsCodec>(
152        only_data_id: Option<ShardId>,
153        txns_read: ReadHandle<C::Key, C::Val, T, i64>,
154    ) -> (Self, Subscribe<C::Key, C::Val, T, i64>) {
155        let txns_id = txns_read.shard_id();
156        let as_of = txns_read.since().clone();
157        let since_ts = as_of.as_option().expect("txns shard is not closed").clone();
158        let mut txns_subscribe = txns_read
159            .subscribe(as_of)
160            .await
161            .expect("handle holds a capability");
162        let mut state = Self::new(txns_id, since_ts.clone(), only_data_id.clone());
163        let mut buf = Vec::new();
164        // The cache must be updated to `since_ts` to maintain the invariant
165        // that `state.since_ts <= state.progress_exclusive`.
166        TxnsCache::<T, C>::update(
167            &mut state,
168            &mut txns_subscribe,
169            &mut buf,
170            only_data_id,
171            |progress_exclusive| progress_exclusive >= &since_ts,
172        )
173        .await;
174        debug_assert_eq!(state.validate(), Ok(()));
175        (state, txns_subscribe)
176    }
177
    /// Returns the [ShardId] of the txns shard.
    ///
    /// This is the id recorded when the cache was constructed.
    pub fn txns_id(&self) -> ShardId {
        self.txns_id
    }
182
183    /// Returns whether the data shard was registered to the txns set as of the
184    /// current progress.
185    ///
186    /// Specifically, a data shard is registered if the most recent register
187    /// timestamp is set but the most recent forget timestamp is not set.
188    ///
189    /// This function accepts a timestamp as input, but that timestamp must be
190    /// equal to the progress exclusive, or else the function panics. It mainly
191    /// acts as a way for the caller to think about the logical time at which
192    /// this function executes. Times in the past may have been compacted away,
193    /// and we can't always return an accurate answer. If this function isn't
194    /// sufficient, you can usually find what you're looking for by inspecting
195    /// the times in the most recent registration.
196    pub fn registered_at_progress(&self, data_id: &ShardId, ts: &T) -> bool {
197        self.assert_only_data_id(data_id);
198        assert_eq!(self.progress_exclusive, *ts);
199        let Some(data_times) = self.datas.get(data_id) else {
200            return false;
201        };
202        data_times.last_reg().forget_ts.is_none()
203    }
204
205    /// Returns the set of all data shards registered to the txns set as of the
206    /// current progress. See [Self::registered_at_progress].
207    pub(crate) fn all_registered_at_progress(&self, ts: &T) -> Vec<ShardId> {
208        assert_eq!(self.only_data_id, None);
209        assert_eq!(self.progress_exclusive, *ts);
210        self.datas
211            .iter()
212            .filter(|(_, data_times)| data_times.last_reg().forget_ts.is_none())
213            .map(|(data_id, _)| *data_id)
214            .collect()
215    }
216
217    /// Returns a token exchangeable for a snapshot of a data shard.
218    ///
219    /// A data shard might be definite at times past the physical upper because
220    /// of invariants maintained by this txn system. As a result, this method
221    /// discovers the latest potentially unapplied write before the `as_of`.
222    ///
223    /// Callers must first wait for [`TxnsCache::update_gt`] with the same or
224    /// later timestamp to return. Panics otherwise.
225    pub fn data_snapshot(&self, data_id: ShardId, as_of: T) -> DataSnapshot<T> {
226        self.assert_only_data_id(&data_id);
227        assert!(self.progress_exclusive > as_of);
228        // `empty_to` will often be used as the input to `data_listen_next`.
229        // `data_listen_next` needs a timestamp that is greater than or equal
230        // to the init_ts. See the comment above the assert in
231        // `data_listen_next` for more details.
232        //
233        // TODO: Once the txn shard itself always tracks the most recent write
234        // for every shard, we can remove this and always use
235        // `as_of.step_forward()`.
236        let empty_to = max(as_of.step_forward(), self.init_ts.clone());
237        let Some(all) = self.datas.get(&data_id) else {
238            // Not registered currently, so we know there are no unapplied
239            // writes.
240            return DataSnapshot {
241                data_id,
242                latest_write: None,
243                as_of,
244                empty_to,
245            };
246        };
247
248        let min_unapplied_ts = self
249            .unapplied_batches
250            .first_key_value()
251            .map(|(_, (_, _, ts))| ts)
252            .unwrap_or(&self.progress_exclusive);
253        let latest_write = all
254            .writes
255            .iter()
256            .rev()
257            .find(|x| **x <= as_of && *x >= min_unapplied_ts)
258            .cloned();
259
260        debug!(
261            "data_snapshot {:.9} latest_write={:?} as_of={:?} empty_to={:?}: all={:?}",
262            data_id.to_string(),
263            latest_write,
264            as_of,
265            empty_to,
266            all,
267        );
268        let ret = DataSnapshot {
269            data_id: data_id.clone(),
270            latest_write,
271            as_of,
272            empty_to,
273        };
274        assert_eq!(ret.validate(), Ok(()));
275        ret
276    }
277
278    // TODO(jkosh44) This method can likely be simplified to return
279    // DataRemapEntry directly.
280    /// Returns the next action to take when iterating a Listen on a data shard.
281    ///
282    /// A data shard Listen is executed by repeatedly calling this method with
283    /// an exclusive progress frontier. The returned value indicates an action
284    /// to take. Some of these actions advance the progress frontier, which
285    /// results in calling this method again with a higher timestamp, and thus a
286    /// new action. See [DataListenNext] for specifications of the actions.
287    ///
288    /// Note that this is a state machine on `self.progress_exclusive` and the
289    /// listen progress. DataListenNext indicates which state transitions to
290    /// take.
291    pub fn data_listen_next(&self, data_id: &ShardId, ts: &T) -> DataListenNext<T> {
292        self.assert_only_data_id(data_id);
293        assert!(
294            &self.progress_exclusive >= ts,
295            "ts {:?} is past progress_exclusive {:?}",
296            ts,
297            self.progress_exclusive
298        );
299        // There may be applied and tidied writes before the init_ts that the
300        // cache is unaware of. So if this method is called with a timestamp
301        // less than the initial since, it may mistakenly tell the caller to
302        // `EmitLogicalProgress(self.progress_exclusive)` instead of the
303        // correct answer of `ReadTo(tidied_write_ts)`.
304        //
305        // We know for a fact that there are no unapplied writes, registers, or
306        // forgets before the init_ts because the since of the txn shard is
307        // always held back to the earliest unapplied event. There may be some
308        // untidied events with a lower timestamp than the init_ts, but they
309        // are guaranteed to be applied.
310        //
311        // TODO: Once the txn shard itself always tracks the most recent write
312        // for every shard, we can remove this assert. It will always be
313        // correct to return ReadTo(latest_write_ts) if there are any writes,
314        // and then `EmitLogicalProgress(self.progress_exclusive)`.
315        assert!(
316            ts >= &self.init_ts,
317            "ts {:?} is not past initial since {:?}",
318            ts,
319            self.init_ts
320        );
321        use DataListenNext::*;
322        let data_times = self.datas.get(data_id);
323        debug!(
324            "data_listen_next {:.9} {:?}: progress={:?} times={:?}",
325            data_id.to_string(),
326            ts,
327            self.progress_exclusive,
328            data_times,
329        );
330        let Some(data_times) = data_times else {
331            // Not registered, maybe it will be in the future? In the meantime,
332            // treat it like a normal shard (i.e. pass through reads) and check
333            // again later.
334            if ts < &self.progress_exclusive {
335                return ReadDataTo(self.progress_exclusive.clone());
336            } else {
337                return WaitForTxnsProgress;
338            }
339        };
340        let physical_ts = data_times.latest_physical_ts();
341        let last_reg = data_times.last_reg();
342        if ts >= &self.progress_exclusive {
343            // All caught up, we have to wait.
344            WaitForTxnsProgress
345        } else if ts <= physical_ts {
346            // There was some physical write, so read up to that time.
347            ReadDataTo(physical_ts.step_forward())
348        } else if last_reg.forget_ts.is_none() {
349            // Emitting logical progress at the wrong time is a correctness bug,
350            // so be extra defensive about the necessary conditions: the most
351            // recent registration is still active, and we're in it.
352            assert!(last_reg.contains(ts));
353            EmitLogicalProgress(self.progress_exclusive.clone())
354        } else {
355            // The most recent forget is set, which means it's not registered as of
356            // the latest information we have. Read to the current progress point
357            // normally.
358
359            assert!(ts > &last_reg.register_ts && last_reg.forget_ts.is_some());
360            ReadDataTo(self.progress_exclusive.clone())
361        }
362    }
363
364    /// Returns a token exchangeable for a subscribe of a data shard.
365    ///
366    /// Callers must first wait for [`TxnsCache::update_gt`] with the same or
367    /// later timestamp to return. Panics otherwise.
368    pub(crate) fn data_subscribe(&self, data_id: ShardId, as_of: T) -> DataSubscribe<T> {
369        self.assert_only_data_id(&data_id);
370        assert!(self.progress_exclusive > as_of);
371        let snapshot = self.data_snapshot(data_id, as_of);
372        let remap = DataRemapEntry {
373            physical_upper: snapshot.empty_to.clone(),
374            logical_upper: snapshot.empty_to.clone(),
375        };
376        DataSubscribe {
377            data_id,
378            snapshot: Some(snapshot),
379            remap,
380        }
381    }
382
    /// Returns the minimum timestamp not known to be applied by this cache.
    ///
    /// # Panics
    ///
    /// Panics if this cache only tracks a single data shard (i.e.
    /// `only_data_id` is set).
    pub fn min_unapplied_ts(&self) -> &T {
        // NOTE(review): presumably unsupported with `only_data_id` because the
        // cache then skips entries for other shards — confirm.
        assert_eq!(self.only_data_id, None);
        self.min_unapplied_ts_inner()
    }
388
389    fn min_unapplied_ts_inner(&self) -> &T {
390        // We maintain an invariant that the values in the unapplied_batches map
391        // are sorted by timestamp, thus the first one must be the minimum.
392        let min_batch_ts = self
393            .unapplied_batches
394            .first_key_value()
395            .map(|(_, (_, _, ts))| ts)
396            // If we don't have any known unapplied batches, then the next
397            // timestamp that could be written must potentially have an
398            // unapplied batch.
399            .unwrap_or(&self.progress_exclusive);
400        let min_register_ts = self
401            .unapplied_registers
402            .front()
403            .map(|(_, ts)| ts)
404            .unwrap_or(&self.progress_exclusive);
405
406        min(min_batch_ts, min_register_ts)
407    }
408
409    /// Returns the operations needing application as of the current progress.
410    pub(crate) fn unapplied(&self) -> impl Iterator<Item = (&ShardId, Unapplied, &T)> {
411        assert_eq!(self.only_data_id, None);
412        let registers = self
413            .unapplied_registers
414            .iter()
415            .map(|(data_id, ts)| (data_id, Unapplied::RegisterForget, ts));
416        let batches = self
417            .unapplied_batches
418            .values()
419            .fold(
420                BTreeMap::new(),
421                |mut accum: BTreeMap<_, Vec<_>>, (data_id, batch, ts)| {
422                    accum.entry((ts, data_id)).or_default().push(batch);
423                    accum
424                },
425            )
426            .into_iter()
427            .map(|((ts, data_id), batches)| (data_id, Unapplied::Batch(batches), ts));
428        // This will emit registers and forgets before batches at the same timestamp. Currently,
429        // this is fine because for a single data shard you can't combine registers, forgets, and
430        // batches at the same timestamp. In the future if we allow combining these operations in
431        // a single op, then we probably want to emit registers, then batches, then forgets or we
432        // can make forget exclusive in which case we'd emit it before batches.
433        registers.merge_by(batches, |(_, _, ts1), (_, _, ts2)| ts1 <= ts2)
434    }
435
436    /// Filters out retractions known to have made it into the txns shard.
437    ///
438    /// This is called with a set of things that are known to have been applied
439    /// and in preparation for retracting them. The caller will attempt to
440    /// retract everything not filtered out by this method in a CaA with an
441    /// expected upper of `expected_txns_upper`. So, we catch up to that point,
442    /// and keep everything that is still outstanding. If the CaA fails with an
443    /// expected upper mismatch, then it must call this method again on the next
444    /// attempt with the new expected upper (new retractions may have made it
445    /// into the txns shard in the meantime).
446    ///
447    /// Callers must first wait for [`TxnsCache::update_ge`] with the same or
448    /// later timestamp to return. Panics otherwise.
449    pub(crate) fn filter_retractions<'a>(
450        &'a self,
451        expected_txns_upper: &T,
452        retractions: impl Iterator<Item = (&'a Vec<u8>, &'a ([u8; 8], ShardId))>,
453    ) -> impl Iterator<Item = (&'a Vec<u8>, &'a ([u8; 8], ShardId))> {
454        assert_eq!(self.only_data_id, None);
455        assert!(&self.progress_exclusive >= expected_txns_upper);
456        retractions.filter(|(batch_raw, _)| self.batch_idx.contains_key(*batch_raw))
457    }
458
459    /// Update contents with `entries` and mark this cache as progressed up to `progress`.
460    pub(crate) fn push_entries(&mut self, mut entries: Vec<(TxnsEntry, T, i64)>, progress: T) {
461        // Persist emits the times sorted by little endian encoding,
462        // which is not what we want. If we ever expose an interface for
463        // registering and committing to a data shard at the same
464        // timestamp, this will also have to sort registrations first.
465        entries.sort_by(|(a, _, _), (b, _, _)| a.ts::<T>().cmp(&b.ts::<T>()));
466        for (e, t, d) in entries {
467            match e {
468                TxnsEntry::Register(data_id, ts) => {
469                    let ts = T::decode(ts);
470                    debug_assert!(ts <= t);
471                    self.push_register(data_id, ts, d, t);
472                }
473                TxnsEntry::Append(data_id, ts, batch) => {
474                    let ts = T::decode(ts);
475                    debug_assert!(ts <= t);
476                    self.push_append(data_id, batch, ts, d)
477                }
478            }
479        }
480        self.progress_exclusive = progress;
481        debug_assert_eq!(self.validate(), Ok(()));
482    }
483
484    fn push_register(&mut self, data_id: ShardId, ts: T, diff: i64, compacted_ts: T) {
485        self.assert_only_data_id(&data_id);
486        // Since we keep the original non-advanced timestamp around, retractions
487        // necessarily might be for times in the past, so `|| diff < 0`.
488        debug_assert!(ts >= self.progress_exclusive || diff < 0);
489        if let Some(only_data_id) = self.only_data_id.as_ref() {
490            if only_data_id != &data_id {
491                return;
492            }
493        }
494
495        // The shard has not compacted past the register/forget ts, so it may not have been applied.
496        if ts == compacted_ts {
497            self.unapplied_registers.push_back((data_id, ts.clone()));
498        }
499
500        if diff == 1 {
501            debug!(
502                "cache learned {:.9} registered t={:?}",
503                data_id.to_string(),
504                ts
505            );
506            let entry = self.datas.entry(data_id).or_default();
507            // Sanity check that if there is a registration, then we've closed
508            // it off.
509            if let Some(last_reg) = entry.registered.back() {
510                assert!(last_reg.forget_ts.is_some())
511            }
512            entry.registered.push_back(DataRegistered {
513                register_ts: ts,
514                forget_ts: None,
515            });
516        } else if diff == -1 {
517            debug!(
518                "cache learned {:.9} forgotten t={:?}",
519                data_id.to_string(),
520                ts
521            );
522            let active_reg = self
523                .datas
524                .get_mut(&data_id)
525                .and_then(|x| x.registered.back_mut())
526                .expect("data shard should be registered before forget");
527            assert_eq!(active_reg.forget_ts.replace(ts), None);
528        } else {
529            unreachable!("only +1/-1 diffs are used");
530        }
531        debug_assert_eq!(self.validate(), Ok(()));
532    }
533
534    fn push_append(&mut self, data_id: ShardId, batch: Vec<u8>, ts: T, diff: i64) {
535        self.assert_only_data_id(&data_id);
536        // Since we keep the original non-advanced timestamp around, retractions
537        // necessarily might be for times in the past, so `|| diff < 0`.
538        debug_assert!(ts >= self.progress_exclusive || diff < 0);
539        if let Some(only_data_id) = self.only_data_id.as_ref() {
540            if only_data_id != &data_id {
541                return;
542            }
543        }
544
545        if diff == 1 {
546            debug!(
547                "cache learned {:.9} committed t={:?} b={}",
548                data_id.to_string(),
549                ts,
550                batch.hashed(),
551            );
552            let idx = self.next_batch_id;
553            self.next_batch_id += 1;
554            let prev = self.batch_idx.insert(batch.clone(), idx);
555            assert_eq!(prev, None);
556            let prev = self
557                .unapplied_batches
558                .insert(idx, (data_id, batch, ts.clone()));
559            assert_eq!(prev, None);
560            let times = self.datas.get_mut(&data_id).expect("data is initialized");
561            // Sanity check that shard is registered.
562            assert_eq!(times.last_reg().forget_ts, None);
563
564            // Only add the timestamp if it's not already in the deque. We don't
565            // track all writes in this but track at what timestamps we have
566            // _any_ writes.
567            if times.writes.back() != Some(&ts) {
568                times.writes.push_back(ts);
569            }
570        } else if diff == -1 {
571            debug!(
572                "cache learned {:.9} applied t={:?} b={}",
573                data_id.to_string(),
574                ts,
575                batch.hashed(),
576            );
577            let idx = self
578                .batch_idx
579                .remove(&batch)
580                .expect("invariant violation: batch should exist");
581            let prev = self
582                .unapplied_batches
583                .remove(&idx)
584                .expect("invariant violation: batch index should exist");
585            debug_assert_eq!(data_id, prev.0);
586            debug_assert_eq!(batch, prev.1);
587            // Insertion timestamp should be less equal retraction timestamp.
588            debug_assert!(prev.2 <= ts);
589        } else {
590            unreachable!("only +1/-1 diffs are used");
591        }
592        self.compact_data_times(&data_id);
593        debug_assert_eq!(self.validate(), Ok(()));
594    }
595
596    /// Informs the cache that all registers and forgets less than ts have been
597    /// applied.
598    pub(crate) fn mark_register_applied(&mut self, ts: &T) {
599        self.unapplied_registers
600            .retain(|(_, register_ts)| ts < register_ts);
601        debug_assert_eq!(self.validate(), Ok(()));
602    }
603
604    /// Compact the internal representation for `data_id` by removing all data
605    /// that is not needed to maintain the following invariants:
606    ///
607    ///   - The latest write and registration for each shard are kept in
608    ///     `self.datas`.
609    ///   - All unapplied writes and registrations are kept in `self.datas`.
610    ///   - All writes in `self.datas` are contained by some registration in
611    ///     `self.datas`.
612    fn compact_data_times(&mut self, data_id: &ShardId) {
613        let Some(times) = self.datas.get_mut(data_id) else {
614            return;
615        };
616
617        debug!("cache compact {:.9} times={:?}", data_id.to_string(), times);
618
619        if let Some(unapplied_write_ts) = self
620            .unapplied_batches
621            .first_key_value()
622            .map(|(_, (_, _, ts))| ts)
623        {
624            debug!(
625                "cache compact {:.9} unapplied_write_ts={:?}",
626                data_id.to_string(),
627                unapplied_write_ts,
628            );
629            while let Some(write_ts) = times.writes.front() {
630                if times.writes.len() == 1 || write_ts >= unapplied_write_ts {
631                    break;
632                }
633                times.writes.pop_front();
634            }
635        } else {
636            times.writes.drain(..times.writes.len() - 1);
637        }
638        let unapplied_reg_ts = self.unapplied_registers.front().map(|(_, ts)| ts);
639        let min_write_ts = times.writes.front();
640        let min_reg_ts = [unapplied_reg_ts, min_write_ts].into_iter().flatten().min();
641        if let Some(min_reg_ts) = min_reg_ts {
642            debug!(
643                "cache compact {:.9} unapplied_reg_ts={:?} min_write_ts={:?} min_reg_ts={:?}",
644                data_id.to_string(),
645                unapplied_reg_ts,
646                min_write_ts,
647                min_reg_ts,
648            );
649            while let Some(reg) = times.registered.front() {
650                match &reg.forget_ts {
651                    Some(forget_ts) if forget_ts >= min_reg_ts => break,
652                    _ if times.registered.len() == 1 => break,
653                    _ => {
654                        assert!(
655                            reg.forget_ts.is_some(),
656                            "only the latest reg can have no forget ts"
657                        );
658                        times.registered.pop_front();
659                    }
660                }
661            }
662        } else {
663            times.registered.drain(..times.registered.len() - 1);
664        }
665
666        debug!(
667            "cache compact DONE {:.9} times={:?}",
668            data_id.to_string(),
669            times
670        );
671    }
672
673    pub(crate) fn update_gauges(&self, metrics: &Metrics) {
674        metrics
675            .data_shard_count
676            .set(u64::cast_from(self.datas.len()));
677        metrics
678            .batches
679            .unapplied_count
680            .set(u64::cast_from(self.unapplied_batches.len()));
681        let unapplied_batches_bytes = self
682            .unapplied_batches
683            .values()
684            .map(|(_, x, _)| x.len())
685            .sum::<usize>();
686        metrics
687            .batches
688            .unapplied_bytes
689            .set(u64::cast_from(unapplied_batches_bytes));
690        metrics
691            .batches
692            .unapplied_min_ts
693            .set(encode_ts_metric(&Antichain::from_elem(
694                self.min_unapplied_ts().clone(),
695            )));
696    }
697
698    fn assert_only_data_id(&self, data_id: &ShardId) {
699        if let Some(only_data_id) = self.only_data_id.as_ref() {
700            assert_eq!(data_id, only_data_id);
701        }
702    }
703
704    pub(crate) fn validate(&self) -> Result<(), String> {
705        // Unapplied batches are all indexed and sorted.
706        if self.batch_idx.len() != self.unapplied_batches.len() {
707            return Err(format!(
708                "expected index len {} to match what it's indexing {}",
709                self.batch_idx.len(),
710                self.unapplied_batches.len()
711            ));
712        }
713
714        let mut prev_batch_ts = T::minimum();
715        for (idx, (_, batch, ts)) in self.unapplied_batches.iter() {
716            if self.batch_idx.get(batch) != Some(idx) {
717                return Err(format!(
718                    "expected batch to be indexed at {} got {:?}",
719                    idx,
720                    self.batch_idx.get(batch)
721                ));
722            }
723            if ts < &prev_batch_ts {
724                return Err(format!(
725                    "unapplied batch timestamp {:?} out of order after {:?}",
726                    ts, prev_batch_ts
727                ));
728            }
729            prev_batch_ts = ts.clone();
730        }
731
732        // Unapplied registers are sorted.
733        let mut prev_register_ts = T::minimum();
734        for (_, ts) in self.unapplied_registers.iter() {
735            if ts < &prev_register_ts {
736                return Err(format!(
737                    "unapplied register timestamp {:?} out of order after {:?}",
738                    ts, prev_register_ts
739                ));
740            }
741            prev_register_ts = ts.clone();
742        }
743
744        let min_unapplied_ts = self.min_unapplied_ts_inner();
745
746        for (data_id, data_times) in self.datas.iter() {
747            let () = data_times.validate()?;
748
749            if let Some(ts) = data_times.writes.front() {
750                // Writes are compacted.
751                if min_unapplied_ts > ts && data_times.writes.len() > 1 {
752                    return Err(format!(
753                        "{:?} write ts {:?} not past min unapplied ts {:?}",
754                        data_id, ts, min_unapplied_ts
755                    ));
756                }
757            }
758
759            // datas contains all unapplied writes.
760            if let Some((_, (_, _, unapplied_ts))) = self
761                .unapplied_batches
762                .iter()
763                .find(|(_, (shard_id, _, _))| shard_id == data_id)
764            {
765                if let Some(write_ts) = data_times.writes.front() {
766                    if write_ts > unapplied_ts {
767                        return Err(format!(
768                            "{:?} min write ts {:?} past min unapplied batch ts {:?}",
769                            data_id, write_ts, unapplied_ts
770                        ));
771                    }
772                }
773            }
774
775            // datas contains all unapplied register/forgets.
776            if let Some((_, unapplied_ts)) = self
777                .unapplied_registers
778                .iter()
779                .find(|(shard_id, _)| shard_id == data_id)
780            {
781                let register_ts = &data_times.first_reg().register_ts;
782                if register_ts > unapplied_ts {
783                    return Err(format!(
784                        "{:?} min register ts {:?} past min unapplied register ts {:?}",
785                        data_id, register_ts, unapplied_ts
786                    ));
787                }
788            }
789        }
790
791        Ok(())
792    }
793}
794
795impl<T, C> TxnsCache<T, C>
796where
797    T: Timestamp + Lattice + TotalOrder + StepForward + Codec64 + Sync,
798    C: TxnsCodec,
799{
800    /// Initialize the txn shard at `init_ts` and returns a [TxnsCache] reading
801    /// from that shard.
802    pub(crate) async fn init(
803        init_ts: T,
804        txns_read: ReadHandle<C::Key, C::Val, T, i64>,
805        txns_write: &mut WriteHandle<C::Key, C::Val, T, i64>,
806    ) -> Self {
807        let () = crate::empty_caa(|| "txns init", txns_write, init_ts.clone()).await;
808        let mut ret = Self::from_read(txns_read, None).await;
809        let _ = ret.update_gt(&init_ts).await;
810        ret
811    }
812
813    /// Returns a [TxnsCache] reading from the given txn shard.
814    ///
815    /// `txns_id` identifies which shard will be used as the txns WAL. MZ will
816    /// likely have one of these per env, used by all processes and the same
817    /// across restarts.
818    pub async fn open(
819        client: &PersistClient,
820        txns_id: ShardId,
821        only_data_id: Option<ShardId>,
822    ) -> Self {
823        let (txns_key_schema, txns_val_schema) = C::schemas();
824        let txns_read = client
825            .open_leased_reader(
826                txns_id,
827                Arc::new(txns_key_schema),
828                Arc::new(txns_val_schema),
829                Diagnostics {
830                    shard_name: "txns".to_owned(),
831                    handle_purpose: "read txns".to_owned(),
832                },
833                USE_CRITICAL_SINCE_TXN.get(client.dyncfgs()),
834            )
835            .await
836            .expect("txns schema shouldn't change");
837        Self::from_read(txns_read, only_data_id).await
838    }
839
840    async fn from_read(
841        txns_read: ReadHandle<C::Key, C::Val, T, i64>,
842        only_data_id: Option<ShardId>,
843    ) -> Self {
844        let (state, txns_subscribe) = TxnsCacheState::init::<C>(only_data_id, txns_read).await;
845        TxnsCache {
846            txns_subscribe,
847            buf: Vec::new(),
848            state,
849        }
850    }
851
852    /// Invariant: afterward, self.progress_exclusive will be > ts
853    ///
854    /// Returns the `progress_exclusive` of the cache after updating.
855    #[must_use]
856    #[instrument(level = "debug", fields(ts = ?ts))]
857    pub async fn update_gt(&mut self, ts: &T) -> &T {
858        let only_data_id = self.only_data_id.clone();
859        Self::update(
860            &mut self.state,
861            &mut self.txns_subscribe,
862            &mut self.buf,
863            only_data_id,
864            |progress_exclusive| progress_exclusive > ts,
865        )
866        .await;
867        debug_assert!(&self.progress_exclusive > ts);
868        debug_assert_eq!(self.validate(), Ok(()));
869        &self.progress_exclusive
870    }
871
872    /// Invariant: afterward, self.progress_exclusive will be >= ts
873    ///
874    /// Returns the `progress_exclusive` of the cache after updating.
875    #[must_use]
876    #[instrument(level = "debug", fields(ts = ?ts))]
877    pub async fn update_ge(&mut self, ts: &T) -> &T {
878        let only_data_id = self.only_data_id.clone();
879        Self::update(
880            &mut self.state,
881            &mut self.txns_subscribe,
882            &mut self.buf,
883            only_data_id,
884            |progress_exclusive| progress_exclusive >= ts,
885        )
886        .await;
887        debug_assert!(&self.progress_exclusive >= ts);
888        debug_assert_eq!(self.validate(), Ok(()));
889        &self.progress_exclusive
890    }
891
892    /// Listen to the txns shard for events until `done` returns true.
893    async fn update<F: Fn(&T) -> bool>(
894        state: &mut TxnsCacheState<T>,
895        txns_subscribe: &mut Subscribe<C::Key, C::Val, T, i64>,
896        buf: &mut Vec<(TxnsEntry, T, i64)>,
897        only_data_id: Option<ShardId>,
898        done: F,
899    ) {
900        while !done(&state.progress_exclusive) {
901            let events = txns_subscribe.next(None).await;
902            for event in events {
903                match event {
904                    ListenEvent::Progress(frontier) => {
905                        let progress = frontier
906                            .into_option()
907                            .expect("nothing should close the txns shard");
908                        state.push_entries(std::mem::take(buf), progress);
909                    }
910                    ListenEvent::Updates(parts) => {
911                        Self::fetch_parts(only_data_id, txns_subscribe, parts, buf).await;
912                    }
913                };
914            }
915        }
916        debug_assert_eq!(state.validate(), Ok(()));
917        debug!(
918            "cache correct before {:?} len={} least_ts={:?}",
919            state.progress_exclusive,
920            state.unapplied_batches.len(),
921            state
922                .unapplied_batches
923                .first_key_value()
924                .map(|(_, (_, _, ts))| ts),
925        );
926    }
927
928    pub(crate) async fn fetch_parts(
929        only_data_id: Option<ShardId>,
930        txns_subscribe: &mut Subscribe<C::Key, C::Val, T, i64>,
931        parts: Vec<LeasedBatchPart<T>>,
932        updates: &mut Vec<(TxnsEntry, T, i64)>,
933    ) {
934        // We filter out unrelated data in two passes. The first is
935        // `should_fetch_part`, which allows us to skip entire fetches
936        // from s3/Blob. Then, if a part does need to be fetched, it
937        // still might contain info about unrelated data shards, and we
938        // filter those out before buffering in `updates`.
939        for part in parts {
940            let should_fetch_part = Self::should_fetch_part(only_data_id.as_ref(), &part);
941            debug!(
942                "should_fetch_part={} for {:?} {:?}",
943                should_fetch_part,
944                only_data_id,
945                part.stats()
946            );
947            if !should_fetch_part {
948                drop(part);
949                continue;
950            }
951            let part_updates = txns_subscribe.fetch_batch_part(part).await;
952            let part_updates = part_updates.map(|((k, v), t, d)| {
953                let (k, v) = (k.expect("valid key"), v.expect("valid val"));
954                (C::decode(k, v), t, d)
955            });
956            if let Some(only_data_id) = only_data_id.as_ref() {
957                updates.extend(part_updates.filter(|(x, _, _)| x.data_id() == only_data_id));
958            } else {
959                updates.extend(part_updates);
960            }
961        }
962    }
963
964    fn should_fetch_part(only_data_id: Option<&ShardId>, part: &LeasedBatchPart<T>) -> bool {
965        let Some(only_data_id) = only_data_id else {
966            return true;
967        };
968        // This `part.stats()` call involves decoding and the only_data_id=None
969        // case is common-ish, so make sure to keep it after that early return.
970        let Some(stats) = part.stats() else {
971            return true;
972        };
973        C::should_fetch_part(only_data_id, &stats).unwrap_or(true)
974    }
975}
976
977impl<T, C: TxnsCodec> Deref for TxnsCache<T, C> {
978    type Target = TxnsCacheState<T>;
979    fn deref(&self) -> &Self::Target {
980        &self.state
981    }
982}
983
984impl<T, C: TxnsCodec> DerefMut for TxnsCache<T, C> {
985    fn deref_mut(&mut self) -> &mut Self::Target {
986        &mut self.state
987    }
988}
989
990#[derive(Debug)]
991pub(crate) struct DataTimes<T> {
992    /// The times at which the data shard was in the txns set.
993    ///
994    /// Invariants:
995    ///
996    /// - At least one registration (otherwise we filter this out of the cache).
997    /// - These are in increasing order.
998    /// - These are non-overlapping intervals.
999    /// - Everything in writes is in one of these intervals.
1000    pub(crate) registered: VecDeque<DataRegistered<T>>,
1001    /// Invariant: These are in increasing order.
1002    pub(crate) writes: VecDeque<T>,
1003}
1004
1005impl<T> Default for DataTimes<T> {
1006    fn default() -> Self {
1007        Self {
1008            registered: Default::default(),
1009            writes: Default::default(),
1010        }
1011    }
1012}
1013
/// One interval during which a data shard was part of the txns set.
#[derive(Debug)]
pub(crate) struct DataRegistered<T> {
    /// The inclusive time at which the data shard joined the txns set.
    ///
    /// Compaction may advance this time, in which case there can be writes
    /// at times equal to it.
    pub(crate) register_ts: T,
    /// The inclusive time at which the data shard left the txns set, or
    /// `None` if it has not been removed yet.
    pub(crate) forget_ts: Option<T>,
}
1025
1026impl<T: Timestamp + TotalOrder> DataRegistered<T> {
1027    pub(crate) fn contains(&self, ts: &T) -> bool {
1028        &self.register_ts <= ts && self.forget_ts.as_ref().map_or(true, |x| ts <= x)
1029    }
1030}
1031
1032impl<T: Timestamp + TotalOrder> DataTimes<T> {
1033    pub(crate) fn last_reg(&self) -> &DataRegistered<T> {
1034        self.registered.back().expect("at least one registration")
1035    }
1036
1037    fn first_reg(&self) -> &DataRegistered<T> {
1038        self.registered.front().expect("at least one registration")
1039    }
1040
1041    /// Returns the latest known physical upper of a data shard.
1042    fn latest_physical_ts(&self) -> &T {
1043        let last_reg = self.last_reg();
1044        let mut physical_ts = &last_reg.register_ts;
1045        if let Some(forget_ts) = &last_reg.forget_ts {
1046            physical_ts = max(physical_ts, forget_ts);
1047        }
1048        if let Some(latest_write) = self.writes.back() {
1049            physical_ts = max(physical_ts, latest_write);
1050        }
1051        physical_ts
1052    }
1053
1054    pub(crate) fn validate(&self) -> Result<(), String> {
1055        // Writes are sorted.
1056        let mut prev_ts = T::minimum();
1057        for ts in self.writes.iter() {
1058            if ts < &prev_ts {
1059                return Err(format!(
1060                    "write ts {:?} out of order after {:?}",
1061                    ts, prev_ts
1062                ));
1063            }
1064            prev_ts = ts.clone();
1065        }
1066
1067        // Registered is sorted and non-overlapping.
1068        let mut prev_ts = T::minimum();
1069        let mut writes_idx = 0;
1070        for x in self.registered.iter() {
1071            if x.register_ts < prev_ts {
1072                return Err(format!(
1073                    "register ts {:?} out of order after {:?}",
1074                    x.register_ts, prev_ts
1075                ));
1076            }
1077            if let Some(forget_ts) = x.forget_ts.as_ref() {
1078                if !(&x.register_ts <= forget_ts) {
1079                    return Err(format!(
1080                        "register ts {:?} not less_equal forget ts {:?}",
1081                        x.register_ts, forget_ts
1082                    ));
1083                }
1084                prev_ts.clone_from(forget_ts);
1085            }
1086            // Also peel off any writes in this interval.
1087            while let Some(write_ts) = self.writes.get(writes_idx) {
1088                if write_ts < &x.register_ts {
1089                    return Err(format!(
1090                        "write ts {:?} not in any register interval {:?}",
1091                        write_ts, self.registered
1092                    ));
1093                }
1094                if let Some(forget_ts) = x.forget_ts.as_ref() {
1095                    if write_ts <= forget_ts {
1096                        writes_idx += 1;
1097                        continue;
1098                    }
1099                }
1100                break;
1101            }
1102        }
1103
1104        // Check for writes after the last interval.
1105        let Some(reg_back) = self.registered.back() else {
1106            return Err("registered was empty".into());
1107        };
1108        if writes_idx != self.writes.len() && reg_back.forget_ts.is_some() {
1109            return Err(format!(
1110                "write ts {:?} not in any register interval {:?}",
1111                self.writes, self.registered
1112            ));
1113        }
1114
1115        Ok(())
1116    }
1117}
1118
/// A kind of unapplied entry.
// NOTE(review): presumably describes work in the txns shard that has not yet
// been applied to a data shard; confirm against the users of this type.
#[derive(Debug)]
pub(crate) enum Unapplied<'a> {
    /// A register or forget; carries no payload.
    RegisterForget,
    /// A set of borrowed byte buffers.
    // NOTE(review): presumably the encoded batches to apply; confirm.
    Batch(Vec<&'a Vec<u8>>),
}
1124
1125#[cfg(test)]
1126mod tests {
1127    use DataListenNext::*;
1128    use mz_ore::assert_err;
1129    use mz_persist_client::PersistClient;
1130    use mz_persist_types::codec_impls::{ShardIdSchema, VecU8Schema};
1131
1132    use crate::operator::DataSubscribe;
1133    use crate::tests::reader;
1134    use crate::txns::TxnsHandle;
1135
1136    use super::*;
1137
1138    impl TxnsCache<u64, TxnsCodecDefault> {
1139        pub(crate) async fn expect_open(
1140            init_ts: u64,
1141            txns: &TxnsHandle<String, (), u64, i64>,
1142        ) -> Self {
1143            let mut ret = TxnsCache::open(&txns.datas.client, txns.txns_id(), None).await;
1144            let _ = ret.update_gt(&init_ts).await;
1145            ret
1146        }
1147
1148        pub(crate) async fn expect_snapshot(
1149            &mut self,
1150            client: &PersistClient,
1151            data_id: ShardId,
1152            as_of: u64,
1153        ) -> Vec<String> {
1154            let mut data_read = reader(client, data_id).await;
1155            let _ = self.update_gt(&as_of).await;
1156            let mut snapshot = self
1157                .data_snapshot(data_read.shard_id(), as_of)
1158                .snapshot_and_fetch(&mut data_read)
1159                .await
1160                .unwrap();
1161            snapshot.sort();
1162            snapshot
1163                .into_iter()
1164                .flat_map(|((k, v), _t, d)| {
1165                    let (k, ()) = (k.unwrap(), v.unwrap());
1166                    std::iter::repeat(k).take(usize::try_from(d).unwrap())
1167                })
1168                .collect()
1169        }
1170
1171        pub(crate) fn expect_subscribe(
1172            &self,
1173            client: &PersistClient,
1174            data_id: ShardId,
1175            as_of: u64,
1176        ) -> DataSubscribe {
1177            DataSubscribe::new(
1178                "test",
1179                client.clone(),
1180                self.txns_id,
1181                data_id,
1182                as_of,
1183                Antichain::new(),
1184                true,
1185            )
1186        }
1187    }
1188
1189    #[mz_ore::test]
1190    fn txns_cache_data_snapshot_and_listen_next() {
1191        let d0 = ShardId::new();
1192        let ds = |latest_write: Option<u64>, as_of: u64, empty_to: u64| -> DataSnapshot<u64> {
1193            DataSnapshot {
1194                data_id: d0,
1195                latest_write,
1196                as_of,
1197                empty_to,
1198            }
1199        };
1200        #[track_caller]
1201        fn testcase(
1202            cache: &mut TxnsCacheState<u64>,
1203            ts: u64,
1204            data_id: ShardId,
1205            snap_expected: DataSnapshot<u64>,
1206            listen_expected: DataListenNext<u64>,
1207        ) {
1208            cache.progress_exclusive = ts + 1;
1209            assert_eq!(cache.data_snapshot(data_id, ts), snap_expected);
1210            assert_eq!(cache.data_listen_next(&data_id, &ts), listen_expected);
1211            assert_eq!(
1212                cache.data_listen_next(&data_id, &(ts + 1)),
1213                WaitForTxnsProgress
1214            );
1215        }
1216
1217        // This attempts to exercise all the various interesting edge cases of
1218        // data_snapshot and data_listen_subscribe using the following sequence
1219        // of events:
1220        //
1221        // - Registrations at: [2,8], [15,16]
1222        // - Direct writes at: 1, 13
1223        // - Writes via txns at: 4, 5, 7
1224        let mut c = TxnsCacheState::new(ShardId::new(), 0, None);
1225
1226        // empty
1227        assert_eq!(c.progress_exclusive, 0);
1228        #[allow(clippy::disallowed_methods)] // not using enhanced panic handler in tests
1229        let result = std::panic::catch_unwind(|| c.data_snapshot(d0, 0));
1230        assert_err!(result);
1231        assert_eq!(c.data_listen_next(&d0, &0), WaitForTxnsProgress);
1232
1233        // ts 0 (never registered)
1234        testcase(&mut c, 0, d0, ds(None, 0, 1), ReadDataTo(1));
1235
1236        // ts 1 (direct write)
1237        // - The cache knows everything < 2.
1238        // - d0 is not registered in the cache.
1239        // - We know the shard can't be written to via txn < 2.
1240        // - So go read the shard normally up to 2.
1241        testcase(&mut c, 1, d0, ds(None, 1, 2), ReadDataTo(2));
1242
1243        // ts 2 (register)
1244        c.push_register(d0, 2, 1, 2);
1245        testcase(&mut c, 2, d0, ds(None, 2, 3), ReadDataTo(3));
1246
1247        // ts 3 (registered, not written)
1248        testcase(&mut c, 3, d0, ds(None, 3, 4), EmitLogicalProgress(4));
1249
1250        // ts 4 (written via txns)
1251        c.push_append(d0, vec![4], 4, 1);
1252        testcase(&mut c, 4, d0, ds(Some(4), 4, 5), ReadDataTo(5));
1253
1254        // ts 5 (written via txns, write at preceding ts)
1255        c.push_append(d0, vec![5], 5, 1);
1256        testcase(&mut c, 5, d0, ds(Some(5), 5, 6), ReadDataTo(6));
1257
1258        // ts 6 (registered, not written, write at preceding ts)
1259        testcase(&mut c, 6, d0, ds(Some(5), 6, 7), EmitLogicalProgress(7));
1260
1261        // ts 7 (written via txns, write at non-preceding ts)
1262        c.push_append(d0, vec![7], 7, 1);
1263        testcase(&mut c, 7, d0, ds(Some(7), 7, 8), ReadDataTo(8));
1264
1265        // ts 8 (apply and tidy write from ts 4)
1266        c.push_append(d0, vec![4], 8, -1);
1267        testcase(&mut c, 8, d0, ds(Some(7), 8, 9), EmitLogicalProgress(9));
1268
1269        // ts 9 (apply and tidy write from ts 5)
1270        c.push_append(d0, vec![5], 9, -1);
1271        testcase(&mut c, 9, d0, ds(Some(7), 9, 10), EmitLogicalProgress(10));
1272
1273        // ts 10 (apply and tidy write from ts 7)
1274        c.push_append(d0, vec![7], 10, -1);
1275        testcase(&mut c, 10, d0, ds(None, 10, 11), EmitLogicalProgress(11));
1276
1277        // ts 11 (forget)
1278        // Revisit when
1279        // https://github.com/MaterializeInc/database-issues/issues/7746 is fixed,
1280        // it's unclear how to encode the register timestamp in a forget.
1281        c.push_register(d0, 11, -1, 11);
1282        testcase(&mut c, 11, d0, ds(None, 11, 12), ReadDataTo(12));
1283
1284        // ts 12 (not registered, not written). This ReadDataTo would block until
1285        // the write happens at ts 13.
1286        testcase(&mut c, 12, d0, ds(None, 12, 13), ReadDataTo(13));
1287
1288        // ts 13 (written directly)
1289        testcase(&mut c, 13, d0, ds(None, 13, 14), ReadDataTo(14));
1290
1291        // ts 14 (not registered, not written) This ReadDataTo would block until
1292        // the register happens at 15.
1293        testcase(&mut c, 14, d0, ds(None, 14, 15), ReadDataTo(15));
1294
1295        // ts 15 (registered, previously forgotten)
1296        c.push_register(d0, 15, 1, 15);
1297        testcase(&mut c, 15, d0, ds(None, 15, 16), ReadDataTo(16));
1298
1299        // ts 16 (forgotten, registered at preceding ts)
1300        // Revisit when
1301        // https://github.com/MaterializeInc/database-issues/issues/7746 is fixed,
1302        // it's unclear how to encode the register timestamp in a forget.
1303        c.push_register(d0, 16, -1, 16);
1304        testcase(&mut c, 16, d0, ds(None, 16, 17), ReadDataTo(17));
1305
1306        // Now that we have more history, some of the old answers change! In
1307        // particular, we have more information on unapplied writes, empty
1308        // times, and can ReadDataTo much later times.
1309
1310        assert_eq!(c.data_snapshot(d0, 0), ds(None, 0, 1));
1311        assert_eq!(c.data_snapshot(d0, 1), ds(None, 1, 2));
1312        assert_eq!(c.data_snapshot(d0, 2), ds(None, 2, 3));
1313        assert_eq!(c.data_snapshot(d0, 3), ds(None, 3, 4));
1314        assert_eq!(c.data_snapshot(d0, 4), ds(None, 4, 5));
1315        assert_eq!(c.data_snapshot(d0, 5), ds(None, 5, 6));
1316        assert_eq!(c.data_snapshot(d0, 6), ds(None, 6, 7));
1317        assert_eq!(c.data_snapshot(d0, 7), ds(None, 7, 8));
1318        assert_eq!(c.data_snapshot(d0, 8), ds(None, 8, 9));
1319        assert_eq!(c.data_snapshot(d0, 9), ds(None, 9, 10));
1320        assert_eq!(c.data_snapshot(d0, 10), ds(None, 10, 11));
1321        assert_eq!(c.data_snapshot(d0, 11), ds(None, 11, 12));
1322        assert_eq!(c.data_snapshot(d0, 12), ds(None, 12, 13));
1323        assert_eq!(c.data_snapshot(d0, 13), ds(None, 13, 14));
1324        assert_eq!(c.data_snapshot(d0, 14), ds(None, 14, 15));
1325        assert_eq!(c.data_snapshot(d0, 15), ds(None, 15, 16));
1326        assert_eq!(c.data_snapshot(d0, 16), ds(None, 16, 17));
1327
1328        assert_eq!(c.data_listen_next(&d0, &0), ReadDataTo(17));
1329        assert_eq!(c.data_listen_next(&d0, &1), ReadDataTo(17));
1330        assert_eq!(c.data_listen_next(&d0, &2), ReadDataTo(17));
1331        assert_eq!(c.data_listen_next(&d0, &3), ReadDataTo(17));
1332        assert_eq!(c.data_listen_next(&d0, &4), ReadDataTo(17));
1333        assert_eq!(c.data_listen_next(&d0, &5), ReadDataTo(17));
1334        assert_eq!(c.data_listen_next(&d0, &6), ReadDataTo(17));
1335        assert_eq!(c.data_listen_next(&d0, &7), ReadDataTo(17));
1336        assert_eq!(c.data_listen_next(&d0, &8), ReadDataTo(17));
1337        assert_eq!(c.data_listen_next(&d0, &9), ReadDataTo(17));
1338        assert_eq!(c.data_listen_next(&d0, &10), ReadDataTo(17));
1339        assert_eq!(c.data_listen_next(&d0, &11), ReadDataTo(17));
1340        assert_eq!(c.data_listen_next(&d0, &12), ReadDataTo(17));
1341        assert_eq!(c.data_listen_next(&d0, &13), ReadDataTo(17));
1342        assert_eq!(c.data_listen_next(&d0, &14), ReadDataTo(17));
1343        assert_eq!(c.data_listen_next(&d0, &15), ReadDataTo(17));
1344        assert_eq!(c.data_listen_next(&d0, &16), ReadDataTo(17));
1345        assert_eq!(c.data_listen_next(&d0, &17), WaitForTxnsProgress);
1346    }
1347
1348    #[mz_ore::test(tokio::test)]
1349    #[cfg_attr(miri, ignore)] // too slow
1350    async fn empty_to() {
1351        let client = PersistClient::new_for_tests().await;
1352        let mut txns = TxnsHandle::expect_open(client.clone()).await;
1353        let d0 = txns.expect_register(1).await;
1354
1355        // During code review, we discussed an alternate implementation of
1356        // empty_to that was an Option: None when we knew about a write > the
1357        // as_of, and Some when we didn't. The None case would mean that we
1358        // don't need to CaA empty updates in. This is quite appealing, but
1359        // would cause an issue with the guarantee that `apply_le(as_of)` is
1360        // sufficient to unblock a read. Specifically:
1361        //
1362        // - Write at 3, but don't apply.
1363        // - Write at 5, but don't apply.
1364        // - Catch the cache up past the write at 5.
1365        // - Run apply_le(4) to unblock a read a 4.
1366        // - Run a snapshot at 4.
1367        // - If nothing else applies the write at 5, the snapshot would
1368        //   deadlock.
1369        for ts in [3, 5] {
1370            let mut txn = txns.begin();
1371            txn.write(&d0, "3".into(), (), 1).await;
1372            let _apply = txn.commit_at(&mut txns, ts).await.unwrap();
1373        }
1374        let _ = txns.txns_cache.update_gt(&5).await;
1375        txns.apply_le(&4).await;
1376        let snap = txns.txns_cache.data_snapshot(d0, 4);
1377        let mut data_read = reader(&client, d0).await;
1378        // This shouldn't deadlock.
1379        let contents = snap.snapshot_and_fetch(&mut data_read).await.unwrap();
1380        assert_eq!(contents.len(), 1);
1381
1382        // Sanity check that the scenario played out like we said above.
1383        assert_eq!(snap.empty_to, 5);
1384    }
1385
1386    #[mz_ore::test]
1387    fn data_times_validate() {
1388        fn dt(register_forget_ts: &[u64], write_ts: &[u64]) -> Result<(), ()> {
1389            let mut dt = DataTimes::default();
1390            for x in register_forget_ts {
1391                if let Some(back) = dt.registered.back_mut() {
1392                    if back.forget_ts == None {
1393                        back.forget_ts = Some(*x);
1394                        continue;
1395                    }
1396                }
1397                dt.registered.push_back(DataRegistered {
1398                    register_ts: *x,
1399                    forget_ts: None,
1400                })
1401            }
1402            dt.writes = write_ts.into_iter().cloned().collect();
1403            dt.validate().map_err(|_| ())
1404        }
1405
1406        // Valid
1407        assert_eq!(dt(&[1], &[2, 3]), Ok(()));
1408        assert_eq!(dt(&[1, 3], &[2]), Ok(()));
1409        assert_eq!(dt(&[1, 3, 5], &[2, 6, 7]), Ok(()));
1410        assert_eq!(dt(&[1, 3, 5], &[2, 6, 7]), Ok(()));
1411        assert_eq!(dt(&[1, 1], &[1]), Ok(()));
1412
1413        // Invalid
1414        assert_eq!(dt(&[], &[]), Err(()));
1415        assert_eq!(dt(&[1], &[0]), Err(()));
1416        assert_eq!(dt(&[1, 3], &[4]), Err(()));
1417        assert_eq!(dt(&[1, 3, 5], &[4]), Err(()));
1418        assert_eq!(dt(&[1, 4], &[3, 2]), Err(()));
1419    }
1420
1421    /// Regression test for a bug caught by higher level tests in CI:
1422    /// - Commit a write at 5
1423    /// - Apply it and commit the tidy retraction at 20.
1424    /// - Catch up to both of these in the TxnsHandle and call compact_to(10).
1425    ///   The TxnsHandle knows the write has been applied and lets it CaDS the
1426    ///   txns shard since to 10.
1427    /// - Open a TxnsCache starting at the txns shard since (10) to serve a
1428    ///   snapshot at 12. Catch it up through 12, but _not_ the tidy at 20.
1429    /// - This TxnsCache gets the write with a ts compacted forward to 10, but
1430    ///   no retraction. The snapshot resolves with an incorrect latest_write of
1431    ///   `Some(10)`.
1432    /// - The unblock read waits for this write to be applied before doing the
1433    ///   empty CaA, but this write never existed so it hangs forever.
1434    #[mz_ore::test(tokio::test)]
1435    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1436    async fn regression_compact_latest_write() {
1437        let client = PersistClient::new_for_tests().await;
1438        let mut txns = TxnsHandle::expect_open(client.clone()).await;
1439        let log = txns.new_log();
1440        let d0 = txns.expect_register(1).await;
1441
1442        let tidy_5 = txns.expect_commit_at(5, d0, &["5"], &log).await;
1443        let _ = txns.expect_commit_at(15, d0, &["15"], &log).await;
1444        txns.tidy_at(20, tidy_5).await.unwrap();
1445        let _ = txns.txns_cache.update_gt(&20).await;
1446        assert_eq!(txns.txns_cache.min_unapplied_ts(), &15);
1447        txns.compact_to(10).await;
1448
1449        let mut txns_read = client
1450            .open_leased_reader(
1451                txns.txns_id(),
1452                Arc::new(ShardIdSchema),
1453                Arc::new(VecU8Schema),
1454                Diagnostics::for_tests(),
1455                true,
1456            )
1457            .await
1458            .expect("txns schema shouldn't change");
1459        txns_read.downgrade_since(&Antichain::from_elem(10)).await;
1460        let mut cache = TxnsCache::<_, TxnsCodecDefault>::from_read(txns_read, None).await;
1461        let _ = cache.update_gt(&15).await;
1462        let snap = cache.data_snapshot(d0, 12);
1463        assert_eq!(snap.latest_write, Some(5));
1464    }
1465
1466    // Regression test for a bug where we were sorting TxnEvents by the
1467    // compacted timestamp instead of the original one when applying them to a
1468    // cache. This caused them to be applied in a surprising order (e.g. forget
1469    // before register).
1470    #[mz_ore::test(tokio::test)]
1471    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1472    async fn regression_ts_sort() {
1473        let client = PersistClient::new_for_tests().await;
1474        let txns = TxnsHandle::expect_open(client.clone()).await;
1475        let mut cache = TxnsCache::expect_open(0, &txns).await;
1476        let d0 = ShardId::new();
1477
1478        // With the bug, this panics via an internal sanity assertion.
1479        cache.push_entries(
1480            vec![
1481                (TxnsEntry::Register(d0, u64::encode(&2)), 2, -1),
1482                (TxnsEntry::Register(d0, u64::encode(&1)), 2, 1),
1483            ],
1484            3,
1485        );
1486    }
1487
1488    /// Tests that `data_snapshot` and `data_listen_next` properly handle an
1489    /// `init_ts` > 0.
1490    #[mz_ore::test(tokio::test)]
1491    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1492    async fn data_compacted() {
1493        let d0 = ShardId::new();
1494        let mut c = TxnsCacheState::new(ShardId::new(), 10, None);
1495        c.progress_exclusive = 20;
1496
1497        #[allow(clippy::disallowed_methods)] // not using enhanced panic handler in tests
1498        let result = std::panic::catch_unwind(|| c.data_listen_next(&d0, &0));
1499        assert_err!(result);
1500
1501        let ds = c.data_snapshot(d0, 0);
1502        assert_eq!(
1503            ds,
1504            DataSnapshot {
1505                data_id: d0,
1506                latest_write: None,
1507                as_of: 0,
1508                empty_to: 10,
1509            }
1510        );
1511    }
1512}