mz_persist_client/
read.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Read capabilities and handles
11
12use async_stream::stream;
13use std::backtrace::Backtrace;
14use std::collections::BTreeMap;
15use std::fmt::Debug;
16use std::future::Future;
17use std::sync::Arc;
18use std::time::Duration;
19
20use differential_dataflow::consolidation::consolidate_updates;
21use differential_dataflow::difference::Monoid;
22use differential_dataflow::lattice::Lattice;
23use differential_dataflow::trace::Description;
24use futures::Stream;
25use futures_util::{StreamExt, stream};
26use mz_dyncfg::Config;
27use mz_ore::halt;
28use mz_ore::instrument;
29use mz_ore::now::EpochMillis;
30use mz_ore::task::{AbortOnDropHandle, JoinHandle, RuntimeExt};
31use mz_persist::location::{Blob, SeqNo};
32use mz_persist_types::columnar::{ColumnDecoder, Schema};
33use mz_persist_types::{Codec, Codec64};
34use proptest_derive::Arbitrary;
35use serde::{Deserialize, Serialize};
36use timely::PartialOrder;
37use timely::order::TotalOrder;
38use timely::progress::{Antichain, Timestamp};
39use tokio::runtime::Handle;
40use tracing::{Instrument, debug_span, warn};
41use uuid::Uuid;
42
43use crate::batch::BLOB_TARGET_SIZE;
44use crate::cfg::{COMPACTION_MEMORY_BOUND_BYTES, RetryParameters};
45use crate::fetch::FetchConfig;
46use crate::fetch::{FetchBatchFilter, FetchedPart, Lease, LeasedBatchPart, fetch_leased_part};
47use crate::internal::encoding::Schemas;
48use crate::internal::machine::{ExpireFn, Machine};
49use crate::internal::metrics::{Metrics, ReadMetrics, ShardMetrics};
50use crate::internal::state::{BatchPart, HollowBatch};
51use crate::internal::watch::StateWatch;
52use crate::iter::{Consolidator, StructuredSort};
53use crate::schema::SchemaCache;
54use crate::stats::{SnapshotPartStats, SnapshotPartsStats, SnapshotStats};
55use crate::{GarbageCollector, PersistConfig, ShardId, parse_id};
56
57pub use crate::internal::encoding::LazyPartStats;
58pub use crate::internal::state::Since;
59
/// An opaque identifier for a reader of a persist durable TVC (aka shard).
///
/// Wraps 16 raw bytes, which the `Display` and `Debug` impls render as a
/// UUID. Serde (de)serialization round-trips through the `String` form, as
/// configured by the `try_from`/`into` attribute below.
#[derive(Arbitrary, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[serde(try_from = "String", into = "String")]
pub struct LeasedReaderId(pub(crate) [u8; 16]);
64
65impl std::fmt::Display for LeasedReaderId {
66    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67        write!(f, "r{}", Uuid::from_bytes(self.0))
68    }
69}
70
71impl std::fmt::Debug for LeasedReaderId {
72    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73        write!(f, "LeasedReaderId({})", Uuid::from_bytes(self.0))
74    }
75}
76
77impl std::str::FromStr for LeasedReaderId {
78    type Err = String;
79
80    fn from_str(s: &str) -> Result<Self, Self::Err> {
81        parse_id("r", "LeasedReaderId", s).map(LeasedReaderId)
82    }
83}
84
85impl From<LeasedReaderId> for String {
86    fn from(reader_id: LeasedReaderId) -> Self {
87        reader_id.to_string()
88    }
89}
90
91impl TryFrom<String> for LeasedReaderId {
92    type Error = String;
93
94    fn try_from(s: String) -> Result<Self, Self::Error> {
95        s.parse()
96    }
97}
98
99impl LeasedReaderId {
100    pub(crate) fn new() -> Self {
101        LeasedReaderId(*Uuid::new_v4().as_bytes())
102    }
103}
104
/// Capable of generating a snapshot of all data at `as_of`, followed by a
/// listen of all updates.
///
/// For more details, see [`ReadHandle::snapshot`] and [`Listen`].
#[derive(Debug)]
pub struct Subscribe<K: Codec, V: Codec, T, D> {
    /// The leased snapshot parts. `Some` until the first call to `next` takes
    /// them (or `expire` drops them), `None` afterwards.
    snapshot: Option<Vec<LeasedBatchPart<T>>>,
    /// The listen that serves all events once the snapshot has been taken.
    listen: Listen<K, V, T, D>,
}
114
115impl<K, V, T, D> Subscribe<K, V, T, D>
116where
117    K: Debug + Codec,
118    V: Debug + Codec,
119    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
120    D: Monoid + Codec64 + Send + Sync,
121{
122    fn new(snapshot_parts: Vec<LeasedBatchPart<T>>, listen: Listen<K, V, T, D>) -> Self {
123        Subscribe {
124            snapshot: Some(snapshot_parts),
125            listen,
126        }
127    }
128
129    /// Returns a `LeasedBatchPart` enriched with the proper metadata.
130    ///
131    /// First returns snapshot parts, until they're exhausted, at which point
132    /// begins returning listen parts.
133    ///
134    /// The returned `Antichain` represents the subscription progress as it will
135    /// be _after_ the returned parts are fetched.
136    #[instrument(level = "debug", fields(shard = %self.listen.handle.machine.shard_id()))]
137    pub async fn next(
138        &mut self,
139        // If Some, an override for the default listen sleep retry parameters.
140        listen_retry: Option<RetryParameters>,
141    ) -> Vec<ListenEvent<T, LeasedBatchPart<T>>> {
142        match self.snapshot.take() {
143            Some(parts) => vec![ListenEvent::Updates(parts)],
144            None => {
145                let (parts, upper) = self.listen.next(listen_retry).await;
146                vec![ListenEvent::Updates(parts), ListenEvent::Progress(upper)]
147            }
148        }
149    }
150}
151
152impl<K, V, T, D> Subscribe<K, V, T, D>
153where
154    K: Debug + Codec,
155    V: Debug + Codec,
156    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
157    D: Monoid + Codec64 + Send + Sync,
158{
159    /// Equivalent to `next`, but rather than returning a [`LeasedBatchPart`],
160    /// fetches and returns the data from within it.
161    #[instrument(level = "debug", fields(shard = %self.listen.handle.machine.shard_id()))]
162    pub async fn fetch_next(
163        &mut self,
164    ) -> Vec<ListenEvent<T, ((Result<K, String>, Result<V, String>), T, D)>> {
165        let events = self.next(None).await;
166        let new_len = events
167            .iter()
168            .map(|event| match event {
169                ListenEvent::Updates(parts) => parts.len(),
170                ListenEvent::Progress(_) => 1,
171            })
172            .sum();
173        let mut ret = Vec::with_capacity(new_len);
174        for event in events {
175            match event {
176                ListenEvent::Updates(parts) => {
177                    for part in parts {
178                        let fetched_part = self.listen.fetch_batch_part(part).await;
179                        let updates = fetched_part.collect::<Vec<_>>();
180                        if !updates.is_empty() {
181                            ret.push(ListenEvent::Updates(updates));
182                        }
183                    }
184                }
185                ListenEvent::Progress(progress) => ret.push(ListenEvent::Progress(progress)),
186            }
187        }
188        ret
189    }
190
191    /// Fetches the contents of `part` and returns its lease.
192    pub async fn fetch_batch_part(&mut self, part: LeasedBatchPart<T>) -> FetchedPart<K, V, T, D> {
193        self.listen.fetch_batch_part(part).await
194    }
195}
196
197impl<K, V, T, D> Subscribe<K, V, T, D>
198where
199    K: Debug + Codec,
200    V: Debug + Codec,
201    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
202    D: Monoid + Codec64 + Send + Sync,
203{
204    /// Politely expires this subscribe, releasing its lease.
205    ///
206    /// There is a best-effort impl in Drop for [`ReadHandle`] to expire the
207    /// [`ReadHandle`] held by the subscribe that wasn't explicitly expired
208    /// with this method. When possible, explicit expiry is still preferred
209    /// because the Drop one is best effort and is dependant on a tokio
210    /// [Handle] being available in the TLC at the time of drop (which is a bit
211    /// subtle). Also, explicit expiry allows for control over when it happens.
212    pub async fn expire(mut self) {
213        let _ = self.snapshot.take(); // Drop all leased parts.
214        self.listen.expire().await;
215    }
216}
217
/// Data and progress events of a shard subscription.
///
/// `T` is the timestamp type of the progress frontier. `D` is the element type
/// carried by an `Updates` event; within this file it is either a
/// `LeasedBatchPart` or a decoded `((key, val), time, diff)` update.
///
/// TODO: Unify this with [timely::dataflow::operators::capture::event::Event].
#[derive(Debug, PartialEq)]
pub enum ListenEvent<T, D> {
    /// Progress of the shard.
    Progress(Antichain<T>),
    /// Data of the shard.
    Updates(Vec<D>),
}
228
/// An ongoing subscription of updates to a shard.
#[derive(Debug)]
pub struct Listen<K: Codec, V: Codec, T, D> {
    /// The read handle this listen took ownership of; used to lease parts,
    /// fetch them, and downgrade the since as the listen progresses.
    handle: ReadHandle<K, V, T, D>,
    /// State watch obtained from the handle's machine, passed to
    /// `next_listen_batch` while waiting for new batches.
    watch: StateWatch<K, V, T, D>,

    /// The `as_of` this listen was created with. It never changes.
    as_of: Antichain<T>,
    /// The since this listen asks its handle to downgrade to. Starts out equal
    /// to `as_of` and is advanced in `next`.
    since: Antichain<T>,
    /// The upper of the most recently returned batch (initially `as_of`).
    frontier: Antichain<T>,
}
239
impl<K, V, T, D> Listen<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
    D: Monoid + Codec64 + Send + Sync,
{
    /// Creates a listen over `handle` starting at `as_of`.
    ///
    /// Fails with the rejection from `verify_listen`, or with the handle's own
    /// since if that since is not less_equal `as_of`. On success the handle is
    /// downgraded to `as_of` right away, and both the listen's since and its
    /// frontier start out at `as_of`.
    async fn new(
        mut handle: ReadHandle<K, V, T, D>,
        as_of: Antichain<T>,
    ) -> Result<Self, Since<T>> {
        let () = handle.machine.verify_listen(&as_of)?;

        let since = as_of.clone();
        if !PartialOrder::less_equal(handle.since(), &since) {
            // We can't guarantee that the as-of will be available by the time we start reading,
            // since our handle is already downgraded too far.
            return Err(Since(handle.since().clone()));
        }
        // This listen only needs to distinguish things after its frontier
        // (initially as_of although the frontier is inclusive and the as_of
        // isn't). Be a good citizen and downgrade early.
        handle.downgrade_since(&since).await;

        let watch = handle.machine.applier.watch();
        Ok(Listen {
            handle,
            watch,
            since,
            frontier: as_of.clone(),
            as_of,
        })
    }

    /// An exclusive upper bound on the progress of this Listen.
    pub fn frontier(&self) -> &Antichain<T> {
        &self.frontier
    }

    /// Attempt to pull out the next values of this subscription.
    ///
    /// Waits until `next_listen_batch` produces a batch beyond the current
    /// frontier. While waiting, it wakes up every `heartbeat_duration()` to
    /// call `maybe_downgrade_since` on the handle.
    ///
    /// The returned [`LeasedBatchPart`] is appropriate to use with
    /// `crate::fetch::fetch_leased_part`.
    ///
    /// The returned `Antichain` represents the subscription progress as it will
    /// be _after_ the returned parts are fetched.
    ///
    /// # Panics
    ///
    /// Panics (or halts the process, if the reader lease is gone) when the
    /// received batch's since has advanced past what this listen can still
    /// distinguish; see the invariant check in the body.
    pub async fn next(
        &mut self,
        // If Some, an override for the default listen sleep retry parameters.
        retry: Option<RetryParameters>,
    ) -> (Vec<LeasedBatchPart<T>>, Antichain<T>) {
        let batch = loop {
            let min_elapsed = self.handle.heartbeat_duration();
            let next_batch = self.handle.machine.next_listen_batch(
                &self.frontier,
                &mut self.watch,
                Some(&self.handle.reader_id),
                retry,
            );
            match tokio::time::timeout(min_elapsed, next_batch).await {
                Ok(batch) => break batch,
                // No batch yet: use the wakeup to (maybe) downgrade the since,
                // then go back to waiting.
                Err(_elapsed) => {
                    self.handle.maybe_downgrade_since(&self.since).await;
                }
            }
        };

        // A lot of things across mz have to line up to hold the following
        // invariant and violations only show up as subtle correctness errors,
        // so explicitly validate it here. Better to panic and roll back a
        // release than be incorrect (also potentially corrupting a sink).
        //
        // Note that the since check is intentionally less_than, not less_equal.
        // If a batch's since is X, that means we can no longer distinguish X
        // (beyond self.frontier) from X-1 (not beyond self.frontier) to keep
        // former and filter out the latter.
        let acceptable_desc = PartialOrder::less_than(batch.desc.since(), &self.frontier)
            // Special case when the frontier == the as_of (i.e. the first
            // time this is called on a new Listen). Because as_of is
            // _exclusive_, we don't need to be able to distinguish X from
            // X-1.
            || (self.frontier == self.as_of
            && PartialOrder::less_equal(batch.desc.since(), &self.frontier));
        if !acceptable_desc {
            // Look up our lease to tell a genuine invariant violation (lease
            // still present: panic) apart from an expired reader (halt).
            let lease_state = self
                .handle
                .machine
                .applier
                .reader_lease(self.handle.reader_id.clone());
            if let Some(lease) = lease_state {
                panic!(
                    "Listen on {} received a batch {:?} advanced past the listen frontier {:?}, but the lease has not expired: {:?}",
                    self.handle.machine.shard_id(),
                    batch.desc,
                    self.frontier,
                    lease
                )
            } else {
                // Ideally we'd percolate this error up, so callers could eg. restart a dataflow
                // instead of restarting a process...
                halt!(
                    "Listen on {} received a batch {:?} advanced past the listen frontier {:?} after the reader has expired. \
                     This can happen in exceptional cases: a machine goes to sleep or is running out of memory or CPU, for example.",
                    self.handle.machine.shard_id(),
                    batch.desc,
                    self.frontier
                )
            }
        }

        let new_frontier = batch.desc.upper().clone();

        // We will have a new frontier, so this is an opportunity to downgrade our
        // since capability. Go through `maybe_downgrade_since` so we can rate
        // limit this along with our heartbeats.
        //
        // HACK! Everything would be simpler if we could downgrade since to the
        // new frontier, but we can't. The next call needs to be able to
        // distinguish between the times T at the frontier (to emit updates with
        // these times) and T-1 (to filter them). Advancing the since to
        // frontier would erase the ability to distinguish between them. Ideally
        // we'd use what is conceptually "batch.upper - 1" (the greatest
        // elements that are still strictly less than batch.upper, which will be
        // the new value of self.frontier after this call returns), but the
        // trait bounds on T don't give us a way to compute that directly.
        // Instead, we sniff out any elements in self.frontier (the upper of the
        // batch the last time we called this) that are strictly less_than the
        // batch upper to compute a new since. For totally ordered times
        // (currently always the case in mz) self.frontier will always have a
        // single element and it will be less_than upper. We
        // could also abuse the fact that every time we actually emit is
        // guaranteed by definition to be less_than upper to be a bit more
        // prompt, but this would involve a lot more temporary antichains and
        // it's unclear if that's worth it.
        for x in self.frontier.elements().iter() {
            let less_than_upper = batch.desc.upper().elements().iter().any(|u| x.less_than(u));
            if less_than_upper {
                self.since.join_assign(&Antichain::from_elem(x.clone()));
            }
        }

        // IMPORTANT! Make sure this `lease_batch_parts` stays before the
        // `maybe_downgrade_since` call. Otherwise, we might give up our
        // capability on the batch's SeqNo before we lease it, which could lead
        // to blobs that it references being GC'd.
        let filter = FetchBatchFilter::Listen {
            as_of: self.as_of.clone(),
            lower: self.frontier.clone(),
        };
        let parts = self.handle.lease_batch_parts(batch, filter).collect().await;

        self.handle.maybe_downgrade_since(&self.since).await;

        // NB: Keep this after we use self.frontier to join_assign self.since
        // and also after we construct metadata.
        self.frontier = new_frontier;

        (parts, self.frontier.clone())
    }
}
400
401impl<K, V, T, D> Listen<K, V, T, D>
402where
403    K: Debug + Codec,
404    V: Debug + Codec,
405    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
406    D: Monoid + Codec64 + Send + Sync,
407{
408    /// Attempt to pull out the next values of this subscription.
409    ///
410    /// The updates received in [ListenEvent::Updates] should be assumed to be in arbitrary order
411    /// and not necessarily consolidated. However, the timestamp of each individual update will be
412    /// greater than or equal to the last received [ListenEvent::Progress] frontier (or this
413    /// [Listen]'s initial `as_of` frontier if no progress event has been emitted yet) and less
414    /// than the next [ListenEvent::Progress] frontier.
415    ///
416    /// If you have a use for consolidated listen output, given that snapshots can't be
417    /// consolidated, come talk to us!
418    #[instrument(level = "debug", name = "listen::next", fields(shard = %self.handle.machine.shard_id()))]
419    pub async fn fetch_next(
420        &mut self,
421    ) -> Vec<ListenEvent<T, ((Result<K, String>, Result<V, String>), T, D)>> {
422        let (parts, progress) = self.next(None).await;
423        let mut ret = Vec::with_capacity(parts.len() + 1);
424        for part in parts {
425            let fetched_part = self.fetch_batch_part(part).await;
426            let updates = fetched_part.collect::<Vec<_>>();
427            if !updates.is_empty() {
428                ret.push(ListenEvent::Updates(updates));
429            }
430        }
431        ret.push(ListenEvent::Progress(progress));
432        ret
433    }
434
435    /// Convert listener into futures::Stream
436    pub fn into_stream(
437        mut self,
438    ) -> impl Stream<Item = ListenEvent<T, ((Result<K, String>, Result<V, String>), T, D)>> {
439        async_stream::stream!({
440            loop {
441                for msg in self.fetch_next().await {
442                    yield msg;
443                }
444            }
445        })
446    }
447
448    /// Test helper to read from the listener until the given frontier is
449    /// reached. Because compaction can arbitrarily combine batches, we only
450    /// return the final progress info.
451    #[cfg(test)]
452    #[track_caller]
453    pub async fn read_until(
454        &mut self,
455        ts: &T,
456    ) -> (
457        Vec<((Result<K, String>, Result<V, String>), T, D)>,
458        Antichain<T>,
459    ) {
460        let mut updates = Vec::new();
461        let mut frontier = Antichain::from_elem(T::minimum());
462        while self.frontier.less_than(ts) {
463            for event in self.fetch_next().await {
464                match event {
465                    ListenEvent::Updates(mut x) => updates.append(&mut x),
466                    ListenEvent::Progress(x) => frontier = x,
467                }
468            }
469        }
470        // Unlike most tests, intentionally don't consolidate updates here
471        // because Listen replays them at the original fidelity.
472        (updates, frontier)
473    }
474}
475
476impl<K, V, T, D> Listen<K, V, T, D>
477where
478    K: Debug + Codec,
479    V: Debug + Codec,
480    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
481    D: Monoid + Codec64 + Send + Sync,
482{
483    /// Fetches the contents of `part` and returns its lease.
484    ///
485    /// This is broken out into its own function to provide a trivial means for
486    /// [`Subscribe`], which contains a [`Listen`], to fetch batches.
487    async fn fetch_batch_part(&mut self, part: LeasedBatchPart<T>) -> FetchedPart<K, V, T, D> {
488        let fetched_part = fetch_leased_part(
489            &self.handle.cfg,
490            &part,
491            self.handle.blob.as_ref(),
492            Arc::clone(&self.handle.metrics),
493            &self.handle.metrics.read.listen,
494            &self.handle.machine.applier.shard_metrics,
495            &self.handle.reader_id,
496            self.handle.read_schemas.clone(),
497            &mut self.handle.schema_cache,
498        )
499        .await;
500        fetched_part
501    }
502
503    /// Politely expires this listen, releasing its lease.
504    ///
505    /// There is a best-effort impl in Drop for [`ReadHandle`] to expire the
506    /// [`ReadHandle`] held by the listen that wasn't explicitly expired with
507    /// this method. When possible, explicit expiry is still preferred because
508    /// the Drop one is best effort and is dependant on a tokio [Handle] being
509    /// available in the TLC at the time of drop (which is a bit subtle). Also,
510    /// explicit expiry allows for control over when it happens.
511    pub async fn expire(self) {
512        self.handle.expire().await
513    }
514}
515
/// A "capability" granting the ability to read the state of some shard at times
/// greater or equal to `self.since()`.
///
/// Production users should call [Self::expire] before dropping a ReadHandle so
/// that it can expire its leases. If/when rust gets AsyncDrop, this will be
/// done automatically.
///
/// All async methods on ReadHandle retry for as long as they are able, but the
/// returned [std::future::Future]s implement "cancel on drop" semantics. This
/// means that callers can add a timeout using [tokio::time::timeout] or
/// [tokio::time::timeout_at].
///
/// ```rust,no_run
/// # let mut read: mz_persist_client::read::ReadHandle<String, String, u64, i64> = unimplemented!();
/// # let timeout: std::time::Duration = unimplemented!();
/// # let new_since: timely::progress::Antichain<u64> = unimplemented!();
/// # async {
/// tokio::time::timeout(timeout, read.downgrade_since(&new_since)).await
/// # };
/// ```
#[derive(Debug)]
pub struct ReadHandle<K: Codec, V: Codec, T, D> {
    pub(crate) cfg: PersistConfig,
    pub(crate) metrics: Arc<Metrics>,
    pub(crate) machine: Machine<K, V, T, D>,
    pub(crate) gc: GarbageCollector<K, V, T, D>,
    pub(crate) blob: Arc<dyn Blob>,
    /// The id that identifies this reader in calls to the machine.
    pub(crate) reader_id: LeasedReaderId,
    /// Schemas passed to `fetch_leased_part` when decoding fetched parts.
    pub(crate) read_schemas: Schemas<K, V>,
    pub(crate) schema_cache: SchemaCache<K, V, T, D>,

    /// This handle's since; overwritten with the value the machine reports on
    /// every `downgrade_since`.
    since: Antichain<T>,
    /// The `cfg.now` timestamp taken by the most recent `downgrade_since`.
    pub(crate) last_heartbeat: EpochMillis,
    /// Leases handed out to `LeasedBatchPart`s, keyed by the `SeqNo` that was
    /// current when the lease was taken.
    pub(crate) leased_seqnos: BTreeMap<SeqNo, Lease>,
    /// Holds the expire function and heartbeat tasks set up in `new`.
    /// NOTE(review): presumably taken (set to `None`) on expiry — confirm.
    pub(crate) unexpired_state: Option<UnexpiredReadHandleState>,
}
552
/// Length of time after a reader's last operation after which the reader may be
/// expired.
///
/// The duration given here is 15 minutes.
/// NOTE(review): presumably the arguments are the config name, its default
/// value, and its description — confirm against `mz_dyncfg::Config::new`.
pub(crate) const READER_LEASE_DURATION: Config<Duration> = Config::new(
    "persist_reader_lease_duration",
    Duration::from_secs(60 * 15),
    "The time after which we'll clean up stale read leases",
);
560
561impl<K, V, T, D> ReadHandle<K, V, T, D>
562where
563    K: Debug + Codec,
564    V: Debug + Codec,
565    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
566    D: Monoid + Codec64 + Send + Sync,
567{
    /// Assembles a handle from its already-registered parts.
    ///
    /// Besides storing the arguments, this takes the schema cache from the
    /// machine's applier, builds the expire function, and starts the reader
    /// heartbeat tasks. The heartbeat task handles are converted to
    /// abort-on-drop handles and stored, together with the expire function, in
    /// `unexpired_state`. The set of leased seqnos starts out empty.
    pub(crate) async fn new(
        cfg: PersistConfig,
        metrics: Arc<Metrics>,
        machine: Machine<K, V, T, D>,
        gc: GarbageCollector<K, V, T, D>,
        blob: Arc<dyn Blob>,
        reader_id: LeasedReaderId,
        read_schemas: Schemas<K, V>,
        since: Antichain<T>,
        last_heartbeat: EpochMillis,
    ) -> Self {
        let schema_cache = machine.applier.schema_cache();
        // The expire function gets its own clones so it can outlive the
        // fields stored in the handle.
        let expire_fn = Self::expire_fn(machine.clone(), gc.clone(), reader_id.clone());
        ReadHandle {
            cfg,
            metrics: Arc::clone(&metrics),
            machine: machine.clone(),
            gc: gc.clone(),
            blob,
            reader_id: reader_id.clone(),
            read_schemas,
            schema_cache,
            since,
            last_heartbeat,
            leased_seqnos: BTreeMap::new(),
            unexpired_state: Some(UnexpiredReadHandleState {
                expire_fn,
                // The original `reader_id` and `gc` are moved into the
                // heartbeat tasks here; the fields above hold clones.
                _heartbeat_tasks: machine
                    .start_reader_heartbeat_tasks(reader_id, gc)
                    .await
                    .into_iter()
                    .map(JoinHandle::abort_on_drop)
                    .collect(),
            }),
        }
    }
604
605    /// This handle's shard id.
606    pub fn shard_id(&self) -> ShardId {
607        self.machine.shard_id()
608    }
609
610    /// This handle's `since` frontier.
611    ///
612    /// This will always be greater or equal to the shard-global `since`.
613    pub fn since(&self) -> &Antichain<T> {
614        &self.since
615    }
616
617    fn outstanding_seqno(&mut self) -> Option<SeqNo> {
618        while let Some(first) = self.leased_seqnos.first_entry() {
619            if first.get().count() <= 1 {
620                first.remove();
621            } else {
622                return Some(*first.key());
623            }
624        }
625        None
626    }
627
628    /// Forwards the since frontier of this handle, giving up the ability to
629    /// read at times not greater or equal to `new_since`.
630    ///
631    /// This may trigger (asynchronous) compaction and consolidation in the
632    /// system. A `new_since` of the empty antichain "finishes" this shard,
633    /// promising that no more data will ever be read by this handle.
634    ///
635    /// This also acts as a heartbeat for the reader lease (including if called
636    /// with `new_since` equal to something like `self.since()` or the minimum
637    /// timestamp, making the call a no-op).
638    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
639    pub async fn downgrade_since(&mut self, new_since: &Antichain<T>) {
640        // Guaranteed to be the smallest/oldest outstanding lease on a `SeqNo`.
641        let outstanding_seqno = self.outstanding_seqno();
642
643        let heartbeat_ts = (self.cfg.now)();
644        let (_seqno, current_reader_since, maintenance) = self
645            .machine
646            .downgrade_since(&self.reader_id, outstanding_seqno, new_since, heartbeat_ts)
647            .await;
648
649        // Debugging for database-issues#4590.
650        if let Some(outstanding_seqno) = outstanding_seqno {
651            let seqnos_held = _seqno.0.saturating_sub(outstanding_seqno.0);
652            // We get just over 1 seqno-per-second on average for a shard in
653            // prod, so this is about an hour.
654            const SEQNOS_HELD_THRESHOLD: u64 = 60 * 60;
655            if seqnos_held >= SEQNOS_HELD_THRESHOLD {
656                tracing::info!(
657                    "{} reader {} holding an unexpected number of seqnos {} vs {}: {:?}. bt: {:?}",
658                    self.machine.shard_id(),
659                    self.reader_id,
660                    outstanding_seqno,
661                    _seqno,
662                    self.leased_seqnos.keys().take(10).collect::<Vec<_>>(),
663                    // The Debug impl of backtrace is less aesthetic, but will put the trace
664                    // on a single line and play more nicely with our Honeycomb quota
665                    Backtrace::force_capture(),
666                );
667            }
668        }
669
670        self.since = current_reader_since.0;
671        // A heartbeat is just any downgrade_since traffic, so update the
672        // internal rate limiter here to play nicely with `maybe_heartbeat`.
673        self.last_heartbeat = heartbeat_ts;
674        maintenance.start_performing(&self.machine, &self.gc);
675    }
676
677    /// Returns an ongoing subscription of updates to a shard.
678    ///
679    /// The stream includes all data at times greater than `as_of`. Combined
680    /// with [Self::snapshot] it will produce exactly correct results: the
681    /// snapshot is the TVCs contents at `as_of` and all subsequent updates
682    /// occur at exactly their indicated time. The recipient should only
683    /// downgrade their read capability when they are certain they have all data
684    /// through the frontier they would downgrade to.
685    ///
686    /// This takes ownership of the ReadHandle so the Listen can use it to
687    /// [Self::downgrade_since] as it progresses. If you need to keep this
688    /// handle, then [Self::clone] it before calling listen.
689    ///
690    /// The `Since` error indicates that the requested `as_of` cannot be served
691    /// (the caller has out of date information) and includes the smallest
692    /// `as_of` that would have been accepted.
693    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
694    pub async fn listen(self, as_of: Antichain<T>) -> Result<Listen<K, V, T, D>, Since<T>> {
695        Listen::new(self, as_of).await
696    }
697
698    /// Returns all of the contents of the shard TVC at `as_of` broken up into
699    /// [`LeasedBatchPart`]es. These parts can be "turned in" via
700    /// `crate::fetch::fetch_batch_part` to receive the data they contain.
701    ///
702    /// This command returns the contents of this shard as of `as_of` once they
703    /// are known. This may "block" (in an async-friendly way) if `as_of` is
704    /// greater or equal to the current `upper` of the shard. The recipient
705    /// should only downgrade their read capability when they are certain they
706    /// have all data through the frontier they would downgrade to.
707    ///
708    /// The `Since` error indicates that the requested `as_of` cannot be served
709    /// (the caller has out of date information) and includes the smallest
710    /// `as_of` that would have been accepted.
711    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
712    pub async fn snapshot(
713        &mut self,
714        as_of: Antichain<T>,
715    ) -> Result<Vec<LeasedBatchPart<T>>, Since<T>> {
716        let batches = loop {
717            let min_elapsed = self.heartbeat_duration();
718            match tokio::time::timeout(min_elapsed, self.machine.snapshot(&as_of)).await {
719                Ok(Ok(batches)) => break batches,
720                Ok(Err(since)) => return Err(since),
721                Err(_timeout) => {
722                    let since = self.since().clone();
723                    self.maybe_downgrade_since(&since).await;
724                }
725            }
726        };
727
728        if !PartialOrder::less_equal(self.since(), &as_of) {
729            return Err(Since(self.since().clone()));
730        }
731
732        let filter = FetchBatchFilter::Snapshot { as_of };
733        let mut leased_parts = Vec::new();
734        for batch in batches {
735            // Flatten the HollowBatch into one LeasedBatchPart per key. Each key
736            // corresponds to a "part" or s3 object. This allows persist_source
737            // to distribute work by parts (smallish, more even size) instead of
738            // batches (arbitrarily large).
739            leased_parts.extend(
740                self.lease_batch_parts(batch, filter.clone())
741                    .collect::<Vec<_>>()
742                    .await,
743            );
744        }
745        Ok(leased_parts)
746    }
747
748    /// Returns a snapshot of all of a shard's data using `as_of`, followed by
749    /// listening to any future updates.
750    ///
751    /// For more details on this operation's semantics, see [Self::snapshot] and
752    /// [Self::listen].
753    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
754    pub async fn subscribe(
755        mut self,
756        as_of: Antichain<T>,
757    ) -> Result<Subscribe<K, V, T, D>, Since<T>> {
758        let snapshot_parts = self.snapshot(as_of.clone()).await?;
759        let listen = self.listen(as_of.clone()).await?;
760        Ok(Subscribe::new(snapshot_parts, listen))
761    }
762
763    fn lease_batch_part(
764        &mut self,
765        desc: Description<T>,
766        part: BatchPart<T>,
767        filter: FetchBatchFilter<T>,
768    ) -> LeasedBatchPart<T> {
769        LeasedBatchPart {
770            metrics: Arc::clone(&self.metrics),
771            shard_id: self.machine.shard_id(),
772            filter,
773            desc,
774            part,
775            lease: self.lease_seqno(),
776            filter_pushdown_audit: false,
777        }
778    }
779
780    fn lease_batch_parts(
781        &mut self,
782        batch: HollowBatch<T>,
783        filter: FetchBatchFilter<T>,
784    ) -> impl Stream<Item = LeasedBatchPart<T>> + '_ {
785        stream! {
786            let blob = Arc::clone(&self.blob);
787            let metrics = Arc::clone(&self.metrics);
788            let desc = batch.desc.clone();
789            for await part in batch.part_stream(self.shard_id(), &*blob, &*metrics) {
790                yield self.lease_batch_part(desc.clone(), part.expect("leased part").into_owned(), filter.clone())
791            }
792        }
793    }
794
795    /// Tracks that the `ReadHandle`'s machine's current `SeqNo` is being
796    /// "leased out" to a `LeasedBatchPart`, and cannot be garbage
797    /// collected until its lease has been returned.
798    fn lease_seqno(&mut self) -> Lease {
799        let seqno = self.machine.seqno();
800        let lease = self
801            .leased_seqnos
802            .entry(seqno)
803            .or_insert_with(|| Lease::new(seqno));
804        lease.clone()
805    }
806
807    /// Returns an independent [ReadHandle] with a new [LeasedReaderId] but the
808    /// same `since`.
809    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
810    pub async fn clone(&self, purpose: &str) -> Self {
811        let new_reader_id = LeasedReaderId::new();
812        let machine = self.machine.clone();
813        let gc = self.gc.clone();
814        let heartbeat_ts = (self.cfg.now)();
815        let (reader_state, maintenance) = machine
816            .register_leased_reader(
817                &new_reader_id,
818                purpose,
819                READER_LEASE_DURATION.get(&self.cfg),
820                heartbeat_ts,
821                false,
822            )
823            .await;
824        maintenance.start_performing(&machine, &gc);
825        // The point of clone is that you're guaranteed to have the same (or
826        // greater) since capability, verify that.
827        // TODO: better if it's the same since capability exactly.
828        assert!(PartialOrder::less_equal(&reader_state.since, &self.since));
829        let new_reader = ReadHandle::new(
830            self.cfg.clone(),
831            Arc::clone(&self.metrics),
832            machine,
833            gc,
834            Arc::clone(&self.blob),
835            new_reader_id,
836            self.read_schemas.clone(),
837            reader_state.since,
838            heartbeat_ts,
839        )
840        .await;
841        new_reader
842    }
843
844    fn heartbeat_duration(&self) -> Duration {
845        READER_LEASE_DURATION.get(&self.cfg) / 4
846    }
847
848    /// A rate-limited version of [Self::downgrade_since].
849    ///
850    /// This is an internally rate limited helper, designed to allow users to
851    /// call it as frequently as they like. Call this or [Self::downgrade_since],
852    /// on some interval that is "frequent" compared to the read lease duration.
853    pub async fn maybe_downgrade_since(&mut self, new_since: &Antichain<T>) {
854        let min_elapsed = self.heartbeat_duration();
855        let elapsed_since_last_heartbeat =
856            Duration::from_millis((self.cfg.now)().saturating_sub(self.last_heartbeat));
857        if elapsed_since_last_heartbeat >= min_elapsed {
858            self.downgrade_since(new_since).await;
859        }
860    }
861
862    /// Politely expires this reader, releasing its lease.
863    ///
864    /// There is a best-effort impl in Drop to expire a reader that wasn't
865    /// explictly expired with this method. When possible, explicit expiry is
866    /// still preferred because the Drop one is best effort and is dependant on
867    /// a tokio [Handle] being available in the TLC at the time of drop (which
868    /// is a bit subtle). Also, explicit expiry allows for control over when it
869    /// happens.
870    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
871    pub async fn expire(mut self) {
872        // We drop the unexpired state before expiring the reader to ensure the
873        // heartbeat tasks can never observe the expired state. This doesn't
874        // matter for correctness, but avoids confusing log output if the
875        // heartbeat task were to discover that its lease has been expired.
876        let Some(unexpired_state) = self.unexpired_state.take() else {
877            return;
878        };
879        unexpired_state.expire_fn.0().await;
880    }
881
882    fn expire_fn(
883        machine: Machine<K, V, T, D>,
884        gc: GarbageCollector<K, V, T, D>,
885        reader_id: LeasedReaderId,
886    ) -> ExpireFn {
887        ExpireFn(Box::new(move || {
888            Box::pin(async move {
889                let (_, maintenance) = machine.expire_leased_reader(&reader_id).await;
890                maintenance.start_performing(&machine, &gc);
891            })
892        }))
893    }
894
/// Test helper for a [Self::listen] call that is expected to succeed.
///
/// Listens from the single-element frontier `as_of` and panics if that
/// `as_of` cannot be served.
#[cfg(test)]
#[track_caller]
pub async fn expect_listen(self, as_of: T) -> Listen<K, V, T, D> {
    let frontier = Antichain::from_elem(as_of);
    let listen = self.listen(frontier).await;
    listen.expect("cannot serve requested as_of")
}
903}
904
905/// State for a read handle that has not been explicitly expired.
906#[derive(Debug)]
907pub(crate) struct UnexpiredReadHandleState {
908    expire_fn: ExpireFn,
909    pub(crate) _heartbeat_tasks: Vec<AbortOnDropHandle<()>>,
910}
911
912/// An incremental cursor through a particular shard, returned from [ReadHandle::snapshot_cursor].
913///
914/// To read an entire dataset, the
915/// client should call `next` until it returns `None`, which signals all data has been returned...
916/// but it's also free to abandon the instance at any time if it eg. only needs a few entries.
917#[derive(Debug)]
918pub struct Cursor<K: Codec, V: Codec, T: Timestamp + Codec64, D: Codec64, L = Lease> {
919    consolidator: Consolidator<T, D, StructuredSort<K, V, T, D>>,
920    max_len: usize,
921    max_bytes: usize,
922    _lease: L,
923    read_schemas: Schemas<K, V>,
924}
925
926impl<K: Codec, V: Codec, T: Timestamp + Codec64, D: Codec64, L> Cursor<K, V, T, D, L> {
927    /// Extracts and returns the lease from the cursor. Allowing the caller to
928    /// do any necessary cleanup associated with the lease.
929    pub fn into_lease(self: Self) -> L {
930        self._lease
931    }
932}
933
934impl<K, V, T, D, L> Cursor<K, V, T, D, L>
935where
936    K: Debug + Codec + Ord,
937    V: Debug + Codec + Ord,
938    T: Timestamp + Lattice + Codec64 + Sync,
939    D: Monoid + Ord + Codec64 + Send + Sync,
940{
941    /// Grab the next batch of consolidated data.
942    pub async fn next(
943        &mut self,
944    ) -> Option<impl Iterator<Item = ((Result<K, String>, Result<V, String>), T, D)> + '_> {
945        let Self {
946            consolidator,
947            max_len,
948            max_bytes,
949            _lease,
950            read_schemas: _,
951        } = self;
952
953        let part = consolidator
954            .next_chunk(*max_len, *max_bytes)
955            .await
956            .expect("fetching a leased part")?;
957        let key_decoder = self
958            .read_schemas
959            .key
960            .decoder_any(part.key.as_ref())
961            .expect("ok");
962        let val_decoder = self
963            .read_schemas
964            .val
965            .decoder_any(part.val.as_ref())
966            .expect("ok");
967        let iter = (0..part.len()).map(move |i| {
968            let mut k = K::default();
969            let mut v = V::default();
970            key_decoder.decode(i, &mut k);
971            val_decoder.decode(i, &mut v);
972            let t = T::decode(part.time.value(i).to_le_bytes());
973            let d = D::decode(part.diff.value(i).to_le_bytes());
974            ((Ok(k), Ok(v)), t, d)
975        });
976
977        Some(iter)
978    }
979}
980
981impl<K, V, T, D> ReadHandle<K, V, T, D>
982where
983    K: Debug + Codec + Ord,
984    V: Debug + Codec + Ord,
985    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
986    D: Monoid + Ord + Codec64 + Send + Sync,
987{
988    /// Generates a [Self::snapshot], and fetches all of the batches it
989    /// contains.
990    ///
991    /// The output is consolidated. Furthermore, to keep memory usage down when
992    /// reading a snapshot that consolidates well, this consolidates as it goes.
993    ///
994    /// Potential future improvements (if necessary):
995    /// - Accept something like a `F: Fn(K,V) -> (K,V)` argument, which looks
996    ///   like an MFP you might be pushing down. Reason being that if you are
997    ///   projecting or transforming in a way that allows further consolidation,
998    ///   amazing.
999    /// - Reuse any code we write to streaming-merge consolidate in
1000    ///   persist_source here.
1001    pub async fn snapshot_and_fetch(
1002        &mut self,
1003        as_of: Antichain<T>,
1004    ) -> Result<Vec<((Result<K, String>, Result<V, String>), T, D)>, Since<T>> {
1005        let mut cursor = self.snapshot_cursor(as_of, |_| true).await?;
1006        let mut contents = Vec::new();
1007        while let Some(iter) = cursor.next().await {
1008            contents.extend(iter);
1009        }
1010
1011        // We don't currently guarantee that encoding is one-to-one, so we still need to
1012        // consolidate the decoded outputs. However, let's report if this isn't a noop.
1013        let old_len = contents.len();
1014        consolidate_updates(&mut contents);
1015        if old_len != contents.len() {
1016            // TODO(bkirwi): do we need more / finer-grained metrics for this?
1017            self.machine
1018                .applier
1019                .shard_metrics
1020                .unconsolidated_snapshot
1021                .inc();
1022        }
1023
1024        Ok(contents)
1025    }
1026
1027    /// Generates a [Self::snapshot], and fetches all of the batches it
1028    /// contains.
1029    ///
1030    /// To keep memory usage down when reading a snapshot that consolidates well, this consolidates
1031    /// as it goes. However, note that only the serialized data is consolidated: the deserialized
1032    /// data will only be consolidated if your K/V codecs are one-to-one.
1033    pub async fn snapshot_cursor(
1034        &mut self,
1035        as_of: Antichain<T>,
1036        should_fetch_part: impl for<'a> Fn(Option<&'a LazyPartStats>) -> bool,
1037    ) -> Result<Cursor<K, V, T, D>, Since<T>> {
1038        let batches = self.machine.snapshot(&as_of).await?;
1039        let lease = self.lease_seqno();
1040
1041        Self::read_batches_consolidated(
1042            &self.cfg,
1043            Arc::clone(&self.metrics),
1044            Arc::clone(&self.machine.applier.shard_metrics),
1045            self.metrics.read.snapshot.clone(),
1046            Arc::clone(&self.blob),
1047            self.shard_id(),
1048            as_of,
1049            self.read_schemas.clone(),
1050            &batches,
1051            lease,
1052            should_fetch_part,
1053            COMPACTION_MEMORY_BOUND_BYTES.get(&self.cfg),
1054        )
1055    }
1056
1057    pub(crate) fn read_batches_consolidated<L>(
1058        persist_cfg: &PersistConfig,
1059        metrics: Arc<Metrics>,
1060        shard_metrics: Arc<ShardMetrics>,
1061        read_metrics: ReadMetrics,
1062        blob: Arc<dyn Blob>,
1063        shard_id: ShardId,
1064        as_of: Antichain<T>,
1065        schemas: Schemas<K, V>,
1066        batches: &[HollowBatch<T>],
1067        lease: L,
1068        should_fetch_part: impl for<'a> Fn(Option<&'a LazyPartStats>) -> bool,
1069        memory_budget_bytes: usize,
1070    ) -> Result<Cursor<K, V, T, D, L>, Since<T>> {
1071        let context = format!("{}[as_of={:?}]", shard_id, as_of.elements());
1072        let filter = FetchBatchFilter::Snapshot {
1073            as_of: as_of.clone(),
1074        };
1075
1076        let mut consolidator = Consolidator::new(
1077            context,
1078            FetchConfig::from_persist_config(persist_cfg),
1079            shard_id,
1080            StructuredSort::new(schemas.clone()),
1081            blob,
1082            metrics,
1083            shard_metrics,
1084            read_metrics,
1085            filter,
1086            None,
1087            memory_budget_bytes,
1088        );
1089        for batch in batches {
1090            for (meta, run) in batch.runs() {
1091                consolidator.enqueue_run(
1092                    &batch.desc,
1093                    meta,
1094                    run.into_iter()
1095                        .filter(|p| should_fetch_part(p.stats()))
1096                        .cloned(),
1097                );
1098            }
1099        }
1100        // This default may end up consolidating more records than previously
1101        // for cases like fast-path peeks, where only the first few entries are used.
1102        // If this is a noticeable performance impact, thread the max-len in from the caller.
1103        let max_len = persist_cfg.compaction_yield_after_n_updates;
1104        let max_bytes = BLOB_TARGET_SIZE.get(persist_cfg).max(1);
1105
1106        Ok(Cursor {
1107            consolidator,
1108            max_len,
1109            max_bytes,
1110            _lease: lease,
1111            read_schemas: schemas,
1112        })
1113    }
1114
1115    /// Returns aggregate statistics about the contents of the shard TVC at the
1116    /// given frontier.
1117    ///
1118    /// This command returns the contents of this shard as of `as_of` once they
1119    /// are known. This may "block" (in an async-friendly way) if `as_of` is
1120    /// greater or equal to the current `upper` of the shard. If `None` is given
1121    /// for `as_of`, then the latest stats known by this process are used.
1122    ///
1123    /// The `Since` error indicates that the requested `as_of` cannot be served
1124    /// (the caller has out of date information) and includes the smallest
1125    /// `as_of` that would have been accepted.
1126    pub fn snapshot_stats(
1127        &self,
1128        as_of: Option<Antichain<T>>,
1129    ) -> impl Future<Output = Result<SnapshotStats, Since<T>>> + Send + 'static {
1130        let machine = self.machine.clone();
1131        async move {
1132            let batches = match as_of {
1133                Some(as_of) => machine.snapshot(&as_of).await?,
1134                None => machine.applier.all_batches(),
1135            };
1136            let num_updates = batches.iter().map(|b| b.len).sum();
1137            Ok(SnapshotStats {
1138                shard_id: machine.shard_id(),
1139                num_updates,
1140            })
1141        }
1142    }
1143
1144    /// Returns aggregate statistics about the contents of the shard TVC at the
1145    /// given frontier.
1146    ///
1147    /// This command returns the contents of this shard as of `as_of` once they
1148    /// are known. This may "block" (in an async-friendly way) if `as_of` is
1149    /// greater or equal to the current `upper` of the shard.
1150    ///
1151    /// The `Since` error indicates that the requested `as_of` cannot be served
1152    /// (the caller has out of date information) and includes the smallest
1153    /// `as_of` that would have been accepted.
1154    pub async fn snapshot_parts_stats(
1155        &self,
1156        as_of: Antichain<T>,
1157    ) -> Result<SnapshotPartsStats, Since<T>> {
1158        let batches = self.machine.snapshot(&as_of).await?;
1159        let parts = stream::iter(&batches)
1160            .flat_map(|b| b.part_stream(self.shard_id(), &*self.blob, &*self.metrics))
1161            .map(|p| {
1162                let p = p.expect("live batch");
1163                SnapshotPartStats {
1164                    encoded_size_bytes: p.encoded_size_bytes(),
1165                    stats: p.stats().cloned(),
1166                }
1167            })
1168            .collect()
1169            .await;
1170        Ok(SnapshotPartsStats {
1171            metrics: Arc::clone(&self.machine.applier.metrics),
1172            shard_id: self.machine.shard_id(),
1173            parts,
1174        })
1175    }
1176}
1177
1178impl<K, V, T, D> ReadHandle<K, V, T, D>
1179where
1180    K: Debug + Codec + Ord,
1181    V: Debug + Codec + Ord,
1182    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
1183    D: Monoid + Codec64 + Send + Sync,
1184{
1185    /// Generates a [Self::snapshot], and streams out all of the updates
1186    /// it contains in bounded memory.
1187    ///
1188    /// The output is not consolidated.
1189    pub async fn snapshot_and_stream(
1190        &mut self,
1191        as_of: Antichain<T>,
1192    ) -> Result<
1193        impl Stream<Item = ((Result<K, String>, Result<V, String>), T, D)> + use<K, V, T, D>,
1194        Since<T>,
1195    > {
1196        let snap = self.snapshot(as_of).await?;
1197
1198        let blob = Arc::clone(&self.blob);
1199        let metrics = Arc::clone(&self.metrics);
1200        let snapshot_metrics = self.metrics.read.snapshot.clone();
1201        let shard_metrics = Arc::clone(&self.machine.applier.shard_metrics);
1202        let reader_id = self.reader_id.clone();
1203        let schemas = self.read_schemas.clone();
1204        let mut schema_cache = self.schema_cache.clone();
1205        let persist_cfg = self.cfg.clone();
1206        let stream = async_stream::stream! {
1207            for part in snap {
1208                let mut fetched_part = fetch_leased_part(
1209                    &persist_cfg,
1210                    &part,
1211                    blob.as_ref(),
1212                    Arc::clone(&metrics),
1213                    &snapshot_metrics,
1214                    &shard_metrics,
1215                    &reader_id,
1216                    schemas.clone(),
1217                    &mut schema_cache,
1218                )
1219                .await;
1220
1221                while let Some(next) = fetched_part.next() {
1222                    yield next;
1223                }
1224            }
1225        };
1226
1227        Ok(stream)
1228    }
1229}
1230
1231impl<K, V, T, D> ReadHandle<K, V, T, D>
1232where
1233    K: Debug + Codec + Ord,
1234    V: Debug + Codec + Ord,
1235    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
1236    D: Monoid + Ord + Codec64 + Send + Sync,
1237{
1238    /// Test helper to generate a [Self::snapshot] call that is expected to
1239    /// succeed, process its batches, and then return its data sorted.
1240    #[cfg(test)]
1241    #[track_caller]
1242    pub async fn expect_snapshot_and_fetch(
1243        &mut self,
1244        as_of: T,
1245    ) -> Vec<((Result<K, String>, Result<V, String>), T, D)> {
1246        let mut ret = self
1247            .snapshot_and_fetch(Antichain::from_elem(as_of))
1248            .await
1249            .expect("cannot serve requested as_of");
1250
1251        ret.sort();
1252        ret
1253    }
1254}
1255
1256impl<K: Codec, V: Codec, T, D> Drop for ReadHandle<K, V, T, D> {
1257    fn drop(&mut self) {
1258        // We drop the unexpired state before expiring the reader to ensure the
1259        // heartbeat tasks can never observe the expired state. This doesn't
1260        // matter for correctness, but avoids confusing log output if the
1261        // heartbeat task were to discover that its lease has been expired.
1262        let Some(unexpired_state) = self.unexpired_state.take() else {
1263            return;
1264        };
1265
1266        let handle = match Handle::try_current() {
1267            Ok(x) => x,
1268            Err(_) => {
1269                warn!(
1270                    "ReadHandle {} dropped without being explicitly expired, falling back to lease timeout",
1271                    self.reader_id
1272                );
1273                return;
1274            }
1275        };
1276        // Spawn a best-effort task to expire this read handle. It's fine if
1277        // this doesn't run to completion, we'd just have to wait out the lease
1278        // before the shard-global since is unblocked.
1279        //
1280        // Intentionally create the span outside the task to set the parent.
1281        let expire_span = debug_span!("drop::expire");
1282        handle.spawn_named(
1283            || format!("ReadHandle::expire ({})", self.reader_id),
1284            unexpired_state.expire_fn.0().instrument(expire_span),
1285        );
1286    }
1287}
1288
1289#[cfg(test)]
1290mod tests {
1291    use std::pin;
1292    use std::str::FromStr;
1293
1294    use mz_dyncfg::ConfigUpdates;
1295    use mz_ore::cast::CastFrom;
1296    use mz_ore::metrics::MetricsRegistry;
1297    use mz_persist::mem::{MemBlob, MemBlobConfig, MemConsensus};
1298    use mz_persist::unreliable::{UnreliableConsensus, UnreliableHandle};
1299    use serde::{Deserialize, Serialize};
1300    use serde_json::json;
1301    use tokio_stream::StreamExt;
1302
1303    use crate::async_runtime::IsolatedRuntime;
1304    use crate::batch::BLOB_TARGET_SIZE;
1305    use crate::cache::StateCache;
1306    use crate::internal::metrics::Metrics;
1307    use crate::rpc::NoopPubSubSender;
1308    use crate::tests::{all_ok, new_test_client};
1309    use crate::{Diagnostics, PersistClient, PersistConfig, ShardId};
1310
1311    use super::*;
1312
1313    // Verifies `Subscribe` can be dropped while holding snapshot batches.
1314    #[mz_persist_proc::test(tokio::test)]
1315    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1316    async fn drop_unused_subscribe(dyncfgs: ConfigUpdates) {
1317        let data = [
1318            (("0".to_owned(), "zero".to_owned()), 0, 1),
1319            (("1".to_owned(), "one".to_owned()), 1, 1),
1320            (("2".to_owned(), "two".to_owned()), 2, 1),
1321        ];
1322
1323        let (mut write, read) = new_test_client(&dyncfgs)
1324            .await
1325            .expect_open::<String, String, u64, i64>(crate::ShardId::new())
1326            .await;
1327
1328        write.expect_compare_and_append(&data[0..1], 0, 1).await;
1329        write.expect_compare_and_append(&data[1..2], 1, 2).await;
1330        write.expect_compare_and_append(&data[2..3], 2, 3).await;
1331
1332        let subscribe = read
1333            .subscribe(timely::progress::Antichain::from_elem(2))
1334            .await
1335            .unwrap();
1336        assert!(
1337            !subscribe.snapshot.as_ref().unwrap().is_empty(),
1338            "snapshot must have batches for test to be meaningful"
1339        );
1340        drop(subscribe);
1341    }
1342
1343    // Verifies that we streaming-consolidate away identical key-values in the same batch.
1344    #[mz_persist_proc::test(tokio::test)]
1345    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1346    async fn streaming_consolidate(dyncfgs: ConfigUpdates) {
1347        let data = &[
1348            // Identical records should sum together...
1349            (("k".to_owned(), "v".to_owned()), 0, 1),
1350            (("k".to_owned(), "v".to_owned()), 1, 1),
1351            (("k".to_owned(), "v".to_owned()), 2, 1),
1352            // ...and when they cancel out entirely they should be omitted.
1353            (("k2".to_owned(), "v".to_owned()), 0, 1),
1354            (("k2".to_owned(), "v".to_owned()), 1, -1),
1355        ];
1356
1357        let (mut write, read) = {
1358            let client = new_test_client(&dyncfgs).await;
1359            client.cfg.set_config(&BLOB_TARGET_SIZE, 1000); // So our batch stays together!
1360            client
1361                .expect_open::<String, String, u64, i64>(crate::ShardId::new())
1362                .await
1363        };
1364
1365        write.expect_compare_and_append(data, 0, 5).await;
1366
1367        let mut snapshot = read
1368            .subscribe(timely::progress::Antichain::from_elem(4))
1369            .await
1370            .unwrap();
1371
1372        let mut updates = vec![];
1373        'outer: loop {
1374            for event in snapshot.fetch_next().await {
1375                match event {
1376                    ListenEvent::Progress(t) => {
1377                        if !t.less_than(&4) {
1378                            break 'outer;
1379                        }
1380                    }
1381                    ListenEvent::Updates(data) => {
1382                        updates.extend(data);
1383                    }
1384                }
1385            }
1386        }
1387        assert_eq!(
1388            updates,
1389            &[((Ok("k".to_owned()), Ok("v".to_owned())), 4u64, 3i64)],
1390        )
1391    }
1392
1393    #[mz_persist_proc::test(tokio::test)]
1394    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1395    async fn snapshot_and_stream(dyncfgs: ConfigUpdates) {
1396        let data = &mut [
1397            (("k1".to_owned(), "v1".to_owned()), 0, 1),
1398            (("k2".to_owned(), "v2".to_owned()), 1, 1),
1399            (("k3".to_owned(), "v3".to_owned()), 2, 1),
1400            (("k4".to_owned(), "v4".to_owned()), 2, 1),
1401            (("k5".to_owned(), "v5".to_owned()), 3, 1),
1402        ];
1403
1404        let (mut write, mut read) = {
1405            let client = new_test_client(&dyncfgs).await;
1406            client.cfg.set_config(&BLOB_TARGET_SIZE, 0); // split batches across multiple parts
1407            client
1408                .expect_open::<String, String, u64, i64>(crate::ShardId::new())
1409                .await
1410        };
1411
1412        write.expect_compare_and_append(&data[0..2], 0, 2).await;
1413        write.expect_compare_and_append(&data[2..4], 2, 3).await;
1414        write.expect_compare_and_append(&data[4..], 3, 4).await;
1415
1416        let as_of = Antichain::from_elem(3);
1417        let mut snapshot = pin::pin!(read.snapshot_and_stream(as_of.clone()).await.unwrap());
1418
1419        let mut snapshot_rows = vec![];
1420        while let Some(((k, v), t, d)) = snapshot.next().await {
1421            snapshot_rows.push(((k.expect("valid key"), v.expect("valid key")), t, d));
1422        }
1423
1424        for ((_k, _v), t, _d) in data.as_mut_slice() {
1425            t.advance_by(as_of.borrow());
1426        }
1427
1428        assert_eq!(data.as_slice(), snapshot_rows.as_slice());
1429    }
1430
1431    // Verifies the semantics of `SeqNo` leases + checks dropping `LeasedBatchPart` semantics.
1432    #[mz_persist_proc::test(tokio::test)]
1433    #[cfg_attr(miri, ignore)] // https://github.com/MaterializeInc/database-issues/issues/5964
1434    async fn seqno_leases(dyncfgs: ConfigUpdates) {
1435        let mut data = vec![];
1436        for i in 0..20 {
1437            data.push(((i.to_string(), i.to_string()), i, 1))
1438        }
1439
1440        let shard_id = ShardId::new();
1441
1442        let client = new_test_client(&dyncfgs).await;
1443        let (mut write, read) = client
1444            .expect_open::<String, String, u64, i64>(shard_id)
1445            .await;
1446
1447        // Seed with some values
1448        let mut offset = 0;
1449        let mut width = 2;
1450
1451        for i in offset..offset + width {
1452            write
1453                .expect_compare_and_append(
1454                    &data[i..i + 1],
1455                    u64::cast_from(i),
1456                    u64::cast_from(i) + 1,
1457                )
1458                .await;
1459        }
1460        offset += width;
1461
1462        // Create machinery for subscribe + fetch
1463        let mut fetcher = client
1464            .create_batch_fetcher::<String, String, u64, i64>(
1465                shard_id,
1466                Default::default(),
1467                Default::default(),
1468                false,
1469                Diagnostics::for_tests(),
1470            )
1471            .await
1472            .unwrap();
1473
1474        let mut subscribe = read
1475            .subscribe(timely::progress::Antichain::from_elem(1))
1476            .await
1477            .expect("cannot serve requested as_of");
1478
1479        // Determine sequence number at outset.
1480        let original_seqno_since = subscribe.listen.handle.machine.applier.seqno_since();
1481
1482        let mut parts = vec![];
1483
1484        width = 4;
1485        // Collect parts while continuing to write values
1486        for i in offset..offset + width {
1487            for event in subscribe.next(None).await {
1488                if let ListenEvent::Updates(mut new_parts) = event {
1489                    parts.append(&mut new_parts);
1490                    // Here and elsewhere we "cheat" and immediately downgrade the since
1491                    // to demonstrate the effects of SeqNo leases immediately.
1492                    subscribe
1493                        .listen
1494                        .handle
1495                        .downgrade_since(&subscribe.listen.since)
1496                        .await;
1497                }
1498            }
1499
1500            write
1501                .expect_compare_and_append(
1502                    &data[i..i + 1],
1503                    u64::cast_from(i),
1504                    u64::cast_from(i) + 1,
1505                )
1506                .await;
1507
1508            // SeqNo is not downgraded
1509            assert_eq!(
1510                subscribe.listen.handle.machine.applier.seqno_since(),
1511                original_seqno_since
1512            );
1513        }
1514
1515        offset += width;
1516
1517        let mut seqno_since = subscribe.listen.handle.machine.applier.seqno_since();
1518
1519        // We're starting out with the original, non-downgraded SeqNo
1520        assert_eq!(seqno_since, original_seqno_since);
1521
1522        // We have to handle the parts we generate during the next loop to
1523        // ensure they don't panic.
1524        let mut subsequent_parts = vec![];
1525
1526        // Ensure monotonicity of seqnos we're processing, otherwise the
1527        // invariant we're testing (returning the last part of a seqno will
1528        // downgrade its since) will not hold.
1529        let mut this_seqno = SeqNo::minimum();
1530
1531        // Repeat the same process as above, more or less, while fetching + returning parts
1532        for (mut i, part) in parts.into_iter().enumerate() {
1533            let part_seqno = part.lease.seqno();
1534            let last_seqno = this_seqno;
1535            this_seqno = part_seqno;
1536            assert!(this_seqno >= last_seqno);
1537
1538            let (part, lease) = part.into_exchangeable_part();
1539            let _ = fetcher.fetch_leased_part(part).await;
1540            drop(lease);
1541
1542            // Simulates an exchange
1543            for event in subscribe.next(None).await {
1544                if let ListenEvent::Updates(parts) = event {
1545                    for part in parts {
1546                        let (_, lease) = part.into_exchangeable_part();
1547                        subsequent_parts.push(lease);
1548                    }
1549                }
1550            }
1551
1552            subscribe
1553                .listen
1554                .handle
1555                .downgrade_since(&subscribe.listen.since)
1556                .await;
1557
1558            // Write more new values
1559            i += offset;
1560            write
1561                .expect_compare_and_append(
1562                    &data[i..i + 1],
1563                    u64::cast_from(i),
1564                    u64::cast_from(i) + 1,
1565                )
1566                .await;
1567
1568            // We should expect the SeqNo to be downgraded if this part's SeqNo
1569            // is no longer leased to any other parts, either.
1570            let expect_downgrade = subscribe.listen.handle.outstanding_seqno() > Some(part_seqno);
1571
1572            let new_seqno_since = subscribe.listen.handle.machine.applier.seqno_since();
1573            if expect_downgrade {
1574                assert!(new_seqno_since > seqno_since);
1575            } else {
1576                assert_eq!(new_seqno_since, seqno_since);
1577            }
1578            seqno_since = new_seqno_since;
1579        }
1580
1581        // SeqNo since was downgraded
1582        assert!(seqno_since > original_seqno_since);
1583
1584        // Return any outstanding parts, to prevent a panic!
1585        drop(subsequent_parts);
1586        drop(subscribe);
1587    }
1588
1589    #[mz_ore::test]
        // Checks that a `LeasedReaderId` uses its string form when handled by
        // serde_json: as a bare value, parsed from a JSON string literal, and as a
        // field of a struct.
1590    fn reader_id_human_readable_serde() {
            // Local wrapper so the id is also exercised as a named struct field.
1591        #[derive(Debug, Serialize, Deserialize)]
1592        struct Container {
1593            reader_id: LeasedReaderId,
1594        }
1595
1596        // Round-trip the id through a `serde_json::Value` and back.
1597        let id =
1598            LeasedReaderId::from_str("r00000000-1234-5678-0000-000000000000").expect("valid id");
1599        assert_eq!(
1600            id,
1601            serde_json::from_value(serde_json::to_value(id.clone()).expect("serializable"))
1602                .expect("deserializable")
1603        );
1604
1605        // Deserialize directly from a JSON string literal holding the same id text.
1606        assert_eq!(
1607            id,
1608            serde_json::from_str("\"r00000000-1234-5678-0000-000000000000\"")
1609                .expect("deserializable")
1610        );
1611
1612        // Round-trip the id through a container type. The rendered JSON must hold
            // the id as a plain string under the `reader_id` key.
1613        let json = json!({ "reader_id": id });
1614        assert_eq!(
1615            "{\"reader_id\":\"r00000000-1234-5678-0000-000000000000\"}",
1616            &json.to_string()
1617        );
1618        let container: Container = serde_json::from_value(json).expect("deserializable");
1619        assert_eq!(container.reader_id, id);
1620    }
1621
1622    // Verifies performance optimizations where a Listener doesn't fetch the
1623    // latest Consensus state if the one it currently has can serve the next
1624    // request.
        //
        // The test wraps consensus in an `UnreliableConsensus`, performs every write
        // and the first listen fetch while it is available, then switches it to
        // totally unavailable and asserts that `read_until(&3)` still returns the
        // expected updates and frontier.
1625    #[mz_ore::test(tokio::test)]
1626    #[cfg_attr(miri, ignore)] // too slow
1627    async fn skip_consensus_fetch_optimization() {
            // Three updates, appended below one per call.
            // NOTE(review): tuples look like ((key, value), timestamp, diff), matching
            // the `<String, String, u64, i64>` type parameters used on open — confirm.
1628        let data = vec![
1629            (("0".to_owned(), "zero".to_owned()), 0, 1),
1630            (("1".to_owned(), "one".to_owned()), 1, 1),
1631            (("2".to_owned(), "two".to_owned()), 2, 1),
1632        ];
1633
            // Build a client over in-memory blob and consensus. Consensus is wrapped
            // in `UnreliableConsensus` so the test can change its availability through
            // the shared `unreliable` handle; it starts out totally available.
1634        let cfg = PersistConfig::new_for_tests();
1635        let blob = Arc::new(MemBlob::open(MemBlobConfig::default()));
1636        let consensus = Arc::new(MemConsensus::default());
1637        let unreliable = UnreliableHandle::default();
1638        unreliable.totally_available();
1639        let consensus = Arc::new(UnreliableConsensus::new(consensus, unreliable.clone()));
1640        let metrics = Arc::new(Metrics::new(&cfg, &MetricsRegistry::new()));
1641        let pubsub_sender = Arc::new(NoopPubSubSender);
1642        let (mut write, mut read) = PersistClient::new(
1643            cfg,
1644            blob,
1645            consensus,
1646            metrics,
1647            Arc::new(IsolatedRuntime::new_for_tests()),
1648            Arc::new(StateCache::new_no_metrics()),
1649            pubsub_sender,
1650        )
1651        .expect("client construction failed")
1652        .expect_open::<String, String, u64, i64>(ShardId::new())
1653        .await;
1654
            // Append the three updates, each in its own compare_and_append call.
1655        write.expect_compare_and_append(&data[0..1], 0, 1).await;
1656        write.expect_compare_and_append(&data[1..2], 1, 2).await;
1657        write.expect_compare_and_append(&data[2..3], 2, 3).await;
1658
            // Take the snapshot and open the listen while consensus is still available.
1659        let snapshot = read.expect_snapshot_and_fetch(2).await;
1660        let mut listen = read.expect_listen(0).await;
1661
1662        // Manually advance the listener's machine so that it has the latest
1663        // state by fetching the first events from next. This is awkward but
1664        // only necessary because we're about to do some weird things with
1665        // unreliable.
1666        let listen_actual = listen.fetch_next().await;
1667        let expected_events = vec![ListenEvent::Progress(Antichain::from_elem(1))];
1668        assert_eq!(listen_actual, expected_events);
1669
1670        // At this point, the snapshot and listen's state should have all the
1671        // writes. Test this by making consensus completely unavailable.
1672        unreliable.totally_unavailable();
            // `snapshot` was already fetched above; `read_until` is the call that runs
            // after consensus has been made unavailable.
1673        assert_eq!(snapshot, all_ok(&data, 2));
1674        assert_eq!(
1675            listen.read_until(&3).await,
1676            (all_ok(&data[1..], 1), Antichain::from_elem(3))
1677        );
1678    }
1679}