// mz_persist_client/src/read.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Read capabilities and handles
11
12use async_stream::stream;
13use std::collections::BTreeMap;
14use std::fmt::Debug;
15use std::future::Future;
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18
19use differential_dataflow::Hashable;
20use differential_dataflow::consolidation::consolidate_updates;
21use differential_dataflow::difference::Monoid;
22use differential_dataflow::lattice::Lattice;
23use futures::Stream;
24use futures_util::{StreamExt, stream};
25use mz_dyncfg::Config;
26use mz_ore::cast::CastLossy;
27use mz_ore::halt;
28use mz_ore::instrument;
29use mz_ore::task::JoinHandle;
30use mz_persist::location::{Blob, SeqNo};
31use mz_persist_types::columnar::{ColumnDecoder, Schema};
32use mz_persist_types::{Codec, Codec64};
33use proptest_derive::Arbitrary;
34use serde::{Deserialize, Serialize};
35use timely::PartialOrder;
36use timely::order::TotalOrder;
37use timely::progress::{Antichain, Timestamp};
38use tracing::warn;
39use uuid::Uuid;
40
41use crate::batch::BLOB_TARGET_SIZE;
42use crate::cfg::{COMPACTION_MEMORY_BOUND_BYTES, RetryParameters};
43use crate::fetch::FetchConfig;
44use crate::fetch::{FetchBatchFilter, FetchedPart, Lease, LeasedBatchPart, fetch_leased_part};
45use crate::internal::encoding::Schemas;
46use crate::internal::machine::Machine;
47use crate::internal::metrics::{Metrics, ReadMetrics, ShardMetrics};
48use crate::internal::state::{HollowBatch, LeasedReaderState};
49use crate::internal::watch::{AwaitableState, StateWatch};
50use crate::iter::{Consolidator, StructuredSort};
51use crate::schema::SchemaCache;
52use crate::stats::{SnapshotPartStats, SnapshotPartsStats, SnapshotStats};
53use crate::{GarbageCollector, PersistConfig, ShardId, parse_id};
54
55pub use crate::internal::encoding::LazyPartStats;
56pub use crate::internal::state::Since;
57
/// An opaque identifier for a reader of a persist durable TVC (aka shard).
///
/// Wraps the 16 raw bytes of a UUID. The string form (used by the serde
/// round-trip below) is the UUID prefixed with `r`; see the `Display` and
/// `FromStr` impls.
#[derive(
    Arbitrary,
    Clone,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
    Hash,
    Serialize,
    Deserialize
)]
#[serde(try_from = "String", into = "String")]
pub struct LeasedReaderId(pub(crate) [u8; 16]);
72
73impl std::fmt::Display for LeasedReaderId {
74    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75        write!(f, "r{}", Uuid::from_bytes(self.0))
76    }
77}
78
79impl std::fmt::Debug for LeasedReaderId {
80    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
81        write!(f, "LeasedReaderId({})", Uuid::from_bytes(self.0))
82    }
83}
84
85impl std::str::FromStr for LeasedReaderId {
86    type Err = String;
87
88    fn from_str(s: &str) -> Result<Self, Self::Err> {
89        parse_id("r", "LeasedReaderId", s).map(LeasedReaderId)
90    }
91}
92
93impl From<LeasedReaderId> for String {
94    fn from(reader_id: LeasedReaderId) -> Self {
95        reader_id.to_string()
96    }
97}
98
99impl TryFrom<String> for LeasedReaderId {
100    type Error = String;
101
102    fn try_from(s: String) -> Result<Self, Self::Error> {
103        s.parse()
104    }
105}
106
107impl LeasedReaderId {
108    pub(crate) fn new() -> Self {
109        LeasedReaderId(*Uuid::new_v4().as_bytes())
110    }
111}
112
/// Capable of generating a snapshot of all data at `as_of`, followed by a
/// listen of all updates.
///
/// For more details, see [`ReadHandle::snapshot`] and [`Listen`].
#[derive(Debug)]
pub struct Subscribe<K: Codec, V: Codec, T, D> {
    /// Leased parts of the initial snapshot. `Some` until the first call to
    /// `next` hands them out, then `None` forever after.
    snapshot: Option<Vec<LeasedBatchPart<T>>>,
    /// The listen that produces all updates after the snapshot's `as_of`.
    listen: Listen<K, V, T, D>,
}
122
123impl<K, V, T, D> Subscribe<K, V, T, D>
124where
125    K: Debug + Codec,
126    V: Debug + Codec,
127    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
128    D: Monoid + Codec64 + Send + Sync,
129{
130    fn new(snapshot_parts: Vec<LeasedBatchPart<T>>, listen: Listen<K, V, T, D>) -> Self {
131        Subscribe {
132            snapshot: Some(snapshot_parts),
133            listen,
134        }
135    }
136
137    /// Returns a `LeasedBatchPart` enriched with the proper metadata.
138    ///
139    /// First returns snapshot parts, until they're exhausted, at which point
140    /// begins returning listen parts.
141    ///
142    /// The returned `Antichain` represents the subscription progress as it will
143    /// be _after_ the returned parts are fetched.
144    #[instrument(level = "debug", fields(shard = %self.listen.handle.machine.shard_id()))]
145    pub async fn next(
146        &mut self,
147        // If Some, an override for the default listen sleep retry parameters.
148        listen_retry: Option<RetryParameters>,
149    ) -> Vec<ListenEvent<T, LeasedBatchPart<T>>> {
150        match self.snapshot.take() {
151            Some(parts) => vec![ListenEvent::Updates(parts)],
152            None => {
153                let (parts, upper) = self.listen.next(listen_retry).await;
154                vec![ListenEvent::Updates(parts), ListenEvent::Progress(upper)]
155            }
156        }
157    }
158}
159
160impl<K, V, T, D> Subscribe<K, V, T, D>
161where
162    K: Debug + Codec,
163    V: Debug + Codec,
164    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
165    D: Monoid + Codec64 + Send + Sync,
166{
167    /// Equivalent to `next`, but rather than returning a [`LeasedBatchPart`],
168    /// fetches and returns the data from within it.
169    #[instrument(level = "debug", fields(shard = %self.listen.handle.machine.shard_id()))]
170    pub async fn fetch_next(&mut self) -> Vec<ListenEvent<T, ((K, V), T, D)>> {
171        let events = self.next(None).await;
172        let new_len = events
173            .iter()
174            .map(|event| match event {
175                ListenEvent::Updates(parts) => parts.len(),
176                ListenEvent::Progress(_) => 1,
177            })
178            .sum();
179        let mut ret = Vec::with_capacity(new_len);
180        for event in events {
181            match event {
182                ListenEvent::Updates(parts) => {
183                    for part in parts {
184                        let fetched_part = self.listen.fetch_batch_part(part).await;
185                        let updates = fetched_part.collect::<Vec<_>>();
186                        if !updates.is_empty() {
187                            ret.push(ListenEvent::Updates(updates));
188                        }
189                    }
190                }
191                ListenEvent::Progress(progress) => ret.push(ListenEvent::Progress(progress)),
192            }
193        }
194        ret
195    }
196
197    /// Fetches the contents of `part` and returns its lease.
198    pub async fn fetch_batch_part(&mut self, part: LeasedBatchPart<T>) -> FetchedPart<K, V, T, D> {
199        self.listen.fetch_batch_part(part).await
200    }
201}
202
203impl<K, V, T, D> Subscribe<K, V, T, D>
204where
205    K: Debug + Codec,
206    V: Debug + Codec,
207    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
208    D: Monoid + Codec64 + Send + Sync,
209{
210    /// Politely expires this subscribe, releasing its lease.
211    ///
212    /// There is a best-effort impl in Drop to expire the
213    /// [`ReadHandle`] held by the subscribe that wasn't explicitly expired
214    /// with this method. When possible, explicit expiry is still preferred
215    /// because it also ensures that the background task is complete.
216    pub async fn expire(mut self) {
217        let _ = self.snapshot.take(); // Drop all leased parts.
218        self.listen.expire().await;
219    }
220}
221
/// Data and progress events of a shard subscription.
///
/// TODO: Unify this with [timely::dataflow::operators::capture::event::Event].
#[derive(Debug, PartialEq)]
pub enum ListenEvent<T, D> {
    /// Progress of the shard: all subsequently-emitted updates will have times
    /// at or beyond this frontier.
    Progress(Antichain<T>),
    /// Data of the shard.
    Updates(Vec<D>),
}
232
/// An ongoing subscription of updates to a shard.
#[derive(Debug)]
pub struct Listen<K: Codec, V: Codec, T, D> {
    /// The read handle whose since capability backs this listen.
    handle: ReadHandle<K, V, T, D>,
    /// The (exclusive) starting frontier of this listen; fixed at creation.
    as_of: Antichain<T>,
    /// The since we hold `handle`'s capability back to; advanced (via
    /// `join_assign`) as the frontier progresses.
    since: Antichain<T>,
    /// An exclusive upper bound on the progress of this listen.
    frontier: Antichain<T>,
}
241
impl<K, V, T, D> Listen<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
    D: Monoid + Codec64 + Send + Sync,
{
    /// Creates a [Listen] over `handle` starting (exclusively) at `as_of`,
    /// failing with the smallest acceptable since if `as_of` is not readable.
    async fn new(
        mut handle: ReadHandle<K, V, T, D>,
        as_of: Antichain<T>,
    ) -> Result<Self, Since<T>> {
        let () = handle.machine.verify_listen(&as_of)?;

        let since = as_of.clone();
        if !PartialOrder::less_equal(handle.since(), &since) {
            // We can't guarantee that the as-of will be available by the time we start reading,
            // since our handle is already downgraded too far.
            return Err(Since(handle.since().clone()));
        }
        // This listen only needs to distinguish things after its frontier
        // (initially as_of although the frontier is inclusive and the as_of
        // isn't). Be a good citizen and downgrade early.
        handle.downgrade_since(&since).await;
        Ok(Listen {
            handle,
            since,
            frontier: as_of.clone(),
            as_of,
        })
    }

    /// An exclusive upper bound on the progress of this Listen.
    pub fn frontier(&self) -> &Antichain<T> {
        &self.frontier
    }

    /// Attempt to pull out the next values of this subscription.
    ///
    /// The returned [`LeasedBatchPart`] is appropriate to use with
    /// `crate::fetch::fetch_leased_part`.
    ///
    /// The returned `Antichain` represents the subscription progress as it will
    /// be _after_ the returned parts are fetched.
    pub async fn next(
        &mut self,
        // If Some, an override for the default listen sleep retry parameters.
        retry: Option<RetryParameters>,
    ) -> (Vec<LeasedBatchPart<T>>, Antichain<T>) {
        // Wait for the next batch, but wake up at the heartbeat cadence so we
        // can keep downgrading our since even while the shard is idle.
        let batch = loop {
            let min_elapsed = self.handle.heartbeat_duration();
            let next_batch = self.handle.machine.next_listen_batch(
                &self.frontier,
                &mut self.handle.watch,
                Some(&self.handle.reader_id),
                retry,
            );
            match tokio::time::timeout(min_elapsed, next_batch).await {
                Ok(batch) => break batch,
                Err(_elapsed) => {
                    self.handle.maybe_downgrade_since(&self.since).await;
                }
            }
        };

        // A lot of things across mz have to line up to hold the following
        // invariant and violations only show up as subtle correctness errors,
        // so explicitly validate it here. Better to panic and roll back a
        // release than be incorrect (also potentially corrupting a sink).
        //
        // Note that the since check is intentionally less_than, not less_equal.
        // If a batch's since is X, that means we can no longer distinguish X
        // (beyond self.frontier) from X-1 (not beyond self.frontier) to keep
        // former and filter out the latter.
        let acceptable_desc = PartialOrder::less_than(batch.desc.since(), &self.frontier)
            // Special case when the frontier == the as_of (i.e. the first
            // time this is called on a new Listen). Because as_of is
            // _exclusive_, we don't need to be able to distinguish X from
            // X-1.
            || (self.frontier == self.as_of
            && PartialOrder::less_equal(batch.desc.since(), &self.frontier));
        if !acceptable_desc {
            let lease_state = self
                .handle
                .machine
                .applier
                .reader_lease(self.handle.reader_id.clone());
            // An unexpired lease makes this a real invariant violation (panic);
            // an expired lease means compaction legitimately ran ahead of us,
            // which is an operational condition (halt).
            if let Some(lease) = lease_state {
                panic!(
                    "Listen on {} received a batch {:?} advanced past the listen frontier {:?}, but the lease has not expired: {:?}",
                    self.handle.machine.shard_id(),
                    batch.desc,
                    self.frontier,
                    lease
                )
            } else {
                // Ideally we'd percolate this error up, so callers could eg. restart a dataflow
                // instead of restarting a process...
                halt!(
                    "Listen on {} received a batch {:?} advanced past the listen frontier {:?} after the reader has expired. \
                     This can happen in exceptional cases: a machine goes to sleep or is running out of memory or CPU, for example.",
                    self.handle.machine.shard_id(),
                    batch.desc,
                    self.frontier
                )
            }
        }

        let new_frontier = batch.desc.upper().clone();

        // We will have a new frontier, so this is an opportunity to downgrade our
        // since capability. Go through `maybe_heartbeat` so we can rate limit
        // this along with our heartbeats.
        //
        // HACK! Everything would be simpler if we could downgrade since to the
        // new frontier, but we can't. The next call needs to be able to
        // distinguish between the times T at the frontier (to emit updates with
        // these times) and T-1 (to filter them). Advancing the since to
        // frontier would erase the ability to distinguish between them. Ideally
        // we'd use what is conceptually "batch.upper - 1" (the greatest
        // elements that are still strictly less than batch.upper, which will be
        // the new value of self.frontier after this call returns), but the
        // trait bounds on T don't give us a way to compute that directly.
        // Instead, we sniff out any elements in self.frontier (the upper of the
        // batch the last time we called this) that are strictly less_than the
        // batch upper to compute a new since. For totally ordered times
        // (currently always the case in mz) self.frontier will always have a
        // single element and it will be less_than upper. We
        // could also abuse the fact that every time we actually emit is
        // guaranteed by definition to be less_than upper to be a bit more
        // prompt, but this would involve a lot more temporary antichains and
        // it's unclear if that's worth it.
        for x in self.frontier.elements().iter() {
            let less_than_upper = batch.desc.upper().elements().iter().any(|u| x.less_than(u));
            if less_than_upper {
                self.since.join_assign(&Antichain::from_elem(x.clone()));
            }
        }

        // IMPORTANT! Make sure this `lease_batch_parts` stays before the
        // `maybe_downgrade_since` call. Otherwise, we might give up our
        // capability on the batch's SeqNo before we lease it, which could lead
        // to blobs that it references being GC'd.
        let filter = FetchBatchFilter::Listen {
            as_of: self.as_of.clone(),
            lower: self.frontier.clone(),
        };
        let parts = self.handle.lease_batch_parts(batch, filter).collect().await;

        self.handle.maybe_downgrade_since(&self.since).await;

        // NB: Keep this after we use self.frontier to join_assign self.since
        // and also after we construct metadata.
        self.frontier = new_frontier;

        (parts, self.frontier.clone())
    }
}
399
400impl<K, V, T, D> Listen<K, V, T, D>
401where
402    K: Debug + Codec,
403    V: Debug + Codec,
404    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
405    D: Monoid + Codec64 + Send + Sync,
406{
407    /// Attempt to pull out the next values of this subscription.
408    ///
409    /// The updates received in [ListenEvent::Updates] should be assumed to be in arbitrary order
410    /// and not necessarily consolidated. However, the timestamp of each individual update will be
411    /// greater than or equal to the last received [ListenEvent::Progress] frontier (or this
412    /// [Listen]'s initial `as_of` frontier if no progress event has been emitted yet) and less
413    /// than the next [ListenEvent::Progress] frontier.
414    ///
415    /// If you have a use for consolidated listen output, given that snapshots can't be
416    /// consolidated, come talk to us!
417    #[instrument(level = "debug", name = "listen::next", fields(shard = %self.handle.machine.shard_id()))]
418    pub async fn fetch_next(&mut self) -> Vec<ListenEvent<T, ((K, V), T, D)>> {
419        let (parts, progress) = self.next(None).await;
420        let mut ret = Vec::with_capacity(parts.len() + 1);
421        for part in parts {
422            let fetched_part = self.fetch_batch_part(part).await;
423            let updates = fetched_part.collect::<Vec<_>>();
424            if !updates.is_empty() {
425                ret.push(ListenEvent::Updates(updates));
426            }
427        }
428        ret.push(ListenEvent::Progress(progress));
429        ret
430    }
431
432    /// Convert listener into futures::Stream
433    pub fn into_stream(mut self) -> impl Stream<Item = ListenEvent<T, ((K, V), T, D)>> {
434        async_stream::stream!({
435            loop {
436                for msg in self.fetch_next().await {
437                    yield msg;
438                }
439            }
440        })
441    }
442
443    /// Test helper to read from the listener until the given frontier is
444    /// reached. Because compaction can arbitrarily combine batches, we only
445    /// return the final progress info.
446    #[cfg(test)]
447    #[track_caller]
448    pub async fn read_until(&mut self, ts: &T) -> (Vec<((K, V), T, D)>, Antichain<T>) {
449        let mut updates = Vec::new();
450        let mut frontier = Antichain::from_elem(T::minimum());
451        while self.frontier.less_than(ts) {
452            for event in self.fetch_next().await {
453                match event {
454                    ListenEvent::Updates(mut x) => updates.append(&mut x),
455                    ListenEvent::Progress(x) => frontier = x,
456                }
457            }
458        }
459        // Unlike most tests, intentionally don't consolidate updates here
460        // because Listen replays them at the original fidelity.
461        (updates, frontier)
462    }
463}
464
465impl<K, V, T, D> Listen<K, V, T, D>
466where
467    K: Debug + Codec,
468    V: Debug + Codec,
469    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
470    D: Monoid + Codec64 + Send + Sync,
471{
472    /// Fetches the contents of `part` and returns its lease.
473    ///
474    /// This is broken out into its own function to provide a trivial means for
475    /// [`Subscribe`], which contains a [`Listen`], to fetch batches.
476    async fn fetch_batch_part(&mut self, part: LeasedBatchPart<T>) -> FetchedPart<K, V, T, D> {
477        let fetched_part = fetch_leased_part(
478            &self.handle.cfg,
479            &part,
480            self.handle.blob.as_ref(),
481            Arc::clone(&self.handle.metrics),
482            &self.handle.metrics.read.listen,
483            &self.handle.machine.applier.shard_metrics,
484            &self.handle.reader_id,
485            self.handle.read_schemas.clone(),
486            &mut self.handle.schema_cache,
487        )
488        .await;
489        fetched_part
490    }
491
492    /// Politely expires this listen, releasing its lease.
493    ///
494    /// There is a best-effort impl in to expire the
495    /// [`ReadHandle`] held by the listen that wasn't explicitly expired with
496    /// this method. When possible, explicit expiry is still preferred because
497    /// it also ensures that the background task is complete.
498    pub async fn expire(self) {
499        self.handle.expire().await
500    }
501}
502
/// The state for read holds shared between the heartbeat task and the main client,
/// including the since frontier and the seqno hold.
#[derive(Debug)]
pub(crate) struct ReadHolds<T> {
    /// The frontier we should hold the time-based lease back to.
    held_since: Antichain<T>,
    /// The since hold we've actually committed to state. Should always be <=
    /// `held_since`, since applying a requested downgrade lags behind
    /// requesting it.
    applied_since: Antichain<T>,
    /// The largest seqno we've observed in the state.
    recent_seqno: SeqNo,
    /// The set of active leases. We hold back the seqno to the minimum lease or
    /// the recent_seqno, whichever is earlier.
    leases: BTreeMap<SeqNo, Lease>,
    /// True iff this state is expired.
    expired: bool,
    /// Used to trigger the background task to heartbeat state, instead of waiting for
    /// the next tick.
    request_sync: bool,
}
523
524impl<T> ReadHolds<T>
525where
526    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
527{
528    pub fn downgrade_since(&mut self, since: &Antichain<T>) {
529        self.held_since.join_assign(since);
530    }
531
532    pub fn observe_seqno(&mut self, seqno: SeqNo) {
533        self.recent_seqno = seqno.max(self.recent_seqno);
534    }
535
536    pub fn lease_seqno(&mut self) -> Lease {
537        let seqno = self.recent_seqno;
538        let lease = self
539            .leases
540            .entry(seqno)
541            .or_insert_with(|| Lease::new(seqno));
542        lease.clone()
543    }
544
545    pub fn outstanding_seqno(&mut self) -> SeqNo {
546        while let Some(first) = self.leases.first_entry() {
547            if first.get().count() <= 1 {
548                first.remove();
549            } else {
550                return *first.key();
551            }
552        }
553        self.recent_seqno
554    }
555}
556
557/// A "capability" granting the ability to read the state of some shard at times
558/// greater or equal to `self.since()`.
559///
560/// Production users should call [Self::expire] before dropping a ReadHandle so
561/// that it can expire its leases. If/when rust gets AsyncDrop, this will be
562/// done automatically.
563///
564/// All async methods on ReadHandle retry for as long as they are able, but the
565/// returned [std::future::Future]s implement "cancel on drop" semantics. This
566/// means that callers can add a timeout using [tokio::time::timeout] or
567/// [tokio::time::timeout_at].
568///
569/// ```rust,no_run
570/// # let mut read: mz_persist_client::read::ReadHandle<String, String, u64, i64> = unimplemented!();
571/// # let timeout: std::time::Duration = unimplemented!();
572/// # let new_since: timely::progress::Antichain<u64> = unimplemented!();
573/// # async {
574/// tokio::time::timeout(timeout, read.downgrade_since(&new_since)).await
575/// # };
576/// ```
#[derive(Debug)]
pub struct ReadHandle<K: Codec, V: Codec, T, D> {
    pub(crate) cfg: PersistConfig,
    pub(crate) metrics: Arc<Metrics>,
    pub(crate) machine: Machine<K, V, T, D>,
    pub(crate) gc: GarbageCollector<K, V, T, D>,
    pub(crate) blob: Arc<dyn Blob>,
    /// Watch on the shard's state, used by `Listen` to wait for new batches.
    watch: StateWatch<K, V, T, D>,

    pub(crate) reader_id: LeasedReaderId,
    pub(crate) read_schemas: Schemas<K, V>,
    pub(crate) schema_cache: SchemaCache<K, V, T, D>,

    /// This handle's since frontier; always >= the shard-global since.
    since: Antichain<T>,
    /// Hold state shared with the background heartbeat task.
    pub(crate) hold_state: AwaitableState<ReadHolds<T>>,
    /// `Some` iff this handle has not yet been expired; owns the background
    /// heartbeat task's handle.
    pub(crate) unexpired_state: Option<UnexpiredReadHandleState>,
}
594
/// Length of time after a reader's last operation after which the reader may be
/// expired.
///
/// Defaults to 15 minutes; the background heartbeat task ticks at one quarter
/// of this duration.
pub(crate) const READER_LEASE_DURATION: Config<Duration> = Config::new(
    "persist_reader_lease_duration",
    Duration::from_secs(60 * 15),
    "The time after which we'll clean up stale read leases",
);
602
603impl<K, V, T, D> ReadHandle<K, V, T, D>
604where
605    K: Debug + Codec,
606    V: Debug + Codec,
607    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
608    D: Monoid + Codec64 + Send + Sync,
609{
    /// Returns a new handle for reading `machine`'s shard, registered under
    /// `reader_id` with the already-registered reader `state`, and spawns the
    /// background task that heartbeats the reader's lease.
    #[allow(clippy::unused_async)]
    pub(crate) async fn new(
        cfg: PersistConfig,
        metrics: Arc<Metrics>,
        machine: Machine<K, V, T, D>,
        gc: GarbageCollector<K, V, T, D>,
        blob: Arc<dyn Blob>,
        reader_id: LeasedReaderId,
        read_schemas: Schemas<K, V>,
        state: LeasedReaderState<T>,
    ) -> Self {
        let schema_cache = machine.applier.schema_cache();
        // Seed the shared hold state from the reader's registered since/seqno.
        let hold_state = AwaitableState::new(ReadHolds {
            held_since: state.since.clone(),
            applied_since: state.since.clone(),
            recent_seqno: state.seqno,
            leases: Default::default(),
            expired: false,
            request_sync: false,
        });
        ReadHandle {
            cfg,
            metrics: Arc::clone(&metrics),
            machine: machine.clone(),
            gc: gc.clone(),
            blob,
            watch: machine.applier.watch(),
            reader_id: reader_id.clone(),
            read_schemas,
            schema_cache,
            since: state.since,
            hold_state: hold_state.clone(),
            unexpired_state: Some(UnexpiredReadHandleState {
                heartbeat_task: Self::start_reader_heartbeat_task(
                    machine, reader_id, gc, hold_state,
                ),
            }),
        }
    }
649
650    fn start_reader_heartbeat_task(
651        machine: Machine<K, V, T, D>,
652        reader_id: LeasedReaderId,
653        gc: GarbageCollector<K, V, T, D>,
654        leased_seqnos: AwaitableState<ReadHolds<T>>,
655    ) -> JoinHandle<()> {
656        let metrics = Arc::clone(&machine.applier.metrics);
657        let name = format!(
658            "persist::heartbeat_read({},{})",
659            machine.shard_id(),
660            reader_id
661        );
662        mz_ore::task::spawn(|| name, {
663            metrics.tasks.heartbeat_read.instrument_task(async move {
664                Self::reader_heartbeat_task(machine, reader_id, gc, leased_seqnos).await
665            })
666        })
667    }
668
    /// Body of the background task that keeps this reader's lease alive.
    ///
    /// Loops, waking on a jittered interval or on an explicit `request_sync`,
    /// and applies the shared hold state's since/seqno to durable state via
    /// `downgrade_since`. Exits when the hold state is marked expired (after
    /// expiring the reader in state) or when the lease is discovered to have
    /// lapsed out from under us.
    async fn reader_heartbeat_task(
        machine: Machine<K, V, T, D>,
        reader_id: LeasedReaderId,
        gc: GarbageCollector<K, V, T, D>,
        leased_seqnos: AwaitableState<ReadHolds<T>>,
    ) {
        let sleep_duration = READER_LEASE_DURATION.get(&machine.applier.cfg) / 4;
        // Jitter the first tick to avoid a thundering herd when many readers are started around
        // the same instant, like during deploys.
        let jitter: f64 = f64::cast_lossy(reader_id.hashed()) / f64::cast_lossy(u64::MAX);
        let mut interval = tokio::time::interval_at(
            tokio::time::Instant::now() + sleep_duration.mul_f64(jitter),
            sleep_duration,
        );
        let mut held_since = leased_seqnos.read(|s| s.held_since.clone());
        loop {
            let before_sleep = Instant::now();
            // Wake either on the regular tick or as soon as a sync is
            // explicitly requested (e.g. by `downgrade_since`).
            let _woke_by_tick = tokio::select! {
                _tick = interval.tick() => {
                    true
                }
                _whatever = leased_seqnos.wait_while(|s| !s.request_sync) => {
                    false
                }
            };

            let elapsed_since_before_sleeping = before_sleep.elapsed();
            if elapsed_since_before_sleeping > sleep_duration + Duration::from_secs(60) {
                warn!(
                    "reader ({}) of shard ({}) went {}s between heartbeats",
                    reader_id,
                    machine.shard_id(),
                    elapsed_since_before_sleeping.as_secs_f64()
                );
            }

            let before_heartbeat = Instant::now();
            let heartbeat_ms = (machine.applier.cfg.now)();
            let current_seqno = machine.seqno();
            // Snapshot the hold state: bail out if expired, otherwise fold in
            // the latest seqno/since and take the seqno we must hold.
            let result = leased_seqnos.modify(|s| {
                if s.expired {
                    Err(())
                } else {
                    s.observe_seqno(current_seqno);
                    s.request_sync = false;
                    held_since.join_assign(&s.held_since);
                    Ok(s.outstanding_seqno())
                }
            });
            let actual_since = match result {
                Ok(held_seqno) => {
                    let (seqno, actual_since, maintenance) = machine
                        .downgrade_since(&reader_id, held_seqno, &held_since, heartbeat_ms)
                        .await;
                    // Publish what was actually committed back to the shared
                    // state so `downgrade_since` waiters can observe it.
                    leased_seqnos.modify(|s| {
                        s.applied_since.clone_from(&actual_since.0);
                        s.observe_seqno(seqno)
                    });
                    maintenance.start_performing(&machine, &gc);
                    actual_since
                }
                Err(()) => {
                    let (seqno, maintenance) = machine.expire_leased_reader(&reader_id).await;
                    leased_seqnos.modify(|s| s.observe_seqno(seqno));
                    maintenance.start_performing(&machine, &gc);
                    break;
                }
            };

            let elapsed_since_heartbeat = before_heartbeat.elapsed();
            if elapsed_since_heartbeat > Duration::from_secs(60) {
                warn!(
                    "reader ({}) of shard ({}) heartbeat call took {}s",
                    reader_id,
                    machine.shard_id(),
                    elapsed_since_heartbeat.as_secs_f64(),
                );
            }

            if PartialOrder::less_than(&held_since, &actual_since.0) {
                // If the read handle was intentionally expired, this task
                // *should* be aborted before it observes the expiration. So if
                // we get here, this task somehow failed to keep the read lease
                // alive. Warn loudly, because there's now a live read handle to
                // an expired shard that will panic if used, but don't panic,
                // just in case there is some edge case that results in this
                // task observing the intentional expiration of a read handle.
                warn!(
                    "heartbeat task for reader ({}) of shard ({}) exiting due to expired lease \
                     while read handle is live",
                    reader_id,
                    machine.shard_id(),
                );
                return;
            }
        }
    }
766
767    /// This handle's shard id.
768    pub fn shard_id(&self) -> ShardId {
769        self.machine.shard_id()
770    }
771
    /// This handle's `since` frontier.
    ///
    /// This will always be greater or equal to the shard-global `since`.
    pub fn since(&self) -> &Antichain<T> {
        // Purely reads the locally-cached frontier; no state access.
        &self.since
    }
778
    /// Test-only helper: feeds the machine's current seqno into the hold state
    /// and returns the hold state's outstanding seqno.
    #[cfg(test)]
    fn outstanding_seqno(&self) -> SeqNo {
        let current_seqno = self.machine.seqno();
        self.hold_state.modify(|s| {
            // Observe the freshest seqno we know about before asking which
            // seqno is still outstanding.
            s.observe_seqno(current_seqno);
            s.outstanding_seqno()
        })
    }
787
    /// Forwards the since frontier of this handle, giving up the ability to
    /// read at times not greater or equal to `new_since`.
    ///
    /// This may trigger (asynchronous) compaction and consolidation in the
    /// system. A `new_since` of the empty antichain "finishes" this shard,
    /// promising that no more data will ever be read by this handle.
    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
    pub async fn downgrade_since(&mut self, new_since: &Antichain<T>) {
        self.since = new_since.clone();
        self.hold_state.modify(|s| {
            s.downgrade_since(new_since);
            // Ask the background task to flush this downgrade to state.
            s.request_sync = true;
        });
        // Unlike [Self::maybe_downgrade_since], block until the downgrade has
        // actually been applied to state.
        self.hold_state
            .wait_while(|s| PartialOrder::less_than(&s.applied_since, new_since))
            .await;
    }
805
806    /// Returns an ongoing subscription of updates to a shard.
807    ///
808    /// The stream includes all data at times greater than `as_of`. Combined
809    /// with [Self::snapshot] it will produce exactly correct results: the
810    /// snapshot is the TVCs contents at `as_of` and all subsequent updates
811    /// occur at exactly their indicated time. The recipient should only
812    /// downgrade their read capability when they are certain they have all data
813    /// through the frontier they would downgrade to.
814    ///
815    /// This takes ownership of the ReadHandle so the Listen can use it to
816    /// [Self::downgrade_since] as it progresses. If you need to keep this
817    /// handle, then [Self::clone] it before calling listen.
818    ///
819    /// The `Since` error indicates that the requested `as_of` cannot be served
820    /// (the caller has out of date information) and includes the smallest
821    /// `as_of` that would have been accepted.
822    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
823    pub async fn listen(self, as_of: Antichain<T>) -> Result<Listen<K, V, T, D>, Since<T>> {
824        Listen::new(self, as_of).await
825    }
826
    /// Returns all of the contents of the shard TVC at `as_of` broken up into
    /// [`LeasedBatchPart`]es. These parts can be "turned in" via
    /// `crate::fetch::fetch_batch_part` to receive the data they contain.
    ///
    /// This command returns the contents of this shard as of `as_of` once they
    /// are known. This may "block" (in an async-friendly way) if `as_of` is
    /// greater or equal to the current `upper` of the shard. The recipient
    /// should only downgrade their read capability when they are certain they
    /// have all data through the frontier they would downgrade to.
    ///
    /// The `Since` error indicates that the requested `as_of` cannot be served
    /// (the caller has out of date information) and includes the smallest
    /// `as_of` that would have been accepted.
    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
    pub async fn snapshot(
        &mut self,
        as_of: Antichain<T>,
    ) -> Result<Vec<LeasedBatchPart<T>>, Since<T>> {
        let batches = loop {
            let min_elapsed = self.heartbeat_duration();
            match tokio::time::timeout(min_elapsed, self.machine.snapshot(&as_of)).await {
                Ok(Ok(batches)) => break batches,
                Ok(Err(since)) => return Err(since),
                Err(_timeout) => {
                    // The snapshot is taking a while (e.g. `as_of` is ahead of
                    // the shard's upper). Periodically re-record our current
                    // since in the hold state while we wait; presumably this
                    // keeps the hold fresh for the heartbeat task — confirm.
                    let since = self.since().clone();
                    self.maybe_downgrade_since(&since).await;
                }
            }
        };

        // Our since may have advanced past the requested as_of while we
        // waited; in that case the snapshot can no longer be served.
        if !PartialOrder::less_equal(self.since(), &as_of) {
            return Err(Since(self.since().clone()));
        }

        let filter = FetchBatchFilter::Snapshot { as_of };
        let mut leased_parts = Vec::new();
        for batch in batches {
            // Flatten the HollowBatch into one LeasedBatchPart per key. Each key
            // corresponds to a "part" or s3 object. This allows persist_source
            // to distribute work by parts (smallish, more even size) instead of
            // batches (arbitrarily large).
            leased_parts.extend(
                self.lease_batch_parts(batch, filter.clone())
                    .collect::<Vec<_>>()
                    .await,
            );
        }
        Ok(leased_parts)
    }
876
877    /// Returns a snapshot of all of a shard's data using `as_of`, followed by
878    /// listening to any future updates.
879    ///
880    /// For more details on this operation's semantics, see [Self::snapshot] and
881    /// [Self::listen].
882    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
883    pub async fn subscribe(
884        mut self,
885        as_of: Antichain<T>,
886    ) -> Result<Subscribe<K, V, T, D>, Since<T>> {
887        let snapshot_parts = self.snapshot(as_of.clone()).await?;
888        let listen = self.listen(as_of.clone()).await?;
889        Ok(Subscribe::new(snapshot_parts, listen))
890    }
891
    /// Flattens a [HollowBatch] into a stream of [LeasedBatchPart]s, one per
    /// part, all covered by a single seqno [Lease].
    fn lease_batch_parts(
        &mut self,
        batch: HollowBatch<T>,
        filter: FetchBatchFilter<T>,
    ) -> impl Stream<Item = LeasedBatchPart<T>> + '_ {
        stream! {
            let blob = Arc::clone(&self.blob);
            let metrics = Arc::clone(&self.metrics);
            let desc = batch.desc.clone();
            // One lease covers every part in the batch; each yielded part
            // carries a clone of it.
            let lease = self.lease_seqno().await;
            for await part in batch.part_stream(self.shard_id(), &*blob, &*metrics) {
                yield LeasedBatchPart {
                    metrics: Arc::clone(&self.metrics),
                    shard_id: self.machine.shard_id(),
                    filter: filter.clone(),
                    desc: desc.clone(),
                    part: part.expect("leased part").into_owned(),
                    lease: lease.clone(),
                    filter_pushdown_audit: false,
                }
            }
        }
    }
915
    /// Tracks that the `ReadHandle`'s machine's current `SeqNo` is being
    /// "leased out" to a `LeasedBatchPart`, and cannot be garbage
    /// collected until its lease has been returned.
    ///
    /// Returns the [Lease] that represents the hold.
    async fn lease_seqno(&mut self) -> Lease {
        let current_seqno = self.machine.seqno();
        let lease = self.hold_state.modify(|s| {
            s.observe_seqno(current_seqno);
            s.lease_seqno()
        });
        // The seqno we've leased may be the seqno observed by our heartbeat task, which could be
        // ahead of the last state we saw. Ensure we only observe states in the future of our hold.
        // (Since these are backed by the same state in the same process, this should all be pretty
        // fast.)
        self.watch.wait_for_seqno_ge(lease.seqno()).await;
        lease
    }
932
    /// Returns an independent [ReadHandle] with a new [LeasedReaderId] but the
    /// same `since`.
    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
    pub async fn clone(&self, purpose: &str) -> Self {
        let new_reader_id = LeasedReaderId::new();
        let machine = self.machine.clone();
        let gc = self.gc.clone();
        let heartbeat_ts = (self.cfg.now)();
        let (reader_state, maintenance) = machine
            .register_leased_reader(
                &new_reader_id,
                purpose,
                READER_LEASE_DURATION.get(&self.cfg),
                heartbeat_ts,
                false,
            )
            .await;
        // Registration may have generated maintenance work; kick it off in the
        // background.
        maintenance.start_performing(&machine, &gc);
        // The point of clone is that you're guaranteed to have the same (or
        // greater) since capability, verify that.
        // TODO: better if it's the same since capability exactly.
        assert!(PartialOrder::less_equal(&reader_state.since, &self.since));
        let new_reader = ReadHandle::new(
            self.cfg.clone(),
            Arc::clone(&self.metrics),
            machine,
            gc,
            Arc::clone(&self.blob),
            new_reader_id,
            self.read_schemas.clone(),
            reader_state,
        )
        .await;
        new_reader
    }
968
969    fn heartbeat_duration(&self) -> Duration {
970        READER_LEASE_DURATION.get(&self.cfg) / 4
971    }
972
973    /// A rate-limited version of [Self::downgrade_since].
974    ///
975    /// Users can call this as frequently as they wish; changes will be periodically
976    /// flushed to state by the heartbeat task.
977    #[allow(clippy::unused_async)]
978    pub async fn maybe_downgrade_since(&mut self, new_since: &Antichain<T>) {
979        self.since = new_since.clone();
980        self.hold_state.modify(|s| {
981            s.downgrade_since(new_since);
982        });
983    }
984
    /// Politely expires this reader, releasing its lease.
    ///
    /// There is a best-effort impl in Drop to expire a reader that wasn't
    /// explicitly expired with this method. When possible, explicit expiry is
    /// still preferred because it also ensures that the background task is complete.
    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
    pub async fn expire(mut self) {
        // Mark the hold expired and request that the change be synced to
        // state.
        self.hold_state.modify(|s| {
            s.expired = true;
            s.request_sync = true;
        });
        // Wait for the background heartbeat task (if still present) to finish.
        let Some(unexpired_state) = self.unexpired_state.take() else {
            return;
        };
        unexpired_state.heartbeat_task.await;
    }
1001
1002    /// Test helper for a [Self::listen] call that is expected to succeed.
1003    #[cfg(test)]
1004    #[track_caller]
1005    pub async fn expect_listen(self, as_of: T) -> Listen<K, V, T, D> {
1006        self.listen(Antichain::from_elem(as_of))
1007            .await
1008            .expect("cannot serve requested as_of")
1009    }
1010}
1011
/// State for a read handle that has not been explicitly expired.
#[derive(Debug)]
pub(crate) struct UnexpiredReadHandleState {
    /// Background task that heartbeats this reader's lease; awaited by
    /// [ReadHandle::expire].
    pub(crate) heartbeat_task: JoinHandle<()>,
}
1017
/// An incremental cursor through a particular shard, returned from [ReadHandle::snapshot_cursor].
///
/// To read an entire dataset, the
/// client should call `next` until it returns `None`, which signals all data has been returned...
/// but it's also free to abandon the instance at any time if it eg. only needs a few entries.
#[derive(Debug)]
pub struct Cursor<K: Codec, V: Codec, T: Timestamp + Codec64, D: Codec64, L = Lease> {
    // Streaming-consolidates the shard's parts as they are fetched.
    consolidator: Consolidator<T, D, StructuredSort<K, V, T, D>>,
    // Maximum number of records returned per `next` chunk.
    max_len: usize,
    // Maximum number of bytes returned per `next` chunk.
    max_bytes: usize,
    // Held for the lifetime of the cursor; see `into_lease`.
    _lease: L,
    read_schemas: Schemas<K, V>,
}
1031
1032impl<K: Codec, V: Codec, T: Timestamp + Codec64, D: Codec64, L> Cursor<K, V, T, D, L> {
1033    /// Extracts and returns the lease from the cursor. Allowing the caller to
1034    /// do any necessary cleanup associated with the lease.
1035    pub fn into_lease(self: Self) -> L {
1036        self._lease
1037    }
1038}
1039
impl<K, V, T, D, L> Cursor<K, V, T, D, L>
where
    K: Debug + Codec + Ord,
    V: Debug + Codec + Ord,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Monoid + Ord + Codec64 + Send + Sync,
{
    /// Grab the next batch of consolidated data.
    ///
    /// Returns `None` once the consolidator is exhausted, which signals the
    /// end of the cursor.
    pub async fn next(&mut self) -> Option<impl Iterator<Item = ((K, V), T, D)> + '_> {
        let Self {
            consolidator,
            max_len,
            max_bytes,
            _lease,
            read_schemas: _,
        } = self;

        let part = consolidator
            .next_chunk(*max_len, *max_bytes)
            .await
            .expect("fetching a leased part")?;
        let key_decoder = self
            .read_schemas
            .key
            .decoder_any(part.key.as_ref())
            .expect("ok");
        let val_decoder = self
            .read_schemas
            .val
            .decoder_any(part.val.as_ref())
            .expect("ok");
        // Lazily decode each row of the columnar part into owned K/V/T/D
        // tuples as the caller iterates.
        let iter = (0..part.len()).map(move |i| {
            let mut k = K::default();
            let mut v = V::default();
            key_decoder.decode(i, &mut k);
            val_decoder.decode(i, &mut v);
            let t = T::decode(part.time.value(i).to_le_bytes());
            let d = D::decode(part.diff.value(i).to_le_bytes());
            ((k, v), t, d)
        });

        Some(iter)
    }
}
1084
impl<K, V, T, D> ReadHandle<K, V, T, D>
where
    K: Debug + Codec + Ord,
    V: Debug + Codec + Ord,
    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
    D: Monoid + Ord + Codec64 + Send + Sync,
{
    /// Generates a [Self::snapshot], and fetches all of the batches it
    /// contains.
    ///
    /// The output is consolidated. Furthermore, to keep memory usage down when
    /// reading a snapshot that consolidates well, this consolidates as it goes.
    ///
    /// Potential future improvements (if necessary):
    /// - Accept something like a `F: Fn(K,V) -> (K,V)` argument, which looks
    ///   like an MFP you might be pushing down. Reason being that if you are
    ///   projecting or transforming in a way that allows further consolidation,
    ///   amazing.
    /// - Reuse any code we write to streaming-merge consolidate in
    ///   persist_source here.
    pub async fn snapshot_and_fetch(
        &mut self,
        as_of: Antichain<T>,
    ) -> Result<Vec<((K, V), T, D)>, Since<T>> {
        // Fetch everything: the cursor consolidates the serialized data as it
        // streams the parts in.
        let mut cursor = self.snapshot_cursor(as_of, |_| true).await?;
        let mut contents = Vec::new();
        while let Some(iter) = cursor.next().await {
            contents.extend(iter);
        }

        // We don't currently guarantee that encoding is one-to-one, so we still need to
        // consolidate the decoded outputs. However, let's report if this isn't a noop.
        let old_len = contents.len();
        consolidate_updates(&mut contents);
        if old_len != contents.len() {
            // TODO(bkirwi): do we need more / finer-grained metrics for this?
            self.machine
                .applier
                .shard_metrics
                .unconsolidated_snapshot
                .inc();
        }

        Ok(contents)
    }

    /// Generates a [Self::snapshot], and fetches all of the batches it
    /// contains.
    ///
    /// To keep memory usage down when reading a snapshot that consolidates well, this consolidates
    /// as it goes. However, note that only the serialized data is consolidated: the deserialized
    /// data will only be consolidated if your K/V codecs are one-to-one.
    pub async fn snapshot_cursor(
        &mut self,
        as_of: Antichain<T>,
        should_fetch_part: impl for<'a> Fn(Option<&'a LazyPartStats>) -> bool,
    ) -> Result<Cursor<K, V, T, D>, Since<T>> {
        let batches = self.machine.snapshot(&as_of).await?;
        // Hold a seqno lease for as long as the cursor is live, so the parts
        // it references can't be garbage collected out from under it.
        let lease = self.lease_seqno().await;

        Self::read_batches_consolidated(
            &self.cfg,
            Arc::clone(&self.metrics),
            Arc::clone(&self.machine.applier.shard_metrics),
            self.metrics.read.snapshot.clone(),
            Arc::clone(&self.blob),
            self.shard_id(),
            as_of,
            self.read_schemas.clone(),
            &batches,
            lease,
            should_fetch_part,
            COMPACTION_MEMORY_BOUND_BYTES.get(&self.cfg),
        )
    }

    /// Builds a consolidating [Cursor] over the given batches, enqueueing only
    /// the parts for which `should_fetch_part` returns true.
    pub(crate) fn read_batches_consolidated<L>(
        persist_cfg: &PersistConfig,
        metrics: Arc<Metrics>,
        shard_metrics: Arc<ShardMetrics>,
        read_metrics: ReadMetrics,
        blob: Arc<dyn Blob>,
        shard_id: ShardId,
        as_of: Antichain<T>,
        schemas: Schemas<K, V>,
        batches: &[HollowBatch<T>],
        lease: L,
        should_fetch_part: impl for<'a> Fn(Option<&'a LazyPartStats>) -> bool,
        memory_budget_bytes: usize,
    ) -> Result<Cursor<K, V, T, D, L>, Since<T>> {
        let context = format!("{}[as_of={:?}]", shard_id, as_of.elements());
        let filter = FetchBatchFilter::Snapshot {
            as_of: as_of.clone(),
        };

        let mut consolidator = Consolidator::new(
            context,
            FetchConfig::from_persist_config(persist_cfg),
            shard_id,
            StructuredSort::new(schemas.clone()),
            blob,
            metrics,
            shard_metrics,
            read_metrics,
            filter,
            None,
            memory_budget_bytes,
        );
        for batch in batches {
            for (meta, run) in batch.runs() {
                consolidator.enqueue_run(
                    &batch.desc,
                    meta,
                    run.into_iter()
                        .filter(|p| should_fetch_part(p.stats()))
                        .cloned(),
                );
            }
        }
        // This default may end up consolidating more records than previously
        // for cases like fast-path peeks, where only the first few entries are used.
        // If this is a noticeable performance impact, thread the max-len in from the caller.
        let max_len = persist_cfg.compaction_yield_after_n_updates;
        let max_bytes = BLOB_TARGET_SIZE.get(persist_cfg).max(1);

        Ok(Cursor {
            consolidator,
            max_len,
            max_bytes,
            _lease: lease,
            read_schemas: schemas,
        })
    }

    /// Returns aggregate statistics about the contents of the shard TVC at the
    /// given frontier.
    ///
    /// This command returns the contents of this shard as of `as_of` once they
    /// are known. This may "block" (in an async-friendly way) if `as_of` is
    /// greater or equal to the current `upper` of the shard. If `None` is given
    /// for `as_of`, then the latest stats known by this process are used.
    ///
    /// The `Since` error indicates that the requested `as_of` cannot be served
    /// (the caller has out of date information) and includes the smallest
    /// `as_of` that would have been accepted.
    pub fn snapshot_stats(
        &self,
        as_of: Option<Antichain<T>>,
    ) -> impl Future<Output = Result<SnapshotStats, Since<T>>> + Send + 'static {
        // Clone the machine up front so the returned future doesn't borrow
        // self (it must be 'static).
        let machine = self.machine.clone();
        async move {
            let batches = match as_of {
                Some(as_of) => machine.snapshot(&as_of).await?,
                None => machine.applier.all_batches(),
            };
            let num_updates = batches.iter().map(|b| b.len).sum();
            Ok(SnapshotStats {
                shard_id: machine.shard_id(),
                num_updates,
            })
        }
    }

    /// Returns aggregate statistics about the contents of the shard TVC at the
    /// given frontier.
    ///
    /// This command returns the contents of this shard as of `as_of` once they
    /// are known. This may "block" (in an async-friendly way) if `as_of` is
    /// greater or equal to the current `upper` of the shard.
    ///
    /// The `Since` error indicates that the requested `as_of` cannot be served
    /// (the caller has out of date information) and includes the smallest
    /// `as_of` that would have been accepted.
    pub async fn snapshot_parts_stats(
        &self,
        as_of: Antichain<T>,
    ) -> Result<SnapshotPartsStats, Since<T>> {
        let batches = self.machine.snapshot(&as_of).await?;
        // Collect per-part size/stats without fetching the part contents.
        let parts = stream::iter(&batches)
            .flat_map(|b| b.part_stream(self.shard_id(), &*self.blob, &*self.metrics))
            .map(|p| {
                let p = p.expect("live batch");
                SnapshotPartStats {
                    encoded_size_bytes: p.encoded_size_bytes(),
                    stats: p.stats().cloned(),
                }
            })
            .collect()
            .await;
        Ok(SnapshotPartsStats {
            metrics: Arc::clone(&self.machine.applier.metrics),
            shard_id: self.machine.shard_id(),
            parts,
        })
    }
}
1281
impl<K, V, T, D> ReadHandle<K, V, T, D>
where
    K: Debug + Codec + Ord,
    V: Debug + Codec + Ord,
    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
    D: Monoid + Codec64 + Send + Sync,
{
    /// Generates a [Self::snapshot], and streams out all of the updates
    /// it contains in bounded memory.
    ///
    /// The output is not consolidated.
    pub async fn snapshot_and_stream(
        &mut self,
        as_of: Antichain<T>,
    ) -> Result<impl Stream<Item = ((K, V), T, D)> + use<K, V, T, D>, Since<T>> {
        let snap = self.snapshot(as_of).await?;

        // Clone everything the stream closure needs up front, so the returned
        // stream doesn't borrow self.
        let blob = Arc::clone(&self.blob);
        let metrics = Arc::clone(&self.metrics);
        let snapshot_metrics = self.metrics.read.snapshot.clone();
        let shard_metrics = Arc::clone(&self.machine.applier.shard_metrics);
        let reader_id = self.reader_id.clone();
        let schemas = self.read_schemas.clone();
        let mut schema_cache = self.schema_cache.clone();
        let persist_cfg = self.cfg.clone();
        let stream = async_stream::stream! {
            // Fetch and drain one leased part at a time, bounding memory use.
            for part in snap {
                let mut fetched_part = fetch_leased_part(
                    &persist_cfg,
                    &part,
                    blob.as_ref(),
                    Arc::clone(&metrics),
                    &snapshot_metrics,
                    &shard_metrics,
                    &reader_id,
                    schemas.clone(),
                    &mut schema_cache,
                )
                .await;

                while let Some(next) = fetched_part.next() {
                    yield next;
                }
            }
        };

        Ok(stream)
    }
}
1331
1332impl<K, V, T, D> ReadHandle<K, V, T, D>
1333where
1334    K: Debug + Codec + Ord,
1335    V: Debug + Codec + Ord,
1336    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
1337    D: Monoid + Ord + Codec64 + Send + Sync,
1338{
1339    /// Test helper to generate a [Self::snapshot] call that is expected to
1340    /// succeed, process its batches, and then return its data sorted.
1341    #[cfg(test)]
1342    #[track_caller]
1343    pub async fn expect_snapshot_and_fetch(&mut self, as_of: T) -> Vec<((K, V), T, D)> {
1344        let mut ret = self
1345            .snapshot_and_fetch(Antichain::from_elem(as_of))
1346            .await
1347            .expect("cannot serve requested as_of");
1348
1349        ret.sort();
1350        ret
1351    }
1352}
1353
impl<K: Codec, V: Codec, T, D> Drop for ReadHandle<K, V, T, D> {
    fn drop(&mut self) {
        // Best-effort expiry: mark the hold expired and request a sync, but
        // (unlike [ReadHandle::expire]) don't wait for the background task.
        self.hold_state.modify(|s| {
            s.expired = true;
            s.request_sync = true;
        });
    }
}
1362
1363#[cfg(test)]
1364mod tests {
1365    use std::pin;
1366    use std::str::FromStr;
1367
1368    use mz_dyncfg::ConfigUpdates;
1369    use mz_ore::cast::CastFrom;
1370    use mz_ore::metrics::MetricsRegistry;
1371    use mz_persist::mem::{MemBlob, MemBlobConfig, MemConsensus};
1372    use mz_persist::unreliable::{UnreliableConsensus, UnreliableHandle};
1373    use serde::{Deserialize, Serialize};
1374    use serde_json::json;
1375    use tokio_stream::StreamExt;
1376
1377    use crate::async_runtime::IsolatedRuntime;
1378    use crate::batch::BLOB_TARGET_SIZE;
1379    use crate::cache::StateCache;
1380    use crate::internal::metrics::Metrics;
1381    use crate::rpc::NoopPubSubSender;
1382    use crate::tests::{all_ok, new_test_client};
1383    use crate::{Diagnostics, PersistClient, PersistConfig, ShardId};
1384
1385    use super::*;
1386
1387    // Verifies `Subscribe` can be dropped while holding snapshot batches.
1388    #[mz_persist_proc::test(tokio::test)]
1389    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1390    async fn drop_unused_subscribe(dyncfgs: ConfigUpdates) {
1391        let data = [
1392            (("0".to_owned(), "zero".to_owned()), 0, 1),
1393            (("1".to_owned(), "one".to_owned()), 1, 1),
1394            (("2".to_owned(), "two".to_owned()), 2, 1),
1395        ];
1396
1397        let (mut write, read) = new_test_client(&dyncfgs)
1398            .await
1399            .expect_open::<String, String, u64, i64>(crate::ShardId::new())
1400            .await;
1401
1402        write.expect_compare_and_append(&data[0..1], 0, 1).await;
1403        write.expect_compare_and_append(&data[1..2], 1, 2).await;
1404        write.expect_compare_and_append(&data[2..3], 2, 3).await;
1405
1406        let subscribe = read
1407            .subscribe(timely::progress::Antichain::from_elem(2))
1408            .await
1409            .unwrap();
1410        assert!(
1411            !subscribe.snapshot.as_ref().unwrap().is_empty(),
1412            "snapshot must have batches for test to be meaningful"
1413        );
1414        drop(subscribe);
1415    }
1416
    // Verifies that we streaming-consolidate away identical key-values in the same batch.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn streaming_consolidate(dyncfgs: ConfigUpdates) {
        let data = &[
            // Identical records should sum together...
            (("k".to_owned(), "v".to_owned()), 0, 1),
            (("k".to_owned(), "v".to_owned()), 1, 1),
            (("k".to_owned(), "v".to_owned()), 2, 1),
            // ...and when they cancel out entirely they should be omitted.
            (("k2".to_owned(), "v".to_owned()), 0, 1),
            (("k2".to_owned(), "v".to_owned()), 1, -1),
        ];

        let (mut write, read) = {
            let client = new_test_client(&dyncfgs).await;
            client.cfg.set_config(&BLOB_TARGET_SIZE, 1000); // So our batch stays together!
            client
                .expect_open::<String, String, u64, i64>(crate::ShardId::new())
                .await
        };

        write.expect_compare_and_append(data, 0, 5).await;

        let mut snapshot = read
            .subscribe(timely::progress::Antichain::from_elem(4))
            .await
            .unwrap();

        // Pull events until progress passes the as_of, collecting updates.
        let mut updates = vec![];
        'outer: loop {
            for event in snapshot.fetch_next().await {
                match event {
                    ListenEvent::Progress(t) => {
                        if !t.less_than(&4) {
                            break 'outer;
                        }
                    }
                    ListenEvent::Updates(data) => {
                        updates.extend(data);
                    }
                }
            }
        }
        assert_eq!(updates, &[(("k".to_owned(), "v".to_owned()), 4u64, 3i64)],)
    }
1463
    // Verifies that `snapshot_and_stream` yields every update, with times
    // advanced to the as_of.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn snapshot_and_stream(dyncfgs: ConfigUpdates) {
        let data = &mut [
            (("k1".to_owned(), "v1".to_owned()), 0, 1),
            (("k2".to_owned(), "v2".to_owned()), 1, 1),
            (("k3".to_owned(), "v3".to_owned()), 2, 1),
            (("k4".to_owned(), "v4".to_owned()), 2, 1),
            (("k5".to_owned(), "v5".to_owned()), 3, 1),
        ];

        let (mut write, mut read) = {
            let client = new_test_client(&dyncfgs).await;
            client.cfg.set_config(&BLOB_TARGET_SIZE, 0); // split batches across multiple parts
            client
                .expect_open::<String, String, u64, i64>(crate::ShardId::new())
                .await
        };

        write.expect_compare_and_append(&data[0..2], 0, 2).await;
        write.expect_compare_and_append(&data[2..4], 2, 3).await;
        write.expect_compare_and_append(&data[4..], 3, 4).await;

        let as_of = Antichain::from_elem(3);
        let mut snapshot = pin::pin!(read.snapshot_and_stream(as_of.clone()).await.unwrap());

        let mut snapshot_rows = vec![];
        while let Some(((k, v), t, d)) = snapshot.next().await {
            snapshot_rows.push(((k, v), t, d));
        }

        // The snapshot advances all update times to the as_of.
        for ((_k, _v), t, _d) in data.as_mut_slice() {
            t.advance_by(as_of.borrow());
        }

        assert_eq!(data.as_slice(), snapshot_rows.as_slice());
    }
1501
    // Verifies the semantics of `SeqNo` leases + checks dropping `LeasedBatchPart` semantics.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // https://github.com/MaterializeInc/database-issues/issues/5964
    async fn seqno_leases(dyncfgs: ConfigUpdates) {
        // 20 single-record updates of the form ((k, v), ts, diff) with k == v == ts.
        let mut data = vec![];
        for i in 0..20 {
            data.push(((i.to_string(), i.to_string()), i, 1))
        }

        let shard_id = ShardId::new();

        let client = new_test_client(&dyncfgs).await;
        let (mut write, read) = client
            .expect_open::<String, String, u64, i64>(shard_id)
            .await;

        // Seed with some values
        let mut offset = 0;
        let mut width = 2;

        // One compare_and_append per record so each write advances state
        // (and therefore the SeqNo) separately.
        for i in offset..offset + width {
            write
                .expect_compare_and_append(
                    &data[i..i + 1],
                    u64::cast_from(i),
                    u64::cast_from(i) + 1,
                )
                .await;
        }
        offset += width;

        // Create machinery for subscribe + fetch
        let mut fetcher = client
            .create_batch_fetcher::<String, String, u64, i64>(
                shard_id,
                Default::default(),
                Default::default(),
                false,
                Diagnostics::for_tests(),
            )
            .await
            .unwrap();

        let mut subscribe = read
            .subscribe(timely::progress::Antichain::from_elem(1))
            .await
            .expect("cannot serve requested as_of");

        // Determine the sequence number held by our subscribe.
        let original_seqno_since = subscribe.listen.handle.outstanding_seqno();
        // Every snapshot part's lease must be covered by the subscribe's hold.
        if let Some(snapshot) = &subscribe.snapshot {
            for part in snapshot {
                assert!(
                    part.lease.seqno() >= original_seqno_since,
                    "our seqno hold must cover all parts"
                );
            }
        }

        let mut parts = vec![];

        width = 4;
        // Collect parts while continuing to write values
        for i in offset..offset + width {
            for event in subscribe.next(None).await {
                if let ListenEvent::Updates(mut new_parts) = event {
                    parts.append(&mut new_parts);
                    // Here and elsewhere we "cheat" and immediately downgrade the since
                    // to demonstrate the effects of SeqNo leases immediately.
                    subscribe
                        .listen
                        .handle
                        .downgrade_since(&subscribe.listen.since)
                        .await;
                }
            }

            write
                .expect_compare_and_append(
                    &data[i..i + 1],
                    u64::cast_from(i),
                    u64::cast_from(i) + 1,
                )
                .await;

            // SeqNo is not downgraded: the parts we're still holding lease the
            // original SeqNo, so the since downgrades above must not move it.
            assert_eq!(
                subscribe.listen.handle.machine.applier.seqno_since(),
                original_seqno_since
            );
        }

        offset += width;

        let mut seqno_since = subscribe.listen.handle.machine.applier.seqno_since();

        // We're starting out with the original, non-downgraded SeqNo
        assert_eq!(seqno_since, original_seqno_since);

        // We have to handle the parts we generate during the next loop to
        // ensure they don't panic.
        let mut subsequent_parts = vec![];

        // Ensure monotonicity of seqnos we're processing, otherwise the
        // invariant we're testing (returning the last part of a seqno will
        // downgrade its since) will not hold.
        let mut this_seqno = SeqNo::minimum();

        // Repeat the same process as above, more or less, while fetching + returning parts
        for (mut i, part) in parts.into_iter().enumerate() {
            let part_seqno = part.lease.seqno();
            let last_seqno = this_seqno;
            this_seqno = part_seqno;
            assert!(this_seqno >= last_seqno);

            // Fetch the part, then drop its lease, returning this part's
            // SeqNo hold to the handle.
            let (part, lease) = part.into_exchangeable_part();
            let _ = fetcher.fetch_leased_part(part).await;
            drop(lease);

            // Simulates an exchange
            for event in subscribe.next(None).await {
                if let ListenEvent::Updates(parts) = event {
                    for part in parts {
                        let (_, lease) = part.into_exchangeable_part();
                        subsequent_parts.push(lease);
                    }
                }
            }

            subscribe
                .listen
                .handle
                .downgrade_since(&subscribe.listen.since)
                .await;

            // Write more new values
            i += offset;
            write
                .expect_compare_and_append(
                    &data[i..i + 1],
                    u64::cast_from(i),
                    u64::cast_from(i) + 1,
                )
                .await;

            // We should expect the SeqNo to be downgraded if this part's SeqNo
            // is no longer leased to any other parts, either.
            let expect_downgrade = subscribe.listen.handle.outstanding_seqno() > part_seqno;

            let new_seqno_since = subscribe.listen.handle.machine.applier.seqno_since();
            if expect_downgrade {
                assert!(new_seqno_since > seqno_since);
            } else {
                assert_eq!(new_seqno_since, seqno_since);
            }
            seqno_since = new_seqno_since;
        }

        // SeqNo since was downgraded
        assert!(seqno_since > original_seqno_since);

        // Return any outstanding parts, to prevent a panic!
        drop(subsequent_parts);
        drop(subscribe);
    }
1667
1668    #[mz_ore::test]
1669    fn reader_id_human_readable_serde() {
1670        #[derive(Debug, Serialize, Deserialize)]
1671        struct Container {
1672            reader_id: LeasedReaderId,
1673        }
1674
1675        // roundtrip through json
1676        let id =
1677            LeasedReaderId::from_str("r00000000-1234-5678-0000-000000000000").expect("valid id");
1678        assert_eq!(
1679            id,
1680            serde_json::from_value(serde_json::to_value(id.clone()).expect("serializable"))
1681                .expect("deserializable")
1682        );
1683
1684        // deserialize a serialized string directly
1685        assert_eq!(
1686            id,
1687            serde_json::from_str("\"r00000000-1234-5678-0000-000000000000\"")
1688                .expect("deserializable")
1689        );
1690
1691        // roundtrip id through a container type
1692        let json = json!({ "reader_id": id });
1693        assert_eq!(
1694            "{\"reader_id\":\"r00000000-1234-5678-0000-000000000000\"}",
1695            &json.to_string()
1696        );
1697        let container: Container = serde_json::from_value(json).expect("deserializable");
1698        assert_eq!(container.reader_id, id);
1699    }
1700
1701    // Verifies performance optimizations where a Listener doesn't fetch the
1702    // latest Consensus state if the one it currently has can serve the next
1703    // request.
1704    #[mz_ore::test(tokio::test)]
1705    #[cfg_attr(miri, ignore)] // too slow
1706    async fn skip_consensus_fetch_optimization() {
1707        let data = vec![
1708            (("0".to_owned(), "zero".to_owned()), 0, 1),
1709            (("1".to_owned(), "one".to_owned()), 1, 1),
1710            (("2".to_owned(), "two".to_owned()), 2, 1),
1711        ];
1712
1713        let cfg = PersistConfig::new_for_tests();
1714        let blob = Arc::new(MemBlob::open(MemBlobConfig::default()));
1715        let consensus = Arc::new(MemConsensus::default());
1716        let unreliable = UnreliableHandle::default();
1717        unreliable.totally_available();
1718        let consensus = Arc::new(UnreliableConsensus::new(consensus, unreliable.clone()));
1719        let metrics = Arc::new(Metrics::new(&cfg, &MetricsRegistry::new()));
1720        let pubsub_sender = Arc::new(NoopPubSubSender);
1721        let (mut write, mut read) = PersistClient::new(
1722            cfg,
1723            blob,
1724            consensus,
1725            metrics,
1726            Arc::new(IsolatedRuntime::new_for_tests()),
1727            Arc::new(StateCache::new_no_metrics()),
1728            pubsub_sender,
1729        )
1730        .expect("client construction failed")
1731        .expect_open::<String, String, u64, i64>(ShardId::new())
1732        .await;
1733
1734        write.expect_compare_and_append(&data[0..1], 0, 1).await;
1735        write.expect_compare_and_append(&data[1..2], 1, 2).await;
1736        write.expect_compare_and_append(&data[2..3], 2, 3).await;
1737
1738        let snapshot = read.expect_snapshot_and_fetch(2).await;
1739        let mut listen = read.expect_listen(0).await;
1740
1741        // Manually advance the listener's machine so that it has the latest
1742        // state by fetching the first events from next. This is awkward but
1743        // only necessary because we're about to do some weird things with
1744        // unreliable.
1745        let listen_actual = listen.fetch_next().await;
1746        let expected_events = vec![ListenEvent::Progress(Antichain::from_elem(1))];
1747        assert_eq!(listen_actual, expected_events);
1748
1749        // At this point, the snapshot and listen's state should have all the
1750        // writes. Test this by making consensus completely unavailable.
1751        unreliable.totally_unavailable();
1752        assert_eq!(snapshot, all_ok(&data, 2));
1753        assert_eq!(
1754            listen.read_until(&3).await,
1755            (all_ok(&data[1..], 1), Antichain::from_elem(3))
1756        );
1757    }
1758}