mz_persist_client/
read.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Read capabilities and handles
11
12use async_stream::stream;
13use std::backtrace::Backtrace;
14use std::collections::BTreeMap;
15use std::fmt::Debug;
16use std::future::Future;
17use std::sync::Arc;
18use std::time::Duration;
19
20use differential_dataflow::consolidation::consolidate_updates;
21use differential_dataflow::difference::Semigroup;
22use differential_dataflow::lattice::Lattice;
23use differential_dataflow::trace::Description;
24use futures::Stream;
25use futures_util::{StreamExt, stream};
26use mz_dyncfg::Config;
27use mz_ore::instrument;
28use mz_ore::now::EpochMillis;
29use mz_ore::task::{AbortOnDropHandle, JoinHandle, RuntimeExt};
30use mz_persist::location::{Blob, SeqNo};
31use mz_persist_types::columnar::{ColumnDecoder, Schema};
32use mz_persist_types::{Codec, Codec64};
33use proptest_derive::Arbitrary;
34use serde::{Deserialize, Serialize};
35use timely::PartialOrder;
36use timely::progress::{Antichain, Timestamp};
37use tokio::runtime::Handle;
38use tracing::{Instrument, debug_span, warn};
39use uuid::Uuid;
40
41use crate::batch::BLOB_TARGET_SIZE;
42use crate::cfg::{COMPACTION_MEMORY_BOUND_BYTES, RetryParameters};
43use crate::fetch::{FetchBatchFilter, FetchedPart, Lease, LeasedBatchPart, fetch_leased_part};
44use crate::internal::encoding::Schemas;
45use crate::internal::machine::{ExpireFn, Machine};
46use crate::internal::metrics::{Metrics, ReadMetrics, ShardMetrics};
47use crate::internal::state::{BatchPart, HollowBatch};
48use crate::internal::watch::StateWatch;
49use crate::iter::{Consolidator, StructuredSort};
50use crate::schema::SchemaCache;
51use crate::stats::{SnapshotPartStats, SnapshotPartsStats, SnapshotStats};
52use crate::{GarbageCollector, PersistConfig, ShardId, parse_id};
53
54pub use crate::internal::encoding::LazyPartStats;
55pub use crate::internal::state::Since;
56
57/// An opaque identifier for a reader of a persist durable TVC (aka shard).
58#[derive(Arbitrary, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
59#[serde(try_from = "String", into = "String")]
60pub struct LeasedReaderId(pub(crate) [u8; 16]);
61
62impl std::fmt::Display for LeasedReaderId {
63    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
64        write!(f, "r{}", Uuid::from_bytes(self.0))
65    }
66}
67
68impl std::fmt::Debug for LeasedReaderId {
69    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70        write!(f, "LeasedReaderId({})", Uuid::from_bytes(self.0))
71    }
72}
73
74impl std::str::FromStr for LeasedReaderId {
75    type Err = String;
76
77    fn from_str(s: &str) -> Result<Self, Self::Err> {
78        parse_id('r', "LeasedReaderId", s).map(LeasedReaderId)
79    }
80}
81
82impl From<LeasedReaderId> for String {
83    fn from(reader_id: LeasedReaderId) -> Self {
84        reader_id.to_string()
85    }
86}
87
88impl TryFrom<String> for LeasedReaderId {
89    type Error = String;
90
91    fn try_from(s: String) -> Result<Self, Self::Error> {
92        s.parse()
93    }
94}
95
96impl LeasedReaderId {
97    pub(crate) fn new() -> Self {
98        LeasedReaderId(*Uuid::new_v4().as_bytes())
99    }
100}
101
102/// Capable of generating a snapshot of all data at `as_of`, followed by a
103/// listen of all updates.
104///
105/// For more details, see [`ReadHandle::snapshot`] and [`Listen`].
106#[derive(Debug)]
107pub struct Subscribe<K: Codec, V: Codec, T, D> {
108    snapshot: Option<Vec<LeasedBatchPart<T>>>,
109    listen: Listen<K, V, T, D>,
110}
111
112impl<K, V, T, D> Subscribe<K, V, T, D>
113where
114    K: Debug + Codec,
115    V: Debug + Codec,
116    T: Timestamp + Lattice + Codec64 + Sync,
117    D: Semigroup + Codec64 + Send + Sync,
118{
119    fn new(snapshot_parts: Vec<LeasedBatchPart<T>>, listen: Listen<K, V, T, D>) -> Self {
120        Subscribe {
121            snapshot: Some(snapshot_parts),
122            listen,
123        }
124    }
125
126    /// Returns a `LeasedBatchPart` enriched with the proper metadata.
127    ///
128    /// First returns snapshot parts, until they're exhausted, at which point
129    /// begins returning listen parts.
130    ///
131    /// The returned `Antichain` represents the subscription progress as it will
132    /// be _after_ the returned parts are fetched.
133    #[instrument(level = "debug", fields(shard = %self.listen.handle.machine.shard_id()))]
134    pub async fn next(
135        &mut self,
136        // If Some, an override for the default listen sleep retry parameters.
137        listen_retry: Option<RetryParameters>,
138    ) -> Vec<ListenEvent<T, LeasedBatchPart<T>>> {
139        match self.snapshot.take() {
140            Some(parts) => vec![ListenEvent::Updates(parts)],
141            None => {
142                let (parts, upper) = self.listen.next(listen_retry).await;
143                vec![ListenEvent::Updates(parts), ListenEvent::Progress(upper)]
144            }
145        }
146    }
147}
148
149impl<K, V, T, D> Subscribe<K, V, T, D>
150where
151    K: Debug + Codec,
152    V: Debug + Codec,
153    T: Timestamp + Lattice + Codec64 + Sync,
154    D: Semigroup + Codec64 + Send + Sync,
155{
156    /// Equivalent to `next`, but rather than returning a [`LeasedBatchPart`],
157    /// fetches and returns the data from within it.
158    #[instrument(level = "debug", fields(shard = %self.listen.handle.machine.shard_id()))]
159    pub async fn fetch_next(
160        &mut self,
161    ) -> Vec<ListenEvent<T, ((Result<K, String>, Result<V, String>), T, D)>> {
162        let events = self.next(None).await;
163        let new_len = events
164            .iter()
165            .map(|event| match event {
166                ListenEvent::Updates(parts) => parts.len(),
167                ListenEvent::Progress(_) => 1,
168            })
169            .sum();
170        let mut ret = Vec::with_capacity(new_len);
171        for event in events {
172            match event {
173                ListenEvent::Updates(parts) => {
174                    for part in parts {
175                        let fetched_part = self.listen.fetch_batch_part(part).await;
176                        let updates = fetched_part.collect::<Vec<_>>();
177                        if !updates.is_empty() {
178                            ret.push(ListenEvent::Updates(updates));
179                        }
180                    }
181                }
182                ListenEvent::Progress(progress) => ret.push(ListenEvent::Progress(progress)),
183            }
184        }
185        ret
186    }
187
188    /// Fetches the contents of `part` and returns its lease.
189    pub async fn fetch_batch_part(&mut self, part: LeasedBatchPart<T>) -> FetchedPart<K, V, T, D> {
190        self.listen.fetch_batch_part(part).await
191    }
192}
193
194impl<K, V, T, D> Subscribe<K, V, T, D>
195where
196    K: Debug + Codec,
197    V: Debug + Codec,
198    T: Timestamp + Lattice + Codec64 + Sync,
199    D: Semigroup + Codec64 + Send + Sync,
200{
201    /// Politely expires this subscribe, releasing its lease.
202    ///
203    /// There is a best-effort impl in Drop for [`ReadHandle`] to expire the
204    /// [`ReadHandle`] held by the subscribe that wasn't explicitly expired
205    /// with this method. When possible, explicit expiry is still preferred
206    /// because the Drop one is best effort and is dependant on a tokio
207    /// [Handle] being available in the TLC at the time of drop (which is a bit
208    /// subtle). Also, explicit expiry allows for control over when it happens.
209    pub async fn expire(mut self) {
210        let _ = self.snapshot.take(); // Drop all leased parts.
211        self.listen.expire().await;
212    }
213}
214
215/// Data and progress events of a shard subscription.
216///
217/// TODO: Unify this with [timely::dataflow::operators::capture::event::Event].
218#[derive(Debug, PartialEq)]
219pub enum ListenEvent<T, D> {
220    /// Progress of the shard.
221    Progress(Antichain<T>),
222    /// Data of the shard.
223    Updates(Vec<D>),
224}
225
226/// An ongoing subscription of updates to a shard.
227#[derive(Debug)]
228pub struct Listen<K: Codec, V: Codec, T, D> {
229    handle: ReadHandle<K, V, T, D>,
230    watch: StateWatch<K, V, T, D>,
231
232    as_of: Antichain<T>,
233    since: Antichain<T>,
234    frontier: Antichain<T>,
235}
236
237impl<K, V, T, D> Listen<K, V, T, D>
238where
239    K: Debug + Codec,
240    V: Debug + Codec,
241    T: Timestamp + Lattice + Codec64 + Sync,
242    D: Semigroup + Codec64 + Send + Sync,
243{
244    async fn new(mut handle: ReadHandle<K, V, T, D>, as_of: Antichain<T>) -> Self {
245        let since = as_of.clone();
246        // This listen only needs to distinguish things after its frontier
247        // (initially as_of although the frontier is inclusive and the as_of
248        // isn't). Be a good citizen and downgrade early.
249        handle.downgrade_since(&since).await;
250
251        let watch = handle.machine.applier.watch();
252        Listen {
253            handle,
254            watch,
255            since,
256            frontier: as_of.clone(),
257            as_of,
258        }
259    }
260
261    /// An exclusive upper bound on the progress of this Listen.
262    pub fn frontier(&self) -> &Antichain<T> {
263        &self.frontier
264    }
265
266    /// Attempt to pull out the next values of this subscription.
267    ///
268    /// The returned [`LeasedBatchPart`] is appropriate to use with
269    /// `crate::fetch::fetch_leased_part`.
270    ///
271    /// The returned `Antichain` represents the subscription progress as it will
272    /// be _after_ the returned parts are fetched.
273    pub async fn next(
274        &mut self,
275        // If Some, an override for the default listen sleep retry parameters.
276        retry: Option<RetryParameters>,
277    ) -> (Vec<LeasedBatchPart<T>>, Antichain<T>) {
278        let batch = self
279            .handle
280            .machine
281            .next_listen_batch(
282                &self.frontier,
283                &mut self.watch,
284                Some(&self.handle.reader_id),
285                retry,
286            )
287            .await;
288
289        // A lot of things across mz have to line up to hold the following
290        // invariant and violations only show up as subtle correctness errors,
291        // so explictly validate it here. Better to panic and roll back a
292        // release than be incorrect (also potentially corrupting a sink).
293        //
294        // Note that the since check is intentionally less_than, not less_equal.
295        // If a batch's since is X, that means we can no longer distinguish X
296        // (beyond self.frontier) from X-1 (not beyond self.frontier) to keep
297        // former and filter out the latter.
298        assert!(
299            PartialOrder::less_than(batch.desc.since(), &self.frontier)
300                // Special case when the frontier == the as_of (i.e. the first
301                // time this is called on a new Listen). Because as_of is
302                // _exclusive_, we don't need to be able to distinguish X from
303                // X-1.
304                || (self.frontier == self.as_of
305                    && PartialOrder::less_equal(batch.desc.since(), &self.frontier)),
306            "Listen on {} received a batch {:?} advanced past the listen frontier {:?}",
307            self.handle.machine.shard_id(),
308            batch.desc,
309            self.frontier
310        );
311
312        let new_frontier = batch.desc.upper().clone();
313
314        // We will have a new frontier, so this is an opportunity to downgrade our
315        // since capability. Go through `maybe_heartbeat` so we can rate limit
316        // this along with our heartbeats.
317        //
318        // HACK! Everything would be simpler if we could downgrade since to the
319        // new frontier, but we can't. The next call needs to be able to
320        // distinguish between the times T at the frontier (to emit updates with
321        // these times) and T-1 (to filter them). Advancing the since to
322        // frontier would erase the ability to distinguish between them. Ideally
323        // we'd use what is conceptually "batch.upper - 1" (the greatest
324        // elements that are still strictly less than batch.upper, which will be
325        // the new value of self.frontier after this call returns), but the
326        // trait bounds on T don't give us a way to compute that directly.
327        // Instead, we sniff out any elements in self.frontier (the upper of the
328        // batch the last time we called this) that are strictly less_than the
329        // batch upper to compute a new since. For totally ordered times
330        // (currently always the case in mz) self.frontier will always have a
331        // single element and it will be less_than upper, but the following
332        // logic is (hopefully) correct for partially order times as well. We
333        // could also abuse the fact that every time we actually emit is
334        // guaranteed by definition to be less_than upper to be a bit more
335        // prompt, but this would involve a lot more temporary antichains and
336        // it's unclear if that's worth it.
337        for x in self.frontier.elements().iter() {
338            let less_than_upper = batch.desc.upper().elements().iter().any(|u| x.less_than(u));
339            if less_than_upper {
340                self.since.join_assign(&Antichain::from_elem(x.clone()));
341            }
342        }
343
344        // IMPORTANT! Make sure this `lease_batch_parts` stays before the
345        // `maybe_downgrade_since` call. Otherwise, we might give up our
346        // capability on the batch's SeqNo before we lease it, which could lead
347        // to blobs that it references being GC'd.
348        let filter = FetchBatchFilter::Listen {
349            as_of: self.as_of.clone(),
350            lower: self.frontier.clone(),
351        };
352        let parts = self.handle.lease_batch_parts(batch, filter).collect().await;
353
354        self.handle.maybe_downgrade_since(&self.since).await;
355
356        // NB: Keep this after we use self.frontier to join_assign self.since
357        // and also after we construct metadata.
358        self.frontier = new_frontier;
359
360        (parts, self.frontier.clone())
361    }
362}
363
364impl<K, V, T, D> Listen<K, V, T, D>
365where
366    K: Debug + Codec,
367    V: Debug + Codec,
368    T: Timestamp + Lattice + Codec64 + Sync,
369    D: Semigroup + Codec64 + Send + Sync,
370{
371    /// Attempt to pull out the next values of this subscription.
372    ///
373    /// The updates received in [ListenEvent::Updates] should be assumed to be in arbitrary order
374    /// and not necessarily consolidated. However, the timestamp of each individual update will be
375    /// greater than or equal to the last received [ListenEvent::Progress] frontier (or this
376    /// [Listen]'s initial `as_of` frontier if no progress event has been emitted yet) and less
377    /// than the next [ListenEvent::Progress] frontier.
378    ///
379    /// If you have a use for consolidated listen output, given that snapshots can't be
380    /// consolidated, come talk to us!
381    #[instrument(level = "debug", name = "listen::next", fields(shard = %self.handle.machine.shard_id()))]
382    pub async fn fetch_next(
383        &mut self,
384    ) -> Vec<ListenEvent<T, ((Result<K, String>, Result<V, String>), T, D)>> {
385        let (parts, progress) = self.next(None).await;
386        let mut ret = Vec::with_capacity(parts.len() + 1);
387        for part in parts {
388            let fetched_part = self.fetch_batch_part(part).await;
389            let updates = fetched_part.collect::<Vec<_>>();
390            if !updates.is_empty() {
391                ret.push(ListenEvent::Updates(updates));
392            }
393        }
394        ret.push(ListenEvent::Progress(progress));
395        ret
396    }
397
398    /// Convert listener into futures::Stream
399    pub fn into_stream(
400        mut self,
401    ) -> impl Stream<Item = ListenEvent<T, ((Result<K, String>, Result<V, String>), T, D)>> {
402        async_stream::stream!({
403            loop {
404                for msg in self.fetch_next().await {
405                    yield msg;
406                }
407            }
408        })
409    }
410
411    /// Test helper to read from the listener until the given frontier is
412    /// reached. Because compaction can arbitrarily combine batches, we only
413    /// return the final progress info.
414    #[cfg(test)]
415    #[track_caller]
416    pub async fn read_until(
417        &mut self,
418        ts: &T,
419    ) -> (
420        Vec<((Result<K, String>, Result<V, String>), T, D)>,
421        Antichain<T>,
422    ) {
423        let mut updates = Vec::new();
424        let mut frontier = Antichain::from_elem(T::minimum());
425        while self.frontier.less_than(ts) {
426            for event in self.fetch_next().await {
427                match event {
428                    ListenEvent::Updates(mut x) => updates.append(&mut x),
429                    ListenEvent::Progress(x) => frontier = x,
430                }
431            }
432        }
433        // Unlike most tests, intentionally don't consolidate updates here
434        // because Listen replays them at the original fidelity.
435        (updates, frontier)
436    }
437}
438
439impl<K, V, T, D> Listen<K, V, T, D>
440where
441    K: Debug + Codec,
442    V: Debug + Codec,
443    T: Timestamp + Lattice + Codec64 + Sync,
444    D: Semigroup + Codec64 + Send + Sync,
445{
446    /// Fetches the contents of `part` and returns its lease.
447    ///
448    /// This is broken out into its own function to provide a trivial means for
449    /// [`Subscribe`], which contains a [`Listen`], to fetch batches.
450    async fn fetch_batch_part(&mut self, part: LeasedBatchPart<T>) -> FetchedPart<K, V, T, D> {
451        let fetched_part = fetch_leased_part(
452            &self.handle.cfg,
453            &part,
454            self.handle.blob.as_ref(),
455            Arc::clone(&self.handle.metrics),
456            &self.handle.metrics.read.listen,
457            &self.handle.machine.applier.shard_metrics,
458            &self.handle.reader_id,
459            self.handle.read_schemas.clone(),
460            &mut self.handle.schema_cache,
461        )
462        .await;
463        fetched_part
464    }
465
466    /// Politely expires this listen, releasing its lease.
467    ///
468    /// There is a best-effort impl in Drop for [`ReadHandle`] to expire the
469    /// [`ReadHandle`] held by the listen that wasn't explicitly expired with
470    /// this method. When possible, explicit expiry is still preferred because
471    /// the Drop one is best effort and is dependant on a tokio [Handle] being
472    /// available in the TLC at the time of drop (which is a bit subtle). Also,
473    /// explicit expiry allows for control over when it happens.
474    pub async fn expire(self) {
475        self.handle.expire().await
476    }
477}
478
479/// A "capability" granting the ability to read the state of some shard at times
480/// greater or equal to `self.since()`.
481///
482/// Production users should call [Self::expire] before dropping a ReadHandle so
483/// that it can expire its leases. If/when rust gets AsyncDrop, this will be
484/// done automatically.
485///
486/// All async methods on ReadHandle retry for as long as they are able, but the
487/// returned [std::future::Future]s implement "cancel on drop" semantics. This
488/// means that callers can add a timeout using [tokio::time::timeout] or
489/// [tokio::time::timeout_at].
490///
491/// ```rust,no_run
492/// # let mut read: mz_persist_client::read::ReadHandle<String, String, u64, i64> = unimplemented!();
493/// # let timeout: std::time::Duration = unimplemented!();
494/// # let new_since: timely::progress::Antichain<u64> = unimplemented!();
495/// # async {
496/// tokio::time::timeout(timeout, read.downgrade_since(&new_since)).await
497/// # };
498/// ```
499#[derive(Debug)]
500pub struct ReadHandle<K: Codec, V: Codec, T, D> {
501    pub(crate) cfg: PersistConfig,
502    pub(crate) metrics: Arc<Metrics>,
503    pub(crate) machine: Machine<K, V, T, D>,
504    pub(crate) gc: GarbageCollector<K, V, T, D>,
505    pub(crate) blob: Arc<dyn Blob>,
506    pub(crate) reader_id: LeasedReaderId,
507    pub(crate) read_schemas: Schemas<K, V>,
508    pub(crate) schema_cache: SchemaCache<K, V, T, D>,
509
510    since: Antichain<T>,
511    pub(crate) last_heartbeat: EpochMillis,
512    pub(crate) leased_seqnos: BTreeMap<SeqNo, Lease>,
513    pub(crate) unexpired_state: Option<UnexpiredReadHandleState>,
514}
515
516/// Length of time after a reader's last operation after which the reader may be
517/// expired.
518pub(crate) const READER_LEASE_DURATION: Config<Duration> = Config::new(
519    "persist_reader_lease_duration",
520    Duration::from_secs(60 * 15),
521    "The time after which we'll clean up stale read leases",
522);
523
524impl<K, V, T, D> ReadHandle<K, V, T, D>
525where
526    K: Debug + Codec,
527    V: Debug + Codec,
528    T: Timestamp + Lattice + Codec64 + Sync,
529    D: Semigroup + Codec64 + Send + Sync,
530{
531    pub(crate) async fn new(
532        cfg: PersistConfig,
533        metrics: Arc<Metrics>,
534        machine: Machine<K, V, T, D>,
535        gc: GarbageCollector<K, V, T, D>,
536        blob: Arc<dyn Blob>,
537        reader_id: LeasedReaderId,
538        read_schemas: Schemas<K, V>,
539        since: Antichain<T>,
540        last_heartbeat: EpochMillis,
541    ) -> Self {
542        let schema_cache = machine.applier.schema_cache();
543        let expire_fn = Self::expire_fn(machine.clone(), gc.clone(), reader_id.clone());
544        ReadHandle {
545            cfg,
546            metrics: Arc::clone(&metrics),
547            machine: machine.clone(),
548            gc: gc.clone(),
549            blob,
550            reader_id: reader_id.clone(),
551            read_schemas,
552            schema_cache,
553            since,
554            last_heartbeat,
555            leased_seqnos: BTreeMap::new(),
556            unexpired_state: Some(UnexpiredReadHandleState {
557                expire_fn,
558                _heartbeat_tasks: machine
559                    .start_reader_heartbeat_tasks(reader_id, gc)
560                    .await
561                    .into_iter()
562                    .map(JoinHandle::abort_on_drop)
563                    .collect(),
564            }),
565        }
566    }
567
568    /// This handle's shard id.
569    pub fn shard_id(&self) -> ShardId {
570        self.machine.shard_id()
571    }
572
573    /// This handle's `since` frontier.
574    ///
575    /// This will always be greater or equal to the shard-global `since`.
576    pub fn since(&self) -> &Antichain<T> {
577        &self.since
578    }
579
580    fn outstanding_seqno(&mut self) -> Option<SeqNo> {
581        while let Some(first) = self.leased_seqnos.first_entry() {
582            if first.get().count() <= 1 {
583                first.remove();
584            } else {
585                return Some(*first.key());
586            }
587        }
588        None
589    }
590
591    /// Forwards the since frontier of this handle, giving up the ability to
592    /// read at times not greater or equal to `new_since`.
593    ///
594    /// This may trigger (asynchronous) compaction and consolidation in the
595    /// system. A `new_since` of the empty antichain "finishes" this shard,
596    /// promising that no more data will ever be read by this handle.
597    ///
598    /// This also acts as a heartbeat for the reader lease (including if called
599    /// with `new_since` equal to something like `self.since()` or the minimum
600    /// timestamp, making the call a no-op).
601    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
602    pub async fn downgrade_since(&mut self, new_since: &Antichain<T>) {
603        // Guaranteed to be the smallest/oldest outstanding lease on a `SeqNo`.
604        let outstanding_seqno = self.outstanding_seqno();
605
606        let heartbeat_ts = (self.cfg.now)();
607        let (_seqno, current_reader_since, maintenance) = self
608            .machine
609            .downgrade_since(&self.reader_id, outstanding_seqno, new_since, heartbeat_ts)
610            .await;
611
612        // Debugging for database-issues#4590.
613        if let Some(outstanding_seqno) = outstanding_seqno {
614            let seqnos_held = _seqno.0.saturating_sub(outstanding_seqno.0);
615            // We get just over 1 seqno-per-second on average for a shard in
616            // prod, so this is about an hour.
617            const SEQNOS_HELD_THRESHOLD: u64 = 60 * 60;
618            if seqnos_held >= SEQNOS_HELD_THRESHOLD {
619                tracing::info!(
620                    "{} reader {} holding an unexpected number of seqnos {} vs {}: {:?}. bt: {:?}",
621                    self.machine.shard_id(),
622                    self.reader_id,
623                    outstanding_seqno,
624                    _seqno,
625                    self.leased_seqnos.keys().take(10).collect::<Vec<_>>(),
626                    // The Debug impl of backtrace is less aesthetic, but will put the trace
627                    // on a single line and play more nicely with our Honeycomb quota
628                    Backtrace::force_capture(),
629                );
630            }
631        }
632
633        self.since = current_reader_since.0;
634        // A heartbeat is just any downgrade_since traffic, so update the
635        // internal rate limiter here to play nicely with `maybe_heartbeat`.
636        self.last_heartbeat = heartbeat_ts;
637        maintenance.start_performing(&self.machine, &self.gc);
638    }
639
640    /// Returns an ongoing subscription of updates to a shard.
641    ///
642    /// The stream includes all data at times greater than `as_of`. Combined
643    /// with [Self::snapshot] it will produce exactly correct results: the
644    /// snapshot is the TVCs contents at `as_of` and all subsequent updates
645    /// occur at exactly their indicated time. The recipient should only
646    /// downgrade their read capability when they are certain they have all data
647    /// through the frontier they would downgrade to.
648    ///
649    /// This takes ownership of the ReadHandle so the Listen can use it to
650    /// [Self::downgrade_since] as it progresses. If you need to keep this
651    /// handle, then [Self::clone] it before calling listen.
652    ///
653    /// The `Since` error indicates that the requested `as_of` cannot be served
654    /// (the caller has out of date information) and includes the smallest
655    /// `as_of` that would have been accepted.
656    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
657    pub async fn listen(self, as_of: Antichain<T>) -> Result<Listen<K, V, T, D>, Since<T>> {
658        let () = self.machine.verify_listen(&as_of)?;
659        Ok(Listen::new(self, as_of).await)
660    }
661
662    /// Returns all of the contents of the shard TVC at `as_of` broken up into
663    /// [`LeasedBatchPart`]es. These parts can be "turned in" via
664    /// `crate::fetch::fetch_batch_part` to receive the data they contain.
665    ///
666    /// This command returns the contents of this shard as of `as_of` once they
667    /// are known. This may "block" (in an async-friendly way) if `as_of` is
668    /// greater or equal to the current `upper` of the shard. The recipient
669    /// should only downgrade their read capability when they are certain they
670    /// have all data through the frontier they would downgrade to.
671    ///
672    /// The `Since` error indicates that the requested `as_of` cannot be served
673    /// (the caller has out of date information) and includes the smallest
674    /// `as_of` that would have been accepted.
675    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
676    pub async fn snapshot(
677        &mut self,
678        as_of: Antichain<T>,
679    ) -> Result<Vec<LeasedBatchPart<T>>, Since<T>> {
680        let batches = self.machine.snapshot(&as_of).await?;
681
682        if !PartialOrder::less_equal(self.since(), &as_of) {
683            return Err(Since(self.since().clone()));
684        }
685
686        let filter = FetchBatchFilter::Snapshot { as_of };
687        let mut leased_parts = Vec::new();
688        for batch in batches {
689            // Flatten the HollowBatch into one LeasedBatchPart per key. Each key
690            // corresponds to a "part" or s3 object. This allows persist_source
691            // to distribute work by parts (smallish, more even size) instead of
692            // batches (arbitrarily large).
693            leased_parts.extend(
694                self.lease_batch_parts(batch, filter.clone())
695                    .collect::<Vec<_>>()
696                    .await,
697            );
698        }
699        Ok(leased_parts)
700    }
701
702    /// Returns a snapshot of all of a shard's data using `as_of`, followed by
703    /// listening to any future updates.
704    ///
705    /// For more details on this operation's semantics, see [Self::snapshot] and
706    /// [Self::listen].
707    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
708    pub async fn subscribe(
709        mut self,
710        as_of: Antichain<T>,
711    ) -> Result<Subscribe<K, V, T, D>, Since<T>> {
712        let snapshot_parts = self.snapshot(as_of.clone()).await?;
713        let listen = self.listen(as_of.clone()).await?;
714        Ok(Subscribe::new(snapshot_parts, listen))
715    }
716
717    fn lease_batch_part(
718        &mut self,
719        desc: Description<T>,
720        part: BatchPart<T>,
721        filter: FetchBatchFilter<T>,
722    ) -> LeasedBatchPart<T> {
723        LeasedBatchPart {
724            metrics: Arc::clone(&self.metrics),
725            shard_id: self.machine.shard_id(),
726            filter,
727            desc,
728            part,
729            lease: self.lease_seqno(),
730            filter_pushdown_audit: false,
731        }
732    }
733
734    fn lease_batch_parts(
735        &mut self,
736        batch: HollowBatch<T>,
737        filter: FetchBatchFilter<T>,
738    ) -> impl Stream<Item = LeasedBatchPart<T>> + '_ {
739        stream! {
740            let blob = Arc::clone(&self.blob);
741            let metrics = Arc::clone(&self.metrics);
742            let desc = batch.desc.clone();
743            for await part in batch.part_stream(self.shard_id(), &*blob, &*metrics) {
744                yield self.lease_batch_part(desc.clone(), part.expect("leased part").into_owned(), filter.clone())
745            }
746        }
747    }
748
749    /// Tracks that the `ReadHandle`'s machine's current `SeqNo` is being
750    /// "leased out" to a `LeasedBatchPart`, and cannot be garbage
751    /// collected until its lease has been returned.
752    fn lease_seqno(&mut self) -> Lease {
753        let seqno = self.machine.seqno();
754        let lease = self
755            .leased_seqnos
756            .entry(seqno)
757            .or_insert_with(|| Lease::new(seqno));
758        lease.clone()
759    }
760
761    /// Returns an independent [ReadHandle] with a new [LeasedReaderId] but the
762    /// same `since`.
763    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
764    pub async fn clone(&self, purpose: &str) -> Self {
765        let new_reader_id = LeasedReaderId::new();
766        let machine = self.machine.clone();
767        let gc = self.gc.clone();
768        let heartbeat_ts = (self.cfg.now)();
769        let (reader_state, maintenance) = machine
770            .register_leased_reader(
771                &new_reader_id,
772                purpose,
773                READER_LEASE_DURATION.get(&self.cfg),
774                heartbeat_ts,
775                false,
776            )
777            .await;
778        maintenance.start_performing(&machine, &gc);
779        // The point of clone is that you're guaranteed to have the same (or
780        // greater) since capability, verify that.
781        // TODO: better if it's the same since capability exactly.
782        assert!(PartialOrder::less_equal(&reader_state.since, &self.since));
783        let new_reader = ReadHandle::new(
784            self.cfg.clone(),
785            Arc::clone(&self.metrics),
786            machine,
787            gc,
788            Arc::clone(&self.blob),
789            new_reader_id,
790            self.read_schemas.clone(),
791            reader_state.since,
792            heartbeat_ts,
793        )
794        .await;
795        new_reader
796    }
797
798    /// A rate-limited version of [Self::downgrade_since].
799    ///
800    /// This is an internally rate limited helper, designed to allow users to
801    /// call it as frequently as they like. Call this or [Self::downgrade_since],
802    /// on some interval that is "frequent" compared to the read lease duration.
803    pub async fn maybe_downgrade_since(&mut self, new_since: &Antichain<T>) {
804        let min_elapsed = READER_LEASE_DURATION.get(&self.cfg) / 4;
805        let elapsed_since_last_heartbeat =
806            Duration::from_millis((self.cfg.now)().saturating_sub(self.last_heartbeat));
807        if elapsed_since_last_heartbeat >= min_elapsed {
808            self.downgrade_since(new_since).await;
809        }
810    }
811
812    /// Politely expires this reader, releasing its lease.
813    ///
814    /// There is a best-effort impl in Drop to expire a reader that wasn't
815    /// explictly expired with this method. When possible, explicit expiry is
816    /// still preferred because the Drop one is best effort and is dependant on
817    /// a tokio [Handle] being available in the TLC at the time of drop (which
818    /// is a bit subtle). Also, explicit expiry allows for control over when it
819    /// happens.
820    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
821    pub async fn expire(mut self) {
822        // We drop the unexpired state before expiring the reader to ensure the
823        // heartbeat tasks can never observe the expired state. This doesn't
824        // matter for correctness, but avoids confusing log output if the
825        // heartbeat task were to discover that its lease has been expired.
826        let Some(unexpired_state) = self.unexpired_state.take() else {
827            return;
828        };
829        unexpired_state.expire_fn.0().await;
830    }
831
    /// Builds the [ExpireFn] for the reader identified by `reader_id`.
    ///
    /// When the returned function is invoked, it expires the leased reader on
    /// `machine` and then starts performing the maintenance returned by that
    /// call, using `machine` and `gc`. Nothing happens until it is invoked and
    /// the resulting future is polled.
    fn expire_fn(
        machine: Machine<K, V, T, D>,
        gc: GarbageCollector<K, V, T, D>,
        reader_id: LeasedReaderId,
    ) -> ExpireFn {
        ExpireFn(Box::new(move || {
            Box::pin(async move {
                // The first element of the result is intentionally ignored;
                // only the follow-up maintenance is acted on.
                let (_, maintenance) = machine.expire_leased_reader(&reader_id).await;
                maintenance.start_performing(&machine, &gc);
            })
        }))
    }
844
845    /// Test helper for a [Self::listen] call that is expected to succeed.
846    #[cfg(test)]
847    #[track_caller]
848    pub async fn expect_listen(self, as_of: T) -> Listen<K, V, T, D> {
849        self.listen(Antichain::from_elem(as_of))
850            .await
851            .expect("cannot serve requested as_of")
852    }
853}
854
/// State for a read handle that has not been explicitly expired.
///
/// `ReadHandle` takes this state out of itself when it is expired, either
/// explicitly via `expire` or on a best-effort basis in `Drop`.
#[derive(Debug)]
pub(crate) struct UnexpiredReadHandleState {
    /// Invoked (at most once) to expire the reader.
    expire_fn: ExpireFn,
    /// Handles of the reader's heartbeat tasks, kept alive by this state.
    /// NOTE(review): presumably the tasks are aborted when these handles are
    /// dropped, per the `AbortOnDropHandle` name; confirm against `mz_ore`.
    pub(crate) _heartbeat_tasks: Vec<AbortOnDropHandle<()>>,
}
861
/// An incremental cursor through a particular shard, returned from
/// [ReadHandle::snapshot_cursor].
///
/// To read an entire dataset, the client should call `next` until it returns
/// `None`, which signals that all data has been returned. The client is also
/// free to abandon the cursor at any time, e.g. if it only needs a few
/// entries.
#[derive(Debug)]
pub struct Cursor<K: Codec, V: Codec, T: Timestamp + Codec64, D: Codec64, L = Lease> {
    /// Produces the consolidated chunks that `next` decodes and returns.
    pub(crate) consolidator: CursorConsolidator<K, V, T, D>,
    /// Lease held for as long as the cursor exists; it can be taken back out
    /// with `into_lease`.
    pub(crate) _lease: L,
    /// Schemas used to build the key and value decoders for each chunk.
    pub(crate) read_schemas: Schemas<K, V>,
}
873
874impl<K: Codec, V: Codec, T: Timestamp + Codec64, D: Codec64, L> Cursor<K, V, T, D, L> {
875    /// Extracts and returns the lease from the cursor. Allowing the caller to
876    /// do any necessary cleanup associated with the lease.
877    pub fn into_lease(self: Self) -> L {
878        self._lease
879    }
880}
881
/// The consolidation strategy backing a [Cursor], together with the limits
/// that the cursor passes to it when asking for the next chunk.
#[derive(Debug)]
pub(crate) enum CursorConsolidator<K: Codec, V: Codec, T: Timestamp + Codec64, D: Codec64> {
    /// Consolidates using [StructuredSort].
    Structured {
        /// The underlying consolidator that chunks are pulled from.
        consolidator: Consolidator<T, D, StructuredSort<K, V, T, D>>,
        /// First limit handed to `next_chunk`.
        /// NOTE(review): presumably a cap on the number of updates per chunk; confirm.
        max_len: usize,
        /// Second limit handed to `next_chunk`.
        /// NOTE(review): presumably a cap on the chunk size in bytes; confirm.
        max_bytes: usize,
    },
}
890
891impl<K, V, T, D, L> Cursor<K, V, T, D, L>
892where
893    K: Debug + Codec + Ord,
894    V: Debug + Codec + Ord,
895    T: Timestamp + Lattice + Codec64 + Sync,
896    D: Semigroup + Ord + Codec64 + Send + Sync,
897{
898    /// Grab the next batch of consolidated data.
899    pub async fn next(
900        &mut self,
901    ) -> Option<impl Iterator<Item = ((Result<K, String>, Result<V, String>), T, D)> + '_> {
902        match &mut self.consolidator {
903            CursorConsolidator::Structured {
904                consolidator,
905                max_len,
906                max_bytes,
907            } => {
908                let part = consolidator
909                    .next_chunk(*max_len, *max_bytes)
910                    .await
911                    .expect("fetching a leased part")?;
912                let key_decoder = self
913                    .read_schemas
914                    .key
915                    .decoder_any(part.key.as_ref())
916                    .expect("ok");
917                let val_decoder = self
918                    .read_schemas
919                    .val
920                    .decoder_any(part.val.as_ref())
921                    .expect("ok");
922                let iter = (0..part.len()).map(move |i| {
923                    let mut k = K::default();
924                    let mut v = V::default();
925                    key_decoder.decode(i, &mut k);
926                    val_decoder.decode(i, &mut v);
927                    let t = T::decode(part.time.value(i).to_le_bytes());
928                    let d = D::decode(part.diff.value(i).to_le_bytes());
929                    ((Ok(k), Ok(v)), t, d)
930                });
931
932                Some(iter)
933            }
934        }
935    }
936}
937
938impl<K, V, T, D> ReadHandle<K, V, T, D>
939where
940    K: Debug + Codec + Ord,
941    V: Debug + Codec + Ord,
942    T: Timestamp + Lattice + Codec64 + Sync,
943    D: Semigroup + Ord + Codec64 + Send + Sync,
944{
945    /// Generates a [Self::snapshot], and fetches all of the batches it
946    /// contains.
947    ///
948    /// The output is consolidated. Furthermore, to keep memory usage down when
949    /// reading a snapshot that consolidates well, this consolidates as it goes.
950    ///
951    /// Potential future improvements (if necessary):
952    /// - Accept something like a `F: Fn(K,V) -> (K,V)` argument, which looks
953    ///   like an MFP you might be pushing down. Reason being that if you are
954    ///   projecting or transforming in a way that allows further consolidation,
955    ///   amazing.
956    /// - Reuse any code we write to streaming-merge consolidate in
957    ///   persist_source here.
958    pub async fn snapshot_and_fetch(
959        &mut self,
960        as_of: Antichain<T>,
961    ) -> Result<Vec<((Result<K, String>, Result<V, String>), T, D)>, Since<T>> {
962        let mut cursor = self.snapshot_cursor(as_of, |_| true).await?;
963        let mut contents = Vec::new();
964        while let Some(iter) = cursor.next().await {
965            contents.extend(iter);
966        }
967
968        // We don't currently guarantee that encoding is one-to-one, so we still need to
969        // consolidate the decoded outputs. However, let's report if this isn't a noop.
970        let old_len = contents.len();
971        consolidate_updates(&mut contents);
972        if old_len != contents.len() {
973            // TODO(bkirwi): do we need more / finer-grained metrics for this?
974            self.machine
975                .applier
976                .shard_metrics
977                .unconsolidated_snapshot
978                .inc();
979        }
980
981        Ok(contents)
982    }
983
984    /// Generates a [Self::snapshot], and fetches all of the batches it
985    /// contains.
986    ///
987    /// To keep memory usage down when reading a snapshot that consolidates well, this consolidates
988    /// as it goes. However, note that only the serialized data is consolidated: the deserialized
989    /// data will only be consolidated if your K/V codecs are one-to-one.
990    pub async fn snapshot_cursor(
991        &mut self,
992        as_of: Antichain<T>,
993        should_fetch_part: impl for<'a> Fn(Option<&'a LazyPartStats>) -> bool,
994    ) -> Result<Cursor<K, V, T, D>, Since<T>> {
995        let batches = self.machine.snapshot(&as_of).await?;
996        let lease = self.lease_seqno();
997
998        Self::read_batches_consolidated(
999            &self.cfg,
1000            Arc::clone(&self.metrics),
1001            Arc::clone(&self.machine.applier.shard_metrics),
1002            self.metrics.read.snapshot.clone(),
1003            Arc::clone(&self.blob),
1004            self.shard_id(),
1005            as_of,
1006            self.read_schemas.clone(),
1007            &batches,
1008            lease,
1009            should_fetch_part,
1010        )
1011    }
1012
1013    pub(crate) fn read_batches_consolidated<L>(
1014        persist_cfg: &PersistConfig,
1015        metrics: Arc<Metrics>,
1016        shard_metrics: Arc<ShardMetrics>,
1017        read_metrics: ReadMetrics,
1018        blob: Arc<dyn Blob>,
1019        shard_id: ShardId,
1020        as_of: Antichain<T>,
1021        schemas: Schemas<K, V>,
1022        batches: &[HollowBatch<T>],
1023        lease: L,
1024        should_fetch_part: impl for<'a> Fn(Option<&'a LazyPartStats>) -> bool,
1025    ) -> Result<Cursor<K, V, T, D, L>, Since<T>> {
1026        let context = format!("{}[as_of={:?}]", shard_id, as_of.elements());
1027        let filter = FetchBatchFilter::Snapshot {
1028            as_of: as_of.clone(),
1029        };
1030
1031        let consolidator = {
1032            let mut consolidator = Consolidator::new(
1033                context,
1034                shard_id,
1035                StructuredSort::new(schemas.clone()),
1036                blob,
1037                metrics,
1038                shard_metrics,
1039                read_metrics,
1040                filter,
1041                COMPACTION_MEMORY_BOUND_BYTES.get(persist_cfg),
1042            );
1043            for batch in batches {
1044                for (meta, run) in batch.runs() {
1045                    consolidator.enqueue_run(
1046                        &batch.desc,
1047                        meta,
1048                        run.into_iter()
1049                            .filter(|p| should_fetch_part(p.stats()))
1050                            .cloned(),
1051                    );
1052                }
1053            }
1054            CursorConsolidator::Structured {
1055                consolidator,
1056                // This default may end up consolidating more records than previously
1057                // for cases like fast-path peeks, where only the first few entries are used.
1058                // If this is a noticeable performance impact, thread the max-len in from the caller.
1059                max_len: persist_cfg.compaction_yield_after_n_updates,
1060                max_bytes: BLOB_TARGET_SIZE.get(persist_cfg).max(1),
1061            }
1062        };
1063
1064        Ok(Cursor {
1065            consolidator,
1066            _lease: lease,
1067            read_schemas: schemas,
1068        })
1069    }
1070
1071    /// Returns aggregate statistics about the contents of the shard TVC at the
1072    /// given frontier.
1073    ///
1074    /// This command returns the contents of this shard as of `as_of` once they
1075    /// are known. This may "block" (in an async-friendly way) if `as_of` is
1076    /// greater or equal to the current `upper` of the shard. If `None` is given
1077    /// for `as_of`, then the latest stats known by this process are used.
1078    ///
1079    /// The `Since` error indicates that the requested `as_of` cannot be served
1080    /// (the caller has out of date information) and includes the smallest
1081    /// `as_of` that would have been accepted.
1082    pub fn snapshot_stats(
1083        &self,
1084        as_of: Option<Antichain<T>>,
1085    ) -> impl Future<Output = Result<SnapshotStats, Since<T>>> + Send + 'static {
1086        let machine = self.machine.clone();
1087        async move {
1088            let batches = match as_of {
1089                Some(as_of) => machine.snapshot(&as_of).await?,
1090                None => machine.applier.all_batches(),
1091            };
1092            let num_updates = batches.iter().map(|b| b.len).sum();
1093            Ok(SnapshotStats {
1094                shard_id: machine.shard_id(),
1095                num_updates,
1096            })
1097        }
1098    }
1099
1100    /// Returns aggregate statistics about the contents of the shard TVC at the
1101    /// given frontier.
1102    ///
1103    /// This command returns the contents of this shard as of `as_of` once they
1104    /// are known. This may "block" (in an async-friendly way) if `as_of` is
1105    /// greater or equal to the current `upper` of the shard.
1106    ///
1107    /// The `Since` error indicates that the requested `as_of` cannot be served
1108    /// (the caller has out of date information) and includes the smallest
1109    /// `as_of` that would have been accepted.
1110    pub async fn snapshot_parts_stats(
1111        &self,
1112        as_of: Antichain<T>,
1113    ) -> Result<SnapshotPartsStats, Since<T>> {
1114        let batches = self.machine.snapshot(&as_of).await?;
1115        let parts = stream::iter(&batches)
1116            .flat_map(|b| b.part_stream(self.shard_id(), &*self.blob, &*self.metrics))
1117            .map(|p| {
1118                let p = p.expect("live batch");
1119                SnapshotPartStats {
1120                    encoded_size_bytes: p.encoded_size_bytes(),
1121                    stats: p.stats().cloned(),
1122                }
1123            })
1124            .collect()
1125            .await;
1126        Ok(SnapshotPartsStats {
1127            metrics: Arc::clone(&self.machine.applier.metrics),
1128            shard_id: self.machine.shard_id(),
1129            parts,
1130        })
1131    }
1132}
1133
1134impl<K, V, T, D> ReadHandle<K, V, T, D>
1135where
1136    K: Debug + Codec + Ord,
1137    V: Debug + Codec + Ord,
1138    T: Timestamp + Lattice + Codec64 + Sync,
1139    D: Semigroup + Codec64 + Send + Sync,
1140{
1141    /// Generates a [Self::snapshot], and streams out all of the updates
1142    /// it contains in bounded memory.
1143    ///
1144    /// The output is not consolidated.
1145    pub async fn snapshot_and_stream(
1146        &mut self,
1147        as_of: Antichain<T>,
1148    ) -> Result<
1149        impl Stream<Item = ((Result<K, String>, Result<V, String>), T, D)> + use<K, V, T, D>,
1150        Since<T>,
1151    > {
1152        let snap = self.snapshot(as_of).await?;
1153
1154        let blob = Arc::clone(&self.blob);
1155        let metrics = Arc::clone(&self.metrics);
1156        let snapshot_metrics = self.metrics.read.snapshot.clone();
1157        let shard_metrics = Arc::clone(&self.machine.applier.shard_metrics);
1158        let reader_id = self.reader_id.clone();
1159        let schemas = self.read_schemas.clone();
1160        let mut schema_cache = self.schema_cache.clone();
1161        let persist_cfg = self.cfg.clone();
1162        let stream = async_stream::stream! {
1163            for part in snap {
1164                let mut fetched_part = fetch_leased_part(
1165                    &persist_cfg,
1166                    &part,
1167                    blob.as_ref(),
1168                    Arc::clone(&metrics),
1169                    &snapshot_metrics,
1170                    &shard_metrics,
1171                    &reader_id,
1172                    schemas.clone(),
1173                    &mut schema_cache,
1174                )
1175                .await;
1176
1177                while let Some(next) = fetched_part.next() {
1178                    yield next;
1179                }
1180            }
1181        };
1182
1183        Ok(stream)
1184    }
1185}
1186
1187impl<K, V, T, D> ReadHandle<K, V, T, D>
1188where
1189    K: Debug + Codec + Ord,
1190    V: Debug + Codec + Ord,
1191    T: Timestamp + Lattice + Codec64 + Ord + Sync,
1192    D: Semigroup + Ord + Codec64 + Send + Sync,
1193{
1194    /// Test helper to generate a [Self::snapshot] call that is expected to
1195    /// succeed, process its batches, and then return its data sorted.
1196    #[cfg(test)]
1197    #[track_caller]
1198    pub async fn expect_snapshot_and_fetch(
1199        &mut self,
1200        as_of: T,
1201    ) -> Vec<((Result<K, String>, Result<V, String>), T, D)> {
1202        let mut ret = self
1203            .snapshot_and_fetch(Antichain::from_elem(as_of))
1204            .await
1205            .expect("cannot serve requested as_of");
1206
1207        ret.sort();
1208        ret
1209    }
1210}
1211
1212impl<K: Codec, V: Codec, T, D> Drop for ReadHandle<K, V, T, D> {
1213    fn drop(&mut self) {
1214        // We drop the unexpired state before expiring the reader to ensure the
1215        // heartbeat tasks can never observe the expired state. This doesn't
1216        // matter for correctness, but avoids confusing log output if the
1217        // heartbeat task were to discover that its lease has been expired.
1218        let Some(unexpired_state) = self.unexpired_state.take() else {
1219            return;
1220        };
1221
1222        let handle = match Handle::try_current() {
1223            Ok(x) => x,
1224            Err(_) => {
1225                warn!(
1226                    "ReadHandle {} dropped without being explicitly expired, falling back to lease timeout",
1227                    self.reader_id
1228                );
1229                return;
1230            }
1231        };
1232        // Spawn a best-effort task to expire this read handle. It's fine if
1233        // this doesn't run to completion, we'd just have to wait out the lease
1234        // before the shard-global since is unblocked.
1235        //
1236        // Intentionally create the span outside the task to set the parent.
1237        let expire_span = debug_span!("drop::expire");
1238        handle.spawn_named(
1239            || format!("ReadHandle::expire ({})", self.reader_id),
1240            unexpired_state.expire_fn.0().instrument(expire_span),
1241        );
1242    }
1243}
1244
1245#[cfg(test)]
1246mod tests {
1247    use std::pin;
1248    use std::str::FromStr;
1249
1250    use mz_dyncfg::ConfigUpdates;
1251    use mz_ore::cast::CastFrom;
1252    use mz_ore::metrics::MetricsRegistry;
1253    use mz_persist::mem::{MemBlob, MemBlobConfig, MemConsensus};
1254    use mz_persist::unreliable::{UnreliableConsensus, UnreliableHandle};
1255    use serde::{Deserialize, Serialize};
1256    use serde_json::json;
1257    use tokio_stream::StreamExt;
1258
1259    use crate::async_runtime::IsolatedRuntime;
1260    use crate::batch::BLOB_TARGET_SIZE;
1261    use crate::cache::StateCache;
1262    use crate::internal::metrics::Metrics;
1263    use crate::rpc::NoopPubSubSender;
1264    use crate::tests::{all_ok, new_test_client};
1265    use crate::{Diagnostics, PersistClient, PersistConfig, ShardId};
1266
1267    use super::*;
1268
1269    // Verifies `Subscribe` can be dropped while holding snapshot batches.
1270    #[mz_persist_proc::test(tokio::test)]
1271    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1272    async fn drop_unused_subscribe(dyncfgs: ConfigUpdates) {
1273        let data = [
1274            (("0".to_owned(), "zero".to_owned()), 0, 1),
1275            (("1".to_owned(), "one".to_owned()), 1, 1),
1276            (("2".to_owned(), "two".to_owned()), 2, 1),
1277        ];
1278
1279        let (mut write, read) = new_test_client(&dyncfgs)
1280            .await
1281            .expect_open::<String, String, u64, i64>(crate::ShardId::new())
1282            .await;
1283
1284        write.expect_compare_and_append(&data[0..1], 0, 1).await;
1285        write.expect_compare_and_append(&data[1..2], 1, 2).await;
1286        write.expect_compare_and_append(&data[2..3], 2, 3).await;
1287
1288        let subscribe = read
1289            .subscribe(timely::progress::Antichain::from_elem(2))
1290            .await
1291            .unwrap();
1292        assert!(
1293            !subscribe.snapshot.as_ref().unwrap().is_empty(),
1294            "snapshot must have batches for test to be meaningful"
1295        );
1296        drop(subscribe);
1297    }
1298
1299    // Verifies that we streaming-consolidate away identical key-values in the same batch.
1300    #[mz_persist_proc::test(tokio::test)]
1301    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1302    async fn streaming_consolidate(dyncfgs: ConfigUpdates) {
1303        let data = &[
1304            // Identical records should sum together...
1305            (("k".to_owned(), "v".to_owned()), 0, 1),
1306            (("k".to_owned(), "v".to_owned()), 1, 1),
1307            (("k".to_owned(), "v".to_owned()), 2, 1),
1308            // ...and when they cancel out entirely they should be omitted.
1309            (("k2".to_owned(), "v".to_owned()), 0, 1),
1310            (("k2".to_owned(), "v".to_owned()), 1, -1),
1311        ];
1312
1313        let (mut write, read) = {
1314            let client = new_test_client(&dyncfgs).await;
1315            client.cfg.set_config(&BLOB_TARGET_SIZE, 1000); // So our batch stays together!
1316            client
1317                .expect_open::<String, String, u64, i64>(crate::ShardId::new())
1318                .await
1319        };
1320
1321        write.expect_compare_and_append(data, 0, 5).await;
1322
1323        let mut snapshot = read
1324            .subscribe(timely::progress::Antichain::from_elem(4))
1325            .await
1326            .unwrap();
1327
1328        let mut updates = vec![];
1329        'outer: loop {
1330            for event in snapshot.fetch_next().await {
1331                match event {
1332                    ListenEvent::Progress(t) => {
1333                        if !t.less_than(&4) {
1334                            break 'outer;
1335                        }
1336                    }
1337                    ListenEvent::Updates(data) => {
1338                        updates.extend(data);
1339                    }
1340                }
1341            }
1342        }
1343        assert_eq!(
1344            updates,
1345            &[((Ok("k".to_owned()), Ok("v".to_owned())), 4u64, 3i64)],
1346        )
1347    }
1348
1349    #[mz_persist_proc::test(tokio::test)]
1350    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1351    async fn snapshot_and_stream(dyncfgs: ConfigUpdates) {
1352        let data = &mut [
1353            (("k1".to_owned(), "v1".to_owned()), 0, 1),
1354            (("k2".to_owned(), "v2".to_owned()), 1, 1),
1355            (("k3".to_owned(), "v3".to_owned()), 2, 1),
1356            (("k4".to_owned(), "v4".to_owned()), 2, 1),
1357            (("k5".to_owned(), "v5".to_owned()), 3, 1),
1358        ];
1359
1360        let (mut write, mut read) = {
1361            let client = new_test_client(&dyncfgs).await;
1362            client.cfg.set_config(&BLOB_TARGET_SIZE, 0); // split batches across multiple parts
1363            client
1364                .expect_open::<String, String, u64, i64>(crate::ShardId::new())
1365                .await
1366        };
1367
1368        write.expect_compare_and_append(&data[0..2], 0, 2).await;
1369        write.expect_compare_and_append(&data[2..4], 2, 3).await;
1370        write.expect_compare_and_append(&data[4..], 3, 4).await;
1371
1372        let as_of = Antichain::from_elem(3);
1373        let mut snapshot = pin::pin!(read.snapshot_and_stream(as_of.clone()).await.unwrap());
1374
1375        let mut snapshot_rows = vec![];
1376        while let Some(((k, v), t, d)) = snapshot.next().await {
1377            snapshot_rows.push(((k.expect("valid key"), v.expect("valid key")), t, d));
1378        }
1379
1380        for ((_k, _v), t, _d) in data.as_mut_slice() {
1381            t.advance_by(as_of.borrow());
1382        }
1383
1384        assert_eq!(data.as_slice(), snapshot_rows.as_slice());
1385    }
1386
    // Verifies the semantics of `SeqNo` leases + checks dropping `LeasedBatchPart` semantics.
    //
    // Outline:
    // 1. Seed the shard with two updates and open a subscribe at as_of 1.
    // 2. Collect leased parts (without returning them) while writing four more
    //    updates, asserting that the seqno since never moves.
    // 3. Fetch and return the collected parts one at a time while writing
    //    further updates, asserting after each one whether the seqno since
    //    advanced or stayed put.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // https://github.com/MaterializeInc/database-issues/issues/5964
    async fn seqno_leases(dyncfgs: ConfigUpdates) {
        // Twenty updates, one per timestamp, each written as its own batch below.
        let mut data = vec![];
        for i in 0..20 {
            data.push(((i.to_string(), i.to_string()), i, 1))
        }

        let shard_id = ShardId::new();

        let client = new_test_client(&dyncfgs).await;
        let (mut write, read) = client
            .expect_open::<String, String, u64, i64>(shard_id)
            .await;

        // Seed with some values
        // `offset` is the index of the next unwritten entry of `data`; `width`
        // is how many entries the upcoming loop writes.
        let mut offset = 0;
        let mut width = 2;

        for i in offset..offset + width {
            write
                .expect_compare_and_append(
                    &data[i..i + 1],
                    u64::cast_from(i),
                    u64::cast_from(i) + 1,
                )
                .await;
        }
        offset += width;

        // Create machinery for subscribe + fetch
        let mut fetcher = client
            .create_batch_fetcher::<String, String, u64, i64>(
                shard_id,
                Default::default(),
                Default::default(),
                false,
                Diagnostics::for_tests(),
            )
            .await
            .unwrap();

        let mut subscribe = read
            .subscribe(timely::progress::Antichain::from_elem(1))
            .await
            .expect("cannot serve requested as_of");

        // Determine sequence number at outset.
        let original_seqno_since = subscribe.listen.handle.machine.applier.seqno_since();

        // Leased parts collected in the loop below; they are only fetched and
        // returned in the later loop.
        let mut parts = vec![];

        width = 4;
        // Collect parts while continuing to write values
        for i in offset..offset + width {
            for event in subscribe.next(None).await {
                if let ListenEvent::Updates(mut new_parts) = event {
                    parts.append(&mut new_parts);
                    // Here and elsewhere we "cheat" and immediately downgrade the since
                    // to demonstrate the effects of SeqNo leases immediately.
                    subscribe
                        .listen
                        .handle
                        .downgrade_since(&subscribe.listen.since)
                        .await;
                }
            }

            write
                .expect_compare_and_append(
                    &data[i..i + 1],
                    u64::cast_from(i),
                    u64::cast_from(i) + 1,
                )
                .await;

            // SeqNo is not downgraded
            assert_eq!(
                subscribe.listen.handle.machine.applier.seqno_since(),
                original_seqno_since
            );
        }

        offset += width;

        let mut seqno_since = subscribe.listen.handle.machine.applier.seqno_since();

        // We're starting out with the original, non-downgraded SeqNo
        assert_eq!(seqno_since, original_seqno_since);

        // We have to handle the parts we generate during the next loop to
        // ensure they don't panic.
        let mut subsequent_parts = vec![];

        // Ensure monotonicity of seqnos we're processing, otherwise the
        // invariant we're testing (returning the last part of a seqno will
        // downgrade its since) will not hold.
        let mut this_seqno = SeqNo::minimum();

        // Repeat the same process as above, more or less, while fetching + returning parts
        for (mut i, part) in parts.into_iter().enumerate() {
            let part_seqno = part.lease.seqno();
            let last_seqno = this_seqno;
            this_seqno = part_seqno;
            assert!(this_seqno >= last_seqno);

            // Fetch the part's data, then return its lease by dropping it.
            let (part, lease) = part.into_exchangeable_part();
            let _ = fetcher.fetch_leased_part(part).await;
            drop(lease);

            // Simulates an exchange
            for event in subscribe.next(None).await {
                if let ListenEvent::Updates(parts) = event {
                    for part in parts {
                        // Only the lease is kept; it is returned at the very
                        // end of the test.
                        let (_, lease) = part.into_exchangeable_part();
                        subsequent_parts.push(lease);
                    }
                }
            }

            subscribe
                .listen
                .handle
                .downgrade_since(&subscribe.listen.since)
                .await;

            // Write more new values
            // Turn the enumerate index into the index of the next unwritten
            // entry of `data`.
            i += offset;
            write
                .expect_compare_and_append(
                    &data[i..i + 1],
                    u64::cast_from(i),
                    u64::cast_from(i) + 1,
                )
                .await;

            // We should expect the SeqNo to be downgraded if this part's SeqNo
            // is no longer leased to any other parts, either.
            let expect_downgrade = subscribe.listen.handle.outstanding_seqno() > Some(part_seqno);

            let new_seqno_since = subscribe.listen.handle.machine.applier.seqno_since();
            if expect_downgrade {
                assert!(new_seqno_since > seqno_since);
            } else {
                assert_eq!(new_seqno_since, seqno_since);
            }
            seqno_since = new_seqno_since;
        }

        // SeqNo since was downgraded
        assert!(seqno_since > original_seqno_since);

        // Return any outstanding parts, to prevent a panic!
        drop(subsequent_parts);
        drop(subscribe);
    }
1544
1545    #[mz_ore::test]
1546    fn reader_id_human_readable_serde() {
            // Checks that a `LeasedReaderId` is represented in JSON as its plain
            // string form, both on its own and as a field of another struct.
            //
            // `Container` exists only to exercise the id as a struct field.
1547        #[derive(Debug, Serialize, Deserialize)]
1548        struct Container {
1549            reader_id: LeasedReaderId,
1550        }
1551
1552        // Round-trip through a JSON value: serializing and then deserializing must give back an equal id.
1553        let id =
1554            LeasedReaderId::from_str("r00000000-1234-5678-0000-000000000000").expect("valid id");
1555        assert_eq!(
1556            id,
1557            serde_json::from_value(serde_json::to_value(id.clone()).expect("serializable"))
1558                .expect("deserializable")
1559        );
1560
1561        // Deserialize directly from a hand-written JSON string literal (note the embedded quotes).
1562        assert_eq!(
1563            id,
1564            serde_json::from_str("\"r00000000-1234-5678-0000-000000000000\"")
1565                .expect("deserializable")
1566        );
1567
1568        // Round-trip the id as a field of a container type, pinning the exact JSON text produced.
1569        let json = json!({ "reader_id": id });
1570        assert_eq!(
1571            "{\"reader_id\":\"r00000000-1234-5678-0000-000000000000\"}",
1572            &json.to_string()
1573        );
1574        let container: Container = serde_json::from_value(json).expect("deserializable");
1575        assert_eq!(container.reader_id, id);
1576    }
1577
1578    // Verifies the optimization where a listener skips fetching the latest
1579    // Consensus state when its current state can already serve the next
1580    // request, by reading from it while consensus is totally unavailable.
1581    #[mz_ore::test(tokio::test)]
1582    #[cfg_attr(miri, ignore)] // too slow
1583    async fn skip_consensus_fetch_optimization() {
            // Three single-update fixtures; presumably ((key, value), timestamp, diff),
            // matching the `<String, String, u64, i64>` shard types opened below.
1584        let data = vec![
1585            (("0".to_owned(), "zero".to_owned()), 0, 1),
1586            (("1".to_owned(), "one".to_owned()), 1, 1),
1587            (("2".to_owned(), "two".to_owned()), 2, 1),
1588        ];
1589
1590        let cfg = PersistConfig::new_for_tests();
1591        let blob = Arc::new(MemBlob::open(MemBlobConfig::default()));
1592        let consensus = Arc::new(MemConsensus::default());
            // Wrap the in-memory consensus so the test can switch its availability
            // through the `unreliable` handle; it starts out fully available.
1593        let unreliable = UnreliableHandle::default();
1594        unreliable.totally_available();
1595        let consensus = Arc::new(UnreliableConsensus::new(consensus, unreliable.clone()));
1596        let metrics = Arc::new(Metrics::new(&cfg, &MetricsRegistry::new()));
1597        let pubsub_sender = Arc::new(NoopPubSubSender);
1598        let (mut write, mut read) = PersistClient::new(
1599            cfg,
1600            blob,
1601            consensus,
1602            metrics,
1603            Arc::new(IsolatedRuntime::default()),
1604            Arc::new(StateCache::new_no_metrics()),
1605            pubsub_sender,
1606        )
1607        .expect("client construction failed")
1608        .expect_open::<String, String, u64, i64>(ShardId::new())
1609        .await;
1610
            // Append each update as its own batch, so the shard ends up with three
            // writes (the trailing arguments are presumably the lower/upper bounds).
1611        write.expect_compare_and_append(&data[0..1], 0, 1).await;
1612        write.expect_compare_and_append(&data[1..2], 1, 2).await;
1613        write.expect_compare_and_append(&data[2..3], 2, 3).await;
1614
            // Take the snapshot and start the listen while consensus is still reachable.
1615        let snapshot = read.expect_snapshot_and_fetch(2).await;
1616        let mut listen = read.expect_listen(0).await;
1617
1618        // Manually advance the listener's machine so that it holds the latest
1619        // state, by fetching the first events from it. This is awkward, but it
1620        // is only needed because we are about to make consensus unavailable
1621        // through `unreliable`.
1622        let listen_actual = listen.fetch_next().await;
1623        let expected_events = vec![ListenEvent::Progress(Antichain::from_elem(1))];
1624        assert_eq!(listen_actual, expected_events);
1625
1626        // At this point, the snapshot and the listen's state should contain all
1627        // the writes. Test this by making consensus completely unavailable.
1628        unreliable.totally_unavailable();
1629        assert_eq!(snapshot, all_ok(&data, 2));
            // The listen must still deliver the remaining updates and reach frontier 3
            // without consensus being reachable.
1630        assert_eq!(
1631            listen.read_until(&3).await,
1632            (all_ok(&data[1..], 1), Antichain::from_elem(3))
1633        );
1634    }
1635}