use async_stream::stream;
use std::backtrace::Backtrace;
use std::collections::BTreeMap;
use std::fmt::Debug;
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;

use differential_dataflow::consolidation::consolidate_updates;
use differential_dataflow::difference::Monoid;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::Description;
use futures::Stream;
use futures_util::{StreamExt, stream};
use mz_dyncfg::Config;
use mz_ore::halt;
use mz_ore::instrument;
use mz_ore::now::EpochMillis;
use mz_ore::task::{AbortOnDropHandle, JoinHandle, RuntimeExt};
use mz_persist::location::{Blob, SeqNo};
use mz_persist_types::columnar::{ColumnDecoder, Schema};
use mz_persist_types::{Codec, Codec64};
use proptest_derive::Arbitrary;
use serde::{Deserialize, Serialize};
use timely::PartialOrder;
use timely::order::TotalOrder;
use timely::progress::{Antichain, Timestamp};
use tokio::runtime::Handle;
use tracing::{Instrument, debug_span, warn};
use uuid::Uuid;

use crate::batch::BLOB_TARGET_SIZE;
use crate::cfg::{COMPACTION_MEMORY_BOUND_BYTES, RetryParameters};
use crate::fetch::FetchConfig;
use crate::fetch::{FetchBatchFilter, FetchedPart, Lease, LeasedBatchPart, fetch_leased_part};
use crate::internal::encoding::Schemas;
use crate::internal::machine::{ExpireFn, Machine};
use crate::internal::metrics::{Metrics, ReadMetrics, ShardMetrics};
use crate::internal::state::{BatchPart, HollowBatch};
use crate::internal::watch::StateWatch;
use crate::iter::{Consolidator, StructuredSort};
use crate::schema::SchemaCache;
use crate::stats::{SnapshotPartStats, SnapshotPartsStats, SnapshotStats};
use crate::{GarbageCollector, PersistConfig, ShardId, parse_id};

pub use crate::internal::encoding::LazyPartStats;
pub use crate::internal::state::Since;

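/// An opaque identifier for a leased reader of a persist shard.
///
/// Displays (and parses) as the letter `r` followed by a UUID, e.g.
/// `r00000000-0000-0000-0000-000000000000`.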
#[derive(Arbitrary, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[serde(try_from = "String", into = "String")]
pub struct LeasedReaderId(pub(crate) [u8; 16]);

impl std::fmt::Display for LeasedReaderId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "r{}", Uuid::from_bytes(self.0))
    }
}

impl std::fmt::Debug for LeasedReaderId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "LeasedReaderId({})", Uuid::from_bytes(self.0))
    }
}

impl std::str::FromStr for LeasedReaderId {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        parse_id("r", "LeasedReaderId", s).map(LeasedReaderId)
    }
}

impl From<LeasedReaderId> for String {
    fn from(reader_id: LeasedReaderId) -> Self {
        reader_id.to_string()
    }
}

impl TryFrom<String> for LeasedReaderId {
    type Error = String;

    fn try_from(s: String) -> Result<Self, Self::Error> {
        s.parse()
    }
}

impl LeasedReaderId {
    pub(crate) fn new() -> Self {
        LeasedReaderId(*Uuid::new_v4().as_bytes())
    }
}

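/// A snapshot of a shard's contents at a frontier `as_of`, combined with an
/// ongoing [Listen] for updates at later times.
///
/// Returned by [ReadHandle::subscribe].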
#[derive(Debug)]
pub struct Subscribe<K: Codec, V: Codec, T, D> {
    snapshot: Option<Vec<LeasedBatchPart<T>>>,
    listen: Listen<K, V, T, D>,
}

impl<K, V, T, D> Subscribe<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
    D: Monoid + Codec64 + Send + Sync,
{
    fn new(snapshot_parts: Vec<LeasedBatchPart<T>>, listen: Listen<K, V, T, D>) -> Self {
        Subscribe {
            snapshot: Some(snapshot_parts),
            listen,
        }
    }

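    /// Returns the next events in this subscription: the leased parts of the
    /// initial snapshot on the first call, then batches of updates plus a new
    /// progress frontier from the underlying [Listen] on each later call.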
    #[instrument(level = "debug", fields(shard = %self.listen.handle.machine.shard_id()))]
    pub async fn next(
        &mut self,
        listen_retry: Option<RetryParameters>,
    ) -> Vec<ListenEvent<T, LeasedBatchPart<T>>> {
        match self.snapshot.take() {
            Some(parts) => vec![ListenEvent::Updates(parts)],
            None => {
                let (parts, upper) = self.listen.next(listen_retry).await;
                vec![ListenEvent::Updates(parts), ListenEvent::Progress(upper)]
            }
        }
    }
}

impl<K, V, T, D> Subscribe<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
    D: Monoid + Codec64 + Send + Sync,
{
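    /// Equivalent to [Self::next], but fetches and returns the data from the
    /// leased parts instead of returning the parts themselves.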
    #[instrument(level = "debug", fields(shard = %self.listen.handle.machine.shard_id()))]
    pub async fn fetch_next(
        &mut self,
    ) -> Vec<ListenEvent<T, ((Result<K, String>, Result<V, String>), T, D)>> {
        let events = self.next(None).await;
        let new_len = events
            .iter()
            .map(|event| match event {
                ListenEvent::Updates(parts) => parts.len(),
                ListenEvent::Progress(_) => 1,
            })
            .sum();
        let mut ret = Vec::with_capacity(new_len);
        for event in events {
            match event {
                ListenEvent::Updates(parts) => {
                    for part in parts {
                        let fetched_part = self.listen.fetch_batch_part(part).await;
                        let updates = fetched_part.collect::<Vec<_>>();
                        if !updates.is_empty() {
                            ret.push(ListenEvent::Updates(updates));
                        }
                    }
                }
                ListenEvent::Progress(progress) => ret.push(ListenEvent::Progress(progress)),
            }
        }
        ret
    }

    pub async fn fetch_batch_part(&mut self, part: LeasedBatchPart<T>) -> FetchedPart<K, V, T, D> {
        self.listen.fetch_batch_part(part).await
    }
}

impl<K, V, T, D> Subscribe<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
    D: Monoid + Codec64 + Send + Sync,
{
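    /// Politely expires this subscription, releasing its underlying reader's
    /// lease. See [ReadHandle::expire] for details.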
    pub async fn expire(mut self) {
        // Drop any unfetched snapshot parts (releasing their leases) before
        // expiring the underlying listen and its read handle.
        let _ = self.snapshot.take();
        self.listen.expire().await;
    }
}

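/// Data and progress events of a shard subscription.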
#[derive(Debug, PartialEq)]
pub enum ListenEvent<T, D> {
    Progress(Antichain<T>),
    Updates(Vec<D>),
}

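/// An ongoing subscription of updates to a shard, returned by
/// [ReadHandle::listen].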
#[derive(Debug)]
pub struct Listen<K: Codec, V: Codec, T, D> {
    handle: ReadHandle<K, V, T, D>,
    watch: StateWatch<K, V, T, D>,

    as_of: Antichain<T>,
    since: Antichain<T>,
    frontier: Antichain<T>,
}

impl<K, V, T, D> Listen<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
    D: Monoid + Codec64 + Send + Sync,
{
    async fn new(
        mut handle: ReadHandle<K, V, T, D>,
        as_of: Antichain<T>,
    ) -> Result<Self, Since<T>> {
        let () = handle.machine.verify_listen(&as_of)?;

        let since = as_of.clone();
        if !PartialOrder::less_equal(handle.since(), &since) {
            return Err(Since(handle.since().clone()));
        }
        handle.downgrade_since(&since).await;

        let watch = handle.machine.applier.watch();
        Ok(Listen {
            handle,
            watch,
            since,
            frontier: as_of.clone(),
            as_of,
        })
    }

    pub fn frontier(&self) -> &Antichain<T> {
        &self.frontier
    }

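    /// Attempts to pull out the next batch of updates in this subscription as
    /// leased parts, along with the subscription's new progress frontier.
    ///
    /// The returned parts are appropriate to fetch with `fetch_leased_part`.
    /// While waiting for a new batch, this periodically downgrades the
    /// reader's since, which also serves as a lease heartbeat.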
    pub async fn next(
        &mut self,
        retry: Option<RetryParameters>,
    ) -> (Vec<LeasedBatchPart<T>>, Antichain<T>) {
        let batch = loop {
            let min_elapsed = self.handle.heartbeat_duration();
            let next_batch = self.handle.machine.next_listen_batch(
                &self.frontier,
                &mut self.watch,
                Some(&self.handle.reader_id),
                retry,
            );
            match tokio::time::timeout(min_elapsed, next_batch).await {
                Ok(batch) => break batch,
                Err(_elapsed) => {
                    self.handle.maybe_downgrade_since(&self.since).await;
                }
            }
        };

        let acceptable_desc = PartialOrder::less_than(batch.desc.since(), &self.frontier)
            || (self.frontier == self.as_of
                && PartialOrder::less_equal(batch.desc.since(), &self.frontier));
        if !acceptable_desc {
            let lease_state = self
                .handle
                .machine
                .applier
                .reader_lease(self.handle.reader_id.clone());
            if let Some(lease) = lease_state {
                panic!(
                    "Listen on {} received a batch {:?} advanced past the listen frontier {:?}, but the lease has not expired: {:?}",
                    self.handle.machine.shard_id(),
                    batch.desc,
                    self.frontier,
                    lease
                )
            } else {
                halt!(
                    "Listen on {} received a batch {:?} advanced past the listen frontier {:?} after the reader has expired. \
                     This can happen in exceptional cases: a machine goes to sleep or is running out of memory or CPU, for example.",
                    self.handle.machine.shard_id(),
                    batch.desc,
                    self.frontier
                )
            }
        }

        let new_frontier = batch.desc.upper().clone();

        for x in self.frontier.elements().iter() {
            let less_than_upper = batch.desc.upper().elements().iter().any(|u| x.less_than(u));
            if less_than_upper {
                self.since.join_assign(&Antichain::from_elem(x.clone()));
            }
        }

        let filter = FetchBatchFilter::Listen {
            as_of: self.as_of.clone(),
            lower: self.frontier.clone(),
        };
        let parts = self.handle.lease_batch_parts(batch, filter).collect().await;

        self.handle.maybe_downgrade_since(&self.since).await;

        self.frontier = new_frontier;
        (parts, self.frontier.clone())
    }
}

impl<K, V, T, D> Listen<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
    D: Monoid + Codec64 + Send + Sync,
{
    #[instrument(level = "debug", name = "listen::next", fields(shard = %self.handle.machine.shard_id()))]
    pub async fn fetch_next(
        &mut self,
    ) -> Vec<ListenEvent<T, ((Result<K, String>, Result<V, String>), T, D)>> {
        let (parts, progress) = self.next(None).await;
        let mut ret = Vec::with_capacity(parts.len() + 1);
        for part in parts {
            let fetched_part = self.fetch_batch_part(part).await;
            let updates = fetched_part.collect::<Vec<_>>();
            if !updates.is_empty() {
                ret.push(ListenEvent::Updates(updates));
            }
        }
        ret.push(ListenEvent::Progress(progress));
        ret
    }

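    /// Converts this [Listen] into a [futures::Stream] of events by calling
    /// [Self::fetch_next] in a loop.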
    pub fn into_stream(
        mut self,
    ) -> impl Stream<Item = ListenEvent<T, ((Result<K, String>, Result<V, String>), T, D)>> {
        async_stream::stream!({
            loop {
                for msg in self.fetch_next().await {
                    yield msg;
                }
            }
        })
    }

    #[cfg(test)]
    #[track_caller]
    pub async fn read_until(
        &mut self,
        ts: &T,
    ) -> (
        Vec<((Result<K, String>, Result<V, String>), T, D)>,
        Antichain<T>,
    ) {
        let mut updates = Vec::new();
        let mut frontier = Antichain::from_elem(T::minimum());
        while self.frontier.less_than(ts) {
            for event in self.fetch_next().await {
                match event {
                    ListenEvent::Updates(mut x) => updates.append(&mut x),
                    ListenEvent::Progress(x) => frontier = x,
                }
            }
        }
        (updates, frontier)
    }
}

impl<K, V, T, D> Listen<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
    D: Monoid + Codec64 + Send + Sync,
{
    async fn fetch_batch_part(&mut self, part: LeasedBatchPart<T>) -> FetchedPart<K, V, T, D> {
        let fetched_part = fetch_leased_part(
            &self.handle.cfg,
            &part,
            self.handle.blob.as_ref(),
            Arc::clone(&self.handle.metrics),
            &self.handle.metrics.read.listen,
            &self.handle.machine.applier.shard_metrics,
            &self.handle.reader_id,
            self.handle.read_schemas.clone(),
            &mut self.handle.schema_cache,
        )
        .await;
        fetched_part
    }

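    /// Politely expires this listen, releasing its reader's lease.
    /// See [ReadHandle::expire] for details.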
    pub async fn expire(self) {
        self.handle.expire().await
    }
}

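/// A "capability" granting the ability to read the state of some shard at
/// times greater or equal to `self.since()`.
///
/// Callers should [Self::expire] a handle when they are done with it, so that
/// its lease is released promptly instead of waiting for the lease timeout
/// (`READER_LEASE_DURATION`).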
#[derive(Debug)]
pub struct ReadHandle<K: Codec, V: Codec, T, D> {
    pub(crate) cfg: PersistConfig,
    pub(crate) metrics: Arc<Metrics>,
    pub(crate) machine: Machine<K, V, T, D>,
    pub(crate) gc: GarbageCollector<K, V, T, D>,
    pub(crate) blob: Arc<dyn Blob>,
    pub(crate) reader_id: LeasedReaderId,
    pub(crate) read_schemas: Schemas<K, V>,
    pub(crate) schema_cache: SchemaCache<K, V, T, D>,

    since: Antichain<T>,
    pub(crate) last_heartbeat: EpochMillis,
    pub(crate) leased_seqnos: BTreeMap<SeqNo, Lease>,
    pub(crate) unexpired_state: Option<UnexpiredReadHandleState>,
}

pub(crate) const READER_LEASE_DURATION: Config<Duration> = Config::new(
    "persist_reader_lease_duration",
    Duration::from_secs(60 * 15),
    "The time after which we'll clean up stale read leases",
);

impl<K, V, T, D> ReadHandle<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
    D: Monoid + Codec64 + Send + Sync,
{
    pub(crate) async fn new(
        cfg: PersistConfig,
        metrics: Arc<Metrics>,
        machine: Machine<K, V, T, D>,
        gc: GarbageCollector<K, V, T, D>,
        blob: Arc<dyn Blob>,
        reader_id: LeasedReaderId,
        read_schemas: Schemas<K, V>,
        since: Antichain<T>,
        last_heartbeat: EpochMillis,
    ) -> Self {
        let schema_cache = machine.applier.schema_cache();
        let expire_fn = Self::expire_fn(machine.clone(), gc.clone(), reader_id.clone());
        ReadHandle {
            cfg,
            metrics: Arc::clone(&metrics),
            machine: machine.clone(),
            gc: gc.clone(),
            blob,
            reader_id: reader_id.clone(),
            read_schemas,
            schema_cache,
            since,
            last_heartbeat,
            leased_seqnos: BTreeMap::new(),
            unexpired_state: Some(UnexpiredReadHandleState {
                expire_fn,
                _heartbeat_tasks: machine
                    .start_reader_heartbeat_tasks(reader_id, gc)
                    .await
                    .into_iter()
                    .map(JoinHandle::abort_on_drop)
                    .collect(),
            }),
        }
    }

    pub fn shard_id(&self) -> ShardId {
        self.machine.shard_id()
    }

    pub fn since(&self) -> &Antichain<T> {
        &self.since
    }

    fn outstanding_seqno(&mut self) -> Option<SeqNo> {
        while let Some(first) = self.leased_seqnos.first_entry() {
            if first.get().count() <= 1 {
                first.remove();
            } else {
                return Some(*first.key());
            }
        }
        None
    }

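    /// Forwards the since frontier of this handle, giving up the ability to
    /// read at times not greater or equal to `new_since`.
    ///
    /// This also serves as a heartbeat of the reader's lease, even when
    /// `new_since` does not actually advance the since.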
    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
    pub async fn downgrade_since(&mut self, new_since: &Antichain<T>) {
        let outstanding_seqno = self.outstanding_seqno();

        let heartbeat_ts = (self.cfg.now)();
        let (_seqno, current_reader_since, maintenance) = self
            .machine
            .downgrade_since(&self.reader_id, outstanding_seqno, new_since, heartbeat_ts)
            .await;

        if let Some(outstanding_seqno) = outstanding_seqno {
            let seqnos_held = _seqno.0.saturating_sub(outstanding_seqno.0);
            const SEQNOS_HELD_THRESHOLD: u64 = 60 * 60;
            if seqnos_held >= SEQNOS_HELD_THRESHOLD {
                tracing::info!(
                    "{} reader {} holding an unexpected number of seqnos {} vs {}: {:?}. bt: {:?}",
                    self.machine.shard_id(),
                    self.reader_id,
                    outstanding_seqno,
                    _seqno,
                    self.leased_seqnos.keys().take(10).collect::<Vec<_>>(),
                    Backtrace::force_capture(),
                );
            }
        }

        self.since = current_reader_since.0;
        self.last_heartbeat = heartbeat_ts;
        maintenance.start_performing(&self.machine, &self.gc);
    }

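    /// Returns an ongoing [Listen] of updates to the shard at times beyond
    /// `as_of`, or `Err(Since(_))` if `as_of` is no longer readable by this
    /// handle.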
    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
    pub async fn listen(self, as_of: Antichain<T>) -> Result<Listen<K, V, T, D>, Since<T>> {
        Listen::new(self, as_of).await
    }

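    /// Returns the contents of the shard at `as_of` as a set of leased batch
    /// parts suitable for fetching, or `Err(Since(_))` if `as_of` is not
    /// greater or equal to this handle's since.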
    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
    pub async fn snapshot(
        &mut self,
        as_of: Antichain<T>,
    ) -> Result<Vec<LeasedBatchPart<T>>, Since<T>> {
        let batches = loop {
            let min_elapsed = self.heartbeat_duration();
            match tokio::time::timeout(min_elapsed, self.machine.snapshot(&as_of)).await {
                Ok(Ok(batches)) => break batches,
                Ok(Err(since)) => return Err(since),
                Err(_timeout) => {
                    let since = self.since().clone();
                    self.maybe_downgrade_since(&since).await;
                }
            }
        };

        if !PartialOrder::less_equal(self.since(), &as_of) {
            return Err(Since(self.since().clone()));
        }

        let filter = FetchBatchFilter::Snapshot { as_of };
        let mut leased_parts = Vec::new();
        for batch in batches {
            leased_parts.extend(
                self.lease_batch_parts(batch, filter.clone())
                    .collect::<Vec<_>>()
                    .await,
            );
        }
        Ok(leased_parts)
    }

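    /// Returns a [Subscribe]: a [Self::snapshot] at `as_of` combined with a
    /// [Self::listen] for times after it, or `Err(Since(_))` if `as_of` is
    /// not readable by this handle.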
    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
    pub async fn subscribe(
        mut self,
        as_of: Antichain<T>,
    ) -> Result<Subscribe<K, V, T, D>, Since<T>> {
        let snapshot_parts = self.snapshot(as_of.clone()).await?;
        let listen = self.listen(as_of.clone()).await?;
        Ok(Subscribe::new(snapshot_parts, listen))
    }

    fn lease_batch_part(
        &mut self,
        desc: Description<T>,
        part: BatchPart<T>,
        filter: FetchBatchFilter<T>,
    ) -> LeasedBatchPart<T> {
        LeasedBatchPart {
            metrics: Arc::clone(&self.metrics),
            shard_id: self.machine.shard_id(),
            filter,
            desc,
            part,
            lease: self.lease_seqno(),
            filter_pushdown_audit: false,
        }
    }

    fn lease_batch_parts(
        &mut self,
        batch: HollowBatch<T>,
        filter: FetchBatchFilter<T>,
    ) -> impl Stream<Item = LeasedBatchPart<T>> + '_ {
        stream! {
            let blob = Arc::clone(&self.blob);
            let metrics = Arc::clone(&self.metrics);
            let desc = batch.desc.clone();
            for await part in batch.part_stream(self.shard_id(), &*blob, &*metrics) {
                yield self.lease_batch_part(desc.clone(), part.expect("leased part").into_owned(), filter.clone())
            }
        }
    }

    fn lease_seqno(&mut self) -> Lease {
        let seqno = self.machine.seqno();
        let lease = self
            .leased_seqnos
            .entry(seqno)
            .or_insert_with(|| Lease::new(seqno));
        lease.clone()
    }

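    /// Registers and returns an independent [ReadHandle] for the same shard,
    /// with a new [LeasedReaderId] and a since capability no further ahead
    /// than this handle's since.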
    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
    pub async fn clone(&self, purpose: &str) -> Self {
        let new_reader_id = LeasedReaderId::new();
        let machine = self.machine.clone();
        let gc = self.gc.clone();
        let heartbeat_ts = (self.cfg.now)();
        let (reader_state, maintenance) = machine
            .register_leased_reader(
                &new_reader_id,
                purpose,
                READER_LEASE_DURATION.get(&self.cfg),
                heartbeat_ts,
                false,
            )
            .await;
        maintenance.start_performing(&machine, &gc);
        assert!(PartialOrder::less_equal(&reader_state.since, &self.since));
        let new_reader = ReadHandle::new(
            self.cfg.clone(),
            Arc::clone(&self.metrics),
            machine,
            gc,
            Arc::clone(&self.blob),
            new_reader_id,
            self.read_schemas.clone(),
            reader_state.since,
            heartbeat_ts,
        )
        .await;
        new_reader
    }

    fn heartbeat_duration(&self) -> Duration {
        READER_LEASE_DURATION.get(&self.cfg) / 4
    }

    pub async fn maybe_downgrade_since(&mut self, new_since: &Antichain<T>) {
        let min_elapsed = self.heartbeat_duration();
        let elapsed_since_last_heartbeat =
            Duration::from_millis((self.cfg.now)().saturating_sub(self.last_heartbeat));
        if elapsed_since_last_heartbeat >= min_elapsed {
            self.downgrade_since(new_since).await;
        }
    }

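    /// Politely expires this reader, releasing its lease.
    ///
    /// There is a best-effort fallback in `Drop` for handles that are never
    /// explicitly expired, but it cannot run in every situation (e.g. outside
    /// a tokio runtime), in which case the lease only expires via timeout.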
    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
    pub async fn expire(mut self) {
        let Some(unexpired_state) = self.unexpired_state.take() else {
            return;
        };
        unexpired_state.expire_fn.0().await;
    }

    fn expire_fn(
        machine: Machine<K, V, T, D>,
        gc: GarbageCollector<K, V, T, D>,
        reader_id: LeasedReaderId,
    ) -> ExpireFn {
        ExpireFn(Box::new(move || {
            Box::pin(async move {
                let (_, maintenance) = machine.expire_leased_reader(&reader_id).await;
                maintenance.start_performing(&machine, &gc);
            })
        }))
    }

    #[cfg(test)]
    #[track_caller]
    pub async fn expect_listen(self, as_of: T) -> Listen<K, V, T, D> {
        self.listen(Antichain::from_elem(as_of))
            .await
            .expect("cannot serve requested as_of")
    }
}

#[derive(Debug)]
pub(crate) struct UnexpiredReadHandleState {
    expire_fn: ExpireFn,
    pub(crate) _heartbeat_tasks: Vec<AbortOnDropHandle<()>>,
}

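/// An incremental cursor through a consolidated snapshot of a shard, returned
/// by [ReadHandle::snapshot_cursor].
///
/// Holds a lease (`L`) for as long as it is live; the lease can be recovered
/// with [Cursor::into_lease].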
#[derive(Debug)]
pub struct Cursor<K: Codec, V: Codec, T: Timestamp + Codec64, D: Codec64, L = Lease> {
    consolidator: Consolidator<T, D, StructuredSort<K, V, T, D>>,
    max_len: usize,
    max_bytes: usize,
    _lease: L,
    read_schemas: Schemas<K, V>,
}

impl<K: Codec, V: Codec, T: Timestamp + Codec64, D: Codec64, L> Cursor<K, V, T, D, L> {
    pub fn into_lease(self) -> L {
        self._lease
    }
}

impl<K, V, T, D, L> Cursor<K, V, T, D, L>
where
    K: Debug + Codec + Ord,
    V: Debug + Codec + Ord,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Monoid + Ord + Codec64 + Send + Sync,
{
    pub async fn next(
        &mut self,
    ) -> Option<impl Iterator<Item = ((Result<K, String>, Result<V, String>), T, D)> + '_> {
        let Self {
            consolidator,
            max_len,
            max_bytes,
            _lease,
            read_schemas: _,
        } = self;

        let part = consolidator
            .next_chunk(*max_len, *max_bytes)
            .await
            .expect("fetching a leased part")?;
        let key_decoder = self
            .read_schemas
            .key
            .decoder_any(part.key.as_ref())
            .expect("ok");
        let val_decoder = self
            .read_schemas
            .val
            .decoder_any(part.val.as_ref())
            .expect("ok");
        let iter = (0..part.len()).map(move |i| {
            let mut k = K::default();
            let mut v = V::default();
            key_decoder.decode(i, &mut k);
            val_decoder.decode(i, &mut v);
            let t = T::decode(part.time.value(i).to_le_bytes());
            let d = D::decode(part.diff.value(i).to_le_bytes());
            ((Ok(k), Ok(v)), t, d)
        });

        Some(iter)
    }
}

impl<K, V, T, D> ReadHandle<K, V, T, D>
where
    K: Debug + Codec + Ord,
    V: Debug + Codec + Ord,
    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
    D: Monoid + Ord + Codec64 + Send + Sync,
{
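    /// Generates a snapshot at `as_of`, fetches all of its parts, and returns
    /// the consolidated contents, or `Err(Since(_))` if `as_of` is not
    /// readable by this handle.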
    pub async fn snapshot_and_fetch(
        &mut self,
        as_of: Antichain<T>,
    ) -> Result<Vec<((Result<K, String>, Result<V, String>), T, D)>, Since<T>> {
        let mut cursor = self.snapshot_cursor(as_of, |_| true).await?;
        let mut contents = Vec::new();
        while let Some(iter) = cursor.next().await {
            contents.extend(iter);
        }

        let old_len = contents.len();
        consolidate_updates(&mut contents);
        if old_len != contents.len() {
            self.machine
                .applier
                .shard_metrics
                .unconsolidated_snapshot
                .inc();
        }

        Ok(contents)
    }

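    /// Generates a snapshot at `as_of` and returns a [Cursor] that pages
    /// through the consolidated contents incrementally, skipping any part for
    /// which `should_fetch_part` returns false (e.g. based on its stats).
    ///
    /// Returns `Err(Since(_))` if `as_of` is not readable by this handle.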
    pub async fn snapshot_cursor(
        &mut self,
        as_of: Antichain<T>,
        should_fetch_part: impl for<'a> Fn(Option<&'a LazyPartStats>) -> bool,
    ) -> Result<Cursor<K, V, T, D>, Since<T>> {
        let batches = self.machine.snapshot(&as_of).await?;
        let lease = self.lease_seqno();

        Self::read_batches_consolidated(
            &self.cfg,
            Arc::clone(&self.metrics),
            Arc::clone(&self.machine.applier.shard_metrics),
            self.metrics.read.snapshot.clone(),
            Arc::clone(&self.blob),
            self.shard_id(),
            as_of,
            self.read_schemas.clone(),
            &batches,
            lease,
            should_fetch_part,
            COMPACTION_MEMORY_BOUND_BYTES.get(&self.cfg),
        )
    }

    pub(crate) fn read_batches_consolidated<L>(
        persist_cfg: &PersistConfig,
        metrics: Arc<Metrics>,
        shard_metrics: Arc<ShardMetrics>,
        read_metrics: ReadMetrics,
        blob: Arc<dyn Blob>,
        shard_id: ShardId,
        as_of: Antichain<T>,
        schemas: Schemas<K, V>,
        batches: &[HollowBatch<T>],
        lease: L,
        should_fetch_part: impl for<'a> Fn(Option<&'a LazyPartStats>) -> bool,
        memory_budget_bytes: usize,
    ) -> Result<Cursor<K, V, T, D, L>, Since<T>> {
        let context = format!("{}[as_of={:?}]", shard_id, as_of.elements());
        let filter = FetchBatchFilter::Snapshot {
            as_of: as_of.clone(),
        };

        let mut consolidator = Consolidator::new(
            context,
            FetchConfig::from_persist_config(persist_cfg),
            shard_id,
            StructuredSort::new(schemas.clone()),
            blob,
            metrics,
            shard_metrics,
            read_metrics,
            filter,
            None,
            memory_budget_bytes,
        );
        for batch in batches {
            for (meta, run) in batch.runs() {
                consolidator.enqueue_run(
                    &batch.desc,
                    meta,
                    run.into_iter()
                        .filter(|p| should_fetch_part(p.stats()))
                        .cloned(),
                );
            }
        }
        let max_len = persist_cfg.compaction_yield_after_n_updates;
        let max_bytes = BLOB_TARGET_SIZE.get(persist_cfg).max(1);

        Ok(Cursor {
            consolidator,
            max_len,
            max_bytes,
            _lease: lease,
            read_schemas: schemas,
        })
    }

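    /// Returns aggregate statistics about the contents of the shard at
    /// `as_of`, or across all current batches if `as_of` is `None`.
    ///
    /// The returned future is `'static` and does not borrow this handle.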
    pub fn snapshot_stats(
        &self,
        as_of: Option<Antichain<T>>,
    ) -> impl Future<Output = Result<SnapshotStats, Since<T>>> + Send + 'static {
        let machine = self.machine.clone();
        async move {
            let batches = match as_of {
                Some(as_of) => machine.snapshot(&as_of).await?,
                None => machine.applier.all_batches(),
            };
            let num_updates = batches.iter().map(|b| b.len).sum();
            Ok(SnapshotStats {
                shard_id: machine.shard_id(),
                num_updates,
            })
        }
    }

    pub async fn snapshot_parts_stats(
        &self,
        as_of: Antichain<T>,
    ) -> Result<SnapshotPartsStats, Since<T>> {
        let batches = self.machine.snapshot(&as_of).await?;
        let parts = stream::iter(&batches)
            .flat_map(|b| b.part_stream(self.shard_id(), &*self.blob, &*self.metrics))
            .map(|p| {
                let p = p.expect("live batch");
                SnapshotPartStats {
                    encoded_size_bytes: p.encoded_size_bytes(),
                    stats: p.stats().cloned(),
                }
            })
            .collect()
            .await;
        Ok(SnapshotPartsStats {
            metrics: Arc::clone(&self.machine.applier.metrics),
            shard_id: self.machine.shard_id(),
            parts,
        })
    }
}

impl<K, V, T, D> ReadHandle<K, V, T, D>
where
    K: Debug + Codec + Ord,
    V: Debug + Codec + Ord,
    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
    D: Monoid + Codec64 + Send + Sync,
{
    pub async fn snapshot_and_stream(
        &mut self,
        as_of: Antichain<T>,
    ) -> Result<
        impl Stream<Item = ((Result<K, String>, Result<V, String>), T, D)> + use<K, V, T, D>,
        Since<T>,
    > {
        let snap = self.snapshot(as_of).await?;

        let blob = Arc::clone(&self.blob);
        let metrics = Arc::clone(&self.metrics);
        let snapshot_metrics = self.metrics.read.snapshot.clone();
        let shard_metrics = Arc::clone(&self.machine.applier.shard_metrics);
        let reader_id = self.reader_id.clone();
        let schemas = self.read_schemas.clone();
        let mut schema_cache = self.schema_cache.clone();
        let persist_cfg = self.cfg.clone();
        let stream = async_stream::stream! {
            for part in snap {
                let mut fetched_part = fetch_leased_part(
                    &persist_cfg,
                    &part,
                    blob.as_ref(),
                    Arc::clone(&metrics),
                    &snapshot_metrics,
                    &shard_metrics,
                    &reader_id,
                    schemas.clone(),
                    &mut schema_cache,
                )
                .await;

                while let Some(next) = fetched_part.next() {
                    yield next;
                }
            }
        };

        Ok(stream)
    }
}

impl<K, V, T, D> ReadHandle<K, V, T, D>
where
    K: Debug + Codec + Ord,
    V: Debug + Codec + Ord,
    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
    D: Monoid + Ord + Codec64 + Send + Sync,
{
    #[cfg(test)]
    #[track_caller]
    pub async fn expect_snapshot_and_fetch(
        &mut self,
        as_of: T,
    ) -> Vec<((Result<K, String>, Result<V, String>), T, D)> {
        let mut ret = self
            .snapshot_and_fetch(Antichain::from_elem(as_of))
            .await
            .expect("cannot serve requested as_of");

        ret.sort();
        ret
    }
}

impl<K: Codec, V: Codec, T, D> Drop for ReadHandle<K, V, T, D> {
    fn drop(&mut self) {
        let Some(unexpired_state) = self.unexpired_state.take() else {
            return;
        };

        let handle = match Handle::try_current() {
            Ok(x) => x,
            Err(_) => {
                warn!(
                    "ReadHandle {} dropped without being explicitly expired, falling back to lease timeout",
                    self.reader_id
                );
                return;
            }
        };
        let expire_span = debug_span!("drop::expire");
        handle.spawn_named(
            || format!("ReadHandle::expire ({})", self.reader_id),
            unexpired_state.expire_fn.0().instrument(expire_span),
        );
    }
}

#[cfg(test)]
mod tests {
    use std::pin;
    use std::str::FromStr;

    use mz_dyncfg::ConfigUpdates;
    use mz_ore::cast::CastFrom;
    use mz_ore::metrics::MetricsRegistry;
    use mz_persist::mem::{MemBlob, MemBlobConfig, MemConsensus};
    use mz_persist::unreliable::{UnreliableConsensus, UnreliableHandle};
    use serde::{Deserialize, Serialize};
    use serde_json::json;
    use tokio_stream::StreamExt;

    use crate::async_runtime::IsolatedRuntime;
    use crate::batch::BLOB_TARGET_SIZE;
    use crate::cache::StateCache;
    use crate::internal::metrics::Metrics;
    use crate::rpc::NoopPubSubSender;
    use crate::tests::{all_ok, new_test_client};
    use crate::{Diagnostics, PersistClient, PersistConfig, ShardId};

    use super::*;

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn drop_unused_subscribe(dyncfgs: ConfigUpdates) {
        let data = [
            (("0".to_owned(), "zero".to_owned()), 0, 1),
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
        ];

        let (mut write, read) = new_test_client(&dyncfgs)
            .await
            .expect_open::<String, String, u64, i64>(crate::ShardId::new())
            .await;

        write.expect_compare_and_append(&data[0..1], 0, 1).await;
        write.expect_compare_and_append(&data[1..2], 1, 2).await;
        write.expect_compare_and_append(&data[2..3], 2, 3).await;

        let subscribe = read
            .subscribe(timely::progress::Antichain::from_elem(2))
            .await
            .unwrap();
        assert!(
            !subscribe.snapshot.as_ref().unwrap().is_empty(),
            "snapshot must have batches for test to be meaningful"
        );
        drop(subscribe);
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn streaming_consolidate(dyncfgs: ConfigUpdates) {
        let data = &[
            (("k".to_owned(), "v".to_owned()), 0, 1),
            (("k".to_owned(), "v".to_owned()), 1, 1),
            (("k".to_owned(), "v".to_owned()), 2, 1),
            (("k2".to_owned(), "v".to_owned()), 0, 1),
            (("k2".to_owned(), "v".to_owned()), 1, -1),
        ];

        let (mut write, read) = {
            let client = new_test_client(&dyncfgs).await;
            client.cfg.set_config(&BLOB_TARGET_SIZE, 1000);
            client
                .expect_open::<String, String, u64, i64>(crate::ShardId::new())
                .await
        };

        write.expect_compare_and_append(data, 0, 5).await;

        let mut snapshot = read
            .subscribe(timely::progress::Antichain::from_elem(4))
            .await
            .unwrap();

        let mut updates = vec![];
        'outer: loop {
            for event in snapshot.fetch_next().await {
                match event {
                    ListenEvent::Progress(t) => {
                        if !t.less_than(&4) {
                            break 'outer;
                        }
                    }
                    ListenEvent::Updates(data) => {
                        updates.extend(data);
                    }
                }
            }
        }
        assert_eq!(
            updates,
            &[((Ok("k".to_owned()), Ok("v".to_owned())), 4u64, 3i64)],
        )
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn snapshot_and_stream(dyncfgs: ConfigUpdates) {
        let data = &mut [
            (("k1".to_owned(), "v1".to_owned()), 0, 1),
            (("k2".to_owned(), "v2".to_owned()), 1, 1),
            (("k3".to_owned(), "v3".to_owned()), 2, 1),
            (("k4".to_owned(), "v4".to_owned()), 2, 1),
            (("k5".to_owned(), "v5".to_owned()), 3, 1),
        ];

        let (mut write, mut read) = {
            let client = new_test_client(&dyncfgs).await;
            client.cfg.set_config(&BLOB_TARGET_SIZE, 0);
            client
                .expect_open::<String, String, u64, i64>(crate::ShardId::new())
                .await
        };

        write.expect_compare_and_append(&data[0..2], 0, 2).await;
        write.expect_compare_and_append(&data[2..4], 2, 3).await;
        write.expect_compare_and_append(&data[4..], 3, 4).await;

        let as_of = Antichain::from_elem(3);
        let mut snapshot = pin::pin!(read.snapshot_and_stream(as_of.clone()).await.unwrap());

        let mut snapshot_rows = vec![];
        while let Some(((k, v), t, d)) = snapshot.next().await {
            snapshot_rows.push(((k.expect("valid key"), v.expect("valid key")), t, d));
        }

        for ((_k, _v), t, _d) in data.as_mut_slice() {
            t.advance_by(as_of.borrow());
        }

        assert_eq!(data.as_slice(), snapshot_rows.as_slice());
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn seqno_leases(dyncfgs: ConfigUpdates) {
        let mut data = vec![];
        for i in 0..20 {
            data.push(((i.to_string(), i.to_string()), i, 1))
        }

        let shard_id = ShardId::new();

        let client = new_test_client(&dyncfgs).await;
        let (mut write, read) = client
            .expect_open::<String, String, u64, i64>(shard_id)
            .await;

        let mut offset = 0;
        let mut width = 2;

        for i in offset..offset + width {
            write
                .expect_compare_and_append(
                    &data[i..i + 1],
                    u64::cast_from(i),
                    u64::cast_from(i) + 1,
                )
                .await;
        }
        offset += width;

        let mut fetcher = client
            .create_batch_fetcher::<String, String, u64, i64>(
                shard_id,
                Default::default(),
                Default::default(),
                false,
                Diagnostics::for_tests(),
            )
            .await
            .unwrap();

        let mut subscribe = read
            .subscribe(timely::progress::Antichain::from_elem(1))
            .await
            .expect("cannot serve requested as_of");

        let original_seqno_since = subscribe.listen.handle.machine.applier.seqno_since();

        let mut parts = vec![];

        width = 4;
        for i in offset..offset + width {
            for event in subscribe.next(None).await {
                if let ListenEvent::Updates(mut new_parts) = event {
                    parts.append(&mut new_parts);
                    subscribe
                        .listen
                        .handle
                        .downgrade_since(&subscribe.listen.since)
                        .await;
                }
            }

            write
                .expect_compare_and_append(
                    &data[i..i + 1],
                    u64::cast_from(i),
                    u64::cast_from(i) + 1,
                )
                .await;

            assert_eq!(
                subscribe.listen.handle.machine.applier.seqno_since(),
                original_seqno_since
            );
        }

        offset += width;

        let mut seqno_since = subscribe.listen.handle.machine.applier.seqno_since();

        assert_eq!(seqno_since, original_seqno_since);

        let mut subsequent_parts = vec![];

        let mut this_seqno = SeqNo::minimum();

        for (mut i, part) in parts.into_iter().enumerate() {
            let part_seqno = part.lease.seqno();
            let last_seqno = this_seqno;
            this_seqno = part_seqno;
            assert!(this_seqno >= last_seqno);

            let (part, lease) = part.into_exchangeable_part();
            let _ = fetcher.fetch_leased_part(part).await;
            drop(lease);

            for event in subscribe.next(None).await {
                if let ListenEvent::Updates(parts) = event {
                    for part in parts {
                        let (_, lease) = part.into_exchangeable_part();
                        subsequent_parts.push(lease);
                    }
                }
            }

            subscribe
                .listen
                .handle
                .downgrade_since(&subscribe.listen.since)
                .await;

            i += offset;
            write
                .expect_compare_and_append(
                    &data[i..i + 1],
                    u64::cast_from(i),
                    u64::cast_from(i) + 1,
                )
                .await;

            let expect_downgrade = subscribe.listen.handle.outstanding_seqno() > Some(part_seqno);

            let new_seqno_since = subscribe.listen.handle.machine.applier.seqno_since();
            if expect_downgrade {
                assert!(new_seqno_since > seqno_since);
            } else {
                assert_eq!(new_seqno_since, seqno_since);
            }
            seqno_since = new_seqno_since;
        }

        assert!(seqno_since > original_seqno_since);

        drop(subsequent_parts);
        drop(subscribe);
    }

    #[mz_ore::test]
    fn reader_id_human_readable_serde() {
        #[derive(Debug, Serialize, Deserialize)]
        struct Container {
            reader_id: LeasedReaderId,
        }

        let id =
            LeasedReaderId::from_str("r00000000-1234-5678-0000-000000000000").expect("valid id");
        assert_eq!(
            id,
            serde_json::from_value(serde_json::to_value(id.clone()).expect("serializable"))
                .expect("deserializable")
        );

        assert_eq!(
            id,
            serde_json::from_str("\"r00000000-1234-5678-0000-000000000000\"")
                .expect("deserializable")
        );

        let json = json!({ "reader_id": id });
        assert_eq!(
            "{\"reader_id\":\"r00000000-1234-5678-0000-000000000000\"}",
            &json.to_string()
        );
        let container: Container = serde_json::from_value(json).expect("deserializable");
        assert_eq!(container.reader_id, id);
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn skip_consensus_fetch_optimization() {
        let data = vec![
            (("0".to_owned(), "zero".to_owned()), 0, 1),
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
        ];

        let cfg = PersistConfig::new_for_tests();
        let blob = Arc::new(MemBlob::open(MemBlobConfig::default()));
        let consensus = Arc::new(MemConsensus::default());
        let unreliable = UnreliableHandle::default();
        unreliable.totally_available();
        let consensus = Arc::new(UnreliableConsensus::new(consensus, unreliable.clone()));
        let metrics = Arc::new(Metrics::new(&cfg, &MetricsRegistry::new()));
        let pubsub_sender = Arc::new(NoopPubSubSender);
        let (mut write, mut read) = PersistClient::new(
            cfg,
            blob,
            consensus,
            metrics,
            Arc::new(IsolatedRuntime::new_for_tests()),
            Arc::new(StateCache::new_no_metrics()),
            pubsub_sender,
        )
        .expect("client construction failed")
        .expect_open::<String, String, u64, i64>(ShardId::new())
        .await;

        write.expect_compare_and_append(&data[0..1], 0, 1).await;
        write.expect_compare_and_append(&data[1..2], 1, 2).await;
        write.expect_compare_and_append(&data[2..3], 2, 3).await;

        let snapshot = read.expect_snapshot_and_fetch(2).await;
        let mut listen = read.expect_listen(0).await;

        let listen_actual = listen.fetch_next().await;
        let expected_events = vec![ListenEvent::Progress(Antichain::from_elem(1))];
        assert_eq!(listen_actual, expected_events);

        unreliable.totally_unavailable();
        assert_eq!(snapshot, all_ok(&data, 2));
        assert_eq!(
            listen.read_until(&3).await,
            (all_ok(&data[1..], 1), Antichain::from_elem(3))
        );
    }
}