use async_stream::stream;
use std::backtrace::Backtrace;
use std::collections::BTreeMap;
use std::fmt::Debug;
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;

use differential_dataflow::consolidation::consolidate_updates;
use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::Description;
use futures::Stream;
use futures_util::{StreamExt, stream};
use mz_dyncfg::Config;
use mz_ore::instrument;
use mz_ore::now::EpochMillis;
use mz_ore::task::{AbortOnDropHandle, JoinHandle, RuntimeExt};
use mz_persist::location::{Blob, SeqNo};
use mz_persist_types::columnar::{ColumnDecoder, Schema};
use mz_persist_types::{Codec, Codec64};
use proptest_derive::Arbitrary;
use serde::{Deserialize, Serialize};
use timely::PartialOrder;
use timely::progress::{Antichain, Timestamp};
use tokio::runtime::Handle;
use tracing::{Instrument, debug_span, warn};
use uuid::Uuid;

use crate::batch::BLOB_TARGET_SIZE;
use crate::cfg::{COMPACTION_MEMORY_BOUND_BYTES, RetryParameters};
use crate::fetch::{FetchBatchFilter, FetchedPart, Lease, LeasedBatchPart, fetch_leased_part};
use crate::internal::encoding::Schemas;
use crate::internal::machine::{ExpireFn, Machine};
use crate::internal::metrics::Metrics;
use crate::internal::state::{BatchPart, HollowBatch};
use crate::internal::watch::StateWatch;
use crate::iter::{Consolidator, StructuredSort};
use crate::schema::SchemaCache;
use crate::stats::{SnapshotPartStats, SnapshotPartsStats, SnapshotStats};
use crate::{GarbageCollector, PersistConfig, ShardId, parse_id};

pub use crate::internal::encoding::LazyPartStats;
pub use crate::internal::state::Since;

/// An opaque identifier for a leased reader of a persist shard.
///
/// Roundtrips through `String` as `"r{uuid}"` for serde, `Display`, and
/// `FromStr` purposes.
#[derive(Arbitrary, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[serde(try_from = "String", into = "String")]
pub struct LeasedReaderId(pub(crate) [u8; 16]);

impl std::fmt::Display for LeasedReaderId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "r{}", Uuid::from_bytes(self.0))
    }
}

impl std::fmt::Debug for LeasedReaderId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "LeasedReaderId({})", Uuid::from_bytes(self.0))
    }
}

impl std::str::FromStr for LeasedReaderId {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        parse_id('r', "LeasedReaderId", s).map(LeasedReaderId)
    }
}

impl From<LeasedReaderId> for String {
    fn from(reader_id: LeasedReaderId) -> Self {
        reader_id.to_string()
    }
}

impl TryFrom<String> for LeasedReaderId {
    type Error = String;

    fn try_from(s: String) -> Result<Self, Self::Error> {
        s.parse()
    }
}

impl LeasedReaderId {
    pub(crate) fn new() -> Self {
        LeasedReaderId(*Uuid::new_v4().as_bytes())
    }
}
#[derive(Debug)]
pub struct Subscribe<K: Codec, V: Codec, T, D> {
    snapshot: Option<Vec<LeasedBatchPart<T>>>,
    listen: Listen<K, V, T, D>,
}

impl<K, V, T, D> Subscribe<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Semigroup + Codec64 + Send + Sync,
{
    fn new(snapshot_parts: Vec<LeasedBatchPart<T>>, listen: Listen<K, V, T, D>) -> Self {
        Subscribe {
            snapshot: Some(snapshot_parts),
            listen,
        }
    }

    /// Returns the next events in this subscription: the leased snapshot
    /// parts on the first call, and parts plus a progress frontier from the
    /// underlying [Listen] on every call after that.
    #[instrument(level = "debug", fields(shard = %self.listen.handle.machine.shard_id()))]
    pub async fn next(
        &mut self,
        listen_retry: Option<RetryParameters>,
    ) -> Vec<ListenEvent<T, LeasedBatchPart<T>>> {
        match self.snapshot.take() {
            Some(parts) => vec![ListenEvent::Updates(parts)],
            None => {
                let (parts, upper) = self.listen.next(listen_retry).await;
                vec![ListenEvent::Updates(parts), ListenEvent::Progress(upper)]
            }
        }
    }
}

impl<K, V, T, D> Subscribe<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Semigroup + Codec64 + Send + Sync,
{
    /// Equivalent to [Self::next], but also fetches and decodes the leased
    /// parts, returning fully owned updates.
    #[instrument(level = "debug", fields(shard = %self.listen.handle.machine.shard_id()))]
    pub async fn fetch_next(
        &mut self,
    ) -> Vec<ListenEvent<T, ((Result<K, String>, Result<V, String>), T, D)>> {
        let events = self.next(None).await;
        let new_len = events
            .iter()
            .map(|event| match event {
                ListenEvent::Updates(parts) => parts.len(),
                ListenEvent::Progress(_) => 1,
            })
            .sum();
        let mut ret = Vec::with_capacity(new_len);
        for event in events {
            match event {
                ListenEvent::Updates(parts) => {
                    for part in parts {
                        let fetched_part = self.listen.fetch_batch_part(part).await;
                        let updates = fetched_part.collect::<Vec<_>>();
                        if !updates.is_empty() {
                            ret.push(ListenEvent::Updates(updates));
                        }
                    }
                }
                ListenEvent::Progress(progress) => ret.push(ListenEvent::Progress(progress)),
            }
        }
        ret
    }

    /// Fetches the data referenced by `part` and returns it as a
    /// [FetchedPart].
    pub async fn fetch_batch_part(&mut self, part: LeasedBatchPart<T>) -> FetchedPart<K, V, T, D> {
        self.listen.fetch_batch_part(part).await
    }
}

impl<K, V, T, D> Subscribe<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Semigroup + Codec64 + Send + Sync,
{
    /// Politely expires this subscription, releasing its leases.
    pub async fn expire(mut self) {
        // Drop any undelivered snapshot parts, releasing their leases.
        let _ = self.snapshot.take();
        self.listen.expire().await;
    }
}

/// Data and progress events of a shard subscription.
#[derive(Debug, PartialEq)]
pub enum ListenEvent<T, D> {
    /// Progress of the shard: all updates at times not beyond this frontier
    /// have been emitted.
    Progress(Antichain<T>),
    /// Updates of the shard.
    Updates(Vec<D>),
}

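/// An ongoing subscription to updates past a shard's `as_of` frontier.
///
/// Constructed via [ReadHandle::listen]; each call to [Listen::next] (or
/// [Listen::fetch_next]) returns the updates for some interval of times past
/// the previous progress frontier.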
#[derive(Debug)]
pub struct Listen<K: Codec, V: Codec, T, D> {
    handle: ReadHandle<K, V, T, D>,
    watch: StateWatch<K, V, T, D>,

    as_of: Antichain<T>,
    since: Antichain<T>,
    frontier: Antichain<T>,
}

impl<K, V, T, D> Listen<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Semigroup + Codec64 + Send + Sync,
{
    async fn new(mut handle: ReadHandle<K, V, T, D>, as_of: Antichain<T>) -> Self {
        let since = as_of.clone();
        // This listen never needs to read at times before its as_of, so
        // eagerly downgrade the handle's since to it.
        handle.downgrade_since(&since).await;

        let watch = handle.machine.applier.watch();
        Listen {
            handle,
            watch,
            since,
            frontier: as_of.clone(),
            as_of,
        }
    }

    /// The frontier of progress: updates at times not beyond it have already
    /// been returned by this [Listen].
    pub fn frontier(&self) -> &Antichain<T> {
        &self.frontier
    }

    /// Attempts to get the next batch of updates, returning the leased parts
    /// and the new progress frontier. Waits until new data is available;
    /// `retry` optionally overrides the retry/backoff parameters used while
    /// waiting.
    pub async fn next(
        &mut self,
        retry: Option<RetryParameters>,
    ) -> (Vec<LeasedBatchPart<T>>, Antichain<T>) {
        let batch = self
            .handle
            .machine
            .next_listen_batch(
                &self.frontier,
                &mut self.watch,
                Some(&self.handle.reader_id),
                retry,
            )
            .await;

        // Sanity-check the batch against the listen frontier: a batch whose
        // since has advanced past our frontier could have had updates we still
        // need compacted away. (When the frontier is still at the as_of, an
        // equal since is fine.)
        assert!(
            PartialOrder::less_than(batch.desc.since(), &self.frontier)
                || (self.frontier == self.as_of
                    && PartialOrder::less_equal(batch.desc.since(), &self.frontier)),
            "Listen on {} received a batch {:?} advanced past the listen frontier {:?}",
            self.handle.machine.shard_id(),
            batch.desc,
            self.frontier
        );

        let new_frontier = batch.desc.upper().clone();

        // Advance the since to each frontier element this batch takes us past,
        // so the handle no longer holds back compaction for times it has
        // already emitted.
        for x in self.frontier.elements().iter() {
            let less_than_upper = batch.desc.upper().elements().iter().any(|u| x.less_than(u));
            if less_than_upper {
                self.since.join_assign(&Antichain::from_elem(x.clone()));
            }
        }

        let filter = FetchBatchFilter::Listen {
            as_of: self.as_of.clone(),
            lower: self.frontier.clone(),
        };
        let parts = self.handle.lease_batch_parts(batch, filter).collect().await;

        self.handle.maybe_downgrade_since(&self.since).await;

        self.frontier = new_frontier;

        (parts, self.frontier.clone())
    }
}

impl<K, V, T, D> Listen<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Semigroup + Codec64 + Send + Sync,
{
    /// Like [Self::next], but also fetches and decodes the parts, returning
    /// the non-empty updates followed by a progress event.
    #[instrument(level = "debug", name = "listen::next", fields(shard = %self.handle.machine.shard_id()))]
    pub async fn fetch_next(
        &mut self,
    ) -> Vec<ListenEvent<T, ((Result<K, String>, Result<V, String>), T, D)>> {
        let (parts, progress) = self.next(None).await;
        let mut ret = Vec::with_capacity(parts.len() + 1);
        for part in parts {
            let fetched_part = self.fetch_batch_part(part).await;
            let updates = fetched_part.collect::<Vec<_>>();
            if !updates.is_empty() {
                ret.push(ListenEvent::Updates(updates));
            }
        }
        ret.push(ListenEvent::Progress(progress));
        ret
    }

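    /// Converts this [Listen] into a [Stream] of fetched, decoded events.
    ///
    /// A minimal consumption sketch (not compiled here; `listen` is an
    /// assumed [Listen] and a `StreamExt` trait is assumed to be in scope for
    /// `next`):
    ///
    /// ```ignore
    /// let mut events = std::pin::pin!(listen.into_stream());
    /// while let Some(event) = events.next().await {
    ///     match event {
    ///         ListenEvent::Updates(updates) => { /* process updates */ }
    ///         ListenEvent::Progress(frontier) => { /* note the new frontier */ }
    ///     }
    /// }
    /// ```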
    pub fn into_stream(
        mut self,
    ) -> impl Stream<Item = ListenEvent<T, ((Result<K, String>, Result<V, String>), T, D)>> {
        async_stream::stream!({
            loop {
                for msg in self.fetch_next().await {
                    yield msg;
                }
            }
        })
    }

    /// Test helper to read from the listen until the given frontier is
    /// reached. Because the listen might already be at that frontier, the
    /// final progress frontier is also returned.
    #[cfg(test)]
    #[track_caller]
    pub async fn read_until(
        &mut self,
        ts: &T,
    ) -> (
        Vec<((Result<K, String>, Result<V, String>), T, D)>,
        Antichain<T>,
    ) {
        let mut updates = Vec::new();
        let mut frontier = Antichain::from_elem(T::minimum());
        while self.frontier.less_than(ts) {
            for event in self.fetch_next().await {
                match event {
                    ListenEvent::Updates(mut x) => updates.append(&mut x),
                    ListenEvent::Progress(x) => frontier = x,
                }
            }
        }
        (updates, frontier)
    }
}

impl<K, V, T, D> Listen<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Semigroup + Codec64 + Send + Sync,
{
    /// Fetches the data referenced by `part` and returns it as a
    /// [FetchedPart].
    async fn fetch_batch_part(&mut self, part: LeasedBatchPart<T>) -> FetchedPart<K, V, T, D> {
        let fetched_part = fetch_leased_part(
            &self.handle.cfg,
            &part,
            self.handle.blob.as_ref(),
            Arc::clone(&self.handle.metrics),
            &self.handle.metrics.read.listen,
            &self.handle.machine.applier.shard_metrics,
            &self.handle.reader_id,
            self.handle.read_schemas.clone(),
            &mut self.handle.schema_cache,
        )
        .await;
        fetched_part
    }

    /// Politely expires this listen, releasing its lease.
    ///
    /// There is a best-effort impl in Drop for the underlying [ReadHandle],
    /// but explicit expiry is preferred when possible.
    pub async fn expire(self) {
        self.handle.expire().await
    }
}

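/// A "capability" granting the ability to read the state of some shard at
/// times greater or equal to `self.since()`.
///
/// Each handle is uniquely identified by a [LeasedReaderId] and holds a lease
/// that must be periodically renewed; [Self::downgrade_since] (or the
/// rate-limited [Self::maybe_downgrade_since]) doubles as that heartbeat. A
/// handle that goes longer than `persist_reader_lease_duration` without a
/// heartbeat may have its lease cleaned up.
///
/// A minimal read sketch (not compiled here; `read` is an assumed, already
/// opened `ReadHandle<String, String, u64, i64>`):
///
/// ```ignore
/// let as_of = Antichain::from_elem(3);
/// for ((k, v), t, d) in read.snapshot_and_fetch(as_of).await? {
///     // k and v are Results, in case the stored data fails to decode.
/// }
/// ```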
#[derive(Debug)]
pub struct ReadHandle<K: Codec, V: Codec, T, D> {
    pub(crate) cfg: PersistConfig,
    pub(crate) metrics: Arc<Metrics>,
    pub(crate) machine: Machine<K, V, T, D>,
    pub(crate) gc: GarbageCollector<K, V, T, D>,
    pub(crate) blob: Arc<dyn Blob>,
    pub(crate) reader_id: LeasedReaderId,
    pub(crate) read_schemas: Schemas<K, V>,
    pub(crate) schema_cache: SchemaCache<K, V, T, D>,

    since: Antichain<T>,
    pub(crate) last_heartbeat: EpochMillis,
    pub(crate) leased_seqnos: BTreeMap<SeqNo, Lease>,
    pub(crate) unexpired_state: Option<UnexpiredReadHandleState>,
}

pub(crate) const READER_LEASE_DURATION: Config<Duration> = Config::new(
    "persist_reader_lease_duration",
    Duration::from_secs(60 * 15),
    "The time after which we'll clean up stale read leases",
);

impl<K, V, T, D> ReadHandle<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Semigroup + Codec64 + Send + Sync,
{
    pub(crate) async fn new(
        cfg: PersistConfig,
        metrics: Arc<Metrics>,
        machine: Machine<K, V, T, D>,
        gc: GarbageCollector<K, V, T, D>,
        blob: Arc<dyn Blob>,
        reader_id: LeasedReaderId,
        read_schemas: Schemas<K, V>,
        since: Antichain<T>,
        last_heartbeat: EpochMillis,
    ) -> Self {
        let schema_cache = machine.applier.schema_cache();
        let expire_fn = Self::expire_fn(machine.clone(), gc.clone(), reader_id.clone());
        ReadHandle {
            cfg,
            metrics: Arc::clone(&metrics),
            machine: machine.clone(),
            gc: gc.clone(),
            blob,
            reader_id: reader_id.clone(),
            read_schemas,
            schema_cache,
            since,
            last_heartbeat,
            leased_seqnos: BTreeMap::new(),
            unexpired_state: Some(UnexpiredReadHandleState {
                expire_fn,
                _heartbeat_tasks: machine
                    .start_reader_heartbeat_tasks(reader_id, gc)
                    .await
                    .into_iter()
                    .map(JoinHandle::abort_on_drop)
                    .collect(),
            }),
        }
    }

    /// The shard this reader is for.
    pub fn shard_id(&self) -> ShardId {
        self.machine.shard_id()
    }

    /// This handle's `since` frontier.
    ///
    /// This will always be greater or equal to the shard-global `since`.
    pub fn since(&self) -> &Antichain<T> {
        &self.since
    }

    /// Returns the smallest seqno with an outstanding lease, if any, removing
    /// any leases of which this handle holds the only remaining copy.
    fn outstanding_seqno(&mut self) -> Option<SeqNo> {
        while let Some(first) = self.leased_seqnos.first_entry() {
            if first.get().count() <= 1 {
                first.remove();
            } else {
                return Some(*first.key());
            }
        }
        None
    }

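    /// Forwards the since frontier of this handle, giving up the ability to
    /// read at times not greater or equal to `new_since`.
    ///
    /// This also acts as a heartbeat for the reader lease and kicks off any
    /// routine maintenance the resulting state transition requires. The
    /// smallest outstanding leased seqno (if any) is passed along so that
    /// consensus state needed by in-flight part fetches isn't garbage
    /// collected out from under them.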
    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
    pub async fn downgrade_since(&mut self, new_since: &Antichain<T>) {
        // Guaranteed to be the smallest/oldest outstanding lease on a seqno.
        let outstanding_seqno = self.outstanding_seqno();

        let heartbeat_ts = (self.cfg.now)();
        let (_seqno, current_reader_since, maintenance) = self
            .machine
            .downgrade_since(&self.reader_id, outstanding_seqno, new_since, heartbeat_ts)
            .await;

        // Log if this handle has fallen suspiciously far behind, which
        // suggests a leaked seqno lease somewhere.
        if let Some(outstanding_seqno) = outstanding_seqno {
            let seqnos_held = _seqno.0.saturating_sub(outstanding_seqno.0);
            // Roughly an hour of seqnos, at the steady-state rate of about one
            // seqno per second; big enough not to fire spuriously.
            const SEQNOS_HELD_THRESHOLD: u64 = 60 * 60;
            if seqnos_held >= SEQNOS_HELD_THRESHOLD {
                tracing::info!(
                    "{} reader {} holding an unexpected number of seqnos {} vs {}: {:?}. bt: {:?}",
                    self.machine.shard_id(),
                    self.reader_id,
                    outstanding_seqno,
                    _seqno,
                    self.leased_seqnos.keys().take(10).collect::<Vec<_>>(),
                    Backtrace::force_capture(),
                );
            }
        }

        self.since = current_reader_since.0;
        // A downgrade is also a heartbeat, so update the rate limiter used by
        // `maybe_downgrade_since`.
        self.last_heartbeat = heartbeat_ts;
        maintenance.start_performing(&self.machine, &self.gc);
    }

    /// Returns an ongoing subscription of updates to a shard.
    ///
    /// The subscription includes all data at times greater than `as_of`.
    /// Combined with [Self::snapshot], this produces exactly-correct results:
    /// the snapshot is the shard's contents at `as_of` and all subsequent
    /// updates occur at exactly their indicated time.
    ///
    /// This takes ownership of the handle so the [Listen] can downgrade the
    /// since as it progresses; use [Self::clone] first if the handle is still
    /// needed. A `Since` error indicates the requested `as_of` can no longer
    /// be served and carries a frontier that would be accepted.
    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
    pub async fn listen(self, as_of: Antichain<T>) -> Result<Listen<K, V, T, D>, Since<T>> {
        let () = self.machine.verify_listen(&as_of)?;
        Ok(Listen::new(self, as_of).await)
    }

    /// Returns a snapshot of the contents of the shard at `as_of`, as leased
    /// batch parts to be fetched (or dropped) by the caller.
    ///
    /// A `Since` error indicates the requested `as_of` can no longer be served
    /// and carries this handle's current since.
    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
    pub async fn snapshot(
        &mut self,
        as_of: Antichain<T>,
    ) -> Result<Vec<LeasedBatchPart<T>>, Since<T>> {
        let batches = self.machine.snapshot(&as_of).await?;

        if !PartialOrder::less_equal(self.since(), &as_of) {
            return Err(Since(self.since().clone()));
        }

        let filter = FetchBatchFilter::Snapshot { as_of };
        let mut leased_parts = Vec::new();
        for batch in batches {
            // Flatten each batch into one leased part per part it contains.
            leased_parts.extend(
                self.lease_batch_parts(batch, filter.clone())
                    .collect::<Vec<_>>()
                    .await,
            );
        }
        Ok(leased_parts)
    }

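    /// Returns a snapshot of all of a shard's data using `as_of`, followed by
    /// listening to any future updates. For this operation's semantics, see
    /// [Self::snapshot] and [Self::listen].
    ///
    /// A sketch of driving a subscription at the lease level (not compiled
    /// here; `read` is an assumed [ReadHandle]):
    ///
    /// ```ignore
    /// let mut subscribe = read.subscribe(Antichain::from_elem(0)).await?;
    /// loop {
    ///     for event in subscribe.next(None).await {
    ///         if let ListenEvent::Updates(parts) = event {
    ///             // Leased parts must be fetched (e.g. via
    ///             // Subscribe::fetch_batch_part) or dropped to release leases.
    ///         }
    ///     }
    /// }
    /// ```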
    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
    pub async fn subscribe(
        mut self,
        as_of: Antichain<T>,
    ) -> Result<Subscribe<K, V, T, D>, Since<T>> {
        let snapshot_parts = self.snapshot(as_of.clone()).await?;
        let listen = self.listen(as_of.clone()).await?;
        Ok(Subscribe::new(snapshot_parts, listen))
    }

    fn lease_batch_part(
        &mut self,
        desc: Description<T>,
        part: BatchPart<T>,
        filter: FetchBatchFilter<T>,
    ) -> LeasedBatchPart<T> {
        LeasedBatchPart {
            metrics: Arc::clone(&self.metrics),
            shard_id: self.machine.shard_id(),
            reader_id: self.reader_id.clone(),
            filter,
            desc,
            part,
            leased_seqno: self.machine.seqno(),
            lease: Some(self.lease_seqno()),
            filter_pushdown_audit: false,
        }
    }

    fn lease_batch_parts(
        &mut self,
        batch: HollowBatch<T>,
        filter: FetchBatchFilter<T>,
    ) -> impl Stream<Item = LeasedBatchPart<T>> + '_ {
        stream! {
            let blob = Arc::clone(&self.blob);
            let metrics = Arc::clone(&self.metrics);
            let desc = batch.desc.clone();
            for await part in batch.part_stream(self.shard_id(), &*blob, &*metrics) {
                yield self.lease_batch_part(
                    desc.clone(),
                    part.expect("leased part").into_owned(),
                    filter.clone(),
                )
            }
        }
    }

    /// Tracks that the handle's machine's current seqno is being "leased out"
    /// to a [LeasedBatchPart] and cannot be garbage collected until the lease
    /// is dropped.
    fn lease_seqno(&mut self) -> Lease {
        let seqno = self.machine.seqno();
        let lease = self.leased_seqnos.entry(seqno).or_default();
        lease.clone()
    }

    /// Returns an independent [ReadHandle], registered under a new
    /// [LeasedReaderId] for `purpose`, with the same `since` capability.
    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
    pub async fn clone(&self, purpose: &str) -> Self {
        let new_reader_id = LeasedReaderId::new();
        let machine = self.machine.clone();
        let gc = self.gc.clone();
        let heartbeat_ts = (self.cfg.now)();
        let (reader_state, maintenance) = machine
            .register_leased_reader(
                &new_reader_id,
                purpose,
                READER_LEASE_DURATION.get(&self.cfg),
                heartbeat_ts,
                false,
            )
            .await;
        maintenance.start_performing(&machine, &gc);
        // The point of clone is that you're guaranteed the same (or greater)
        // since capability; verify that.
        assert!(PartialOrder::less_equal(&reader_state.since, &self.since));
        let new_reader = ReadHandle::new(
            self.cfg.clone(),
            Arc::clone(&self.metrics),
            machine,
            gc,
            Arc::clone(&self.blob),
            new_reader_id,
            self.read_schemas.clone(),
            reader_state.since,
            heartbeat_ts,
        )
        .await;
        new_reader
    }

    /// A rate-limited wrapper around [Self::downgrade_since], safe to call as
    /// frequently as needed: it only performs an actual downgrade (which also
    /// serves as the lease heartbeat) once a quarter of the lease duration has
    /// elapsed since the last heartbeat.
    pub async fn maybe_downgrade_since(&mut self, new_since: &Antichain<T>) {
        let min_elapsed = READER_LEASE_DURATION.get(&self.cfg) / 4;
        let elapsed_since_last_heartbeat =
            Duration::from_millis((self.cfg.now)().saturating_sub(self.last_heartbeat));
        if elapsed_since_last_heartbeat >= min_elapsed {
            self.downgrade_since(new_since).await;
        }
    }

    /// Politely expires this reader, releasing its lease.
    ///
    /// There is a best-effort impl in Drop for handles that aren't explicitly
    /// expired, but it requires a tokio runtime to be available at drop time,
    /// so calling this method is preferred when possible.
    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
    pub async fn expire(mut self) {
        // If the handle has already been expired, there's nothing left to do.
        let Some(unexpired_state) = self.unexpired_state.take() else {
            return;
        };
        unexpired_state.expire_fn.0().await;
    }

    fn expire_fn(
        machine: Machine<K, V, T, D>,
        gc: GarbageCollector<K, V, T, D>,
        reader_id: LeasedReaderId,
    ) -> ExpireFn {
        ExpireFn(Box::new(move || {
            Box::pin(async move {
                let (_, maintenance) = machine.expire_leased_reader(&reader_id).await;
                maintenance.start_performing(&machine, &gc);
            })
        }))
    }

    /// Test helper for a [Self::listen] call that is expected to succeed.
    #[cfg(test)]
    #[track_caller]
    pub async fn expect_listen(self, as_of: T) -> Listen<K, V, T, D> {
        self.listen(Antichain::from_elem(as_of))
            .await
            .expect("cannot serve requested as_of")
    }
}

/// State of a [ReadHandle] that has not yet been expired.
#[derive(Debug)]
pub(crate) struct UnexpiredReadHandleState {
    expire_fn: ExpireFn,
    pub(crate) _heartbeat_tasks: Vec<AbortOnDropHandle<()>>,
}

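/// An incremental cursor through a snapshot of a shard.
///
/// Returned by [ReadHandle::snapshot_cursor]; each call to [Cursor::next]
/// yields another consolidated chunk of decoded updates, or `None` once the
/// snapshot is exhausted.
///
/// A minimal sketch (not compiled here; `read` is an assumed [ReadHandle]):
///
/// ```ignore
/// let mut cursor = read.snapshot_cursor(Antichain::from_elem(3), |_| true).await?;
/// while let Some(chunk) = cursor.next().await {
///     for ((k, v), t, d) in chunk { /* process each update */ }
/// }
/// ```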
#[derive(Debug)]
pub struct Cursor<K: Codec, V: Codec, T: Timestamp + Codec64, D: Codec64> {
    consolidator: CursorConsolidator<K, V, T, D>,
    _lease: Lease,
    read_schemas: Schemas<K, V>,
}

#[derive(Debug)]
enum CursorConsolidator<K: Codec, V: Codec, T: Timestamp + Codec64, D: Codec64> {
    Structured {
        consolidator: Consolidator<T, D, StructuredSort<K, V, T, D>>,
        max_len: usize,
        max_bytes: usize,
    },
}

impl<K, V, T, D> Cursor<K, V, T, D>
where
    K: Debug + Codec + Ord,
    V: Debug + Codec + Ord,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Semigroup + Ord + Codec64 + Send + Sync,
{
    /// Grab the next chunk of consolidated, decoded data.
    pub async fn next(
        &mut self,
    ) -> Option<impl Iterator<Item = ((Result<K, String>, Result<V, String>), T, D)> + '_> {
        match &mut self.consolidator {
            CursorConsolidator::Structured {
                consolidator,
                max_len,
                max_bytes,
            } => {
                let part = consolidator
                    .next_chunk(*max_len, *max_bytes)
                    .await
                    .expect("fetching a leased part")?;
                let key_decoder = self
                    .read_schemas
                    .key
                    .decoder_any(part.key.as_ref())
                    .expect("ok");
                let val_decoder = self
                    .read_schemas
                    .val
                    .decoder_any(part.val.as_ref())
                    .expect("ok");
                let iter = (0..part.len()).map(move |i| {
                    let mut k = K::default();
                    let mut v = V::default();
                    key_decoder.decode(i, &mut k);
                    val_decoder.decode(i, &mut v);
                    let t = T::decode(part.time.value(i).to_le_bytes());
                    let d = D::decode(part.diff.value(i).to_le_bytes());
                    ((Ok(k), Ok(v)), t, d)
                });

                Some(iter)
            }
        }
    }
}

impl<K, V, T, D> ReadHandle<K, V, T, D>
where
    K: Debug + Codec + Ord,
    V: Debug + Codec + Ord,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Semigroup + Ord + Codec64 + Send + Sync,
{
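    /// Generates a [Self::snapshot], fetches all of the batch parts, and
    /// returns the decoded, consolidated updates in one `Vec`.
    ///
    /// The output is consolidated: each `((k, v), t)` appears at most once,
    /// with its diffs summed. Prefer [Self::snapshot_cursor] or
    /// [Self::snapshot_and_stream] if the full snapshot may not fit in memory.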
    pub async fn snapshot_and_fetch(
        &mut self,
        as_of: Antichain<T>,
    ) -> Result<Vec<((Result<K, String>, Result<V, String>), T, D)>, Since<T>> {
        let mut cursor = self.snapshot_cursor(as_of, |_| true).await?;
        let mut contents = Vec::new();
        while let Some(iter) = cursor.next().await {
            contents.extend(iter);
        }

        // Consolidation is not guaranteed by the underlying parts, so do a
        // final pass here and track how often it turns out to be necessary.
        let old_len = contents.len();
        consolidate_updates(&mut contents);
        if old_len != contents.len() {
            self.machine
                .applier
                .shard_metrics
                .unconsolidated_snapshot
                .inc();
        }

        Ok(contents)
    }

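    /// Generates a [Self::snapshot], fetching the batch parts lazily and
    /// returning a [Cursor] that consolidates as it goes.
    ///
    /// `should_fetch_part` is given each part's stats (if any) and may return
    /// `false` to skip fetching that part, e.g. for filter pushdown.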
    pub async fn snapshot_cursor(
        &mut self,
        as_of: Antichain<T>,
        should_fetch_part: impl for<'a> Fn(Option<&'a LazyPartStats>) -> bool,
    ) -> Result<Cursor<K, V, T, D>, Since<T>> {
        let batches = self.machine.snapshot(&as_of).await?;

        let context = format!("{}[as_of={:?}]", self.shard_id(), as_of.elements());
        let filter = FetchBatchFilter::Snapshot {
            as_of: as_of.clone(),
        };
        let lease = self.lease_seqno();

        let consolidator = {
            let mut consolidator = Consolidator::new(
                context,
                self.shard_id(),
                StructuredSort::new(self.read_schemas.clone()),
                Arc::clone(&self.blob),
                Arc::clone(&self.metrics),
                Arc::clone(&self.machine.applier.shard_metrics),
                self.metrics.read.snapshot.clone(),
                filter,
                COMPACTION_MEMORY_BOUND_BYTES.get(&self.cfg),
            );
            for batch in batches {
                for (meta, run) in batch.runs() {
                    consolidator.enqueue_run(
                        &batch.desc,
                        meta,
                        run.into_iter()
                            .filter(|p| should_fetch_part(p.stats()))
                            .cloned(),
                    );
                }
            }
            CursorConsolidator::Structured {
                consolidator,
                max_len: self.cfg.compaction_yield_after_n_updates,
                // Clamp to at least one byte so the cursor always makes
                // progress, even if the target size is configured to zero.
                max_bytes: BLOB_TARGET_SIZE.get(&self.cfg).max(1),
            }
        };

        Ok(Cursor {
            consolidator,
            _lease: lease,
            read_schemas: self.read_schemas.clone(),
        })
    }

    /// Returns aggregate statistics about the contents of the shard at
    /// `as_of`, or at the current write frontier if `as_of` is `None`.
    ///
    /// The count is based on the encoded batch lengths, so it may overcount
    /// updates that would consolidate away. The returned future is `'static`,
    /// so it can outlive this handle.
    pub fn snapshot_stats(
        &self,
        as_of: Option<Antichain<T>>,
    ) -> impl Future<Output = Result<SnapshotStats, Since<T>>> + Send + 'static {
        let machine = self.machine.clone();
        async move {
            let batches = match as_of {
                Some(as_of) => machine.snapshot(&as_of).await?,
                None => machine.applier.all_batches(),
            };
            let num_updates = batches.iter().map(|b| b.len).sum();
            Ok(SnapshotStats {
                shard_id: machine.shard_id(),
                num_updates,
            })
        }
    }

    /// Returns per-part size and stats information about the contents of the
    /// shard at `as_of`.
    pub async fn snapshot_parts_stats(
        &self,
        as_of: Antichain<T>,
    ) -> Result<SnapshotPartsStats, Since<T>> {
        let batches = self.machine.snapshot(&as_of).await?;
        let parts = stream::iter(&batches)
            .flat_map(|b| b.part_stream(self.shard_id(), &*self.blob, &*self.metrics))
            .map(|p| {
                let p = p.expect("live batch");
                SnapshotPartStats {
                    encoded_size_bytes: p.encoded_size_bytes(),
                    stats: p.stats().cloned(),
                }
            })
            .collect()
            .await;
        Ok(SnapshotPartsStats {
            metrics: Arc::clone(&self.machine.applier.metrics),
            shard_id: self.machine.shard_id(),
            parts,
        })
    }
}

impl<K, V, T, D> ReadHandle<K, V, T, D>
where
    K: Debug + Codec + Ord,
    V: Debug + Codec + Ord,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Semigroup + Codec64 + Send + Sync,
{
    /// Generates a [Self::snapshot] and fetches the batch parts, streaming
    /// the decoded updates back instead of materializing them all at once.
    pub async fn snapshot_and_stream(
        &mut self,
        as_of: Antichain<T>,
    ) -> Result<
        impl Stream<Item = ((Result<K, String>, Result<V, String>), T, D)> + use<K, V, T, D>,
        Since<T>,
    > {
        let snap = self.snapshot(as_of).await?;

        let blob = Arc::clone(&self.blob);
        let metrics = Arc::clone(&self.metrics);
        let snapshot_metrics = self.metrics.read.snapshot.clone();
        let shard_metrics = Arc::clone(&self.machine.applier.shard_metrics);
        let reader_id = self.reader_id.clone();
        let schemas = self.read_schemas.clone();
        let mut schema_cache = self.schema_cache.clone();
        let persist_cfg = self.cfg.clone();
        let stream = async_stream::stream! {
            for part in snap {
                let mut fetched_part = fetch_leased_part(
                    &persist_cfg,
                    &part,
                    blob.as_ref(),
                    Arc::clone(&metrics),
                    &snapshot_metrics,
                    &shard_metrics,
                    &reader_id,
                    schemas.clone(),
                    &mut schema_cache,
                )
                .await;

                while let Some(next) = fetched_part.next() {
                    yield next;
                }
            }
        };

        Ok(stream)
    }
}

impl<K, V, T, D> ReadHandle<K, V, T, D>
where
    K: Debug + Codec + Ord,
    V: Debug + Codec + Ord,
    T: Timestamp + Lattice + Codec64 + Ord + Sync,
    D: Semigroup + Ord + Codec64 + Send + Sync,
{
    /// Test helper for a [Self::snapshot_and_fetch] call that is expected to
    /// succeed, with the output sorted for deterministic comparison.
    #[cfg(test)]
    #[track_caller]
    pub async fn expect_snapshot_and_fetch(
        &mut self,
        as_of: T,
    ) -> Vec<((Result<K, String>, Result<V, String>), T, D)> {
        let mut ret = self
            .snapshot_and_fetch(Antichain::from_elem(as_of))
            .await
            .expect("cannot serve requested as_of");

        ret.sort();
        ret
    }
}

impl<K: Codec, V: Codec, T, D> Drop for ReadHandle<K, V, T, D> {
    fn drop(&mut self) {
        // If the handle was already explicitly expired, there's nothing to do.
        let Some(unexpired_state) = self.unexpired_state.take() else {
            return;
        };

        let handle = match Handle::try_current() {
            Ok(x) => x,
            Err(_) => {
                warn!(
                    "ReadHandle {} dropped without being explicitly expired, falling back to lease timeout",
                    self.reader_id
                );
                return;
            }
        };
        // Spawn a best-effort task to expire this read handle. It's fine if
        // it doesn't run to completion; the lease timeout will eventually
        // unblock the since anyway. Create the span outside the task so the
        // parent is set correctly.
        let expire_span = debug_span!("drop::expire");
        handle.spawn_named(
            || format!("ReadHandle::expire ({})", self.reader_id),
            unexpired_state.expire_fn.0().instrument(expire_span),
        );
    }
}

#[cfg(test)]
mod tests {
    use std::pin;
    use std::str::FromStr;

    use mz_dyncfg::ConfigUpdates;
    use mz_ore::cast::CastFrom;
    use mz_ore::metrics::MetricsRegistry;
    use mz_persist::mem::{MemBlob, MemBlobConfig, MemConsensus};
    use mz_persist::unreliable::{UnreliableConsensus, UnreliableHandle};
    use serde::{Deserialize, Serialize};
    use serde_json::json;
    use tokio_stream::StreamExt;

    use crate::async_runtime::IsolatedRuntime;
    use crate::batch::BLOB_TARGET_SIZE;
    use crate::cache::StateCache;
    use crate::internal::metrics::Metrics;
    use crate::rpc::NoopPubSubSender;
    use crate::tests::{all_ok, new_test_client};
    use crate::{Diagnostics, PersistClient, PersistConfig, ShardId};

    use super::*;

    // Verifies that a subscribe can be dropped while it still holds leased
    // snapshot batches.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn drop_unused_subscribe(dyncfgs: ConfigUpdates) {
        let data = [
            (("0".to_owned(), "zero".to_owned()), 0, 1),
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
        ];

        let (mut write, read) = new_test_client(&dyncfgs)
            .await
            .expect_open::<String, String, u64, i64>(crate::ShardId::new())
            .await;

        write.expect_compare_and_append(&data[0..1], 0, 1).await;
        write.expect_compare_and_append(&data[1..2], 1, 2).await;
        write.expect_compare_and_append(&data[2..3], 2, 3).await;

        let subscribe = read
            .subscribe(timely::progress::Antichain::from_elem(2))
            .await
            .unwrap();
        assert!(
            !subscribe.snapshot.as_ref().unwrap().is_empty(),
            "snapshot must have batches for test to be meaningful"
        );
        drop(subscribe);
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn streaming_consolidate(dyncfgs: ConfigUpdates) {
        let data = &[
            // Identical updates that should sum together...
            (("k".to_owned(), "v".to_owned()), 0, 1),
            (("k".to_owned(), "v".to_owned()), 1, 1),
            (("k".to_owned(), "v".to_owned()), 2, 1),
            // ...and updates that cancel out and should be omitted.
            (("k2".to_owned(), "v".to_owned()), 0, 1),
            (("k2".to_owned(), "v".to_owned()), 1, -1),
        ];

        let (mut write, read) = {
            let client = new_test_client(&dyncfgs).await;
            // Use a small target size so the data is split across parts.
            client.cfg.set_config(&BLOB_TARGET_SIZE, 1000);
            client
                .expect_open::<String, String, u64, i64>(crate::ShardId::new())
                .await
        };

        write.expect_compare_and_append(data, 0, 5).await;

        let mut snapshot = read
            .subscribe(timely::progress::Antichain::from_elem(4))
            .await
            .unwrap();

        let mut updates = vec![];
        'outer: loop {
            for event in snapshot.fetch_next().await {
                match event {
                    ListenEvent::Progress(t) => {
                        if !t.less_than(&4) {
                            break 'outer;
                        }
                    }
                    ListenEvent::Updates(data) => {
                        updates.extend(data);
                    }
                }
            }
        }
        assert_eq!(
            updates,
            &[((Ok("k".to_owned()), Ok("v".to_owned())), 4u64, 3i64)],
        )
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn snapshot_and_stream(dyncfgs: ConfigUpdates) {
        let data = &mut [
            (("k1".to_owned(), "v1".to_owned()), 0, 1),
            (("k2".to_owned(), "v2".to_owned()), 1, 1),
            (("k3".to_owned(), "v3".to_owned()), 2, 1),
            (("k4".to_owned(), "v4".to_owned()), 2, 1),
            (("k5".to_owned(), "v5".to_owned()), 3, 1),
        ];

        let (mut write, mut read) = {
            let client = new_test_client(&dyncfgs).await;
            // A zero target size forces the data into many small parts, so the
            // stream visits more than one.
            client.cfg.set_config(&BLOB_TARGET_SIZE, 0);
            client
                .expect_open::<String, String, u64, i64>(crate::ShardId::new())
                .await
        };

        write.expect_compare_and_append(&data[0..2], 0, 2).await;
        write.expect_compare_and_append(&data[2..4], 2, 3).await;
        write.expect_compare_and_append(&data[4..], 3, 4).await;

        let as_of = Antichain::from_elem(3);
        let mut snapshot = pin::pin!(read.snapshot_and_stream(as_of.clone()).await.unwrap());

        let mut snapshot_rows = vec![];
        while let Some(((k, v), t, d)) = snapshot.next().await {
            snapshot_rows.push(((k.expect("valid key"), v.expect("valid value")), t, d));
        }

        // The snapshot advances all timestamps to the as_of.
        for ((_k, _v), t, _d) in data.as_mut_slice() {
            t.advance_by(as_of.borrow());
        }

        assert_eq!(data.as_slice(), snapshot_rows.as_slice());
    }

    // Verifies the seqno-lease semantics of `LeasedBatchPart`s handed out by
    // a subscribe.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn seqno_leases(dyncfgs: ConfigUpdates) {
        let mut data = vec![];
        for i in 0..20 {
            data.push(((i.to_string(), i.to_string()), i, 1))
        }

        let shard_id = ShardId::new();

        let client = new_test_client(&dyncfgs).await;
        let (mut write, read) = client
            .expect_open::<String, String, u64, i64>(shard_id)
            .await;

        // Seed with some values.
        let mut offset = 0;
        let mut width = 2;

        for i in offset..offset + width {
            write
                .expect_compare_and_append(
                    &data[i..i + 1],
                    u64::cast_from(i),
                    u64::cast_from(i) + 1,
                )
                .await;
        }
        offset += width;

        // Create machinery for subscribe + fetch.
        let mut fetcher = client
            .create_batch_fetcher::<String, String, u64, i64>(
                shard_id,
                Default::default(),
                Default::default(),
                false,
                Diagnostics::for_tests(),
            )
            .await
            .unwrap();

        let mut subscribe = read
            .subscribe(timely::progress::Antichain::from_elem(1))
            .await
            .expect("cannot serve requested as_of");

        // Determine the seqno_since at the outset of the test.
        let original_seqno_since = subscribe.listen.handle.machine.applier.seqno_since();

        let mut parts = vec![];

        width = 4;
        // Collect parts while continuing to write values.
        for i in offset..offset + width {
            for event in subscribe.next(None).await {
                if let ListenEvent::Updates(mut new_parts) = event {
                    parts.append(&mut new_parts);
                    // Downgrade the since immediately, to demonstrate that the
                    // outstanding part leases are what holds the seqno_since
                    // back.
                    subscribe
                        .listen
                        .handle
                        .downgrade_since(&subscribe.listen.since)
                        .await;
                }
            }

            write
                .expect_compare_and_append(
                    &data[i..i + 1],
                    u64::cast_from(i),
                    u64::cast_from(i) + 1,
                )
                .await;

            // The seqno_since must not advance while parts are still leased.
            assert_eq!(
                subscribe.listen.handle.machine.applier.seqno_since(),
                original_seqno_since
            );
        }

        offset += width;

        let mut seqno_since = subscribe.listen.handle.machine.applier.seqno_since();

        // We start out with the original, non-downgraded seqno_since.
        assert_eq!(seqno_since, original_seqno_since);

        // Parts generated during the next loop must be retained so their
        // leases stay live until we're done with them.
        let mut subsequent_parts = vec![];

        // Track the seqnos of the parts seen so far, to assert that they are
        // nondecreasing.
        let mut this_seqno = SeqNo::minimum();

        // Repeat roughly the same process as above, this time fetching and
        // then dropping each part.
        for (mut i, part) in parts.into_iter().enumerate() {
            let part_seqno = part.leased_seqno;
            let last_seqno = this_seqno;
            this_seqno = part_seqno;
            assert!(this_seqno >= last_seqno);

            let _ = fetcher.fetch_leased_part(&part).await;
            drop(part);

            // Simulate an exchange of the parts, retaining their leases.
            for event in subscribe.next(None).await {
                if let ListenEvent::Updates(parts) = event {
                    for part in parts {
                        if let (_, Some(lease)) = part.into_exchangeable_part() {
                            subsequent_parts.push(lease);
                        }
                    }
                }
            }

            subscribe
                .listen
                .handle
                .downgrade_since(&subscribe.listen.since)
                .await;

            // Write more new values.
            i += offset;
            write
                .expect_compare_and_append(
                    &data[i..i + 1],
                    u64::cast_from(i),
                    u64::cast_from(i) + 1,
                )
                .await;

            // The seqno_since advances only once the lease on this part's
            // seqno is no longer outstanding.
            let expect_downgrade = subscribe.listen.handle.outstanding_seqno() > Some(part_seqno);

            let new_seqno_since = subscribe.listen.handle.machine.applier.seqno_since();
            if expect_downgrade {
                assert!(new_seqno_since > seqno_since);
            } else {
                assert_eq!(new_seqno_since, seqno_since);
            }
            seqno_since = new_seqno_since;
        }

        // The seqno_since was downgraded once all leases were returned.
        assert!(seqno_since > original_seqno_since);

        // Make sure the remaining leases are dropped cleanly.
        drop(subsequent_parts);
        drop(subscribe);
    }

    #[mz_ore::test]
    fn reader_id_human_readable_serde() {
        #[derive(Debug, Serialize, Deserialize)]
        struct Container {
            reader_id: LeasedReaderId,
        }

        // Roundtrip through json.
        let id =
            LeasedReaderId::from_str("r00000000-1234-5678-0000-000000000000").expect("valid id");
        assert_eq!(
            id,
            serde_json::from_value(serde_json::to_value(id.clone()).expect("serializable"))
                .expect("deserializable")
        );

        // Deserialize from the raw string.
        assert_eq!(
            id,
            serde_json::from_str("\"r00000000-1234-5678-0000-000000000000\"")
                .expect("deserializable")
        );

        // Roundtrip through a containing struct.
        let json = json!({ "reader_id": id });
        assert_eq!(
            "{\"reader_id\":\"r00000000-1234-5678-0000-000000000000\"}",
            &json.to_string()
        );
        let container: Container = serde_json::from_value(json).expect("deserializable");
        assert_eq!(container.reader_id, id);
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn skip_consensus_fetch_optimization() {
        let data = vec![
            (("0".to_owned(), "zero".to_owned()), 0, 1),
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
        ];

        let cfg = PersistConfig::new_for_tests();
        let blob = Arc::new(MemBlob::open(MemBlobConfig::default()));
        let consensus = Arc::new(MemConsensus::default());
        let unreliable = UnreliableHandle::default();
        unreliable.totally_available();
        let consensus = Arc::new(UnreliableConsensus::new(consensus, unreliable.clone()));
        let metrics = Arc::new(Metrics::new(&cfg, &MetricsRegistry::new()));
        let pubsub_sender = Arc::new(NoopPubSubSender);
        let (mut write, mut read) = PersistClient::new(
            cfg,
            blob,
            consensus,
            metrics,
            Arc::new(IsolatedRuntime::default()),
            Arc::new(StateCache::new_no_metrics()),
            pubsub_sender,
        )
        .expect("client construction failed")
        .expect_open::<String, String, u64, i64>(ShardId::new())
        .await;

        write.expect_compare_and_append(&data[0..1], 0, 1).await;
        write.expect_compare_and_append(&data[1..2], 1, 2).await;
        write.expect_compare_and_append(&data[2..3], 2, 3).await;

        let snapshot = read.expect_snapshot_and_fetch(2).await;
        let mut listen = read.expect_listen(0).await;

        // The listen's first batch advances progress to 1 and, with an as_of
        // of 0, contains no updates.
        let listen_actual = listen.fetch_next().await;
        let expected_events = vec![ListenEvent::Progress(Antichain::from_elem(1))];
        assert_eq!(listen_actual, expected_events);

        // Once consensus is unavailable, the already-fetched snapshot and the
        // listen's cached state must still be able to serve the reads below.
        unreliable.totally_unavailable();
        assert_eq!(snapshot, all_ok(&data, 2));
        assert_eq!(
            listen.read_until(&3).await,
            (all_ok(&data[1..], 1), Antichain::from_elem(3))
        );
    }
}