mz_persist_client/
read.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Read capabilities and handles
11
12use async_stream::stream;
13use std::backtrace::Backtrace;
14use std::collections::BTreeMap;
15use std::fmt::Debug;
16use std::future::Future;
17use std::sync::Arc;
18use std::time::Duration;
19
20use differential_dataflow::consolidation::consolidate_updates;
21use differential_dataflow::difference::Semigroup;
22use differential_dataflow::lattice::Lattice;
23use differential_dataflow::trace::Description;
24use futures::Stream;
25use futures_util::{StreamExt, stream};
26use mz_dyncfg::Config;
27use mz_ore::halt;
28use mz_ore::instrument;
29use mz_ore::now::EpochMillis;
30use mz_ore::task::{AbortOnDropHandle, JoinHandle, RuntimeExt};
31use mz_persist::location::{Blob, SeqNo};
32use mz_persist_types::columnar::{ColumnDecoder, Schema};
33use mz_persist_types::{Codec, Codec64};
34use proptest_derive::Arbitrary;
35use serde::{Deserialize, Serialize};
36use timely::PartialOrder;
37use timely::order::TotalOrder;
38use timely::progress::{Antichain, Timestamp};
39use tokio::runtime::Handle;
40use tracing::{Instrument, debug_span, warn};
41use uuid::Uuid;
42
43use crate::batch::BLOB_TARGET_SIZE;
44use crate::cfg::{COMPACTION_MEMORY_BOUND_BYTES, RetryParameters};
45use crate::fetch::FetchConfig;
46use crate::fetch::{FetchBatchFilter, FetchedPart, Lease, LeasedBatchPart, fetch_leased_part};
47use crate::internal::encoding::Schemas;
48use crate::internal::machine::{ExpireFn, Machine};
49use crate::internal::metrics::{Metrics, ReadMetrics, ShardMetrics};
50use crate::internal::state::{BatchPart, HollowBatch};
51use crate::internal::watch::StateWatch;
52use crate::iter::{Consolidator, StructuredSort};
53use crate::schema::SchemaCache;
54use crate::stats::{SnapshotPartStats, SnapshotPartsStats, SnapshotStats};
55use crate::{GarbageCollector, PersistConfig, ShardId, parse_id};
56
57pub use crate::internal::encoding::LazyPartStats;
58pub use crate::internal::state::Since;
59
/// An opaque identifier for a reader of a persist durable TVC (aka shard).
///
/// Wraps the 16 raw bytes of a UUID. Its string form (used for serde, via
/// `try_from = "String"` / `into = "String"`) is `r` followed by the UUID.
#[derive(Arbitrary, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[serde(try_from = "String", into = "String")]
pub struct LeasedReaderId(pub(crate) [u8; 16]);
64
65impl std::fmt::Display for LeasedReaderId {
66    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67        write!(f, "r{}", Uuid::from_bytes(self.0))
68    }
69}
70
71impl std::fmt::Debug for LeasedReaderId {
72    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73        write!(f, "LeasedReaderId({})", Uuid::from_bytes(self.0))
74    }
75}
76
77impl std::str::FromStr for LeasedReaderId {
78    type Err = String;
79
80    fn from_str(s: &str) -> Result<Self, Self::Err> {
81        parse_id("r", "LeasedReaderId", s).map(LeasedReaderId)
82    }
83}
84
85impl From<LeasedReaderId> for String {
86    fn from(reader_id: LeasedReaderId) -> Self {
87        reader_id.to_string()
88    }
89}
90
91impl TryFrom<String> for LeasedReaderId {
92    type Error = String;
93
94    fn try_from(s: String) -> Result<Self, Self::Error> {
95        s.parse()
96    }
97}
98
99impl LeasedReaderId {
100    pub(crate) fn new() -> Self {
101        LeasedReaderId(*Uuid::new_v4().as_bytes())
102    }
103}
104
/// Capable of generating a snapshot of all data at `as_of`, followed by a
/// listen of all updates.
///
/// For more details, see [`ReadHandle::snapshot`] and [`Listen`].
#[derive(Debug)]
pub struct Subscribe<K: Codec, V: Codec, T, D> {
    // The snapshot parts; `Some` until they are handed out (once) by `next`
    // or dropped by `expire`.
    snapshot: Option<Vec<LeasedBatchPart<T>>>,
    // The listen that supplies all updates after the snapshot.
    listen: Listen<K, V, T, D>,
}
114
115impl<K, V, T, D> Subscribe<K, V, T, D>
116where
117    K: Debug + Codec,
118    V: Debug + Codec,
119    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
120    D: Semigroup + Codec64 + Send + Sync,
121{
122    fn new(snapshot_parts: Vec<LeasedBatchPart<T>>, listen: Listen<K, V, T, D>) -> Self {
123        Subscribe {
124            snapshot: Some(snapshot_parts),
125            listen,
126        }
127    }
128
129    /// Returns a `LeasedBatchPart` enriched with the proper metadata.
130    ///
131    /// First returns snapshot parts, until they're exhausted, at which point
132    /// begins returning listen parts.
133    ///
134    /// The returned `Antichain` represents the subscription progress as it will
135    /// be _after_ the returned parts are fetched.
136    #[instrument(level = "debug", fields(shard = %self.listen.handle.machine.shard_id()))]
137    pub async fn next(
138        &mut self,
139        // If Some, an override for the default listen sleep retry parameters.
140        listen_retry: Option<RetryParameters>,
141    ) -> Vec<ListenEvent<T, LeasedBatchPart<T>>> {
142        match self.snapshot.take() {
143            Some(parts) => vec![ListenEvent::Updates(parts)],
144            None => {
145                let (parts, upper) = self.listen.next(listen_retry).await;
146                vec![ListenEvent::Updates(parts), ListenEvent::Progress(upper)]
147            }
148        }
149    }
150}
151
152impl<K, V, T, D> Subscribe<K, V, T, D>
153where
154    K: Debug + Codec,
155    V: Debug + Codec,
156    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
157    D: Semigroup + Codec64 + Send + Sync,
158{
159    /// Equivalent to `next`, but rather than returning a [`LeasedBatchPart`],
160    /// fetches and returns the data from within it.
161    #[instrument(level = "debug", fields(shard = %self.listen.handle.machine.shard_id()))]
162    pub async fn fetch_next(
163        &mut self,
164    ) -> Vec<ListenEvent<T, ((Result<K, String>, Result<V, String>), T, D)>> {
165        let events = self.next(None).await;
166        let new_len = events
167            .iter()
168            .map(|event| match event {
169                ListenEvent::Updates(parts) => parts.len(),
170                ListenEvent::Progress(_) => 1,
171            })
172            .sum();
173        let mut ret = Vec::with_capacity(new_len);
174        for event in events {
175            match event {
176                ListenEvent::Updates(parts) => {
177                    for part in parts {
178                        let fetched_part = self.listen.fetch_batch_part(part).await;
179                        let updates = fetched_part.collect::<Vec<_>>();
180                        if !updates.is_empty() {
181                            ret.push(ListenEvent::Updates(updates));
182                        }
183                    }
184                }
185                ListenEvent::Progress(progress) => ret.push(ListenEvent::Progress(progress)),
186            }
187        }
188        ret
189    }
190
191    /// Fetches the contents of `part` and returns its lease.
192    pub async fn fetch_batch_part(&mut self, part: LeasedBatchPart<T>) -> FetchedPart<K, V, T, D> {
193        self.listen.fetch_batch_part(part).await
194    }
195}
196
197impl<K, V, T, D> Subscribe<K, V, T, D>
198where
199    K: Debug + Codec,
200    V: Debug + Codec,
201    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
202    D: Semigroup + Codec64 + Send + Sync,
203{
204    /// Politely expires this subscribe, releasing its lease.
205    ///
206    /// There is a best-effort impl in Drop for [`ReadHandle`] to expire the
207    /// [`ReadHandle`] held by the subscribe that wasn't explicitly expired
208    /// with this method. When possible, explicit expiry is still preferred
209    /// because the Drop one is best effort and is dependant on a tokio
210    /// [Handle] being available in the TLC at the time of drop (which is a bit
211    /// subtle). Also, explicit expiry allows for control over when it happens.
212    pub async fn expire(mut self) {
213        let _ = self.snapshot.take(); // Drop all leased parts.
214        self.listen.expire().await;
215    }
216}
217
/// Data and progress events of a shard subscription.
///
/// `T` is the timestamp type of the progress frontier. `D` is the payload
/// carried by `Updates` (e.g. leased batch parts, or decoded update tuples,
/// depending on the producer).
///
/// TODO: Unify this with [timely::dataflow::operators::capture::event::Event].
#[derive(Debug, PartialEq)]
pub enum ListenEvent<T, D> {
    /// Progress of the shard, expressed as a frontier antichain.
    Progress(Antichain<T>),
    /// Data of the shard.
    Updates(Vec<D>),
}
228
/// An ongoing subscription of updates to a shard.
#[derive(Debug)]
pub struct Listen<K: Codec, V: Codec, T, D> {
    // The read handle this listen owns; used to lease parts, fetch them, and
    // downgrade its since as the listen progresses.
    handle: ReadHandle<K, V, T, D>,
    // Watch on the shard state, passed to `next_listen_batch` to wait for new
    // batches.
    watch: StateWatch<K, V, T, D>,

    // The `as_of` this listen was created with.
    as_of: Antichain<T>,
    // The since this listen asks its handle to downgrade to.
    since: Antichain<T>,
    // An exclusive upper bound on the progress of this listen; starts at
    // `as_of` and becomes each returned batch's upper.
    frontier: Antichain<T>,
}
239
impl<K, V, T, D> Listen<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
    D: Semigroup + Codec64 + Send + Sync,
{
    /// Creates a listen over the updates after `as_of`, taking ownership of
    /// `handle`.
    ///
    /// Returns a [`Since`] error if `machine.verify_listen` rejects `as_of`, or
    /// if `handle`'s since is not less than or equal to `as_of`. On success the
    /// handle's since is downgraded to `as_of`.
    async fn new(
        mut handle: ReadHandle<K, V, T, D>,
        as_of: Antichain<T>,
    ) -> Result<Self, Since<T>> {
        let () = handle.machine.verify_listen(&as_of)?;

        let since = as_of.clone();
        if !PartialOrder::less_equal(handle.since(), &since) {
            // We can't guarantee that the as-of will be available by the time we start reading,
            // since our handle is already downgraded too far.
            return Err(Since(handle.since().clone()));
        }
        // This listen only needs to distinguish things after its frontier
        // (initially as_of although the frontier is inclusive and the as_of
        // isn't). Be a good citizen and downgrade early.
        handle.downgrade_since(&since).await;

        let watch = handle.machine.applier.watch();
        Ok(Listen {
            handle,
            watch,
            since,
            frontier: as_of.clone(),
            as_of,
        })
    }

    /// An exclusive upper bound on the progress of this Listen.
    pub fn frontier(&self) -> &Antichain<T> {
        &self.frontier
    }

    /// Attempt to pull out the next values of this subscription.
    ///
    /// The returned [`LeasedBatchPart`] is appropriate to use with
    /// `crate::fetch::fetch_leased_part`.
    ///
    /// The returned `Antichain` represents the subscription progress as it will
    /// be _after_ the returned parts are fetched.
    ///
    /// # Panics
    ///
    /// Panics if the next batch's since has advanced past what this listen can
    /// still distinguish while the reader's lease is still present. If the
    /// lease is already gone, the process is halted via `halt!` instead.
    pub async fn next(
        &mut self,
        // If Some, an override for the default listen sleep retry parameters.
        retry: Option<RetryParameters>,
    ) -> (Vec<LeasedBatchPart<T>>, Antichain<T>) {
        let batch = self
            .handle
            .machine
            .next_listen_batch(
                &self.frontier,
                &mut self.watch,
                Some(&self.handle.reader_id),
                retry,
            )
            .await;

        // A lot of things across mz have to line up to hold the following
        // invariant and violations only show up as subtle correctness errors,
        // so explicitly validate it here. Better to panic and roll back a
        // release than be incorrect (also potentially corrupting a sink).
        //
        // Note that the since check is intentionally less_than, not less_equal.
        // If a batch's since is X, that means we can no longer distinguish X
        // (beyond self.frontier) from X-1 (not beyond self.frontier) to keep
        // former and filter out the latter.
        let acceptable_desc = PartialOrder::less_than(batch.desc.since(), &self.frontier)
            // Special case when the frontier == the as_of (i.e. the first
            // time this is called on a new Listen). Because as_of is
            // _exclusive_, we don't need to be able to distinguish X from
            // X-1.
            || (self.frontier == self.as_of
            && PartialOrder::less_equal(batch.desc.since(), &self.frontier));
        if !acceptable_desc {
            // Distinguish "our lease is still registered" (a real invariant
            // violation, so panic) from "our lease has already been expired"
            // (the since could legitimately advance past us, so halt).
            let lease_state = self
                .handle
                .machine
                .applier
                .reader_lease(self.handle.reader_id.clone());
            if let Some(lease) = lease_state {
                panic!(
                    "Listen on {} received a batch {:?} advanced past the listen frontier {:?}, but the lease has not expired: {:?}",
                    self.handle.machine.shard_id(),
                    batch.desc,
                    self.frontier,
                    lease
                )
            } else {
                // Ideally we'd percolate this error up, so callers could eg. restart a dataflow
                // instead of restarting a process...
                halt!(
                    "Listen on {} received a batch {:?} advanced past the listen frontier {:?} after the reader has expired. \
                     This can happen in exceptional cases: a machine goes to sleep or is running out of memory or CPU, for example.",
                    self.handle.machine.shard_id(),
                    batch.desc,
                    self.frontier
                )
            }
        }

        let new_frontier = batch.desc.upper().clone();

        // We will have a new frontier, so this is an opportunity to downgrade our
        // since capability. Go through `maybe_heartbeat` so we can rate limit
        // this along with our heartbeats.
        //
        // HACK! Everything would be simpler if we could downgrade since to the
        // new frontier, but we can't. The next call needs to be able to
        // distinguish between the times T at the frontier (to emit updates with
        // these times) and T-1 (to filter them). Advancing the since to
        // frontier would erase the ability to distinguish between them. Ideally
        // we'd use what is conceptually "batch.upper - 1" (the greatest
        // elements that are still strictly less than batch.upper, which will be
        // the new value of self.frontier after this call returns), but the
        // trait bounds on T don't give us a way to compute that directly.
        // Instead, we sniff out any elements in self.frontier (the upper of the
        // batch the last time we called this) that are strictly less_than the
        // batch upper to compute a new since. For totally ordered times
        // (currently always the case in mz) self.frontier will always have a
        // single element and it will be less_than upper. We
        // could also abuse the fact that every time we actually emit is
        // guaranteed by definition to be less_than upper to be a bit more
        // prompt, but this would involve a lot more temporary antichains and
        // it's unclear if that's worth it.
        for x in self.frontier.elements().iter() {
            let less_than_upper = batch.desc.upper().elements().iter().any(|u| x.less_than(u));
            if less_than_upper {
                self.since.join_assign(&Antichain::from_elem(x.clone()));
            }
        }

        // IMPORTANT! Make sure this `lease_batch_parts` stays before the
        // `maybe_downgrade_since` call. Otherwise, we might give up our
        // capability on the batch's SeqNo before we lease it, which could lead
        // to blobs that it references being GC'd.
        let filter = FetchBatchFilter::Listen {
            as_of: self.as_of.clone(),
            lower: self.frontier.clone(),
        };
        let parts = self.handle.lease_batch_parts(batch, filter).collect().await;

        self.handle.maybe_downgrade_since(&self.since).await;

        // NB: Keep this after we use self.frontier to join_assign self.since
        // and also after we construct the fetch filter above (whose `lower` is
        // the old self.frontier).
        self.frontier = new_frontier;

        (parts, self.frontier.clone())
    }
}
395
396impl<K, V, T, D> Listen<K, V, T, D>
397where
398    K: Debug + Codec,
399    V: Debug + Codec,
400    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
401    D: Semigroup + Codec64 + Send + Sync,
402{
403    /// Attempt to pull out the next values of this subscription.
404    ///
405    /// The updates received in [ListenEvent::Updates] should be assumed to be in arbitrary order
406    /// and not necessarily consolidated. However, the timestamp of each individual update will be
407    /// greater than or equal to the last received [ListenEvent::Progress] frontier (or this
408    /// [Listen]'s initial `as_of` frontier if no progress event has been emitted yet) and less
409    /// than the next [ListenEvent::Progress] frontier.
410    ///
411    /// If you have a use for consolidated listen output, given that snapshots can't be
412    /// consolidated, come talk to us!
413    #[instrument(level = "debug", name = "listen::next", fields(shard = %self.handle.machine.shard_id()))]
414    pub async fn fetch_next(
415        &mut self,
416    ) -> Vec<ListenEvent<T, ((Result<K, String>, Result<V, String>), T, D)>> {
417        let (parts, progress) = self.next(None).await;
418        let mut ret = Vec::with_capacity(parts.len() + 1);
419        for part in parts {
420            let fetched_part = self.fetch_batch_part(part).await;
421            let updates = fetched_part.collect::<Vec<_>>();
422            if !updates.is_empty() {
423                ret.push(ListenEvent::Updates(updates));
424            }
425        }
426        ret.push(ListenEvent::Progress(progress));
427        ret
428    }
429
430    /// Convert listener into futures::Stream
431    pub fn into_stream(
432        mut self,
433    ) -> impl Stream<Item = ListenEvent<T, ((Result<K, String>, Result<V, String>), T, D)>> {
434        async_stream::stream!({
435            loop {
436                for msg in self.fetch_next().await {
437                    yield msg;
438                }
439            }
440        })
441    }
442
443    /// Test helper to read from the listener until the given frontier is
444    /// reached. Because compaction can arbitrarily combine batches, we only
445    /// return the final progress info.
446    #[cfg(test)]
447    #[track_caller]
448    pub async fn read_until(
449        &mut self,
450        ts: &T,
451    ) -> (
452        Vec<((Result<K, String>, Result<V, String>), T, D)>,
453        Antichain<T>,
454    ) {
455        let mut updates = Vec::new();
456        let mut frontier = Antichain::from_elem(T::minimum());
457        while self.frontier.less_than(ts) {
458            for event in self.fetch_next().await {
459                match event {
460                    ListenEvent::Updates(mut x) => updates.append(&mut x),
461                    ListenEvent::Progress(x) => frontier = x,
462                }
463            }
464        }
465        // Unlike most tests, intentionally don't consolidate updates here
466        // because Listen replays them at the original fidelity.
467        (updates, frontier)
468    }
469}
470
471impl<K, V, T, D> Listen<K, V, T, D>
472where
473    K: Debug + Codec,
474    V: Debug + Codec,
475    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
476    D: Semigroup + Codec64 + Send + Sync,
477{
478    /// Fetches the contents of `part` and returns its lease.
479    ///
480    /// This is broken out into its own function to provide a trivial means for
481    /// [`Subscribe`], which contains a [`Listen`], to fetch batches.
482    async fn fetch_batch_part(&mut self, part: LeasedBatchPart<T>) -> FetchedPart<K, V, T, D> {
483        let fetched_part = fetch_leased_part(
484            &self.handle.cfg,
485            &part,
486            self.handle.blob.as_ref(),
487            Arc::clone(&self.handle.metrics),
488            &self.handle.metrics.read.listen,
489            &self.handle.machine.applier.shard_metrics,
490            &self.handle.reader_id,
491            self.handle.read_schemas.clone(),
492            &mut self.handle.schema_cache,
493        )
494        .await;
495        fetched_part
496    }
497
498    /// Politely expires this listen, releasing its lease.
499    ///
500    /// There is a best-effort impl in Drop for [`ReadHandle`] to expire the
501    /// [`ReadHandle`] held by the listen that wasn't explicitly expired with
502    /// this method. When possible, explicit expiry is still preferred because
503    /// the Drop one is best effort and is dependant on a tokio [Handle] being
504    /// available in the TLC at the time of drop (which is a bit subtle). Also,
505    /// explicit expiry allows for control over when it happens.
506    pub async fn expire(self) {
507        self.handle.expire().await
508    }
509}
510
/// A "capability" granting the ability to read the state of some shard at times
/// greater or equal to `self.since()`.
///
/// Production users should call [Self::expire] before dropping a ReadHandle so
/// that it can expire its leases. If/when rust gets AsyncDrop, this will be
/// done automatically.
///
/// All async methods on ReadHandle retry for as long as they are able, but the
/// returned [std::future::Future]s implement "cancel on drop" semantics. This
/// means that callers can add a timeout using [tokio::time::timeout] or
/// [tokio::time::timeout_at].
///
/// ```rust,no_run
/// # let mut read: mz_persist_client::read::ReadHandle<String, String, u64, i64> = unimplemented!();
/// # let timeout: std::time::Duration = unimplemented!();
/// # let new_since: timely::progress::Antichain<u64> = unimplemented!();
/// # async {
/// tokio::time::timeout(timeout, read.downgrade_since(&new_since)).await
/// # };
/// ```
#[derive(Debug)]
pub struct ReadHandle<K: Codec, V: Codec, T, D> {
    pub(crate) cfg: PersistConfig,
    pub(crate) metrics: Arc<Metrics>,
    pub(crate) machine: Machine<K, V, T, D>,
    pub(crate) gc: GarbageCollector<K, V, T, D>,
    pub(crate) blob: Arc<dyn Blob>,
    pub(crate) reader_id: LeasedReaderId,
    pub(crate) read_schemas: Schemas<K, V>,
    pub(crate) schema_cache: SchemaCache<K, V, T, D>,

    // This handle's since; replaced with the value reported back by
    // `machine.downgrade_since` on every downgrade.
    since: Antichain<T>,
    // Timestamp (from `cfg.now`) of the most recent heartbeat, i.e. the last
    // downgrade_since traffic.
    pub(crate) last_heartbeat: EpochMillis,
    // SeqNos currently leased out to `LeasedBatchPart`s, keyed by SeqNo.
    pub(crate) leased_seqnos: BTreeMap<SeqNo, Lease>,
    // Expiry hook and heartbeat task handles. Presumably taken on expiry —
    // confirm against `expire`.
    pub(crate) unexpired_state: Option<UnexpiredReadHandleState>,
}
547
548/// Length of time after a reader's last operation after which the reader may be
549/// expired.
550pub(crate) const READER_LEASE_DURATION: Config<Duration> = Config::new(
551    "persist_reader_lease_duration",
552    Duration::from_secs(60 * 15),
553    "The time after which we'll clean up stale read leases",
554);
555
556impl<K, V, T, D> ReadHandle<K, V, T, D>
557where
558    K: Debug + Codec,
559    V: Debug + Codec,
560    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
561    D: Semigroup + Codec64 + Send + Sync,
562{
563    pub(crate) async fn new(
564        cfg: PersistConfig,
565        metrics: Arc<Metrics>,
566        machine: Machine<K, V, T, D>,
567        gc: GarbageCollector<K, V, T, D>,
568        blob: Arc<dyn Blob>,
569        reader_id: LeasedReaderId,
570        read_schemas: Schemas<K, V>,
571        since: Antichain<T>,
572        last_heartbeat: EpochMillis,
573    ) -> Self {
574        let schema_cache = machine.applier.schema_cache();
575        let expire_fn = Self::expire_fn(machine.clone(), gc.clone(), reader_id.clone());
576        ReadHandle {
577            cfg,
578            metrics: Arc::clone(&metrics),
579            machine: machine.clone(),
580            gc: gc.clone(),
581            blob,
582            reader_id: reader_id.clone(),
583            read_schemas,
584            schema_cache,
585            since,
586            last_heartbeat,
587            leased_seqnos: BTreeMap::new(),
588            unexpired_state: Some(UnexpiredReadHandleState {
589                expire_fn,
590                _heartbeat_tasks: machine
591                    .start_reader_heartbeat_tasks(reader_id, gc)
592                    .await
593                    .into_iter()
594                    .map(JoinHandle::abort_on_drop)
595                    .collect(),
596            }),
597        }
598    }
599
    /// This handle's shard id, as reported by its machine.
    pub fn shard_id(&self) -> ShardId {
        self.machine.shard_id()
    }
604
    /// This handle's `since` frontier.
    ///
    /// This will always be greater or equal to the shard-global `since`.
    /// Reads at times not beyond this frontier are rejected with [`Since`].
    pub fn since(&self) -> &Antichain<T> {
        &self.since
    }
611
612    fn outstanding_seqno(&mut self) -> Option<SeqNo> {
613        while let Some(first) = self.leased_seqnos.first_entry() {
614            if first.get().count() <= 1 {
615                first.remove();
616            } else {
617                return Some(*first.key());
618            }
619        }
620        None
621    }
622
623    /// Forwards the since frontier of this handle, giving up the ability to
624    /// read at times not greater or equal to `new_since`.
625    ///
626    /// This may trigger (asynchronous) compaction and consolidation in the
627    /// system. A `new_since` of the empty antichain "finishes" this shard,
628    /// promising that no more data will ever be read by this handle.
629    ///
630    /// This also acts as a heartbeat for the reader lease (including if called
631    /// with `new_since` equal to something like `self.since()` or the minimum
632    /// timestamp, making the call a no-op).
633    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
634    pub async fn downgrade_since(&mut self, new_since: &Antichain<T>) {
635        // Guaranteed to be the smallest/oldest outstanding lease on a `SeqNo`.
636        let outstanding_seqno = self.outstanding_seqno();
637
638        let heartbeat_ts = (self.cfg.now)();
639        let (_seqno, current_reader_since, maintenance) = self
640            .machine
641            .downgrade_since(&self.reader_id, outstanding_seqno, new_since, heartbeat_ts)
642            .await;
643
644        // Debugging for database-issues#4590.
645        if let Some(outstanding_seqno) = outstanding_seqno {
646            let seqnos_held = _seqno.0.saturating_sub(outstanding_seqno.0);
647            // We get just over 1 seqno-per-second on average for a shard in
648            // prod, so this is about an hour.
649            const SEQNOS_HELD_THRESHOLD: u64 = 60 * 60;
650            if seqnos_held >= SEQNOS_HELD_THRESHOLD {
651                tracing::info!(
652                    "{} reader {} holding an unexpected number of seqnos {} vs {}: {:?}. bt: {:?}",
653                    self.machine.shard_id(),
654                    self.reader_id,
655                    outstanding_seqno,
656                    _seqno,
657                    self.leased_seqnos.keys().take(10).collect::<Vec<_>>(),
658                    // The Debug impl of backtrace is less aesthetic, but will put the trace
659                    // on a single line and play more nicely with our Honeycomb quota
660                    Backtrace::force_capture(),
661                );
662            }
663        }
664
665        self.since = current_reader_since.0;
666        // A heartbeat is just any downgrade_since traffic, so update the
667        // internal rate limiter here to play nicely with `maybe_heartbeat`.
668        self.last_heartbeat = heartbeat_ts;
669        maintenance.start_performing(&self.machine, &self.gc);
670    }
671
    /// Returns an ongoing subscription of updates to a shard.
    ///
    /// The stream includes all data at times greater than `as_of`. Combined
    /// with [Self::snapshot] it will produce exactly correct results: the
    /// snapshot is the TVCs contents at `as_of` and all subsequent updates
    /// occur at exactly their indicated time. The recipient should only
    /// downgrade their read capability when they are certain they have all data
    /// through the frontier they would downgrade to.
    ///
    /// This takes ownership of the ReadHandle so the Listen can use it to
    /// [Self::downgrade_since] as it progresses. If you need to keep this
    /// handle, then [Self::clone] it before calling listen.
    ///
    /// The `Since` error indicates that the requested `as_of` cannot be served
    /// (the caller has out of date information) and includes the smallest
    /// `as_of` that would have been accepted.
    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
    pub async fn listen(self, as_of: Antichain<T>) -> Result<Listen<K, V, T, D>, Since<T>> {
        Listen::new(self, as_of).await
    }
692
693    /// Returns all of the contents of the shard TVC at `as_of` broken up into
694    /// [`LeasedBatchPart`]es. These parts can be "turned in" via
695    /// `crate::fetch::fetch_batch_part` to receive the data they contain.
696    ///
697    /// This command returns the contents of this shard as of `as_of` once they
698    /// are known. This may "block" (in an async-friendly way) if `as_of` is
699    /// greater or equal to the current `upper` of the shard. The recipient
700    /// should only downgrade their read capability when they are certain they
701    /// have all data through the frontier they would downgrade to.
702    ///
703    /// The `Since` error indicates that the requested `as_of` cannot be served
704    /// (the caller has out of date information) and includes the smallest
705    /// `as_of` that would have been accepted.
706    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
707    pub async fn snapshot(
708        &mut self,
709        as_of: Antichain<T>,
710    ) -> Result<Vec<LeasedBatchPart<T>>, Since<T>> {
711        let batches = self.machine.snapshot(&as_of).await?;
712
713        if !PartialOrder::less_equal(self.since(), &as_of) {
714            return Err(Since(self.since().clone()));
715        }
716
717        let filter = FetchBatchFilter::Snapshot { as_of };
718        let mut leased_parts = Vec::new();
719        for batch in batches {
720            // Flatten the HollowBatch into one LeasedBatchPart per key. Each key
721            // corresponds to a "part" or s3 object. This allows persist_source
722            // to distribute work by parts (smallish, more even size) instead of
723            // batches (arbitrarily large).
724            leased_parts.extend(
725                self.lease_batch_parts(batch, filter.clone())
726                    .collect::<Vec<_>>()
727                    .await,
728            );
729        }
730        Ok(leased_parts)
731    }
732
733    /// Returns a snapshot of all of a shard's data using `as_of`, followed by
734    /// listening to any future updates.
735    ///
736    /// For more details on this operation's semantics, see [Self::snapshot] and
737    /// [Self::listen].
738    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
739    pub async fn subscribe(
740        mut self,
741        as_of: Antichain<T>,
742    ) -> Result<Subscribe<K, V, T, D>, Since<T>> {
743        let snapshot_parts = self.snapshot(as_of.clone()).await?;
744        let listen = self.listen(as_of.clone()).await?;
745        Ok(Subscribe::new(snapshot_parts, listen))
746    }
747
748    fn lease_batch_part(
749        &mut self,
750        desc: Description<T>,
751        part: BatchPart<T>,
752        filter: FetchBatchFilter<T>,
753    ) -> LeasedBatchPart<T> {
754        LeasedBatchPart {
755            metrics: Arc::clone(&self.metrics),
756            shard_id: self.machine.shard_id(),
757            filter,
758            desc,
759            part,
760            lease: self.lease_seqno(),
761            filter_pushdown_audit: false,
762        }
763    }
764
765    fn lease_batch_parts(
766        &mut self,
767        batch: HollowBatch<T>,
768        filter: FetchBatchFilter<T>,
769    ) -> impl Stream<Item = LeasedBatchPart<T>> + '_ {
770        stream! {
771            let blob = Arc::clone(&self.blob);
772            let metrics = Arc::clone(&self.metrics);
773            let desc = batch.desc.clone();
774            for await part in batch.part_stream(self.shard_id(), &*blob, &*metrics) {
775                yield self.lease_batch_part(desc.clone(), part.expect("leased part").into_owned(), filter.clone())
776            }
777        }
778    }
779
780    /// Tracks that the `ReadHandle`'s machine's current `SeqNo` is being
781    /// "leased out" to a `LeasedBatchPart`, and cannot be garbage
782    /// collected until its lease has been returned.
783    fn lease_seqno(&mut self) -> Lease {
784        let seqno = self.machine.seqno();
785        let lease = self
786            .leased_seqnos
787            .entry(seqno)
788            .or_insert_with(|| Lease::new(seqno));
789        lease.clone()
790    }
791
792    /// Returns an independent [ReadHandle] with a new [LeasedReaderId] but the
793    /// same `since`.
794    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
795    pub async fn clone(&self, purpose: &str) -> Self {
796        let new_reader_id = LeasedReaderId::new();
797        let machine = self.machine.clone();
798        let gc = self.gc.clone();
799        let heartbeat_ts = (self.cfg.now)();
800        let (reader_state, maintenance) = machine
801            .register_leased_reader(
802                &new_reader_id,
803                purpose,
804                READER_LEASE_DURATION.get(&self.cfg),
805                heartbeat_ts,
806                false,
807            )
808            .await;
809        maintenance.start_performing(&machine, &gc);
810        // The point of clone is that you're guaranteed to have the same (or
811        // greater) since capability, verify that.
812        // TODO: better if it's the same since capability exactly.
813        assert!(PartialOrder::less_equal(&reader_state.since, &self.since));
814        let new_reader = ReadHandle::new(
815            self.cfg.clone(),
816            Arc::clone(&self.metrics),
817            machine,
818            gc,
819            Arc::clone(&self.blob),
820            new_reader_id,
821            self.read_schemas.clone(),
822            reader_state.since,
823            heartbeat_ts,
824        )
825        .await;
826        new_reader
827    }
828
829    /// A rate-limited version of [Self::downgrade_since].
830    ///
831    /// This is an internally rate limited helper, designed to allow users to
832    /// call it as frequently as they like. Call this or [Self::downgrade_since],
833    /// on some interval that is "frequent" compared to the read lease duration.
834    pub async fn maybe_downgrade_since(&mut self, new_since: &Antichain<T>) {
835        let min_elapsed = READER_LEASE_DURATION.get(&self.cfg) / 4;
836        let elapsed_since_last_heartbeat =
837            Duration::from_millis((self.cfg.now)().saturating_sub(self.last_heartbeat));
838        if elapsed_since_last_heartbeat >= min_elapsed {
839            self.downgrade_since(new_since).await;
840        }
841    }
842
843    /// Politely expires this reader, releasing its lease.
844    ///
845    /// There is a best-effort impl in Drop to expire a reader that wasn't
846    /// explictly expired with this method. When possible, explicit expiry is
847    /// still preferred because the Drop one is best effort and is dependant on
848    /// a tokio [Handle] being available in the TLC at the time of drop (which
849    /// is a bit subtle). Also, explicit expiry allows for control over when it
850    /// happens.
851    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
852    pub async fn expire(mut self) {
853        // We drop the unexpired state before expiring the reader to ensure the
854        // heartbeat tasks can never observe the expired state. This doesn't
855        // matter for correctness, but avoids confusing log output if the
856        // heartbeat task were to discover that its lease has been expired.
857        let Some(unexpired_state) = self.unexpired_state.take() else {
858            return;
859        };
860        unexpired_state.expire_fn.0().await;
861    }
862
    /// Builds the [ExpireFn] that expires the leased reader `reader_id`.
    ///
    /// The returned closure is lazy: nothing happens until it is invoked and
    /// the future it returns is awaited/polled. That future expires the reader
    /// via `machine.expire_leased_reader` and then starts performing whatever
    /// maintenance the expiry returned, using `machine` and `gc`.
    fn expire_fn(
        machine: Machine<K, V, T, D>,
        gc: GarbageCollector<K, V, T, D>,
        reader_id: LeasedReaderId,
    ) -> ExpireFn {
        // The closure is passed directly to `Box::new` inside `ExpireFn(..)` so
        // its signature is inferred from `ExpireFn`'s boxed closure type.
        ExpireFn(Box::new(move || {
            Box::pin(async move {
                let (_, maintenance) = machine.expire_leased_reader(&reader_id).await;
                maintenance.start_performing(&machine, &gc);
            })
        }))
    }
875
876    /// Test helper for a [Self::listen] call that is expected to succeed.
877    #[cfg(test)]
878    #[track_caller]
879    pub async fn expect_listen(self, as_of: T) -> Listen<K, V, T, D> {
880        self.listen(Antichain::from_elem(as_of))
881            .await
882            .expect("cannot serve requested as_of")
883    }
884}
885
/// State for a read handle that has not been explicitly expired.
///
/// A `ReadHandle` keeps this in an `Option` that both `ReadHandle::expire`
/// and its `Drop` impl `take()`, so the expiry below runs at most once.
#[derive(Debug)]
pub(crate) struct UnexpiredReadHandleState {
    // Invoked (once) to expire the leased reader when the handle is expired
    // or dropped.
    expire_fn: ExpireFn,
    // Handles to the reader's heartbeat tasks. They are only held to keep the
    // tasks tied to this state; per the `AbortOnDropHandle` type, dropping
    // them presumably aborts the tasks (confirm in `mz_ore::task`).
    pub(crate) _heartbeat_tasks: Vec<AbortOnDropHandle<()>>,
}
892
/// An incremental cursor through a particular shard, returned from [ReadHandle::snapshot_cursor].
///
/// To read an entire dataset, the client should call `next` until it returns
/// `None`, which signals that all data has been returned. The client is also
/// free to abandon the cursor at any time, e.g. if it only needs a few entries.
///
/// The lease `L` is held for as long as the cursor lives and can be recovered
/// with [Cursor::into_lease].
#[derive(Debug)]
pub struct Cursor<K: Codec, V: Codec, T: Timestamp + Codec64, D: Codec64, L = Lease> {
    // Produces the consolidated chunks handed out by `next`.
    consolidator: Consolidator<T, D, StructuredSort<K, V, T, D>>,
    // Length bound passed to `Consolidator::next_chunk` for each chunk
    // (presumably a maximum number of updates).
    max_len: usize,
    // Byte-size bound passed to `Consolidator::next_chunk` for each chunk.
    max_bytes: usize,
    // Held for the cursor's lifetime; returned by `into_lease`.
    _lease: L,
    // Used to build decoders for the key and value columns of each chunk.
    read_schemas: Schemas<K, V>,
}
906
907impl<K: Codec, V: Codec, T: Timestamp + Codec64, D: Codec64, L> Cursor<K, V, T, D, L> {
908    /// Extracts and returns the lease from the cursor. Allowing the caller to
909    /// do any necessary cleanup associated with the lease.
910    pub fn into_lease(self: Self) -> L {
911        self._lease
912    }
913}
914
915impl<K, V, T, D, L> Cursor<K, V, T, D, L>
916where
917    K: Debug + Codec + Ord,
918    V: Debug + Codec + Ord,
919    T: Timestamp + Lattice + Codec64 + Sync,
920    D: Semigroup + Ord + Codec64 + Send + Sync,
921{
922    /// Grab the next batch of consolidated data.
923    pub async fn next(
924        &mut self,
925    ) -> Option<impl Iterator<Item = ((Result<K, String>, Result<V, String>), T, D)> + '_> {
926        let Self {
927            consolidator,
928            max_len,
929            max_bytes,
930            _lease,
931            read_schemas: _,
932        } = self;
933
934        let part = consolidator
935            .next_chunk(*max_len, *max_bytes)
936            .await
937            .expect("fetching a leased part")?;
938        let key_decoder = self
939            .read_schemas
940            .key
941            .decoder_any(part.key.as_ref())
942            .expect("ok");
943        let val_decoder = self
944            .read_schemas
945            .val
946            .decoder_any(part.val.as_ref())
947            .expect("ok");
948        let iter = (0..part.len()).map(move |i| {
949            let mut k = K::default();
950            let mut v = V::default();
951            key_decoder.decode(i, &mut k);
952            val_decoder.decode(i, &mut v);
953            let t = T::decode(part.time.value(i).to_le_bytes());
954            let d = D::decode(part.diff.value(i).to_le_bytes());
955            ((Ok(k), Ok(v)), t, d)
956        });
957
958        Some(iter)
959    }
960}
961
962impl<K, V, T, D> ReadHandle<K, V, T, D>
963where
964    K: Debug + Codec + Ord,
965    V: Debug + Codec + Ord,
966    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
967    D: Semigroup + Ord + Codec64 + Send + Sync,
968{
969    /// Generates a [Self::snapshot], and fetches all of the batches it
970    /// contains.
971    ///
972    /// The output is consolidated. Furthermore, to keep memory usage down when
973    /// reading a snapshot that consolidates well, this consolidates as it goes.
974    ///
975    /// Potential future improvements (if necessary):
976    /// - Accept something like a `F: Fn(K,V) -> (K,V)` argument, which looks
977    ///   like an MFP you might be pushing down. Reason being that if you are
978    ///   projecting or transforming in a way that allows further consolidation,
979    ///   amazing.
980    /// - Reuse any code we write to streaming-merge consolidate in
981    ///   persist_source here.
982    pub async fn snapshot_and_fetch(
983        &mut self,
984        as_of: Antichain<T>,
985    ) -> Result<Vec<((Result<K, String>, Result<V, String>), T, D)>, Since<T>> {
986        let mut cursor = self.snapshot_cursor(as_of, |_| true).await?;
987        let mut contents = Vec::new();
988        while let Some(iter) = cursor.next().await {
989            contents.extend(iter);
990        }
991
992        // We don't currently guarantee that encoding is one-to-one, so we still need to
993        // consolidate the decoded outputs. However, let's report if this isn't a noop.
994        let old_len = contents.len();
995        consolidate_updates(&mut contents);
996        if old_len != contents.len() {
997            // TODO(bkirwi): do we need more / finer-grained metrics for this?
998            self.machine
999                .applier
1000                .shard_metrics
1001                .unconsolidated_snapshot
1002                .inc();
1003        }
1004
1005        Ok(contents)
1006    }
1007
1008    /// Generates a [Self::snapshot], and fetches all of the batches it
1009    /// contains.
1010    ///
1011    /// To keep memory usage down when reading a snapshot that consolidates well, this consolidates
1012    /// as it goes. However, note that only the serialized data is consolidated: the deserialized
1013    /// data will only be consolidated if your K/V codecs are one-to-one.
1014    pub async fn snapshot_cursor(
1015        &mut self,
1016        as_of: Antichain<T>,
1017        should_fetch_part: impl for<'a> Fn(Option<&'a LazyPartStats>) -> bool,
1018    ) -> Result<Cursor<K, V, T, D>, Since<T>> {
1019        let batches = self.machine.snapshot(&as_of).await?;
1020        let lease = self.lease_seqno();
1021
1022        Self::read_batches_consolidated(
1023            &self.cfg,
1024            Arc::clone(&self.metrics),
1025            Arc::clone(&self.machine.applier.shard_metrics),
1026            self.metrics.read.snapshot.clone(),
1027            Arc::clone(&self.blob),
1028            self.shard_id(),
1029            as_of,
1030            self.read_schemas.clone(),
1031            &batches,
1032            lease,
1033            should_fetch_part,
1034            COMPACTION_MEMORY_BOUND_BYTES.get(&self.cfg),
1035        )
1036    }
1037
1038    pub(crate) fn read_batches_consolidated<L>(
1039        persist_cfg: &PersistConfig,
1040        metrics: Arc<Metrics>,
1041        shard_metrics: Arc<ShardMetrics>,
1042        read_metrics: ReadMetrics,
1043        blob: Arc<dyn Blob>,
1044        shard_id: ShardId,
1045        as_of: Antichain<T>,
1046        schemas: Schemas<K, V>,
1047        batches: &[HollowBatch<T>],
1048        lease: L,
1049        should_fetch_part: impl for<'a> Fn(Option<&'a LazyPartStats>) -> bool,
1050        memory_budget_bytes: usize,
1051    ) -> Result<Cursor<K, V, T, D, L>, Since<T>> {
1052        let context = format!("{}[as_of={:?}]", shard_id, as_of.elements());
1053        let filter = FetchBatchFilter::Snapshot {
1054            as_of: as_of.clone(),
1055        };
1056
1057        let mut consolidator = Consolidator::new(
1058            context,
1059            FetchConfig::from_persist_config(persist_cfg),
1060            shard_id,
1061            StructuredSort::new(schemas.clone()),
1062            blob,
1063            metrics,
1064            shard_metrics,
1065            read_metrics,
1066            filter,
1067            None,
1068            memory_budget_bytes,
1069        );
1070        for batch in batches {
1071            for (meta, run) in batch.runs() {
1072                consolidator.enqueue_run(
1073                    &batch.desc,
1074                    meta,
1075                    run.into_iter()
1076                        .filter(|p| should_fetch_part(p.stats()))
1077                        .cloned(),
1078                );
1079            }
1080        }
1081        // This default may end up consolidating more records than previously
1082        // for cases like fast-path peeks, where only the first few entries are used.
1083        // If this is a noticeable performance impact, thread the max-len in from the caller.
1084        let max_len = persist_cfg.compaction_yield_after_n_updates;
1085        let max_bytes = BLOB_TARGET_SIZE.get(persist_cfg).max(1);
1086
1087        Ok(Cursor {
1088            consolidator,
1089            max_len,
1090            max_bytes,
1091            _lease: lease,
1092            read_schemas: schemas,
1093        })
1094    }
1095
1096    /// Returns aggregate statistics about the contents of the shard TVC at the
1097    /// given frontier.
1098    ///
1099    /// This command returns the contents of this shard as of `as_of` once they
1100    /// are known. This may "block" (in an async-friendly way) if `as_of` is
1101    /// greater or equal to the current `upper` of the shard. If `None` is given
1102    /// for `as_of`, then the latest stats known by this process are used.
1103    ///
1104    /// The `Since` error indicates that the requested `as_of` cannot be served
1105    /// (the caller has out of date information) and includes the smallest
1106    /// `as_of` that would have been accepted.
1107    pub fn snapshot_stats(
1108        &self,
1109        as_of: Option<Antichain<T>>,
1110    ) -> impl Future<Output = Result<SnapshotStats, Since<T>>> + Send + 'static {
1111        let machine = self.machine.clone();
1112        async move {
1113            let batches = match as_of {
1114                Some(as_of) => machine.snapshot(&as_of).await?,
1115                None => machine.applier.all_batches(),
1116            };
1117            let num_updates = batches.iter().map(|b| b.len).sum();
1118            Ok(SnapshotStats {
1119                shard_id: machine.shard_id(),
1120                num_updates,
1121            })
1122        }
1123    }
1124
1125    /// Returns aggregate statistics about the contents of the shard TVC at the
1126    /// given frontier.
1127    ///
1128    /// This command returns the contents of this shard as of `as_of` once they
1129    /// are known. This may "block" (in an async-friendly way) if `as_of` is
1130    /// greater or equal to the current `upper` of the shard.
1131    ///
1132    /// The `Since` error indicates that the requested `as_of` cannot be served
1133    /// (the caller has out of date information) and includes the smallest
1134    /// `as_of` that would have been accepted.
1135    pub async fn snapshot_parts_stats(
1136        &self,
1137        as_of: Antichain<T>,
1138    ) -> Result<SnapshotPartsStats, Since<T>> {
1139        let batches = self.machine.snapshot(&as_of).await?;
1140        let parts = stream::iter(&batches)
1141            .flat_map(|b| b.part_stream(self.shard_id(), &*self.blob, &*self.metrics))
1142            .map(|p| {
1143                let p = p.expect("live batch");
1144                SnapshotPartStats {
1145                    encoded_size_bytes: p.encoded_size_bytes(),
1146                    stats: p.stats().cloned(),
1147                }
1148            })
1149            .collect()
1150            .await;
1151        Ok(SnapshotPartsStats {
1152            metrics: Arc::clone(&self.machine.applier.metrics),
1153            shard_id: self.machine.shard_id(),
1154            parts,
1155        })
1156    }
1157}
1158
1159impl<K, V, T, D> ReadHandle<K, V, T, D>
1160where
1161    K: Debug + Codec + Ord,
1162    V: Debug + Codec + Ord,
1163    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
1164    D: Semigroup + Codec64 + Send + Sync,
1165{
1166    /// Generates a [Self::snapshot], and streams out all of the updates
1167    /// it contains in bounded memory.
1168    ///
1169    /// The output is not consolidated.
1170    pub async fn snapshot_and_stream(
1171        &mut self,
1172        as_of: Antichain<T>,
1173    ) -> Result<
1174        impl Stream<Item = ((Result<K, String>, Result<V, String>), T, D)> + use<K, V, T, D>,
1175        Since<T>,
1176    > {
1177        let snap = self.snapshot(as_of).await?;
1178
1179        let blob = Arc::clone(&self.blob);
1180        let metrics = Arc::clone(&self.metrics);
1181        let snapshot_metrics = self.metrics.read.snapshot.clone();
1182        let shard_metrics = Arc::clone(&self.machine.applier.shard_metrics);
1183        let reader_id = self.reader_id.clone();
1184        let schemas = self.read_schemas.clone();
1185        let mut schema_cache = self.schema_cache.clone();
1186        let persist_cfg = self.cfg.clone();
1187        let stream = async_stream::stream! {
1188            for part in snap {
1189                let mut fetched_part = fetch_leased_part(
1190                    &persist_cfg,
1191                    &part,
1192                    blob.as_ref(),
1193                    Arc::clone(&metrics),
1194                    &snapshot_metrics,
1195                    &shard_metrics,
1196                    &reader_id,
1197                    schemas.clone(),
1198                    &mut schema_cache,
1199                )
1200                .await;
1201
1202                while let Some(next) = fetched_part.next() {
1203                    yield next;
1204                }
1205            }
1206        };
1207
1208        Ok(stream)
1209    }
1210}
1211
1212impl<K, V, T, D> ReadHandle<K, V, T, D>
1213where
1214    K: Debug + Codec + Ord,
1215    V: Debug + Codec + Ord,
1216    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
1217    D: Semigroup + Ord + Codec64 + Send + Sync,
1218{
1219    /// Test helper to generate a [Self::snapshot] call that is expected to
1220    /// succeed, process its batches, and then return its data sorted.
1221    #[cfg(test)]
1222    #[track_caller]
1223    pub async fn expect_snapshot_and_fetch(
1224        &mut self,
1225        as_of: T,
1226    ) -> Vec<((Result<K, String>, Result<V, String>), T, D)> {
1227        let mut ret = self
1228            .snapshot_and_fetch(Antichain::from_elem(as_of))
1229            .await
1230            .expect("cannot serve requested as_of");
1231
1232        ret.sort();
1233        ret
1234    }
1235}
1236
1237impl<K: Codec, V: Codec, T, D> Drop for ReadHandle<K, V, T, D> {
1238    fn drop(&mut self) {
1239        // We drop the unexpired state before expiring the reader to ensure the
1240        // heartbeat tasks can never observe the expired state. This doesn't
1241        // matter for correctness, but avoids confusing log output if the
1242        // heartbeat task were to discover that its lease has been expired.
1243        let Some(unexpired_state) = self.unexpired_state.take() else {
1244            return;
1245        };
1246
1247        let handle = match Handle::try_current() {
1248            Ok(x) => x,
1249            Err(_) => {
1250                warn!(
1251                    "ReadHandle {} dropped without being explicitly expired, falling back to lease timeout",
1252                    self.reader_id
1253                );
1254                return;
1255            }
1256        };
1257        // Spawn a best-effort task to expire this read handle. It's fine if
1258        // this doesn't run to completion, we'd just have to wait out the lease
1259        // before the shard-global since is unblocked.
1260        //
1261        // Intentionally create the span outside the task to set the parent.
1262        let expire_span = debug_span!("drop::expire");
1263        handle.spawn_named(
1264            || format!("ReadHandle::expire ({})", self.reader_id),
1265            unexpired_state.expire_fn.0().instrument(expire_span),
1266        );
1267    }
1268}
1269
1270#[cfg(test)]
1271mod tests {
1272    use std::pin;
1273    use std::str::FromStr;
1274
1275    use mz_dyncfg::ConfigUpdates;
1276    use mz_ore::cast::CastFrom;
1277    use mz_ore::metrics::MetricsRegistry;
1278    use mz_persist::mem::{MemBlob, MemBlobConfig, MemConsensus};
1279    use mz_persist::unreliable::{UnreliableConsensus, UnreliableHandle};
1280    use serde::{Deserialize, Serialize};
1281    use serde_json::json;
1282    use tokio_stream::StreamExt;
1283
1284    use crate::async_runtime::IsolatedRuntime;
1285    use crate::batch::BLOB_TARGET_SIZE;
1286    use crate::cache::StateCache;
1287    use crate::internal::metrics::Metrics;
1288    use crate::rpc::NoopPubSubSender;
1289    use crate::tests::{all_ok, new_test_client};
1290    use crate::{Diagnostics, PersistClient, PersistConfig, ShardId};
1291
1292    use super::*;
1293
1294    // Verifies `Subscribe` can be dropped while holding snapshot batches.
1295    #[mz_persist_proc::test(tokio::test)]
1296    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1297    async fn drop_unused_subscribe(dyncfgs: ConfigUpdates) {
1298        let data = [
1299            (("0".to_owned(), "zero".to_owned()), 0, 1),
1300            (("1".to_owned(), "one".to_owned()), 1, 1),
1301            (("2".to_owned(), "two".to_owned()), 2, 1),
1302        ];
1303
1304        let (mut write, read) = new_test_client(&dyncfgs)
1305            .await
1306            .expect_open::<String, String, u64, i64>(crate::ShardId::new())
1307            .await;
1308
1309        write.expect_compare_and_append(&data[0..1], 0, 1).await;
1310        write.expect_compare_and_append(&data[1..2], 1, 2).await;
1311        write.expect_compare_and_append(&data[2..3], 2, 3).await;
1312
1313        let subscribe = read
1314            .subscribe(timely::progress::Antichain::from_elem(2))
1315            .await
1316            .unwrap();
1317        assert!(
1318            !subscribe.snapshot.as_ref().unwrap().is_empty(),
1319            "snapshot must have batches for test to be meaningful"
1320        );
1321        drop(subscribe);
1322    }
1323
1324    // Verifies that we streaming-consolidate away identical key-values in the same batch.
1325    #[mz_persist_proc::test(tokio::test)]
1326    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1327    async fn streaming_consolidate(dyncfgs: ConfigUpdates) {
1328        let data = &[
1329            // Identical records should sum together...
1330            (("k".to_owned(), "v".to_owned()), 0, 1),
1331            (("k".to_owned(), "v".to_owned()), 1, 1),
1332            (("k".to_owned(), "v".to_owned()), 2, 1),
1333            // ...and when they cancel out entirely they should be omitted.
1334            (("k2".to_owned(), "v".to_owned()), 0, 1),
1335            (("k2".to_owned(), "v".to_owned()), 1, -1),
1336        ];
1337
1338        let (mut write, read) = {
1339            let client = new_test_client(&dyncfgs).await;
1340            client.cfg.set_config(&BLOB_TARGET_SIZE, 1000); // So our batch stays together!
1341            client
1342                .expect_open::<String, String, u64, i64>(crate::ShardId::new())
1343                .await
1344        };
1345
1346        write.expect_compare_and_append(data, 0, 5).await;
1347
1348        let mut snapshot = read
1349            .subscribe(timely::progress::Antichain::from_elem(4))
1350            .await
1351            .unwrap();
1352
1353        let mut updates = vec![];
1354        'outer: loop {
1355            for event in snapshot.fetch_next().await {
1356                match event {
1357                    ListenEvent::Progress(t) => {
1358                        if !t.less_than(&4) {
1359                            break 'outer;
1360                        }
1361                    }
1362                    ListenEvent::Updates(data) => {
1363                        updates.extend(data);
1364                    }
1365                }
1366            }
1367        }
1368        assert_eq!(
1369            updates,
1370            &[((Ok("k".to_owned()), Ok("v".to_owned())), 4u64, 3i64)],
1371        )
1372    }
1373
1374    #[mz_persist_proc::test(tokio::test)]
1375    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1376    async fn snapshot_and_stream(dyncfgs: ConfigUpdates) {
1377        let data = &mut [
1378            (("k1".to_owned(), "v1".to_owned()), 0, 1),
1379            (("k2".to_owned(), "v2".to_owned()), 1, 1),
1380            (("k3".to_owned(), "v3".to_owned()), 2, 1),
1381            (("k4".to_owned(), "v4".to_owned()), 2, 1),
1382            (("k5".to_owned(), "v5".to_owned()), 3, 1),
1383        ];
1384
1385        let (mut write, mut read) = {
1386            let client = new_test_client(&dyncfgs).await;
1387            client.cfg.set_config(&BLOB_TARGET_SIZE, 0); // split batches across multiple parts
1388            client
1389                .expect_open::<String, String, u64, i64>(crate::ShardId::new())
1390                .await
1391        };
1392
1393        write.expect_compare_and_append(&data[0..2], 0, 2).await;
1394        write.expect_compare_and_append(&data[2..4], 2, 3).await;
1395        write.expect_compare_and_append(&data[4..], 3, 4).await;
1396
1397        let as_of = Antichain::from_elem(3);
1398        let mut snapshot = pin::pin!(read.snapshot_and_stream(as_of.clone()).await.unwrap());
1399
1400        let mut snapshot_rows = vec![];
1401        while let Some(((k, v), t, d)) = snapshot.next().await {
1402            snapshot_rows.push(((k.expect("valid key"), v.expect("valid key")), t, d));
1403        }
1404
1405        for ((_k, _v), t, _d) in data.as_mut_slice() {
1406            t.advance_by(as_of.borrow());
1407        }
1408
1409        assert_eq!(data.as_slice(), snapshot_rows.as_slice());
1410    }
1411
    // Verifies the semantics of `SeqNo` leases + checks dropping `LeasedBatchPart` semantics.
    //
    // Outline: write a few batches, subscribe, then collect leased parts while
    // writing more (the seqno since must not move while those parts are
    // held). Then fetch and drop each part in turn, checking that the seqno
    // since advances exactly when no lease is outstanding at or below the
    // dropped part's `SeqNo`.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // https://github.com/MaterializeInc/database-issues/issues/5964
    async fn seqno_leases(dyncfgs: ConfigUpdates) {
        // Update `i` is written at time `i`, in its own batch [i, i + 1).
        let mut data = vec![];
        for i in 0..20 {
            data.push(((i.to_string(), i.to_string()), i, 1))
        }

        let shard_id = ShardId::new();

        let client = new_test_client(&dyncfgs).await;
        let (mut write, read) = client
            .expect_open::<String, String, u64, i64>(shard_id)
            .await;

        // Seed with some values
        let mut offset = 0;
        let mut width = 2;

        for i in offset..offset + width {
            write
                .expect_compare_and_append(
                    &data[i..i + 1],
                    u64::cast_from(i),
                    u64::cast_from(i) + 1,
                )
                .await;
        }
        offset += width;

        // Create machinery for subscribe + fetch
        let mut fetcher = client
            .create_batch_fetcher::<String, String, u64, i64>(
                shard_id,
                Default::default(),
                Default::default(),
                false,
                Diagnostics::for_tests(),
            )
            .await
            .unwrap();

        let mut subscribe = read
            .subscribe(timely::progress::Antichain::from_elem(1))
            .await
            .expect("cannot serve requested as_of");

        // Determine sequence number at outset.
        let original_seqno_since = subscribe.listen.handle.machine.applier.seqno_since();

        let mut parts = vec![];

        width = 4;
        // Collect parts while continuing to write values
        for i in offset..offset + width {
            for event in subscribe.next(None).await {
                if let ListenEvent::Updates(mut new_parts) = event {
                    parts.append(&mut new_parts);
                    // Here and elsewhere we "cheat" and immediately downgrade the since
                    // to demonstrate the effects of SeqNo leases immediately.
                    subscribe
                        .listen
                        .handle
                        .downgrade_since(&subscribe.listen.since)
                        .await;
                }
            }

            write
                .expect_compare_and_append(
                    &data[i..i + 1],
                    u64::cast_from(i),
                    u64::cast_from(i) + 1,
                )
                .await;

            // SeqNo is not downgraded
            assert_eq!(
                subscribe.listen.handle.machine.applier.seqno_since(),
                original_seqno_since
            );
        }

        offset += width;

        let mut seqno_since = subscribe.listen.handle.machine.applier.seqno_since();

        // We're starting out with the original, non-downgraded SeqNo
        assert_eq!(seqno_since, original_seqno_since);

        // We have to handle the parts we generate during the next loop to
        // ensure they don't panic.
        let mut subsequent_parts = vec![];

        // Ensure monotonicity of seqnos we're processing, otherwise the
        // invariant we're testing (returning the last part of a seqno will
        // downgrade its since) will not hold.
        let mut this_seqno = SeqNo::minimum();

        // Repeat the same process as above, more or less, while fetching + returning parts
        for (mut i, part) in parts.into_iter().enumerate() {
            let part_seqno = part.lease.seqno();
            let last_seqno = this_seqno;
            this_seqno = part_seqno;
            assert!(this_seqno >= last_seqno);

            // Split the part from its lease, fetch it, and release the lease.
            let (part, lease) = part.into_exchangeable_part();
            let _ = fetcher.fetch_leased_part(part).await;
            drop(lease);

            // Simulates an exchange
            for event in subscribe.next(None).await {
                if let ListenEvent::Updates(parts) = event {
                    for part in parts {
                        let (_, lease) = part.into_exchangeable_part();
                        subsequent_parts.push(lease);
                    }
                }
            }

            subscribe
                .listen
                .handle
                .downgrade_since(&subscribe.listen.since)
                .await;

            // Write more new values
            i += offset;
            write
                .expect_compare_and_append(
                    &data[i..i + 1],
                    u64::cast_from(i),
                    u64::cast_from(i) + 1,
                )
                .await;

            // We should expect the SeqNo to be downgraded if this part's SeqNo
            // is no longer leased to any other parts, either.
            let expect_downgrade = subscribe.listen.handle.outstanding_seqno() > Some(part_seqno);

            let new_seqno_since = subscribe.listen.handle.machine.applier.seqno_since();
            if expect_downgrade {
                assert!(new_seqno_since > seqno_since);
            } else {
                assert_eq!(new_seqno_since, seqno_since);
            }
            seqno_since = new_seqno_since;
        }

        // SeqNo since was downgraded
        assert!(seqno_since > original_seqno_since);

        // Return any outstanding parts, to prevent a panic!
        drop(subsequent_parts);
        drop(subscribe);
    }
1569
1570    #[mz_ore::test]
1571    fn reader_id_human_readable_serde() {
1572        #[derive(Debug, Serialize, Deserialize)]
1573        struct Container {
1574            reader_id: LeasedReaderId,
1575        }
1576
1577        // roundtrip through json
1578        let id =
1579            LeasedReaderId::from_str("r00000000-1234-5678-0000-000000000000").expect("valid id");
1580        assert_eq!(
1581            id,
1582            serde_json::from_value(serde_json::to_value(id.clone()).expect("serializable"))
1583                .expect("deserializable")
1584        );
1585
1586        // deserialize a serialized string directly
1587        assert_eq!(
1588            id,
1589            serde_json::from_str("\"r00000000-1234-5678-0000-000000000000\"")
1590                .expect("deserializable")
1591        );
1592
1593        // roundtrip id through a container type
1594        let json = json!({ "reader_id": id });
1595        assert_eq!(
1596            "{\"reader_id\":\"r00000000-1234-5678-0000-000000000000\"}",
1597            &json.to_string()
1598        );
1599        let container: Container = serde_json::from_value(json).expect("deserializable");
1600        assert_eq!(container.reader_id, id);
1601    }
1602
1603    // Verifies performance optimizations where a Listener doesn't fetch the
1604    // latest Consensus state if the one it currently has can serve the next
1605    // request.
1606    #[mz_ore::test(tokio::test)]
1607    #[cfg_attr(miri, ignore)] // too slow
1608    async fn skip_consensus_fetch_optimization() {
1609        let data = vec![
1610            (("0".to_owned(), "zero".to_owned()), 0, 1),
1611            (("1".to_owned(), "one".to_owned()), 1, 1),
1612            (("2".to_owned(), "two".to_owned()), 2, 1),
1613        ];
1614
1615        let cfg = PersistConfig::new_for_tests();
1616        let blob = Arc::new(MemBlob::open(MemBlobConfig::default()));
1617        let consensus = Arc::new(MemConsensus::default());
1618        let unreliable = UnreliableHandle::default();
1619        unreliable.totally_available();
1620        let consensus = Arc::new(UnreliableConsensus::new(consensus, unreliable.clone()));
1621        let metrics = Arc::new(Metrics::new(&cfg, &MetricsRegistry::new()));
1622        let pubsub_sender = Arc::new(NoopPubSubSender);
1623        let (mut write, mut read) = PersistClient::new(
1624            cfg,
1625            blob,
1626            consensus,
1627            metrics,
1628            Arc::new(IsolatedRuntime::new_for_tests()),
1629            Arc::new(StateCache::new_no_metrics()),
1630            pubsub_sender,
1631        )
1632        .expect("client construction failed")
1633        .expect_open::<String, String, u64, i64>(ShardId::new())
1634        .await;
1635
1636        write.expect_compare_and_append(&data[0..1], 0, 1).await;
1637        write.expect_compare_and_append(&data[1..2], 1, 2).await;
1638        write.expect_compare_and_append(&data[2..3], 2, 3).await;
1639
1640        let snapshot = read.expect_snapshot_and_fetch(2).await;
1641        let mut listen = read.expect_listen(0).await;
1642
1643        // Manually advance the listener's machine so that it has the latest
1644        // state by fetching the first events from next. This is awkward but
1645        // only necessary because we're about to do some weird things with
1646        // unreliable.
1647        let listen_actual = listen.fetch_next().await;
1648        let expected_events = vec![ListenEvent::Progress(Antichain::from_elem(1))];
1649        assert_eq!(listen_actual, expected_events);
1650
1651        // At this point, the snapshot and listen's state should have all the
1652        // writes. Test this by making consensus completely unavailable.
1653        unreliable.totally_unavailable();
1654        assert_eq!(snapshot, all_ok(&data, 2));
1655        assert_eq!(
1656            listen.read_until(&3).await,
1657            (all_ok(&data[1..], 1), Antichain::from_elem(3))
1658        );
1659    }
1660}