mz_persist_client/
read.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Read capabilities and handles
11
12use async_stream::stream;
13use std::backtrace::Backtrace;
14use std::collections::BTreeMap;
15use std::fmt::Debug;
16use std::future::Future;
17use std::sync::Arc;
18use std::time::Duration;
19
20use differential_dataflow::consolidation::consolidate_updates;
21use differential_dataflow::difference::Monoid;
22use differential_dataflow::lattice::Lattice;
23use differential_dataflow::trace::Description;
24use futures::Stream;
25use futures_util::{StreamExt, stream};
26use mz_dyncfg::Config;
27use mz_ore::halt;
28use mz_ore::instrument;
29use mz_ore::now::EpochMillis;
30use mz_ore::task::{AbortOnDropHandle, JoinHandle, RuntimeExt};
31use mz_persist::location::{Blob, SeqNo};
32use mz_persist_types::columnar::{ColumnDecoder, Schema};
33use mz_persist_types::{Codec, Codec64};
34use proptest_derive::Arbitrary;
35use serde::{Deserialize, Serialize};
36use timely::PartialOrder;
37use timely::order::TotalOrder;
38use timely::progress::{Antichain, Timestamp};
39use tokio::runtime::Handle;
40use tracing::{Instrument, debug_span, warn};
41use uuid::Uuid;
42
43use crate::batch::BLOB_TARGET_SIZE;
44use crate::cfg::{COMPACTION_MEMORY_BOUND_BYTES, RetryParameters};
45use crate::fetch::FetchConfig;
46use crate::fetch::{FetchBatchFilter, FetchedPart, Lease, LeasedBatchPart, fetch_leased_part};
47use crate::internal::encoding::Schemas;
48use crate::internal::machine::{ExpireFn, Machine};
49use crate::internal::metrics::{Metrics, ReadMetrics, ShardMetrics};
50use crate::internal::state::{BatchPart, HollowBatch};
51use crate::internal::watch::StateWatch;
52use crate::iter::{Consolidator, StructuredSort};
53use crate::schema::SchemaCache;
54use crate::stats::{SnapshotPartStats, SnapshotPartsStats, SnapshotStats};
55use crate::{GarbageCollector, PersistConfig, ShardId, parse_id};
56
57pub use crate::internal::encoding::LazyPartStats;
58pub use crate::internal::state::Since;
59
/// An opaque identifier for a reader of a persist durable TVC (aka shard).
///
/// Wraps 16 raw bytes that are interpreted as a UUID when formatted. Serde
/// (de)serializes the id through its `String` form, i.e. via the
/// `From<LeasedReaderId> for String` and `TryFrom<String>` impls.
#[derive(Arbitrary, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[serde(try_from = "String", into = "String")]
pub struct LeasedReaderId(pub(crate) [u8; 16]);
64
65impl std::fmt::Display for LeasedReaderId {
66    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67        write!(f, "r{}", Uuid::from_bytes(self.0))
68    }
69}
70
71impl std::fmt::Debug for LeasedReaderId {
72    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73        write!(f, "LeasedReaderId({})", Uuid::from_bytes(self.0))
74    }
75}
76
77impl std::str::FromStr for LeasedReaderId {
78    type Err = String;
79
80    fn from_str(s: &str) -> Result<Self, Self::Err> {
81        parse_id("r", "LeasedReaderId", s).map(LeasedReaderId)
82    }
83}
84
85impl From<LeasedReaderId> for String {
86    fn from(reader_id: LeasedReaderId) -> Self {
87        reader_id.to_string()
88    }
89}
90
91impl TryFrom<String> for LeasedReaderId {
92    type Error = String;
93
94    fn try_from(s: String) -> Result<Self, Self::Error> {
95        s.parse()
96    }
97}
98
99impl LeasedReaderId {
100    pub(crate) fn new() -> Self {
101        LeasedReaderId(*Uuid::new_v4().as_bytes())
102    }
103}
104
/// Capable of generating a snapshot of all data at `as_of`, followed by a
/// listen of all updates.
///
/// For more details, see [`ReadHandle::snapshot`] and [`Listen`].
#[derive(Debug)]
pub struct Subscribe<K: Codec, V: Codec, T, D> {
    // The leased snapshot parts. `Some` until they are taken (handed out by
    // `next` or dropped by `expire`), `None` afterwards.
    snapshot: Option<Vec<LeasedBatchPart<T>>>,
    // Source of all events once the snapshot has been taken; also used to
    // fetch the contents of leased parts.
    listen: Listen<K, V, T, D>,
}
114
115impl<K, V, T, D> Subscribe<K, V, T, D>
116where
117    K: Debug + Codec,
118    V: Debug + Codec,
119    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
120    D: Monoid + Codec64 + Send + Sync,
121{
122    fn new(snapshot_parts: Vec<LeasedBatchPart<T>>, listen: Listen<K, V, T, D>) -> Self {
123        Subscribe {
124            snapshot: Some(snapshot_parts),
125            listen,
126        }
127    }
128
129    /// Returns a `LeasedBatchPart` enriched with the proper metadata.
130    ///
131    /// First returns snapshot parts, until they're exhausted, at which point
132    /// begins returning listen parts.
133    ///
134    /// The returned `Antichain` represents the subscription progress as it will
135    /// be _after_ the returned parts are fetched.
136    #[instrument(level = "debug", fields(shard = %self.listen.handle.machine.shard_id()))]
137    pub async fn next(
138        &mut self,
139        // If Some, an override for the default listen sleep retry parameters.
140        listen_retry: Option<RetryParameters>,
141    ) -> Vec<ListenEvent<T, LeasedBatchPart<T>>> {
142        match self.snapshot.take() {
143            Some(parts) => vec![ListenEvent::Updates(parts)],
144            None => {
145                let (parts, upper) = self.listen.next(listen_retry).await;
146                vec![ListenEvent::Updates(parts), ListenEvent::Progress(upper)]
147            }
148        }
149    }
150}
151
152impl<K, V, T, D> Subscribe<K, V, T, D>
153where
154    K: Debug + Codec,
155    V: Debug + Codec,
156    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
157    D: Monoid + Codec64 + Send + Sync,
158{
159    /// Equivalent to `next`, but rather than returning a [`LeasedBatchPart`],
160    /// fetches and returns the data from within it.
161    #[instrument(level = "debug", fields(shard = %self.listen.handle.machine.shard_id()))]
162    pub async fn fetch_next(&mut self) -> Vec<ListenEvent<T, ((K, V), T, D)>> {
163        let events = self.next(None).await;
164        let new_len = events
165            .iter()
166            .map(|event| match event {
167                ListenEvent::Updates(parts) => parts.len(),
168                ListenEvent::Progress(_) => 1,
169            })
170            .sum();
171        let mut ret = Vec::with_capacity(new_len);
172        for event in events {
173            match event {
174                ListenEvent::Updates(parts) => {
175                    for part in parts {
176                        let fetched_part = self.listen.fetch_batch_part(part).await;
177                        let updates = fetched_part.collect::<Vec<_>>();
178                        if !updates.is_empty() {
179                            ret.push(ListenEvent::Updates(updates));
180                        }
181                    }
182                }
183                ListenEvent::Progress(progress) => ret.push(ListenEvent::Progress(progress)),
184            }
185        }
186        ret
187    }
188
189    /// Fetches the contents of `part` and returns its lease.
190    pub async fn fetch_batch_part(&mut self, part: LeasedBatchPart<T>) -> FetchedPart<K, V, T, D> {
191        self.listen.fetch_batch_part(part).await
192    }
193}
194
195impl<K, V, T, D> Subscribe<K, V, T, D>
196where
197    K: Debug + Codec,
198    V: Debug + Codec,
199    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
200    D: Monoid + Codec64 + Send + Sync,
201{
202    /// Politely expires this subscribe, releasing its lease.
203    ///
204    /// There is a best-effort impl in Drop for [`ReadHandle`] to expire the
205    /// [`ReadHandle`] held by the subscribe that wasn't explicitly expired
206    /// with this method. When possible, explicit expiry is still preferred
207    /// because the Drop one is best effort and is dependant on a tokio
208    /// [Handle] being available in the TLC at the time of drop (which is a bit
209    /// subtle). Also, explicit expiry allows for control over when it happens.
210    pub async fn expire(mut self) {
211        let _ = self.snapshot.take(); // Drop all leased parts.
212        self.listen.expire().await;
213    }
214}
215
/// Data and progress events of a shard subscription.
///
/// `T` is the timestamp type of the progress frontier and `D` is the type of
/// the individual items carried by an update event (e.g. a leased part or a
/// decoded update).
///
/// TODO: Unify this with [timely::dataflow::operators::capture::event::Event].
#[derive(Debug, PartialEq)]
pub enum ListenEvent<T, D> {
    /// Progress of the shard.
    Progress(Antichain<T>),
    /// Data of the shard.
    Updates(Vec<D>),
}
226
/// An ongoing subscription of updates to a shard.
#[derive(Debug)]
pub struct Listen<K: Codec, V: Codec, T, D> {
    // The read handle this listen took ownership of. Used to lease and fetch
    // parts and to downgrade the since as the listen progresses.
    handle: ReadHandle<K, V, T, D>,
    // Passed to `Machine::next_listen_batch` while waiting for the next batch.
    watch: StateWatch<K, V, T, D>,

    // The as_of this listen was created with.
    as_of: Antichain<T>,
    // The since this listen asks `handle` to downgrade to.
    since: Antichain<T>,
    // Initially the as_of; afterwards the upper of the last batch returned by
    // `next`.
    frontier: Antichain<T>,
}
237
impl<K, V, T, D> Listen<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
    D: Monoid + Codec64 + Send + Sync,
{
    /// Creates a listen over `handle` that starts at `as_of`.
    ///
    /// Returns a `Since` error if `verify_listen` rejects `as_of`, or if the
    /// handle's own since is not less than or equal to `as_of`. On success the
    /// handle's since is downgraded to `as_of` before the listen is returned.
    async fn new(
        mut handle: ReadHandle<K, V, T, D>,
        as_of: Antichain<T>,
    ) -> Result<Self, Since<T>> {
        let () = handle.machine.verify_listen(&as_of)?;

        let since = as_of.clone();
        if !PartialOrder::less_equal(handle.since(), &since) {
            // We can't guarantee that the as-of will be available by the time we start reading,
            // since our handle is already downgraded too far.
            return Err(Since(handle.since().clone()));
        }
        // This listen only needs to distinguish things after its frontier
        // (initially as_of although the frontier is inclusive and the as_of
        // isn't). Be a good citizen and downgrade early.
        handle.downgrade_since(&since).await;

        let watch = handle.machine.applier.watch();
        Ok(Listen {
            handle,
            watch,
            since,
            frontier: as_of.clone(),
            as_of,
        })
    }

    /// An exclusive upper bound on the progress of this Listen.
    pub fn frontier(&self) -> &Antichain<T> {
        &self.frontier
    }

    /// Attempt to pull out the next values of this subscription.
    ///
    /// The returned [`LeasedBatchPart`] is appropriate to use with
    /// `crate::fetch::fetch_leased_part`.
    ///
    /// The returned `Antichain` represents the subscription progress as it will
    /// be _after_ the returned parts are fetched.
    ///
    /// While waiting for the next batch, this periodically (every
    /// `heartbeat_duration`) calls `maybe_downgrade_since` on the handle.
    ///
    /// # Panics
    ///
    /// Panics (or halts the process, if this reader's lease is no longer
    /// present) when the received batch's since has advanced past what this
    /// listen's frontier can tolerate.
    pub async fn next(
        &mut self,
        // If Some, an override for the default listen sleep retry parameters.
        retry: Option<RetryParameters>,
    ) -> (Vec<LeasedBatchPart<T>>, Antichain<T>) {
        // Wait for the next batch, waking up at least every heartbeat interval
        // to give the handle a chance to downgrade its since.
        let batch = loop {
            let min_elapsed = self.handle.heartbeat_duration();
            let next_batch = self.handle.machine.next_listen_batch(
                &self.frontier,
                &mut self.watch,
                Some(&self.handle.reader_id),
                retry,
            );
            match tokio::time::timeout(min_elapsed, next_batch).await {
                Ok(batch) => break batch,
                Err(_elapsed) => {
                    self.handle.maybe_downgrade_since(&self.since).await;
                }
            }
        };

        // A lot of things across mz have to line up to hold the following
        // invariant and violations only show up as subtle correctness errors,
        // so explicitly validate it here. Better to panic and roll back a
        // release than be incorrect (also potentially corrupting a sink).
        //
        // Note that the since check is intentionally less_than, not less_equal.
        // If a batch's since is X, that means we can no longer distinguish X
        // (beyond self.frontier) from X-1 (not beyond self.frontier) to keep
        // former and filter out the latter.
        let acceptable_desc = PartialOrder::less_than(batch.desc.since(), &self.frontier)
            // Special case when the frontier == the as_of (i.e. the first
            // time this is called on a new Listen). Because as_of is
            // _exclusive_, we don't need to be able to distinguish X from
            // X-1.
            || (self.frontier == self.as_of
            && PartialOrder::less_equal(batch.desc.since(), &self.frontier));
        if !acceptable_desc {
            // Distinguish "our lease is still there" (a bug: panic) from "our
            // lease is gone" (an operational problem: halt).
            let lease_state = self
                .handle
                .machine
                .applier
                .reader_lease(self.handle.reader_id.clone());
            if let Some(lease) = lease_state {
                panic!(
                    "Listen on {} received a batch {:?} advanced past the listen frontier {:?}, but the lease has not expired: {:?}",
                    self.handle.machine.shard_id(),
                    batch.desc,
                    self.frontier,
                    lease
                )
            } else {
                // Ideally we'd percolate this error up, so callers could eg. restart a dataflow
                // instead of restarting a process...
                halt!(
                    "Listen on {} received a batch {:?} advanced past the listen frontier {:?} after the reader has expired. \
                     This can happen in exceptional cases: a machine goes to sleep or is running out of memory or CPU, for example.",
                    self.handle.machine.shard_id(),
                    batch.desc,
                    self.frontier
                )
            }
        }

        let new_frontier = batch.desc.upper().clone();

        // We will have a new frontier, so this is an opportunity to downgrade our
        // since capability. Go through `maybe_downgrade_since` so we can rate
        // limit this along with our heartbeats.
        //
        // HACK! Everything would be simpler if we could downgrade since to the
        // new frontier, but we can't. The next call needs to be able to
        // distinguish between the times T at the frontier (to emit updates with
        // these times) and T-1 (to filter them). Advancing the since to
        // frontier would erase the ability to distinguish between them. Ideally
        // we'd use what is conceptually "batch.upper - 1" (the greatest
        // elements that are still strictly less than batch.upper, which will be
        // the new value of self.frontier after this call returns), but the
        // trait bounds on T don't give us a way to compute that directly.
        // Instead, we sniff out any elements in self.frontier (the upper of the
        // batch the last time we called this) that are strictly less_than the
        // batch upper to compute a new since. For totally ordered times
        // (currently always the case in mz) self.frontier will always have a
        // single element and it will be less_than upper. We
        // could also abuse the fact that every time we actually emit is
        // guaranteed by definition to be less_than upper to be a bit more
        // prompt, but this would involve a lot more temporary antichains and
        // it's unclear if that's worth it.
        for x in self.frontier.elements().iter() {
            let less_than_upper = batch.desc.upper().elements().iter().any(|u| x.less_than(u));
            if less_than_upper {
                self.since.join_assign(&Antichain::from_elem(x.clone()));
            }
        }

        // IMPORTANT! Make sure this `lease_batch_parts` stays before the
        // `maybe_downgrade_since` call. Otherwise, we might give up our
        // capability on the batch's SeqNo before we lease it, which could lead
        // to blobs that it references being GC'd.
        let filter = FetchBatchFilter::Listen {
            as_of: self.as_of.clone(),
            lower: self.frontier.clone(),
        };
        let parts = self.handle.lease_batch_parts(batch, filter).collect().await;

        self.handle.maybe_downgrade_since(&self.since).await;

        // NB: Keep this after we use self.frontier to join_assign self.since
        // and also after we construct metadata.
        self.frontier = new_frontier;

        (parts, self.frontier.clone())
    }
}
398
399impl<K, V, T, D> Listen<K, V, T, D>
400where
401    K: Debug + Codec,
402    V: Debug + Codec,
403    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
404    D: Monoid + Codec64 + Send + Sync,
405{
406    /// Attempt to pull out the next values of this subscription.
407    ///
408    /// The updates received in [ListenEvent::Updates] should be assumed to be in arbitrary order
409    /// and not necessarily consolidated. However, the timestamp of each individual update will be
410    /// greater than or equal to the last received [ListenEvent::Progress] frontier (or this
411    /// [Listen]'s initial `as_of` frontier if no progress event has been emitted yet) and less
412    /// than the next [ListenEvent::Progress] frontier.
413    ///
414    /// If you have a use for consolidated listen output, given that snapshots can't be
415    /// consolidated, come talk to us!
416    #[instrument(level = "debug", name = "listen::next", fields(shard = %self.handle.machine.shard_id()))]
417    pub async fn fetch_next(&mut self) -> Vec<ListenEvent<T, ((K, V), T, D)>> {
418        let (parts, progress) = self.next(None).await;
419        let mut ret = Vec::with_capacity(parts.len() + 1);
420        for part in parts {
421            let fetched_part = self.fetch_batch_part(part).await;
422            let updates = fetched_part.collect::<Vec<_>>();
423            if !updates.is_empty() {
424                ret.push(ListenEvent::Updates(updates));
425            }
426        }
427        ret.push(ListenEvent::Progress(progress));
428        ret
429    }
430
431    /// Convert listener into futures::Stream
432    pub fn into_stream(mut self) -> impl Stream<Item = ListenEvent<T, ((K, V), T, D)>> {
433        async_stream::stream!({
434            loop {
435                for msg in self.fetch_next().await {
436                    yield msg;
437                }
438            }
439        })
440    }
441
442    /// Test helper to read from the listener until the given frontier is
443    /// reached. Because compaction can arbitrarily combine batches, we only
444    /// return the final progress info.
445    #[cfg(test)]
446    #[track_caller]
447    pub async fn read_until(&mut self, ts: &T) -> (Vec<((K, V), T, D)>, Antichain<T>) {
448        let mut updates = Vec::new();
449        let mut frontier = Antichain::from_elem(T::minimum());
450        while self.frontier.less_than(ts) {
451            for event in self.fetch_next().await {
452                match event {
453                    ListenEvent::Updates(mut x) => updates.append(&mut x),
454                    ListenEvent::Progress(x) => frontier = x,
455                }
456            }
457        }
458        // Unlike most tests, intentionally don't consolidate updates here
459        // because Listen replays them at the original fidelity.
460        (updates, frontier)
461    }
462}
463
464impl<K, V, T, D> Listen<K, V, T, D>
465where
466    K: Debug + Codec,
467    V: Debug + Codec,
468    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
469    D: Monoid + Codec64 + Send + Sync,
470{
471    /// Fetches the contents of `part` and returns its lease.
472    ///
473    /// This is broken out into its own function to provide a trivial means for
474    /// [`Subscribe`], which contains a [`Listen`], to fetch batches.
475    async fn fetch_batch_part(&mut self, part: LeasedBatchPart<T>) -> FetchedPart<K, V, T, D> {
476        let fetched_part = fetch_leased_part(
477            &self.handle.cfg,
478            &part,
479            self.handle.blob.as_ref(),
480            Arc::clone(&self.handle.metrics),
481            &self.handle.metrics.read.listen,
482            &self.handle.machine.applier.shard_metrics,
483            &self.handle.reader_id,
484            self.handle.read_schemas.clone(),
485            &mut self.handle.schema_cache,
486        )
487        .await;
488        fetched_part
489    }
490
491    /// Politely expires this listen, releasing its lease.
492    ///
493    /// There is a best-effort impl in Drop for [`ReadHandle`] to expire the
494    /// [`ReadHandle`] held by the listen that wasn't explicitly expired with
495    /// this method. When possible, explicit expiry is still preferred because
496    /// the Drop one is best effort and is dependant on a tokio [Handle] being
497    /// available in the TLC at the time of drop (which is a bit subtle). Also,
498    /// explicit expiry allows for control over when it happens.
499    pub async fn expire(self) {
500        self.handle.expire().await
501    }
502}
503
/// A "capability" granting the ability to read the state of some shard at times
/// greater or equal to `self.since()`.
///
/// Production users should call [Self::expire] before dropping a ReadHandle so
/// that it can expire its leases. If/when rust gets AsyncDrop, this will be
/// done automatically.
///
/// All async methods on ReadHandle retry for as long as they are able, but the
/// returned [std::future::Future]s implement "cancel on drop" semantics. This
/// means that callers can add a timeout using [tokio::time::timeout] or
/// [tokio::time::timeout_at].
///
/// ```rust,no_run
/// # let mut read: mz_persist_client::read::ReadHandle<String, String, u64, i64> = unimplemented!();
/// # let timeout: std::time::Duration = unimplemented!();
/// # let new_since: timely::progress::Antichain<u64> = unimplemented!();
/// # async {
/// tokio::time::timeout(timeout, read.downgrade_since(&new_since)).await
/// # };
/// ```
#[derive(Debug)]
pub struct ReadHandle<K: Codec, V: Codec, T, D> {
    pub(crate) cfg: PersistConfig,
    pub(crate) metrics: Arc<Metrics>,
    pub(crate) machine: Machine<K, V, T, D>,
    pub(crate) gc: GarbageCollector<K, V, T, D>,
    pub(crate) blob: Arc<dyn Blob>,
    // Identifies this reader to the machine (e.g. when downgrading the since).
    pub(crate) reader_id: LeasedReaderId,
    pub(crate) read_schemas: Schemas<K, V>,
    pub(crate) schema_cache: SchemaCache<K, V, T, D>,

    // This reader's since, as last reported back by `downgrade_since`.
    since: Antichain<T>,
    // Timestamp of the most recent `downgrade_since` call, which doubles as a
    // heartbeat.
    pub(crate) last_heartbeat: EpochMillis,
    // One `Lease` per `SeqNo` that has been leased out to batch parts.
    pub(crate) leased_seqnos: BTreeMap<SeqNo, Lease>,
    // Holds the expiry callback and the heartbeat task handles; set to `Some`
    // on construction.
    pub(crate) unexpired_state: Option<UnexpiredReadHandleState>,
}
540
541/// Length of time after a reader's last operation after which the reader may be
542/// expired.
543pub(crate) const READER_LEASE_DURATION: Config<Duration> = Config::new(
544    "persist_reader_lease_duration",
545    Duration::from_secs(60 * 15),
546    "The time after which we'll clean up stale read leases",
547);
548
549impl<K, V, T, D> ReadHandle<K, V, T, D>
550where
551    K: Debug + Codec,
552    V: Debug + Codec,
553    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
554    D: Monoid + Codec64 + Send + Sync,
555{
556    pub(crate) async fn new(
557        cfg: PersistConfig,
558        metrics: Arc<Metrics>,
559        machine: Machine<K, V, T, D>,
560        gc: GarbageCollector<K, V, T, D>,
561        blob: Arc<dyn Blob>,
562        reader_id: LeasedReaderId,
563        read_schemas: Schemas<K, V>,
564        since: Antichain<T>,
565        last_heartbeat: EpochMillis,
566    ) -> Self {
567        let schema_cache = machine.applier.schema_cache();
568        let expire_fn = Self::expire_fn(machine.clone(), gc.clone(), reader_id.clone());
569        ReadHandle {
570            cfg,
571            metrics: Arc::clone(&metrics),
572            machine: machine.clone(),
573            gc: gc.clone(),
574            blob,
575            reader_id: reader_id.clone(),
576            read_schemas,
577            schema_cache,
578            since,
579            last_heartbeat,
580            leased_seqnos: BTreeMap::new(),
581            unexpired_state: Some(UnexpiredReadHandleState {
582                expire_fn,
583                _heartbeat_tasks: machine
584                    .start_reader_heartbeat_tasks(reader_id, gc)
585                    .await
586                    .into_iter()
587                    .map(JoinHandle::abort_on_drop)
588                    .collect(),
589            }),
590        }
591    }
592
    /// This handle's shard id, as reported by its underlying machine.
    pub fn shard_id(&self) -> ShardId {
        self.machine.shard_id()
    }
597
    /// This handle's `since` frontier.
    ///
    /// This will always be greater or equal to the shard-global `since`.
    ///
    /// Returns the locally stored value; it is updated by
    /// [Self::downgrade_since].
    pub fn since(&self) -> &Antichain<T> {
        &self.since
    }
604
605    fn outstanding_seqno(&mut self) -> Option<SeqNo> {
606        while let Some(first) = self.leased_seqnos.first_entry() {
607            if first.get().count() <= 1 {
608                first.remove();
609            } else {
610                return Some(*first.key());
611            }
612        }
613        None
614    }
615
616    /// Forwards the since frontier of this handle, giving up the ability to
617    /// read at times not greater or equal to `new_since`.
618    ///
619    /// This may trigger (asynchronous) compaction and consolidation in the
620    /// system. A `new_since` of the empty antichain "finishes" this shard,
621    /// promising that no more data will ever be read by this handle.
622    ///
623    /// This also acts as a heartbeat for the reader lease (including if called
624    /// with `new_since` equal to something like `self.since()` or the minimum
625    /// timestamp, making the call a no-op).
626    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
627    pub async fn downgrade_since(&mut self, new_since: &Antichain<T>) {
628        // Guaranteed to be the smallest/oldest outstanding lease on a `SeqNo`.
629        let outstanding_seqno = self.outstanding_seqno();
630
631        let heartbeat_ts = (self.cfg.now)();
632        let (_seqno, current_reader_since, maintenance) = self
633            .machine
634            .downgrade_since(&self.reader_id, outstanding_seqno, new_since, heartbeat_ts)
635            .await;
636
637        // Debugging for database-issues#4590.
638        if let Some(outstanding_seqno) = outstanding_seqno {
639            let seqnos_held = _seqno.0.saturating_sub(outstanding_seqno.0);
640            // We get just over 1 seqno-per-second on average for a shard in
641            // prod, so this is about an hour.
642            const SEQNOS_HELD_THRESHOLD: u64 = 60 * 60;
643            if seqnos_held >= SEQNOS_HELD_THRESHOLD {
644                tracing::info!(
645                    "{} reader {} holding an unexpected number of seqnos {} vs {}: {:?}. bt: {:?}",
646                    self.machine.shard_id(),
647                    self.reader_id,
648                    outstanding_seqno,
649                    _seqno,
650                    self.leased_seqnos.keys().take(10).collect::<Vec<_>>(),
651                    // The Debug impl of backtrace is less aesthetic, but will put the trace
652                    // on a single line and play more nicely with our Honeycomb quota
653                    Backtrace::force_capture(),
654                );
655            }
656        }
657
658        self.since = current_reader_since.0;
659        // A heartbeat is just any downgrade_since traffic, so update the
660        // internal rate limiter here to play nicely with `maybe_heartbeat`.
661        self.last_heartbeat = heartbeat_ts;
662        maintenance.start_performing(&self.machine, &self.gc);
663    }
664
    /// Returns an ongoing subscription of updates to a shard.
    ///
    /// The stream includes all data at times greater than `as_of`. Combined
    /// with [Self::snapshot] it will produce exactly correct results: the
    /// snapshot is the TVCs contents at `as_of` and all subsequent updates
    /// occur at exactly their indicated time. The recipient should only
    /// downgrade their read capability when they are certain they have all data
    /// through the frontier they would downgrade to.
    ///
    /// This takes ownership of the ReadHandle so the Listen can use it to
    /// [Self::downgrade_since] as it progresses. If you need to keep this
    /// handle, then [Self::clone] it before calling listen.
    ///
    /// The `Since` error indicates that the requested `as_of` cannot be served
    /// (the caller has out of date information) and includes the smallest
    /// `as_of` that would have been accepted.
    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
    pub async fn listen(self, as_of: Antichain<T>) -> Result<Listen<K, V, T, D>, Since<T>> {
        // All validation and the initial since downgrade happen in `Listen::new`.
        Listen::new(self, as_of).await
    }
685
686    /// Returns all of the contents of the shard TVC at `as_of` broken up into
687    /// [`LeasedBatchPart`]es. These parts can be "turned in" via
688    /// `crate::fetch::fetch_batch_part` to receive the data they contain.
689    ///
690    /// This command returns the contents of this shard as of `as_of` once they
691    /// are known. This may "block" (in an async-friendly way) if `as_of` is
692    /// greater or equal to the current `upper` of the shard. The recipient
693    /// should only downgrade their read capability when they are certain they
694    /// have all data through the frontier they would downgrade to.
695    ///
696    /// The `Since` error indicates that the requested `as_of` cannot be served
697    /// (the caller has out of date information) and includes the smallest
698    /// `as_of` that would have been accepted.
699    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
700    pub async fn snapshot(
701        &mut self,
702        as_of: Antichain<T>,
703    ) -> Result<Vec<LeasedBatchPart<T>>, Since<T>> {
704        let batches = loop {
705            let min_elapsed = self.heartbeat_duration();
706            match tokio::time::timeout(min_elapsed, self.machine.snapshot(&as_of)).await {
707                Ok(Ok(batches)) => break batches,
708                Ok(Err(since)) => return Err(since),
709                Err(_timeout) => {
710                    let since = self.since().clone();
711                    self.maybe_downgrade_since(&since).await;
712                }
713            }
714        };
715
716        if !PartialOrder::less_equal(self.since(), &as_of) {
717            return Err(Since(self.since().clone()));
718        }
719
720        let filter = FetchBatchFilter::Snapshot { as_of };
721        let mut leased_parts = Vec::new();
722        for batch in batches {
723            // Flatten the HollowBatch into one LeasedBatchPart per key. Each key
724            // corresponds to a "part" or s3 object. This allows persist_source
725            // to distribute work by parts (smallish, more even size) instead of
726            // batches (arbitrarily large).
727            leased_parts.extend(
728                self.lease_batch_parts(batch, filter.clone())
729                    .collect::<Vec<_>>()
730                    .await,
731            );
732        }
733        Ok(leased_parts)
734    }
735
736    /// Returns a snapshot of all of a shard's data using `as_of`, followed by
737    /// listening to any future updates.
738    ///
739    /// For more details on this operation's semantics, see [Self::snapshot] and
740    /// [Self::listen].
741    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
742    pub async fn subscribe(
743        mut self,
744        as_of: Antichain<T>,
745    ) -> Result<Subscribe<K, V, T, D>, Since<T>> {
746        let snapshot_parts = self.snapshot(as_of.clone()).await?;
747        let listen = self.listen(as_of.clone()).await?;
748        Ok(Subscribe::new(snapshot_parts, listen))
749    }
750
751    fn lease_batch_part(
752        &mut self,
753        desc: Description<T>,
754        part: BatchPart<T>,
755        filter: FetchBatchFilter<T>,
756    ) -> LeasedBatchPart<T> {
757        LeasedBatchPart {
758            metrics: Arc::clone(&self.metrics),
759            shard_id: self.machine.shard_id(),
760            filter,
761            desc,
762            part,
763            lease: self.lease_seqno(),
764            filter_pushdown_audit: false,
765        }
766    }
767
768    fn lease_batch_parts(
769        &mut self,
770        batch: HollowBatch<T>,
771        filter: FetchBatchFilter<T>,
772    ) -> impl Stream<Item = LeasedBatchPart<T>> + '_ {
773        stream! {
774            let blob = Arc::clone(&self.blob);
775            let metrics = Arc::clone(&self.metrics);
776            let desc = batch.desc.clone();
777            for await part in batch.part_stream(self.shard_id(), &*blob, &*metrics) {
778                yield self.lease_batch_part(desc.clone(), part.expect("leased part").into_owned(), filter.clone())
779            }
780        }
781    }
782
783    /// Tracks that the `ReadHandle`'s machine's current `SeqNo` is being
784    /// "leased out" to a `LeasedBatchPart`, and cannot be garbage
785    /// collected until its lease has been returned.
786    fn lease_seqno(&mut self) -> Lease {
787        let seqno = self.machine.seqno();
788        let lease = self
789            .leased_seqnos
790            .entry(seqno)
791            .or_insert_with(|| Lease::new(seqno));
792        lease.clone()
793    }
794
795    /// Returns an independent [ReadHandle] with a new [LeasedReaderId] but the
796    /// same `since`.
797    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
798    pub async fn clone(&self, purpose: &str) -> Self {
799        let new_reader_id = LeasedReaderId::new();
800        let machine = self.machine.clone();
801        let gc = self.gc.clone();
802        let heartbeat_ts = (self.cfg.now)();
803        let (reader_state, maintenance) = machine
804            .register_leased_reader(
805                &new_reader_id,
806                purpose,
807                READER_LEASE_DURATION.get(&self.cfg),
808                heartbeat_ts,
809                false,
810            )
811            .await;
812        maintenance.start_performing(&machine, &gc);
813        // The point of clone is that you're guaranteed to have the same (or
814        // greater) since capability, verify that.
815        // TODO: better if it's the same since capability exactly.
816        assert!(PartialOrder::less_equal(&reader_state.since, &self.since));
817        let new_reader = ReadHandle::new(
818            self.cfg.clone(),
819            Arc::clone(&self.metrics),
820            machine,
821            gc,
822            Arc::clone(&self.blob),
823            new_reader_id,
824            self.read_schemas.clone(),
825            reader_state.since,
826            heartbeat_ts,
827        )
828        .await;
829        new_reader
830    }
831
832    fn heartbeat_duration(&self) -> Duration {
833        READER_LEASE_DURATION.get(&self.cfg) / 4
834    }
835
836    /// A rate-limited version of [Self::downgrade_since].
837    ///
838    /// This is an internally rate limited helper, designed to allow users to
839    /// call it as frequently as they like. Call this or [Self::downgrade_since],
840    /// on some interval that is "frequent" compared to the read lease duration.
841    pub async fn maybe_downgrade_since(&mut self, new_since: &Antichain<T>) {
842        let min_elapsed = self.heartbeat_duration();
843        let elapsed_since_last_heartbeat =
844            Duration::from_millis((self.cfg.now)().saturating_sub(self.last_heartbeat));
845        if elapsed_since_last_heartbeat >= min_elapsed {
846            self.downgrade_since(new_since).await;
847        }
848    }
849
850    /// Politely expires this reader, releasing its lease.
851    ///
852    /// There is a best-effort impl in Drop to expire a reader that wasn't
853    /// explictly expired with this method. When possible, explicit expiry is
854    /// still preferred because the Drop one is best effort and is dependant on
855    /// a tokio [Handle] being available in the TLC at the time of drop (which
856    /// is a bit subtle). Also, explicit expiry allows for control over when it
857    /// happens.
858    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
859    pub async fn expire(mut self) {
860        // We drop the unexpired state before expiring the reader to ensure the
861        // heartbeat tasks can never observe the expired state. This doesn't
862        // matter for correctness, but avoids confusing log output if the
863        // heartbeat task were to discover that its lease has been expired.
864        let Some(unexpired_state) = self.unexpired_state.take() else {
865            return;
866        };
867        unexpired_state.expire_fn.0().await;
868    }
869
    /// Builds the [ExpireFn] used to expire the reader identified by
    /// `reader_id`.
    ///
    /// Nothing happens until the returned function is invoked. When it is, it
    /// produces a boxed future that calls `expire_leased_reader` on `machine`
    /// for `reader_id` and then starts performing the maintenance that call
    /// returned, using `machine` and `gc`.
    fn expire_fn(
        machine: Machine<K, V, T, D>,
        gc: GarbageCollector<K, V, T, D>,
        reader_id: LeasedReaderId,
    ) -> ExpireFn {
        // The closure and the future both take ownership of their captures, so
        // the returned function does not borrow from the calling handle.
        ExpireFn(Box::new(move || {
            Box::pin(async move {
                let (_, maintenance) = machine.expire_leased_reader(&reader_id).await;
                maintenance.start_performing(&machine, &gc);
            })
        }))
    }
882
883    /// Test helper for a [Self::listen] call that is expected to succeed.
884    #[cfg(test)]
885    #[track_caller]
886    pub async fn expect_listen(self, as_of: T) -> Listen<K, V, T, D> {
887        self.listen(Antichain::from_elem(as_of))
888            .await
889            .expect("cannot serve requested as_of")
890    }
891}
892
/// State for a read handle that has not been explicitly expired.
///
/// A read handle gives this state up (via `Option::take`) when it is expired
/// or dropped.
#[derive(Debug)]
pub(crate) struct UnexpiredReadHandleState {
    /// Function that, when invoked, produces the future that expires the
    /// reader.
    expire_fn: ExpireFn,
    /// Handles to the reader's heartbeat tasks.
    // NOTE(review): going by the `AbortOnDropHandle` type name, the tasks are
    // presumably aborted when these handles are dropped; confirm in mz_ore.
    pub(crate) _heartbeat_tasks: Vec<AbortOnDropHandle<()>>,
}
899
/// An incremental cursor through a particular shard, returned from [ReadHandle::snapshot_cursor].
///
/// To read an entire dataset, the client should call `next` until it returns
/// `None`, which signals that all data has been returned. The client is also
/// free to abandon the instance at any time, e.g. if it only needs a few
/// entries.
#[derive(Debug)]
pub struct Cursor<K: Codec, V: Codec, T: Timestamp + Codec64, D: Codec64, L = Lease> {
    /// Source of the consolidated chunks handed out by `next`.
    consolidator: Consolidator<T, D, StructuredSort<K, V, T, D>>,
    /// Length limit passed to the consolidator for each chunk.
    max_len: usize,
    /// Byte-size limit passed to the consolidator for each chunk.
    max_bytes: usize,
    /// Lease held for as long as the cursor exists; it can be recovered with
    /// `into_lease`.
    _lease: L,
    /// Schemas used to build the key and value decoders for each chunk.
    read_schemas: Schemas<K, V>,
}
913
914impl<K: Codec, V: Codec, T: Timestamp + Codec64, D: Codec64, L> Cursor<K, V, T, D, L> {
915    /// Extracts and returns the lease from the cursor. Allowing the caller to
916    /// do any necessary cleanup associated with the lease.
917    pub fn into_lease(self: Self) -> L {
918        self._lease
919    }
920}
921
922impl<K, V, T, D, L> Cursor<K, V, T, D, L>
923where
924    K: Debug + Codec + Ord,
925    V: Debug + Codec + Ord,
926    T: Timestamp + Lattice + Codec64 + Sync,
927    D: Monoid + Ord + Codec64 + Send + Sync,
928{
929    /// Grab the next batch of consolidated data.
930    pub async fn next(&mut self) -> Option<impl Iterator<Item = ((K, V), T, D)> + '_> {
931        let Self {
932            consolidator,
933            max_len,
934            max_bytes,
935            _lease,
936            read_schemas: _,
937        } = self;
938
939        let part = consolidator
940            .next_chunk(*max_len, *max_bytes)
941            .await
942            .expect("fetching a leased part")?;
943        let key_decoder = self
944            .read_schemas
945            .key
946            .decoder_any(part.key.as_ref())
947            .expect("ok");
948        let val_decoder = self
949            .read_schemas
950            .val
951            .decoder_any(part.val.as_ref())
952            .expect("ok");
953        let iter = (0..part.len()).map(move |i| {
954            let mut k = K::default();
955            let mut v = V::default();
956            key_decoder.decode(i, &mut k);
957            val_decoder.decode(i, &mut v);
958            let t = T::decode(part.time.value(i).to_le_bytes());
959            let d = D::decode(part.diff.value(i).to_le_bytes());
960            ((k, v), t, d)
961        });
962
963        Some(iter)
964    }
965}
966
967impl<K, V, T, D> ReadHandle<K, V, T, D>
968where
969    K: Debug + Codec + Ord,
970    V: Debug + Codec + Ord,
971    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
972    D: Monoid + Ord + Codec64 + Send + Sync,
973{
974    /// Generates a [Self::snapshot], and fetches all of the batches it
975    /// contains.
976    ///
977    /// The output is consolidated. Furthermore, to keep memory usage down when
978    /// reading a snapshot that consolidates well, this consolidates as it goes.
979    ///
980    /// Potential future improvements (if necessary):
981    /// - Accept something like a `F: Fn(K,V) -> (K,V)` argument, which looks
982    ///   like an MFP you might be pushing down. Reason being that if you are
983    ///   projecting or transforming in a way that allows further consolidation,
984    ///   amazing.
985    /// - Reuse any code we write to streaming-merge consolidate in
986    ///   persist_source here.
987    pub async fn snapshot_and_fetch(
988        &mut self,
989        as_of: Antichain<T>,
990    ) -> Result<Vec<((K, V), T, D)>, Since<T>> {
991        let mut cursor = self.snapshot_cursor(as_of, |_| true).await?;
992        let mut contents = Vec::new();
993        while let Some(iter) = cursor.next().await {
994            contents.extend(iter);
995        }
996
997        // We don't currently guarantee that encoding is one-to-one, so we still need to
998        // consolidate the decoded outputs. However, let's report if this isn't a noop.
999        let old_len = contents.len();
1000        consolidate_updates(&mut contents);
1001        if old_len != contents.len() {
1002            // TODO(bkirwi): do we need more / finer-grained metrics for this?
1003            self.machine
1004                .applier
1005                .shard_metrics
1006                .unconsolidated_snapshot
1007                .inc();
1008        }
1009
1010        Ok(contents)
1011    }
1012
1013    /// Generates a [Self::snapshot], and fetches all of the batches it
1014    /// contains.
1015    ///
1016    /// To keep memory usage down when reading a snapshot that consolidates well, this consolidates
1017    /// as it goes. However, note that only the serialized data is consolidated: the deserialized
1018    /// data will only be consolidated if your K/V codecs are one-to-one.
1019    pub async fn snapshot_cursor(
1020        &mut self,
1021        as_of: Antichain<T>,
1022        should_fetch_part: impl for<'a> Fn(Option<&'a LazyPartStats>) -> bool,
1023    ) -> Result<Cursor<K, V, T, D>, Since<T>> {
1024        let batches = self.machine.snapshot(&as_of).await?;
1025        let lease = self.lease_seqno();
1026
1027        Self::read_batches_consolidated(
1028            &self.cfg,
1029            Arc::clone(&self.metrics),
1030            Arc::clone(&self.machine.applier.shard_metrics),
1031            self.metrics.read.snapshot.clone(),
1032            Arc::clone(&self.blob),
1033            self.shard_id(),
1034            as_of,
1035            self.read_schemas.clone(),
1036            &batches,
1037            lease,
1038            should_fetch_part,
1039            COMPACTION_MEMORY_BOUND_BYTES.get(&self.cfg),
1040        )
1041    }
1042
1043    pub(crate) fn read_batches_consolidated<L>(
1044        persist_cfg: &PersistConfig,
1045        metrics: Arc<Metrics>,
1046        shard_metrics: Arc<ShardMetrics>,
1047        read_metrics: ReadMetrics,
1048        blob: Arc<dyn Blob>,
1049        shard_id: ShardId,
1050        as_of: Antichain<T>,
1051        schemas: Schemas<K, V>,
1052        batches: &[HollowBatch<T>],
1053        lease: L,
1054        should_fetch_part: impl for<'a> Fn(Option<&'a LazyPartStats>) -> bool,
1055        memory_budget_bytes: usize,
1056    ) -> Result<Cursor<K, V, T, D, L>, Since<T>> {
1057        let context = format!("{}[as_of={:?}]", shard_id, as_of.elements());
1058        let filter = FetchBatchFilter::Snapshot {
1059            as_of: as_of.clone(),
1060        };
1061
1062        let mut consolidator = Consolidator::new(
1063            context,
1064            FetchConfig::from_persist_config(persist_cfg),
1065            shard_id,
1066            StructuredSort::new(schemas.clone()),
1067            blob,
1068            metrics,
1069            shard_metrics,
1070            read_metrics,
1071            filter,
1072            None,
1073            memory_budget_bytes,
1074        );
1075        for batch in batches {
1076            for (meta, run) in batch.runs() {
1077                consolidator.enqueue_run(
1078                    &batch.desc,
1079                    meta,
1080                    run.into_iter()
1081                        .filter(|p| should_fetch_part(p.stats()))
1082                        .cloned(),
1083                );
1084            }
1085        }
1086        // This default may end up consolidating more records than previously
1087        // for cases like fast-path peeks, where only the first few entries are used.
1088        // If this is a noticeable performance impact, thread the max-len in from the caller.
1089        let max_len = persist_cfg.compaction_yield_after_n_updates;
1090        let max_bytes = BLOB_TARGET_SIZE.get(persist_cfg).max(1);
1091
1092        Ok(Cursor {
1093            consolidator,
1094            max_len,
1095            max_bytes,
1096            _lease: lease,
1097            read_schemas: schemas,
1098        })
1099    }
1100
1101    /// Returns aggregate statistics about the contents of the shard TVC at the
1102    /// given frontier.
1103    ///
1104    /// This command returns the contents of this shard as of `as_of` once they
1105    /// are known. This may "block" (in an async-friendly way) if `as_of` is
1106    /// greater or equal to the current `upper` of the shard. If `None` is given
1107    /// for `as_of`, then the latest stats known by this process are used.
1108    ///
1109    /// The `Since` error indicates that the requested `as_of` cannot be served
1110    /// (the caller has out of date information) and includes the smallest
1111    /// `as_of` that would have been accepted.
1112    pub fn snapshot_stats(
1113        &self,
1114        as_of: Option<Antichain<T>>,
1115    ) -> impl Future<Output = Result<SnapshotStats, Since<T>>> + Send + 'static {
1116        let machine = self.machine.clone();
1117        async move {
1118            let batches = match as_of {
1119                Some(as_of) => machine.snapshot(&as_of).await?,
1120                None => machine.applier.all_batches(),
1121            };
1122            let num_updates = batches.iter().map(|b| b.len).sum();
1123            Ok(SnapshotStats {
1124                shard_id: machine.shard_id(),
1125                num_updates,
1126            })
1127        }
1128    }
1129
1130    /// Returns aggregate statistics about the contents of the shard TVC at the
1131    /// given frontier.
1132    ///
1133    /// This command returns the contents of this shard as of `as_of` once they
1134    /// are known. This may "block" (in an async-friendly way) if `as_of` is
1135    /// greater or equal to the current `upper` of the shard.
1136    ///
1137    /// The `Since` error indicates that the requested `as_of` cannot be served
1138    /// (the caller has out of date information) and includes the smallest
1139    /// `as_of` that would have been accepted.
1140    pub async fn snapshot_parts_stats(
1141        &self,
1142        as_of: Antichain<T>,
1143    ) -> Result<SnapshotPartsStats, Since<T>> {
1144        let batches = self.machine.snapshot(&as_of).await?;
1145        let parts = stream::iter(&batches)
1146            .flat_map(|b| b.part_stream(self.shard_id(), &*self.blob, &*self.metrics))
1147            .map(|p| {
1148                let p = p.expect("live batch");
1149                SnapshotPartStats {
1150                    encoded_size_bytes: p.encoded_size_bytes(),
1151                    stats: p.stats().cloned(),
1152                }
1153            })
1154            .collect()
1155            .await;
1156        Ok(SnapshotPartsStats {
1157            metrics: Arc::clone(&self.machine.applier.metrics),
1158            shard_id: self.machine.shard_id(),
1159            parts,
1160        })
1161    }
1162}
1163
1164impl<K, V, T, D> ReadHandle<K, V, T, D>
1165where
1166    K: Debug + Codec + Ord,
1167    V: Debug + Codec + Ord,
1168    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
1169    D: Monoid + Codec64 + Send + Sync,
1170{
1171    /// Generates a [Self::snapshot], and streams out all of the updates
1172    /// it contains in bounded memory.
1173    ///
1174    /// The output is not consolidated.
1175    pub async fn snapshot_and_stream(
1176        &mut self,
1177        as_of: Antichain<T>,
1178    ) -> Result<impl Stream<Item = ((K, V), T, D)> + use<K, V, T, D>, Since<T>> {
1179        let snap = self.snapshot(as_of).await?;
1180
1181        let blob = Arc::clone(&self.blob);
1182        let metrics = Arc::clone(&self.metrics);
1183        let snapshot_metrics = self.metrics.read.snapshot.clone();
1184        let shard_metrics = Arc::clone(&self.machine.applier.shard_metrics);
1185        let reader_id = self.reader_id.clone();
1186        let schemas = self.read_schemas.clone();
1187        let mut schema_cache = self.schema_cache.clone();
1188        let persist_cfg = self.cfg.clone();
1189        let stream = async_stream::stream! {
1190            for part in snap {
1191                let mut fetched_part = fetch_leased_part(
1192                    &persist_cfg,
1193                    &part,
1194                    blob.as_ref(),
1195                    Arc::clone(&metrics),
1196                    &snapshot_metrics,
1197                    &shard_metrics,
1198                    &reader_id,
1199                    schemas.clone(),
1200                    &mut schema_cache,
1201                )
1202                .await;
1203
1204                while let Some(next) = fetched_part.next() {
1205                    yield next;
1206                }
1207            }
1208        };
1209
1210        Ok(stream)
1211    }
1212}
1213
1214impl<K, V, T, D> ReadHandle<K, V, T, D>
1215where
1216    K: Debug + Codec + Ord,
1217    V: Debug + Codec + Ord,
1218    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
1219    D: Monoid + Ord + Codec64 + Send + Sync,
1220{
1221    /// Test helper to generate a [Self::snapshot] call that is expected to
1222    /// succeed, process its batches, and then return its data sorted.
1223    #[cfg(test)]
1224    #[track_caller]
1225    pub async fn expect_snapshot_and_fetch(&mut self, as_of: T) -> Vec<((K, V), T, D)> {
1226        let mut ret = self
1227            .snapshot_and_fetch(Antichain::from_elem(as_of))
1228            .await
1229            .expect("cannot serve requested as_of");
1230
1231        ret.sort();
1232        ret
1233    }
1234}
1235
1236impl<K: Codec, V: Codec, T, D> Drop for ReadHandle<K, V, T, D> {
1237    fn drop(&mut self) {
1238        // We drop the unexpired state before expiring the reader to ensure the
1239        // heartbeat tasks can never observe the expired state. This doesn't
1240        // matter for correctness, but avoids confusing log output if the
1241        // heartbeat task were to discover that its lease has been expired.
1242        let Some(unexpired_state) = self.unexpired_state.take() else {
1243            return;
1244        };
1245
1246        let handle = match Handle::try_current() {
1247            Ok(x) => x,
1248            Err(_) => {
1249                warn!(
1250                    "ReadHandle {} dropped without being explicitly expired, falling back to lease timeout",
1251                    self.reader_id
1252                );
1253                return;
1254            }
1255        };
1256        // Spawn a best-effort task to expire this read handle. It's fine if
1257        // this doesn't run to completion, we'd just have to wait out the lease
1258        // before the shard-global since is unblocked.
1259        //
1260        // Intentionally create the span outside the task to set the parent.
1261        let expire_span = debug_span!("drop::expire");
1262        handle.spawn_named(
1263            || format!("ReadHandle::expire ({})", self.reader_id),
1264            unexpired_state.expire_fn.0().instrument(expire_span),
1265        );
1266    }
1267}
1268
1269#[cfg(test)]
1270mod tests {
1271    use std::pin;
1272    use std::str::FromStr;
1273
1274    use mz_dyncfg::ConfigUpdates;
1275    use mz_ore::cast::CastFrom;
1276    use mz_ore::metrics::MetricsRegistry;
1277    use mz_persist::mem::{MemBlob, MemBlobConfig, MemConsensus};
1278    use mz_persist::unreliable::{UnreliableConsensus, UnreliableHandle};
1279    use serde::{Deserialize, Serialize};
1280    use serde_json::json;
1281    use tokio_stream::StreamExt;
1282
1283    use crate::async_runtime::IsolatedRuntime;
1284    use crate::batch::BLOB_TARGET_SIZE;
1285    use crate::cache::StateCache;
1286    use crate::internal::metrics::Metrics;
1287    use crate::rpc::NoopPubSubSender;
1288    use crate::tests::{all_ok, new_test_client};
1289    use crate::{Diagnostics, PersistClient, PersistConfig, ShardId};
1290
1291    use super::*;
1292
1293    // Verifies `Subscribe` can be dropped while holding snapshot batches.
1294    #[mz_persist_proc::test(tokio::test)]
1295    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1296    async fn drop_unused_subscribe(dyncfgs: ConfigUpdates) {
1297        let data = [
1298            (("0".to_owned(), "zero".to_owned()), 0, 1),
1299            (("1".to_owned(), "one".to_owned()), 1, 1),
1300            (("2".to_owned(), "two".to_owned()), 2, 1),
1301        ];
1302
1303        let (mut write, read) = new_test_client(&dyncfgs)
1304            .await
1305            .expect_open::<String, String, u64, i64>(crate::ShardId::new())
1306            .await;
1307
1308        write.expect_compare_and_append(&data[0..1], 0, 1).await;
1309        write.expect_compare_and_append(&data[1..2], 1, 2).await;
1310        write.expect_compare_and_append(&data[2..3], 2, 3).await;
1311
1312        let subscribe = read
1313            .subscribe(timely::progress::Antichain::from_elem(2))
1314            .await
1315            .unwrap();
1316        assert!(
1317            !subscribe.snapshot.as_ref().unwrap().is_empty(),
1318            "snapshot must have batches for test to be meaningful"
1319        );
1320        drop(subscribe);
1321    }
1322
1323    // Verifies that we streaming-consolidate away identical key-values in the same batch.
1324    #[mz_persist_proc::test(tokio::test)]
1325    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1326    async fn streaming_consolidate(dyncfgs: ConfigUpdates) {
1327        let data = &[
1328            // Identical records should sum together...
1329            (("k".to_owned(), "v".to_owned()), 0, 1),
1330            (("k".to_owned(), "v".to_owned()), 1, 1),
1331            (("k".to_owned(), "v".to_owned()), 2, 1),
1332            // ...and when they cancel out entirely they should be omitted.
1333            (("k2".to_owned(), "v".to_owned()), 0, 1),
1334            (("k2".to_owned(), "v".to_owned()), 1, -1),
1335        ];
1336
1337        let (mut write, read) = {
1338            let client = new_test_client(&dyncfgs).await;
1339            client.cfg.set_config(&BLOB_TARGET_SIZE, 1000); // So our batch stays together!
1340            client
1341                .expect_open::<String, String, u64, i64>(crate::ShardId::new())
1342                .await
1343        };
1344
1345        write.expect_compare_and_append(data, 0, 5).await;
1346
1347        let mut snapshot = read
1348            .subscribe(timely::progress::Antichain::from_elem(4))
1349            .await
1350            .unwrap();
1351
1352        let mut updates = vec![];
1353        'outer: loop {
1354            for event in snapshot.fetch_next().await {
1355                match event {
1356                    ListenEvent::Progress(t) => {
1357                        if !t.less_than(&4) {
1358                            break 'outer;
1359                        }
1360                    }
1361                    ListenEvent::Updates(data) => {
1362                        updates.extend(data);
1363                    }
1364                }
1365            }
1366        }
1367        assert_eq!(updates, &[(("k".to_owned(), "v".to_owned()), 4u64, 3i64)],)
1368    }
1369
1370    #[mz_persist_proc::test(tokio::test)]
1371    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1372    async fn snapshot_and_stream(dyncfgs: ConfigUpdates) {
1373        let data = &mut [
1374            (("k1".to_owned(), "v1".to_owned()), 0, 1),
1375            (("k2".to_owned(), "v2".to_owned()), 1, 1),
1376            (("k3".to_owned(), "v3".to_owned()), 2, 1),
1377            (("k4".to_owned(), "v4".to_owned()), 2, 1),
1378            (("k5".to_owned(), "v5".to_owned()), 3, 1),
1379        ];
1380
1381        let (mut write, mut read) = {
1382            let client = new_test_client(&dyncfgs).await;
1383            client.cfg.set_config(&BLOB_TARGET_SIZE, 0); // split batches across multiple parts
1384            client
1385                .expect_open::<String, String, u64, i64>(crate::ShardId::new())
1386                .await
1387        };
1388
1389        write.expect_compare_and_append(&data[0..2], 0, 2).await;
1390        write.expect_compare_and_append(&data[2..4], 2, 3).await;
1391        write.expect_compare_and_append(&data[4..], 3, 4).await;
1392
1393        let as_of = Antichain::from_elem(3);
1394        let mut snapshot = pin::pin!(read.snapshot_and_stream(as_of.clone()).await.unwrap());
1395
1396        let mut snapshot_rows = vec![];
1397        while let Some(((k, v), t, d)) = snapshot.next().await {
1398            snapshot_rows.push(((k, v), t, d));
1399        }
1400
1401        for ((_k, _v), t, _d) in data.as_mut_slice() {
1402            t.advance_by(as_of.borrow());
1403        }
1404
1405        assert_eq!(data.as_slice(), snapshot_rows.as_slice());
1406    }
1407
    // Verifies the semantics of `SeqNo` leases + checks dropping `LeasedBatchPart` semantics.
    //
    // The test runs in three phases:
    // 1. Seed the shard with a couple of writes and open a subscribe.
    // 2. Collect leased parts from the subscribe without returning any of
    //    them, and check that the seqno since does not move.
    // 3. Fetch and return the collected parts one by one, and check that the
    //    seqno since moves forward exactly when the returned part's SeqNo is
    //    no longer leased by anything else.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // https://github.com/MaterializeInc/database-issues/issues/5964
    async fn seqno_leases(dyncfgs: ConfigUpdates) {
        // One single-update write per index `i`, at timestamp `i`.
        let mut data = vec![];
        for i in 0..20 {
            data.push(((i.to_string(), i.to_string()), i, 1))
        }

        let shard_id = ShardId::new();

        let client = new_test_client(&dyncfgs).await;
        let (mut write, read) = client
            .expect_open::<String, String, u64, i64>(shard_id)
            .await;

        // Seed with some values
        // `offset` is the index of the next unwritten element of `data`;
        // `width` is how many elements the upcoming loop writes.
        let mut offset = 0;
        let mut width = 2;

        for i in offset..offset + width {
            write
                .expect_compare_and_append(
                    &data[i..i + 1],
                    u64::cast_from(i),
                    u64::cast_from(i) + 1,
                )
                .await;
        }
        offset += width;

        // Create machinery for subscribe + fetch
        let mut fetcher = client
            .create_batch_fetcher::<String, String, u64, i64>(
                shard_id,
                Default::default(),
                Default::default(),
                false,
                Diagnostics::for_tests(),
            )
            .await
            .unwrap();

        let mut subscribe = read
            .subscribe(timely::progress::Antichain::from_elem(1))
            .await
            .expect("cannot serve requested as_of");

        // Determine sequence number at outset.
        let original_seqno_since = subscribe.listen.handle.machine.applier.seqno_since();

        // Leased parts collected in the loop below; they are held (not
        // returned) until the fetch loop further down.
        let mut parts = vec![];

        width = 4;
        // Collect parts while continuing to write values
        for i in offset..offset + width {
            for event in subscribe.next(None).await {
                if let ListenEvent::Updates(mut new_parts) = event {
                    parts.append(&mut new_parts);
                    // Here and elsewhere we "cheat" and immediately downgrade the since
                    // to demonstrate the effects of SeqNo leases immediately.
                    subscribe
                        .listen
                        .handle
                        .downgrade_since(&subscribe.listen.since)
                        .await;
                }
            }

            write
                .expect_compare_and_append(
                    &data[i..i + 1],
                    u64::cast_from(i),
                    u64::cast_from(i) + 1,
                )
                .await;

            // SeqNo is not downgraded
            assert_eq!(
                subscribe.listen.handle.machine.applier.seqno_since(),
                original_seqno_since
            );
        }

        offset += width;

        let mut seqno_since = subscribe.listen.handle.machine.applier.seqno_since();

        // We're starting out with the original, non-downgraded SeqNo
        assert_eq!(seqno_since, original_seqno_since);

        // We have to handle the parts we generate during the next loop to
        // ensure they don't panic.
        let mut subsequent_parts = vec![];

        // Ensure monotonicity of seqnos we're processing, otherwise the
        // invariant we're testing (returning the last part of a seqno will
        // downgrade its since) will not hold.
        let mut this_seqno = SeqNo::minimum();

        // Repeat the same process as above, more or less, while fetching + returning parts
        for (mut i, part) in parts.into_iter().enumerate() {
            let part_seqno = part.lease.seqno();
            let last_seqno = this_seqno;
            this_seqno = part_seqno;
            assert!(this_seqno >= last_seqno);

            // Fetch the part, then return its lease by dropping it.
            let (part, lease) = part.into_exchangeable_part();
            let _ = fetcher.fetch_leased_part(part).await;
            drop(lease);

            // Simulates an exchange
            for event in subscribe.next(None).await {
                if let ListenEvent::Updates(parts) = event {
                    for part in parts {
                        // Keep only the lease of each new part alive.
                        let (_, lease) = part.into_exchangeable_part();
                        subsequent_parts.push(lease);
                    }
                }
            }

            subscribe
                .listen
                .handle
                .downgrade_since(&subscribe.listen.since)
                .await;

            // Write more new values
            // Shift the enumerate index so it addresses the next unwritten
            // element of `data`.
            i += offset;
            write
                .expect_compare_and_append(
                    &data[i..i + 1],
                    u64::cast_from(i),
                    u64::cast_from(i) + 1,
                )
                .await;

            // We should expect the SeqNo to be downgraded if this part's SeqNo
            // is no longer leased to any other parts, either.
            let expect_downgrade = subscribe.listen.handle.outstanding_seqno() > Some(part_seqno);

            let new_seqno_since = subscribe.listen.handle.machine.applier.seqno_since();
            if expect_downgrade {
                assert!(new_seqno_since > seqno_since);
            } else {
                assert_eq!(new_seqno_since, seqno_since);
            }
            seqno_since = new_seqno_since;
        }

        // SeqNo since was downgraded
        assert!(seqno_since > original_seqno_since);

        // Return any outstanding parts, to prevent a panic!
        drop(subsequent_parts);
        drop(subscribe);
    }
1565
1566    #[mz_ore::test]
1567    fn reader_id_human_readable_serde() {
1568        #[derive(Debug, Serialize, Deserialize)]
1569        struct Container {
1570            reader_id: LeasedReaderId,
1571        }
1572
1573        // roundtrip through json
1574        let id =
1575            LeasedReaderId::from_str("r00000000-1234-5678-0000-000000000000").expect("valid id");
1576        assert_eq!(
1577            id,
1578            serde_json::from_value(serde_json::to_value(id.clone()).expect("serializable"))
1579                .expect("deserializable")
1580        );
1581
1582        // deserialize a serialized string directly
1583        assert_eq!(
1584            id,
1585            serde_json::from_str("\"r00000000-1234-5678-0000-000000000000\"")
1586                .expect("deserializable")
1587        );
1588
1589        // roundtrip id through a container type
1590        let json = json!({ "reader_id": id });
1591        assert_eq!(
1592            "{\"reader_id\":\"r00000000-1234-5678-0000-000000000000\"}",
1593            &json.to_string()
1594        );
1595        let container: Container = serde_json::from_value(json).expect("deserializable");
1596        assert_eq!(container.reader_id, id);
1597    }
1598
1599    // Verifies performance optimizations where a Listener doesn't fetch the
1600    // latest Consensus state if the one it currently has can serve the next
1601    // request.
1602    #[mz_ore::test(tokio::test)]
1603    #[cfg_attr(miri, ignore)] // too slow
1604    async fn skip_consensus_fetch_optimization() {
1605        let data = vec![
1606            (("0".to_owned(), "zero".to_owned()), 0, 1),
1607            (("1".to_owned(), "one".to_owned()), 1, 1),
1608            (("2".to_owned(), "two".to_owned()), 2, 1),
1609        ];
1610
1611        let cfg = PersistConfig::new_for_tests();
1612        let blob = Arc::new(MemBlob::open(MemBlobConfig::default()));
1613        let consensus = Arc::new(MemConsensus::default());
1614        let unreliable = UnreliableHandle::default();
1615        unreliable.totally_available();
1616        let consensus = Arc::new(UnreliableConsensus::new(consensus, unreliable.clone()));
1617        let metrics = Arc::new(Metrics::new(&cfg, &MetricsRegistry::new()));
1618        let pubsub_sender = Arc::new(NoopPubSubSender);
1619        let (mut write, mut read) = PersistClient::new(
1620            cfg,
1621            blob,
1622            consensus,
1623            metrics,
1624            Arc::new(IsolatedRuntime::new_for_tests()),
1625            Arc::new(StateCache::new_no_metrics()),
1626            pubsub_sender,
1627        )
1628        .expect("client construction failed")
1629        .expect_open::<String, String, u64, i64>(ShardId::new())
1630        .await;
1631
1632        write.expect_compare_and_append(&data[0..1], 0, 1).await;
1633        write.expect_compare_and_append(&data[1..2], 1, 2).await;
1634        write.expect_compare_and_append(&data[2..3], 2, 3).await;
1635
1636        let snapshot = read.expect_snapshot_and_fetch(2).await;
1637        let mut listen = read.expect_listen(0).await;
1638
1639        // Manually advance the listener's machine so that it has the latest
1640        // state by fetching the first events from next. This is awkward but
1641        // only necessary because we're about to do some weird things with
1642        // unreliable.
1643        let listen_actual = listen.fetch_next().await;
1644        let expected_events = vec![ListenEvent::Progress(Antichain::from_elem(1))];
1645        assert_eq!(listen_actual, expected_events);
1646
1647        // At this point, the snapshot and listen's state should have all the
1648        // writes. Test this by making consensus completely unavailable.
1649        unreliable.totally_unavailable();
1650        assert_eq!(snapshot, all_ok(&data, 2));
1651        assert_eq!(
1652            listen.read_until(&3).await,
1653            (all_ok(&data[1..], 1), Antichain::from_elem(3))
1654        );
1655    }
1656}