use async_stream::stream;
use std::backtrace::Backtrace;
use std::collections::BTreeMap;
use std::fmt::Debug;
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;

use differential_dataflow::consolidation::consolidate_updates;
use differential_dataflow::difference::Monoid;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::Description;
use futures::Stream;
use futures_util::{StreamExt, stream};
use mz_dyncfg::Config;
use mz_ore::halt;
use mz_ore::instrument;
use mz_ore::now::EpochMillis;
use mz_ore::task::{AbortOnDropHandle, JoinHandle, RuntimeExt};
use mz_persist::location::{Blob, SeqNo};
use mz_persist_types::columnar::{ColumnDecoder, Schema};
use mz_persist_types::{Codec, Codec64};
use proptest_derive::Arbitrary;
use serde::{Deserialize, Serialize};
use timely::PartialOrder;
use timely::order::TotalOrder;
use timely::progress::{Antichain, Timestamp};
use tokio::runtime::Handle;
use tracing::{Instrument, debug_span, warn};
use uuid::Uuid;

use crate::batch::BLOB_TARGET_SIZE;
use crate::cfg::{COMPACTION_MEMORY_BOUND_BYTES, RetryParameters};
use crate::fetch::FetchConfig;
use crate::fetch::{FetchBatchFilter, FetchedPart, Lease, LeasedBatchPart, fetch_leased_part};
use crate::internal::encoding::Schemas;
use crate::internal::machine::{ExpireFn, Machine};
use crate::internal::metrics::{Metrics, ReadMetrics, ShardMetrics};
use crate::internal::state::{BatchPart, HollowBatch};
use crate::internal::watch::StateWatch;
use crate::iter::{Consolidator, StructuredSort};
use crate::schema::SchemaCache;
use crate::stats::{SnapshotPartStats, SnapshotPartsStats, SnapshotStats};
use crate::{GarbageCollector, PersistConfig, ShardId, parse_id};

pub use crate::internal::encoding::LazyPartStats;
pub use crate::internal::state::Since;
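/// An opaque identifier for a leased reader of a persist shard.
///
/// Wraps a v4 UUID and round-trips through its `r<uuid>` string form for
/// display, parsing, and serde.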
#[derive(Arbitrary, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[serde(try_from = "String", into = "String")]
pub struct LeasedReaderId(pub(crate) [u8; 16]);

impl std::fmt::Display for LeasedReaderId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "r{}", Uuid::from_bytes(self.0))
    }
}

impl std::fmt::Debug for LeasedReaderId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "LeasedReaderId({})", Uuid::from_bytes(self.0))
    }
}

impl std::str::FromStr for LeasedReaderId {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        parse_id("r", "LeasedReaderId", s).map(LeasedReaderId)
    }
}

impl From<LeasedReaderId> for String {
    fn from(reader_id: LeasedReaderId) -> Self {
        reader_id.to_string()
    }
}

impl TryFrom<String> for LeasedReaderId {
    type Error = String;

    fn try_from(s: String) -> Result<Self, Self::Error> {
        s.parse()
    }
}

impl LeasedReaderId {
    pub(crate) fn new() -> Self {
        LeasedReaderId(*Uuid::new_v4().as_bytes())
    }
}
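/// A snapshot of a shard's contents at an `as_of` frontier together with an
/// ongoing [Listen] for updates beyond it, returned by [ReadHandle::subscribe].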
#[derive(Debug)]
pub struct Subscribe<K: Codec, V: Codec, T, D> {
    snapshot: Option<Vec<LeasedBatchPart<T>>>,
    listen: Listen<K, V, T, D>,
}

impl<K, V, T, D> Subscribe<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
    D: Monoid + Codec64 + Send + Sync,
{
    fn new(snapshot_parts: Vec<LeasedBatchPart<T>>, listen: Listen<K, V, T, D>) -> Self {
        Subscribe {
            snapshot: Some(snapshot_parts),
            listen,
        }
    }
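    /// Returns the next events of this subscription: the snapshot's leased
    /// parts on the first call, and thereafter the leased parts and new
    /// progress frontier produced by the underlying [Listen].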
    #[instrument(level = "debug", fields(shard = %self.listen.handle.machine.shard_id()))]
    pub async fn next(
        &mut self,
        listen_retry: Option<RetryParameters>,
    ) -> Vec<ListenEvent<T, LeasedBatchPart<T>>> {
        match self.snapshot.take() {
            Some(parts) => vec![ListenEvent::Updates(parts)],
            None => {
                let (parts, upper) = self.listen.next(listen_retry).await;
                vec![ListenEvent::Updates(parts), ListenEvent::Progress(upper)]
            }
        }
    }
}

impl<K, V, T, D> Subscribe<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
    D: Monoid + Codec64 + Send + Sync,
{
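    /// Like [Self::next], but also fetches and decodes the leased parts,
    /// returning the resulting `((K, V), T, D)` updates (empty update batches
    /// are skipped).
    ///
    /// A rough sketch of one way to drive a [Subscribe] with this method:
    ///
    /// ```ignore
    /// loop {
    ///     for event in subscribe.fetch_next().await {
    ///         match event {
    ///             ListenEvent::Updates(updates) => { /* consume the updates */ }
    ///             ListenEvent::Progress(frontier) => { /* note the new frontier */ }
    ///         }
    ///     }
    /// }
    /// ```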
    #[instrument(level = "debug", fields(shard = %self.listen.handle.machine.shard_id()))]
    pub async fn fetch_next(&mut self) -> Vec<ListenEvent<T, ((K, V), T, D)>> {
        let events = self.next(None).await;
        let new_len = events
            .iter()
            .map(|event| match event {
                ListenEvent::Updates(parts) => parts.len(),
                ListenEvent::Progress(_) => 1,
            })
            .sum();
        let mut ret = Vec::with_capacity(new_len);
        for event in events {
            match event {
                ListenEvent::Updates(parts) => {
                    for part in parts {
                        let fetched_part = self.listen.fetch_batch_part(part).await;
                        let updates = fetched_part.collect::<Vec<_>>();
                        if !updates.is_empty() {
                            ret.push(ListenEvent::Updates(updates));
                        }
                    }
                }
                ListenEvent::Progress(progress) => ret.push(ListenEvent::Progress(progress)),
            }
        }
        ret
    }

    pub async fn fetch_batch_part(&mut self, part: LeasedBatchPart<T>) -> FetchedPart<K, V, T, D> {
        self.listen.fetch_batch_part(part).await
    }
}

impl<K, V, T, D> Subscribe<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
    D: Monoid + Codec64 + Send + Sync,
{
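    /// Politely expires this subscribe, dropping any remaining snapshot parts
    /// and expiring the underlying [Listen] (and its [ReadHandle]).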
    pub async fn expire(mut self) {
        let _ = self.snapshot.take();
        self.listen.expire().await;
    }
}
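/// A data or progress event of a [Listen] or [Subscribe], generic over the
/// payload type carried by the `Updates` variant.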
#[derive(Debug, PartialEq)]
pub enum ListenEvent<T, D> {
    Progress(Antichain<T>),
    Updates(Vec<D>),
}
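/// An ongoing subscription to the updates of a shard at times beyond an
/// `as_of` frontier, created by [ReadHandle::listen].
///
/// Keeps the wrapped [ReadHandle]'s since capability downgraded as its
/// progress frontier advances.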
#[derive(Debug)]
pub struct Listen<K: Codec, V: Codec, T, D> {
    handle: ReadHandle<K, V, T, D>,
    watch: StateWatch<K, V, T, D>,

    as_of: Antichain<T>,
    since: Antichain<T>,
    frontier: Antichain<T>,
}

impl<K, V, T, D> Listen<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
    D: Monoid + Codec64 + Send + Sync,
{
    async fn new(
        mut handle: ReadHandle<K, V, T, D>,
        as_of: Antichain<T>,
    ) -> Result<Self, Since<T>> {
        let () = handle.machine.verify_listen(&as_of)?;

        let since = as_of.clone();
        if !PartialOrder::less_equal(handle.since(), &since) {
            return Err(Since(handle.since().clone()));
        }
        handle.downgrade_since(&since).await;

        let watch = handle.machine.applier.watch();
        Ok(Listen {
            handle,
            watch,
            since,
            frontier: as_of.clone(),
            as_of,
        })
    }
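    /// The current progress frontier of this [Listen]: it starts at the
    /// `as_of` and advances to the upper of each batch returned by
    /// [Self::next].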
    pub fn frontier(&self) -> &Antichain<T> {
        &self.frontier
    }
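    /// Waits for the shard to advance beyond the current listen frontier, then
    /// returns leased parts for the newly available batch along with the new
    /// progress frontier. While waiting, periodically calls
    /// [ReadHandle::maybe_downgrade_since] so the reader's lease stays live.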
    pub async fn next(
        &mut self,
        retry: Option<RetryParameters>,
    ) -> (Vec<LeasedBatchPart<T>>, Antichain<T>) {
        let batch = loop {
            let min_elapsed = self.handle.heartbeat_duration();
            let next_batch = self.handle.machine.next_listen_batch(
                &self.frontier,
                &mut self.watch,
                Some(&self.handle.reader_id),
                retry,
            );
            match tokio::time::timeout(min_elapsed, next_batch).await {
                Ok(batch) => break batch,
                Err(_elapsed) => {
                    self.handle.maybe_downgrade_since(&self.since).await;
                }
            }
        };

        let acceptable_desc = PartialOrder::less_than(batch.desc.since(), &self.frontier)
            || (self.frontier == self.as_of
                && PartialOrder::less_equal(batch.desc.since(), &self.frontier));
        if !acceptable_desc {
            let lease_state = self
                .handle
                .machine
                .applier
                .reader_lease(self.handle.reader_id.clone());
            if let Some(lease) = lease_state {
                panic!(
                    "Listen on {} received a batch {:?} advanced past the listen frontier {:?}, but the lease has not expired: {:?}",
                    self.handle.machine.shard_id(),
                    batch.desc,
                    self.frontier,
                    lease
                )
            } else {
                halt!(
                    "Listen on {} received a batch {:?} advanced past the listen frontier {:?} after the reader has expired. \
                    This can happen in exceptional cases: a machine goes to sleep or is running out of memory or CPU, for example.",
                    self.handle.machine.shard_id(),
                    batch.desc,
                    self.frontier
                )
            }
        }

        let new_frontier = batch.desc.upper().clone();

        for x in self.frontier.elements().iter() {
            let less_than_upper = batch.desc.upper().elements().iter().any(|u| x.less_than(u));
            if less_than_upper {
                self.since.join_assign(&Antichain::from_elem(x.clone()));
            }
        }

        let filter = FetchBatchFilter::Listen {
            as_of: self.as_of.clone(),
            lower: self.frontier.clone(),
        };
        let parts = self.handle.lease_batch_parts(batch, filter).collect().await;

        self.handle.maybe_downgrade_since(&self.since).await;

        self.frontier = new_frontier;

        (parts, self.frontier.clone())
    }
}

impl<K, V, T, D> Listen<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
    D: Monoid + Codec64 + Send + Sync,
{
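    /// Like [Self::next], but fetches and decodes the leased parts, returning
    /// the resulting updates followed by a progress event (empty update
    /// batches are skipped).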
    #[instrument(level = "debug", name = "listen::next", fields(shard = %self.handle.machine.shard_id()))]
    pub async fn fetch_next(&mut self) -> Vec<ListenEvent<T, ((K, V), T, D)>> {
        let (parts, progress) = self.next(None).await;
        let mut ret = Vec::with_capacity(parts.len() + 1);
        for part in parts {
            let fetched_part = self.fetch_batch_part(part).await;
            let updates = fetched_part.collect::<Vec<_>>();
            if !updates.is_empty() {
                ret.push(ListenEvent::Updates(updates));
            }
        }
        ret.push(ListenEvent::Progress(progress));
        ret
    }
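    /// Converts this [Listen] into an unending [Stream] of events by calling
    /// [Self::fetch_next] in a loop.
    ///
    /// A rough usage sketch (the stream must be pinned before polling):
    ///
    /// ```ignore
    /// use futures_util::StreamExt;
    ///
    /// let mut events = std::pin::pin!(listen.into_stream());
    /// while let Some(event) = events.next().await {
    ///     match event {
    ///         ListenEvent::Updates(updates) => { /* consume the updates */ }
    ///         ListenEvent::Progress(frontier) => { /* note the new frontier */ }
    ///     }
    /// }
    /// ```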
    pub fn into_stream(mut self) -> impl Stream<Item = ListenEvent<T, ((K, V), T, D)>> {
        async_stream::stream!({
            loop {
                for msg in self.fetch_next().await {
                    yield msg;
                }
            }
        })
    }

    #[cfg(test)]
    #[track_caller]
    pub async fn read_until(&mut self, ts: &T) -> (Vec<((K, V), T, D)>, Antichain<T>) {
        let mut updates = Vec::new();
        let mut frontier = Antichain::from_elem(T::minimum());
        while self.frontier.less_than(ts) {
            for event in self.fetch_next().await {
                match event {
                    ListenEvent::Updates(mut x) => updates.append(&mut x),
                    ListenEvent::Progress(x) => frontier = x,
                }
            }
        }
        (updates, frontier)
    }
}

impl<K, V, T, D> Listen<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
    D: Monoid + Codec64 + Send + Sync,
{
    async fn fetch_batch_part(&mut self, part: LeasedBatchPart<T>) -> FetchedPart<K, V, T, D> {
        let fetched_part = fetch_leased_part(
            &self.handle.cfg,
            &part,
            self.handle.blob.as_ref(),
            Arc::clone(&self.handle.metrics),
            &self.handle.metrics.read.listen,
            &self.handle.machine.applier.shard_metrics,
            &self.handle.reader_id,
            self.handle.read_schemas.clone(),
            &mut self.handle.schema_cache,
        )
        .await;
        fetched_part
    }
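    /// Politely expires this listen, releasing the underlying [ReadHandle]'s
    /// lease. See [ReadHandle::expire].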
    pub async fn expire(self) {
        self.handle.expire().await
    }
}
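/// A capability to read the contents of a shard at times at or beyond its
/// `since` frontier.
///
/// Wraps a leased reader registration in the shard's state. The lease is kept
/// live by background heartbeat tasks and should be released with
/// [Self::expire]; if the handle is dropped without being expired, the lease
/// instead times out after `READER_LEASE_DURATION`.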
#[derive(Debug)]
pub struct ReadHandle<K: Codec, V: Codec, T, D> {
    pub(crate) cfg: PersistConfig,
    pub(crate) metrics: Arc<Metrics>,
    pub(crate) machine: Machine<K, V, T, D>,
    pub(crate) gc: GarbageCollector<K, V, T, D>,
    pub(crate) blob: Arc<dyn Blob>,
    pub(crate) reader_id: LeasedReaderId,
    pub(crate) read_schemas: Schemas<K, V>,
    pub(crate) schema_cache: SchemaCache<K, V, T, D>,

    since: Antichain<T>,
    pub(crate) last_heartbeat: EpochMillis,
    pub(crate) leased_seqnos: BTreeMap<SeqNo, Lease>,
    pub(crate) unexpired_state: Option<UnexpiredReadHandleState>,
}
pub(crate) const READER_LEASE_DURATION: Config<Duration> = Config::new(
    "persist_reader_lease_duration",
    Duration::from_secs(60 * 15),
    "The time after which we'll clean up stale read leases",
);

impl<K, V, T, D> ReadHandle<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
    D: Monoid + Codec64 + Send + Sync,
{
    pub(crate) async fn new(
        cfg: PersistConfig,
        metrics: Arc<Metrics>,
        machine: Machine<K, V, T, D>,
        gc: GarbageCollector<K, V, T, D>,
        blob: Arc<dyn Blob>,
        reader_id: LeasedReaderId,
        read_schemas: Schemas<K, V>,
        since: Antichain<T>,
        last_heartbeat: EpochMillis,
    ) -> Self {
        let schema_cache = machine.applier.schema_cache();
        let expire_fn = Self::expire_fn(machine.clone(), gc.clone(), reader_id.clone());
        ReadHandle {
            cfg,
            metrics: Arc::clone(&metrics),
            machine: machine.clone(),
            gc: gc.clone(),
            blob,
            reader_id: reader_id.clone(),
            read_schemas,
            schema_cache,
            since,
            last_heartbeat,
            leased_seqnos: BTreeMap::new(),
            unexpired_state: Some(UnexpiredReadHandleState {
                expire_fn,
                _heartbeat_tasks: machine
                    .start_reader_heartbeat_tasks(reader_id, gc)
                    .await
                    .into_iter()
                    .map(JoinHandle::abort_on_drop)
                    .collect(),
            }),
        }
    }

    pub fn shard_id(&self) -> ShardId {
        self.machine.shard_id()
    }

    pub fn since(&self) -> &Antichain<T> {
        &self.since
    }

    fn outstanding_seqno(&mut self) -> Option<SeqNo> {
        while let Some(first) = self.leased_seqnos.first_entry() {
            if first.get().count() <= 1 {
                first.remove();
            } else {
                return Some(*first.key());
            }
        }
        None
    }
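    /// Forwards the since capability of this handle to `new_since`, allowing
    /// the shard's data to be compacted up to (but not including) it.
    ///
    /// This also acts as a heartbeat for the reader's lease and reports the
    /// earliest SeqNo still held by outstanding leased parts.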
    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
    pub async fn downgrade_since(&mut self, new_since: &Antichain<T>) {
        let outstanding_seqno = self.outstanding_seqno();

        let heartbeat_ts = (self.cfg.now)();
        let (_seqno, current_reader_since, maintenance) = self
            .machine
            .downgrade_since(&self.reader_id, outstanding_seqno, new_since, heartbeat_ts)
            .await;

        if let Some(outstanding_seqno) = outstanding_seqno {
            let seqnos_held = _seqno.0.saturating_sub(outstanding_seqno.0);
            const SEQNOS_HELD_THRESHOLD: u64 = 60 * 60;
            if seqnos_held >= SEQNOS_HELD_THRESHOLD {
                tracing::info!(
                    "{} reader {} holding an unexpected number of seqnos {} vs {}: {:?}. bt: {:?}",
                    self.machine.shard_id(),
                    self.reader_id,
                    outstanding_seqno,
                    _seqno,
                    self.leased_seqnos.keys().take(10).collect::<Vec<_>>(),
                    Backtrace::force_capture(),
                );
            }
        }

        self.since = current_reader_since.0;
        self.last_heartbeat = heartbeat_ts;
        maintenance.start_performing(&self.machine, &self.gc);
    }
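    /// Returns an ongoing [Listen] of the shard's updates at times beyond
    /// `as_of`, or `Err` with the current since if `as_of` can no longer be
    /// served. Consumes this handle.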
    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
    pub async fn listen(self, as_of: Antichain<T>) -> Result<Listen<K, V, T, D>, Since<T>> {
        Listen::new(self, as_of).await
    }
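    /// Returns leased parts covering everything in the shard as of `as_of`, or
    /// `Err` with the current since if `as_of` is no longer readable. While
    /// the snapshot is being computed, periodically downgrades since so the
    /// reader's lease stays live.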
    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
    pub async fn snapshot(
        &mut self,
        as_of: Antichain<T>,
    ) -> Result<Vec<LeasedBatchPart<T>>, Since<T>> {
        let batches = loop {
            let min_elapsed = self.heartbeat_duration();
            match tokio::time::timeout(min_elapsed, self.machine.snapshot(&as_of)).await {
                Ok(Ok(batches)) => break batches,
                Ok(Err(since)) => return Err(since),
                Err(_timeout) => {
                    let since = self.since().clone();
                    self.maybe_downgrade_since(&since).await;
                }
            }
        };

        if !PartialOrder::less_equal(self.since(), &as_of) {
            return Err(Since(self.since().clone()));
        }

        let filter = FetchBatchFilter::Snapshot { as_of };
        let mut leased_parts = Vec::new();
        for batch in batches {
            leased_parts.extend(
                self.lease_batch_parts(batch, filter.clone())
                    .collect::<Vec<_>>()
                    .await,
            );
        }
        Ok(leased_parts)
    }
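    /// Returns a [Subscribe]: a snapshot of the shard at `as_of` plus a listen
    /// for all updates beyond it. Consumes this handle.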
    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
    pub async fn subscribe(
        mut self,
        as_of: Antichain<T>,
    ) -> Result<Subscribe<K, V, T, D>, Since<T>> {
        let snapshot_parts = self.snapshot(as_of.clone()).await?;
        let listen = self.listen(as_of.clone()).await?;
        Ok(Subscribe::new(snapshot_parts, listen))
    }

    fn lease_batch_part(
        &mut self,
        desc: Description<T>,
        part: BatchPart<T>,
        filter: FetchBatchFilter<T>,
    ) -> LeasedBatchPart<T> {
        LeasedBatchPart {
            metrics: Arc::clone(&self.metrics),
            shard_id: self.machine.shard_id(),
            filter,
            desc,
            part,
            lease: self.lease_seqno(),
            filter_pushdown_audit: false,
        }
    }

    fn lease_batch_parts(
        &mut self,
        batch: HollowBatch<T>,
        filter: FetchBatchFilter<T>,
    ) -> impl Stream<Item = LeasedBatchPart<T>> + '_ {
        stream! {
            let blob = Arc::clone(&self.blob);
            let metrics = Arc::clone(&self.metrics);
            let desc = batch.desc.clone();
            for await part in batch.part_stream(self.shard_id(), &*blob, &*metrics) {
                yield self.lease_batch_part(desc.clone(), part.expect("leased part").into_owned(), filter.clone())
            }
        }
    }

    fn lease_seqno(&mut self) -> Lease {
        let seqno = self.machine.seqno();
        let lease = self
            .leased_seqnos
            .entry(seqno)
            .or_insert_with(|| Lease::new(seqno));
        lease.clone()
    }
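    /// Registers a new leased reader at this handle's current since and
    /// returns a [ReadHandle] for it; `purpose` is recorded with the
    /// registration.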
    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
    pub async fn clone(&self, purpose: &str) -> Self {
        let new_reader_id = LeasedReaderId::new();
        let machine = self.machine.clone();
        let gc = self.gc.clone();
        let heartbeat_ts = (self.cfg.now)();
        let (reader_state, maintenance) = machine
            .register_leased_reader(
                &new_reader_id,
                purpose,
                READER_LEASE_DURATION.get(&self.cfg),
                heartbeat_ts,
                false,
            )
            .await;
        maintenance.start_performing(&machine, &gc);
        assert!(PartialOrder::less_equal(&reader_state.since, &self.since));
        let new_reader = ReadHandle::new(
            self.cfg.clone(),
            Arc::clone(&self.metrics),
            machine,
            gc,
            Arc::clone(&self.blob),
            new_reader_id,
            self.read_schemas.clone(),
            reader_state.since,
            heartbeat_ts,
        )
        .await;
        new_reader
    }

    fn heartbeat_duration(&self) -> Duration {
        READER_LEASE_DURATION.get(&self.cfg) / 4
    }
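    /// Calls [Self::downgrade_since], but only if a heartbeat interval (a
    /// quarter of the lease duration) has elapsed since the last one, to
    /// rate-limit downgrades while still keeping the lease live.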
    pub async fn maybe_downgrade_since(&mut self, new_since: &Antichain<T>) {
        let min_elapsed = self.heartbeat_duration();
        let elapsed_since_last_heartbeat =
            Duration::from_millis((self.cfg.now)().saturating_sub(self.last_heartbeat));
        if elapsed_since_last_heartbeat >= min_elapsed {
            self.downgrade_since(new_since).await;
        }
    }
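    /// Politely expires this reader, releasing its since capability. If this
    /// is never called, the lease instead times out after
    /// `READER_LEASE_DURATION`.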
    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
    pub async fn expire(mut self) {
        let Some(unexpired_state) = self.unexpired_state.take() else {
            return;
        };
        unexpired_state.expire_fn.0().await;
    }

    fn expire_fn(
        machine: Machine<K, V, T, D>,
        gc: GarbageCollector<K, V, T, D>,
        reader_id: LeasedReaderId,
    ) -> ExpireFn {
        ExpireFn(Box::new(move || {
            Box::pin(async move {
                let (_, maintenance) = machine.expire_leased_reader(&reader_id).await;
                maintenance.start_performing(&machine, &gc);
            })
        }))
    }

    #[cfg(test)]
    #[track_caller]
    pub async fn expect_listen(self, as_of: T) -> Listen<K, V, T, D> {
        self.listen(Antichain::from_elem(as_of))
            .await
            .expect("cannot serve requested as_of")
    }
}
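/// State of a [ReadHandle] that has not yet been expired: its expiration
/// closure and the background heartbeat tasks (aborted on drop).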
#[derive(Debug)]
pub(crate) struct UnexpiredReadHandleState {
    expire_fn: ExpireFn,
    pub(crate) _heartbeat_tasks: Vec<AbortOnDropHandle<()>>,
}
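/// A cursor over the consolidated contents of a snapshot, yielded in chunks by
/// [Cursor::next]. Holds a lease `L` that keeps the referenced data fetchable
/// while the cursor is live.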
#[derive(Debug)]
pub struct Cursor<K: Codec, V: Codec, T: Timestamp + Codec64, D: Codec64, L = Lease> {
    consolidator: Consolidator<T, D, StructuredSort<K, V, T, D>>,
    max_len: usize,
    max_bytes: usize,
    _lease: L,
    read_schemas: Schemas<K, V>,
}

impl<K: Codec, V: Codec, T: Timestamp + Codec64, D: Codec64, L> Cursor<K, V, T, D, L> {
    pub fn into_lease(self) -> L {
        self._lease
    }
}

impl<K, V, T, D, L> Cursor<K, V, T, D, L>
where
    K: Debug + Codec + Ord,
    V: Debug + Codec + Ord,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Monoid + Ord + Codec64 + Send + Sync,
{
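    /// Returns the next consolidated chunk of updates, if any remain, decoded
    /// into `((K, V), T, D)` triples.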
    pub async fn next(&mut self) -> Option<impl Iterator<Item = ((K, V), T, D)> + '_> {
        let Self {
            consolidator,
            max_len,
            max_bytes,
            _lease,
            read_schemas: _,
        } = self;

        let part = consolidator
            .next_chunk(*max_len, *max_bytes)
            .await
            .expect("fetching a leased part")?;
        let key_decoder = self
            .read_schemas
            .key
            .decoder_any(part.key.as_ref())
            .expect("ok");
        let val_decoder = self
            .read_schemas
            .val
            .decoder_any(part.val.as_ref())
            .expect("ok");
        let iter = (0..part.len()).map(move |i| {
            let mut k = K::default();
            let mut v = V::default();
            key_decoder.decode(i, &mut k);
            val_decoder.decode(i, &mut v);
            let t = T::decode(part.time.value(i).to_le_bytes());
            let d = D::decode(part.diff.value(i).to_le_bytes());
            ((k, v), t, d)
        });

        Some(iter)
    }
}

impl<K, V, T, D> ReadHandle<K, V, T, D>
where
    K: Debug + Codec + Ord,
    V: Debug + Codec + Ord,
    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
    D: Monoid + Ord + Codec64 + Send + Sync,
{
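    /// Returns everything in the shard as of `as_of`, fetched and consolidated
    /// into a single `Vec`, or `Err` with the current since if `as_of` is no
    /// longer readable.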
    pub async fn snapshot_and_fetch(
        &mut self,
        as_of: Antichain<T>,
    ) -> Result<Vec<((K, V), T, D)>, Since<T>> {
        let mut cursor = self.snapshot_cursor(as_of, |_| true).await?;
        let mut contents = Vec::new();
        while let Some(iter) = cursor.next().await {
            contents.extend(iter);
        }

        let old_len = contents.len();
        consolidate_updates(&mut contents);
        if old_len != contents.len() {
            self.machine
                .applier
                .shard_metrics
                .unconsolidated_snapshot
                .inc();
        }

        Ok(contents)
    }
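    /// Like [Self::snapshot], but returns a [Cursor] that fetches and
    /// consolidates the snapshot's contents lazily. `should_fetch_part` may
    /// skip parts based on their (optional) stats, e.g. for filter pushdown.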
    pub async fn snapshot_cursor(
        &mut self,
        as_of: Antichain<T>,
        should_fetch_part: impl for<'a> Fn(Option<&'a LazyPartStats>) -> bool,
    ) -> Result<Cursor<K, V, T, D>, Since<T>> {
        let batches = self.machine.snapshot(&as_of).await?;
        let lease = self.lease_seqno();

        Self::read_batches_consolidated(
            &self.cfg,
            Arc::clone(&self.metrics),
            Arc::clone(&self.machine.applier.shard_metrics),
            self.metrics.read.snapshot.clone(),
            Arc::clone(&self.blob),
            self.shard_id(),
            as_of,
            self.read_schemas.clone(),
            &batches,
            lease,
            should_fetch_part,
            COMPACTION_MEMORY_BOUND_BYTES.get(&self.cfg),
        )
    }
    pub(crate) fn read_batches_consolidated<L>(
        persist_cfg: &PersistConfig,
        metrics: Arc<Metrics>,
        shard_metrics: Arc<ShardMetrics>,
        read_metrics: ReadMetrics,
        blob: Arc<dyn Blob>,
        shard_id: ShardId,
        as_of: Antichain<T>,
        schemas: Schemas<K, V>,
        batches: &[HollowBatch<T>],
        lease: L,
        should_fetch_part: impl for<'a> Fn(Option<&'a LazyPartStats>) -> bool,
        memory_budget_bytes: usize,
    ) -> Result<Cursor<K, V, T, D, L>, Since<T>> {
        let context = format!("{}[as_of={:?}]", shard_id, as_of.elements());
        let filter = FetchBatchFilter::Snapshot {
            as_of: as_of.clone(),
        };

        let mut consolidator = Consolidator::new(
            context,
            FetchConfig::from_persist_config(persist_cfg),
            shard_id,
            StructuredSort::new(schemas.clone()),
            blob,
            metrics,
            shard_metrics,
            read_metrics,
            filter,
            None,
            memory_budget_bytes,
        );
        for batch in batches {
            for (meta, run) in batch.runs() {
                consolidator.enqueue_run(
                    &batch.desc,
                    meta,
                    run.into_iter()
                        .filter(|p| should_fetch_part(p.stats()))
                        .cloned(),
                );
            }
        }
        let max_len = persist_cfg.compaction_yield_after_n_updates;
        let max_bytes = BLOB_TARGET_SIZE.get(persist_cfg).max(1);

        Ok(Cursor {
            consolidator,
            max_len,
            max_bytes,
            _lease: lease,
            read_schemas: schemas,
        })
    }
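    /// Returns aggregate statistics about the shard's contents at `as_of` (or
    /// across all current batches if `as_of` is `None`) without fetching any
    /// data. The returned future does not borrow `self`.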
    pub fn snapshot_stats(
        &self,
        as_of: Option<Antichain<T>>,
    ) -> impl Future<Output = Result<SnapshotStats, Since<T>>> + Send + 'static {
        let machine = self.machine.clone();
        async move {
            let batches = match as_of {
                Some(as_of) => machine.snapshot(&as_of).await?,
                None => machine.applier.all_batches(),
            };
            let num_updates = batches.iter().map(|b| b.len).sum();
            Ok(SnapshotStats {
                shard_id: machine.shard_id(),
                num_updates,
            })
        }
    }
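    /// Returns per-part statistics (encoded sizes and pushdown stats) about
    /// the shard's contents at `as_of`, without fetching the parts themselves.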
    pub async fn snapshot_parts_stats(
        &self,
        as_of: Antichain<T>,
    ) -> Result<SnapshotPartsStats, Since<T>> {
        let batches = self.machine.snapshot(&as_of).await?;
        let parts = stream::iter(&batches)
            .flat_map(|b| b.part_stream(self.shard_id(), &*self.blob, &*self.metrics))
            .map(|p| {
                let p = p.expect("live batch");
                SnapshotPartStats {
                    encoded_size_bytes: p.encoded_size_bytes(),
                    stats: p.stats().cloned(),
                }
            })
            .collect()
            .await;
        Ok(SnapshotPartsStats {
            metrics: Arc::clone(&self.machine.applier.metrics),
            shard_id: self.machine.shard_id(),
            parts,
        })
    }
}
impl<K, V, T, D> ReadHandle<K, V, T, D>
where
    K: Debug + Codec + Ord,
    V: Debug + Codec + Ord,
    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
    D: Monoid + Codec64 + Send + Sync,
{
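    /// Like [Self::snapshot_and_fetch], but returns the contents as a
    /// [Stream], fetching one leased part at a time; the result is not
    /// consolidated.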
    pub async fn snapshot_and_stream(
        &mut self,
        as_of: Antichain<T>,
    ) -> Result<impl Stream<Item = ((K, V), T, D)> + use<K, V, T, D>, Since<T>> {
        let snap = self.snapshot(as_of).await?;

        let blob = Arc::clone(&self.blob);
        let metrics = Arc::clone(&self.metrics);
        let snapshot_metrics = self.metrics.read.snapshot.clone();
        let shard_metrics = Arc::clone(&self.machine.applier.shard_metrics);
        let reader_id = self.reader_id.clone();
        let schemas = self.read_schemas.clone();
        let mut schema_cache = self.schema_cache.clone();
        let persist_cfg = self.cfg.clone();
        let stream = async_stream::stream! {
            for part in snap {
                let mut fetched_part = fetch_leased_part(
                    &persist_cfg,
                    &part,
                    blob.as_ref(),
                    Arc::clone(&metrics),
                    &snapshot_metrics,
                    &shard_metrics,
                    &reader_id,
                    schemas.clone(),
                    &mut schema_cache,
                )
                .await;

                while let Some(next) = fetched_part.next() {
                    yield next;
                }
            }
        };

        Ok(stream)
    }
}
impl<K, V, T, D> ReadHandle<K, V, T, D>
where
    K: Debug + Codec + Ord,
    V: Debug + Codec + Ord,
    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
    D: Monoid + Ord + Codec64 + Send + Sync,
{
    #[cfg(test)]
    #[track_caller]
    pub async fn expect_snapshot_and_fetch(&mut self, as_of: T) -> Vec<((K, V), T, D)> {
        let mut ret = self
            .snapshot_and_fetch(Antichain::from_elem(as_of))
            .await
            .expect("cannot serve requested as_of");

        ret.sort();
        ret
    }
}

impl<K: Codec, V: Codec, T, D> Drop for ReadHandle<K, V, T, D> {
    fn drop(&mut self) {
        let Some(unexpired_state) = self.unexpired_state.take() else {
            return;
        };

        let handle = match Handle::try_current() {
            Ok(x) => x,
            Err(_) => {
                warn!(
                    "ReadHandle {} dropped without being explicitly expired, falling back to lease timeout",
                    self.reader_id
                );
                return;
            }
        };
        let expire_span = debug_span!("drop::expire");
        handle.spawn_named(
            || format!("ReadHandle::expire ({})", self.reader_id),
            unexpired_state.expire_fn.0().instrument(expire_span),
        );
    }
}
#[cfg(test)]
mod tests {
    use std::pin;
    use std::str::FromStr;

    use mz_dyncfg::ConfigUpdates;
    use mz_ore::cast::CastFrom;
    use mz_ore::metrics::MetricsRegistry;
    use mz_persist::mem::{MemBlob, MemBlobConfig, MemConsensus};
    use mz_persist::unreliable::{UnreliableConsensus, UnreliableHandle};
    use serde::{Deserialize, Serialize};
    use serde_json::json;
    use tokio_stream::StreamExt;

    use crate::async_runtime::IsolatedRuntime;
    use crate::batch::BLOB_TARGET_SIZE;
    use crate::cache::StateCache;
    use crate::internal::metrics::Metrics;
    use crate::rpc::NoopPubSubSender;
    use crate::tests::{all_ok, new_test_client};
    use crate::{Diagnostics, PersistClient, PersistConfig, ShardId};

    use super::*;

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn drop_unused_subscribe(dyncfgs: ConfigUpdates) {
        let data = [
            (("0".to_owned(), "zero".to_owned()), 0, 1),
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
        ];

        let (mut write, read) = new_test_client(&dyncfgs)
            .await
            .expect_open::<String, String, u64, i64>(crate::ShardId::new())
            .await;

        write.expect_compare_and_append(&data[0..1], 0, 1).await;
        write.expect_compare_and_append(&data[1..2], 1, 2).await;
        write.expect_compare_and_append(&data[2..3], 2, 3).await;

        let subscribe = read
            .subscribe(timely::progress::Antichain::from_elem(2))
            .await
            .unwrap();
        assert!(
            !subscribe.snapshot.as_ref().unwrap().is_empty(),
            "snapshot must have batches for test to be meaningful"
        );
        drop(subscribe);
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn streaming_consolidate(dyncfgs: ConfigUpdates) {
        let data = &[
            (("k".to_owned(), "v".to_owned()), 0, 1),
            (("k".to_owned(), "v".to_owned()), 1, 1),
            (("k".to_owned(), "v".to_owned()), 2, 1),
            (("k2".to_owned(), "v".to_owned()), 0, 1),
            (("k2".to_owned(), "v".to_owned()), 1, -1),
        ];

        let (mut write, read) = {
            let client = new_test_client(&dyncfgs).await;
            client.cfg.set_config(&BLOB_TARGET_SIZE, 1000);
            client
                .expect_open::<String, String, u64, i64>(crate::ShardId::new())
                .await
        };

        write.expect_compare_and_append(data, 0, 5).await;

        let mut snapshot = read
            .subscribe(timely::progress::Antichain::from_elem(4))
            .await
            .unwrap();

        let mut updates = vec![];
        'outer: loop {
            for event in snapshot.fetch_next().await {
                match event {
                    ListenEvent::Progress(t) => {
                        if !t.less_than(&4) {
                            break 'outer;
                        }
                    }
                    ListenEvent::Updates(data) => {
                        updates.extend(data);
                    }
                }
            }
        }
        assert_eq!(updates, &[(("k".to_owned(), "v".to_owned()), 4u64, 3i64)],)
    }
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn snapshot_and_stream(dyncfgs: ConfigUpdates) {
        let data = &mut [
            (("k1".to_owned(), "v1".to_owned()), 0, 1),
            (("k2".to_owned(), "v2".to_owned()), 1, 1),
            (("k3".to_owned(), "v3".to_owned()), 2, 1),
            (("k4".to_owned(), "v4".to_owned()), 2, 1),
            (("k5".to_owned(), "v5".to_owned()), 3, 1),
        ];

        let (mut write, mut read) = {
            let client = new_test_client(&dyncfgs).await;
            client.cfg.set_config(&BLOB_TARGET_SIZE, 0);
            client
                .expect_open::<String, String, u64, i64>(crate::ShardId::new())
                .await
        };

        write.expect_compare_and_append(&data[0..2], 0, 2).await;
        write.expect_compare_and_append(&data[2..4], 2, 3).await;
        write.expect_compare_and_append(&data[4..], 3, 4).await;

        let as_of = Antichain::from_elem(3);
        let mut snapshot = pin::pin!(read.snapshot_and_stream(as_of.clone()).await.unwrap());

        let mut snapshot_rows = vec![];
        while let Some(((k, v), t, d)) = snapshot.next().await {
            snapshot_rows.push(((k, v), t, d));
        }

        for ((_k, _v), t, _d) in data.as_mut_slice() {
            t.advance_by(as_of.borrow());
        }

        assert_eq!(data.as_slice(), snapshot_rows.as_slice());
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn seqno_leases(dyncfgs: ConfigUpdates) {
        let mut data = vec![];
        for i in 0..20 {
            data.push(((i.to_string(), i.to_string()), i, 1))
        }

        let shard_id = ShardId::new();

        let client = new_test_client(&dyncfgs).await;
        let (mut write, read) = client
            .expect_open::<String, String, u64, i64>(shard_id)
            .await;

        let mut offset = 0;
        let mut width = 2;

        for i in offset..offset + width {
            write
                .expect_compare_and_append(
                    &data[i..i + 1],
                    u64::cast_from(i),
                    u64::cast_from(i) + 1,
                )
                .await;
        }
        offset += width;

        let mut fetcher = client
            .create_batch_fetcher::<String, String, u64, i64>(
                shard_id,
                Default::default(),
                Default::default(),
                false,
                Diagnostics::for_tests(),
            )
            .await
            .unwrap();

        let mut subscribe = read
            .subscribe(timely::progress::Antichain::from_elem(1))
            .await
            .expect("cannot serve requested as_of");

        let original_seqno_since = subscribe.listen.handle.machine.applier.seqno_since();

        let mut parts = vec![];

        width = 4;
        for i in offset..offset + width {
            for event in subscribe.next(None).await {
                if let ListenEvent::Updates(mut new_parts) = event {
                    parts.append(&mut new_parts);
                    subscribe
                        .listen
                        .handle
                        .downgrade_since(&subscribe.listen.since)
                        .await;
                }
            }

            write
                .expect_compare_and_append(
                    &data[i..i + 1],
                    u64::cast_from(i),
                    u64::cast_from(i) + 1,
                )
                .await;

            assert_eq!(
                subscribe.listen.handle.machine.applier.seqno_since(),
                original_seqno_since
            );
        }

        offset += width;

        let mut seqno_since = subscribe.listen.handle.machine.applier.seqno_since();

        assert_eq!(seqno_since, original_seqno_since);

        let mut subsequent_parts = vec![];

        let mut this_seqno = SeqNo::minimum();

        for (mut i, part) in parts.into_iter().enumerate() {
            let part_seqno = part.lease.seqno();
            let last_seqno = this_seqno;
            this_seqno = part_seqno;
            assert!(this_seqno >= last_seqno);

            let (part, lease) = part.into_exchangeable_part();
            let _ = fetcher.fetch_leased_part(part).await;
            drop(lease);

            for event in subscribe.next(None).await {
                if let ListenEvent::Updates(parts) = event {
                    for part in parts {
                        let (_, lease) = part.into_exchangeable_part();
                        subsequent_parts.push(lease);
                    }
                }
            }

            subscribe
                .listen
                .handle
                .downgrade_since(&subscribe.listen.since)
                .await;

            i += offset;
            write
                .expect_compare_and_append(
                    &data[i..i + 1],
                    u64::cast_from(i),
                    u64::cast_from(i) + 1,
                )
                .await;

            let expect_downgrade = subscribe.listen.handle.outstanding_seqno() > Some(part_seqno);

            let new_seqno_since = subscribe.listen.handle.machine.applier.seqno_since();
            if expect_downgrade {
                assert!(new_seqno_since > seqno_since);
            } else {
                assert_eq!(new_seqno_since, seqno_since);
            }
            seqno_since = new_seqno_since;
        }

        assert!(seqno_since > original_seqno_since);

        drop(subsequent_parts);
        drop(subscribe);
    }
    #[mz_ore::test]
    fn reader_id_human_readable_serde() {
        #[derive(Debug, Serialize, Deserialize)]
        struct Container {
            reader_id: LeasedReaderId,
        }

        let id =
            LeasedReaderId::from_str("r00000000-1234-5678-0000-000000000000").expect("valid id");
        assert_eq!(
            id,
            serde_json::from_value(serde_json::to_value(id.clone()).expect("serializable"))
                .expect("deserializable")
        );

        assert_eq!(
            id,
            serde_json::from_str("\"r00000000-1234-5678-0000-000000000000\"")
                .expect("deserializable")
        );

        let json = json!({ "reader_id": id });
        assert_eq!(
            "{\"reader_id\":\"r00000000-1234-5678-0000-000000000000\"}",
            &json.to_string()
        );
        let container: Container = serde_json::from_value(json).expect("deserializable");
        assert_eq!(container.reader_id, id);
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn skip_consensus_fetch_optimization() {
        let data = vec![
            (("0".to_owned(), "zero".to_owned()), 0, 1),
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
        ];

        let cfg = PersistConfig::new_for_tests();
        let blob = Arc::new(MemBlob::open(MemBlobConfig::default()));
        let consensus = Arc::new(MemConsensus::default());
        let unreliable = UnreliableHandle::default();
        unreliable.totally_available();
        let consensus = Arc::new(UnreliableConsensus::new(consensus, unreliable.clone()));
        let metrics = Arc::new(Metrics::new(&cfg, &MetricsRegistry::new()));
        let pubsub_sender = Arc::new(NoopPubSubSender);
        let (mut write, mut read) = PersistClient::new(
            cfg,
            blob,
            consensus,
            metrics,
            Arc::new(IsolatedRuntime::new_for_tests()),
            Arc::new(StateCache::new_no_metrics()),
            pubsub_sender,
        )
        .expect("client construction failed")
        .expect_open::<String, String, u64, i64>(ShardId::new())
        .await;

        write.expect_compare_and_append(&data[0..1], 0, 1).await;
        write.expect_compare_and_append(&data[1..2], 1, 2).await;
        write.expect_compare_and_append(&data[2..3], 2, 3).await;

        let snapshot = read.expect_snapshot_and_fetch(2).await;
        let mut listen = read.expect_listen(0).await;

        let listen_actual = listen.fetch_next().await;
        let expected_events = vec![ListenEvent::Progress(Antichain::from_elem(1))];
        assert_eq!(listen_actual, expected_events);

        unreliable.totally_unavailable();
        assert_eq!(snapshot, all_ok(&data, 2));
        assert_eq!(
            listen.read_until(&3).await,
            (all_ok(&data[1..], 1), Antichain::from_elem(3))
        );
    }
}