mz_txn_wal/
txn_cache.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A cache of the txn shard contents.
11
12use std::cmp::{max, min};
13use std::collections::{BTreeMap, VecDeque};
14use std::fmt::Debug;
15use std::ops::{Deref, DerefMut};
16use std::sync::Arc;
17
18use differential_dataflow::consolidation::consolidate_updates;
19use differential_dataflow::hashable::Hashable;
20use differential_dataflow::lattice::Lattice;
21use itertools::Itertools;
22use mz_ore::cast::CastFrom;
23use mz_ore::collections::HashMap;
24use mz_ore::instrument;
25use mz_persist_client::cfg::USE_CRITICAL_SINCE_TXN;
26use mz_persist_client::fetch::LeasedBatchPart;
27use mz_persist_client::metrics::encode_ts_metric;
28use mz_persist_client::read::{ListenEvent, ReadHandle, Subscribe};
29use mz_persist_client::write::WriteHandle;
30use mz_persist_client::{Diagnostics, PersistClient, ShardId};
31use mz_persist_types::txn::{TxnsCodec, TxnsEntry};
32use mz_persist_types::{Codec64, StepForward};
33use timely::order::TotalOrder;
34use timely::progress::{Antichain, Timestamp};
35use tracing::debug;
36
37use crate::TxnsCodecDefault;
38use crate::metrics::Metrics;
39use crate::txn_read::{DataListenNext, DataRemapEntry, DataSnapshot, DataSubscribe};
40
/// A cache of the txn shard contents, optimized for various in-memory
/// operations.
///
/// # Implementation Details
///
/// Reads of data shards are almost as straightforward as writes. A data shard
/// may be read normally, using snapshots, subscriptions, shard_source, etc,
/// through the most recent non-empty write. However, the upper of the txns
/// shard (and thus the logical upper of the data shard) may be arbitrarily far
/// ahead of the physical upper of the data shard. As a result, we do the
/// following:
///
/// - To take a snapshot of a data shard, the `as_of` is passed through
///   unchanged if the timestamp of that shard's latest non-empty write is past
///   it. Otherwise, we know the times between them have no writes and can fill
///   them with empty updates. Concretely, to read a snapshot as of `T`:
///   - We read the txns shard contents up through and including `T`, blocking
///     until the upper passes `T` if necessary.
///   - We then find, for the requested data shard, the latest non-empty write
///     at a timestamp `T' <= T`.
///   - We wait for `T'` to be applied by watching the data shard upper.
///   - We `compare_and_append` empty updates for `(T', T]`, which is known by
///     the txn system to not have writes for this shard (otherwise we'd have
///     picked a different `T'`).
///   - We read the snapshot at `T` as normal.
/// - To iterate a listen on a data shard:
///   - Writes that haven't been read yet are passed through unchanged.
///   - Otherwise, if the txns shard indicates a range of empty time (no writes
///     for this shard), logical progress over that range is emitted.
///   - Otherwise, we wait for the txns shard to progress, which indicates
///     when new information is available.
///
/// Note that all of the above can be determined solely by information in the
/// txns shard. In particular, non-empty writes are indicated by updates with
/// positive diffs.
///
/// Also note that the above is structured such that it is possible to write a
/// timely operator with the data shard as an input, passing on all payloads
/// unchanged and simply manipulating capabilities in response to data and txns
/// shard progress. See [crate::operator::txns_progress].
#[derive(Debug)]
pub struct TxnsCacheState<T> {
    /// The id of the txns shard whose contents this cache holds.
    txns_id: ShardId,
    /// The since of the txn_shard when this cache was initialized.
    /// Some writes with a timestamp less than this may have been applied and
    /// tidied, so this cache has no way of learning about them.
    ///
    /// Invariant: never changes.
    pub(crate) init_ts: T,
    /// The contents of this cache are updated up to, but not including, this time.
    pub(crate) progress_exclusive: T,

    /// The internal id to assign to the next batch inserted into
    /// `unapplied_batches`. Monotonically increasing, so batch ids are never
    /// reused.
    next_batch_id: usize,
    /// The batches needing application as of the current progress.
    ///
    /// This is indexed by a "batch id" that is internal to this object because
    /// timestamps are not unique.
    ///
    /// Invariant: Values are sorted by timestamp.
    pub(crate) unapplied_batches: BTreeMap<usize, (ShardId, Vec<u8>, T)>,
    /// An index into `unapplied_batches` keyed by the serialized batch.
    batch_idx: HashMap<Vec<u8>, usize>,
    /// The times at which each data shard has been written and registered.
    ///
    /// Invariant: Contains all unapplied writes and registers.
    /// Invariant: Contains the latest write and registration >= init_ts for all shards.
    pub(crate) datas: BTreeMap<ShardId, DataTimes<T>>,
    /// The registers and forgets needing application as of the current progress.
    ///
    /// Invariant: Values are sorted by timestamp.
    pub(crate) unapplied_registers: VecDeque<(ShardId, T)>,

    /// If Some, this cache only tracks the indicated data shard as a
    /// performance optimization. When used, only some methods (in particular,
    /// the ones necessary for the txns_progress operator) are supported.
    ///
    /// TODO: It'd be nice to make this a compile time thing. I have some ideas,
    /// but they're decently invasive, so leave it for a followup.
    only_data_id: Option<ShardId>,
}
120
/// A self-updating [TxnsCacheState].
///
/// Bundles the cache state with the subscribe on the txns shard that feeds
/// it, plus a buffer of pending updates for txns shard timestamps that haven't
/// closed yet.
#[derive(Debug)]
pub struct TxnsCache<T, C: TxnsCodec = TxnsCodecDefault> {
    /// A subscribe over the txn shard.
    pub(crate) txns_subscribe: Subscribe<C::Key, C::Val, T, i64>,
    /// Pending updates for timestamps that haven't closed.
    pub(crate) buf: Vec<(TxnsEntry, T, i64)>,
    /// The cached contents of the txns shard.
    state: TxnsCacheState<T>,
}
130
131impl<T: Timestamp + Lattice + TotalOrder + StepForward + Codec64 + Sync> TxnsCacheState<T> {
132    /// Creates a new empty [`TxnsCacheState`].
133    ///
134    /// `init_ts` must be == the critical handle's since of the txn shard.
135    fn new(txns_id: ShardId, init_ts: T, only_data_id: Option<ShardId>) -> Self {
136        TxnsCacheState {
137            txns_id,
138            init_ts,
139            progress_exclusive: T::minimum(),
140            next_batch_id: 0,
141            unapplied_batches: BTreeMap::new(),
142            batch_idx: HashMap::new(),
143            datas: BTreeMap::new(),
144            unapplied_registers: VecDeque::new(),
145            only_data_id,
146        }
147    }
148
149    /// Creates and initializes a new [`TxnsCacheState`].
150    ///
151    /// `txns_read` is a [`ReadHandle`] on the txn shard.
152    pub(crate) async fn init<C: TxnsCodec>(
153        only_data_id: Option<ShardId>,
154        txns_read: ReadHandle<C::Key, C::Val, T, i64>,
155    ) -> (Self, Subscribe<C::Key, C::Val, T, i64>) {
156        let txns_id = txns_read.shard_id();
157        let as_of = txns_read.since().clone();
158        let since_ts = as_of.as_option().expect("txns shard is not closed").clone();
159        let mut txns_subscribe = txns_read
160            .subscribe(as_of)
161            .await
162            .expect("handle holds a capability");
163        let mut state = Self::new(txns_id, since_ts.clone(), only_data_id.clone());
164        let mut buf = Vec::new();
165        // The cache must be updated to `since_ts` to maintain the invariant
166        // that `state.since_ts <= state.progress_exclusive`.
167        TxnsCache::<T, C>::update(
168            &mut state,
169            &mut txns_subscribe,
170            &mut buf,
171            only_data_id,
172            |progress_exclusive| progress_exclusive >= &since_ts,
173        )
174        .await;
175        debug_assert_eq!(state.validate(), Ok(()));
176        (state, txns_subscribe)
177    }
178
179    /// Returns the [ShardId] of the txns shard.
180    pub fn txns_id(&self) -> ShardId {
181        self.txns_id
182    }
183
184    /// Returns whether the data shard was registered to the txns set as of the
185    /// current progress.
186    ///
187    /// Specifically, a data shard is registered if the most recent register
188    /// timestamp is set but the most recent forget timestamp is not set.
189    ///
190    /// This function accepts a timestamp as input, but that timestamp must be
191    /// equal to the progress exclusive, or else the function panics. It mainly
192    /// acts as a way for the caller to think about the logical time at which
193    /// this function executes. Times in the past may have been compacted away,
194    /// and we can't always return an accurate answer. If this function isn't
195    /// sufficient, you can usually find what you're looking for by inspecting
196    /// the times in the most recent registration.
197    pub fn registered_at_progress(&self, data_id: &ShardId, ts: &T) -> bool {
198        self.assert_only_data_id(data_id);
199        assert_eq!(self.progress_exclusive, *ts);
200        let Some(data_times) = self.datas.get(data_id) else {
201            return false;
202        };
203        data_times.last_reg().forget_ts.is_none()
204    }
205
206    /// Returns the set of all data shards registered to the txns set as of the
207    /// current progress. See [Self::registered_at_progress].
208    pub(crate) fn all_registered_at_progress(&self, ts: &T) -> Vec<ShardId> {
209        assert_eq!(self.only_data_id, None);
210        assert_eq!(self.progress_exclusive, *ts);
211        self.datas
212            .iter()
213            .filter(|(_, data_times)| data_times.last_reg().forget_ts.is_none())
214            .map(|(data_id, _)| *data_id)
215            .collect()
216    }
217
218    /// Returns a token exchangeable for a snapshot of a data shard.
219    ///
220    /// A data shard might be definite at times past the physical upper because
221    /// of invariants maintained by this txn system. As a result, this method
222    /// discovers the latest potentially unapplied write before the `as_of`.
223    ///
224    /// Callers must first wait for [`TxnsCache::update_gt`] with the same or
225    /// later timestamp to return. Panics otherwise.
226    pub fn data_snapshot(&self, data_id: ShardId, as_of: T) -> DataSnapshot<T> {
227        self.assert_only_data_id(&data_id);
228        assert!(self.progress_exclusive > as_of);
229        // `empty_to` will often be used as the input to `data_listen_next`.
230        // `data_listen_next` needs a timestamp that is greater than or equal
231        // to the init_ts. See the comment above the assert in
232        // `data_listen_next` for more details.
233        //
234        // TODO: Once the txn shard itself always tracks the most recent write
235        // for every shard, we can remove this and always use
236        // `as_of.step_forward()`.
237        let empty_to = max(as_of.step_forward(), self.init_ts.clone());
238        let Some(all) = self.datas.get(&data_id) else {
239            // Not registered currently, so we know there are no unapplied
240            // writes.
241            return DataSnapshot {
242                data_id,
243                latest_write: None,
244                as_of,
245                empty_to,
246            };
247        };
248
249        let min_unapplied_ts = self
250            .unapplied_batches
251            .first_key_value()
252            .map(|(_, (_, _, ts))| ts)
253            .unwrap_or(&self.progress_exclusive);
254        let latest_write = all
255            .writes
256            .iter()
257            .rev()
258            .find(|x| **x <= as_of && *x >= min_unapplied_ts)
259            .cloned();
260
261        debug!(
262            "data_snapshot {:.9} latest_write={:?} as_of={:?} empty_to={:?}: all={:?}",
263            data_id.to_string(),
264            latest_write,
265            as_of,
266            empty_to,
267            all,
268        );
269        let ret = DataSnapshot {
270            data_id: data_id.clone(),
271            latest_write,
272            as_of,
273            empty_to,
274        };
275        assert_eq!(ret.validate(), Ok(()));
276        ret
277    }
278
    // TODO(jkosh44) This method can likely be simplified to return
    // DataRemapEntry directly.
    /// Returns the next action to take when iterating a Listen on a data shard.
    ///
    /// A data shard Listen is executed by repeatedly calling this method with
    /// an exclusive progress frontier. The returned value indicates an action
    /// to take. Some of these actions advance the progress frontier, which
    /// results in calling this method again with a higher timestamp, and thus a
    /// new action. See [DataListenNext] for specifications of the actions.
    ///
    /// Note that this is a state machine on `self.progress_exclusive` and the
    /// listen progress. DataListenNext indicates which state transitions to
    /// take.
    ///
    /// For a shard the cache knows about, the checks run in this order (the
    /// order matters for correctness):
    /// 1. If `ts` has caught up to `progress_exclusive`, wait for more txns
    ///    shard progress.
    /// 2. If `ts` is at or before the shard's `latest_physical_ts()`, read the
    ///    data shard up to `latest_physical_ts().step_forward()`.
    /// 3. If the shard's most recent registration is still open (no forget
    ///    timestamp), emit logical progress up to `progress_exclusive`.
    /// 4. Otherwise the shard has been forgotten, so read the data shard
    ///    normally up to `progress_exclusive`.
    ///
    /// A shard with no entry in the cache is read normally up to
    /// `progress_exclusive`, or waited on if `ts` has already caught up.
    ///
    /// # Panics
    ///
    /// Panics if `ts` is past `progress_exclusive`, if `ts` is before
    /// `init_ts`, or if the defensive registration checks in steps 3 and 4
    /// fail.
    pub fn data_listen_next(&self, data_id: &ShardId, ts: &T) -> DataListenNext<T> {
        self.assert_only_data_id(data_id);
        assert!(
            &self.progress_exclusive >= ts,
            "ts {:?} is past progress_exclusive {:?}",
            ts,
            self.progress_exclusive
        );
        // There may be applied and tidied writes before the init_ts that the
        // cache is unaware of. So if this method is called with a timestamp
        // less than the initial since, it may mistakenly tell the caller to
        // `EmitLogicalProgress(self.progress_exclusive)` instead of the
        // correct answer of `ReadDataTo(tidied_write_ts)`.
        //
        // We know for a fact that there are no unapplied writes, registers, or
        // forgets before the init_ts because the since of the txn shard is
        // always held back to the earliest unapplied event. There may be some
        // untidied events with a lower timestamp than the init_ts, but they
        // are guaranteed to be applied.
        //
        // TODO: Once the txn shard itself always tracks the most recent write
        // for every shard, we can remove this assert. It will always be
        // correct to return ReadDataTo(latest_write_ts) if there are any writes,
        // and then `EmitLogicalProgress(self.progress_exclusive)`.
        assert!(
            ts >= &self.init_ts,
            "ts {:?} is not past initial since {:?}",
            ts,
            self.init_ts
        );
        use DataListenNext::*;
        let data_times = self.datas.get(data_id);
        debug!(
            "data_listen_next {:.9} {:?}: progress={:?} times={:?}",
            data_id.to_string(),
            ts,
            self.progress_exclusive,
            data_times,
        );
        let Some(data_times) = data_times else {
            // Not registered, maybe it will be in the future? In the meantime,
            // treat it like a normal shard (i.e. pass through reads) and check
            // again later.
            if ts < &self.progress_exclusive {
                return ReadDataTo(self.progress_exclusive.clone());
            } else {
                return WaitForTxnsProgress;
            }
        };
        let physical_ts = data_times.latest_physical_ts();
        let last_reg = data_times.last_reg();
        // The branch order below is load-bearing: "caught up" must win over
        // everything, and physical writes must be read before any logical
        // progress is emitted past them.
        if ts >= &self.progress_exclusive {
            // All caught up, we have to wait.
            WaitForTxnsProgress
        } else if ts <= physical_ts {
            // There was some physical write, so read up to that time.
            ReadDataTo(physical_ts.step_forward())
        } else if last_reg.forget_ts.is_none() {
            // Emitting logical progress at the wrong time is a correctness bug,
            // so be extra defensive about the necessary conditions: the most
            // recent registration is still active, and we're in it.
            assert!(last_reg.contains(ts));
            EmitLogicalProgress(self.progress_exclusive.clone())
        } else {
            // The most recent forget is set, which means it's not registered as of
            // the latest information we have. Read to the current progress point
            // normally.

            assert!(ts > &last_reg.register_ts && last_reg.forget_ts.is_some());
            ReadDataTo(self.progress_exclusive.clone())
        }
    }
364
365    /// Returns a token exchangeable for a subscribe of a data shard.
366    ///
367    /// Callers must first wait for [`TxnsCache::update_gt`] with the same or
368    /// later timestamp to return. Panics otherwise.
369    pub(crate) fn data_subscribe(&self, data_id: ShardId, as_of: T) -> DataSubscribe<T> {
370        self.assert_only_data_id(&data_id);
371        assert!(self.progress_exclusive > as_of);
372        let snapshot = self.data_snapshot(data_id, as_of);
373        let remap = DataRemapEntry {
374            physical_upper: snapshot.empty_to.clone(),
375            logical_upper: snapshot.empty_to.clone(),
376        };
377        DataSubscribe {
378            data_id,
379            snapshot: Some(snapshot),
380            remap,
381        }
382    }
383
384    /// Returns the minimum timestamp not known to be applied by this cache.
385    pub fn min_unapplied_ts(&self) -> &T {
386        assert_eq!(self.only_data_id, None);
387        self.min_unapplied_ts_inner()
388    }
389
390    fn min_unapplied_ts_inner(&self) -> &T {
391        // We maintain an invariant that the values in the unapplied_batches map
392        // are sorted by timestamp, thus the first one must be the minimum.
393        let min_batch_ts = self
394            .unapplied_batches
395            .first_key_value()
396            .map(|(_, (_, _, ts))| ts)
397            // If we don't have any known unapplied batches, then the next
398            // timestamp that could be written must potentially have an
399            // unapplied batch.
400            .unwrap_or(&self.progress_exclusive);
401        let min_register_ts = self
402            .unapplied_registers
403            .front()
404            .map(|(_, ts)| ts)
405            .unwrap_or(&self.progress_exclusive);
406
407        min(min_batch_ts, min_register_ts)
408    }
409
410    /// Returns the operations needing application as of the current progress.
411    pub(crate) fn unapplied(&self) -> impl Iterator<Item = (&ShardId, Unapplied<'_>, &T)> {
412        assert_eq!(self.only_data_id, None);
413        let registers = self
414            .unapplied_registers
415            .iter()
416            .map(|(data_id, ts)| (data_id, Unapplied::RegisterForget, ts));
417        let batches = self
418            .unapplied_batches
419            .values()
420            .fold(
421                BTreeMap::new(),
422                |mut accum: BTreeMap<_, Vec<_>>, (data_id, batch, ts)| {
423                    accum.entry((ts, data_id)).or_default().push(batch);
424                    accum
425                },
426            )
427            .into_iter()
428            .map(|((ts, data_id), batches)| (data_id, Unapplied::Batch(batches), ts));
429        // This will emit registers and forgets before batches at the same timestamp. Currently,
430        // this is fine because for a single data shard you can't combine registers, forgets, and
431        // batches at the same timestamp. In the future if we allow combining these operations in
432        // a single op, then we probably want to emit registers, then batches, then forgets or we
433        // can make forget exclusive in which case we'd emit it before batches.
434        registers.merge_by(batches, |(_, _, ts1), (_, _, ts2)| ts1 <= ts2)
435    }
436
437    /// Filters out retractions known to have made it into the txns shard.
438    ///
439    /// This is called with a set of things that are known to have been applied
440    /// and in preparation for retracting them. The caller will attempt to
441    /// retract everything not filtered out by this method in a CaA with an
442    /// expected upper of `expected_txns_upper`. So, we catch up to that point,
443    /// and keep everything that is still outstanding. If the CaA fails with an
444    /// expected upper mismatch, then it must call this method again on the next
445    /// attempt with the new expected upper (new retractions may have made it
446    /// into the txns shard in the meantime).
447    ///
448    /// Callers must first wait for [`TxnsCache::update_ge`] with the same or
449    /// later timestamp to return. Panics otherwise.
450    pub(crate) fn filter_retractions<'a>(
451        &'a self,
452        expected_txns_upper: &T,
453        retractions: impl Iterator<Item = (&'a Vec<u8>, &'a ([u8; 8], ShardId))>,
454    ) -> impl Iterator<Item = (&'a Vec<u8>, &'a ([u8; 8], ShardId))> {
455        assert_eq!(self.only_data_id, None);
456        assert!(&self.progress_exclusive >= expected_txns_upper);
457        retractions.filter(|(batch_raw, _)| self.batch_idx.contains_key(*batch_raw))
458    }
459
460    /// Update contents with `entries` and mark this cache as progressed up to `progress`.
461    pub(crate) fn push_entries(&mut self, mut entries: Vec<(TxnsEntry, T, i64)>, progress: T) {
462        // Persist emits the times sorted by little endian encoding,
463        // which is not what we want. If we ever expose an interface for
464        // registering and committing to a data shard at the same
465        // timestamp, this will also have to sort registrations first.
466        consolidate_updates(&mut entries);
467        entries.sort_by(|(a, _, _), (b, _, _)| a.ts::<T>().cmp(&b.ts::<T>()));
468        for (e, t, d) in entries {
469            match e {
470                TxnsEntry::Register(data_id, ts) => {
471                    let ts = T::decode(ts);
472                    debug_assert!(ts <= t);
473                    self.push_register(data_id, ts, d, t);
474                }
475                TxnsEntry::Append(data_id, ts, batch) => {
476                    let ts = T::decode(ts);
477                    debug_assert!(ts <= t);
478                    self.push_append(data_id, batch, ts, d)
479                }
480            }
481        }
482        self.progress_exclusive = progress;
483        debug_assert_eq!(self.validate(), Ok(()));
484    }
485
486    fn push_register(&mut self, data_id: ShardId, ts: T, diff: i64, compacted_ts: T) {
487        self.assert_only_data_id(&data_id);
488        // Since we keep the original non-advanced timestamp around, retractions
489        // necessarily might be for times in the past, so `|| diff < 0`.
490        debug_assert!(ts >= self.progress_exclusive || diff < 0);
491        if let Some(only_data_id) = self.only_data_id.as_ref() {
492            if only_data_id != &data_id {
493                return;
494            }
495        }
496
497        // The shard has not compacted past the register/forget ts, so it may not have been applied.
498        if ts == compacted_ts {
499            self.unapplied_registers.push_back((data_id, ts.clone()));
500        }
501
502        if diff == 1 {
503            debug!(
504                "cache learned {:.9} registered t={:?}",
505                data_id.to_string(),
506                ts
507            );
508            let entry = self.datas.entry(data_id).or_default();
509            // Sanity check that if there is a registration, then we've closed
510            // it off.
511            if let Some(last_reg) = entry.registered.back() {
512                assert!(last_reg.forget_ts.is_some())
513            }
514            entry.registered.push_back(DataRegistered {
515                register_ts: ts,
516                forget_ts: None,
517            });
518        } else if diff == -1 {
519            debug!(
520                "cache learned {:.9} forgotten t={:?}",
521                data_id.to_string(),
522                ts
523            );
524            let active_reg = self
525                .datas
526                .get_mut(&data_id)
527                .and_then(|x| x.registered.back_mut())
528                .expect("data shard should be registered before forget");
529            assert_eq!(active_reg.forget_ts.replace(ts), None);
530        } else {
531            unreachable!("only +1/-1 diffs are used");
532        }
533        debug_assert_eq!(self.validate(), Ok(()));
534    }
535
536    fn push_append(&mut self, data_id: ShardId, batch: Vec<u8>, ts: T, diff: i64) {
537        self.assert_only_data_id(&data_id);
538        // Since we keep the original non-advanced timestamp around, retractions
539        // necessarily might be for times in the past, so `|| diff < 0`.
540        debug_assert!(ts >= self.progress_exclusive || diff < 0);
541        if let Some(only_data_id) = self.only_data_id.as_ref() {
542            if only_data_id != &data_id {
543                return;
544            }
545        }
546
547        if diff == 1 {
548            debug!(
549                "cache learned {:.9} committed t={:?} b={}",
550                data_id.to_string(),
551                ts,
552                batch.hashed(),
553            );
554            let idx = self.next_batch_id;
555            self.next_batch_id += 1;
556            let prev = self.batch_idx.insert(batch.clone(), idx);
557            assert_eq!(prev, None);
558            let prev = self
559                .unapplied_batches
560                .insert(idx, (data_id, batch, ts.clone()));
561            assert_eq!(prev, None);
562            let times = self.datas.get_mut(&data_id).expect("data is initialized");
563            // Sanity check that shard is registered.
564            assert_eq!(times.last_reg().forget_ts, None);
565
566            // Only add the timestamp if it's not already in the deque. We don't
567            // track all writes in this but track at what timestamps we have
568            // _any_ writes.
569            if times.writes.back() != Some(&ts) {
570                times.writes.push_back(ts);
571            }
572        } else if diff == -1 {
573            debug!(
574                "cache learned {:.9} applied t={:?} b={}",
575                data_id.to_string(),
576                ts,
577                batch.hashed(),
578            );
579            let idx = self
580                .batch_idx
581                .remove(&batch)
582                .expect("invariant violation: batch should exist");
583            let prev = self
584                .unapplied_batches
585                .remove(&idx)
586                .expect("invariant violation: batch index should exist");
587            debug_assert_eq!(data_id, prev.0);
588            debug_assert_eq!(batch, prev.1);
589            // Insertion timestamp should be less equal retraction timestamp.
590            debug_assert!(prev.2 <= ts);
591        } else {
592            unreachable!("only +1/-1 diffs are used");
593        }
594        self.compact_data_times(&data_id);
595        debug_assert_eq!(self.validate(), Ok(()));
596    }
597
598    /// Informs the cache that all registers and forgets less than ts have been
599    /// applied.
600    pub(crate) fn mark_register_applied(&mut self, ts: &T) {
601        self.unapplied_registers
602            .retain(|(_, register_ts)| ts < register_ts);
603        debug_assert_eq!(self.validate(), Ok(()));
604    }
605
606    /// Compact the internal representation for `data_id` by removing all data
607    /// that is not needed to maintain the following invariants:
608    ///
609    ///   - The latest write and registration for each shard are kept in
610    ///     `self.datas`.
611    ///   - All unapplied writes and registrations are kept in `self.datas`.
612    ///   - All writes in `self.datas` are contained by some registration in
613    ///     `self.datas`.
614    fn compact_data_times(&mut self, data_id: &ShardId) {
615        let Some(times) = self.datas.get_mut(data_id) else {
616            return;
617        };
618
619        debug!("cache compact {:.9} times={:?}", data_id.to_string(), times);
620
621        if let Some(unapplied_write_ts) = self
622            .unapplied_batches
623            .first_key_value()
624            .map(|(_, (_, _, ts))| ts)
625        {
626            debug!(
627                "cache compact {:.9} unapplied_write_ts={:?}",
628                data_id.to_string(),
629                unapplied_write_ts,
630            );
631            while let Some(write_ts) = times.writes.front() {
632                if times.writes.len() == 1 || write_ts >= unapplied_write_ts {
633                    break;
634                }
635                times.writes.pop_front();
636            }
637        } else {
638            times.writes.drain(..times.writes.len() - 1);
639        }
640        let unapplied_reg_ts = self.unapplied_registers.front().map(|(_, ts)| ts);
641        let min_write_ts = times.writes.front();
642        let min_reg_ts = [unapplied_reg_ts, min_write_ts].into_iter().flatten().min();
643        if let Some(min_reg_ts) = min_reg_ts {
644            debug!(
645                "cache compact {:.9} unapplied_reg_ts={:?} min_write_ts={:?} min_reg_ts={:?}",
646                data_id.to_string(),
647                unapplied_reg_ts,
648                min_write_ts,
649                min_reg_ts,
650            );
651            while let Some(reg) = times.registered.front() {
652                match &reg.forget_ts {
653                    Some(forget_ts) if forget_ts >= min_reg_ts => break,
654                    _ if times.registered.len() == 1 => break,
655                    _ => {
656                        assert!(
657                            reg.forget_ts.is_some(),
658                            "only the latest reg can have no forget ts"
659                        );
660                        times.registered.pop_front();
661                    }
662                }
663            }
664        } else {
665            times.registered.drain(..times.registered.len() - 1);
666        }
667
668        debug!(
669            "cache compact DONE {:.9} times={:?}",
670            data_id.to_string(),
671            times
672        );
673    }
674
675    pub(crate) fn update_gauges(&self, metrics: &Metrics) {
676        metrics
677            .data_shard_count
678            .set(u64::cast_from(self.datas.len()));
679        metrics
680            .batches
681            .unapplied_count
682            .set(u64::cast_from(self.unapplied_batches.len()));
683        let unapplied_batches_bytes = self
684            .unapplied_batches
685            .values()
686            .map(|(_, x, _)| x.len())
687            .sum::<usize>();
688        metrics
689            .batches
690            .unapplied_bytes
691            .set(u64::cast_from(unapplied_batches_bytes));
692        metrics
693            .batches
694            .unapplied_min_ts
695            .set(encode_ts_metric(&Antichain::from_elem(
696                self.min_unapplied_ts().clone(),
697            )));
698    }
699
700    fn assert_only_data_id(&self, data_id: &ShardId) {
701        if let Some(only_data_id) = self.only_data_id.as_ref() {
702            assert_eq!(data_id, only_data_id);
703        }
704    }
705
706    pub(crate) fn validate(&self) -> Result<(), String> {
707        // Unapplied batches are all indexed and sorted.
708        if self.batch_idx.len() != self.unapplied_batches.len() {
709            return Err(format!(
710                "expected index len {} to match what it's indexing {}",
711                self.batch_idx.len(),
712                self.unapplied_batches.len()
713            ));
714        }
715
716        let mut prev_batch_ts = T::minimum();
717        for (idx, (_, batch, ts)) in self.unapplied_batches.iter() {
718            if self.batch_idx.get(batch) != Some(idx) {
719                return Err(format!(
720                    "expected batch to be indexed at {} got {:?}",
721                    idx,
722                    self.batch_idx.get(batch)
723                ));
724            }
725            if ts < &prev_batch_ts {
726                return Err(format!(
727                    "unapplied batch timestamp {:?} out of order after {:?}",
728                    ts, prev_batch_ts
729                ));
730            }
731            prev_batch_ts = ts.clone();
732        }
733
734        // Unapplied registers are sorted.
735        let mut prev_register_ts = T::minimum();
736        for (_, ts) in self.unapplied_registers.iter() {
737            if ts < &prev_register_ts {
738                return Err(format!(
739                    "unapplied register timestamp {:?} out of order after {:?}",
740                    ts, prev_register_ts
741                ));
742            }
743            prev_register_ts = ts.clone();
744        }
745
746        let min_unapplied_ts = self.min_unapplied_ts_inner();
747
748        for (data_id, data_times) in self.datas.iter() {
749            let () = data_times.validate()?;
750
751            if let Some(ts) = data_times.writes.front() {
752                // Writes are compacted.
753                if min_unapplied_ts > ts && data_times.writes.len() > 1 {
754                    return Err(format!(
755                        "{:?} write ts {:?} not past min unapplied ts {:?}",
756                        data_id, ts, min_unapplied_ts
757                    ));
758                }
759            }
760
761            // datas contains all unapplied writes.
762            if let Some((_, (_, _, unapplied_ts))) = self
763                .unapplied_batches
764                .iter()
765                .find(|(_, (shard_id, _, _))| shard_id == data_id)
766            {
767                if let Some(write_ts) = data_times.writes.front() {
768                    if write_ts > unapplied_ts {
769                        return Err(format!(
770                            "{:?} min write ts {:?} past min unapplied batch ts {:?}",
771                            data_id, write_ts, unapplied_ts
772                        ));
773                    }
774                }
775            }
776
777            // datas contains all unapplied register/forgets.
778            if let Some((_, unapplied_ts)) = self
779                .unapplied_registers
780                .iter()
781                .find(|(shard_id, _)| shard_id == data_id)
782            {
783                let register_ts = &data_times.first_reg().register_ts;
784                if register_ts > unapplied_ts {
785                    return Err(format!(
786                        "{:?} min register ts {:?} past min unapplied register ts {:?}",
787                        data_id, register_ts, unapplied_ts
788                    ));
789                }
790            }
791        }
792
793        Ok(())
794    }
795}
796
797impl<T, C> TxnsCache<T, C>
798where
799    T: Timestamp + Lattice + TotalOrder + StepForward + Codec64 + Sync,
800    C: TxnsCodec,
801{
802    /// Initialize the txn shard at `init_ts` and returns a [TxnsCache] reading
803    /// from that shard.
804    pub(crate) async fn init(
805        init_ts: T,
806        txns_read: ReadHandle<C::Key, C::Val, T, i64>,
807        txns_write: &mut WriteHandle<C::Key, C::Val, T, i64>,
808    ) -> Self {
809        let () = crate::empty_caa(|| "txns init", txns_write, init_ts.clone()).await;
810        let mut ret = Self::from_read(txns_read, None).await;
811        let _ = ret.update_gt(&init_ts).await;
812        ret
813    }
814
815    /// Returns a [TxnsCache] reading from the given txn shard.
816    ///
817    /// `txns_id` identifies which shard will be used as the txns WAL. MZ will
818    /// likely have one of these per env, used by all processes and the same
819    /// across restarts.
820    pub async fn open(
821        client: &PersistClient,
822        txns_id: ShardId,
823        only_data_id: Option<ShardId>,
824    ) -> Self {
825        let (txns_key_schema, txns_val_schema) = C::schemas();
826        let txns_read = client
827            .open_leased_reader(
828                txns_id,
829                Arc::new(txns_key_schema),
830                Arc::new(txns_val_schema),
831                Diagnostics {
832                    shard_name: "txns".to_owned(),
833                    handle_purpose: "read txns".to_owned(),
834                },
835                USE_CRITICAL_SINCE_TXN.get(client.dyncfgs()),
836            )
837            .await
838            .expect("txns schema shouldn't change");
839        Self::from_read(txns_read, only_data_id).await
840    }
841
842    async fn from_read(
843        txns_read: ReadHandle<C::Key, C::Val, T, i64>,
844        only_data_id: Option<ShardId>,
845    ) -> Self {
846        let (state, txns_subscribe) = TxnsCacheState::init::<C>(only_data_id, txns_read).await;
847        TxnsCache {
848            txns_subscribe,
849            buf: Vec::new(),
850            state,
851        }
852    }
853
854    /// Invariant: afterward, self.progress_exclusive will be > ts
855    ///
856    /// Returns the `progress_exclusive` of the cache after updating.
857    #[must_use]
858    #[instrument(level = "debug", fields(ts = ?ts))]
859    pub async fn update_gt(&mut self, ts: &T) -> &T {
860        let only_data_id = self.only_data_id.clone();
861        Self::update(
862            &mut self.state,
863            &mut self.txns_subscribe,
864            &mut self.buf,
865            only_data_id,
866            |progress_exclusive| progress_exclusive > ts,
867        )
868        .await;
869        debug_assert!(&self.progress_exclusive > ts);
870        debug_assert_eq!(self.validate(), Ok(()));
871        &self.progress_exclusive
872    }
873
874    /// Invariant: afterward, self.progress_exclusive will be >= ts
875    ///
876    /// Returns the `progress_exclusive` of the cache after updating.
877    #[must_use]
878    #[instrument(level = "debug", fields(ts = ?ts))]
879    pub async fn update_ge(&mut self, ts: &T) -> &T {
880        let only_data_id = self.only_data_id.clone();
881        Self::update(
882            &mut self.state,
883            &mut self.txns_subscribe,
884            &mut self.buf,
885            only_data_id,
886            |progress_exclusive| progress_exclusive >= ts,
887        )
888        .await;
889        debug_assert!(&self.progress_exclusive >= ts);
890        debug_assert_eq!(self.validate(), Ok(()));
891        &self.progress_exclusive
892    }
893
894    /// Listen to the txns shard for events until `done` returns true.
895    async fn update<F: Fn(&T) -> bool>(
896        state: &mut TxnsCacheState<T>,
897        txns_subscribe: &mut Subscribe<C::Key, C::Val, T, i64>,
898        buf: &mut Vec<(TxnsEntry, T, i64)>,
899        only_data_id: Option<ShardId>,
900        done: F,
901    ) {
902        while !done(&state.progress_exclusive) {
903            let events = txns_subscribe.next(None).await;
904            for event in events {
905                match event {
906                    ListenEvent::Progress(frontier) => {
907                        let progress = frontier
908                            .into_option()
909                            .expect("nothing should close the txns shard");
910                        state.push_entries(std::mem::take(buf), progress);
911                    }
912                    ListenEvent::Updates(parts) => {
913                        Self::fetch_parts(only_data_id, txns_subscribe, parts, buf).await;
914                    }
915                };
916            }
917        }
918        debug_assert_eq!(state.validate(), Ok(()));
919        debug!(
920            "cache correct before {:?} len={} least_ts={:?}",
921            state.progress_exclusive,
922            state.unapplied_batches.len(),
923            state
924                .unapplied_batches
925                .first_key_value()
926                .map(|(_, (_, _, ts))| ts),
927        );
928    }
929
930    pub(crate) async fn fetch_parts(
931        only_data_id: Option<ShardId>,
932        txns_subscribe: &mut Subscribe<C::Key, C::Val, T, i64>,
933        parts: Vec<LeasedBatchPart<T>>,
934        updates: &mut Vec<(TxnsEntry, T, i64)>,
935    ) {
936        // We filter out unrelated data in two passes. The first is
937        // `should_fetch_part`, which allows us to skip entire fetches
938        // from s3/Blob. Then, if a part does need to be fetched, it
939        // still might contain info about unrelated data shards, and we
940        // filter those out before buffering in `updates`.
941        for part in parts {
942            let should_fetch_part = Self::should_fetch_part(only_data_id.as_ref(), &part);
943            debug!(
944                "should_fetch_part={} for {:?} {:?}",
945                should_fetch_part,
946                only_data_id,
947                part.stats()
948            );
949            if !should_fetch_part {
950                drop(part);
951                continue;
952            }
953            let part_updates = txns_subscribe.fetch_batch_part(part).await;
954            let part_updates = part_updates.map(|((k, v), t, d)| {
955                let (k, v) = (k.expect("valid key"), v.expect("valid val"));
956                (C::decode(k, v), t, d)
957            });
958            if let Some(only_data_id) = only_data_id.as_ref() {
959                updates.extend(part_updates.filter(|(x, _, _)| x.data_id() == only_data_id));
960            } else {
961                updates.extend(part_updates);
962            }
963        }
964    }
965
966    fn should_fetch_part(only_data_id: Option<&ShardId>, part: &LeasedBatchPart<T>) -> bool {
967        let Some(only_data_id) = only_data_id else {
968            return true;
969        };
970        // This `part.stats()` call involves decoding and the only_data_id=None
971        // case is common-ish, so make sure to keep it after that early return.
972        let Some(stats) = part.stats() else {
973            return true;
974        };
975        C::should_fetch_part(only_data_id, &stats).unwrap_or(true)
976    }
977}
978
979impl<T, C: TxnsCodec> Deref for TxnsCache<T, C> {
980    type Target = TxnsCacheState<T>;
981    fn deref(&self) -> &Self::Target {
982        &self.state
983    }
984}
985
986impl<T, C: TxnsCodec> DerefMut for TxnsCache<T, C> {
987    fn deref_mut(&mut self) -> &mut Self::Target {
988        &mut self.state
989    }
990}
991
/// The cache's record of a single data shard's history in the txns shard:
/// when it was registered and at which times it was written via txns.
#[derive(Debug)]
pub(crate) struct DataTimes<T> {
    /// The times at which the data shard was in the txns set.
    ///
    /// Invariants:
    ///
    /// - At least one registration (otherwise we filter this out of the cache).
    /// - These are in increasing order.
    /// - These are non-overlapping intervals. (`validate` only rejects a
    ///   `register_ts` strictly before the preceding `forget_ts`, so adjacent
    ///   intervals may share an endpoint.)
    /// - Everything in writes is in one of these intervals.
    pub(crate) registered: VecDeque<DataRegistered<T>>,
    /// Timestamps of this shard's writes via txns that the cache is tracking.
    ///
    /// Invariant: These are in increasing order (`validate` rejects any
    /// timestamp smaller than its predecessor; equal neighbors are allowed).
    pub(crate) writes: VecDeque<T>,
}
1006
1007impl<T> Default for DataTimes<T> {
1008    fn default() -> Self {
1009        Self {
1010            registered: Default::default(),
1011            writes: Default::default(),
1012        }
1013    }
1014}
1015
/// One interval during which a data shard was registered in the txns set.
///
/// Both endpoints are inclusive (see `contains`); an interval without a
/// `forget_ts` is open-ended.
#[derive(Debug)]
pub(crate) struct DataRegistered<T> {
    /// The inclusive time at which the data shard was added to the txns set.
    ///
    /// If this time has been advanced by compaction, writes might be at times
    /// equal to it.
    pub(crate) register_ts: T,
    /// The inclusive time at which the data shard was removed from the txns
    /// set, or None if it hasn't yet been removed.
    pub(crate) forget_ts: Option<T>,
}
1027
1028impl<T: Timestamp + TotalOrder> DataRegistered<T> {
1029    pub(crate) fn contains(&self, ts: &T) -> bool {
1030        &self.register_ts <= ts && self.forget_ts.as_ref().map_or(true, |x| ts <= x)
1031    }
1032}
1033
impl<T: Timestamp + TotalOrder> DataTimes<T> {
    /// Returns the most recent registration of this data shard.
    ///
    /// Panics if there are no registrations (a violation of this type's
    /// at-least-one-registration invariant).
    pub(crate) fn last_reg(&self) -> &DataRegistered<T> {
        self.registered.back().expect("at least one registration")
    }

    /// Returns the oldest registration of this data shard still held.
    ///
    /// Panics if there are no registrations.
    fn first_reg(&self) -> &DataRegistered<T> {
        self.registered.front().expect("at least one registration")
    }

    /// Returns the latest known physical upper of a data shard.
    ///
    /// Computed as the max of the last registration's `register_ts`, its
    /// `forget_ts` (if any), and the latest tracked write. Panics if there are
    /// no registrations.
    fn latest_physical_ts(&self) -> &T {
        let last_reg = self.last_reg();
        let mut physical_ts = &last_reg.register_ts;
        if let Some(forget_ts) = &last_reg.forget_ts {
            physical_ts = max(physical_ts, forget_ts);
        }
        if let Some(latest_write) = self.writes.back() {
            physical_ts = max(physical_ts, latest_write);
        }
        physical_ts
    }

    /// Checks the invariants documented on [DataTimes], returning a
    /// description of the first violation found.
    ///
    /// Checks that:
    /// - `writes` is sorted;
    /// - no registration starts before the most recent preceding `forget_ts`;
    /// - each `forget_ts` is no earlier than its `register_ts`;
    /// - every write lies within some registration interval;
    /// - there is at least one registration.
    pub(crate) fn validate(&self) -> Result<(), String> {
        // Writes are sorted.
        let mut prev_ts = T::minimum();
        for ts in self.writes.iter() {
            if ts < &prev_ts {
                return Err(format!(
                    "write ts {:?} out of order after {:?}",
                    ts, prev_ts
                ));
            }
            prev_ts = ts.clone();
        }

        // Registered is sorted and non-overlapping.
        let mut prev_ts = T::minimum();
        // Both `writes` and `registered` are sorted, so a single cursor walks
        // the writes once while the registrations are visited in order.
        let mut writes_idx = 0;
        for x in self.registered.iter() {
            if x.register_ts < prev_ts {
                return Err(format!(
                    "register ts {:?} out of order after {:?}",
                    x.register_ts, prev_ts
                ));
            }
            if let Some(forget_ts) = x.forget_ts.as_ref() {
                if !(&x.register_ts <= forget_ts) {
                    return Err(format!(
                        "register ts {:?} not less_equal forget ts {:?}",
                        x.register_ts, forget_ts
                    ));
                }
                prev_ts.clone_from(forget_ts);
            }
            // Also peel off any writes in this interval.
            while let Some(write_ts) = self.writes.get(writes_idx) {
                // Not consumed by an earlier interval and before this one
                // starts: the write falls in a gap (or before the first
                // registration).
                if write_ts < &x.register_ts {
                    return Err(format!(
                        "write ts {:?} not in any register interval {:?}",
                        write_ts, self.registered
                    ));
                }
                if let Some(forget_ts) = x.forget_ts.as_ref() {
                    if write_ts <= forget_ts {
                        writes_idx += 1;
                        continue;
                    }
                }
                // Either the write is past this interval's forget_ts, or the
                // interval is open-ended; leftovers are handled after the loop.
                break;
            }
        }

        // Check for writes after the last interval.
        let Some(reg_back) = self.registered.back() else {
            return Err("registered was empty".into());
        };
        // Unconsumed writes are fine only if the last registration is still
        // open. The first of them was already checked against its register_ts,
        // and the rest follow by sortedness.
        if writes_idx != self.writes.len() && reg_back.forget_ts.is_some() {
            return Err(format!(
                "write ts {:?} not in any register interval {:?}",
                self.writes, self.registered
            ));
        }

        Ok(())
    }
}
1120
/// A kind of not-yet-applied entry from the txns shard.
#[derive(Debug)]
pub(crate) enum Unapplied<'a> {
    /// A register or forget of a data shard.
    RegisterForget,
    /// One or more batches, borrowed in their encoded byte form (presumably
    /// the same `Vec<u8>` values held in `unapplied_batches`; confirm at the
    /// producer).
    Batch(Vec<&'a Vec<u8>>),
}
1126
#[cfg(test)]
mod tests {
    use DataListenNext::*;
    use mz_ore::assert_err;
    use mz_persist_client::PersistClient;
    use mz_persist_types::codec_impls::{ShardIdSchema, VecU8Schema};

    use crate::operator::DataSubscribe;
    use crate::tests::reader;
    use crate::txns::TxnsHandle;

    use super::*;

    impl TxnsCache<u64, TxnsCodecDefault> {
        pub(crate) async fn expect_open(
            init_ts: u64,
            txns: &TxnsHandle<String, (), u64, i64>,
        ) -> Self {
            let mut ret = TxnsCache::open(&txns.datas.client, txns.txns_id(), None).await;
            let _ = ret.update_gt(&init_ts).await;
            ret
        }

        pub(crate) async fn expect_snapshot(
            &mut self,
            client: &PersistClient,
            data_id: ShardId,
            as_of: u64,
        ) -> Vec<String> {
            let mut data_read = reader(client, data_id).await;
            let _ = self.update_gt(&as_of).await;
            let mut snapshot = self
                .data_snapshot(data_read.shard_id(), as_of)
                .snapshot_and_fetch(&mut data_read)
                .await
                .unwrap();
            snapshot.sort();
            snapshot
                .into_iter()
                .flat_map(|((k, v), _t, d)| {
                    let (k, ()) = (k.unwrap(), v.unwrap());
                    std::iter::repeat(k).take(usize::try_from(d).unwrap())
                })
                .collect()
        }

        pub(crate) fn expect_subscribe(
            &self,
            client: &PersistClient,
            data_id: ShardId,
            as_of: u64,
        ) -> DataSubscribe {
            DataSubscribe::new(
                "test",
                client.clone(),
                self.txns_id,
                data_id,
                as_of,
                Antichain::new(),
                true,
            )
        }
    }

    #[mz_ore::test]
    fn txns_cache_data_snapshot_and_listen_next() {
        let d0 = ShardId::new();
        let ds = |latest_write: Option<u64>, as_of: u64, empty_to: u64| -> DataSnapshot<u64> {
            DataSnapshot {
                data_id: d0,
                latest_write,
                as_of,
                empty_to,
            }
        };
        #[track_caller]
        fn testcase(
            cache: &mut TxnsCacheState<u64>,
            ts: u64,
            data_id: ShardId,
            snap_expected: DataSnapshot<u64>,
            listen_expected: DataListenNext<u64>,
        ) {
            cache.progress_exclusive = ts + 1;
            assert_eq!(cache.data_snapshot(data_id, ts), snap_expected);
            assert_eq!(cache.data_listen_next(&data_id, &ts), listen_expected);
            assert_eq!(
                cache.data_listen_next(&data_id, &(ts + 1)),
                WaitForTxnsProgress
            );
        }

        // This attempts to exercise all the various interesting edge cases of
        // data_snapshot and data_listen_subscribe using the following sequence
        // of events:
        //
        // - Registrations at: [2,8], [15,16]
        // - Direct writes at: 1, 13
        // - Writes via txns at: 4, 5, 7
        let mut c = TxnsCacheState::new(ShardId::new(), 0, None);

        // empty
        assert_eq!(c.progress_exclusive, 0);
        #[allow(clippy::disallowed_methods)] // not using enhanced panic handler in tests
        let result = std::panic::catch_unwind(|| c.data_snapshot(d0, 0));
        assert_err!(result);
        assert_eq!(c.data_listen_next(&d0, &0), WaitForTxnsProgress);

        // ts 0 (never registered)
        testcase(&mut c, 0, d0, ds(None, 0, 1), ReadDataTo(1));

        // ts 1 (direct write)
        // - The cache knows everything < 2.
        // - d0 is not registered in the cache.
        // - We know the shard can't be written to via txn < 2.
        // - So go read the shard normally up to 2.
        testcase(&mut c, 1, d0, ds(None, 1, 2), ReadDataTo(2));

        // ts 2 (register)
        c.push_register(d0, 2, 1, 2);
        testcase(&mut c, 2, d0, ds(None, 2, 3), ReadDataTo(3));

        // ts 3 (registered, not written)
        testcase(&mut c, 3, d0, ds(None, 3, 4), EmitLogicalProgress(4));

        // ts 4 (written via txns)
        c.push_append(d0, vec![4], 4, 1);
        testcase(&mut c, 4, d0, ds(Some(4), 4, 5), ReadDataTo(5));

        // ts 5 (written via txns, write at preceding ts)
        c.push_append(d0, vec![5], 5, 1);
        testcase(&mut c, 5, d0, ds(Some(5), 5, 6), ReadDataTo(6));

        // ts 6 (registered, not written, write at preceding ts)
        testcase(&mut c, 6, d0, ds(Some(5), 6, 7), EmitLogicalProgress(7));

        // ts 7 (written via txns, write at non-preceding ts)
        c.push_append(d0, vec![7], 7, 1);
        testcase(&mut c, 7, d0, ds(Some(7), 7, 8), ReadDataTo(8));

        // ts 8 (apply and tidy write from ts 4)
        c.push_append(d0, vec![4], 8, -1);
        testcase(&mut c, 8, d0, ds(Some(7), 8, 9), EmitLogicalProgress(9));

        // ts 9 (apply and tidy write from ts 5)
        c.push_append(d0, vec![5], 9, -1);
        testcase(&mut c, 9, d0, ds(Some(7), 9, 10), EmitLogicalProgress(10));

        // ts 10 (apply and tidy write from ts 7)
        c.push_append(d0, vec![7], 10, -1);
        testcase(&mut c, 10, d0, ds(None, 10, 11), EmitLogicalProgress(11));

        // ts 11 (forget)
        // Revisit when
        // https://github.com/MaterializeInc/database-issues/issues/7746 is fixed,
        // it's unclear how to encode the register timestamp in a forget.
        c.push_register(d0, 11, -1, 11);
        testcase(&mut c, 11, d0, ds(None, 11, 12), ReadDataTo(12));

        // ts 12 (not registered, not written). This ReadDataTo would block until
        // the write happens at ts 13.
        testcase(&mut c, 12, d0, ds(None, 12, 13), ReadDataTo(13));

        // ts 13 (written directly)
        testcase(&mut c, 13, d0, ds(None, 13, 14), ReadDataTo(14));

        // ts 14 (not registered, not written). This ReadDataTo would block until
        // the register happens at 15.
        testcase(&mut c, 14, d0, ds(None, 14, 15), ReadDataTo(15));

        // ts 15 (registered, previously forgotten)
        c.push_register(d0, 15, 1, 15);
        testcase(&mut c, 15, d0, ds(None, 15, 16), ReadDataTo(16));

        // ts 16 (forgotten, registered at preceding ts)
        // Revisit when
        // https://github.com/MaterializeInc/database-issues/issues/7746 is fixed,
        // it's unclear how to encode the register timestamp in a forget.
        c.push_register(d0, 16, -1, 16);
        testcase(&mut c, 16, d0, ds(None, 16, 17), ReadDataTo(17));

        // Now that we have more history, some of the old answers change! In
        // particular, we have more information on unapplied writes, empty
        // times, and can ReadDataTo much later times.

        assert_eq!(c.data_snapshot(d0, 0), ds(None, 0, 1));
        assert_eq!(c.data_snapshot(d0, 1), ds(None, 1, 2));
        assert_eq!(c.data_snapshot(d0, 2), ds(None, 2, 3));
        assert_eq!(c.data_snapshot(d0, 3), ds(None, 3, 4));
        assert_eq!(c.data_snapshot(d0, 4), ds(None, 4, 5));
        assert_eq!(c.data_snapshot(d0, 5), ds(None, 5, 6));
        assert_eq!(c.data_snapshot(d0, 6), ds(None, 6, 7));
        assert_eq!(c.data_snapshot(d0, 7), ds(None, 7, 8));
        assert_eq!(c.data_snapshot(d0, 8), ds(None, 8, 9));
        assert_eq!(c.data_snapshot(d0, 9), ds(None, 9, 10));
        assert_eq!(c.data_snapshot(d0, 10), ds(None, 10, 11));
        assert_eq!(c.data_snapshot(d0, 11), ds(None, 11, 12));
        assert_eq!(c.data_snapshot(d0, 12), ds(None, 12, 13));
        assert_eq!(c.data_snapshot(d0, 13), ds(None, 13, 14));
        assert_eq!(c.data_snapshot(d0, 14), ds(None, 14, 15));
        assert_eq!(c.data_snapshot(d0, 15), ds(None, 15, 16));
        assert_eq!(c.data_snapshot(d0, 16), ds(None, 16, 17));

        assert_eq!(c.data_listen_next(&d0, &0), ReadDataTo(17));
        assert_eq!(c.data_listen_next(&d0, &1), ReadDataTo(17));
        assert_eq!(c.data_listen_next(&d0, &2), ReadDataTo(17));
        assert_eq!(c.data_listen_next(&d0, &3), ReadDataTo(17));
        assert_eq!(c.data_listen_next(&d0, &4), ReadDataTo(17));
        assert_eq!(c.data_listen_next(&d0, &5), ReadDataTo(17));
        assert_eq!(c.data_listen_next(&d0, &6), ReadDataTo(17));
        assert_eq!(c.data_listen_next(&d0, &7), ReadDataTo(17));
        assert_eq!(c.data_listen_next(&d0, &8), ReadDataTo(17));
        assert_eq!(c.data_listen_next(&d0, &9), ReadDataTo(17));
        assert_eq!(c.data_listen_next(&d0, &10), ReadDataTo(17));
        assert_eq!(c.data_listen_next(&d0, &11), ReadDataTo(17));
        assert_eq!(c.data_listen_next(&d0, &12), ReadDataTo(17));
        assert_eq!(c.data_listen_next(&d0, &13), ReadDataTo(17));
        assert_eq!(c.data_listen_next(&d0, &14), ReadDataTo(17));
        assert_eq!(c.data_listen_next(&d0, &15), ReadDataTo(17));
        assert_eq!(c.data_listen_next(&d0, &16), ReadDataTo(17));
        assert_eq!(c.data_listen_next(&d0, &17), WaitForTxnsProgress);
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // too slow
    async fn empty_to() {
        let client = PersistClient::new_for_tests().await;
        let mut txns = TxnsHandle::expect_open(client.clone()).await;
        let d0 = txns.expect_register(1).await;

        // During code review, we discussed an alternate implementation of
        // empty_to that was an Option: None when we knew about a write > the
        // as_of, and Some when we didn't. The None case would mean that we
        // don't need to CaA empty updates in. This is quite appealing, but
        // would cause an issue with the guarantee that `apply_le(as_of)` is
        // sufficient to unblock a read. Specifically:
        //
        // - Write at 3, but don't apply.
        // - Write at 5, but don't apply.
        // - Catch the cache up past the write at 5.
        // - Run apply_le(4) to unblock a read a 4.
        // - Run a snapshot at 4.
        // - If nothing else applies the write at 5, the snapshot would
        //   deadlock.
        for ts in [3, 5] {
            let mut txn = txns.begin();
            txn.write(&d0, "3".into(), (), 1).await;
            let _apply = txn.commit_at(&mut txns, ts).await.unwrap();
        }
        let _ = txns.txns_cache.update_gt(&5).await;
        txns.apply_le(&4).await;
        let snap = txns.txns_cache.data_snapshot(d0, 4);
        let mut data_read = reader(&client, d0).await;
        // This shouldn't deadlock.
        let contents = snap.snapshot_and_fetch(&mut data_read).await.unwrap();
        assert_eq!(contents.len(), 1);

        // Sanity check that the scenario played out like we said above.
        assert_eq!(snap.empty_to, 5);
    }

    #[mz_ore::test]
    fn data_times_validate() {
        fn dt(register_forget_ts: &[u64], write_ts: &[u64]) -> Result<(), ()> {
            let mut dt = DataTimes::default();
            for x in register_forget_ts {
                if let Some(back) = dt.registered.back_mut() {
                    if back.forget_ts == None {
                        back.forget_ts = Some(*x);
                        continue;
                    }
                }
                dt.registered.push_back(DataRegistered {
                    register_ts: *x,
                    forget_ts: None,
                })
            }
            dt.writes = write_ts.into_iter().cloned().collect();
            dt.validate().map_err(|_| ())
        }

        // Valid
        assert_eq!(dt(&[1], &[2, 3]), Ok(()));
        assert_eq!(dt(&[1, 3], &[2]), Ok(()));
        assert_eq!(dt(&[1, 3, 5], &[2, 6, 7]), Ok(()));
        // Writes exactly on interval endpoints, including the open interval.
        assert_eq!(dt(&[1, 3, 5], &[1, 3, 5]), Ok(()));
        assert_eq!(dt(&[1, 3, 5], &[]), Ok(()));
        assert_eq!(dt(&[1, 1], &[1]), Ok(()));

        // Invalid
        assert_eq!(dt(&[], &[]), Err(()));
        assert_eq!(dt(&[1], &[0]), Err(()));
        assert_eq!(dt(&[1, 3], &[4]), Err(()));
        assert_eq!(dt(&[1, 3, 5], &[4]), Err(()));
        assert_eq!(dt(&[1, 4], &[3, 2]), Err(()));
        // Write in the gap between two closed intervals.
        assert_eq!(dt(&[1, 3, 5, 7], &[4]), Err(()));
        // Overlapping intervals.
        assert_eq!(dt(&[1, 3, 2, 4], &[]), Err(()));
        // Forget before register.
        assert_eq!(dt(&[3, 1], &[]), Err(()));
    }

    /// Regression test for a bug caught by higher level tests in CI:
    /// - Commit a write at 5
    /// - Apply it and commit the tidy retraction at 20.
    /// - Catch up to both of these in the TxnsHandle and call compact_to(10).
    ///   The TxnsHandle knows the write has been applied and lets it CaDS the
    ///   txns shard since to 10.
    /// - Open a TxnsCache starting at the txns shard since (10) to serve a
    ///   snapshot at 12. Catch it up through 12, but _not_ the tidy at 20.
    /// - This TxnsCache gets the write with a ts compacted forward to 10, but
    ///   no retraction. The snapshot resolves with an incorrect latest_write of
    ///   `Some(10)`.
    /// - The unblock read waits for this write to be applied before doing the
    ///   empty CaA, but this write never existed so it hangs forever.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn regression_compact_latest_write() {
        let client = PersistClient::new_for_tests().await;
        let mut txns = TxnsHandle::expect_open(client.clone()).await;
        let log = txns.new_log();
        let d0 = txns.expect_register(1).await;

        let tidy_5 = txns.expect_commit_at(5, d0, &["5"], &log).await;
        let _ = txns.expect_commit_at(15, d0, &["15"], &log).await;
        txns.tidy_at(20, tidy_5).await.unwrap();
        let _ = txns.txns_cache.update_gt(&20).await;
        assert_eq!(txns.txns_cache.min_unapplied_ts(), &15);
        txns.compact_to(10).await;

        let mut txns_read = client
            .open_leased_reader(
                txns.txns_id(),
                Arc::new(ShardIdSchema),
                Arc::new(VecU8Schema),
                Diagnostics::for_tests(),
                true,
            )
            .await
            .expect("txns schema shouldn't change");
        txns_read.downgrade_since(&Antichain::from_elem(10)).await;
        let mut cache = TxnsCache::<_, TxnsCodecDefault>::from_read(txns_read, None).await;
        let _ = cache.update_gt(&15).await;
        let snap = cache.data_snapshot(d0, 12);
        assert_eq!(snap.latest_write, Some(5));
    }

    // Regression test for a bug where we were sorting TxnEvents by the
    // compacted timestamp instead of the original one when applying them to a
    // cache. This caused them to be applied in a surprising order (e.g. forget
    // before register).
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn regression_ts_sort() {
        let client = PersistClient::new_for_tests().await;
        let txns = TxnsHandle::expect_open(client.clone()).await;
        let mut cache = TxnsCache::expect_open(0, &txns).await;
        let d0 = ShardId::new();

        // With the bug, this panics via an internal sanity assertion.
        cache.push_entries(
            vec![
                (TxnsEntry::Register(d0, u64::encode(&2)), 2, -1),
                (TxnsEntry::Register(d0, u64::encode(&1)), 2, 1),
            ],
            3,
        );
    }

    /// Tests that `data_snapshot` and `data_listen_next` properly handle an
    /// `init_ts` > 0.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn data_compacted() {
        let d0 = ShardId::new();
        let mut c = TxnsCacheState::new(ShardId::new(), 10, None);
        c.progress_exclusive = 20;

        #[allow(clippy::disallowed_methods)] // not using enhanced panic handler in tests
        let result = std::panic::catch_unwind(|| c.data_listen_next(&d0, &0));
        assert_err!(result);

        let ds = c.data_snapshot(d0, 0);
        assert_eq!(
            ds,
            DataSnapshot {
                data_id: d0,
                latest_write: None,
                as_of: 0,
                empty_to: 10,
            }
        );
    }
}