1use async_stream::stream;
13use std::backtrace::Backtrace;
14use std::collections::BTreeMap;
15use std::fmt::Debug;
16use std::future::Future;
17use std::sync::Arc;
18use std::time::Duration;
19
20use differential_dataflow::consolidation::consolidate_updates;
21use differential_dataflow::difference::Semigroup;
22use differential_dataflow::lattice::Lattice;
23use differential_dataflow::trace::Description;
24use futures::Stream;
25use futures_util::{StreamExt, stream};
26use mz_dyncfg::Config;
27use mz_ore::halt;
28use mz_ore::instrument;
29use mz_ore::now::EpochMillis;
30use mz_ore::task::{AbortOnDropHandle, JoinHandle, RuntimeExt};
31use mz_persist::location::{Blob, SeqNo};
32use mz_persist_types::columnar::{ColumnDecoder, Schema};
33use mz_persist_types::{Codec, Codec64};
34use proptest_derive::Arbitrary;
35use serde::{Deserialize, Serialize};
36use timely::PartialOrder;
37use timely::order::TotalOrder;
38use timely::progress::{Antichain, Timestamp};
39use tokio::runtime::Handle;
40use tracing::{Instrument, debug_span, warn};
41use uuid::Uuid;
42
43use crate::batch::BLOB_TARGET_SIZE;
44use crate::cfg::{COMPACTION_MEMORY_BOUND_BYTES, RetryParameters};
45use crate::fetch::FetchConfig;
46use crate::fetch::{FetchBatchFilter, FetchedPart, Lease, LeasedBatchPart, fetch_leased_part};
47use crate::internal::encoding::Schemas;
48use crate::internal::machine::{ExpireFn, Machine};
49use crate::internal::metrics::{Metrics, ReadMetrics, ShardMetrics};
50use crate::internal::state::{BatchPart, HollowBatch};
51use crate::internal::watch::StateWatch;
52use crate::iter::{Consolidator, StructuredSort};
53use crate::schema::SchemaCache;
54use crate::stats::{SnapshotPartStats, SnapshotPartsStats, SnapshotStats};
55use crate::{GarbageCollector, PersistConfig, ShardId, parse_id};
56
57pub use crate::internal::encoding::LazyPartStats;
58pub use crate::internal::state::Since;
59
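/// An opaque identifier for a leased reader of a persist shard.
///
/// Backed by a [Uuid] and rendered/parsed with an `r` prefix, e.g.
/// `r00000000-0000-0000-0000-000000000000`.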
60#[derive(Arbitrary, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
62#[serde(try_from = "String", into = "String")]
63pub struct LeasedReaderId(pub(crate) [u8; 16]);
64
65impl std::fmt::Display for LeasedReaderId {
66 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67 write!(f, "r{}", Uuid::from_bytes(self.0))
68 }
69}
70
71impl std::fmt::Debug for LeasedReaderId {
72 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73 write!(f, "LeasedReaderId({})", Uuid::from_bytes(self.0))
74 }
75}
76
77impl std::str::FromStr for LeasedReaderId {
78 type Err = String;
79
80 fn from_str(s: &str) -> Result<Self, Self::Err> {
81 parse_id("r", "LeasedReaderId", s).map(LeasedReaderId)
82 }
83}
84
85impl From<LeasedReaderId> for String {
86 fn from(reader_id: LeasedReaderId) -> Self {
87 reader_id.to_string()
88 }
89}
90
91impl TryFrom<String> for LeasedReaderId {
92 type Error = String;
93
94 fn try_from(s: String) -> Result<Self, Self::Error> {
95 s.parse()
96 }
97}
98
99impl LeasedReaderId {
100 pub(crate) fn new() -> Self {
101 LeasedReaderId(*Uuid::new_v4().as_bytes())
102 }
103}
104
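/// A subscription to a shard: a snapshot of its contents as of some `as_of`
/// frontier, followed by an ongoing [Listen] for updates past that frontier.
///
/// The snapshot's leased parts are returned by the first call to [Self::next];
/// subsequent calls return listen events.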
105#[derive(Debug)]
110pub struct Subscribe<K: Codec, V: Codec, T, D> {
111 snapshot: Option<Vec<LeasedBatchPart<T>>>,
112 listen: Listen<K, V, T, D>,
113}
114
115impl<K, V, T, D> Subscribe<K, V, T, D>
116where
117 K: Debug + Codec,
118 V: Debug + Codec,
119 T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
120 D: Semigroup + Codec64 + Send + Sync,
121{
122 fn new(snapshot_parts: Vec<LeasedBatchPart<T>>, listen: Listen<K, V, T, D>) -> Self {
123 Subscribe {
124 snapshot: Some(snapshot_parts),
125 listen,
126 }
127 }
128
129 #[instrument(level = "debug", fields(shard = %self.listen.handle.machine.shard_id()))]
137 pub async fn next(
138 &mut self,
139 listen_retry: Option<RetryParameters>,
141 ) -> Vec<ListenEvent<T, LeasedBatchPart<T>>> {
142 match self.snapshot.take() {
143 Some(parts) => vec![ListenEvent::Updates(parts)],
144 None => {
145 let (parts, upper) = self.listen.next(listen_retry).await;
146 vec![ListenEvent::Updates(parts), ListenEvent::Progress(upper)]
147 }
148 }
149 }
150}
151
152impl<K, V, T, D> Subscribe<K, V, T, D>
153where
154 K: Debug + Codec,
155 V: Debug + Codec,
156 T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
157 D: Semigroup + Codec64 + Send + Sync,
158{
159 #[instrument(level = "debug", fields(shard = %self.listen.handle.machine.shard_id()))]
162 pub async fn fetch_next(
163 &mut self,
164 ) -> Vec<ListenEvent<T, ((Result<K, String>, Result<V, String>), T, D)>> {
165 let events = self.next(None).await;
166 let new_len = events
167 .iter()
168 .map(|event| match event {
169 ListenEvent::Updates(parts) => parts.len(),
170 ListenEvent::Progress(_) => 1,
171 })
172 .sum();
173 let mut ret = Vec::with_capacity(new_len);
174 for event in events {
175 match event {
176 ListenEvent::Updates(parts) => {
177 for part in parts {
178 let fetched_part = self.listen.fetch_batch_part(part).await;
179 let updates = fetched_part.collect::<Vec<_>>();
180 if !updates.is_empty() {
181 ret.push(ListenEvent::Updates(updates));
182 }
183 }
184 }
185 ListenEvent::Progress(progress) => ret.push(ListenEvent::Progress(progress)),
186 }
187 }
188 ret
189 }
190
191 pub async fn fetch_batch_part(&mut self, part: LeasedBatchPart<T>) -> FetchedPart<K, V, T, D> {
193 self.listen.fetch_batch_part(part).await
194 }
195}
196
197impl<K, V, T, D> Subscribe<K, V, T, D>
198where
199 K: Debug + Codec,
200 V: Debug + Codec,
201 T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
202 D: Semigroup + Codec64 + Send + Sync,
203{
204 pub async fn expire(mut self) {
        let _ = self.snapshot.take();
        self.listen.expire().await;
215 }
216}
217
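/// Data and progress events emitted by a [Listen] or [Subscribe].
///
/// `Progress` communicates the frontier up to which updates have been emitted;
/// `Updates` carries a batch of update data (or, for [Subscribe], the leased
/// parts containing it).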
218#[derive(Debug, PartialEq)]
222pub enum ListenEvent<T, D> {
223 Progress(Antichain<T>),
225 Updates(Vec<D>),
227}
228
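/// An ongoing subscription to updates for a shard, obtained from
/// [ReadHandle::listen].
///
/// Tracks the `frontier` of updates already emitted and holds back the
/// handle's `since` so that not-yet-emitted times remain readable.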
229#[derive(Debug)]
231pub struct Listen<K: Codec, V: Codec, T, D> {
232 handle: ReadHandle<K, V, T, D>,
233 watch: StateWatch<K, V, T, D>,
234
235 as_of: Antichain<T>,
236 since: Antichain<T>,
237 frontier: Antichain<T>,
238}
239
240impl<K, V, T, D> Listen<K, V, T, D>
241where
242 K: Debug + Codec,
243 V: Debug + Codec,
244 T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
245 D: Semigroup + Codec64 + Send + Sync,
246{
247 async fn new(
248 mut handle: ReadHandle<K, V, T, D>,
249 as_of: Antichain<T>,
250 ) -> Result<Self, Since<T>> {
251 let () = handle.machine.verify_listen(&as_of)?;
252
253 let since = as_of.clone();
254 if !PartialOrder::less_equal(handle.since(), &since) {
255 return Err(Since(handle.since().clone()));
258 }
259 handle.downgrade_since(&since).await;
263
264 let watch = handle.machine.applier.watch();
265 Ok(Listen {
266 handle,
267 watch,
268 since,
269 frontier: as_of.clone(),
270 as_of,
271 })
272 }
273
274 pub fn frontier(&self) -> &Antichain<T> {
276 &self.frontier
277 }
278
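    /// Waits until the shard has new data past the current [Self::frontier],
    /// then returns the leased parts containing those updates along with the
    /// new progress frontier.
    ///
    /// The returned parts hold leases on persist state and must be fetched or
    /// dropped by the caller. `retry` optionally overrides the retry behavior
    /// used while waiting for new state.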
279 pub async fn next(
287 &mut self,
288 retry: Option<RetryParameters>,
290 ) -> (Vec<LeasedBatchPart<T>>, Antichain<T>) {
291 let batch = self
292 .handle
293 .machine
294 .next_listen_batch(
295 &self.frontier,
296 &mut self.watch,
297 Some(&self.handle.reader_id),
298 retry,
299 )
300 .await;
301
302 let acceptable_desc = PartialOrder::less_than(batch.desc.since(), &self.frontier)
312 || (self.frontier == self.as_of
317 && PartialOrder::less_equal(batch.desc.since(), &self.frontier));
318 if !acceptable_desc {
319 let lease_state = self
320 .handle
321 .machine
322 .applier
323 .reader_lease(self.handle.reader_id.clone());
324 if let Some(lease) = lease_state {
325 panic!(
                "Listen on {} received a batch {:?} whose since has advanced to or past the listen frontier {:?}, but the lease has not expired: {:?}",
327 self.handle.machine.shard_id(),
328 batch.desc,
329 self.frontier,
330 lease
331 )
332 } else {
333 halt!(
                    "Listen on {} received a batch {:?} whose since has advanced to or past the listen frontier {:?} after the reader's lease expired. \
                    This can happen in exceptional cases, e.g. when the machine is suspended or starved of memory or CPU.",
338 self.handle.machine.shard_id(),
339 batch.desc,
340 self.frontier
341 )
342 }
343 }
344
345 let new_frontier = batch.desc.upper().clone();
346
347 for x in self.frontier.elements().iter() {
370 let less_than_upper = batch.desc.upper().elements().iter().any(|u| x.less_than(u));
371 if less_than_upper {
372 self.since.join_assign(&Antichain::from_elem(x.clone()));
373 }
374 }
375
376 let filter = FetchBatchFilter::Listen {
381 as_of: self.as_of.clone(),
382 lower: self.frontier.clone(),
383 };
384 let parts = self.handle.lease_batch_parts(batch, filter).collect().await;
385
386 self.handle.maybe_downgrade_since(&self.since).await;
387
388 self.frontier = new_frontier;
391
392 (parts, self.frontier.clone())
393 }
394}
395
396impl<K, V, T, D> Listen<K, V, T, D>
397where
398 K: Debug + Codec,
399 V: Debug + Codec,
400 T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
401 D: Semigroup + Codec64 + Send + Sync,
402{
403 #[instrument(level = "debug", name = "listen::next", fields(shard = %self.handle.machine.shard_id()))]
414 pub async fn fetch_next(
415 &mut self,
416 ) -> Vec<ListenEvent<T, ((Result<K, String>, Result<V, String>), T, D)>> {
417 let (parts, progress) = self.next(None).await;
418 let mut ret = Vec::with_capacity(parts.len() + 1);
419 for part in parts {
420 let fetched_part = self.fetch_batch_part(part).await;
421 let updates = fetched_part.collect::<Vec<_>>();
422 if !updates.is_empty() {
423 ret.push(ListenEvent::Updates(updates));
424 }
425 }
426 ret.push(ListenEvent::Progress(progress));
427 ret
428 }
429
430 pub fn into_stream(
432 mut self,
433 ) -> impl Stream<Item = ListenEvent<T, ((Result<K, String>, Result<V, String>), T, D)>> {
434 async_stream::stream!({
435 loop {
436 for msg in self.fetch_next().await {
437 yield msg;
438 }
439 }
440 })
441 }
442
443 #[cfg(test)]
447 #[track_caller]
448 pub async fn read_until(
449 &mut self,
450 ts: &T,
451 ) -> (
452 Vec<((Result<K, String>, Result<V, String>), T, D)>,
453 Antichain<T>,
454 ) {
455 let mut updates = Vec::new();
456 let mut frontier = Antichain::from_elem(T::minimum());
457 while self.frontier.less_than(ts) {
458 for event in self.fetch_next().await {
459 match event {
460 ListenEvent::Updates(mut x) => updates.append(&mut x),
461 ListenEvent::Progress(x) => frontier = x,
462 }
463 }
464 }
465 (updates, frontier)
468 }
469}
470
471impl<K, V, T, D> Listen<K, V, T, D>
472where
473 K: Debug + Codec,
474 V: Debug + Codec,
475 T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
476 D: Semigroup + Codec64 + Send + Sync,
477{
478 async fn fetch_batch_part(&mut self, part: LeasedBatchPart<T>) -> FetchedPart<K, V, T, D> {
483 let fetched_part = fetch_leased_part(
484 &self.handle.cfg,
485 &part,
486 self.handle.blob.as_ref(),
487 Arc::clone(&self.handle.metrics),
488 &self.handle.metrics.read.listen,
489 &self.handle.machine.applier.shard_metrics,
490 &self.handle.reader_id,
491 self.handle.read_schemas.clone(),
492 &mut self.handle.schema_cache,
493 )
494 .await;
495 fetched_part
496 }
497
498 pub async fn expire(self) {
507 self.handle.expire().await
508 }
509}
510
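/// A "capability" for reading the contents of a shard at times at or beyond
/// its `since` frontier.
///
/// Reads are leased: background tasks heartbeat the lease, and a lease that
/// goes unheartbeated for `READER_LEASE_DURATION` is eligible for cleanup.
/// Callers should [Self::expire] the handle when finished; dropping it instead
/// spawns a best-effort expiration task (see the `Drop` impl below), falling
/// back to the lease timeout.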
511#[derive(Debug)]
532pub struct ReadHandle<K: Codec, V: Codec, T, D> {
533 pub(crate) cfg: PersistConfig,
534 pub(crate) metrics: Arc<Metrics>,
535 pub(crate) machine: Machine<K, V, T, D>,
536 pub(crate) gc: GarbageCollector<K, V, T, D>,
537 pub(crate) blob: Arc<dyn Blob>,
538 pub(crate) reader_id: LeasedReaderId,
539 pub(crate) read_schemas: Schemas<K, V>,
540 pub(crate) schema_cache: SchemaCache<K, V, T, D>,
541
542 since: Antichain<T>,
543 pub(crate) last_heartbeat: EpochMillis,
544 pub(crate) leased_seqnos: BTreeMap<SeqNo, Lease>,
545 pub(crate) unexpired_state: Option<UnexpiredReadHandleState>,
546}
547
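/// Dyncfg for the length of a leased reader's lease: a reader that has not
/// heartbeated within this duration may have its lease (and the capabilities
/// it holds) cleaned up as stale.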
548pub(crate) const READER_LEASE_DURATION: Config<Duration> = Config::new(
551 "persist_reader_lease_duration",
552 Duration::from_secs(60 * 15),
553 "The time after which we'll clean up stale read leases",
554);
555
556impl<K, V, T, D> ReadHandle<K, V, T, D>
557where
558 K: Debug + Codec,
559 V: Debug + Codec,
560 T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
561 D: Semigroup + Codec64 + Send + Sync,
562{
563 pub(crate) async fn new(
564 cfg: PersistConfig,
565 metrics: Arc<Metrics>,
566 machine: Machine<K, V, T, D>,
567 gc: GarbageCollector<K, V, T, D>,
568 blob: Arc<dyn Blob>,
569 reader_id: LeasedReaderId,
570 read_schemas: Schemas<K, V>,
571 since: Antichain<T>,
572 last_heartbeat: EpochMillis,
573 ) -> Self {
574 let schema_cache = machine.applier.schema_cache();
575 let expire_fn = Self::expire_fn(machine.clone(), gc.clone(), reader_id.clone());
576 ReadHandle {
577 cfg,
578 metrics: Arc::clone(&metrics),
579 machine: machine.clone(),
580 gc: gc.clone(),
581 blob,
582 reader_id: reader_id.clone(),
583 read_schemas,
584 schema_cache,
585 since,
586 last_heartbeat,
587 leased_seqnos: BTreeMap::new(),
588 unexpired_state: Some(UnexpiredReadHandleState {
589 expire_fn,
590 _heartbeat_tasks: machine
591 .start_reader_heartbeat_tasks(reader_id, gc)
592 .await
593 .into_iter()
594 .map(JoinHandle::abort_on_drop)
595 .collect(),
596 }),
597 }
598 }
599
600 pub fn shard_id(&self) -> ShardId {
602 self.machine.shard_id()
603 }
604
605 pub fn since(&self) -> &Antichain<T> {
609 &self.since
610 }
611
612 fn outstanding_seqno(&mut self) -> Option<SeqNo> {
613 while let Some(first) = self.leased_seqnos.first_entry() {
614 if first.get().count() <= 1 {
615 first.remove();
616 } else {
617 return Some(*first.key());
618 }
619 }
620 None
621 }
622
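    /// Forwards the `since` frontier of this handle, giving up the ability to
    /// read at times not at or beyond `new_since`, and records the reader
    /// `since` actually assigned by the shard's state.
    ///
    /// Also acts as a heartbeat for the reader's lease and kicks off any
    /// routine maintenance returned by the state machine. Logs a backtrace if
    /// this reader appears to be holding back an unexpectedly large range of
    /// seqnos.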
623 #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
634 pub async fn downgrade_since(&mut self, new_since: &Antichain<T>) {
635 let outstanding_seqno = self.outstanding_seqno();
637
638 let heartbeat_ts = (self.cfg.now)();
639 let (_seqno, current_reader_since, maintenance) = self
640 .machine
641 .downgrade_since(&self.reader_id, outstanding_seqno, new_since, heartbeat_ts)
642 .await;
643
644 if let Some(outstanding_seqno) = outstanding_seqno {
646 let seqnos_held = _seqno.0.saturating_sub(outstanding_seqno.0);
647 const SEQNOS_HELD_THRESHOLD: u64 = 60 * 60;
650 if seqnos_held >= SEQNOS_HELD_THRESHOLD {
651 tracing::info!(
652 "{} reader {} holding an unexpected number of seqnos {} vs {}: {:?}. bt: {:?}",
653 self.machine.shard_id(),
654 self.reader_id,
655 outstanding_seqno,
656 _seqno,
657 self.leased_seqnos.keys().take(10).collect::<Vec<_>>(),
658 Backtrace::force_capture(),
661 );
662 }
663 }
664
665 self.since = current_reader_since.0;
666 self.last_heartbeat = heartbeat_ts;
669 maintenance.start_performing(&self.machine, &self.gc);
670 }
671
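    /// Returns an ongoing [Listen] of updates to the shard at times greater
    /// than `as_of`, consuming this handle.
    ///
    /// Returns an error containing the current `since` if `as_of` is no longer
    /// readable.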
672 #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
689 pub async fn listen(self, as_of: Antichain<T>) -> Result<Listen<K, V, T, D>, Since<T>> {
690 Listen::new(self, as_of).await
691 }
692
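    /// Returns the leased batch parts covering a consistent snapshot of the
    /// shard's contents as of `as_of`, or an error containing the current
    /// `since` if `as_of` is not readable.
    ///
    /// The parts carry leases on persist state; the caller is responsible for
    /// fetching or dropping them.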
693 #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
707 pub async fn snapshot(
708 &mut self,
709 as_of: Antichain<T>,
710 ) -> Result<Vec<LeasedBatchPart<T>>, Since<T>> {
711 let batches = self.machine.snapshot(&as_of).await?;
712
713 if !PartialOrder::less_equal(self.since(), &as_of) {
714 return Err(Since(self.since().clone()));
715 }
716
717 let filter = FetchBatchFilter::Snapshot { as_of };
718 let mut leased_parts = Vec::new();
719 for batch in batches {
720 leased_parts.extend(
725 self.lease_batch_parts(batch, filter.clone())
726 .collect::<Vec<_>>()
727 .await,
728 );
729 }
730 Ok(leased_parts)
731 }
732
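    /// Returns a [Subscribe]: a snapshot of the shard as of `as_of` combined
    /// with an ongoing [Listen] for updates past `as_of`, or an error with the
    /// current `since` if `as_of` is not readable.
    ///
    /// A minimal usage sketch; the handle, timestamp, and the `process` and
    /// `advance_to` callbacks are illustrative placeholders, not part of this
    /// crate:
    ///
    /// ```ignore
    /// let mut subscribe = read_handle
    ///     .subscribe(Antichain::from_elem(as_of_ts))
    ///     .await
    ///     .expect("as_of still readable");
    /// loop {
    ///     for event in subscribe.fetch_next().await {
    ///         match event {
    ///             // Placeholders for application logic.
    ///             ListenEvent::Updates(updates) => process(updates),
    ///             ListenEvent::Progress(frontier) => advance_to(frontier),
    ///         }
    ///     }
    /// }
    /// ```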
733 #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
739 pub async fn subscribe(
740 mut self,
741 as_of: Antichain<T>,
742 ) -> Result<Subscribe<K, V, T, D>, Since<T>> {
743 let snapshot_parts = self.snapshot(as_of.clone()).await?;
744 let listen = self.listen(as_of.clone()).await?;
745 Ok(Subscribe::new(snapshot_parts, listen))
746 }
747
748 fn lease_batch_part(
749 &mut self,
750 desc: Description<T>,
751 part: BatchPart<T>,
752 filter: FetchBatchFilter<T>,
753 ) -> LeasedBatchPart<T> {
754 LeasedBatchPart {
755 metrics: Arc::clone(&self.metrics),
756 shard_id: self.machine.shard_id(),
757 filter,
758 desc,
759 part,
760 lease: self.lease_seqno(),
761 filter_pushdown_audit: false,
762 }
763 }
764
765 fn lease_batch_parts(
766 &mut self,
767 batch: HollowBatch<T>,
768 filter: FetchBatchFilter<T>,
769 ) -> impl Stream<Item = LeasedBatchPart<T>> + '_ {
770 stream! {
771 let blob = Arc::clone(&self.blob);
772 let metrics = Arc::clone(&self.metrics);
773 let desc = batch.desc.clone();
774 for await part in batch.part_stream(self.shard_id(), &*blob, &*metrics) {
775 yield self.lease_batch_part(desc.clone(), part.expect("leased part").into_owned(), filter.clone())
776 }
777 }
778 }
779
780 fn lease_seqno(&mut self) -> Lease {
784 let seqno = self.machine.seqno();
785 let lease = self
786 .leased_seqnos
787 .entry(seqno)
788 .or_insert_with(|| Lease::new(seqno));
789 lease.clone()
790 }
791
792 #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
795 pub async fn clone(&self, purpose: &str) -> Self {
796 let new_reader_id = LeasedReaderId::new();
797 let machine = self.machine.clone();
798 let gc = self.gc.clone();
799 let heartbeat_ts = (self.cfg.now)();
800 let (reader_state, maintenance) = machine
801 .register_leased_reader(
802 &new_reader_id,
803 purpose,
804 READER_LEASE_DURATION.get(&self.cfg),
805 heartbeat_ts,
806 false,
807 )
808 .await;
809 maintenance.start_performing(&machine, &gc);
810 assert!(PartialOrder::less_equal(&reader_state.since, &self.since));
814 let new_reader = ReadHandle::new(
815 self.cfg.clone(),
816 Arc::clone(&self.metrics),
817 machine,
818 gc,
819 Arc::clone(&self.blob),
820 new_reader_id,
821 self.read_schemas.clone(),
822 reader_state.since,
823 heartbeat_ts,
824 )
825 .await;
826 new_reader
827 }
828
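    /// Calls [Self::downgrade_since], but only if at least a quarter of
    /// `READER_LEASE_DURATION` has elapsed since the last heartbeat, so that
    /// frequent callers don't update shard state more often than needed.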
829 pub async fn maybe_downgrade_since(&mut self, new_since: &Antichain<T>) {
835 let min_elapsed = READER_LEASE_DURATION.get(&self.cfg) / 4;
836 let elapsed_since_last_heartbeat =
837 Duration::from_millis((self.cfg.now)().saturating_sub(self.last_heartbeat));
838 if elapsed_since_last_heartbeat >= min_elapsed {
839 self.downgrade_since(new_since).await;
840 }
841 }
842
843 #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
852 pub async fn expire(mut self) {
853 let Some(unexpired_state) = self.unexpired_state.take() else {
858 return;
859 };
860 unexpired_state.expire_fn.0().await;
861 }
862
863 fn expire_fn(
864 machine: Machine<K, V, T, D>,
865 gc: GarbageCollector<K, V, T, D>,
866 reader_id: LeasedReaderId,
867 ) -> ExpireFn {
868 ExpireFn(Box::new(move || {
869 Box::pin(async move {
870 let (_, maintenance) = machine.expire_leased_reader(&reader_id).await;
871 maintenance.start_performing(&machine, &gc);
872 })
873 }))
874 }
875
876 #[cfg(test)]
878 #[track_caller]
879 pub async fn expect_listen(self, as_of: T) -> Listen<K, V, T, D> {
880 self.listen(Antichain::from_elem(as_of))
881 .await
882 .expect("cannot serve requested as_of")
883 }
884}
885
886#[derive(Debug)]
888pub(crate) struct UnexpiredReadHandleState {
889 expire_fn: ExpireFn,
890 pub(crate) _heartbeat_tasks: Vec<AbortOnDropHandle<()>>,
891}
892
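/// A cursor over a snapshot of a shard, returned by
/// [ReadHandle::snapshot_cursor], that fetches and consolidates updates
/// incrementally in bounded chunks.
///
/// Holds a lease (`L`, by default [Lease]) that keeps the underlying state
/// readable; the lease can be recovered via [Self::into_lease].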
893#[derive(Debug)]
899pub struct Cursor<K: Codec, V: Codec, T: Timestamp + Codec64, D: Codec64, L = Lease> {
900 consolidator: Consolidator<T, D, StructuredSort<K, V, T, D>>,
901 max_len: usize,
902 max_bytes: usize,
903 _lease: L,
904 read_schemas: Schemas<K, V>,
905}
906
907impl<K: Codec, V: Codec, T: Timestamp + Codec64, D: Codec64, L> Cursor<K, V, T, D, L> {
908 pub fn into_lease(self: Self) -> L {
911 self._lease
912 }
913}
914
915impl<K, V, T, D, L> Cursor<K, V, T, D, L>
916where
917 K: Debug + Codec + Ord,
918 V: Debug + Codec + Ord,
919 T: Timestamp + Lattice + Codec64 + Sync,
920 D: Semigroup + Ord + Codec64 + Send + Sync,
921{
922 pub async fn next(
924 &mut self,
925 ) -> Option<impl Iterator<Item = ((Result<K, String>, Result<V, String>), T, D)> + '_> {
926 let Self {
927 consolidator,
928 max_len,
929 max_bytes,
930 _lease,
931 read_schemas: _,
932 } = self;
933
934 let part = consolidator
935 .next_chunk(*max_len, *max_bytes)
936 .await
937 .expect("fetching a leased part")?;
938 let key_decoder = self
939 .read_schemas
940 .key
941 .decoder_any(part.key.as_ref())
942 .expect("ok");
943 let val_decoder = self
944 .read_schemas
945 .val
946 .decoder_any(part.val.as_ref())
947 .expect("ok");
948 let iter = (0..part.len()).map(move |i| {
949 let mut k = K::default();
950 let mut v = V::default();
951 key_decoder.decode(i, &mut k);
952 val_decoder.decode(i, &mut v);
953 let t = T::decode(part.time.value(i).to_le_bytes());
954 let d = D::decode(part.diff.value(i).to_le_bytes());
955 ((Ok(k), Ok(v)), t, d)
956 });
957
958 Some(iter)
959 }
960}
961
962impl<K, V, T, D> ReadHandle<K, V, T, D>
963where
964 K: Debug + Codec + Ord,
965 V: Debug + Codec + Ord,
966 T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
967 D: Semigroup + Ord + Codec64 + Send + Sync,
968{
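    /// Generates a consistent snapshot of all data in the shard as of `as_of`,
    /// fetches every part, and returns the consolidated updates, or an error
    /// with the current `since` if `as_of` is not readable.
    ///
    /// This buffers the entire snapshot in memory; see [Self::snapshot_cursor]
    /// or [Self::snapshot_and_stream] for incremental alternatives.
    ///
    /// A sketch of typical use; the handle and timestamp are assumed to have
    /// been set up elsewhere:
    ///
    /// ```ignore
    /// let updates = read_handle
    ///     .snapshot_and_fetch(Antichain::from_elem(as_of_ts))
    ///     .await
    ///     .expect("as_of still readable");
    /// for ((key, val), ts, diff) in updates {
    ///     // Codec decoding failures surface as `Err(String)` per key/value.
    ///     let (key, val) = (key.expect("valid key"), val.expect("valid val"));
    /// }
    /// ```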
969 pub async fn snapshot_and_fetch(
983 &mut self,
984 as_of: Antichain<T>,
985 ) -> Result<Vec<((Result<K, String>, Result<V, String>), T, D)>, Since<T>> {
986 let mut cursor = self.snapshot_cursor(as_of, |_| true).await?;
987 let mut contents = Vec::new();
988 while let Some(iter) = cursor.next().await {
989 contents.extend(iter);
990 }
991
992 let old_len = contents.len();
995 consolidate_updates(&mut contents);
996 if old_len != contents.len() {
997 self.machine
999 .applier
1000 .shard_metrics
1001 .unconsolidated_snapshot
1002 .inc();
1003 }
1004
1005 Ok(contents)
1006 }
1007
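    /// Returns a [Cursor] over a consistent snapshot of the shard as of
    /// `as_of`, fetching parts lazily as the cursor is advanced.
    ///
    /// `should_fetch_part` is given each part's (optional) stats and may
    /// return `false` to skip fetching that part entirely, e.g. for filter
    /// pushdown.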
1008 pub async fn snapshot_cursor(
1015 &mut self,
1016 as_of: Antichain<T>,
1017 should_fetch_part: impl for<'a> Fn(Option<&'a LazyPartStats>) -> bool,
1018 ) -> Result<Cursor<K, V, T, D>, Since<T>> {
1019 let batches = self.machine.snapshot(&as_of).await?;
1020 let lease = self.lease_seqno();
1021
1022 Self::read_batches_consolidated(
1023 &self.cfg,
1024 Arc::clone(&self.metrics),
1025 Arc::clone(&self.machine.applier.shard_metrics),
1026 self.metrics.read.snapshot.clone(),
1027 Arc::clone(&self.blob),
1028 self.shard_id(),
1029 as_of,
1030 self.read_schemas.clone(),
1031 &batches,
1032 lease,
1033 should_fetch_part,
1034 COMPACTION_MEMORY_BOUND_BYTES.get(&self.cfg),
1035 )
1036 }
1037
1038 pub(crate) fn read_batches_consolidated<L>(
1039 persist_cfg: &PersistConfig,
1040 metrics: Arc<Metrics>,
1041 shard_metrics: Arc<ShardMetrics>,
1042 read_metrics: ReadMetrics,
1043 blob: Arc<dyn Blob>,
1044 shard_id: ShardId,
1045 as_of: Antichain<T>,
1046 schemas: Schemas<K, V>,
1047 batches: &[HollowBatch<T>],
1048 lease: L,
1049 should_fetch_part: impl for<'a> Fn(Option<&'a LazyPartStats>) -> bool,
1050 memory_budget_bytes: usize,
1051 ) -> Result<Cursor<K, V, T, D, L>, Since<T>> {
1052 let context = format!("{}[as_of={:?}]", shard_id, as_of.elements());
1053 let filter = FetchBatchFilter::Snapshot {
1054 as_of: as_of.clone(),
1055 };
1056
1057 let mut consolidator = Consolidator::new(
1058 context,
1059 FetchConfig::from_persist_config(persist_cfg),
1060 shard_id,
1061 StructuredSort::new(schemas.clone()),
1062 blob,
1063 metrics,
1064 shard_metrics,
1065 read_metrics,
1066 filter,
1067 None,
1068 memory_budget_bytes,
1069 );
1070 for batch in batches {
1071 for (meta, run) in batch.runs() {
1072 consolidator.enqueue_run(
1073 &batch.desc,
1074 meta,
1075 run.into_iter()
1076 .filter(|p| should_fetch_part(p.stats()))
1077 .cloned(),
1078 );
1079 }
1080 }
1081 let max_len = persist_cfg.compaction_yield_after_n_updates;
1085 let max_bytes = BLOB_TARGET_SIZE.get(persist_cfg).max(1);
1086
1087 Ok(Cursor {
1088 consolidator,
1089 max_len,
1090 max_bytes,
1091 _lease: lease,
1092 read_schemas: schemas,
1093 })
1094 }
1095
1096 pub fn snapshot_stats(
1108 &self,
1109 as_of: Option<Antichain<T>>,
1110 ) -> impl Future<Output = Result<SnapshotStats, Since<T>>> + Send + 'static {
1111 let machine = self.machine.clone();
1112 async move {
1113 let batches = match as_of {
1114 Some(as_of) => machine.snapshot(&as_of).await?,
1115 None => machine.applier.all_batches(),
1116 };
1117 let num_updates = batches.iter().map(|b| b.len).sum();
1118 Ok(SnapshotStats {
1119 shard_id: machine.shard_id(),
1120 num_updates,
1121 })
1122 }
1123 }
1124
1125 pub async fn snapshot_parts_stats(
1136 &self,
1137 as_of: Antichain<T>,
1138 ) -> Result<SnapshotPartsStats, Since<T>> {
1139 let batches = self.machine.snapshot(&as_of).await?;
1140 let parts = stream::iter(&batches)
1141 .flat_map(|b| b.part_stream(self.shard_id(), &*self.blob, &*self.metrics))
1142 .map(|p| {
1143 let p = p.expect("live batch");
1144 SnapshotPartStats {
1145 encoded_size_bytes: p.encoded_size_bytes(),
1146 stats: p.stats().cloned(),
1147 }
1148 })
1149 .collect()
1150 .await;
1151 Ok(SnapshotPartsStats {
1152 metrics: Arc::clone(&self.machine.applier.metrics),
1153 shard_id: self.machine.shard_id(),
1154 parts,
1155 })
1156 }
1157}
1158
1159impl<K, V, T, D> ReadHandle<K, V, T, D>
1160where
1161 K: Debug + Codec + Ord,
1162 V: Debug + Codec + Ord,
1163 T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
1164 D: Semigroup + Codec64 + Send + Sync,
1165{
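    /// Like [Self::snapshot_and_fetch], but returns a [Stream] that fetches
    /// parts one at a time and yields their updates as they are decoded.
    ///
    /// Unlike `snapshot_and_fetch`, the yielded updates are not consolidated.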
1166 pub async fn snapshot_and_stream(
1171 &mut self,
1172 as_of: Antichain<T>,
1173 ) -> Result<
1174 impl Stream<Item = ((Result<K, String>, Result<V, String>), T, D)> + use<K, V, T, D>,
1175 Since<T>,
1176 > {
1177 let snap = self.snapshot(as_of).await?;
1178
1179 let blob = Arc::clone(&self.blob);
1180 let metrics = Arc::clone(&self.metrics);
1181 let snapshot_metrics = self.metrics.read.snapshot.clone();
1182 let shard_metrics = Arc::clone(&self.machine.applier.shard_metrics);
1183 let reader_id = self.reader_id.clone();
1184 let schemas = self.read_schemas.clone();
1185 let mut schema_cache = self.schema_cache.clone();
1186 let persist_cfg = self.cfg.clone();
1187 let stream = async_stream::stream! {
1188 for part in snap {
1189 let mut fetched_part = fetch_leased_part(
1190 &persist_cfg,
1191 &part,
1192 blob.as_ref(),
1193 Arc::clone(&metrics),
1194 &snapshot_metrics,
1195 &shard_metrics,
1196 &reader_id,
1197 schemas.clone(),
1198 &mut schema_cache,
1199 )
1200 .await;
1201
1202 while let Some(next) = fetched_part.next() {
1203 yield next;
1204 }
1205 }
1206 };
1207
1208 Ok(stream)
1209 }
1210}
1211
1212impl<K, V, T, D> ReadHandle<K, V, T, D>
1213where
1214 K: Debug + Codec + Ord,
1215 V: Debug + Codec + Ord,
1216 T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
1217 D: Semigroup + Ord + Codec64 + Send + Sync,
1218{
1219 #[cfg(test)]
1222 #[track_caller]
1223 pub async fn expect_snapshot_and_fetch(
1224 &mut self,
1225 as_of: T,
1226 ) -> Vec<((Result<K, String>, Result<V, String>), T, D)> {
1227 let mut ret = self
1228 .snapshot_and_fetch(Antichain::from_elem(as_of))
1229 .await
1230 .expect("cannot serve requested as_of");
1231
1232 ret.sort();
1233 ret
1234 }
1235}
1236
1237impl<K: Codec, V: Codec, T, D> Drop for ReadHandle<K, V, T, D> {
1238 fn drop(&mut self) {
1239 let Some(unexpired_state) = self.unexpired_state.take() else {
1244 return;
1245 };
1246
1247 let handle = match Handle::try_current() {
1248 Ok(x) => x,
1249 Err(_) => {
1250 warn!(
1251 "ReadHandle {} dropped without being explicitly expired, falling back to lease timeout",
1252 self.reader_id
1253 );
1254 return;
1255 }
1256 };
1257 let expire_span = debug_span!("drop::expire");
1263 handle.spawn_named(
1264 || format!("ReadHandle::expire ({})", self.reader_id),
1265 unexpired_state.expire_fn.0().instrument(expire_span),
1266 );
1267 }
1268}
1269
1270#[cfg(test)]
1271mod tests {
1272 use std::pin;
1273 use std::str::FromStr;
1274
1275 use mz_dyncfg::ConfigUpdates;
1276 use mz_ore::cast::CastFrom;
1277 use mz_ore::metrics::MetricsRegistry;
1278 use mz_persist::mem::{MemBlob, MemBlobConfig, MemConsensus};
1279 use mz_persist::unreliable::{UnreliableConsensus, UnreliableHandle};
1280 use serde::{Deserialize, Serialize};
1281 use serde_json::json;
1282 use tokio_stream::StreamExt;
1283
1284 use crate::async_runtime::IsolatedRuntime;
1285 use crate::batch::BLOB_TARGET_SIZE;
1286 use crate::cache::StateCache;
1287 use crate::internal::metrics::Metrics;
1288 use crate::rpc::NoopPubSubSender;
1289 use crate::tests::{all_ok, new_test_client};
1290 use crate::{Diagnostics, PersistClient, PersistConfig, ShardId};
1291
1292 use super::*;
1293
1294 #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn drop_unused_subscribe(dyncfgs: ConfigUpdates) {
1298 let data = [
1299 (("0".to_owned(), "zero".to_owned()), 0, 1),
1300 (("1".to_owned(), "one".to_owned()), 1, 1),
1301 (("2".to_owned(), "two".to_owned()), 2, 1),
1302 ];
1303
1304 let (mut write, read) = new_test_client(&dyncfgs)
1305 .await
1306 .expect_open::<String, String, u64, i64>(crate::ShardId::new())
1307 .await;
1308
1309 write.expect_compare_and_append(&data[0..1], 0, 1).await;
1310 write.expect_compare_and_append(&data[1..2], 1, 2).await;
1311 write.expect_compare_and_append(&data[2..3], 2, 3).await;
1312
1313 let subscribe = read
1314 .subscribe(timely::progress::Antichain::from_elem(2))
1315 .await
1316 .unwrap();
1317 assert!(
1318 !subscribe.snapshot.as_ref().unwrap().is_empty(),
1319 "snapshot must have batches for test to be meaningful"
1320 );
1321 drop(subscribe);
1322 }
1323
1324 #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn streaming_consolidate(dyncfgs: ConfigUpdates) {
1328 let data = &[
1329 (("k".to_owned(), "v".to_owned()), 0, 1),
1331 (("k".to_owned(), "v".to_owned()), 1, 1),
1332 (("k".to_owned(), "v".to_owned()), 2, 1),
1333 (("k2".to_owned(), "v".to_owned()), 0, 1),
1335 (("k2".to_owned(), "v".to_owned()), 1, -1),
1336 ];
1337
1338 let (mut write, read) = {
1339 let client = new_test_client(&dyncfgs).await;
            client.cfg.set_config(&BLOB_TARGET_SIZE, 1000);
            client
1342 .expect_open::<String, String, u64, i64>(crate::ShardId::new())
1343 .await
1344 };
1345
1346 write.expect_compare_and_append(data, 0, 5).await;
1347
1348 let mut snapshot = read
1349 .subscribe(timely::progress::Antichain::from_elem(4))
1350 .await
1351 .unwrap();
1352
1353 let mut updates = vec![];
1354 'outer: loop {
1355 for event in snapshot.fetch_next().await {
1356 match event {
1357 ListenEvent::Progress(t) => {
1358 if !t.less_than(&4) {
1359 break 'outer;
1360 }
1361 }
1362 ListenEvent::Updates(data) => {
1363 updates.extend(data);
1364 }
1365 }
1366 }
1367 }
1368 assert_eq!(
1369 updates,
1370 &[((Ok("k".to_owned()), Ok("v".to_owned())), 4u64, 3i64)],
1371 )
1372 }
1373
1374 #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn snapshot_and_stream(dyncfgs: ConfigUpdates) {
1377 let data = &mut [
1378 (("k1".to_owned(), "v1".to_owned()), 0, 1),
1379 (("k2".to_owned(), "v2".to_owned()), 1, 1),
1380 (("k3".to_owned(), "v3".to_owned()), 2, 1),
1381 (("k4".to_owned(), "v4".to_owned()), 2, 1),
1382 (("k5".to_owned(), "v5".to_owned()), 3, 1),
1383 ];
1384
1385 let (mut write, mut read) = {
1386 let client = new_test_client(&dyncfgs).await;
            client.cfg.set_config(&BLOB_TARGET_SIZE, 0);
            client
1389 .expect_open::<String, String, u64, i64>(crate::ShardId::new())
1390 .await
1391 };
1392
1393 write.expect_compare_and_append(&data[0..2], 0, 2).await;
1394 write.expect_compare_and_append(&data[2..4], 2, 3).await;
1395 write.expect_compare_and_append(&data[4..], 3, 4).await;
1396
1397 let as_of = Antichain::from_elem(3);
1398 let mut snapshot = pin::pin!(read.snapshot_and_stream(as_of.clone()).await.unwrap());
1399
1400 let mut snapshot_rows = vec![];
1401 while let Some(((k, v), t, d)) = snapshot.next().await {
1402 snapshot_rows.push(((k.expect("valid key"), v.expect("valid key")), t, d));
1403 }
1404
1405 for ((_k, _v), t, _d) in data.as_mut_slice() {
1406 t.advance_by(as_of.borrow());
1407 }
1408
1409 assert_eq!(data.as_slice(), snapshot_rows.as_slice());
1410 }
1411
1412 #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn seqno_leases(dyncfgs: ConfigUpdates) {
1416 let mut data = vec![];
1417 for i in 0..20 {
1418 data.push(((i.to_string(), i.to_string()), i, 1))
1419 }
1420
1421 let shard_id = ShardId::new();
1422
1423 let client = new_test_client(&dyncfgs).await;
1424 let (mut write, read) = client
1425 .expect_open::<String, String, u64, i64>(shard_id)
1426 .await;
1427
1428 let mut offset = 0;
1430 let mut width = 2;
1431
1432 for i in offset..offset + width {
1433 write
1434 .expect_compare_and_append(
1435 &data[i..i + 1],
1436 u64::cast_from(i),
1437 u64::cast_from(i) + 1,
1438 )
1439 .await;
1440 }
1441 offset += width;
1442
1443 let mut fetcher = client
1445 .create_batch_fetcher::<String, String, u64, i64>(
1446 shard_id,
1447 Default::default(),
1448 Default::default(),
1449 false,
1450 Diagnostics::for_tests(),
1451 )
1452 .await
1453 .unwrap();
1454
1455 let mut subscribe = read
1456 .subscribe(timely::progress::Antichain::from_elem(1))
1457 .await
1458 .expect("cannot serve requested as_of");
1459
1460 let original_seqno_since = subscribe.listen.handle.machine.applier.seqno_since();
1462
1463 let mut parts = vec![];
1464
1465 width = 4;
1466 for i in offset..offset + width {
1468 for event in subscribe.next(None).await {
1469 if let ListenEvent::Updates(mut new_parts) = event {
1470 parts.append(&mut new_parts);
1471 subscribe
1474 .listen
1475 .handle
1476 .downgrade_since(&subscribe.listen.since)
1477 .await;
1478 }
1479 }
1480
1481 write
1482 .expect_compare_and_append(
1483 &data[i..i + 1],
1484 u64::cast_from(i),
1485 u64::cast_from(i) + 1,
1486 )
1487 .await;
1488
1489 assert_eq!(
1491 subscribe.listen.handle.machine.applier.seqno_since(),
1492 original_seqno_since
1493 );
1494 }
1495
1496 offset += width;
1497
1498 let mut seqno_since = subscribe.listen.handle.machine.applier.seqno_since();
1499
1500 assert_eq!(seqno_since, original_seqno_since);
1502
1503 let mut subsequent_parts = vec![];
1506
1507 let mut this_seqno = SeqNo::minimum();
1511
1512 for (mut i, part) in parts.into_iter().enumerate() {
1514 let part_seqno = part.lease.seqno();
1515 let last_seqno = this_seqno;
1516 this_seqno = part_seqno;
1517 assert!(this_seqno >= last_seqno);
1518
1519 let (part, lease) = part.into_exchangeable_part();
1520 let _ = fetcher.fetch_leased_part(part).await;
1521 drop(lease);
1522
1523 for event in subscribe.next(None).await {
1525 if let ListenEvent::Updates(parts) = event {
1526 for part in parts {
1527 let (_, lease) = part.into_exchangeable_part();
1528 subsequent_parts.push(lease);
1529 }
1530 }
1531 }
1532
1533 subscribe
1534 .listen
1535 .handle
1536 .downgrade_since(&subscribe.listen.since)
1537 .await;
1538
1539 i += offset;
1541 write
1542 .expect_compare_and_append(
1543 &data[i..i + 1],
1544 u64::cast_from(i),
1545 u64::cast_from(i) + 1,
1546 )
1547 .await;
1548
1549 let expect_downgrade = subscribe.listen.handle.outstanding_seqno() > Some(part_seqno);
1552
1553 let new_seqno_since = subscribe.listen.handle.machine.applier.seqno_since();
1554 if expect_downgrade {
1555 assert!(new_seqno_since > seqno_since);
1556 } else {
1557 assert_eq!(new_seqno_since, seqno_since);
1558 }
1559 seqno_since = new_seqno_since;
1560 }
1561
1562 assert!(seqno_since > original_seqno_since);
1564
1565 drop(subsequent_parts);
1567 drop(subscribe);
1568 }
1569
1570 #[mz_ore::test]
1571 fn reader_id_human_readable_serde() {
1572 #[derive(Debug, Serialize, Deserialize)]
1573 struct Container {
1574 reader_id: LeasedReaderId,
1575 }
1576
1577 let id =
1579 LeasedReaderId::from_str("r00000000-1234-5678-0000-000000000000").expect("valid id");
1580 assert_eq!(
1581 id,
1582 serde_json::from_value(serde_json::to_value(id.clone()).expect("serializable"))
1583 .expect("deserializable")
1584 );
1585
1586 assert_eq!(
1588 id,
1589 serde_json::from_str("\"r00000000-1234-5678-0000-000000000000\"")
1590 .expect("deserializable")
1591 );
1592
1593 let json = json!({ "reader_id": id });
1595 assert_eq!(
1596 "{\"reader_id\":\"r00000000-1234-5678-0000-000000000000\"}",
1597 &json.to_string()
1598 );
1599 let container: Container = serde_json::from_value(json).expect("deserializable");
1600 assert_eq!(container.reader_id, id);
1601 }
1602
1603 #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn skip_consensus_fetch_optimization() {
1609 let data = vec![
1610 (("0".to_owned(), "zero".to_owned()), 0, 1),
1611 (("1".to_owned(), "one".to_owned()), 1, 1),
1612 (("2".to_owned(), "two".to_owned()), 2, 1),
1613 ];
1614
1615 let cfg = PersistConfig::new_for_tests();
1616 let blob = Arc::new(MemBlob::open(MemBlobConfig::default()));
1617 let consensus = Arc::new(MemConsensus::default());
1618 let unreliable = UnreliableHandle::default();
1619 unreliable.totally_available();
1620 let consensus = Arc::new(UnreliableConsensus::new(consensus, unreliable.clone()));
1621 let metrics = Arc::new(Metrics::new(&cfg, &MetricsRegistry::new()));
1622 let pubsub_sender = Arc::new(NoopPubSubSender);
1623 let (mut write, mut read) = PersistClient::new(
1624 cfg,
1625 blob,
1626 consensus,
1627 metrics,
1628 Arc::new(IsolatedRuntime::new_for_tests()),
1629 Arc::new(StateCache::new_no_metrics()),
1630 pubsub_sender,
1631 )
1632 .expect("client construction failed")
1633 .expect_open::<String, String, u64, i64>(ShardId::new())
1634 .await;
1635
1636 write.expect_compare_and_append(&data[0..1], 0, 1).await;
1637 write.expect_compare_and_append(&data[1..2], 1, 2).await;
1638 write.expect_compare_and_append(&data[2..3], 2, 3).await;
1639
1640 let snapshot = read.expect_snapshot_and_fetch(2).await;
1641 let mut listen = read.expect_listen(0).await;
1642
1643 let listen_actual = listen.fetch_next().await;
1648 let expected_events = vec![ListenEvent::Progress(Antichain::from_elem(1))];
1649 assert_eq!(listen_actual, expected_events);
1650
1651 unreliable.totally_unavailable();
1654 assert_eq!(snapshot, all_ok(&data, 2));
1655 assert_eq!(
1656 listen.read_until(&3).await,
1657 (all_ok(&data[1..], 1), Antichain::from_elem(3))
1658 );
1659 }
1660}