// mz_persist_client/read.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Read capabilities and handles
11
12use async_stream::stream;
13use std::collections::BTreeMap;
14use std::fmt::Debug;
15use std::future::Future;
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18
19use differential_dataflow::Hashable;
20use differential_dataflow::consolidation::consolidate_updates;
21use differential_dataflow::difference::Monoid;
22use differential_dataflow::lattice::Lattice;
23use futures::Stream;
24use futures_util::{StreamExt, stream};
25use mz_dyncfg::Config;
26use mz_ore::cast::CastLossy;
27use mz_ore::halt;
28use mz_ore::instrument;
29use mz_ore::task::JoinHandle;
30use mz_persist::location::{Blob, SeqNo};
31use mz_persist_types::columnar::{ColumnDecoder, Schema};
32use mz_persist_types::{Codec, Codec64};
33use proptest_derive::Arbitrary;
34use serde::{Deserialize, Serialize};
35use timely::PartialOrder;
36use timely::order::TotalOrder;
37use timely::progress::{Antichain, Timestamp};
38use tracing::warn;
39use uuid::Uuid;
40
41use crate::batch::BLOB_TARGET_SIZE;
42use crate::cfg::{COMPACTION_MEMORY_BOUND_BYTES, RetryParameters};
43use crate::fetch::FetchConfig;
44use crate::fetch::{FetchBatchFilter, FetchedPart, Lease, LeasedBatchPart, fetch_leased_part};
45use crate::internal::encoding::Schemas;
46use crate::internal::machine::{Machine, next_listen_batch_retry_params};
47use crate::internal::metrics::{Metrics, ReadMetrics, ShardMetrics};
48use crate::internal::state::{HollowBatch, LeasedReaderState, SnapshotErr};
49use crate::internal::watch::{AwaitableState, StateWatch};
50use crate::iter::{Consolidator, StructuredSort};
51use crate::schema::SchemaCache;
52use crate::stats::{SnapshotPartStats, SnapshotPartsStats, SnapshotStats};
53use crate::{GarbageCollector, PersistConfig, ShardId, parse_id};
54
55pub use crate::internal::encoding::LazyPartStats;
56pub use crate::internal::state::Since;
57
/// An opaque identifier for a reader of a persist durable TVC (aka shard).
//
// Wraps the raw bytes of a v4 UUID (see `LeasedReaderId::new`); rendered as
// "r{uuid}" by the Display/FromStr impls below, which the serde
// `try_from`/`into` String round-trip relies on.
#[derive(
    Arbitrary,
    Clone,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
    Hash,
    Serialize,
    Deserialize
)]
#[serde(try_from = "String", into = "String")]
pub struct LeasedReaderId(pub(crate) [u8; 16]);
72
73impl std::fmt::Display for LeasedReaderId {
74    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75        write!(f, "r{}", Uuid::from_bytes(self.0))
76    }
77}
78
79impl std::fmt::Debug for LeasedReaderId {
80    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
81        write!(f, "LeasedReaderId({})", Uuid::from_bytes(self.0))
82    }
83}
84
85impl std::str::FromStr for LeasedReaderId {
86    type Err = String;
87
88    fn from_str(s: &str) -> Result<Self, Self::Err> {
89        parse_id("r", "LeasedReaderId", s).map(LeasedReaderId)
90    }
91}
92
impl From<LeasedReaderId> for String {
    // Used by the `#[serde(into = "String")]` attribute on LeasedReaderId;
    // round-trips with the TryFrom<String> impl via Display/FromStr.
    fn from(reader_id: LeasedReaderId) -> Self {
        reader_id.to_string()
    }
}
98
impl TryFrom<String> for LeasedReaderId {
    type Error = String;

    // Used by the `#[serde(try_from = "String")]` attribute on LeasedReaderId;
    // delegates to the FromStr impl.
    fn try_from(s: String) -> Result<Self, Self::Error> {
        s.parse()
    }
}
106
107impl LeasedReaderId {
108    pub(crate) fn new() -> Self {
109        LeasedReaderId(*Uuid::new_v4().as_bytes())
110    }
111}
112
/// Capable of generating a snapshot of all data at `as_of`, followed by a
/// listen of all updates.
///
/// For more details, see [`ReadHandle::snapshot`] and [`Listen`].
#[derive(Debug)]
pub struct Subscribe<K: Codec, V: Codec, T, D> {
    /// Leased parts making up the initial snapshot; `None` once they've been
    /// handed out by `next`.
    snapshot: Option<Vec<LeasedBatchPart<T>>>,
    /// The listen that continues the subscription past the snapshot.
    listen: Listen<K, V, T, D>,
}
122
123impl<K, V, T, D> Subscribe<K, V, T, D>
124where
125    K: Debug + Codec,
126    V: Debug + Codec,
127    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
128    D: Monoid + Codec64 + Send + Sync,
129{
130    fn new(snapshot_parts: Vec<LeasedBatchPart<T>>, listen: Listen<K, V, T, D>) -> Self {
131        Subscribe {
132            snapshot: Some(snapshot_parts),
133            listen,
134        }
135    }
136
137    /// Returns a `LeasedBatchPart` enriched with the proper metadata.
138    ///
139    /// First returns snapshot parts, until they're exhausted, at which point
140    /// begins returning listen parts.
141    ///
142    /// The returned `Antichain` represents the subscription progress as it will
143    /// be _after_ the returned parts are fetched.
144    #[instrument(level = "debug", fields(shard = %self.listen.handle.machine.shard_id()))]
145    pub async fn next(
146        &mut self,
147        // If Some, an override for the default listen sleep retry parameters.
148        listen_retry: Option<RetryParameters>,
149    ) -> Vec<ListenEvent<T, LeasedBatchPart<T>>> {
150        match self.snapshot.take() {
151            Some(parts) => vec![ListenEvent::Updates(parts)],
152            None => {
153                let (parts, upper) = self.listen.next(listen_retry).await;
154                vec![ListenEvent::Updates(parts), ListenEvent::Progress(upper)]
155            }
156        }
157    }
158}
159
160impl<K, V, T, D> Subscribe<K, V, T, D>
161where
162    K: Debug + Codec,
163    V: Debug + Codec,
164    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
165    D: Monoid + Codec64 + Send + Sync,
166{
167    /// Equivalent to `next`, but rather than returning a [`LeasedBatchPart`],
168    /// fetches and returns the data from within it.
169    #[instrument(level = "debug", fields(shard = %self.listen.handle.machine.shard_id()))]
170    pub async fn fetch_next(&mut self) -> Vec<ListenEvent<T, ((K, V), T, D)>> {
171        let events = self.next(None).await;
172        let new_len = events
173            .iter()
174            .map(|event| match event {
175                ListenEvent::Updates(parts) => parts.len(),
176                ListenEvent::Progress(_) => 1,
177            })
178            .sum();
179        let mut ret = Vec::with_capacity(new_len);
180        for event in events {
181            match event {
182                ListenEvent::Updates(parts) => {
183                    for part in parts {
184                        let fetched_part = self.listen.fetch_batch_part(part).await;
185                        let updates = fetched_part.collect::<Vec<_>>();
186                        if !updates.is_empty() {
187                            ret.push(ListenEvent::Updates(updates));
188                        }
189                    }
190                }
191                ListenEvent::Progress(progress) => ret.push(ListenEvent::Progress(progress)),
192            }
193        }
194        ret
195    }
196
197    /// Fetches the contents of `part` and returns its lease.
198    pub async fn fetch_batch_part(&mut self, part: LeasedBatchPart<T>) -> FetchedPart<K, V, T, D> {
199        self.listen.fetch_batch_part(part).await
200    }
201}
202
203impl<K, V, T, D> Subscribe<K, V, T, D>
204where
205    K: Debug + Codec,
206    V: Debug + Codec,
207    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
208    D: Monoid + Codec64 + Send + Sync,
209{
210    /// Politely expires this subscribe, releasing its lease.
211    ///
212    /// There is a best-effort impl in Drop to expire the
213    /// [`ReadHandle`] held by the subscribe that wasn't explicitly expired
214    /// with this method. When possible, explicit expiry is still preferred
215    /// because it also ensures that the background task is complete.
216    pub async fn expire(mut self) {
217        let _ = self.snapshot.take(); // Drop all leased parts.
218        self.listen.expire().await;
219    }
220}
221
/// Data and progress events of a shard subscription.
///
/// TODO: Unify this with [timely::dataflow::operators::capture::event::Event].
#[derive(Debug, PartialEq)]
pub enum ListenEvent<T, D> {
    /// Progress of the shard. Updates in subsequent [ListenEvent::Updates]
    /// events have times at or past this frontier; see [Listen::fetch_next]
    /// for the precise contract.
    Progress(Antichain<T>),
    /// Data of the shard.
    Updates(Vec<D>),
}
232
/// An ongoing subscription of updates to a shard.
#[derive(Debug)]
pub struct Listen<K: Codec, V: Codec, T, D> {
    /// The read handle backing this listen; holds the since and seqno leases.
    handle: ReadHandle<K, V, T, D>,
    /// The exclusive frontier this listen started from; set once in `new` and
    /// never changed thereafter.
    as_of: Antichain<T>,
    /// The since we downgrade our handle to. Deliberately lags `frontier` so
    /// updates exactly at the frontier stay distinguishable (see the HACK
    /// comment in `next`).
    since: Antichain<T>,
    /// An exclusive upper bound on the progress of this Listen.
    frontier: Antichain<T>,
}
241
impl<K, V, T, D> Listen<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
    D: Monoid + Codec64 + Send + Sync,
{
    /// Creates a new [Listen] at `as_of`, downgrading the handle's since to
    /// `as_of` eagerly. Returns the handle's current since as an error if
    /// `as_of` is no longer readable.
    async fn new(
        mut handle: ReadHandle<K, V, T, D>,
        as_of: Antichain<T>,
    ) -> Result<Self, Since<T>> {
        let () = handle.machine.verify_listen(&as_of)?;

        let since = as_of.clone();
        if !PartialOrder::less_equal(handle.since(), &since) {
            // We can't guarantee that the as-of will be available by the time we start reading,
            // since our handle is already downgraded too far.
            return Err(Since(handle.since().clone()));
        }
        // This listen only needs to distinguish things after its frontier
        // (initially as_of although the frontier is inclusive and the as_of
        // isn't). Be a good citizen and downgrade early.
        handle.downgrade_since(&since).await;
        Ok(Listen {
            handle,
            since,
            frontier: as_of.clone(),
            as_of,
        })
    }

    /// An exclusive upper bound on the progress of this Listen.
    pub fn frontier(&self) -> &Antichain<T> {
        &self.frontier
    }

    /// Attempt to pull out the next values of this subscription.
    ///
    /// The returned [`LeasedBatchPart`] is appropriate to use with
    /// `crate::fetch::fetch_leased_part`.
    ///
    /// The returned `Antichain` represents the subscription progress as it will
    /// be _after_ the returned parts are fetched.
    ///
    /// # Panics
    ///
    /// Panics (or halts the process, if the reader's lease has expired) when
    /// state hands back a batch whose since has advanced past this listen's
    /// frontier: updates from such a batch could no longer be filtered
    /// correctly. See the invariant comment inline.
    pub async fn next(
        &mut self,
        // If Some, an override for the default listen sleep retry parameters.
        retry: Option<RetryParameters>,
    ) -> (Vec<LeasedBatchPart<T>>, Antichain<T>) {
        // Wait until the upper is past our frontier - ie. there is another batch for us to process.
        let retry = retry
            .unwrap_or_else(|| next_listen_batch_retry_params(&self.handle.machine.applier.cfg));
        self.handle
            .machine
            .wait_for_upper_past(
                &self.frontier,
                &mut self.handle.watch,
                Some(&self.handle.reader_id),
                &self.handle.metrics.retries.next_listen_batch,
                retry,
            )
            .await;

        // Obtain a lease before grabbing the upcoming batch from state.
        let lease = self.handle.lease_seqno().await;
        let batch = match self
            .handle
            .machine
            .applier
            .next_listen_batch(&self.frontier)
        {
            Ok(batch) => batch,
            Err(seqno) => {
                // `wait_for_upper_past` returned, so state must contain a
                // batch past our frontier; not finding one is a bug.
                panic!(
                    "waited for upper past {frontier:?}, but no listen batch was available at {seqno:?}!",
                    frontier = self.frontier.elements()
                );
            }
        };

        // A lot of things across mz have to line up to hold the following
        // invariant and violations only show up as subtle correctness errors,
        // so explicitly validate it here. Better to panic and roll back a
        // release than be incorrect (also potentially corrupting a sink).
        //
        // Note that the since check is intentionally less_than, not less_equal.
        // If a batch's since is X, that means we can no longer distinguish X
        // (beyond self.frontier) from X-1 (not beyond self.frontier) to keep
        // former and filter out the latter.
        let acceptable_desc = PartialOrder::less_than(batch.desc.since(), &self.frontier)
            // Special case when the frontier == the as_of (i.e. the first
            // time this is called on a new Listen). Because as_of is
            // _exclusive_, we don't need to be able to distinguish X from
            // X-1.
            || (self.frontier == self.as_of
            && PartialOrder::less_equal(batch.desc.since(), &self.frontier));
        if !acceptable_desc {
            // Distinguish "our lease is live, so this is a real invariant
            // violation" (panic) from "our lease expired, so compaction was
            // entitled to advance past us" (halt).
            let lease_state = self
                .handle
                .machine
                .applier
                .reader_lease(self.handle.reader_id.clone());
            if let Some(lease) = lease_state {
                panic!(
                    "Listen on {} received a batch {:?} advanced past the listen frontier {:?}, but the lease has not expired: {:?}",
                    self.handle.machine.shard_id(),
                    batch.desc,
                    self.frontier,
                    lease
                )
            } else {
                // Ideally we'd percolate this error up, so callers could eg. restart a dataflow
                // instead of restarting a process...
                halt!(
                    "Listen on {} received a batch {:?} advanced past the listen frontier {:?} after the reader has expired. \
                     This can happen in exceptional cases: a machine goes to sleep or is running out of memory or CPU, for example.",
                    self.handle.machine.shard_id(),
                    batch.desc,
                    self.frontier
                )
            }
        }

        let new_frontier = batch.desc.upper().clone();

        // We will have a new frontier, so this is an opportunity to downgrade our
        // since capability. Go through `maybe_heartbeat` so we can rate limit
        // this along with our heartbeats.
        //
        // HACK! Everything would be simpler if we could downgrade since to the
        // new frontier, but we can't. The next call needs to be able to
        // distinguish between the times T at the frontier (to emit updates with
        // these times) and T-1 (to filter them). Advancing the since to
        // frontier would erase the ability to distinguish between them. Ideally
        // we'd use what is conceptually "batch.upper - 1" (the greatest
        // elements that are still strictly less than batch.upper, which will be
        // the new value of self.frontier after this call returns), but the
        // trait bounds on T don't give us a way to compute that directly.
        // Instead, we sniff out any elements in self.frontier (the upper of the
        // batch the last time we called this) that are strictly less_than the
        // batch upper to compute a new since. For totally ordered times
        // (currently always the case in mz) self.frontier will always have a
        // single element and it will be less_than upper. We
        // could also abuse the fact that every time we actually emit is
        // guaranteed by definition to be less_than upper to be a bit more
        // prompt, but this would involve a lot more temporary antichains and
        // it's unclear if that's worth it.
        for x in self.frontier.elements().iter() {
            let less_than_upper = batch.desc.upper().elements().iter().any(|u| x.less_than(u));
            if less_than_upper {
                self.since.join_assign(&Antichain::from_elem(x.clone()));
            }
        }

        // IMPORTANT! Make sure this `lease_batch_parts` stays before the
        // `maybe_downgrade_since` call. Otherwise, we might give up our
        // capability on the batch's SeqNo before we lease it, which could lead
        // to blobs that it references being GC'd.
        let filter = FetchBatchFilter::Listen {
            as_of: self.as_of.clone(),
            lower: self.frontier.clone(),
        };
        let parts = self
            .handle
            .lease_batch_parts(lease, batch, filter)
            .collect()
            .await;

        self.handle.maybe_downgrade_since(&self.since).await;

        // NB: Keep this after we use self.frontier to join_assign self.since
        // and also after we construct metadata.
        self.frontier = new_frontier;

        (parts, self.frontier.clone())
    }
}
418
419impl<K, V, T, D> Listen<K, V, T, D>
420where
421    K: Debug + Codec,
422    V: Debug + Codec,
423    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
424    D: Monoid + Codec64 + Send + Sync,
425{
426    /// Attempt to pull out the next values of this subscription.
427    ///
428    /// The updates received in [ListenEvent::Updates] should be assumed to be in arbitrary order
429    /// and not necessarily consolidated. However, the timestamp of each individual update will be
430    /// greater than or equal to the last received [ListenEvent::Progress] frontier (or this
431    /// [Listen]'s initial `as_of` frontier if no progress event has been emitted yet) and less
432    /// than the next [ListenEvent::Progress] frontier.
433    ///
434    /// If you have a use for consolidated listen output, given that snapshots can't be
435    /// consolidated, come talk to us!
436    #[instrument(level = "debug", name = "listen::next", fields(shard = %self.handle.machine.shard_id()))]
437    pub async fn fetch_next(&mut self) -> Vec<ListenEvent<T, ((K, V), T, D)>> {
438        let (parts, progress) = self.next(None).await;
439        let mut ret = Vec::with_capacity(parts.len() + 1);
440        for part in parts {
441            let fetched_part = self.fetch_batch_part(part).await;
442            let updates = fetched_part.collect::<Vec<_>>();
443            if !updates.is_empty() {
444                ret.push(ListenEvent::Updates(updates));
445            }
446        }
447        ret.push(ListenEvent::Progress(progress));
448        ret
449    }
450
451    /// Convert listener into futures::Stream
452    pub fn into_stream(mut self) -> impl Stream<Item = ListenEvent<T, ((K, V), T, D)>> {
453        async_stream::stream!({
454            loop {
455                for msg in self.fetch_next().await {
456                    yield msg;
457                }
458            }
459        })
460    }
461
462    /// Test helper to read from the listener until the given frontier is
463    /// reached. Because compaction can arbitrarily combine batches, we only
464    /// return the final progress info.
465    #[cfg(test)]
466    #[track_caller]
467    pub async fn read_until(&mut self, ts: &T) -> (Vec<((K, V), T, D)>, Antichain<T>) {
468        let mut updates = Vec::new();
469        let mut frontier = Antichain::from_elem(T::minimum());
470        while self.frontier.less_than(ts) {
471            for event in self.fetch_next().await {
472                match event {
473                    ListenEvent::Updates(mut x) => updates.append(&mut x),
474                    ListenEvent::Progress(x) => frontier = x,
475                }
476            }
477        }
478        // Unlike most tests, intentionally don't consolidate updates here
479        // because Listen replays them at the original fidelity.
480        (updates, frontier)
481    }
482}
483
484impl<K, V, T, D> Listen<K, V, T, D>
485where
486    K: Debug + Codec,
487    V: Debug + Codec,
488    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
489    D: Monoid + Codec64 + Send + Sync,
490{
491    /// Fetches the contents of `part` and returns its lease.
492    ///
493    /// This is broken out into its own function to provide a trivial means for
494    /// [`Subscribe`], which contains a [`Listen`], to fetch batches.
495    async fn fetch_batch_part(&mut self, part: LeasedBatchPart<T>) -> FetchedPart<K, V, T, D> {
496        let fetched_part = fetch_leased_part(
497            &self.handle.cfg,
498            &part,
499            self.handle.blob.as_ref(),
500            Arc::clone(&self.handle.metrics),
501            &self.handle.metrics.read.listen,
502            &self.handle.machine.applier.shard_metrics,
503            &self.handle.reader_id,
504            self.handle.read_schemas.clone(),
505            &mut self.handle.schema_cache,
506        )
507        .await;
508        fetched_part
509    }
510
511    /// Politely expires this listen, releasing its lease.
512    ///
513    /// There is a best-effort impl in to expire the
514    /// [`ReadHandle`] held by the listen that wasn't explicitly expired with
515    /// this method. When possible, explicit expiry is still preferred because
516    /// it also ensures that the background task is complete.
517    pub async fn expire(self) {
518        self.handle.expire().await
519    }
520}
521
/// The state for read holds shared between the heartbeat task and the main client,
/// including the since frontier and the seqno hold.
#[derive(Debug)]
pub(crate) struct ReadHolds<T> {
    /// The frontier we should hold the time-based lease back to.
    held_since: Antichain<T>,
    /// The since hold we've actually committed to state. Should always be <=
    /// `held_since`: the heartbeat task is what pushes `held_since` out to
    /// durable state and records the result here.
    applied_since: Antichain<T>,
    /// The largest seqno we've observed in the state.
    recent_seqno: SeqNo,
    /// The set of active leases. We hold back the seqno to the minimum lease or
    /// the recent_seqno, whichever is earlier.
    leases: BTreeMap<SeqNo, Lease>,
    /// True iff this state is expired.
    expired: bool,
    /// Used to trigger the background task to heartbeat state, instead of waiting for
    /// the next tick.
    request_sync: bool,
}
542
impl<T> ReadHolds<T>
where
    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
{
    /// Advances the locally-tracked since hold; the heartbeat task is
    /// responsible for eventually committing it to durable state.
    pub fn downgrade_since(&mut self, since: &Antichain<T>) {
        self.held_since.join_assign(since);
    }

    /// Records a seqno observed in state, keeping the max seen so far.
    pub fn observe_seqno(&mut self, seqno: SeqNo) {
        self.recent_seqno = seqno.max(self.recent_seqno);
    }

    /// Returns a lease on the most recently observed seqno. While the returned
    /// lease (or any clone of it) is live, `outstanding_seqno` will not
    /// advance past it.
    pub fn lease_seqno(&mut self) -> Lease {
        let seqno = self.recent_seqno;
        let lease = self
            .leases
            .entry(seqno)
            .or_insert_with(|| Lease::new(seqno));
        lease.clone()
    }

    /// Returns the earliest seqno still held by an outstanding lease, or
    /// `recent_seqno` if no leases are outstanding. Prunes released leases as
    /// a side effect.
    pub fn outstanding_seqno(&mut self) -> SeqNo {
        while let Some(first) = self.leases.first_entry() {
            // A count of 1 means the map's entry is the only remaining
            // reference: every clone handed out by `lease_seqno` has been
            // dropped, so this lease no longer holds anything back.
            if first.get().count() <= 1 {
                first.remove();
            } else {
                return *first.key();
            }
        }
        self.recent_seqno
    }
}
575
/// A "capability" granting the ability to read the state of some shard at times
/// greater or equal to `self.since()`.
///
/// Production users should call [Self::expire] before dropping a ReadHandle so
/// that it can expire its leases. If/when rust gets AsyncDrop, this will be
/// done automatically.
///
/// All async methods on ReadHandle retry for as long as they are able, but the
/// returned [std::future::Future]s implement "cancel on drop" semantics. This
/// means that callers can add a timeout using [tokio::time::timeout] or
/// [tokio::time::timeout_at].
///
/// ```rust,no_run
/// # let mut read: mz_persist_client::read::ReadHandle<String, String, u64, i64> = unimplemented!();
/// # let timeout: std::time::Duration = unimplemented!();
/// # let new_since: timely::progress::Antichain<u64> = unimplemented!();
/// # async {
/// tokio::time::timeout(timeout, read.downgrade_since(&new_since)).await
/// # };
/// ```
#[derive(Debug)]
pub struct ReadHandle<K: Codec, V: Codec, T, D> {
    pub(crate) cfg: PersistConfig,
    pub(crate) metrics: Arc<Metrics>,
    pub(crate) machine: Machine<K, V, T, D>,
    pub(crate) gc: GarbageCollector<K, V, T, D>,
    pub(crate) blob: Arc<dyn Blob>,
    /// Watch on shard state, used (e.g. by [Listen]) to wait for the upper to
    /// advance without polling.
    watch: StateWatch<K, V, T, D>,

    pub(crate) reader_id: LeasedReaderId,
    pub(crate) read_schemas: Schemas<K, V>,
    pub(crate) schema_cache: SchemaCache<K, V, T, D>,

    /// This handle's locally-tracked since; see [Self::since] and
    /// [Self::downgrade_since].
    since: Antichain<T>,
    /// Hold state shared with the background heartbeat task.
    pub(crate) hold_state: AwaitableState<ReadHolds<T>>,
    /// State (including the heartbeat task handle) for a handle that has not
    /// yet been expired.
    pub(crate) unexpired_state: Option<UnexpiredReadHandleState>,
}
613
/// Length of time after a reader's last operation after which the reader may be
/// expired.
///
/// Leases are kept alive by a background task that heartbeats at a quarter of
/// this duration (see `reader_heartbeat_task`).
pub(crate) const READER_LEASE_DURATION: Config<Duration> = Config::new(
    "persist_reader_lease_duration",
    Duration::from_secs(60 * 15),
    "The time after which we'll clean up stale read leases",
);
621
622impl<K, V, T, D> ReadHandle<K, V, T, D>
623where
624    K: Debug + Codec,
625    V: Debug + Codec,
626    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
627    D: Monoid + Codec64 + Send + Sync,
628{
    /// Returns a new [ReadHandle] wrapping the given leased reader `state`,
    /// spawning a background task that heartbeats the lease.
    #[allow(clippy::unused_async)]
    pub(crate) async fn new(
        cfg: PersistConfig,
        metrics: Arc<Metrics>,
        machine: Machine<K, V, T, D>,
        gc: GarbageCollector<K, V, T, D>,
        blob: Arc<dyn Blob>,
        reader_id: LeasedReaderId,
        read_schemas: Schemas<K, V>,
        state: LeasedReaderState<T>,
    ) -> Self {
        let schema_cache = machine.applier.schema_cache();
        // The shared hold state starts at exactly the since/seqno recorded in
        // the durable reader state, with no outstanding leases.
        let hold_state = AwaitableState::new(ReadHolds {
            held_since: state.since.clone(),
            applied_since: state.since.clone(),
            recent_seqno: state.seqno,
            leases: Default::default(),
            expired: false,
            request_sync: false,
        });
        ReadHandle {
            cfg,
            metrics: Arc::clone(&metrics),
            machine: machine.clone(),
            gc: gc.clone(),
            blob,
            watch: machine.applier.watch(),
            reader_id: reader_id.clone(),
            read_schemas,
            schema_cache,
            since: state.since,
            hold_state: hold_state.clone(),
            // The heartbeat task gets its own clones of machine, gc, and the
            // shared hold state.
            unexpired_state: Some(UnexpiredReadHandleState {
                heartbeat_task: Self::start_reader_heartbeat_task(
                    machine, reader_id, gc, hold_state,
                ),
            }),
        }
    }
668
669    fn start_reader_heartbeat_task(
670        machine: Machine<K, V, T, D>,
671        reader_id: LeasedReaderId,
672        gc: GarbageCollector<K, V, T, D>,
673        leased_seqnos: AwaitableState<ReadHolds<T>>,
674    ) -> JoinHandle<()> {
675        let metrics = Arc::clone(&machine.applier.metrics);
676        let name = format!(
677            "persist::heartbeat_read({},{})",
678            machine.shard_id(),
679            reader_id
680        );
681        mz_ore::task::spawn(|| name, {
682            metrics.tasks.heartbeat_read.instrument_task(async move {
683                Self::reader_heartbeat_task(machine, reader_id, gc, leased_seqnos).await
684            })
685        })
686    }
687
    /// Body of the background lease-heartbeat task: periodically (or on
    /// demand, via `request_sync`) pushes the locally-held since/seqno out to
    /// durable state, and expires the reader once the shared hold state is
    /// marked expired.
    async fn reader_heartbeat_task(
        machine: Machine<K, V, T, D>,
        reader_id: LeasedReaderId,
        gc: GarbageCollector<K, V, T, D>,
        leased_seqnos: AwaitableState<ReadHolds<T>>,
    ) {
        // Heartbeat at a quarter of the lease duration, well inside the lease
        // window.
        let sleep_duration = READER_LEASE_DURATION.get(&machine.applier.cfg) / 4;
        // Jitter the first tick to avoid a thundering herd when many readers are started around
        // the same instant, like during deploys.
        let jitter: f64 = f64::cast_lossy(reader_id.hashed()) / f64::cast_lossy(u64::MAX);
        let mut interval = tokio::time::interval_at(
            tokio::time::Instant::now() + sleep_duration.mul_f64(jitter),
            sleep_duration,
        );
        let mut held_since = leased_seqnos.read(|s| s.held_since.clone());
        loop {
            let before_sleep = Instant::now();
            // Wake on either the periodic tick or an explicit sync request
            // (e.g. from `downgrade_since`), whichever comes first.
            let _woke_by_tick = tokio::select! {
                _tick = interval.tick() => {
                    true
                }
                _whatever = leased_seqnos.wait_while(|s| !s.request_sync) => {
                    false
                }
            };

            let elapsed_since_before_sleeping = before_sleep.elapsed();
            if elapsed_since_before_sleeping > sleep_duration + Duration::from_secs(60) {
                warn!(
                    "reader ({}) of shard ({}) went {}s between heartbeats",
                    reader_id,
                    machine.shard_id(),
                    elapsed_since_before_sleeping.as_secs_f64()
                );
            }

            let before_heartbeat = Instant::now();
            let heartbeat_ms = (machine.applier.cfg.now)();
            let current_seqno = machine.seqno();
            // Snapshot the holds to push to state; Err(()) signals that the
            // handle was expired and this task should shut down.
            let result = leased_seqnos.modify(|s| {
                if s.expired {
                    Err(())
                } else {
                    s.observe_seqno(current_seqno);
                    s.request_sync = false;
                    held_since.join_assign(&s.held_since);
                    Ok(s.outstanding_seqno())
                }
            });
            let actual_since = match result {
                Ok(held_seqno) => {
                    let (seqno, actual_since, maintenance) = machine
                        .downgrade_since(&reader_id, held_seqno, &held_since, heartbeat_ms)
                        .await;
                    // Record what was actually committed so waiters in
                    // `ReadHandle::downgrade_since` can observe it.
                    leased_seqnos.modify(|s| {
                        s.applied_since.clone_from(&actual_since.0);
                        s.observe_seqno(seqno)
                    });
                    maintenance.start_performing(&machine, &gc);
                    actual_since
                }
                Err(()) => {
                    let (seqno, maintenance) = machine.expire_leased_reader(&reader_id).await;
                    leased_seqnos.modify(|s| s.observe_seqno(seqno));
                    maintenance.start_performing(&machine, &gc);
                    break;
                }
            };

            let elapsed_since_heartbeat = before_heartbeat.elapsed();
            if elapsed_since_heartbeat > Duration::from_secs(60) {
                warn!(
                    "reader ({}) of shard ({}) heartbeat call took {}s",
                    reader_id,
                    machine.shard_id(),
                    elapsed_since_heartbeat.as_secs_f64(),
                );
            }

            if PartialOrder::less_than(&held_since, &actual_since.0) {
                // If the read handle was intentionally expired, this task
                // *should* be aborted before it observes the expiration. So if
                // we get here, this task somehow failed to keep the read lease
                // alive. Warn loudly, because there's now a live read handle to
                // an expired shard that will panic if used, but don't panic,
                // just in case there is some edge case that results in this
                // task observing the intentional expiration of a read handle.
                warn!(
                    "heartbeat task for reader ({}) of shard ({}) exiting due to expired lease \
                     while read handle is live",
                    reader_id,
                    machine.shard_id(),
                );
                return;
            }
        }
    }
785
    /// This handle's shard id.
    pub fn shard_id(&self) -> ShardId {
        // Delegates to the machine, which owns the canonical shard identity.
        self.machine.shard_id()
    }
790
    /// This handle's `since` frontier.
    ///
    /// This will always be greater or equal to the shard-global `since`.
    ///
    /// Note that this returns the locally-tracked since, which may be ahead of
    /// the value last written to durable state by the heartbeat task.
    pub fn since(&self) -> &Antichain<T> {
        &self.since
    }
797
798    #[cfg(test)]
799    fn outstanding_seqno(&self) -> SeqNo {
800        let current_seqno = self.machine.seqno();
801        self.hold_state.modify(|s| {
802            s.observe_seqno(current_seqno);
803            s.outstanding_seqno()
804        })
805    }
806
807    /// Forwards the since frontier of this handle, giving up the ability to
808    /// read at times not greater or equal to `new_since`.
809    ///
810    /// This may trigger (asynchronous) compaction and consolidation in the
811    /// system. A `new_since` of the empty antichain "finishes" this shard,
812    /// promising that no more data will ever be read by this handle.
813    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
814    pub async fn downgrade_since(&mut self, new_since: &Antichain<T>) {
815        self.since = new_since.clone();
816        self.hold_state.modify(|s| {
817            s.downgrade_since(new_since);
818            s.request_sync = true;
819        });
820        self.hold_state
821            .wait_while(|s| PartialOrder::less_than(&s.applied_since, new_since))
822            .await;
823    }
824
    /// Returns an ongoing subscription of updates to a shard.
    ///
    /// The stream includes all data at times greater than `as_of`. Combined
    /// with [Self::snapshot] it will produce exactly correct results: the
    /// snapshot is the TVCs contents at `as_of` and all subsequent updates
    /// occur at exactly their indicated time. The recipient should only
    /// downgrade their read capability when they are certain they have all data
    /// through the frontier they would downgrade to.
    ///
    /// This takes ownership of the ReadHandle so the Listen can use it to
    /// [Self::downgrade_since] as it progresses. If you need to keep this
    /// handle, then [Self::clone] it before calling listen.
    ///
    /// The `Since` error indicates that the requested `as_of` cannot be served
    /// (the caller has out of date information) and includes the smallest
    /// `as_of` that would have been accepted.
    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
    pub async fn listen(self, as_of: Antichain<T>) -> Result<Listen<K, V, T, D>, Since<T>> {
        // All the real work happens in Listen's constructor.
        Listen::new(self, as_of).await
    }
845
846    async fn snapshot_batches(
847        &mut self,
848        as_of: Antichain<T>,
849    ) -> Result<(Lease, Vec<HollowBatch<T>>), Since<T>> {
850        self.machine
851            .wait_for_upper_past(
852                &as_of,
853                &mut self.watch,
854                Some(&self.reader_id),
855                &self.metrics.retries.snapshot,
856                RetryParameters::persist_defaults(),
857            )
858            .await;
859        let lease = self.lease_seqno().await;
860        let batches = match self.machine.applier.snapshot(&as_of) {
861            Ok(data) => data,
862            Err(SnapshotErr::AsOfHistoricalDistinctionsLost(since)) => return Err(since),
863            Err(SnapshotErr::AsOfNotYetAvailable(seqno, upper)) => {
864                panic!(
865                    "waited for upper past {as_of:?}, but at latest seqno {seqno:?} the frontier was only {upper:?}",
866                    as_of = as_of.elements(),
867                    upper = upper.0.elements(),
868                )
869            }
870        };
871        Ok((lease, batches))
872    }
873
874    /// Returns all of the contents of the shard TVC at `as_of` broken up into
875    /// [`LeasedBatchPart`]es. These parts can be "turned in" via
876    /// `crate::fetch::fetch_batch_part` to receive the data they contain.
877    ///
878    /// This command returns the contents of this shard as of `as_of` once they
879    /// are known. This may "block" (in an async-friendly way) if `as_of` is
880    /// greater or equal to the current `upper` of the shard. The recipient
881    /// should only downgrade their read capability when they are certain they
882    /// have all data through the frontier they would downgrade to.
883    ///
884    /// The `Since` error indicates that the requested `as_of` cannot be served
885    /// (the caller has out of date information) and includes the smallest
886    /// `as_of` that would have been accepted.
887    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
888    pub async fn snapshot(
889        &mut self,
890        as_of: Antichain<T>,
891    ) -> Result<Vec<LeasedBatchPart<T>>, Since<T>> {
892        let (lease, batches) = self.snapshot_batches(as_of.clone()).await?;
893
894        if !PartialOrder::less_equal(self.since(), &as_of) {
895            return Err(Since(self.since().clone()));
896        }
897
898        let filter = FetchBatchFilter::Snapshot { as_of };
899        let mut leased_parts = Vec::new();
900        for batch in batches {
901            // Flatten the HollowBatch into one LeasedBatchPart per key. Each key
902            // corresponds to a "part" or s3 object. This allows persist_source
903            // to distribute work by parts (smallish, more even size) instead of
904            // batches (arbitrarily large).
905            leased_parts.extend(
906                self.lease_batch_parts(lease.clone(), batch, filter.clone())
907                    .collect::<Vec<_>>()
908                    .await,
909            );
910        }
911        Ok(leased_parts)
912    }
913
914    /// Returns a snapshot of all of a shard's data using `as_of`, followed by
915    /// listening to any future updates.
916    ///
917    /// For more details on this operation's semantics, see [Self::snapshot] and
918    /// [Self::listen].
919    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
920    pub async fn subscribe(
921        mut self,
922        as_of: Antichain<T>,
923    ) -> Result<Subscribe<K, V, T, D>, Since<T>> {
924        let snapshot_parts = self.snapshot(as_of.clone()).await?;
925        let listen = self.listen(as_of.clone()).await?;
926        Ok(Subscribe::new(snapshot_parts, listen))
927    }
928
    /// Streams out one [LeasedBatchPart] per part in `batch`, each carrying a
    /// clone of `lease` so the underlying state can't be GCed while any part
    /// is outstanding.
    fn lease_batch_parts(
        &mut self,
        lease: Lease,
        batch: HollowBatch<T>,
        filter: FetchBatchFilter<T>,
    ) -> impl Stream<Item = LeasedBatchPart<T>> + '_ {
        stream! {
            // Clone the Arcs up front so the borrow of `self` inside the
            // stream body doesn't conflict with `batch.part_stream`'s
            // arguments.
            let blob = Arc::clone(&self.blob);
            let metrics = Arc::clone(&self.metrics);
            let desc = batch.desc.clone();
            for await part in batch.part_stream(self.shard_id(), &*blob, &*metrics) {
                yield LeasedBatchPart {
                    metrics: Arc::clone(&self.metrics),
                    shard_id: self.machine.shard_id(),
                    filter: filter.clone(),
                    desc: desc.clone(),
                    part: part.expect("leased part").into_owned(),
                    lease: lease.clone(),
                    filter_pushdown_audit: false,
                }
            }
        }
    }
952
953    /// Tracks that the `ReadHandle`'s machine's current `SeqNo` is being
954    /// "leased out" to a `LeasedBatchPart`. This ensures that until the reader is expired or the
955    /// [Lease] is dropped, we will hold onto all versions of the state with this sequence number
956    /// or larger. In particular, this means that any batches that are present in the state at
957    /// the leased seqno _or in any future state_ will be preserved as long as the lease is active.
958    ///
959    /// Note that this doesn't present batches from earlier versions of state from being GCed,
960    /// even if they were observed just before the lease call. Callers should generally take a lease
961    /// first, and then inspect state to find any batches to return or process.
962    async fn lease_seqno(&mut self) -> Lease {
963        let current_seqno = self.machine.seqno();
964        let lease = self.hold_state.modify(|s| {
965            s.observe_seqno(current_seqno);
966            s.lease_seqno()
967        });
968        // The seqno we've leased may be the seqno observed by our heartbeat task, which could be
969        // ahead of the last state we saw. Ensure we only observe states in the future of our hold.
970        // (Since these are backed by the same state in the same process, this should all be pretty
971        // fast.)
972        self.watch.wait_for_seqno_ge(lease.seqno()).await;
973        lease
974    }
975
976    /// Returns an independent [ReadHandle] with a new [LeasedReaderId] but the
977    /// same `since`.
978    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
979    pub async fn clone(&self, purpose: &str) -> Self {
980        let new_reader_id = LeasedReaderId::new();
981        let machine = self.machine.clone();
982        let gc = self.gc.clone();
983        let heartbeat_ts = (self.cfg.now)();
984        let (reader_state, maintenance) = machine
985            .register_leased_reader(
986                &new_reader_id,
987                purpose,
988                READER_LEASE_DURATION.get(&self.cfg),
989                heartbeat_ts,
990                false,
991            )
992            .await;
993        maintenance.start_performing(&machine, &gc);
994        // The point of clone is that you're guaranteed to have the same (or
995        // greater) since capability, verify that.
996        // TODO: better if it's the same since capability exactly.
997        assert!(PartialOrder::less_equal(&reader_state.since, &self.since));
998        let new_reader = ReadHandle::new(
999            self.cfg.clone(),
1000            Arc::clone(&self.metrics),
1001            machine,
1002            gc,
1003            Arc::clone(&self.blob),
1004            new_reader_id,
1005            self.read_schemas.clone(),
1006            reader_state,
1007        )
1008        .await;
1009        new_reader
1010    }
1011
1012    /// A rate-limited version of [Self::downgrade_since].
1013    ///
1014    /// Users can call this as frequently as they wish; changes will be periodically
1015    /// flushed to state by the heartbeat task.
1016    #[allow(clippy::unused_async)]
1017    pub async fn maybe_downgrade_since(&mut self, new_since: &Antichain<T>) {
1018        self.since = new_since.clone();
1019        self.hold_state.modify(|s| {
1020            s.downgrade_since(new_since);
1021        });
1022    }
1023
1024    /// Politely expires this reader, releasing its lease.
1025    ///
1026    /// There is a best-effort impl in Drop to expire a reader that wasn't
1027    /// explictly expired with this method. When possible, explicit expiry is
1028    /// still preferred because it also ensures that the background task is complete.
1029    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
1030    pub async fn expire(mut self) {
1031        self.hold_state.modify(|s| {
1032            s.expired = true;
1033            s.request_sync = true;
1034        });
1035        let Some(unexpired_state) = self.unexpired_state.take() else {
1036            return;
1037        };
1038        unexpired_state.heartbeat_task.await;
1039    }
1040
    /// Test helper for a [Self::listen] call that is expected to succeed.
    #[cfg(test)]
    #[track_caller]
    pub async fn expect_listen(self, as_of: T) -> Listen<K, V, T, D> {
        // Wraps the single timestamp in an antichain and panics on Since
        // errors, keeping test code terse.
        self.listen(Antichain::from_elem(as_of))
            .await
            .expect("cannot serve requested as_of")
    }
1049}
1050
/// State for a read handle that has not been explicitly expired.
#[derive(Debug)]
pub(crate) struct UnexpiredReadHandleState {
    // Handle to the background heartbeat task; awaited on explicit expiry to
    // ensure the task has fully wound down.
    pub(crate) heartbeat_task: JoinHandle<()>,
}
1056
/// An incremental cursor through a particular shard, returned from [ReadHandle::snapshot_cursor].
///
/// To read an entire dataset, the
/// client should call `next` until it returns `None`, which signals all data has been returned...
/// but it's also free to abandon the instance at any time if it eg. only needs a few entries.
#[derive(Debug)]
pub struct Cursor<K: Codec, V: Codec, T: Timestamp + Codec64, D: Codec64, L = Lease> {
    // Streaming-merge consolidator that yields the data chunk by chunk.
    consolidator: Consolidator<T, D, StructuredSort<K, V, T, D>>,
    // Per-chunk limits passed to the consolidator on each `next` call.
    max_len: usize,
    max_bytes: usize,
    // Held (not read) for its Drop side effect: keeps the leased state alive
    // while the cursor exists.
    _lease: L,
    // Schemas used to decode keys/values out of the fetched parts.
    read_schemas: Schemas<K, V>,
}
1070
1071impl<K: Codec, V: Codec, T: Timestamp + Codec64, D: Codec64, L> Cursor<K, V, T, D, L> {
1072    /// Extracts and returns the lease from the cursor. Allowing the caller to
1073    /// do any necessary cleanup associated with the lease.
1074    pub fn into_lease(self: Self) -> L {
1075        self._lease
1076    }
1077}
1078
impl<K, V, T, D, L> Cursor<K, V, T, D, L>
where
    K: Debug + Codec + Ord,
    V: Debug + Codec + Ord,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Monoid + Ord + Codec64 + Send + Sync,
{
    /// Grab the next batch of consolidated data.
    ///
    /// Returns `None` once the consolidator is exhausted, i.e. all data has
    /// been yielded. The returned iterator decodes rows lazily and borrows
    /// `self`, so it must be dropped before the next call.
    pub async fn next(&mut self) -> Option<impl Iterator<Item = ((K, V), T, D)> + '_> {
        // Split-borrow the fields so the consolidator can be polled mutably
        // while the other fields stay accessible.
        let Self {
            consolidator,
            max_len,
            max_bytes,
            _lease,
            read_schemas: _,
        } = self;

        let part = consolidator
            .next_chunk(*max_len, *max_bytes)
            .await
            .expect("fetching a leased part")?;
        // Build one decoder per column for this chunk.
        let key_decoder = self
            .read_schemas
            .key
            .decoder_any(part.key.as_ref())
            .expect("ok");
        let val_decoder = self
            .read_schemas
            .val
            .decoder_any(part.val.as_ref())
            .expect("ok");
        // Decode row-by-row as the caller iterates; nothing is materialized
        // up front.
        let iter = (0..part.len()).map(move |i| {
            let mut k = K::default();
            let mut v = V::default();
            key_decoder.decode(i, &mut k);
            val_decoder.decode(i, &mut v);
            let t = T::decode(part.time.value(i).to_le_bytes());
            let d = D::decode(part.diff.value(i).to_le_bytes());
            ((k, v), t, d)
        });

        Some(iter)
    }
}
1123
impl<K, V, T, D> ReadHandle<K, V, T, D>
where
    K: Debug + Codec + Ord,
    V: Debug + Codec + Ord,
    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
    D: Monoid + Ord + Codec64 + Send + Sync,
{
    /// Generates a [Self::snapshot], and fetches all of the batches it
    /// contains.
    ///
    /// The output is consolidated. Furthermore, to keep memory usage down when
    /// reading a snapshot that consolidates well, this consolidates as it goes.
    ///
    /// Potential future improvements (if necessary):
    /// - Accept something like a `F: Fn(K,V) -> (K,V)` argument, which looks
    ///   like an MFP you might be pushing down. Reason being that if you are
    ///   projecting or transforming in a way that allows further consolidation,
    ///   amazing.
    /// - Reuse any code we write to streaming-merge consolidate in
    ///   persist_source here.
    pub async fn snapshot_and_fetch(
        &mut self,
        as_of: Antichain<T>,
    ) -> Result<Vec<((K, V), T, D)>, Since<T>> {
        // Drain the cursor chunk by chunk into one Vec.
        let mut cursor = self.snapshot_cursor(as_of, |_| true).await?;
        let mut contents = Vec::new();
        while let Some(iter) = cursor.next().await {
            contents.extend(iter);
        }

        // We don't currently guarantee that encoding is one-to-one, so we still need to
        // consolidate the decoded outputs. However, let's report if this isn't a noop.
        let old_len = contents.len();
        consolidate_updates(&mut contents);
        if old_len != contents.len() {
            // TODO(bkirwi): do we need more / finer-grained metrics for this?
            self.machine
                .applier
                .shard_metrics
                .unconsolidated_snapshot
                .inc();
        }

        Ok(contents)
    }

    /// Generates a [Self::snapshot], and fetches all of the batches it
    /// contains.
    ///
    /// To keep memory usage down when reading a snapshot that consolidates well, this consolidates
    /// as it goes. However, note that only the serialized data is consolidated: the deserialized
    /// data will only be consolidated if your K/V codecs are one-to-one.
    pub async fn snapshot_cursor(
        &mut self,
        as_of: Antichain<T>,
        should_fetch_part: impl for<'a> Fn(Option<&'a LazyPartStats>) -> bool,
    ) -> Result<Cursor<K, V, T, D>, Since<T>> {
        // Take a lease over the batches first, then hand everything off to the
        // shared consolidating-read helper.
        let (lease, batches) = self.snapshot_batches(as_of.clone()).await?;

        Self::read_batches_consolidated(
            &self.cfg,
            Arc::clone(&self.metrics),
            Arc::clone(&self.machine.applier.shard_metrics),
            self.metrics.read.snapshot.clone(),
            Arc::clone(&self.blob),
            self.shard_id(),
            as_of,
            self.read_schemas.clone(),
            &batches,
            lease,
            should_fetch_part,
            COMPACTION_MEMORY_BOUND_BYTES.get(&self.cfg),
        )
    }

    /// Builds a [Cursor] that consolidating-reads the given `batches` at
    /// `as_of`, enqueueing only parts accepted by `should_fetch_part`. The
    /// `lease` is held by the returned cursor for its lifetime.
    pub(crate) fn read_batches_consolidated<L>(
        persist_cfg: &PersistConfig,
        metrics: Arc<Metrics>,
        shard_metrics: Arc<ShardMetrics>,
        read_metrics: ReadMetrics,
        blob: Arc<dyn Blob>,
        shard_id: ShardId,
        as_of: Antichain<T>,
        schemas: Schemas<K, V>,
        batches: &[HollowBatch<T>],
        lease: L,
        should_fetch_part: impl for<'a> Fn(Option<&'a LazyPartStats>) -> bool,
        memory_budget_bytes: usize,
    ) -> Result<Cursor<K, V, T, D, L>, Since<T>> {
        let context = format!("{}[as_of={:?}]", shard_id, as_of.elements());
        let filter = FetchBatchFilter::Snapshot {
            as_of: as_of.clone(),
        };

        let mut consolidator = Consolidator::new(
            context,
            FetchConfig::from_persist_config(persist_cfg),
            shard_id,
            StructuredSort::new(schemas.clone()),
            blob,
            metrics,
            shard_metrics,
            read_metrics,
            filter,
            None,
            memory_budget_bytes,
        );
        // Enqueue every run of every batch, skipping parts the caller's
        // predicate filters out (e.g. via part stats pushdown).
        for batch in batches {
            for (meta, run) in batch.runs() {
                consolidator.enqueue_run(
                    &batch.desc,
                    meta,
                    run.into_iter()
                        .filter(|p| should_fetch_part(p.stats()))
                        .cloned(),
                );
            }
        }
        // This default may end up consolidating more records than previously
        // for cases like fast-path peeks, where only the first few entries are used.
        // If this is a noticeable performance impact, thread the max-len in from the caller.
        let max_len = persist_cfg.compaction_yield_after_n_updates;
        let max_bytes = BLOB_TARGET_SIZE.get(persist_cfg).max(1);

        Ok(Cursor {
            consolidator,
            max_len,
            max_bytes,
            _lease: lease,
            read_schemas: schemas,
        })
    }

    /// Returns aggregate statistics about the contents of the shard TVC at the
    /// given frontier.
    ///
    /// This command returns the contents of this shard as of `as_of` once they
    /// are known. This may "block" (in an async-friendly way) if `as_of` is
    /// greater or equal to the current `upper` of the shard. If `None` is given
    /// for `as_of`, then the latest stats known by this process are used.
    ///
    /// The `Since` error indicates that the requested `as_of` cannot be served
    /// (the caller has out of date information) and includes the smallest
    /// `as_of` that would have been accepted.
    pub fn snapshot_stats(
        &self,
        as_of: Option<Antichain<T>>,
    ) -> impl Future<Output = Result<SnapshotStats, Since<T>>> + Send + 'static {
        // Clone the machine so the returned future is 'static and doesn't
        // borrow this handle.
        let machine = self.machine.clone();
        async move {
            let batches = match as_of {
                Some(as_of) => machine.unleased_snapshot(&as_of).await?,
                None => machine.applier.all_batches(),
            };
            let num_updates = batches.iter().map(|b| b.len).sum();
            Ok(SnapshotStats {
                shard_id: machine.shard_id(),
                num_updates,
            })
        }
    }

    /// Returns aggregate statistics about the contents of the shard TVC at the
    /// given frontier.
    ///
    /// This command returns the contents of this shard as of `as_of` once they
    /// are known. This may "block" (in an async-friendly way) if `as_of` is
    /// greater or equal to the current `upper` of the shard.
    ///
    /// The `Since` error indicates that the requested `as_of` cannot be served
    /// (the caller has out of date information) and includes the smallest
    /// `as_of` that would have been accepted.
    pub async fn snapshot_parts_stats(
        &self,
        as_of: Antichain<T>,
    ) -> Result<SnapshotPartsStats, Since<T>> {
        // NOTE: no lease is taken here ("unleased"); the stats are collected
        // from the batches as observed at this moment.
        let batches = self.machine.unleased_snapshot(&as_of).await?;
        let parts = stream::iter(&batches)
            .flat_map(|b| b.part_stream(self.shard_id(), &*self.blob, &*self.metrics))
            .map(|p| {
                let p = p.expect("live batch");
                SnapshotPartStats {
                    encoded_size_bytes: p.encoded_size_bytes(),
                    stats: p.stats().cloned(),
                }
            })
            .collect()
            .await;
        Ok(SnapshotPartsStats {
            metrics: Arc::clone(&self.machine.applier.metrics),
            shard_id: self.machine.shard_id(),
            parts,
        })
    }
}
1319
impl<K, V, T, D> ReadHandle<K, V, T, D>
where
    K: Debug + Codec + Ord,
    V: Debug + Codec + Ord,
    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
    D: Monoid + Codec64 + Send + Sync,
{
    /// Generates a [Self::snapshot], and streams out all of the updates
    /// it contains in bounded memory.
    ///
    /// The output is not consolidated.
    pub async fn snapshot_and_stream(
        &mut self,
        as_of: Antichain<T>,
    ) -> Result<impl Stream<Item = ((K, V), T, D)> + use<K, V, T, D>, Since<T>> {
        let snap = self.snapshot(as_of).await?;

        // Clone everything the stream body needs, so the returned stream owns
        // its state and does not borrow `self`.
        let blob = Arc::clone(&self.blob);
        let metrics = Arc::clone(&self.metrics);
        let snapshot_metrics = self.metrics.read.snapshot.clone();
        let shard_metrics = Arc::clone(&self.machine.applier.shard_metrics);
        let reader_id = self.reader_id.clone();
        let schemas = self.read_schemas.clone();
        let mut schema_cache = self.schema_cache.clone();
        let persist_cfg = self.cfg.clone();
        let stream = async_stream::stream! {
            // Fetch the leased parts one at a time, yielding each decoded
            // update; only one part is held in memory at once.
            for part in snap {
                let mut fetched_part = fetch_leased_part(
                    &persist_cfg,
                    &part,
                    blob.as_ref(),
                    Arc::clone(&metrics),
                    &snapshot_metrics,
                    &shard_metrics,
                    &reader_id,
                    schemas.clone(),
                    &mut schema_cache,
                )
                .await;

                while let Some(next) = fetched_part.next() {
                    yield next;
                }
            }
        };

        Ok(stream)
    }
}
1369
1370impl<K, V, T, D> ReadHandle<K, V, T, D>
1371where
1372    K: Debug + Codec + Ord,
1373    V: Debug + Codec + Ord,
1374    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
1375    D: Monoid + Ord + Codec64 + Send + Sync,
1376{
1377    /// Test helper to generate a [Self::snapshot] call that is expected to
1378    /// succeed, process its batches, and then return its data sorted.
1379    #[cfg(test)]
1380    #[track_caller]
1381    pub async fn expect_snapshot_and_fetch(&mut self, as_of: T) -> Vec<((K, V), T, D)> {
1382        let mut ret = self
1383            .snapshot_and_fetch(Antichain::from_elem(as_of))
1384            .await
1385            .expect("cannot serve requested as_of");
1386
1387        ret.sort();
1388        ret
1389    }
1390}
1391
1392impl<K: Codec, V: Codec, T, D> Drop for ReadHandle<K, V, T, D> {
1393    fn drop(&mut self) {
1394        self.hold_state.modify(|s| {
1395            s.expired = true;
1396            s.request_sync = true;
1397        });
1398    }
1399}
1400
1401#[cfg(test)]
1402mod tests {
1403    use std::pin;
1404    use std::str::FromStr;
1405
1406    use mz_dyncfg::ConfigUpdates;
1407    use mz_ore::cast::CastFrom;
1408    use mz_ore::metrics::MetricsRegistry;
1409    use mz_persist::mem::{MemBlob, MemBlobConfig, MemConsensus};
1410    use mz_persist::unreliable::{UnreliableConsensus, UnreliableHandle};
1411    use serde::{Deserialize, Serialize};
1412    use serde_json::json;
1413    use tokio_stream::StreamExt;
1414
1415    use crate::async_runtime::IsolatedRuntime;
1416    use crate::batch::BLOB_TARGET_SIZE;
1417    use crate::cache::StateCache;
1418    use crate::internal::metrics::Metrics;
1419    use crate::rpc::NoopPubSubSender;
1420    use crate::tests::{all_ok, new_test_client};
1421    use crate::{Diagnostics, PersistClient, PersistConfig, ShardId};
1422
1423    use super::*;
1424
    // Verifies `Subscribe` can be dropped while holding snapshot batches.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn drop_unused_subscribe(dyncfgs: ConfigUpdates) {
        let data = [
            (("0".to_owned(), "zero".to_owned()), 0, 1),
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
        ];

        let (mut write, read) = new_test_client(&dyncfgs)
            .await
            .expect_open::<String, String, u64, i64>(crate::ShardId::new())
            .await;

        // Write each update in its own batch so the shard has data at 0..=2.
        write.expect_compare_and_append(&data[0..1], 0, 1).await;
        write.expect_compare_and_append(&data[1..2], 1, 2).await;
        write.expect_compare_and_append(&data[2..3], 2, 3).await;

        let subscribe = read
            .subscribe(timely::progress::Antichain::from_elem(2))
            .await
            .unwrap();
        assert!(
            !subscribe.snapshot.as_ref().unwrap().is_empty(),
            "snapshot must have batches for test to be meaningful"
        );
        // The point of the test: dropping with unconsumed snapshot parts must
        // be safe.
        drop(subscribe);
    }
1454
    // Verifies that we streaming-consolidate away identical key-values in the same batch.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn streaming_consolidate(dyncfgs: ConfigUpdates) {
        let data = &[
            // Identical records should sum together...
            (("k".to_owned(), "v".to_owned()), 0, 1),
            (("k".to_owned(), "v".to_owned()), 1, 1),
            (("k".to_owned(), "v".to_owned()), 2, 1),
            // ...and when they cancel out entirely they should be omitted.
            (("k2".to_owned(), "v".to_owned()), 0, 1),
            (("k2".to_owned(), "v".to_owned()), 1, -1),
        ];

        let (mut write, read) = {
            let client = new_test_client(&dyncfgs).await;
            client.cfg.set_config(&BLOB_TARGET_SIZE, 1000); // So our batch stays together!
            client
                .expect_open::<String, String, u64, i64>(crate::ShardId::new())
                .await
        };

        write.expect_compare_and_append(data, 0, 5).await;

        let mut snapshot = read
            .subscribe(timely::progress::Antichain::from_elem(4))
            .await
            .unwrap();

        // Consume events until progress passes the as_of, collecting updates.
        let mut updates = vec![];
        'outer: loop {
            for event in snapshot.fetch_next().await {
                match event {
                    ListenEvent::Progress(t) => {
                        if !t.less_than(&4) {
                            break 'outer;
                        }
                    }
                    ListenEvent::Updates(data) => {
                        updates.extend(data);
                    }
                }
            }
        }
        // Only ("k","v") survives, with its three diffs summed to 3 and times
        // advanced to the as_of (4).
        assert_eq!(updates, &[(("k".to_owned(), "v".to_owned()), 4u64, 3i64)],)
    }
1501
    // Verifies that snapshot_and_stream yields exactly the written data,
    // advanced to the as_of, even when batches are split across many parts.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn snapshot_and_stream(dyncfgs: ConfigUpdates) {
        let data = &mut [
            (("k1".to_owned(), "v1".to_owned()), 0, 1),
            (("k2".to_owned(), "v2".to_owned()), 1, 1),
            (("k3".to_owned(), "v3".to_owned()), 2, 1),
            (("k4".to_owned(), "v4".to_owned()), 2, 1),
            (("k5".to_owned(), "v5".to_owned()), 3, 1),
        ];

        let (mut write, mut read) = {
            let client = new_test_client(&dyncfgs).await;
            client.cfg.set_config(&BLOB_TARGET_SIZE, 0); // split batches across multiple parts
            client
                .expect_open::<String, String, u64, i64>(crate::ShardId::new())
                .await
        };

        write.expect_compare_and_append(&data[0..2], 0, 2).await;
        write.expect_compare_and_append(&data[2..4], 2, 3).await;
        write.expect_compare_and_append(&data[4..], 3, 4).await;

        let as_of = Antichain::from_elem(3);
        let mut snapshot = pin::pin!(read.snapshot_and_stream(as_of.clone()).await.unwrap());

        let mut snapshot_rows = vec![];
        while let Some(((k, v), t, d)) = snapshot.next().await {
            snapshot_rows.push(((k, v), t, d));
        }

        // Advance expected timestamps to the as_of before comparing, since the
        // snapshot reports everything at the as_of.
        for ((_k, _v), t, _d) in data.as_mut_slice() {
            t.advance_by(as_of.borrow());
        }

        assert_eq!(data.as_slice(), snapshot_rows.as_slice());
    }
1539
    // Verifies the semantics of `SeqNo` leases + checks dropping `LeasedBatchPart` semantics.
    //
    // Each `LeasedBatchPart` pins the `SeqNo` of the state it was issued
    // from; the reader's `seqno_since` may only advance once every lease at
    // or below a given `SeqNo` has been returned (dropped).
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // https://github.com/MaterializeInc/database-issues/issues/5964
    async fn seqno_leases(dyncfgs: ConfigUpdates) {
        // 20 updates, one per timestamp; they are appended one-at-a-time
        // below so each append advances the shard state by one step.
        let mut data = vec![];
        for i in 0..20 {
            data.push(((i.to_string(), i.to_string()), i, 1))
        }

        let shard_id = ShardId::new();

        let client = new_test_client(&dyncfgs).await;
        let (mut write, read) = client
            .expect_open::<String, String, u64, i64>(shard_id)
            .await;

        // Seed with some values
        let mut offset = 0;
        let mut width = 2;

        for i in offset..offset + width {
            write
                .expect_compare_and_append(
                    &data[i..i + 1],
                    u64::cast_from(i),
                    u64::cast_from(i) + 1,
                )
                .await;
        }
        offset += width;

        // Create machinery for subscribe + fetch
        let mut fetcher = client
            .create_batch_fetcher::<String, String, u64, i64>(
                shard_id,
                Default::default(),
                Default::default(),
                false,
                Diagnostics::for_tests(),
            )
            .await
            .unwrap();

        let mut subscribe = read
            .subscribe(timely::progress::Antichain::from_elem(1))
            .await
            .expect("cannot serve requested as_of");

        // Determine the sequence number held by our subscribe.
        let original_seqno_since = subscribe.listen.handle.outstanding_seqno();
        // Every part in the initial snapshot must be covered by that hold;
        // otherwise the data it references could be GC'd before it's fetched.
        if let Some(snapshot) = &subscribe.snapshot {
            for part in snapshot {
                assert!(
                    part.lease.seqno() >= original_seqno_since,
                    "our seqno hold must cover all parts"
                );
            }
        }

        let mut parts = vec![];

        width = 4;
        // Collect parts while continuing to write values
        for i in offset..offset + width {
            for event in subscribe.next(None).await {
                if let ListenEvent::Updates(mut new_parts) = event {
                    parts.append(&mut new_parts);
                    // Here and elsewhere we "cheat" and immediately downgrade the since
                    // to demonstrate the effects of SeqNo leases immediately.
                    subscribe
                        .listen
                        .handle
                        .downgrade_since(&subscribe.listen.since)
                        .await;
                }
            }

            write
                .expect_compare_and_append(
                    &data[i..i + 1],
                    u64::cast_from(i),
                    u64::cast_from(i) + 1,
                )
                .await;

            // SeqNo is not downgraded: the leases on the parts we're holding
            // in `parts` still pin the original seqno.
            assert_eq!(
                subscribe.listen.handle.machine.applier.seqno_since(),
                original_seqno_since
            );
        }

        offset += width;

        let mut seqno_since = subscribe.listen.handle.machine.applier.seqno_since();

        // We're starting out with the original, non-downgraded SeqNo
        assert_eq!(seqno_since, original_seqno_since);

        // We have to handle the parts we generate during the next loop to
        // ensure they don't panic.
        let mut subsequent_parts = vec![];

        // Ensure monotonicity of seqnos we're processing, otherwise the
        // invariant we're testing (returning the last part of a seqno will
        // downgrade its since) will not hold.
        let mut this_seqno = SeqNo::minimum();

        // Repeat the same process as above, more or less, while fetching + returning parts
        for (mut i, part) in parts.into_iter().enumerate() {
            let part_seqno = part.lease.seqno();
            let last_seqno = this_seqno;
            this_seqno = part_seqno;
            assert!(this_seqno >= last_seqno);

            // Fetch the part, then drop its lease — like a consumer that has
            // finished processing it and returns the part.
            let (part, lease) = part.into_exchangeable_part();
            let _ = fetcher.fetch_leased_part(part).await;
            drop(lease);

            // Simulates an exchange
            for event in subscribe.next(None).await {
                if let ListenEvent::Updates(parts) = event {
                    for part in parts {
                        let (_, lease) = part.into_exchangeable_part();
                        subsequent_parts.push(lease);
                    }
                }
            }

            subscribe
                .listen
                .handle
                .downgrade_since(&subscribe.listen.since)
                .await;

            // Write more new values
            i += offset;
            write
                .expect_compare_and_append(
                    &data[i..i + 1],
                    u64::cast_from(i),
                    u64::cast_from(i) + 1,
                )
                .await;

            // We should expect the SeqNo to be downgraded if this part's SeqNo
            // is no longer leased to any other parts, either.
            let expect_downgrade = subscribe.listen.handle.outstanding_seqno() > part_seqno;

            let new_seqno_since = subscribe.listen.handle.machine.applier.seqno_since();
            if expect_downgrade {
                assert!(new_seqno_since > seqno_since);
            } else {
                assert_eq!(new_seqno_since, seqno_since);
            }
            seqno_since = new_seqno_since;
        }

        // SeqNo since was downgraded
        assert!(seqno_since > original_seqno_since);

        // Return any outstanding parts, to prevent a panic!
        drop(subsequent_parts);
        drop(subscribe);
    }
1705
1706    #[mz_ore::test]
1707    fn reader_id_human_readable_serde() {
1708        #[derive(Debug, Serialize, Deserialize)]
1709        struct Container {
1710            reader_id: LeasedReaderId,
1711        }
1712
1713        // roundtrip through json
1714        let id =
1715            LeasedReaderId::from_str("r00000000-1234-5678-0000-000000000000").expect("valid id");
1716        assert_eq!(
1717            id,
1718            serde_json::from_value(serde_json::to_value(id.clone()).expect("serializable"))
1719                .expect("deserializable")
1720        );
1721
1722        // deserialize a serialized string directly
1723        assert_eq!(
1724            id,
1725            serde_json::from_str("\"r00000000-1234-5678-0000-000000000000\"")
1726                .expect("deserializable")
1727        );
1728
1729        // roundtrip id through a container type
1730        let json = json!({ "reader_id": id });
1731        assert_eq!(
1732            "{\"reader_id\":\"r00000000-1234-5678-0000-000000000000\"}",
1733            &json.to_string()
1734        );
1735        let container: Container = serde_json::from_value(json).expect("deserializable");
1736        assert_eq!(container.reader_id, id);
1737    }
1738
1739    // Verifies performance optimizations where a Listener doesn't fetch the
1740    // latest Consensus state if the one it currently has can serve the next
1741    // request.
1742    #[mz_ore::test(tokio::test)]
1743    #[cfg_attr(miri, ignore)] // too slow
1744    async fn skip_consensus_fetch_optimization() {
1745        let data = vec![
1746            (("0".to_owned(), "zero".to_owned()), 0, 1),
1747            (("1".to_owned(), "one".to_owned()), 1, 1),
1748            (("2".to_owned(), "two".to_owned()), 2, 1),
1749        ];
1750
1751        let cfg = PersistConfig::new_for_tests();
1752        let blob = Arc::new(MemBlob::open(MemBlobConfig::default()));
1753        let consensus = Arc::new(MemConsensus::default());
1754        let unreliable = UnreliableHandle::default();
1755        unreliable.totally_available();
1756        let consensus = Arc::new(UnreliableConsensus::new(consensus, unreliable.clone()));
1757        let metrics = Arc::new(Metrics::new(&cfg, &MetricsRegistry::new()));
1758        let pubsub_sender = Arc::new(NoopPubSubSender);
1759        let (mut write, mut read) = PersistClient::new(
1760            cfg,
1761            blob,
1762            consensus,
1763            metrics,
1764            Arc::new(IsolatedRuntime::new_for_tests()),
1765            Arc::new(StateCache::new_no_metrics()),
1766            pubsub_sender,
1767        )
1768        .expect("client construction failed")
1769        .expect_open::<String, String, u64, i64>(ShardId::new())
1770        .await;
1771
1772        write.expect_compare_and_append(&data[0..1], 0, 1).await;
1773        write.expect_compare_and_append(&data[1..2], 1, 2).await;
1774        write.expect_compare_and_append(&data[2..3], 2, 3).await;
1775
1776        let snapshot = read.expect_snapshot_and_fetch(2).await;
1777        let mut listen = read.expect_listen(0).await;
1778
1779        // Manually advance the listener's machine so that it has the latest
1780        // state by fetching the first events from next. This is awkward but
1781        // only necessary because we're about to do some weird things with
1782        // unreliable.
1783        let listen_actual = listen.fetch_next().await;
1784        let expected_events = vec![ListenEvent::Progress(Antichain::from_elem(1))];
1785        assert_eq!(listen_actual, expected_events);
1786
1787        // At this point, the snapshot and listen's state should have all the
1788        // writes. Test this by making consensus completely unavailable.
1789        unreliable.totally_unavailable();
1790        assert_eq!(snapshot, all_ok(&data, 2));
1791        assert_eq!(
1792            listen.read_until(&3).await,
1793            (all_ok(&data[1..], 1), Antichain::from_elem(3))
1794        );
1795    }
1796}