use async_stream::stream;
use std::backtrace::Backtrace;
use std::collections::BTreeMap;
use std::fmt::Debug;
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;

use differential_dataflow::consolidation::consolidate_updates;
use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::Description;
use futures::Stream;
use futures_util::{StreamExt, stream};
use mz_dyncfg::Config;
use mz_ore::instrument;
use mz_ore::now::EpochMillis;
use mz_ore::task::{AbortOnDropHandle, JoinHandle, RuntimeExt};
use mz_persist::location::{Blob, SeqNo};
use mz_persist_types::columnar::{ColumnDecoder, Schema};
use mz_persist_types::{Codec, Codec64};
use proptest_derive::Arbitrary;
use serde::{Deserialize, Serialize};
use timely::PartialOrder;
use timely::progress::{Antichain, Timestamp};
use tokio::runtime::Handle;
use tracing::{Instrument, debug_span, warn};
use uuid::Uuid;

use crate::batch::BLOB_TARGET_SIZE;
use crate::cfg::{COMPACTION_MEMORY_BOUND_BYTES, RetryParameters};
use crate::fetch::{FetchBatchFilter, FetchedPart, Lease, LeasedBatchPart, fetch_leased_part};
use crate::internal::encoding::Schemas;
use crate::internal::machine::{ExpireFn, Machine};
use crate::internal::metrics::{Metrics, ReadMetrics, ShardMetrics};
use crate::internal::state::{BatchPart, HollowBatch};
use crate::internal::watch::StateWatch;
use crate::iter::{Consolidator, StructuredSort};
use crate::schema::SchemaCache;
use crate::stats::{SnapshotPartStats, SnapshotPartsStats, SnapshotStats};
use crate::{GarbageCollector, PersistConfig, ShardId, parse_id};

pub use crate::internal::encoding::LazyPartStats;
pub use crate::internal::state::Since;

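/// An opaque identifier for a leased reader of a persist shard, rendered as
/// `r{uuid}` in string form.
///
/// A minimal round-trip sketch (illustrative, not a doctest):
///
/// ```ignore
/// let id = LeasedReaderId::new();
/// let parsed: LeasedReaderId = id.to_string().parse().expect("valid id");
/// assert_eq!(id, parsed);
/// ```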
#[derive(Arbitrary, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[serde(try_from = "String", into = "String")]
pub struct LeasedReaderId(pub(crate) [u8; 16]);

impl std::fmt::Display for LeasedReaderId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "r{}", Uuid::from_bytes(self.0))
    }
}

impl std::fmt::Debug for LeasedReaderId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "LeasedReaderId({})", Uuid::from_bytes(self.0))
    }
}

impl std::str::FromStr for LeasedReaderId {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        parse_id('r', "LeasedReaderId", s).map(LeasedReaderId)
    }
}

impl From<LeasedReaderId> for String {
    fn from(reader_id: LeasedReaderId) -> Self {
        reader_id.to_string()
    }
}

impl TryFrom<String> for LeasedReaderId {
    type Error = String;

    fn try_from(s: String) -> Result<Self, Self::Error> {
        s.parse()
    }
}

impl LeasedReaderId {
    pub(crate) fn new() -> Self {
        LeasedReaderId(*Uuid::new_v4().as_bytes())
    }
}

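/// Capable of generating a snapshot of all data at some `as_of`, followed by a
/// listen of all updates thereafter.
///
/// A hedged usage sketch (assumes an already-opened `read: ReadHandle<K, V, T, D>`;
/// illustrative rather than a compiled doctest):
///
/// ```ignore
/// let mut subscribe = read.subscribe(Antichain::from_elem(as_of)).await?;
/// loop {
///     for event in subscribe.fetch_next().await {
///         match event {
///             ListenEvent::Updates(updates) => { /* process updates */ }
///             ListenEvent::Progress(frontier) => { /* track the new frontier */ }
///         }
///     }
/// }
/// ```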
#[derive(Debug)]
pub struct Subscribe<K: Codec, V: Codec, T, D> {
    snapshot: Option<Vec<LeasedBatchPart<T>>>,
    listen: Listen<K, V, T, D>,
}

impl<K, V, T, D> Subscribe<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Semigroup + Codec64 + Send + Sync,
{
    fn new(snapshot_parts: Vec<LeasedBatchPart<T>>, listen: Listen<K, V, T, D>) -> Self {
        Subscribe {
            snapshot: Some(snapshot_parts),
            listen,
        }
    }

    #[instrument(level = "debug", fields(shard = %self.listen.handle.machine.shard_id()))]
    pub async fn next(
        &mut self,
        listen_retry: Option<RetryParameters>,
    ) -> Vec<ListenEvent<T, LeasedBatchPart<T>>> {
        match self.snapshot.take() {
            Some(parts) => vec![ListenEvent::Updates(parts)],
            None => {
                let (parts, upper) = self.listen.next(listen_retry).await;
                vec![ListenEvent::Updates(parts), ListenEvent::Progress(upper)]
            }
        }
    }
}

impl<K, V, T, D> Subscribe<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Semigroup + Codec64 + Send + Sync,
{
    #[instrument(level = "debug", fields(shard = %self.listen.handle.machine.shard_id()))]
    pub async fn fetch_next(
        &mut self,
    ) -> Vec<ListenEvent<T, ((Result<K, String>, Result<V, String>), T, D)>> {
        let events = self.next(None).await;
        let new_len = events
            .iter()
            .map(|event| match event {
                ListenEvent::Updates(parts) => parts.len(),
                ListenEvent::Progress(_) => 1,
            })
            .sum();
        let mut ret = Vec::with_capacity(new_len);
        for event in events {
            match event {
                ListenEvent::Updates(parts) => {
                    for part in parts {
                        let fetched_part = self.listen.fetch_batch_part(part).await;
                        let updates = fetched_part.collect::<Vec<_>>();
                        if !updates.is_empty() {
                            ret.push(ListenEvent::Updates(updates));
                        }
                    }
                }
                ListenEvent::Progress(progress) => ret.push(ListenEvent::Progress(progress)),
            }
        }
        ret
    }

    pub async fn fetch_batch_part(&mut self, part: LeasedBatchPart<T>) -> FetchedPart<K, V, T, D> {
        self.listen.fetch_batch_part(part).await
    }
}

impl<K, V, T, D> Subscribe<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Semigroup + Codec64 + Send + Sync,
{
    pub async fn expire(mut self) {
        let _ = self.snapshot.take();
        self.listen.expire().await;
    }
}

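/// Data and progress events of a shard subscription.
///
/// Once a progress frontier has been emitted, all subsequent updates apply at
/// times not before that frontier.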
#[derive(Debug, PartialEq)]
pub enum ListenEvent<T, D> {
    /// Progress of the shard.
    Progress(Antichain<T>),
    /// Data of the shard.
    Updates(Vec<D>),
}

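/// An ongoing subscription of updates to a shard, built on top of an owned
/// [ReadHandle] and a watch on the shard's state.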
#[derive(Debug)]
pub struct Listen<K: Codec, V: Codec, T, D> {
    handle: ReadHandle<K, V, T, D>,
    watch: StateWatch<K, V, T, D>,

    as_of: Antichain<T>,
    since: Antichain<T>,
    frontier: Antichain<T>,
}

impl<K, V, T, D> Listen<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Semigroup + Codec64 + Send + Sync,
{
    async fn new(mut handle: ReadHandle<K, V, T, D>, as_of: Antichain<T>) -> Self {
        let since = as_of.clone();
        handle.downgrade_since(&since).await;

        let watch = handle.machine.applier.watch();
        Listen {
            handle,
            watch,
            since,
            frontier: as_of.clone(),
            as_of,
        }
    }

    pub fn frontier(&self) -> &Antichain<T> {
        &self.frontier
    }

    pub async fn next(
        &mut self,
        retry: Option<RetryParameters>,
    ) -> (Vec<LeasedBatchPart<T>>, Antichain<T>) {
        let batch = self
            .handle
            .machine
            .next_listen_batch(
                &self.frontier,
                &mut self.watch,
                Some(&self.handle.reader_id),
                retry,
            )
            .await;

        assert!(
            PartialOrder::less_than(batch.desc.since(), &self.frontier)
                || (self.frontier == self.as_of
                    && PartialOrder::less_equal(batch.desc.since(), &self.frontier)),
            "Listen on {} received a batch {:?} advanced past the listen frontier {:?}",
            self.handle.machine.shard_id(),
            batch.desc,
            self.frontier
        );

        let new_frontier = batch.desc.upper().clone();

        for x in self.frontier.elements().iter() {
            let less_than_upper = batch.desc.upper().elements().iter().any(|u| x.less_than(u));
            if less_than_upper {
                self.since.join_assign(&Antichain::from_elem(x.clone()));
            }
        }

        let filter = FetchBatchFilter::Listen {
            as_of: self.as_of.clone(),
            lower: self.frontier.clone(),
        };
        let parts = self.handle.lease_batch_parts(batch, filter).collect().await;

        self.handle.maybe_downgrade_since(&self.since).await;

        self.frontier = new_frontier;

        (parts, self.frontier.clone())
    }
}

impl<K, V, T, D> Listen<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Semigroup + Codec64 + Send + Sync,
{
    #[instrument(level = "debug", name = "listen::next", fields(shard = %self.handle.machine.shard_id()))]
    pub async fn fetch_next(
        &mut self,
    ) -> Vec<ListenEvent<T, ((Result<K, String>, Result<V, String>), T, D)>> {
        let (parts, progress) = self.next(None).await;
        let mut ret = Vec::with_capacity(parts.len() + 1);
        for part in parts {
            let fetched_part = self.fetch_batch_part(part).await;
            let updates = fetched_part.collect::<Vec<_>>();
            if !updates.is_empty() {
                ret.push(ListenEvent::Updates(updates));
            }
        }
        ret.push(ListenEvent::Progress(progress));
        ret
    }

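    /// Converts this [Listen] into a [futures::Stream] of events.
    ///
    /// A minimal consumption sketch (illustrative, not a doctest; assumes a
    /// `StreamExt` in scope for `next`):
    ///
    /// ```ignore
    /// let mut events = std::pin::pin!(listen.into_stream());
    /// while let Some(event) = events.next().await {
    ///     // ListenEvent::Updates / ListenEvent::Progress
    /// }
    /// ```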
    pub fn into_stream(
        mut self,
    ) -> impl Stream<Item = ListenEvent<T, ((Result<K, String>, Result<V, String>), T, D)>> {
        async_stream::stream!({
            loop {
                for msg in self.fetch_next().await {
                    yield msg;
                }
            }
        })
    }

    #[cfg(test)]
    #[track_caller]
    pub async fn read_until(
        &mut self,
        ts: &T,
    ) -> (
        Vec<((Result<K, String>, Result<V, String>), T, D)>,
        Antichain<T>,
    ) {
        let mut updates = Vec::new();
        let mut frontier = Antichain::from_elem(T::minimum());
        while self.frontier.less_than(ts) {
            for event in self.fetch_next().await {
                match event {
                    ListenEvent::Updates(mut x) => updates.append(&mut x),
                    ListenEvent::Progress(x) => frontier = x,
                }
            }
        }
        (updates, frontier)
    }
}

impl<K, V, T, D> Listen<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Semigroup + Codec64 + Send + Sync,
{
    async fn fetch_batch_part(&mut self, part: LeasedBatchPart<T>) -> FetchedPart<K, V, T, D> {
        let fetched_part = fetch_leased_part(
            &self.handle.cfg,
            &part,
            self.handle.blob.as_ref(),
            Arc::clone(&self.handle.metrics),
            &self.handle.metrics.read.listen,
            &self.handle.machine.applier.shard_metrics,
            &self.handle.reader_id,
            self.handle.read_schemas.clone(),
            &mut self.handle.schema_cache,
        )
        .await;
        fetched_part
    }

    pub async fn expire(self) {
        self.handle.expire().await
    }
}

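/// A "capability" granting the ability to read the state of some shard at
/// times greater or equal to `self.since()`.
///
/// Callers should [Self::expire] a handle when finished with it rather than
/// simply dropping it; otherwise the shard's since can only advance once the
/// lease times out. A hedged end-to-end sketch (assumes a handle opened
/// elsewhere, e.g. via the persist client; illustrative only):
///
/// ```ignore
/// let mut read: ReadHandle<String, String, u64, i64> = /* open via the client */;
/// let as_of = read.since().clone();
/// let updates = read.snapshot_and_fetch(as_of).await.expect("as_of still readable");
/// read.expire().await;
/// ```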
#[derive(Debug)]
pub struct ReadHandle<K: Codec, V: Codec, T, D> {
    pub(crate) cfg: PersistConfig,
    pub(crate) metrics: Arc<Metrics>,
    pub(crate) machine: Machine<K, V, T, D>,
    pub(crate) gc: GarbageCollector<K, V, T, D>,
    pub(crate) blob: Arc<dyn Blob>,
    pub(crate) reader_id: LeasedReaderId,
    pub(crate) read_schemas: Schemas<K, V>,
    pub(crate) schema_cache: SchemaCache<K, V, T, D>,

    since: Antichain<T>,
    pub(crate) last_heartbeat: EpochMillis,
    pub(crate) leased_seqnos: BTreeMap<SeqNo, Lease>,
    pub(crate) unexpired_state: Option<UnexpiredReadHandleState>,
}

pub(crate) const READER_LEASE_DURATION: Config<Duration> = Config::new(
    "persist_reader_lease_duration",
    Duration::from_secs(60 * 15),
    "The time after which we'll clean up stale read leases",
);

impl<K, V, T, D> ReadHandle<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Semigroup + Codec64 + Send + Sync,
{
    pub(crate) async fn new(
        cfg: PersistConfig,
        metrics: Arc<Metrics>,
        machine: Machine<K, V, T, D>,
        gc: GarbageCollector<K, V, T, D>,
        blob: Arc<dyn Blob>,
        reader_id: LeasedReaderId,
        read_schemas: Schemas<K, V>,
        since: Antichain<T>,
        last_heartbeat: EpochMillis,
    ) -> Self {
        let schema_cache = machine.applier.schema_cache();
        let expire_fn = Self::expire_fn(machine.clone(), gc.clone(), reader_id.clone());
        ReadHandle {
            cfg,
            metrics: Arc::clone(&metrics),
            machine: machine.clone(),
            gc: gc.clone(),
            blob,
            reader_id: reader_id.clone(),
            read_schemas,
            schema_cache,
            since,
            last_heartbeat,
            leased_seqnos: BTreeMap::new(),
            unexpired_state: Some(UnexpiredReadHandleState {
                expire_fn,
                _heartbeat_tasks: machine
                    .start_reader_heartbeat_tasks(reader_id, gc)
                    .await
                    .into_iter()
                    .map(JoinHandle::abort_on_drop)
                    .collect(),
            }),
        }
    }

    pub fn shard_id(&self) -> ShardId {
        self.machine.shard_id()
    }

    pub fn since(&self) -> &Antichain<T> {
        &self.since
    }

    fn outstanding_seqno(&mut self) -> Option<SeqNo> {
        while let Some(first) = self.leased_seqnos.first_entry() {
            if first.get().count() <= 1 {
                first.remove();
            } else {
                return Some(*first.key());
            }
        }
        None
    }

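    /// Forwards the since frontier of this handle, giving up the ability to
    /// read at times not greater or equal to `new_since`.
    ///
    /// This also acts as a heartbeat for the reader lease: the current time is
    /// recorded as `last_heartbeat`, so calling it with an unchanged frontier
    /// is a cheap way to keep the lease alive.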
    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
    pub async fn downgrade_since(&mut self, new_since: &Antichain<T>) {
        let outstanding_seqno = self.outstanding_seqno();

        let heartbeat_ts = (self.cfg.now)();
        let (_seqno, current_reader_since, maintenance) = self
            .machine
            .downgrade_since(&self.reader_id, outstanding_seqno, new_since, heartbeat_ts)
            .await;

        if let Some(outstanding_seqno) = outstanding_seqno {
            let seqnos_held = _seqno.0.saturating_sub(outstanding_seqno.0);
            const SEQNOS_HELD_THRESHOLD: u64 = 60 * 60;
            if seqnos_held >= SEQNOS_HELD_THRESHOLD {
                tracing::info!(
                    "{} reader {} holding an unexpected number of seqnos {} vs {}: {:?}. bt: {:?}",
                    self.machine.shard_id(),
                    self.reader_id,
                    outstanding_seqno,
                    _seqno,
                    self.leased_seqnos.keys().take(10).collect::<Vec<_>>(),
                    Backtrace::force_capture(),
                );
            }
        }

        self.since = current_reader_since.0;
        self.last_heartbeat = heartbeat_ts;
        maintenance.start_performing(&self.machine, &self.gc);
    }

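    /// Returns an ongoing subscription of updates to the shard, emitting all
    /// updates at times strictly greater than `as_of`.
    ///
    /// Returns an `Err` containing the current since frontier if the requested
    /// `as_of` is no longer readable.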
    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
    pub async fn listen(self, as_of: Antichain<T>) -> Result<Listen<K, V, T, D>, Since<T>> {
        let () = self.machine.verify_listen(&as_of)?;
        Ok(Listen::new(self, as_of).await)
    }

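    /// Returns leased batch parts covering all of the shard's contents as of
    /// `as_of`. The parts can be fetched (here or on another worker) and the
    /// results consolidated into a snapshot.
    ///
    /// Returns an `Err` containing the current since frontier if `as_of` is no
    /// longer readable.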
    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
    pub async fn snapshot(
        &mut self,
        as_of: Antichain<T>,
    ) -> Result<Vec<LeasedBatchPart<T>>, Since<T>> {
        let batches = self.machine.snapshot(&as_of).await?;

        if !PartialOrder::less_equal(self.since(), &as_of) {
            return Err(Since(self.since().clone()));
        }

        let filter = FetchBatchFilter::Snapshot { as_of };
        let mut leased_parts = Vec::new();
        for batch in batches {
            leased_parts.extend(
                self.lease_batch_parts(batch, filter.clone())
                    .collect::<Vec<_>>()
                    .await,
            );
        }
        Ok(leased_parts)
    }

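    /// Returns a snapshot of all of the shard's data at `as_of`, followed by a
    /// listen of all updates thereafter; see the usage sketch on [Subscribe]
    /// above.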
    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
    pub async fn subscribe(
        mut self,
        as_of: Antichain<T>,
    ) -> Result<Subscribe<K, V, T, D>, Since<T>> {
        let snapshot_parts = self.snapshot(as_of.clone()).await?;
        let listen = self.listen(as_of.clone()).await?;
        Ok(Subscribe::new(snapshot_parts, listen))
    }

    fn lease_batch_part(
        &mut self,
        desc: Description<T>,
        part: BatchPart<T>,
        filter: FetchBatchFilter<T>,
    ) -> LeasedBatchPart<T> {
        LeasedBatchPart {
            metrics: Arc::clone(&self.metrics),
            shard_id: self.machine.shard_id(),
            filter,
            desc,
            part,
            lease: self.lease_seqno(),
            filter_pushdown_audit: false,
        }
    }

    fn lease_batch_parts(
        &mut self,
        batch: HollowBatch<T>,
        filter: FetchBatchFilter<T>,
    ) -> impl Stream<Item = LeasedBatchPart<T>> + '_ {
        stream! {
            let blob = Arc::clone(&self.blob);
            let metrics = Arc::clone(&self.metrics);
            let desc = batch.desc.clone();
            for await part in batch.part_stream(self.shard_id(), &*blob, &*metrics) {
                yield self.lease_batch_part(desc.clone(), part.expect("leased part").into_owned(), filter.clone())
            }
        }
    }

    fn lease_seqno(&mut self) -> Lease {
        let seqno = self.machine.seqno();
        let lease = self
            .leased_seqnos
            .entry(seqno)
            .or_insert_with(|| Lease::new(seqno));
        lease.clone()
    }

    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
    pub async fn clone(&self, purpose: &str) -> Self {
        let new_reader_id = LeasedReaderId::new();
        let machine = self.machine.clone();
        let gc = self.gc.clone();
        let heartbeat_ts = (self.cfg.now)();
        let (reader_state, maintenance) = machine
            .register_leased_reader(
                &new_reader_id,
                purpose,
                READER_LEASE_DURATION.get(&self.cfg),
                heartbeat_ts,
                false,
            )
            .await;
        maintenance.start_performing(&machine, &gc);
        assert!(PartialOrder::less_equal(&reader_state.since, &self.since));
        let new_reader = ReadHandle::new(
            self.cfg.clone(),
            Arc::clone(&self.metrics),
            machine,
            gc,
            Arc::clone(&self.blob),
            new_reader_id,
            self.read_schemas.clone(),
            reader_state.since,
            heartbeat_ts,
        )
        .await;
        new_reader
    }

    pub async fn maybe_downgrade_since(&mut self, new_since: &Antichain<T>) {
        let min_elapsed = READER_LEASE_DURATION.get(&self.cfg) / 4;
        let elapsed_since_last_heartbeat =
            Duration::from_millis((self.cfg.now)().saturating_sub(self.last_heartbeat));
        if elapsed_since_last_heartbeat >= min_elapsed {
            self.downgrade_since(new_since).await;
        }
    }

    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
    pub async fn expire(mut self) {
        let Some(unexpired_state) = self.unexpired_state.take() else {
            return;
        };
        unexpired_state.expire_fn.0().await;
    }

    fn expire_fn(
        machine: Machine<K, V, T, D>,
        gc: GarbageCollector<K, V, T, D>,
        reader_id: LeasedReaderId,
    ) -> ExpireFn {
        ExpireFn(Box::new(move || {
            Box::pin(async move {
                let (_, maintenance) = machine.expire_leased_reader(&reader_id).await;
                maintenance.start_performing(&machine, &gc);
            })
        }))
    }

    #[cfg(test)]
    #[track_caller]
    pub async fn expect_listen(self, as_of: T) -> Listen<K, V, T, D> {
        self.listen(Antichain::from_elem(as_of))
            .await
            .expect("cannot serve requested as_of")
    }
}

#[derive(Debug)]
pub(crate) struct UnexpiredReadHandleState {
    expire_fn: ExpireFn,
    pub(crate) _heartbeat_tasks: Vec<AbortOnDropHandle<()>>,
}

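/// A snapshot of a shard, consumed as one or more consolidated chunks of
/// updates.
///
/// A minimal draining sketch (illustrative, not a doctest; assumes a
/// `read: ReadHandle<...>` and a readable `as_of`):
///
/// ```ignore
/// let mut cursor = read.snapshot_cursor(as_of, |_stats| true).await?;
/// while let Some(chunk) = cursor.next().await {
///     for ((k, v), t, d) in chunk {
///         // consume one consolidated update
///     }
/// }
/// ```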
#[derive(Debug)]
pub struct Cursor<K: Codec, V: Codec, T: Timestamp + Codec64, D: Codec64, L = Lease> {
    pub(crate) consolidator: CursorConsolidator<K, V, T, D>,
    pub(crate) _lease: L,
    pub(crate) read_schemas: Schemas<K, V>,
}

impl<K: Codec, V: Codec, T: Timestamp + Codec64, D: Codec64, L> Cursor<K, V, T, D, L> {
    pub fn into_lease(self) -> L {
        self._lease
    }
}

#[derive(Debug)]
pub(crate) enum CursorConsolidator<K: Codec, V: Codec, T: Timestamp + Codec64, D: Codec64> {
    Structured {
        consolidator: Consolidator<T, D, StructuredSort<K, V, T, D>>,
        max_len: usize,
        max_bytes: usize,
    },
}

impl<K, V, T, D, L> Cursor<K, V, T, D, L>
where
    K: Debug + Codec + Ord,
    V: Debug + Codec + Ord,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Semigroup + Ord + Codec64 + Send + Sync,
{
    pub async fn next(
        &mut self,
    ) -> Option<impl Iterator<Item = ((Result<K, String>, Result<V, String>), T, D)> + '_> {
        match &mut self.consolidator {
            CursorConsolidator::Structured {
                consolidator,
                max_len,
                max_bytes,
            } => {
                let part = consolidator
                    .next_chunk(*max_len, *max_bytes)
                    .await
                    .expect("fetching a leased part")?;
                let key_decoder = self
                    .read_schemas
                    .key
                    .decoder_any(part.key.as_ref())
                    .expect("ok");
                let val_decoder = self
                    .read_schemas
                    .val
                    .decoder_any(part.val.as_ref())
                    .expect("ok");
                let iter = (0..part.len()).map(move |i| {
                    let mut k = K::default();
                    let mut v = V::default();
                    key_decoder.decode(i, &mut k);
                    val_decoder.decode(i, &mut v);
                    let t = T::decode(part.time.value(i).to_le_bytes());
                    let d = D::decode(part.diff.value(i).to_le_bytes());
                    ((Ok(k), Ok(v)), t, d)
                });

                Some(iter)
            }
        }
    }
}

impl<K, V, T, D> ReadHandle<K, V, T, D>
where
    K: Debug + Codec + Ord,
    V: Debug + Codec + Ord,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Semigroup + Ord + Codec64 + Send + Sync,
{
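    /// Generates a snapshot of the shard's contents at `as_of`, fetches every
    /// resulting part, and consolidates the updates into a single `Vec`.
    ///
    /// Equivalent (as the body below shows) to draining [Self::snapshot_cursor]
    /// with an always-true predicate and consolidating the collected contents.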
    pub async fn snapshot_and_fetch(
        &mut self,
        as_of: Antichain<T>,
    ) -> Result<Vec<((Result<K, String>, Result<V, String>), T, D)>, Since<T>> {
        let mut cursor = self.snapshot_cursor(as_of, |_| true).await?;
        let mut contents = Vec::new();
        while let Some(iter) = cursor.next().await {
            contents.extend(iter);
        }

        let old_len = contents.len();
        consolidate_updates(&mut contents);
        if old_len != contents.len() {
            self.machine
                .applier
                .shard_metrics
                .unconsolidated_snapshot
                .inc();
        }

        Ok(contents)
    }

    pub async fn snapshot_cursor(
        &mut self,
        as_of: Antichain<T>,
        should_fetch_part: impl for<'a> Fn(Option<&'a LazyPartStats>) -> bool,
    ) -> Result<Cursor<K, V, T, D>, Since<T>> {
        let batches = self.machine.snapshot(&as_of).await?;
        let lease = self.lease_seqno();

        Self::read_batches_consolidated(
            &self.cfg,
            Arc::clone(&self.metrics),
            Arc::clone(&self.machine.applier.shard_metrics),
            self.metrics.read.snapshot.clone(),
            Arc::clone(&self.blob),
            self.shard_id(),
            as_of,
            self.read_schemas.clone(),
            &batches,
            lease,
            should_fetch_part,
        )
    }

    pub(crate) fn read_batches_consolidated<L>(
        persist_cfg: &PersistConfig,
        metrics: Arc<Metrics>,
        shard_metrics: Arc<ShardMetrics>,
        read_metrics: ReadMetrics,
        blob: Arc<dyn Blob>,
        shard_id: ShardId,
        as_of: Antichain<T>,
        schemas: Schemas<K, V>,
        batches: &[HollowBatch<T>],
        lease: L,
        should_fetch_part: impl for<'a> Fn(Option<&'a LazyPartStats>) -> bool,
    ) -> Result<Cursor<K, V, T, D, L>, Since<T>> {
        let context = format!("{}[as_of={:?}]", shard_id, as_of.elements());
        let filter = FetchBatchFilter::Snapshot {
            as_of: as_of.clone(),
        };

        let consolidator = {
            let mut consolidator = Consolidator::new(
                context,
                shard_id,
                StructuredSort::new(schemas.clone()),
                blob,
                metrics,
                shard_metrics,
                read_metrics,
                filter,
                COMPACTION_MEMORY_BOUND_BYTES.get(persist_cfg),
            );
            for batch in batches {
                for (meta, run) in batch.runs() {
                    consolidator.enqueue_run(
                        &batch.desc,
                        meta,
                        run.into_iter()
                            .filter(|p| should_fetch_part(p.stats()))
                            .cloned(),
                    );
                }
            }
            CursorConsolidator::Structured {
                consolidator,
                max_len: persist_cfg.compaction_yield_after_n_updates,
                max_bytes: BLOB_TARGET_SIZE.get(persist_cfg).max(1),
            }
        };

        Ok(Cursor {
            consolidator,
            _lease: lease,
            read_schemas: schemas,
        })
    }

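    /// Returns aggregate statistics about the shard's contents at `as_of` (or,
    /// when `as_of` is `None`, across all current batches) without fetching any
    /// data. The returned future captures a clone of the machine rather than
    /// borrowing `self`, so it is `'static` and may be polled after this handle
    /// is dropped.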
    pub fn snapshot_stats(
        &self,
        as_of: Option<Antichain<T>>,
    ) -> impl Future<Output = Result<SnapshotStats, Since<T>>> + Send + 'static {
        let machine = self.machine.clone();
        async move {
            let batches = match as_of {
                Some(as_of) => machine.snapshot(&as_of).await?,
                None => machine.applier.all_batches(),
            };
            let num_updates = batches.iter().map(|b| b.len).sum();
            Ok(SnapshotStats {
                shard_id: machine.shard_id(),
                num_updates,
            })
        }
    }

    pub async fn snapshot_parts_stats(
        &self,
        as_of: Antichain<T>,
    ) -> Result<SnapshotPartsStats, Since<T>> {
        let batches = self.machine.snapshot(&as_of).await?;
        let parts = stream::iter(&batches)
            .flat_map(|b| b.part_stream(self.shard_id(), &*self.blob, &*self.metrics))
            .map(|p| {
                let p = p.expect("live batch");
                SnapshotPartStats {
                    encoded_size_bytes: p.encoded_size_bytes(),
                    stats: p.stats().cloned(),
                }
            })
            .collect()
            .await;
        Ok(SnapshotPartsStats {
            metrics: Arc::clone(&self.machine.applier.metrics),
            shard_id: self.machine.shard_id(),
            parts,
        })
    }
}

impl<K, V, T, D> ReadHandle<K, V, T, D>
where
    K: Debug + Codec + Ord,
    V: Debug + Codec + Ord,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Semigroup + Codec64 + Send + Sync,
{
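    /// Generates a snapshot of the shard's contents at `as_of` and streams the
    /// updates out as they are fetched, rather than buffering them all as
    /// [Self::snapshot_and_fetch] does. Note that the resulting stream is not
    /// consolidated.
    ///
    /// A minimal consumption sketch (illustrative, not a doctest; assumes a
    /// `StreamExt` in scope for `next`):
    ///
    /// ```ignore
    /// let mut updates = std::pin::pin!(read.snapshot_and_stream(as_of).await?);
    /// while let Some(((k, v), t, d)) = updates.next().await {
    ///     // consume one (possibly unconsolidated) update
    /// }
    /// ```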
    pub async fn snapshot_and_stream(
        &mut self,
        as_of: Antichain<T>,
    ) -> Result<
        impl Stream<Item = ((Result<K, String>, Result<V, String>), T, D)> + use<K, V, T, D>,
        Since<T>,
    > {
        let snap = self.snapshot(as_of).await?;

        let blob = Arc::clone(&self.blob);
        let metrics = Arc::clone(&self.metrics);
        let snapshot_metrics = self.metrics.read.snapshot.clone();
        let shard_metrics = Arc::clone(&self.machine.applier.shard_metrics);
        let reader_id = self.reader_id.clone();
        let schemas = self.read_schemas.clone();
        let mut schema_cache = self.schema_cache.clone();
        let persist_cfg = self.cfg.clone();
        let stream = async_stream::stream! {
            for part in snap {
                let mut fetched_part = fetch_leased_part(
                    &persist_cfg,
                    &part,
                    blob.as_ref(),
                    Arc::clone(&metrics),
                    &snapshot_metrics,
                    &shard_metrics,
                    &reader_id,
                    schemas.clone(),
                    &mut schema_cache,
                )
                .await;

                while let Some(next) = fetched_part.next() {
                    yield next;
                }
            }
        };

        Ok(stream)
    }
}

impl<K, V, T, D> ReadHandle<K, V, T, D>
where
    K: Debug + Codec + Ord,
    V: Debug + Codec + Ord,
    T: Timestamp + Lattice + Codec64 + Ord + Sync,
    D: Semigroup + Ord + Codec64 + Send + Sync,
{
    #[cfg(test)]
    #[track_caller]
    pub async fn expect_snapshot_and_fetch(
        &mut self,
        as_of: T,
    ) -> Vec<((Result<K, String>, Result<V, String>), T, D)> {
        let mut ret = self
            .snapshot_and_fetch(Antichain::from_elem(as_of))
            .await
            .expect("cannot serve requested as_of");

        ret.sort();
        ret
    }
}

impl<K: Codec, V: Codec, T, D> Drop for ReadHandle<K, V, T, D> {
    fn drop(&mut self) {
        let Some(unexpired_state) = self.unexpired_state.take() else {
            return;
        };

        let handle = match Handle::try_current() {
            Ok(x) => x,
            Err(_) => {
                warn!(
                    "ReadHandle {} dropped without being explicitly expired, falling back to lease timeout",
                    self.reader_id
                );
                return;
            }
        };
        let expire_span = debug_span!("drop::expire");
        handle.spawn_named(
            || format!("ReadHandle::expire ({})", self.reader_id),
            unexpired_state.expire_fn.0().instrument(expire_span),
        );
    }
}

#[cfg(test)]
mod tests {
    use std::pin;
    use std::str::FromStr;

    use mz_dyncfg::ConfigUpdates;
    use mz_ore::cast::CastFrom;
    use mz_ore::metrics::MetricsRegistry;
    use mz_persist::mem::{MemBlob, MemBlobConfig, MemConsensus};
    use mz_persist::unreliable::{UnreliableConsensus, UnreliableHandle};
    use serde::{Deserialize, Serialize};
    use serde_json::json;
    use tokio_stream::StreamExt;

    use crate::async_runtime::IsolatedRuntime;
    use crate::batch::BLOB_TARGET_SIZE;
    use crate::cache::StateCache;
    use crate::internal::metrics::Metrics;
    use crate::rpc::NoopPubSubSender;
    use crate::tests::{all_ok, new_test_client};
    use crate::{Diagnostics, PersistClient, PersistConfig, ShardId};

    use super::*;

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn drop_unused_subscribe(dyncfgs: ConfigUpdates) {
        let data = [
            (("0".to_owned(), "zero".to_owned()), 0, 1),
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
        ];

        let (mut write, read) = new_test_client(&dyncfgs)
            .await
            .expect_open::<String, String, u64, i64>(crate::ShardId::new())
            .await;

        write.expect_compare_and_append(&data[0..1], 0, 1).await;
        write.expect_compare_and_append(&data[1..2], 1, 2).await;
        write.expect_compare_and_append(&data[2..3], 2, 3).await;

        let subscribe = read
            .subscribe(timely::progress::Antichain::from_elem(2))
            .await
            .unwrap();
        assert!(
            !subscribe.snapshot.as_ref().unwrap().is_empty(),
            "snapshot must have batches for test to be meaningful"
        );
        drop(subscribe);
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn streaming_consolidate(dyncfgs: ConfigUpdates) {
        let data = &[
            (("k".to_owned(), "v".to_owned()), 0, 1),
            (("k".to_owned(), "v".to_owned()), 1, 1),
            (("k".to_owned(), "v".to_owned()), 2, 1),
            (("k2".to_owned(), "v".to_owned()), 0, 1),
            (("k2".to_owned(), "v".to_owned()), 1, -1),
        ];

        let (mut write, read) = {
            let client = new_test_client(&dyncfgs).await;
            client.cfg.set_config(&BLOB_TARGET_SIZE, 1000);
            client
                .expect_open::<String, String, u64, i64>(crate::ShardId::new())
                .await
        };

        write.expect_compare_and_append(data, 0, 5).await;

        let mut snapshot = read
            .subscribe(timely::progress::Antichain::from_elem(4))
            .await
            .unwrap();

        let mut updates = vec![];
        'outer: loop {
            for event in snapshot.fetch_next().await {
                match event {
                    ListenEvent::Progress(t) => {
                        if !t.less_than(&4) {
                            break 'outer;
                        }
                    }
                    ListenEvent::Updates(data) => {
                        updates.extend(data);
                    }
                }
            }
        }
        assert_eq!(
            updates,
            &[((Ok("k".to_owned()), Ok("v".to_owned())), 4u64, 3i64)],
        )
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn snapshot_and_stream(dyncfgs: ConfigUpdates) {
        let data = &mut [
            (("k1".to_owned(), "v1".to_owned()), 0, 1),
            (("k2".to_owned(), "v2".to_owned()), 1, 1),
            (("k3".to_owned(), "v3".to_owned()), 2, 1),
            (("k4".to_owned(), "v4".to_owned()), 2, 1),
            (("k5".to_owned(), "v5".to_owned()), 3, 1),
        ];

        let (mut write, mut read) = {
            let client = new_test_client(&dyncfgs).await;
            client.cfg.set_config(&BLOB_TARGET_SIZE, 0);
            client
                .expect_open::<String, String, u64, i64>(crate::ShardId::new())
                .await
        };

        write.expect_compare_and_append(&data[0..2], 0, 2).await;
        write.expect_compare_and_append(&data[2..4], 2, 3).await;
        write.expect_compare_and_append(&data[4..], 3, 4).await;

        let as_of = Antichain::from_elem(3);
        let mut snapshot = pin::pin!(read.snapshot_and_stream(as_of.clone()).await.unwrap());

        let mut snapshot_rows = vec![];
        while let Some(((k, v), t, d)) = snapshot.next().await {
            snapshot_rows.push(((k.expect("valid key"), v.expect("valid val")), t, d));
        }

        for ((_k, _v), t, _d) in data.as_mut_slice() {
            t.advance_by(as_of.borrow());
        }

        assert_eq!(data.as_slice(), snapshot_rows.as_slice());
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn seqno_leases(dyncfgs: ConfigUpdates) {
        let mut data = vec![];
        for i in 0..20 {
            data.push(((i.to_string(), i.to_string()), i, 1))
        }

        let shard_id = ShardId::new();

        let client = new_test_client(&dyncfgs).await;
        let (mut write, read) = client
            .expect_open::<String, String, u64, i64>(shard_id)
            .await;

        let mut offset = 0;
        let mut width = 2;

        for i in offset..offset + width {
            write
                .expect_compare_and_append(
                    &data[i..i + 1],
                    u64::cast_from(i),
                    u64::cast_from(i) + 1,
                )
                .await;
        }
        offset += width;

        let mut fetcher = client
            .create_batch_fetcher::<String, String, u64, i64>(
                shard_id,
                Default::default(),
                Default::default(),
                false,
                Diagnostics::for_tests(),
            )
            .await
            .unwrap();

        let mut subscribe = read
            .subscribe(timely::progress::Antichain::from_elem(1))
            .await
            .expect("cannot serve requested as_of");

        // Record the seqno_since at the start; the leases on the parts we
        // accumulate below should prevent it from advancing.
        let original_seqno_since = subscribe.listen.handle.machine.applier.seqno_since();

        let mut parts = vec![];

        width = 4;
        for i in offset..offset + width {
            for event in subscribe.next(None).await {
                if let ListenEvent::Updates(mut new_parts) = event {
                    parts.append(&mut new_parts);
                    subscribe
                        .listen
                        .handle
                        .downgrade_since(&subscribe.listen.since)
                        .await;
                }
            }

            write
                .expect_compare_and_append(
                    &data[i..i + 1],
                    u64::cast_from(i),
                    u64::cast_from(i) + 1,
                )
                .await;

            assert_eq!(
                subscribe.listen.handle.machine.applier.seqno_since(),
                original_seqno_since
            );
        }

        offset += width;

        let mut seqno_since = subscribe.listen.handle.machine.applier.seqno_since();

        assert_eq!(seqno_since, original_seqno_since);

        let mut subsequent_parts = vec![];

        // Leased parts should be handed out in seqno order.
        let mut this_seqno = SeqNo::minimum();

        for (mut i, part) in parts.into_iter().enumerate() {
            let part_seqno = part.lease.seqno();
            let last_seqno = this_seqno;
            this_seqno = part_seqno;
            assert!(this_seqno >= last_seqno);

            let (part, lease) = part.into_exchangeable_part();
            let _ = fetcher.fetch_leased_part(part).await;
            drop(lease);

            for event in subscribe.next(None).await {
                if let ListenEvent::Updates(parts) = event {
                    for part in parts {
                        let (_, lease) = part.into_exchangeable_part();
                        subsequent_parts.push(lease);
                    }
                }
            }

            subscribe
                .listen
                .handle
                .downgrade_since(&subscribe.listen.since)
                .await;

            i += offset;
            write
                .expect_compare_and_append(
                    &data[i..i + 1],
                    u64::cast_from(i),
                    u64::cast_from(i) + 1,
                )
                .await;

            // The seqno_since can only advance once the earliest outstanding
            // lease has been dropped.
            let expect_downgrade = subscribe.listen.handle.outstanding_seqno() > Some(part_seqno);

            let new_seqno_since = subscribe.listen.handle.machine.applier.seqno_since();
            if expect_downgrade {
                assert!(new_seqno_since > seqno_since);
            } else {
                assert_eq!(new_seqno_since, seqno_since);
            }
            seqno_since = new_seqno_since;
        }

        // Now that the original leases have been dropped, the seqno_since must
        // have advanced past where it started.
        assert!(seqno_since > original_seqno_since);

        drop(subsequent_parts);
        drop(subscribe);
    }

    #[mz_ore::test]
    fn reader_id_human_readable_serde() {
        #[derive(Debug, Serialize, Deserialize)]
        struct Container {
            reader_id: LeasedReaderId,
        }

        // Roundtrip through json.
        let id =
            LeasedReaderId::from_str("r00000000-1234-5678-0000-000000000000").expect("valid id");
        assert_eq!(
            id,
            serde_json::from_value(serde_json::to_value(id.clone()).expect("serializable"))
                .expect("deserializable")
        );

        // Deserialize a serialized string directly.
        assert_eq!(
            id,
            serde_json::from_str("\"r00000000-1234-5678-0000-000000000000\"")
                .expect("deserializable")
        );

        // Roundtrip the id through a container type.
        let json = json!({ "reader_id": id });
        assert_eq!(
            "{\"reader_id\":\"r00000000-1234-5678-0000-000000000000\"}",
            &json.to_string()
        );
        let container: Container = serde_json::from_value(json).expect("deserializable");
        assert_eq!(container.reader_id, id);
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn skip_consensus_fetch_optimization() {
        let data = vec![
            (("0".to_owned(), "zero".to_owned()), 0, 1),
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
        ];

        let cfg = PersistConfig::new_for_tests();
        let blob = Arc::new(MemBlob::open(MemBlobConfig::default()));
        let consensus = Arc::new(MemConsensus::default());
        let unreliable = UnreliableHandle::default();
        unreliable.totally_available();
        let consensus = Arc::new(UnreliableConsensus::new(consensus, unreliable.clone()));
        let metrics = Arc::new(Metrics::new(&cfg, &MetricsRegistry::new()));
        let pubsub_sender = Arc::new(NoopPubSubSender);
        let (mut write, mut read) = PersistClient::new(
            cfg,
            blob,
            consensus,
            metrics,
            Arc::new(IsolatedRuntime::default()),
            Arc::new(StateCache::new_no_metrics()),
            pubsub_sender,
        )
        .expect("client construction failed")
        .expect_open::<String, String, u64, i64>(ShardId::new())
        .await;

        write.expect_compare_and_append(&data[0..1], 0, 1).await;
        write.expect_compare_and_append(&data[1..2], 1, 2).await;
        write.expect_compare_and_append(&data[2..3], 2, 3).await;

        let snapshot = read.expect_snapshot_and_fetch(2).await;
        let mut listen = read.expect_listen(0).await;

        // The listen's as_of of 0 means the update at ts 0 is not beyond it,
        // so the first batch yields no visible updates, only progress to 1.
        let listen_actual = listen.fetch_next().await;
        let expected_events = vec![ListenEvent::Progress(Antichain::from_elem(1))];
        assert_eq!(listen_actual, expected_events);

        // Now that the snapshot has been fetched and the listen has observed
        // the state it needs, reads should keep working even with consensus
        // and blob totally unavailable.
        unreliable.totally_unavailable();
        assert_eq!(snapshot, all_ok(&data, 2));
        assert_eq!(
            listen.read_until(&3).await,
            (all_ok(&data[1..], 1), Antichain::from_elem(3))
        );
    }
}