mz_persist_client/
read.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Read capabilities and handles
11
12use async_stream::stream;
13use std::backtrace::Backtrace;
14use std::collections::BTreeMap;
15use std::fmt::Debug;
16use std::future::Future;
17use std::sync::Arc;
18use std::time::Duration;
19
20use differential_dataflow::consolidation::consolidate_updates;
21use differential_dataflow::difference::Semigroup;
22use differential_dataflow::lattice::Lattice;
23use differential_dataflow::trace::Description;
24use futures::Stream;
25use futures_util::{StreamExt, stream};
26use mz_dyncfg::Config;
27use mz_ore::instrument;
28use mz_ore::now::EpochMillis;
29use mz_ore::task::{AbortOnDropHandle, JoinHandle, RuntimeExt};
30use mz_persist::location::{Blob, SeqNo};
31use mz_persist_types::columnar::{ColumnDecoder, Schema};
32use mz_persist_types::{Codec, Codec64};
33use proptest_derive::Arbitrary;
34use serde::{Deserialize, Serialize};
35use timely::PartialOrder;
36use timely::progress::{Antichain, Timestamp};
37use tokio::runtime::Handle;
38use tracing::{Instrument, debug_span, warn};
39use uuid::Uuid;
40
41use crate::batch::BLOB_TARGET_SIZE;
42use crate::cfg::{COMPACTION_MEMORY_BOUND_BYTES, RetryParameters};
43use crate::fetch::{FetchBatchFilter, FetchedPart, Lease, LeasedBatchPart, fetch_leased_part};
44use crate::internal::encoding::Schemas;
45use crate::internal::machine::{ExpireFn, Machine};
46use crate::internal::metrics::Metrics;
47use crate::internal::state::{BatchPart, HollowBatch};
48use crate::internal::watch::StateWatch;
49use crate::iter::{Consolidator, StructuredSort};
50use crate::schema::SchemaCache;
51use crate::stats::{SnapshotPartStats, SnapshotPartsStats, SnapshotStats};
52use crate::{GarbageCollector, PersistConfig, ShardId, parse_id};
53
54pub use crate::internal::encoding::LazyPartStats;
55pub use crate::internal::state::Since;
56
/// An opaque identifier for a reader of a persist durable TVC (aka shard).
///
/// Wraps 16 raw bytes, which are interpreted as a UUID whenever the id is
/// formatted. Per the serde attributes below, the serialized form goes through
/// `String` in both directions rather than exposing the raw bytes.
#[derive(Arbitrary, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[serde(try_from = "String", into = "String")]
pub struct LeasedReaderId(pub(crate) [u8; 16]);
61
62impl std::fmt::Display for LeasedReaderId {
63    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
64        write!(f, "r{}", Uuid::from_bytes(self.0))
65    }
66}
67
68impl std::fmt::Debug for LeasedReaderId {
69    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70        write!(f, "LeasedReaderId({})", Uuid::from_bytes(self.0))
71    }
72}
73
74impl std::str::FromStr for LeasedReaderId {
75    type Err = String;
76
77    fn from_str(s: &str) -> Result<Self, Self::Err> {
78        parse_id('r', "LeasedReaderId", s).map(LeasedReaderId)
79    }
80}
81
82impl From<LeasedReaderId> for String {
83    fn from(reader_id: LeasedReaderId) -> Self {
84        reader_id.to_string()
85    }
86}
87
88impl TryFrom<String> for LeasedReaderId {
89    type Error = String;
90
91    fn try_from(s: String) -> Result<Self, Self::Error> {
92        s.parse()
93    }
94}
95
96impl LeasedReaderId {
97    pub(crate) fn new() -> Self {
98        LeasedReaderId(*Uuid::new_v4().as_bytes())
99    }
100}
101
/// Capable of generating a snapshot of all data at `as_of`, followed by a
/// listen of all updates.
///
/// For more details, see [`ReadHandle::snapshot`] and [`Listen`].
#[derive(Debug)]
pub struct Subscribe<K: Codec, V: Codec, T, D> {
    // Snapshot parts that have not been handed out yet. `Some` until the first
    // call to `next` takes them (or `expire` drops them), `None` afterwards.
    snapshot: Option<Vec<LeasedBatchPart<T>>>,
    // Source of every part returned once the snapshot has been taken.
    listen: Listen<K, V, T, D>,
}
111
112impl<K, V, T, D> Subscribe<K, V, T, D>
113where
114    K: Debug + Codec,
115    V: Debug + Codec,
116    T: Timestamp + Lattice + Codec64 + Sync,
117    D: Semigroup + Codec64 + Send + Sync,
118{
119    fn new(snapshot_parts: Vec<LeasedBatchPart<T>>, listen: Listen<K, V, T, D>) -> Self {
120        Subscribe {
121            snapshot: Some(snapshot_parts),
122            listen,
123        }
124    }
125
126    /// Returns a `LeasedBatchPart` enriched with the proper metadata.
127    ///
128    /// First returns snapshot parts, until they're exhausted, at which point
129    /// begins returning listen parts.
130    ///
131    /// The returned `Antichain` represents the subscription progress as it will
132    /// be _after_ the returned parts are fetched.
133    #[instrument(level = "debug", fields(shard = %self.listen.handle.machine.shard_id()))]
134    pub async fn next(
135        &mut self,
136        // If Some, an override for the default listen sleep retry parameters.
137        listen_retry: Option<RetryParameters>,
138    ) -> Vec<ListenEvent<T, LeasedBatchPart<T>>> {
139        match self.snapshot.take() {
140            Some(parts) => vec![ListenEvent::Updates(parts)],
141            None => {
142                let (parts, upper) = self.listen.next(listen_retry).await;
143                vec![ListenEvent::Updates(parts), ListenEvent::Progress(upper)]
144            }
145        }
146    }
147}
148
149impl<K, V, T, D> Subscribe<K, V, T, D>
150where
151    K: Debug + Codec,
152    V: Debug + Codec,
153    T: Timestamp + Lattice + Codec64 + Sync,
154    D: Semigroup + Codec64 + Send + Sync,
155{
156    /// Equivalent to `next`, but rather than returning a [`LeasedBatchPart`],
157    /// fetches and returns the data from within it.
158    #[instrument(level = "debug", fields(shard = %self.listen.handle.machine.shard_id()))]
159    pub async fn fetch_next(
160        &mut self,
161    ) -> Vec<ListenEvent<T, ((Result<K, String>, Result<V, String>), T, D)>> {
162        let events = self.next(None).await;
163        let new_len = events
164            .iter()
165            .map(|event| match event {
166                ListenEvent::Updates(parts) => parts.len(),
167                ListenEvent::Progress(_) => 1,
168            })
169            .sum();
170        let mut ret = Vec::with_capacity(new_len);
171        for event in events {
172            match event {
173                ListenEvent::Updates(parts) => {
174                    for part in parts {
175                        let fetched_part = self.listen.fetch_batch_part(part).await;
176                        let updates = fetched_part.collect::<Vec<_>>();
177                        if !updates.is_empty() {
178                            ret.push(ListenEvent::Updates(updates));
179                        }
180                    }
181                }
182                ListenEvent::Progress(progress) => ret.push(ListenEvent::Progress(progress)),
183            }
184        }
185        ret
186    }
187
188    /// Fetches the contents of `part` and returns its lease.
189    pub async fn fetch_batch_part(&mut self, part: LeasedBatchPart<T>) -> FetchedPart<K, V, T, D> {
190        self.listen.fetch_batch_part(part).await
191    }
192}
193
194impl<K, V, T, D> Subscribe<K, V, T, D>
195where
196    K: Debug + Codec,
197    V: Debug + Codec,
198    T: Timestamp + Lattice + Codec64 + Sync,
199    D: Semigroup + Codec64 + Send + Sync,
200{
201    /// Politely expires this subscribe, releasing its lease.
202    ///
203    /// There is a best-effort impl in Drop for [`ReadHandle`] to expire the
204    /// [`ReadHandle`] held by the subscribe that wasn't explicitly expired
205    /// with this method. When possible, explicit expiry is still preferred
206    /// because the Drop one is best effort and is dependant on a tokio
207    /// [Handle] being available in the TLC at the time of drop (which is a bit
208    /// subtle). Also, explicit expiry allows for control over when it happens.
209    pub async fn expire(mut self) {
210        let _ = self.snapshot.take(); // Drop all leased parts.
211        self.listen.expire().await;
212    }
213}
214
/// Data and progress events of a shard subscription.
///
/// `T` is the timestamp type carried by progress frontiers and `D` is the
/// payload of an update event (in this file either a leased part or a fetched
/// update tuple).
///
/// TODO: Unify this with [timely::dataflow::operators::capture::event::Event].
#[derive(Debug, PartialEq)]
pub enum ListenEvent<T, D> {
    /// Progress of the shard.
    Progress(Antichain<T>),
    /// Data of the shard.
    Updates(Vec<D>),
}
225
/// An ongoing subscription of updates to a shard.
#[derive(Debug)]
pub struct Listen<K: Codec, V: Codec, T, D> {
    // The read handle this listen took ownership of. Used to lease parts,
    // fetch their contents, and downgrade the reader's since.
    handle: ReadHandle<K, V, T, D>,
    // Watch obtained from the handle's machine; passed to
    // `next_listen_batch` when waiting for the next batch.
    watch: StateWatch<K, V, T, D>,

    // The frontier this listen was started at (treated as exclusive).
    as_of: Antichain<T>,
    // The since this listen asks its handle to downgrade to. Starts out equal
    // to `as_of` and is advanced in `next`.
    since: Antichain<T>,
    // The upper of the most recently returned batch (initially `as_of`); used
    // as the lower bound when requesting the next batch.
    frontier: Antichain<T>,
}
236
impl<K, V, T, D> Listen<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Semigroup + Codec64 + Send + Sync,
{
    /// Creates a listen that starts at `as_of`, taking ownership of `handle`.
    ///
    /// The handle's since is downgraded to `as_of` right away, and both the
    /// listen's `since` and `frontier` start out equal to `as_of`.
    async fn new(mut handle: ReadHandle<K, V, T, D>, as_of: Antichain<T>) -> Self {
        let since = as_of.clone();
        // This listen only needs to distinguish things after its frontier
        // (initially as_of although the frontier is inclusive and the as_of
        // isn't). Be a good citizen and downgrade early.
        handle.downgrade_since(&since).await;

        let watch = handle.machine.applier.watch();
        Listen {
            handle,
            watch,
            since,
            frontier: as_of.clone(),
            as_of,
        }
    }

    /// An exclusive upper bound on the progress of this Listen.
    pub fn frontier(&self) -> &Antichain<T> {
        &self.frontier
    }

    /// Attempt to pull out the next values of this subscription.
    ///
    /// The returned [`LeasedBatchPart`] is appropriate to use with
    /// `crate::fetch::fetch_leased_part`.
    ///
    /// The returned `Antichain` represents the subscription progress as it will
    /// be _after_ the returned parts are fetched.
    ///
    /// # Panics
    ///
    /// Panics if the batch handed back by the machine has a since that has
    /// advanced past this listen's frontier (see the assertion below).
    pub async fn next(
        &mut self,
        // If Some, an override for the default listen sleep retry parameters.
        retry: Option<RetryParameters>,
    ) -> (Vec<LeasedBatchPart<T>>, Antichain<T>) {
        let batch = self
            .handle
            .machine
            .next_listen_batch(
                &self.frontier,
                &mut self.watch,
                Some(&self.handle.reader_id),
                retry,
            )
            .await;

        // A lot of things across mz have to line up to hold the following
        // invariant and violations only show up as subtle correctness errors,
        // so explicitly validate it here. Better to panic and roll back a
        // release than be incorrect (also potentially corrupting a sink).
        //
        // Note that the since check is intentionally less_than, not less_equal.
        // If a batch's since is X, that means we can no longer distinguish X
        // (beyond self.frontier) from X-1 (not beyond self.frontier) to keep
        // former and filter out the latter.
        assert!(
            PartialOrder::less_than(batch.desc.since(), &self.frontier)
                // Special case when the frontier == the as_of (i.e. the first
                // time this is called on a new Listen). Because as_of is
                // _exclusive_, we don't need to be able to distinguish X from
                // X-1.
                || (self.frontier == self.as_of
                    && PartialOrder::less_equal(batch.desc.since(), &self.frontier)),
            "Listen on {} received a batch {:?} advanced past the listen frontier {:?}",
            self.handle.machine.shard_id(),
            batch.desc,
            self.frontier
        );

        // Captured now because `batch` is moved into `lease_batch_parts` below.
        let new_frontier = batch.desc.upper().clone();

        // We will have a new frontier, so this is an opportunity to downgrade our
        // since capability. Go through `maybe_heartbeat` so we can rate limit
        // this along with our heartbeats.
        //
        // HACK! Everything would be simpler if we could downgrade since to the
        // new frontier, but we can't. The next call needs to be able to
        // distinguish between the times T at the frontier (to emit updates with
        // these times) and T-1 (to filter them). Advancing the since to
        // frontier would erase the ability to distinguish between them. Ideally
        // we'd use what is conceptually "batch.upper - 1" (the greatest
        // elements that are still strictly less than batch.upper, which will be
        // the new value of self.frontier after this call returns), but the
        // trait bounds on T don't give us a way to compute that directly.
        // Instead, we sniff out any elements in self.frontier (the upper of the
        // batch the last time we called this) that are strictly less_than the
        // batch upper to compute a new since. For totally ordered times
        // (currently always the case in mz) self.frontier will always have a
        // single element and it will be less_than upper, but the following
        // logic is (hopefully) correct for partially ordered times as well. We
        // could also abuse the fact that every time we actually emit is
        // guaranteed by definition to be less_than upper to be a bit more
        // prompt, but this would involve a lot more temporary antichains and
        // it's unclear if that's worth it.
        for x in self.frontier.elements().iter() {
            let less_than_upper = batch.desc.upper().elements().iter().any(|u| x.less_than(u));
            if less_than_upper {
                self.since.join_assign(&Antichain::from_elem(x.clone()));
            }
        }

        // IMPORTANT! Make sure this `lease_batch_parts` stays before the
        // `maybe_downgrade_since` call. Otherwise, we might give up our
        // capability on the batch's SeqNo before we lease it, which could lead
        // to blobs that it references being GC'd.
        let filter = FetchBatchFilter::Listen {
            as_of: self.as_of.clone(),
            lower: self.frontier.clone(),
        };
        let parts = self.handle.lease_batch_parts(batch, filter).collect().await;

        self.handle.maybe_downgrade_since(&self.since).await;

        // NB: Keep this after we use self.frontier to join_assign self.since
        // and also after we construct metadata.
        self.frontier = new_frontier;

        (parts, self.frontier.clone())
    }
}
363
364impl<K, V, T, D> Listen<K, V, T, D>
365where
366    K: Debug + Codec,
367    V: Debug + Codec,
368    T: Timestamp + Lattice + Codec64 + Sync,
369    D: Semigroup + Codec64 + Send + Sync,
370{
371    /// Attempt to pull out the next values of this subscription.
372    ///
373    /// The updates received in [ListenEvent::Updates] should be assumed to be in arbitrary order
374    /// and not necessarily consolidated. However, the timestamp of each individual update will be
375    /// greater than or equal to the last received [ListenEvent::Progress] frontier (or this
376    /// [Listen]'s initial `as_of` frontier if no progress event has been emitted yet) and less
377    /// than the next [ListenEvent::Progress] frontier.
378    ///
379    /// If you have a use for consolidated listen output, given that snapshots can't be
380    /// consolidated, come talk to us!
381    #[instrument(level = "debug", name = "listen::next", fields(shard = %self.handle.machine.shard_id()))]
382    pub async fn fetch_next(
383        &mut self,
384    ) -> Vec<ListenEvent<T, ((Result<K, String>, Result<V, String>), T, D)>> {
385        let (parts, progress) = self.next(None).await;
386        let mut ret = Vec::with_capacity(parts.len() + 1);
387        for part in parts {
388            let fetched_part = self.fetch_batch_part(part).await;
389            let updates = fetched_part.collect::<Vec<_>>();
390            if !updates.is_empty() {
391                ret.push(ListenEvent::Updates(updates));
392            }
393        }
394        ret.push(ListenEvent::Progress(progress));
395        ret
396    }
397
398    /// Convert listener into futures::Stream
399    pub fn into_stream(
400        mut self,
401    ) -> impl Stream<Item = ListenEvent<T, ((Result<K, String>, Result<V, String>), T, D)>> {
402        async_stream::stream!({
403            loop {
404                for msg in self.fetch_next().await {
405                    yield msg;
406                }
407            }
408        })
409    }
410
411    /// Test helper to read from the listener until the given frontier is
412    /// reached. Because compaction can arbitrarily combine batches, we only
413    /// return the final progress info.
414    #[cfg(test)]
415    #[track_caller]
416    pub async fn read_until(
417        &mut self,
418        ts: &T,
419    ) -> (
420        Vec<((Result<K, String>, Result<V, String>), T, D)>,
421        Antichain<T>,
422    ) {
423        let mut updates = Vec::new();
424        let mut frontier = Antichain::from_elem(T::minimum());
425        while self.frontier.less_than(ts) {
426            for event in self.fetch_next().await {
427                match event {
428                    ListenEvent::Updates(mut x) => updates.append(&mut x),
429                    ListenEvent::Progress(x) => frontier = x,
430                }
431            }
432        }
433        // Unlike most tests, intentionally don't consolidate updates here
434        // because Listen replays them at the original fidelity.
435        (updates, frontier)
436    }
437}
438
439impl<K, V, T, D> Listen<K, V, T, D>
440where
441    K: Debug + Codec,
442    V: Debug + Codec,
443    T: Timestamp + Lattice + Codec64 + Sync,
444    D: Semigroup + Codec64 + Send + Sync,
445{
446    /// Fetches the contents of `part` and returns its lease.
447    ///
448    /// This is broken out into its own function to provide a trivial means for
449    /// [`Subscribe`], which contains a [`Listen`], to fetch batches.
450    async fn fetch_batch_part(&mut self, part: LeasedBatchPart<T>) -> FetchedPart<K, V, T, D> {
451        let fetched_part = fetch_leased_part(
452            &self.handle.cfg,
453            &part,
454            self.handle.blob.as_ref(),
455            Arc::clone(&self.handle.metrics),
456            &self.handle.metrics.read.listen,
457            &self.handle.machine.applier.shard_metrics,
458            &self.handle.reader_id,
459            self.handle.read_schemas.clone(),
460            &mut self.handle.schema_cache,
461        )
462        .await;
463        fetched_part
464    }
465
466    /// Politely expires this listen, releasing its lease.
467    ///
468    /// There is a best-effort impl in Drop for [`ReadHandle`] to expire the
469    /// [`ReadHandle`] held by the listen that wasn't explicitly expired with
470    /// this method. When possible, explicit expiry is still preferred because
471    /// the Drop one is best effort and is dependant on a tokio [Handle] being
472    /// available in the TLC at the time of drop (which is a bit subtle). Also,
473    /// explicit expiry allows for control over when it happens.
474    pub async fn expire(self) {
475        self.handle.expire().await
476    }
477}
478
/// A "capability" granting the ability to read the state of some shard at times
/// greater or equal to `self.since()`.
///
/// Production users should call [Self::expire] before dropping a ReadHandle so
/// that it can expire its leases. If/when rust gets AsyncDrop, this will be
/// done automatically.
///
/// All async methods on ReadHandle retry for as long as they are able, but the
/// returned [std::future::Future]s implement "cancel on drop" semantics. This
/// means that callers can add a timeout using [tokio::time::timeout] or
/// [tokio::time::timeout_at].
///
/// ```rust,no_run
/// # let mut read: mz_persist_client::read::ReadHandle<String, String, u64, i64> = unimplemented!();
/// # let timeout: std::time::Duration = unimplemented!();
/// # let new_since: timely::progress::Antichain<u64> = unimplemented!();
/// # async {
/// tokio::time::timeout(timeout, read.downgrade_since(&new_since)).await
/// # };
/// ```
#[derive(Debug)]
pub struct ReadHandle<K: Codec, V: Codec, T, D> {
    pub(crate) cfg: PersistConfig,
    pub(crate) metrics: Arc<Metrics>,
    pub(crate) machine: Machine<K, V, T, D>,
    pub(crate) gc: GarbageCollector<K, V, T, D>,
    pub(crate) blob: Arc<dyn Blob>,
    // Identifies this reader to the machine (e.g. when downgrading its since).
    pub(crate) reader_id: LeasedReaderId,
    pub(crate) read_schemas: Schemas<K, V>,
    // Obtained from the machine's applier when the handle is constructed.
    pub(crate) schema_cache: SchemaCache<K, V, T, D>,

    // This handle's since, as last reported back by `downgrade_since`.
    since: Antichain<T>,
    // Timestamp of the most recent `downgrade_since` call; compared against
    // the current time by `maybe_downgrade_since`.
    pub(crate) last_heartbeat: EpochMillis,
    // Leases handed out to `LeasedBatchPart`s, keyed by the `SeqNo` that was
    // current when the part was leased.
    pub(crate) leased_seqnos: BTreeMap<SeqNo, Lease>,
    // Holds the expire function and the heartbeat task handles; set to `Some`
    // at construction. NOTE(review): presumably taken when the handle is
    // expired -- that code is outside this view, confirm.
    pub(crate) unexpired_state: Option<UnexpiredReadHandleState>,
}
515
/// Length of time after a reader's last operation after which the reader may be
/// expired.
///
/// Defaults to 15 minutes. The value is read through the dyncfg `Config`
/// against a `PersistConfig` at each use site (e.g. when registering a cloned
/// reader).
pub(crate) const READER_LEASE_DURATION: Config<Duration> = Config::new(
    "persist_reader_lease_duration",
    Duration::from_secs(60 * 15),
    "The time after which we'll clean up stale read leases",
);
523
524impl<K, V, T, D> ReadHandle<K, V, T, D>
525where
526    K: Debug + Codec,
527    V: Debug + Codec,
528    T: Timestamp + Lattice + Codec64 + Sync,
529    D: Semigroup + Codec64 + Send + Sync,
530{
531    pub(crate) async fn new(
532        cfg: PersistConfig,
533        metrics: Arc<Metrics>,
534        machine: Machine<K, V, T, D>,
535        gc: GarbageCollector<K, V, T, D>,
536        blob: Arc<dyn Blob>,
537        reader_id: LeasedReaderId,
538        read_schemas: Schemas<K, V>,
539        since: Antichain<T>,
540        last_heartbeat: EpochMillis,
541    ) -> Self {
542        let schema_cache = machine.applier.schema_cache();
543        let expire_fn = Self::expire_fn(machine.clone(), gc.clone(), reader_id.clone());
544        ReadHandle {
545            cfg,
546            metrics: Arc::clone(&metrics),
547            machine: machine.clone(),
548            gc: gc.clone(),
549            blob,
550            reader_id: reader_id.clone(),
551            read_schemas,
552            schema_cache,
553            since,
554            last_heartbeat,
555            leased_seqnos: BTreeMap::new(),
556            unexpired_state: Some(UnexpiredReadHandleState {
557                expire_fn,
558                _heartbeat_tasks: machine
559                    .start_reader_heartbeat_tasks(reader_id, gc)
560                    .await
561                    .into_iter()
562                    .map(JoinHandle::abort_on_drop)
563                    .collect(),
564            }),
565        }
566    }
567
    /// This handle's shard id.
    ///
    /// Delegates to the underlying machine.
    pub fn shard_id(&self) -> ShardId {
        self.machine.shard_id()
    }
572
    /// This handle's `since` frontier.
    ///
    /// This will always be greater or equal to the shard-global `since`.
    ///
    /// Returns the locally stored value, which is overwritten by each call to
    /// `downgrade_since`.
    pub fn since(&self) -> &Antichain<T> {
        &self.since
    }
579
580    fn outstanding_seqno(&mut self) -> Option<SeqNo> {
581        while let Some(first) = self.leased_seqnos.first_entry() {
582            if first.get().count() <= 1 {
583                first.remove();
584            } else {
585                return Some(*first.key());
586            }
587        }
588        None
589    }
590
591    /// Forwards the since frontier of this handle, giving up the ability to
592    /// read at times not greater or equal to `new_since`.
593    ///
594    /// This may trigger (asynchronous) compaction and consolidation in the
595    /// system. A `new_since` of the empty antichain "finishes" this shard,
596    /// promising that no more data will ever be read by this handle.
597    ///
598    /// This also acts as a heartbeat for the reader lease (including if called
599    /// with `new_since` equal to something like `self.since()` or the minimum
600    /// timestamp, making the call a no-op).
601    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
602    pub async fn downgrade_since(&mut self, new_since: &Antichain<T>) {
603        // Guaranteed to be the smallest/oldest outstanding lease on a `SeqNo`.
604        let outstanding_seqno = self.outstanding_seqno();
605
606        let heartbeat_ts = (self.cfg.now)();
607        let (_seqno, current_reader_since, maintenance) = self
608            .machine
609            .downgrade_since(&self.reader_id, outstanding_seqno, new_since, heartbeat_ts)
610            .await;
611
612        // Debugging for database-issues#4590.
613        if let Some(outstanding_seqno) = outstanding_seqno {
614            let seqnos_held = _seqno.0.saturating_sub(outstanding_seqno.0);
615            // We get just over 1 seqno-per-second on average for a shard in
616            // prod, so this is about an hour.
617            const SEQNOS_HELD_THRESHOLD: u64 = 60 * 60;
618            if seqnos_held >= SEQNOS_HELD_THRESHOLD {
619                tracing::info!(
620                    "{} reader {} holding an unexpected number of seqnos {} vs {}: {:?}. bt: {:?}",
621                    self.machine.shard_id(),
622                    self.reader_id,
623                    outstanding_seqno,
624                    _seqno,
625                    self.leased_seqnos.keys().take(10).collect::<Vec<_>>(),
626                    // The Debug impl of backtrace is less aesthetic, but will put the trace
627                    // on a single line and play more nicely with our Honeycomb quota
628                    Backtrace::force_capture(),
629                );
630            }
631        }
632
633        self.since = current_reader_since.0;
634        // A heartbeat is just any downgrade_since traffic, so update the
635        // internal rate limiter here to play nicely with `maybe_heartbeat`.
636        self.last_heartbeat = heartbeat_ts;
637        maintenance.start_performing(&self.machine, &self.gc);
638    }
639
640    /// Returns an ongoing subscription of updates to a shard.
641    ///
642    /// The stream includes all data at times greater than `as_of`. Combined
643    /// with [Self::snapshot] it will produce exactly correct results: the
644    /// snapshot is the TVCs contents at `as_of` and all subsequent updates
645    /// occur at exactly their indicated time. The recipient should only
646    /// downgrade their read capability when they are certain they have all data
647    /// through the frontier they would downgrade to.
648    ///
649    /// This takes ownership of the ReadHandle so the Listen can use it to
650    /// [Self::downgrade_since] as it progresses. If you need to keep this
651    /// handle, then [Self::clone] it before calling listen.
652    ///
653    /// The `Since` error indicates that the requested `as_of` cannot be served
654    /// (the caller has out of date information) and includes the smallest
655    /// `as_of` that would have been accepted.
656    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
657    pub async fn listen(self, as_of: Antichain<T>) -> Result<Listen<K, V, T, D>, Since<T>> {
658        let () = self.machine.verify_listen(&as_of)?;
659        Ok(Listen::new(self, as_of).await)
660    }
661
662    /// Returns all of the contents of the shard TVC at `as_of` broken up into
663    /// [`LeasedBatchPart`]es. These parts can be "turned in" via
664    /// `crate::fetch::fetch_batch_part` to receive the data they contain.
665    ///
666    /// This command returns the contents of this shard as of `as_of` once they
667    /// are known. This may "block" (in an async-friendly way) if `as_of` is
668    /// greater or equal to the current `upper` of the shard. The recipient
669    /// should only downgrade their read capability when they are certain they
670    /// have all data through the frontier they would downgrade to.
671    ///
672    /// The `Since` error indicates that the requested `as_of` cannot be served
673    /// (the caller has out of date information) and includes the smallest
674    /// `as_of` that would have been accepted.
675    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
676    pub async fn snapshot(
677        &mut self,
678        as_of: Antichain<T>,
679    ) -> Result<Vec<LeasedBatchPart<T>>, Since<T>> {
680        let batches = self.machine.snapshot(&as_of).await?;
681
682        if !PartialOrder::less_equal(self.since(), &as_of) {
683            return Err(Since(self.since().clone()));
684        }
685
686        let filter = FetchBatchFilter::Snapshot { as_of };
687        let mut leased_parts = Vec::new();
688        for batch in batches {
689            // Flatten the HollowBatch into one LeasedBatchPart per key. Each key
690            // corresponds to a "part" or s3 object. This allows persist_source
691            // to distribute work by parts (smallish, more even size) instead of
692            // batches (arbitrarily large).
693            leased_parts.extend(
694                self.lease_batch_parts(batch, filter.clone())
695                    .collect::<Vec<_>>()
696                    .await,
697            );
698        }
699        Ok(leased_parts)
700    }
701
702    /// Returns a snapshot of all of a shard's data using `as_of`, followed by
703    /// listening to any future updates.
704    ///
705    /// For more details on this operation's semantics, see [Self::snapshot] and
706    /// [Self::listen].
707    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
708    pub async fn subscribe(
709        mut self,
710        as_of: Antichain<T>,
711    ) -> Result<Subscribe<K, V, T, D>, Since<T>> {
712        let snapshot_parts = self.snapshot(as_of.clone()).await?;
713        let listen = self.listen(as_of.clone()).await?;
714        Ok(Subscribe::new(snapshot_parts, listen))
715    }
716
717    fn lease_batch_part(
718        &mut self,
719        desc: Description<T>,
720        part: BatchPart<T>,
721        filter: FetchBatchFilter<T>,
722    ) -> LeasedBatchPart<T> {
723        LeasedBatchPart {
724            metrics: Arc::clone(&self.metrics),
725            shard_id: self.machine.shard_id(),
726            reader_id: self.reader_id.clone(),
727            filter,
728            desc,
729            part,
730            leased_seqno: self.machine.seqno(),
731            lease: Some(self.lease_seqno()),
732            filter_pushdown_audit: false,
733        }
734    }
735
736    fn lease_batch_parts(
737        &mut self,
738        batch: HollowBatch<T>,
739        filter: FetchBatchFilter<T>,
740    ) -> impl Stream<Item = LeasedBatchPart<T>> + '_ {
741        stream! {
742            let blob = Arc::clone(&self.blob);
743            let metrics = Arc::clone(&self.metrics);
744            let desc = batch.desc.clone();
745            for await part in batch.part_stream(self.shard_id(), &*blob, &*metrics) {
746                yield self.lease_batch_part(desc.clone(), part.expect("leased part").into_owned(), filter.clone())
747            }
748        }
749    }
750
751    /// Tracks that the `ReadHandle`'s machine's current `SeqNo` is being
752    /// "leased out" to a `LeasedBatchPart`, and cannot be garbage
753    /// collected until its lease has been returned.
754    fn lease_seqno(&mut self) -> Lease {
755        let seqno = self.machine.seqno();
756        let lease = self.leased_seqnos.entry(seqno).or_default();
757        lease.clone()
758    }
759
760    /// Returns an independent [ReadHandle] with a new [LeasedReaderId] but the
761    /// same `since`.
762    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
763    pub async fn clone(&self, purpose: &str) -> Self {
764        let new_reader_id = LeasedReaderId::new();
765        let machine = self.machine.clone();
766        let gc = self.gc.clone();
767        let heartbeat_ts = (self.cfg.now)();
768        let (reader_state, maintenance) = machine
769            .register_leased_reader(
770                &new_reader_id,
771                purpose,
772                READER_LEASE_DURATION.get(&self.cfg),
773                heartbeat_ts,
774                false,
775            )
776            .await;
777        maintenance.start_performing(&machine, &gc);
778        // The point of clone is that you're guaranteed to have the same (or
779        // greater) since capability, verify that.
780        // TODO: better if it's the same since capability exactly.
781        assert!(PartialOrder::less_equal(&reader_state.since, &self.since));
782        let new_reader = ReadHandle::new(
783            self.cfg.clone(),
784            Arc::clone(&self.metrics),
785            machine,
786            gc,
787            Arc::clone(&self.blob),
788            new_reader_id,
789            self.read_schemas.clone(),
790            reader_state.since,
791            heartbeat_ts,
792        )
793        .await;
794        new_reader
795    }
796
797    /// A rate-limited version of [Self::downgrade_since].
798    ///
799    /// This is an internally rate limited helper, designed to allow users to
800    /// call it as frequently as they like. Call this or [Self::downgrade_since],
801    /// on some interval that is "frequent" compared to the read lease duration.
802    pub async fn maybe_downgrade_since(&mut self, new_since: &Antichain<T>) {
803        let min_elapsed = READER_LEASE_DURATION.get(&self.cfg) / 4;
804        let elapsed_since_last_heartbeat =
805            Duration::from_millis((self.cfg.now)().saturating_sub(self.last_heartbeat));
806        if elapsed_since_last_heartbeat >= min_elapsed {
807            self.downgrade_since(new_since).await;
808        }
809    }
810
811    /// Politely expires this reader, releasing its lease.
812    ///
813    /// There is a best-effort impl in Drop to expire a reader that wasn't
814    /// explictly expired with this method. When possible, explicit expiry is
815    /// still preferred because the Drop one is best effort and is dependant on
816    /// a tokio [Handle] being available in the TLC at the time of drop (which
817    /// is a bit subtle). Also, explicit expiry allows for control over when it
818    /// happens.
819    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
820    pub async fn expire(mut self) {
821        // We drop the unexpired state before expiring the reader to ensure the
822        // heartbeat tasks can never observe the expired state. This doesn't
823        // matter for correctness, but avoids confusing log output if the
824        // heartbeat task were to discover that its lease has been expired.
825        let Some(unexpired_state) = self.unexpired_state.take() else {
826            return;
827        };
828        unexpired_state.expire_fn.0().await;
829    }
830
    /// Builds the [ExpireFn] used to expire the leased reader `reader_id`.
    ///
    /// When the returned callback is invoked and its future awaited, it
    /// expires the reader on `machine` and then starts performing whatever
    /// maintenance that call returned, using `gc`. Nothing happens until the
    /// callback is called: constructing it only moves the arguments into it.
    fn expire_fn(
        machine: Machine<K, V, T, D>,
        gc: GarbageCollector<K, V, T, D>,
        reader_id: LeasedReaderId,
    ) -> ExpireFn {
        ExpireFn(Box::new(move || {
            Box::pin(async move {
                // The first element of the result is not needed here; only the
                // follow-up maintenance is acted upon.
                let (_, maintenance) = machine.expire_leased_reader(&reader_id).await;
                maintenance.start_performing(&machine, &gc);
            })
        }))
    }
843
844    /// Test helper for a [Self::listen] call that is expected to succeed.
845    #[cfg(test)]
846    #[track_caller]
847    pub async fn expect_listen(self, as_of: T) -> Listen<K, V, T, D> {
848        self.listen(Antichain::from_elem(as_of))
849            .await
850            .expect("cannot serve requested as_of")
851    }
852}
853
/// State for a read handle that has not been explicitly expired.
///
/// Both `ReadHandle::expire` and the `Drop` impl of `ReadHandle` `take()` this
/// out of the handle before acting on it, so whichever runs second finds
/// nothing left to expire.
#[derive(Debug)]
pub(crate) struct UnexpiredReadHandleState {
    /// Callback that expires the reader when invoked and awaited.
    expire_fn: ExpireFn,
    /// Handles of the reader's heartbeat tasks, kept only so they live as long
    /// as this state does.
    // NOTE(review): going by the `AbortOnDropHandle` type name, dropping these
    // presumably aborts the tasks -- confirm against `mz_ore::task`.
    pub(crate) _heartbeat_tasks: Vec<AbortOnDropHandle<()>>,
}
860
/// An incremental cursor through a particular shard, returned from
/// [ReadHandle::snapshot_cursor].
///
/// To read an entire dataset, the client should call `next` until it returns
/// `None`, which signals that all data has been returned. The client is also
/// free to abandon the instance at any time, e.g. if it only needs a few
/// entries.
#[derive(Debug)]
pub struct Cursor<K: Codec, V: Codec, T: Timestamp + Codec64, D: Codec64> {
    /// Produces the consolidated chunks handed out by `next`.
    consolidator: CursorConsolidator<K, V, T, D>,
    /// Lease on the `SeqNo` taken out when the cursor was created; never read,
    /// only held for as long as the cursor is alive.
    _lease: Lease,
    /// Schemas used to build the key and value decoders for each chunk.
    read_schemas: Schemas<K, V>,
}
872
/// The consolidation strategy backing a [Cursor], together with the limits it
/// applies to every chunk it produces.
#[derive(Debug)]
enum CursorConsolidator<K: Codec, V: Codec, T: Timestamp + Codec64, D: Codec64> {
    /// Consolidates using a [StructuredSort] over the shard's data.
    Structured {
        /// The underlying consolidator that chunks are pulled from.
        consolidator: Consolidator<T, D, StructuredSort<K, V, T, D>>,
        /// Passed as the first limit to `next_chunk` on every `Cursor::next`.
        max_len: usize,
        /// Passed as the second limit to `next_chunk` on every `Cursor::next`.
        max_bytes: usize,
    },
}
881
882impl<K, V, T, D> Cursor<K, V, T, D>
883where
884    K: Debug + Codec + Ord,
885    V: Debug + Codec + Ord,
886    T: Timestamp + Lattice + Codec64 + Sync,
887    D: Semigroup + Ord + Codec64 + Send + Sync,
888{
889    /// Grab the next batch of consolidated data.
890    pub async fn next(
891        &mut self,
892    ) -> Option<impl Iterator<Item = ((Result<K, String>, Result<V, String>), T, D)> + '_> {
893        match &mut self.consolidator {
894            CursorConsolidator::Structured {
895                consolidator,
896                max_len,
897                max_bytes,
898            } => {
899                let part = consolidator
900                    .next_chunk(*max_len, *max_bytes)
901                    .await
902                    .expect("fetching a leased part")?;
903                let key_decoder = self
904                    .read_schemas
905                    .key
906                    .decoder_any(part.key.as_ref())
907                    .expect("ok");
908                let val_decoder = self
909                    .read_schemas
910                    .val
911                    .decoder_any(part.val.as_ref())
912                    .expect("ok");
913                let iter = (0..part.len()).map(move |i| {
914                    let mut k = K::default();
915                    let mut v = V::default();
916                    key_decoder.decode(i, &mut k);
917                    val_decoder.decode(i, &mut v);
918                    let t = T::decode(part.time.value(i).to_le_bytes());
919                    let d = D::decode(part.diff.value(i).to_le_bytes());
920                    ((Ok(k), Ok(v)), t, d)
921                });
922
923                Some(iter)
924            }
925        }
926    }
927}
928
929impl<K, V, T, D> ReadHandle<K, V, T, D>
930where
931    K: Debug + Codec + Ord,
932    V: Debug + Codec + Ord,
933    T: Timestamp + Lattice + Codec64 + Sync,
934    D: Semigroup + Ord + Codec64 + Send + Sync,
935{
936    /// Generates a [Self::snapshot], and fetches all of the batches it
937    /// contains.
938    ///
939    /// The output is consolidated. Furthermore, to keep memory usage down when
940    /// reading a snapshot that consolidates well, this consolidates as it goes.
941    ///
942    /// Potential future improvements (if necessary):
943    /// - Accept something like a `F: Fn(K,V) -> (K,V)` argument, which looks
944    ///   like an MFP you might be pushing down. Reason being that if you are
945    ///   projecting or transforming in a way that allows further consolidation,
946    ///   amazing.
947    /// - Reuse any code we write to streaming-merge consolidate in
948    ///   persist_source here.
949    pub async fn snapshot_and_fetch(
950        &mut self,
951        as_of: Antichain<T>,
952    ) -> Result<Vec<((Result<K, String>, Result<V, String>), T, D)>, Since<T>> {
953        let mut cursor = self.snapshot_cursor(as_of, |_| true).await?;
954        let mut contents = Vec::new();
955        while let Some(iter) = cursor.next().await {
956            contents.extend(iter);
957        }
958
959        // We don't currently guarantee that encoding is one-to-one, so we still need to
960        // consolidate the decoded outputs. However, let's report if this isn't a noop.
961        let old_len = contents.len();
962        consolidate_updates(&mut contents);
963        if old_len != contents.len() {
964            // TODO(bkirwi): do we need more / finer-grained metrics for this?
965            self.machine
966                .applier
967                .shard_metrics
968                .unconsolidated_snapshot
969                .inc();
970        }
971
972        Ok(contents)
973    }
974
975    /// Generates a [Self::snapshot], and fetches all of the batches it
976    /// contains.
977    ///
978    /// To keep memory usage down when reading a snapshot that consolidates well, this consolidates
979    /// as it goes. However, note that only the serialized data is consolidated: the deserialized
980    /// data will only be consolidated if your K/V codecs are one-to-one.
981    pub async fn snapshot_cursor(
982        &mut self,
983        as_of: Antichain<T>,
984        should_fetch_part: impl for<'a> Fn(Option<&'a LazyPartStats>) -> bool,
985    ) -> Result<Cursor<K, V, T, D>, Since<T>> {
986        let batches = self.machine.snapshot(&as_of).await?;
987
988        let context = format!("{}[as_of={:?}]", self.shard_id(), as_of.elements());
989        let filter = FetchBatchFilter::Snapshot {
990            as_of: as_of.clone(),
991        };
992        let lease = self.lease_seqno();
993
994        let consolidator = {
995            let mut consolidator = Consolidator::new(
996                context,
997                self.shard_id(),
998                StructuredSort::new(self.read_schemas.clone()),
999                Arc::clone(&self.blob),
1000                Arc::clone(&self.metrics),
1001                Arc::clone(&self.machine.applier.shard_metrics),
1002                self.metrics.read.snapshot.clone(),
1003                filter,
1004                COMPACTION_MEMORY_BOUND_BYTES.get(&self.cfg),
1005            );
1006            for batch in batches {
1007                for (meta, run) in batch.runs() {
1008                    consolidator.enqueue_run(
1009                        &batch.desc,
1010                        meta,
1011                        run.into_iter()
1012                            .filter(|p| should_fetch_part(p.stats()))
1013                            .cloned(),
1014                    );
1015                }
1016            }
1017            CursorConsolidator::Structured {
1018                consolidator,
1019                // This default may end up consolidating more records than previously
1020                // for cases like fast-path peeks, where only the first few entries are used.
1021                // If this is a noticeable performance impact, thread the max-len in from the caller.
1022                max_len: self.cfg.compaction_yield_after_n_updates,
1023                max_bytes: BLOB_TARGET_SIZE.get(&self.cfg).max(1),
1024            }
1025        };
1026
1027        Ok(Cursor {
1028            consolidator,
1029            _lease: lease,
1030            read_schemas: self.read_schemas.clone(),
1031        })
1032    }
1033
1034    /// Returns aggregate statistics about the contents of the shard TVC at the
1035    /// given frontier.
1036    ///
1037    /// This command returns the contents of this shard as of `as_of` once they
1038    /// are known. This may "block" (in an async-friendly way) if `as_of` is
1039    /// greater or equal to the current `upper` of the shard. If `None` is given
1040    /// for `as_of`, then the latest stats known by this process are used.
1041    ///
1042    /// The `Since` error indicates that the requested `as_of` cannot be served
1043    /// (the caller has out of date information) and includes the smallest
1044    /// `as_of` that would have been accepted.
1045    pub fn snapshot_stats(
1046        &self,
1047        as_of: Option<Antichain<T>>,
1048    ) -> impl Future<Output = Result<SnapshotStats, Since<T>>> + Send + 'static {
1049        let machine = self.machine.clone();
1050        async move {
1051            let batches = match as_of {
1052                Some(as_of) => machine.snapshot(&as_of).await?,
1053                None => machine.applier.all_batches(),
1054            };
1055            let num_updates = batches.iter().map(|b| b.len).sum();
1056            Ok(SnapshotStats {
1057                shard_id: machine.shard_id(),
1058                num_updates,
1059            })
1060        }
1061    }
1062
1063    /// Returns aggregate statistics about the contents of the shard TVC at the
1064    /// given frontier.
1065    ///
1066    /// This command returns the contents of this shard as of `as_of` once they
1067    /// are known. This may "block" (in an async-friendly way) if `as_of` is
1068    /// greater or equal to the current `upper` of the shard.
1069    ///
1070    /// The `Since` error indicates that the requested `as_of` cannot be served
1071    /// (the caller has out of date information) and includes the smallest
1072    /// `as_of` that would have been accepted.
1073    pub async fn snapshot_parts_stats(
1074        &self,
1075        as_of: Antichain<T>,
1076    ) -> Result<SnapshotPartsStats, Since<T>> {
1077        let batches = self.machine.snapshot(&as_of).await?;
1078        let parts = stream::iter(&batches)
1079            .flat_map(|b| b.part_stream(self.shard_id(), &*self.blob, &*self.metrics))
1080            .map(|p| {
1081                let p = p.expect("live batch");
1082                SnapshotPartStats {
1083                    encoded_size_bytes: p.encoded_size_bytes(),
1084                    stats: p.stats().cloned(),
1085                }
1086            })
1087            .collect()
1088            .await;
1089        Ok(SnapshotPartsStats {
1090            metrics: Arc::clone(&self.machine.applier.metrics),
1091            shard_id: self.machine.shard_id(),
1092            parts,
1093        })
1094    }
1095}
1096
1097impl<K, V, T, D> ReadHandle<K, V, T, D>
1098where
1099    K: Debug + Codec + Ord,
1100    V: Debug + Codec + Ord,
1101    T: Timestamp + Lattice + Codec64 + Sync,
1102    D: Semigroup + Codec64 + Send + Sync,
1103{
1104    /// Generates a [Self::snapshot], and streams out all of the updates
1105    /// it contains in bounded memory.
1106    ///
1107    /// The output is not consolidated.
1108    pub async fn snapshot_and_stream(
1109        &mut self,
1110        as_of: Antichain<T>,
1111    ) -> Result<
1112        impl Stream<Item = ((Result<K, String>, Result<V, String>), T, D)> + use<K, V, T, D>,
1113        Since<T>,
1114    > {
1115        let snap = self.snapshot(as_of).await?;
1116
1117        let blob = Arc::clone(&self.blob);
1118        let metrics = Arc::clone(&self.metrics);
1119        let snapshot_metrics = self.metrics.read.snapshot.clone();
1120        let shard_metrics = Arc::clone(&self.machine.applier.shard_metrics);
1121        let reader_id = self.reader_id.clone();
1122        let schemas = self.read_schemas.clone();
1123        let mut schema_cache = self.schema_cache.clone();
1124        let persist_cfg = self.cfg.clone();
1125        let stream = async_stream::stream! {
1126            for part in snap {
1127                let mut fetched_part = fetch_leased_part(
1128                    &persist_cfg,
1129                    &part,
1130                    blob.as_ref(),
1131                    Arc::clone(&metrics),
1132                    &snapshot_metrics,
1133                    &shard_metrics,
1134                    &reader_id,
1135                    schemas.clone(),
1136                    &mut schema_cache,
1137                )
1138                .await;
1139
1140                while let Some(next) = fetched_part.next() {
1141                    yield next;
1142                }
1143            }
1144        };
1145
1146        Ok(stream)
1147    }
1148}
1149
1150impl<K, V, T, D> ReadHandle<K, V, T, D>
1151where
1152    K: Debug + Codec + Ord,
1153    V: Debug + Codec + Ord,
1154    T: Timestamp + Lattice + Codec64 + Ord + Sync,
1155    D: Semigroup + Ord + Codec64 + Send + Sync,
1156{
1157    /// Test helper to generate a [Self::snapshot] call that is expected to
1158    /// succeed, process its batches, and then return its data sorted.
1159    #[cfg(test)]
1160    #[track_caller]
1161    pub async fn expect_snapshot_and_fetch(
1162        &mut self,
1163        as_of: T,
1164    ) -> Vec<((Result<K, String>, Result<V, String>), T, D)> {
1165        let mut ret = self
1166            .snapshot_and_fetch(Antichain::from_elem(as_of))
1167            .await
1168            .expect("cannot serve requested as_of");
1169
1170        ret.sort();
1171        ret
1172    }
1173}
1174
1175impl<K: Codec, V: Codec, T, D> Drop for ReadHandle<K, V, T, D> {
1176    fn drop(&mut self) {
1177        // We drop the unexpired state before expiring the reader to ensure the
1178        // heartbeat tasks can never observe the expired state. This doesn't
1179        // matter for correctness, but avoids confusing log output if the
1180        // heartbeat task were to discover that its lease has been expired.
1181        let Some(unexpired_state) = self.unexpired_state.take() else {
1182            return;
1183        };
1184
1185        let handle = match Handle::try_current() {
1186            Ok(x) => x,
1187            Err(_) => {
1188                warn!(
1189                    "ReadHandle {} dropped without being explicitly expired, falling back to lease timeout",
1190                    self.reader_id
1191                );
1192                return;
1193            }
1194        };
1195        // Spawn a best-effort task to expire this read handle. It's fine if
1196        // this doesn't run to completion, we'd just have to wait out the lease
1197        // before the shard-global since is unblocked.
1198        //
1199        // Intentionally create the span outside the task to set the parent.
1200        let expire_span = debug_span!("drop::expire");
1201        handle.spawn_named(
1202            || format!("ReadHandle::expire ({})", self.reader_id),
1203            unexpired_state.expire_fn.0().instrument(expire_span),
1204        );
1205    }
1206}
1207
1208#[cfg(test)]
1209mod tests {
1210    use std::pin;
1211    use std::str::FromStr;
1212
1213    use mz_dyncfg::ConfigUpdates;
1214    use mz_ore::cast::CastFrom;
1215    use mz_ore::metrics::MetricsRegistry;
1216    use mz_persist::mem::{MemBlob, MemBlobConfig, MemConsensus};
1217    use mz_persist::unreliable::{UnreliableConsensus, UnreliableHandle};
1218    use serde::{Deserialize, Serialize};
1219    use serde_json::json;
1220    use tokio_stream::StreamExt;
1221
1222    use crate::async_runtime::IsolatedRuntime;
1223    use crate::batch::BLOB_TARGET_SIZE;
1224    use crate::cache::StateCache;
1225    use crate::internal::metrics::Metrics;
1226    use crate::rpc::NoopPubSubSender;
1227    use crate::tests::{all_ok, new_test_client};
1228    use crate::{Diagnostics, PersistClient, PersistConfig, ShardId};
1229
1230    use super::*;
1231
1232    // Verifies `Subscribe` can be dropped while holding snapshot batches.
1233    #[mz_persist_proc::test(tokio::test)]
1234    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1235    async fn drop_unused_subscribe(dyncfgs: ConfigUpdates) {
1236        let data = [
1237            (("0".to_owned(), "zero".to_owned()), 0, 1),
1238            (("1".to_owned(), "one".to_owned()), 1, 1),
1239            (("2".to_owned(), "two".to_owned()), 2, 1),
1240        ];
1241
1242        let (mut write, read) = new_test_client(&dyncfgs)
1243            .await
1244            .expect_open::<String, String, u64, i64>(crate::ShardId::new())
1245            .await;
1246
1247        write.expect_compare_and_append(&data[0..1], 0, 1).await;
1248        write.expect_compare_and_append(&data[1..2], 1, 2).await;
1249        write.expect_compare_and_append(&data[2..3], 2, 3).await;
1250
1251        let subscribe = read
1252            .subscribe(timely::progress::Antichain::from_elem(2))
1253            .await
1254            .unwrap();
1255        assert!(
1256            !subscribe.snapshot.as_ref().unwrap().is_empty(),
1257            "snapshot must have batches for test to be meaningful"
1258        );
1259        drop(subscribe);
1260    }
1261
1262    // Verifies that we streaming-consolidate away identical key-values in the same batch.
1263    #[mz_persist_proc::test(tokio::test)]
1264    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1265    async fn streaming_consolidate(dyncfgs: ConfigUpdates) {
1266        let data = &[
1267            // Identical records should sum together...
1268            (("k".to_owned(), "v".to_owned()), 0, 1),
1269            (("k".to_owned(), "v".to_owned()), 1, 1),
1270            (("k".to_owned(), "v".to_owned()), 2, 1),
1271            // ...and when they cancel out entirely they should be omitted.
1272            (("k2".to_owned(), "v".to_owned()), 0, 1),
1273            (("k2".to_owned(), "v".to_owned()), 1, -1),
1274        ];
1275
1276        let (mut write, read) = {
1277            let client = new_test_client(&dyncfgs).await;
1278            client.cfg.set_config(&BLOB_TARGET_SIZE, 1000); // So our batch stays together!
1279            client
1280                .expect_open::<String, String, u64, i64>(crate::ShardId::new())
1281                .await
1282        };
1283
1284        write.expect_compare_and_append(data, 0, 5).await;
1285
1286        let mut snapshot = read
1287            .subscribe(timely::progress::Antichain::from_elem(4))
1288            .await
1289            .unwrap();
1290
1291        let mut updates = vec![];
1292        'outer: loop {
1293            for event in snapshot.fetch_next().await {
1294                match event {
1295                    ListenEvent::Progress(t) => {
1296                        if !t.less_than(&4) {
1297                            break 'outer;
1298                        }
1299                    }
1300                    ListenEvent::Updates(data) => {
1301                        updates.extend(data);
1302                    }
1303                }
1304            }
1305        }
1306        assert_eq!(
1307            updates,
1308            &[((Ok("k".to_owned()), Ok("v".to_owned())), 4u64, 3i64)],
1309        )
1310    }
1311
1312    #[mz_persist_proc::test(tokio::test)]
1313    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1314    async fn snapshot_and_stream(dyncfgs: ConfigUpdates) {
1315        let data = &mut [
1316            (("k1".to_owned(), "v1".to_owned()), 0, 1),
1317            (("k2".to_owned(), "v2".to_owned()), 1, 1),
1318            (("k3".to_owned(), "v3".to_owned()), 2, 1),
1319            (("k4".to_owned(), "v4".to_owned()), 2, 1),
1320            (("k5".to_owned(), "v5".to_owned()), 3, 1),
1321        ];
1322
1323        let (mut write, mut read) = {
1324            let client = new_test_client(&dyncfgs).await;
1325            client.cfg.set_config(&BLOB_TARGET_SIZE, 0); // split batches across multiple parts
1326            client
1327                .expect_open::<String, String, u64, i64>(crate::ShardId::new())
1328                .await
1329        };
1330
1331        write.expect_compare_and_append(&data[0..2], 0, 2).await;
1332        write.expect_compare_and_append(&data[2..4], 2, 3).await;
1333        write.expect_compare_and_append(&data[4..], 3, 4).await;
1334
1335        let as_of = Antichain::from_elem(3);
1336        let mut snapshot = pin::pin!(read.snapshot_and_stream(as_of.clone()).await.unwrap());
1337
1338        let mut snapshot_rows = vec![];
1339        while let Some(((k, v), t, d)) = snapshot.next().await {
1340            snapshot_rows.push(((k.expect("valid key"), v.expect("valid key")), t, d));
1341        }
1342
1343        for ((_k, _v), t, _d) in data.as_mut_slice() {
1344            t.advance_by(as_of.borrow());
1345        }
1346
1347        assert_eq!(data.as_slice(), snapshot_rows.as_slice());
1348    }
1349
    // Verifies the semantics of `SeqNo` leases + checks dropping `LeasedBatchPart` semantics.
    //
    // The test runs in three phases:
    // 1. seed the shard with two single-update batches and open a subscribe;
    // 2. collect leased parts from the subscribe while writing more data, and
    //    check that the seqno since does not move while those parts are held;
    // 3. fetch and drop the collected parts one by one while writing more
    //    data, and check that the seqno since only advances when expected.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // https://github.com/MaterializeInc/database-issues/issues/5964
    async fn seqno_leases(dyncfgs: ConfigUpdates) {
        // Twenty single updates, one per timestamp 0..20; each write below
        // appends exactly one of them.
        let mut data = vec![];
        for i in 0..20 {
            data.push(((i.to_string(), i.to_string()), i, 1))
        }

        let shard_id = ShardId::new();

        let client = new_test_client(&dyncfgs).await;
        let (mut write, read) = client
            .expect_open::<String, String, u64, i64>(shard_id)
            .await;

        // Seed with some values
        // `offset` is the index of the next unwritten update in `data`,
        // `width` is how many updates the upcoming phase writes.
        let mut offset = 0;
        let mut width = 2;

        for i in offset..offset + width {
            write
                .expect_compare_and_append(
                    &data[i..i + 1],
                    u64::cast_from(i),
                    u64::cast_from(i) + 1,
                )
                .await;
        }
        offset += width;

        // Create machinery for subscribe + fetch
        let mut fetcher = client
            .create_batch_fetcher::<String, String, u64, i64>(
                shard_id,
                Default::default(),
                Default::default(),
                false,
                Diagnostics::for_tests(),
            )
            .await
            .unwrap();

        let mut subscribe = read
            .subscribe(timely::progress::Antichain::from_elem(1))
            .await
            .expect("cannot serve requested as_of");

        // Determine sequence number at outset.
        let original_seqno_since = subscribe.listen.handle.machine.applier.seqno_since();

        // Leased parts collected in phase 2 and held until phase 3.
        let mut parts = vec![];

        width = 4;
        // Collect parts while continuing to write values
        for i in offset..offset + width {
            for event in subscribe.next(None).await {
                if let ListenEvent::Updates(mut new_parts) = event {
                    parts.append(&mut new_parts);
                    // Here and elsewhere we "cheat" and immediately downgrade the since
                    // to demonstrate the effects of SeqNo leases immediately.
                    subscribe
                        .listen
                        .handle
                        .downgrade_since(&subscribe.listen.since)
                        .await;
                }
            }

            write
                .expect_compare_and_append(
                    &data[i..i + 1],
                    u64::cast_from(i),
                    u64::cast_from(i) + 1,
                )
                .await;

            // SeqNo is not downgraded
            assert_eq!(
                subscribe.listen.handle.machine.applier.seqno_since(),
                original_seqno_since
            );
        }

        offset += width;

        let mut seqno_since = subscribe.listen.handle.machine.applier.seqno_since();

        // We're starting out with the original, non-downgraded SeqNo
        assert_eq!(seqno_since, original_seqno_since);

        // We have to handle the parts we generate during the next loop to
        // ensure they don't panic.
        let mut subsequent_parts = vec![];

        // Ensure monotonicity of seqnos we're processing, otherwise the
        // invariant we're testing (returning the last part of a seqno will
        // downgrade its since) will not hold.
        let mut this_seqno = SeqNo::minimum();

        // Repeat the same process as above, more or less, while fetching + returning parts
        for (mut i, part) in parts.into_iter().enumerate() {
            let part_seqno = part.leased_seqno;
            let last_seqno = this_seqno;
            this_seqno = part_seqno;
            assert!(this_seqno >= last_seqno);

            // Fetch the part and then drop it; the fetched contents are not
            // inspected, only the effect of giving the part back matters.
            let _ = fetcher.fetch_leased_part(&part).await;
            drop(part);

            // Simulates an exchange
            // Only the leases of the newly listened parts are kept around.
            for event in subscribe.next(None).await {
                if let ListenEvent::Updates(parts) = event {
                    for part in parts {
                        if let (_, Some(lease)) = part.into_exchangeable_part() {
                            subsequent_parts.push(lease);
                        }
                    }
                }
            }

            subscribe
                .listen
                .handle
                .downgrade_since(&subscribe.listen.since)
                .await;

            // Write more new values
            // `i` started out as the index of the part; shift it to the index
            // of the next unwritten update in `data`.
            i += offset;
            write
                .expect_compare_and_append(
                    &data[i..i + 1],
                    u64::cast_from(i),
                    u64::cast_from(i) + 1,
                )
                .await;

            // We should expect the SeqNo to be downgraded if this part's SeqNo
            // is no longer leased to any other parts, either.
            let expect_downgrade = subscribe.listen.handle.outstanding_seqno() > Some(part_seqno);

            let new_seqno_since = subscribe.listen.handle.machine.applier.seqno_since();
            if expect_downgrade {
                assert!(new_seqno_since > seqno_since);
            } else {
                assert_eq!(new_seqno_since, seqno_since);
            }
            seqno_since = new_seqno_since;
        }

        // SeqNo since was downgraded
        assert!(seqno_since > original_seqno_since);

        // Return any outstanding parts, to prevent a panic!
        drop(subsequent_parts);
        drop(subscribe);
    }
1507
1508    #[mz_ore::test]
1509    fn reader_id_human_readable_serde() {
1510        #[derive(Debug, Serialize, Deserialize)]
1511        struct Container {
1512            reader_id: LeasedReaderId,
1513        }
1514
1515        // roundtrip through json
1516        let id =
1517            LeasedReaderId::from_str("r00000000-1234-5678-0000-000000000000").expect("valid id");
1518        assert_eq!(
1519            id,
1520            serde_json::from_value(serde_json::to_value(id.clone()).expect("serializable"))
1521                .expect("deserializable")
1522        );
1523
1524        // deserialize a serialized string directly
1525        assert_eq!(
1526            id,
1527            serde_json::from_str("\"r00000000-1234-5678-0000-000000000000\"")
1528                .expect("deserializable")
1529        );
1530
1531        // roundtrip id through a container type
1532        let json = json!({ "reader_id": id });
1533        assert_eq!(
1534            "{\"reader_id\":\"r00000000-1234-5678-0000-000000000000\"}",
1535            &json.to_string()
1536        );
1537        let container: Container = serde_json::from_value(json).expect("deserializable");
1538        assert_eq!(container.reader_id, id);
1539    }
1540
1541    // Verifies performance optimizations where a Listener doesn't fetch the
1542    // latest Consensus state if the one it currently has can serve the next
1543    // request.
1544    #[mz_ore::test(tokio::test)]
1545    #[cfg_attr(miri, ignore)] // too slow
1546    async fn skip_consensus_fetch_optimization() {
1547        let data = vec![
1548            (("0".to_owned(), "zero".to_owned()), 0, 1),
1549            (("1".to_owned(), "one".to_owned()), 1, 1),
1550            (("2".to_owned(), "two".to_owned()), 2, 1),
1551        ];
1552
1553        let cfg = PersistConfig::new_for_tests();
1554        let blob = Arc::new(MemBlob::open(MemBlobConfig::default()));
1555        let consensus = Arc::new(MemConsensus::default());
1556        let unreliable = UnreliableHandle::default();
1557        unreliable.totally_available();
1558        let consensus = Arc::new(UnreliableConsensus::new(consensus, unreliable.clone()));
1559        let metrics = Arc::new(Metrics::new(&cfg, &MetricsRegistry::new()));
1560        let pubsub_sender = Arc::new(NoopPubSubSender);
1561        let (mut write, mut read) = PersistClient::new(
1562            cfg,
1563            blob,
1564            consensus,
1565            metrics,
1566            Arc::new(IsolatedRuntime::default()),
1567            Arc::new(StateCache::new_no_metrics()),
1568            pubsub_sender,
1569        )
1570        .expect("client construction failed")
1571        .expect_open::<String, String, u64, i64>(ShardId::new())
1572        .await;
1573
1574        write.expect_compare_and_append(&data[0..1], 0, 1).await;
1575        write.expect_compare_and_append(&data[1..2], 1, 2).await;
1576        write.expect_compare_and_append(&data[2..3], 2, 3).await;
1577
1578        let snapshot = read.expect_snapshot_and_fetch(2).await;
1579        let mut listen = read.expect_listen(0).await;
1580
1581        // Manually advance the listener's machine so that it has the latest
1582        // state by fetching the first events from next. This is awkward but
1583        // only necessary because we're about to do some weird things with
1584        // unreliable.
1585        let listen_actual = listen.fetch_next().await;
1586        let expected_events = vec![ListenEvent::Progress(Antichain::from_elem(1))];
1587        assert_eq!(listen_actual, expected_events);
1588
1589        // At this point, the snapshot and listen's state should have all the
1590        // writes. Test this by making consensus completely unavailable.
1591        unreliable.totally_unavailable();
1592        assert_eq!(snapshot, all_ok(&data, 2));
1593        assert_eq!(
1594            listen.read_until(&3).await,
1595            (all_ok(&data[1..], 1), Antichain::from_elem(3))
1596        );
1597    }
1598}