1use async_stream::stream;
13use std::collections::BTreeMap;
14use std::fmt::Debug;
15use std::future::Future;
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18
19use differential_dataflow::Hashable;
20use differential_dataflow::consolidation::consolidate_updates;
21use differential_dataflow::difference::Monoid;
22use differential_dataflow::lattice::Lattice;
23use futures::Stream;
24use futures_util::{StreamExt, stream};
25use mz_dyncfg::Config;
26use mz_ore::cast::CastLossy;
27use mz_ore::halt;
28use mz_ore::instrument;
29use mz_ore::task::JoinHandle;
30use mz_persist::location::{Blob, SeqNo};
31use mz_persist_types::columnar::{ColumnDecoder, Schema};
32use mz_persist_types::{Codec, Codec64};
33use proptest_derive::Arbitrary;
34use serde::{Deserialize, Serialize};
35use timely::PartialOrder;
36use timely::order::TotalOrder;
37use timely::progress::{Antichain, Timestamp};
38use tracing::warn;
39use uuid::Uuid;
40
41use crate::batch::BLOB_TARGET_SIZE;
42use crate::cfg::{COMPACTION_MEMORY_BOUND_BYTES, RetryParameters};
43use crate::fetch::FetchConfig;
44use crate::fetch::{FetchBatchFilter, FetchedPart, Lease, LeasedBatchPart, fetch_leased_part};
45use crate::internal::encoding::Schemas;
46use crate::internal::machine::{Machine, next_listen_batch_retry_params};
47use crate::internal::metrics::{Metrics, ReadMetrics, ShardMetrics};
48use crate::internal::state::{HollowBatch, LeasedReaderState, SnapshotErr};
49use crate::internal::watch::{AwaitableState, StateWatch};
50use crate::iter::{Consolidator, StructuredSort};
51use crate::schema::SchemaCache;
52use crate::stats::{SnapshotPartStats, SnapshotPartsStats, SnapshotStats};
53use crate::{GarbageCollector, PersistConfig, ShardId, parse_id};
54
55pub use crate::internal::encoding::LazyPartStats;
56pub use crate::internal::state::Since;
57
58#[derive(
60 Arbitrary,
61 Clone,
62 PartialEq,
63 Eq,
64 PartialOrd,
65 Ord,
66 Hash,
67 Serialize,
68 Deserialize
69)]
70#[serde(try_from = "String", into = "String")]
71pub struct LeasedReaderId(pub(crate) [u8; 16]);
72
73impl std::fmt::Display for LeasedReaderId {
74 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75 write!(f, "r{}", Uuid::from_bytes(self.0))
76 }
77}
78
79impl std::fmt::Debug for LeasedReaderId {
80 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
81 write!(f, "LeasedReaderId({})", Uuid::from_bytes(self.0))
82 }
83}
84
85impl std::str::FromStr for LeasedReaderId {
86 type Err = String;
87
88 fn from_str(s: &str) -> Result<Self, Self::Err> {
89 parse_id("r", "LeasedReaderId", s).map(LeasedReaderId)
90 }
91}
92
93impl From<LeasedReaderId> for String {
94 fn from(reader_id: LeasedReaderId) -> Self {
95 reader_id.to_string()
96 }
97}
98
99impl TryFrom<String> for LeasedReaderId {
100 type Error = String;
101
102 fn try_from(s: String) -> Result<Self, Self::Error> {
103 s.parse()
104 }
105}
106
107impl LeasedReaderId {
108 pub(crate) fn new() -> Self {
109 LeasedReaderId(*Uuid::new_v4().as_bytes())
110 }
111}
112
113#[derive(Debug)]
118pub struct Subscribe<K: Codec, V: Codec, T, D> {
119 snapshot: Option<Vec<LeasedBatchPart<T>>>,
120 listen: Listen<K, V, T, D>,
121}
122
123impl<K, V, T, D> Subscribe<K, V, T, D>
124where
125 K: Debug + Codec,
126 V: Debug + Codec,
127 T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
128 D: Monoid + Codec64 + Send + Sync,
129{
130 fn new(snapshot_parts: Vec<LeasedBatchPart<T>>, listen: Listen<K, V, T, D>) -> Self {
131 Subscribe {
132 snapshot: Some(snapshot_parts),
133 listen,
134 }
135 }
136
137 #[instrument(level = "debug", fields(shard = %self.listen.handle.machine.shard_id()))]
145 pub async fn next(
146 &mut self,
147 listen_retry: Option<RetryParameters>,
149 ) -> Vec<ListenEvent<T, LeasedBatchPart<T>>> {
150 match self.snapshot.take() {
151 Some(parts) => vec![ListenEvent::Updates(parts)],
152 None => {
153 let (parts, upper) = self.listen.next(listen_retry).await;
154 vec![ListenEvent::Updates(parts), ListenEvent::Progress(upper)]
155 }
156 }
157 }
158}
159
160impl<K, V, T, D> Subscribe<K, V, T, D>
161where
162 K: Debug + Codec,
163 V: Debug + Codec,
164 T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
165 D: Monoid + Codec64 + Send + Sync,
166{
167 #[instrument(level = "debug", fields(shard = %self.listen.handle.machine.shard_id()))]
170 pub async fn fetch_next(&mut self) -> Vec<ListenEvent<T, ((K, V), T, D)>> {
171 let events = self.next(None).await;
172 let new_len = events
173 .iter()
174 .map(|event| match event {
175 ListenEvent::Updates(parts) => parts.len(),
176 ListenEvent::Progress(_) => 1,
177 })
178 .sum();
179 let mut ret = Vec::with_capacity(new_len);
180 for event in events {
181 match event {
182 ListenEvent::Updates(parts) => {
183 for part in parts {
184 let fetched_part = self.listen.fetch_batch_part(part).await;
185 let updates = fetched_part.collect::<Vec<_>>();
186 if !updates.is_empty() {
187 ret.push(ListenEvent::Updates(updates));
188 }
189 }
190 }
191 ListenEvent::Progress(progress) => ret.push(ListenEvent::Progress(progress)),
192 }
193 }
194 ret
195 }
196
197 pub async fn fetch_batch_part(&mut self, part: LeasedBatchPart<T>) -> FetchedPart<K, V, T, D> {
199 self.listen.fetch_batch_part(part).await
200 }
201}
202
203impl<K, V, T, D> Subscribe<K, V, T, D>
204where
205 K: Debug + Codec,
206 V: Debug + Codec,
207 T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
208 D: Monoid + Codec64 + Send + Sync,
209{
210 pub async fn expire(mut self) {
217 let _ = self.snapshot.take(); self.listen.expire().await;
219 }
220}
221
222#[derive(Debug, PartialEq)]
226pub enum ListenEvent<T, D> {
227 Progress(Antichain<T>),
229 Updates(Vec<D>),
231}
232
233#[derive(Debug)]
235pub struct Listen<K: Codec, V: Codec, T, D> {
236 handle: ReadHandle<K, V, T, D>,
237 as_of: Antichain<T>,
238 since: Antichain<T>,
239 frontier: Antichain<T>,
240}
241
impl<K, V, T, D> Listen<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
    D: Monoid + Codec64 + Send + Sync,
{
    /// Creates a listen that starts at `as_of`, consuming `handle`.
    ///
    /// Returns an error if `verify_listen` rejects `as_of`, or if `handle`'s
    /// since is not less than or equal to `as_of`. On success, the handle's
    /// since is downgraded to `as_of` before returning.
    async fn new(
        mut handle: ReadHandle<K, V, T, D>,
        as_of: Antichain<T>,
    ) -> Result<Self, Since<T>> {
        let () = handle.machine.verify_listen(&as_of)?;

        let since = as_of.clone();
        // The handle must still be able to read at `as_of`.
        if !PartialOrder::less_equal(handle.since(), &since) {
            return Err(Since(handle.since().clone()));
        }
        // Nothing before `as_of` is needed any more, so release it eagerly.
        handle.downgrade_since(&since).await;
        Ok(Listen {
            handle,
            since,
            frontier: as_of.clone(),
            as_of,
        })
    }

    /// Returns the upper of the most recently returned batch, or `as_of` if
    /// `next` has not been called yet.
    pub fn frontier(&self) -> &Antichain<T> {
        &self.frontier
    }

    /// Waits until the shard upper passes this listen's frontier, then
    /// returns the leased parts of the next batch along with the new frontier
    /// (that batch's upper).
    ///
    /// `retry`, if set, overrides the default retry parameters used while
    /// waiting.
    ///
    /// # Panics
    ///
    /// Panics if no listen batch is available after the wait. Also panics if
    /// the batch's since has advanced further than this listen can tolerate
    /// while the reader's lease is still present in shard state. If the lease
    /// is gone, the process is halted instead.
    pub async fn next(
        &mut self,
        retry: Option<RetryParameters>,
    ) -> (Vec<LeasedBatchPart<T>>, Antichain<T>) {
        let retry = retry
            .unwrap_or_else(|| next_listen_batch_retry_params(&self.handle.machine.applier.cfg));
        self.handle
            .machine
            .wait_for_upper_past(
                &self.frontier,
                &mut self.handle.watch,
                Some(&self.handle.reader_id),
                &self.handle.metrics.retries.next_listen_batch,
                retry,
            )
            .await;

        // Lease the current seqno; every returned part carries a clone of it.
        let lease = self.handle.lease_seqno().await;
        let batch = match self
            .handle
            .machine
            .applier
            .next_listen_batch(&self.frontier)
        {
            Ok(batch) => batch,
            Err(seqno) => {
                panic!(
                    "waited for upper past {frontier:?}, but no listen batch was available at {seqno:?}!",
                    frontier = self.frontier.elements()
                );
            }
        };

        // The batch's since must be strictly less than our frontier, so that
        // updates at the frontier can still be told apart from earlier ones.
        // On the very first call (frontier == as_of), a since equal to the
        // frontier is also accepted.
        let acceptable_desc = PartialOrder::less_than(batch.desc.since(), &self.frontier)
            || (self.frontier == self.as_of
                && PartialOrder::less_equal(batch.desc.since(), &self.frontier));
        if !acceptable_desc {
            // Our since hold should have prevented this. Panic if our lease is
            // still registered (a real invariant violation); halt if it has
            // been expired out from under us.
            let lease_state = self
                .handle
                .machine
                .applier
                .reader_lease(self.handle.reader_id.clone());
            if let Some(lease) = lease_state {
                panic!(
                    "Listen on {} received a batch {:?} advanced past the listen frontier {:?}, but the lease has not expired: {:?}",
                    self.handle.machine.shard_id(),
                    batch.desc,
                    self.frontier,
                    lease
                )
            } else {
                halt!(
                    "Listen on {} received a batch {:?} advanced past the listen frontier {:?} after the reader has expired. \
                     This can happen in exceptional cases: a machine goes to sleep or is running out of memory or CPU, for example.",
                    self.handle.machine.shard_id(),
                    batch.desc,
                    self.frontier
                )
            }
        }

        let new_frontier = batch.desc.upper().clone();

        // Advance our since to every element of the *old* frontier that is
        // strictly below the new upper. We cannot advance to the new upper
        // itself: the next call checks that the batch since is strictly less
        // than the (then current) frontier, which a since equal to the new
        // upper would violate.
        for x in self.frontier.elements().iter() {
            let less_than_upper = batch.desc.upper().elements().iter().any(|u| x.less_than(u));
            if less_than_upper {
                self.since.join_assign(&Antichain::from_elem(x.clone()));
            }
        }

        // Filter with the old frontier as the lower bound; this must happen
        // before `self.frontier` is replaced below.
        let filter = FetchBatchFilter::Listen {
            as_of: self.as_of.clone(),
            lower: self.frontier.clone(),
        };
        let parts = self
            .handle
            .lease_batch_parts(lease, batch, filter)
            .collect()
            .await;

        // Record the new hold without waiting; the heartbeat task applies it.
        self.handle.maybe_downgrade_since(&self.since).await;

        // Keep this after both uses of the old frontier above.
        self.frontier = new_frontier;

        (parts, self.frontier.clone())
    }
}
418
419impl<K, V, T, D> Listen<K, V, T, D>
420where
421 K: Debug + Codec,
422 V: Debug + Codec,
423 T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
424 D: Monoid + Codec64 + Send + Sync,
425{
426 #[instrument(level = "debug", name = "listen::next", fields(shard = %self.handle.machine.shard_id()))]
437 pub async fn fetch_next(&mut self) -> Vec<ListenEvent<T, ((K, V), T, D)>> {
438 let (parts, progress) = self.next(None).await;
439 let mut ret = Vec::with_capacity(parts.len() + 1);
440 for part in parts {
441 let fetched_part = self.fetch_batch_part(part).await;
442 let updates = fetched_part.collect::<Vec<_>>();
443 if !updates.is_empty() {
444 ret.push(ListenEvent::Updates(updates));
445 }
446 }
447 ret.push(ListenEvent::Progress(progress));
448 ret
449 }
450
451 pub fn into_stream(mut self) -> impl Stream<Item = ListenEvent<T, ((K, V), T, D)>> {
453 async_stream::stream!({
454 loop {
455 for msg in self.fetch_next().await {
456 yield msg;
457 }
458 }
459 })
460 }
461
462 #[cfg(test)]
466 #[track_caller]
467 pub async fn read_until(&mut self, ts: &T) -> (Vec<((K, V), T, D)>, Antichain<T>) {
468 let mut updates = Vec::new();
469 let mut frontier = Antichain::from_elem(T::minimum());
470 while self.frontier.less_than(ts) {
471 for event in self.fetch_next().await {
472 match event {
473 ListenEvent::Updates(mut x) => updates.append(&mut x),
474 ListenEvent::Progress(x) => frontier = x,
475 }
476 }
477 }
478 (updates, frontier)
481 }
482}
483
484impl<K, V, T, D> Listen<K, V, T, D>
485where
486 K: Debug + Codec,
487 V: Debug + Codec,
488 T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
489 D: Monoid + Codec64 + Send + Sync,
490{
491 async fn fetch_batch_part(&mut self, part: LeasedBatchPart<T>) -> FetchedPart<K, V, T, D> {
496 let fetched_part = fetch_leased_part(
497 &self.handle.cfg,
498 &part,
499 self.handle.blob.as_ref(),
500 Arc::clone(&self.handle.metrics),
501 &self.handle.metrics.read.listen,
502 &self.handle.machine.applier.shard_metrics,
503 &self.handle.reader_id,
504 self.handle.read_schemas.clone(),
505 &mut self.handle.schema_cache,
506 )
507 .await;
508 fetched_part
509 }
510
511 pub async fn expire(self) {
518 self.handle.expire().await
519 }
520}
521
522#[derive(Debug)]
525pub(crate) struct ReadHolds<T> {
526 held_since: Antichain<T>,
528 applied_since: Antichain<T>,
531 recent_seqno: SeqNo,
533 leases: BTreeMap<SeqNo, Lease>,
536 expired: bool,
538 request_sync: bool,
541}
542
543impl<T> ReadHolds<T>
544where
545 T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
546{
547 pub fn downgrade_since(&mut self, since: &Antichain<T>) {
548 self.held_since.join_assign(since);
549 }
550
551 pub fn observe_seqno(&mut self, seqno: SeqNo) {
552 self.recent_seqno = seqno.max(self.recent_seqno);
553 }
554
555 pub fn lease_seqno(&mut self) -> Lease {
556 let seqno = self.recent_seqno;
557 let lease = self
558 .leases
559 .entry(seqno)
560 .or_insert_with(|| Lease::new(seqno));
561 lease.clone()
562 }
563
564 pub fn outstanding_seqno(&mut self) -> SeqNo {
565 while let Some(first) = self.leases.first_entry() {
566 if first.get().count() <= 1 {
567 first.remove();
568 } else {
569 return *first.key();
570 }
571 }
572 self.recent_seqno
573 }
574}
575
576#[derive(Debug)]
597pub struct ReadHandle<K: Codec, V: Codec, T, D> {
598 pub(crate) cfg: PersistConfig,
599 pub(crate) metrics: Arc<Metrics>,
600 pub(crate) machine: Machine<K, V, T, D>,
601 pub(crate) gc: GarbageCollector<K, V, T, D>,
602 pub(crate) blob: Arc<dyn Blob>,
603 watch: StateWatch<K, V, T, D>,
604
605 pub(crate) reader_id: LeasedReaderId,
606 pub(crate) read_schemas: Schemas<K, V>,
607 pub(crate) schema_cache: SchemaCache<K, V, T, D>,
608
609 since: Antichain<T>,
610 pub(crate) hold_state: AwaitableState<ReadHolds<T>>,
611 pub(crate) unexpired_state: Option<UnexpiredReadHandleState>,
612}
613
614pub(crate) const READER_LEASE_DURATION: Config<Duration> = Config::new(
617 "persist_reader_lease_duration",
618 Duration::from_secs(60 * 15),
619 "The time after which we'll clean up stale read leases",
620);
621
622impl<K, V, T, D> ReadHandle<K, V, T, D>
623where
624 K: Debug + Codec,
625 V: Debug + Codec,
626 T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
627 D: Monoid + Codec64 + Send + Sync,
628{
629 #[allow(clippy::unused_async)]
630 pub(crate) async fn new(
631 cfg: PersistConfig,
632 metrics: Arc<Metrics>,
633 machine: Machine<K, V, T, D>,
634 gc: GarbageCollector<K, V, T, D>,
635 blob: Arc<dyn Blob>,
636 reader_id: LeasedReaderId,
637 read_schemas: Schemas<K, V>,
638 state: LeasedReaderState<T>,
639 ) -> Self {
640 let schema_cache = machine.applier.schema_cache();
641 let hold_state = AwaitableState::new(ReadHolds {
642 held_since: state.since.clone(),
643 applied_since: state.since.clone(),
644 recent_seqno: state.seqno,
645 leases: Default::default(),
646 expired: false,
647 request_sync: false,
648 });
649 ReadHandle {
650 cfg,
651 metrics: Arc::clone(&metrics),
652 machine: machine.clone(),
653 gc: gc.clone(),
654 blob,
655 watch: machine.applier.watch(),
656 reader_id: reader_id.clone(),
657 read_schemas,
658 schema_cache,
659 since: state.since,
660 hold_state: hold_state.clone(),
661 unexpired_state: Some(UnexpiredReadHandleState {
662 heartbeat_task: Self::start_reader_heartbeat_task(
663 machine, reader_id, gc, hold_state,
664 ),
665 }),
666 }
667 }
668
669 fn start_reader_heartbeat_task(
670 machine: Machine<K, V, T, D>,
671 reader_id: LeasedReaderId,
672 gc: GarbageCollector<K, V, T, D>,
673 leased_seqnos: AwaitableState<ReadHolds<T>>,
674 ) -> JoinHandle<()> {
675 let metrics = Arc::clone(&machine.applier.metrics);
676 let name = format!(
677 "persist::heartbeat_read({},{})",
678 machine.shard_id(),
679 reader_id
680 );
681 mz_ore::task::spawn(|| name, {
682 metrics.tasks.heartbeat_read.instrument_task(async move {
683 Self::reader_heartbeat_task(machine, reader_id, gc, leased_seqnos).await
684 })
685 })
686 }
687
688 async fn reader_heartbeat_task(
689 machine: Machine<K, V, T, D>,
690 reader_id: LeasedReaderId,
691 gc: GarbageCollector<K, V, T, D>,
692 leased_seqnos: AwaitableState<ReadHolds<T>>,
693 ) {
694 let sleep_duration = READER_LEASE_DURATION.get(&machine.applier.cfg) / 4;
695 let jitter: f64 = f64::cast_lossy(reader_id.hashed()) / f64::cast_lossy(u64::MAX);
698 let mut interval = tokio::time::interval_at(
699 tokio::time::Instant::now() + sleep_duration.mul_f64(jitter),
700 sleep_duration,
701 );
702 let mut held_since = leased_seqnos.read(|s| s.held_since.clone());
703 loop {
704 let before_sleep = Instant::now();
705 let _woke_by_tick = tokio::select! {
706 _tick = interval.tick() => {
707 true
708 }
709 _whatever = leased_seqnos.wait_while(|s| !s.request_sync) => {
710 false
711 }
712 };
713
714 let elapsed_since_before_sleeping = before_sleep.elapsed();
715 if elapsed_since_before_sleeping > sleep_duration + Duration::from_secs(60) {
716 warn!(
717 "reader ({}) of shard ({}) went {}s between heartbeats",
718 reader_id,
719 machine.shard_id(),
720 elapsed_since_before_sleeping.as_secs_f64()
721 );
722 }
723
724 let before_heartbeat = Instant::now();
725 let heartbeat_ms = (machine.applier.cfg.now)();
726 let current_seqno = machine.seqno();
727 let result = leased_seqnos.modify(|s| {
728 if s.expired {
729 Err(())
730 } else {
731 s.observe_seqno(current_seqno);
732 s.request_sync = false;
733 held_since.join_assign(&s.held_since);
734 Ok(s.outstanding_seqno())
735 }
736 });
737 let actual_since = match result {
738 Ok(held_seqno) => {
739 let (seqno, actual_since, maintenance) = machine
740 .downgrade_since(&reader_id, held_seqno, &held_since, heartbeat_ms)
741 .await;
742 leased_seqnos.modify(|s| {
743 s.applied_since.clone_from(&actual_since.0);
744 s.observe_seqno(seqno)
745 });
746 maintenance.start_performing(&machine, &gc);
747 actual_since
748 }
749 Err(()) => {
750 let (seqno, maintenance) = machine.expire_leased_reader(&reader_id).await;
751 leased_seqnos.modify(|s| s.observe_seqno(seqno));
752 maintenance.start_performing(&machine, &gc);
753 break;
754 }
755 };
756
757 let elapsed_since_heartbeat = before_heartbeat.elapsed();
758 if elapsed_since_heartbeat > Duration::from_secs(60) {
759 warn!(
760 "reader ({}) of shard ({}) heartbeat call took {}s",
761 reader_id,
762 machine.shard_id(),
763 elapsed_since_heartbeat.as_secs_f64(),
764 );
765 }
766
767 if PartialOrder::less_than(&held_since, &actual_since.0) {
768 warn!(
776 "heartbeat task for reader ({}) of shard ({}) exiting due to expired lease \
777 while read handle is live",
778 reader_id,
779 machine.shard_id(),
780 );
781 return;
782 }
783 }
784 }
785
786 pub fn shard_id(&self) -> ShardId {
788 self.machine.shard_id()
789 }
790
791 pub fn since(&self) -> &Antichain<T> {
795 &self.since
796 }
797
798 #[cfg(test)]
799 fn outstanding_seqno(&self) -> SeqNo {
800 let current_seqno = self.machine.seqno();
801 self.hold_state.modify(|s| {
802 s.observe_seqno(current_seqno);
803 s.outstanding_seqno()
804 })
805 }
806
807 #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
814 pub async fn downgrade_since(&mut self, new_since: &Antichain<T>) {
815 self.since = new_since.clone();
816 self.hold_state.modify(|s| {
817 s.downgrade_since(new_since);
818 s.request_sync = true;
819 });
820 self.hold_state
821 .wait_while(|s| PartialOrder::less_than(&s.applied_since, new_since))
822 .await;
823 }
824
825 #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
842 pub async fn listen(self, as_of: Antichain<T>) -> Result<Listen<K, V, T, D>, Since<T>> {
843 Listen::new(self, as_of).await
844 }
845
846 async fn snapshot_batches(
847 &mut self,
848 as_of: Antichain<T>,
849 ) -> Result<(Lease, Vec<HollowBatch<T>>), Since<T>> {
850 self.machine
851 .wait_for_upper_past(
852 &as_of,
853 &mut self.watch,
854 Some(&self.reader_id),
855 &self.metrics.retries.snapshot,
856 RetryParameters::persist_defaults(),
857 )
858 .await;
859 let lease = self.lease_seqno().await;
860 let batches = match self.machine.applier.snapshot(&as_of) {
861 Ok(data) => data,
862 Err(SnapshotErr::AsOfHistoricalDistinctionsLost(since)) => return Err(since),
863 Err(SnapshotErr::AsOfNotYetAvailable(seqno, upper)) => {
864 panic!(
865 "waited for upper past {as_of:?}, but at latest seqno {seqno:?} the frontier was only {upper:?}",
866 as_of = as_of.elements(),
867 upper = upper.0.elements(),
868 )
869 }
870 };
871 Ok((lease, batches))
872 }
873
874 #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
888 pub async fn snapshot(
889 &mut self,
890 as_of: Antichain<T>,
891 ) -> Result<Vec<LeasedBatchPart<T>>, Since<T>> {
892 let (lease, batches) = self.snapshot_batches(as_of.clone()).await?;
893
894 if !PartialOrder::less_equal(self.since(), &as_of) {
895 return Err(Since(self.since().clone()));
896 }
897
898 let filter = FetchBatchFilter::Snapshot { as_of };
899 let mut leased_parts = Vec::new();
900 for batch in batches {
901 leased_parts.extend(
906 self.lease_batch_parts(lease.clone(), batch, filter.clone())
907 .collect::<Vec<_>>()
908 .await,
909 );
910 }
911 Ok(leased_parts)
912 }
913
914 #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
920 pub async fn subscribe(
921 mut self,
922 as_of: Antichain<T>,
923 ) -> Result<Subscribe<K, V, T, D>, Since<T>> {
924 let snapshot_parts = self.snapshot(as_of.clone()).await?;
925 let listen = self.listen(as_of.clone()).await?;
926 Ok(Subscribe::new(snapshot_parts, listen))
927 }
928
929 fn lease_batch_parts(
930 &mut self,
931 lease: Lease,
932 batch: HollowBatch<T>,
933 filter: FetchBatchFilter<T>,
934 ) -> impl Stream<Item = LeasedBatchPart<T>> + '_ {
935 stream! {
936 let blob = Arc::clone(&self.blob);
937 let metrics = Arc::clone(&self.metrics);
938 let desc = batch.desc.clone();
939 for await part in batch.part_stream(self.shard_id(), &*blob, &*metrics) {
940 yield LeasedBatchPart {
941 metrics: Arc::clone(&self.metrics),
942 shard_id: self.machine.shard_id(),
943 filter: filter.clone(),
944 desc: desc.clone(),
945 part: part.expect("leased part").into_owned(),
946 lease: lease.clone(),
947 reader_id: self.reader_id.clone(),
948 filter_pushdown_audit: false,
949 }
950 }
951 }
952 }
953
954 async fn lease_seqno(&mut self) -> Lease {
964 let current_seqno = self.machine.seqno();
965 let lease = self.hold_state.modify(|s| {
966 s.observe_seqno(current_seqno);
967 s.lease_seqno()
968 });
969 self.watch.wait_for_seqno_ge(lease.seqno()).await;
974 lease
975 }
976
977 #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
980 pub async fn clone(&self, purpose: &str) -> Self {
981 let new_reader_id = LeasedReaderId::new();
982 let machine = self.machine.clone();
983 let gc = self.gc.clone();
984 let heartbeat_ts = (self.cfg.now)();
985 let (reader_state, maintenance) = machine
986 .register_leased_reader(
987 &new_reader_id,
988 purpose,
989 READER_LEASE_DURATION.get(&self.cfg),
990 heartbeat_ts,
991 false,
992 )
993 .await;
994 maintenance.start_performing(&machine, &gc);
995 assert!(PartialOrder::less_equal(&reader_state.since, &self.since));
999 let new_reader = ReadHandle::new(
1000 self.cfg.clone(),
1001 Arc::clone(&self.metrics),
1002 machine,
1003 gc,
1004 Arc::clone(&self.blob),
1005 new_reader_id,
1006 self.read_schemas.clone(),
1007 reader_state,
1008 )
1009 .await;
1010 new_reader
1011 }
1012
1013 #[allow(clippy::unused_async)]
1018 pub async fn maybe_downgrade_since(&mut self, new_since: &Antichain<T>) {
1019 self.since = new_since.clone();
1020 self.hold_state.modify(|s| {
1021 s.downgrade_since(new_since);
1022 });
1023 }
1024
1025 #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
1031 pub async fn expire(mut self) {
1032 self.hold_state.modify(|s| {
1033 s.expired = true;
1034 s.request_sync = true;
1035 });
1036 let Some(unexpired_state) = self.unexpired_state.take() else {
1037 return;
1038 };
1039 unexpired_state.heartbeat_task.await;
1040 }
1041
1042 #[cfg(test)]
1044 #[track_caller]
1045 pub async fn expect_listen(self, as_of: T) -> Listen<K, V, T, D> {
1046 self.listen(Antichain::from_elem(as_of))
1047 .await
1048 .expect("cannot serve requested as_of")
1049 }
1050}
1051
1052#[derive(Debug)]
1054pub(crate) struct UnexpiredReadHandleState {
1055 pub(crate) heartbeat_task: JoinHandle<()>,
1056}
1057
1058#[derive(Debug)]
1064pub struct Cursor<K: Codec, V: Codec, T: Timestamp + Codec64, D: Codec64, L = Lease> {
1065 consolidator: Consolidator<T, D, StructuredSort<K, V, T, D>>,
1066 max_len: usize,
1067 max_bytes: usize,
1068 _lease: L,
1069 read_schemas: Schemas<K, V>,
1070}
1071
1072impl<K: Codec, V: Codec, T: Timestamp + Codec64, D: Codec64, L> Cursor<K, V, T, D, L> {
1073 pub fn into_lease(self: Self) -> L {
1076 self._lease
1077 }
1078}
1079
1080impl<K, V, T, D, L> Cursor<K, V, T, D, L>
1081where
1082 K: Debug + Codec + Ord,
1083 V: Debug + Codec + Ord,
1084 T: Timestamp + Lattice + Codec64 + Sync,
1085 D: Monoid + Ord + Codec64 + Send + Sync,
1086{
1087 pub async fn next(&mut self) -> Option<impl Iterator<Item = ((K, V), T, D)> + '_> {
1089 let Self {
1090 consolidator,
1091 max_len,
1092 max_bytes,
1093 _lease,
1094 read_schemas: _,
1095 } = self;
1096
1097 let part = consolidator
1098 .next_chunk(*max_len, *max_bytes)
1099 .await
1100 .expect("fetching a leased part")?;
1101 let key_decoder = self
1102 .read_schemas
1103 .key
1104 .decoder_any(part.key.as_ref())
1105 .expect("ok");
1106 let val_decoder = self
1107 .read_schemas
1108 .val
1109 .decoder_any(part.val.as_ref())
1110 .expect("ok");
1111 let iter = (0..part.len()).map(move |i| {
1112 let mut k = K::default();
1113 let mut v = V::default();
1114 key_decoder.decode(i, &mut k);
1115 val_decoder.decode(i, &mut v);
1116 let t = T::decode(part.time.value(i).to_le_bytes());
1117 let d = D::decode(part.diff.value(i).to_le_bytes());
1118 ((k, v), t, d)
1119 });
1120
1121 Some(iter)
1122 }
1123}
1124
1125impl<K, V, T, D> ReadHandle<K, V, T, D>
1126where
1127 K: Debug + Codec + Ord,
1128 V: Debug + Codec + Ord,
1129 T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
1130 D: Monoid + Ord + Codec64 + Send + Sync,
1131{
1132 pub async fn snapshot_and_fetch(
1146 &mut self,
1147 as_of: Antichain<T>,
1148 ) -> Result<Vec<((K, V), T, D)>, Since<T>> {
1149 let mut cursor = self.snapshot_cursor(as_of, |_| true).await?;
1150 let mut contents = Vec::new();
1151 while let Some(iter) = cursor.next().await {
1152 contents.extend(iter);
1153 }
1154
1155 let old_len = contents.len();
1158 consolidate_updates(&mut contents);
1159 if old_len != contents.len() {
1160 self.machine
1162 .applier
1163 .shard_metrics
1164 .unconsolidated_snapshot
1165 .inc();
1166 }
1167
1168 Ok(contents)
1169 }
1170
1171 pub async fn snapshot_cursor(
1178 &mut self,
1179 as_of: Antichain<T>,
1180 should_fetch_part: impl for<'a> Fn(Option<&'a LazyPartStats>) -> bool,
1181 ) -> Result<Cursor<K, V, T, D>, Since<T>> {
1182 let (lease, batches) = self.snapshot_batches(as_of.clone()).await?;
1183
1184 Self::read_batches_consolidated(
1185 &self.cfg,
1186 Arc::clone(&self.metrics),
1187 Arc::clone(&self.machine.applier.shard_metrics),
1188 self.metrics.read.snapshot.clone(),
1189 Arc::clone(&self.blob),
1190 self.shard_id(),
1191 as_of,
1192 self.read_schemas.clone(),
1193 &batches,
1194 lease,
1195 should_fetch_part,
1196 COMPACTION_MEMORY_BOUND_BYTES.get(&self.cfg),
1197 )
1198 }
1199
1200 pub(crate) fn read_batches_consolidated<L>(
1201 persist_cfg: &PersistConfig,
1202 metrics: Arc<Metrics>,
1203 shard_metrics: Arc<ShardMetrics>,
1204 read_metrics: ReadMetrics,
1205 blob: Arc<dyn Blob>,
1206 shard_id: ShardId,
1207 as_of: Antichain<T>,
1208 schemas: Schemas<K, V>,
1209 batches: &[HollowBatch<T>],
1210 lease: L,
1211 should_fetch_part: impl for<'a> Fn(Option<&'a LazyPartStats>) -> bool,
1212 memory_budget_bytes: usize,
1213 ) -> Result<Cursor<K, V, T, D, L>, Since<T>> {
1214 let context = format!("{}[as_of={:?}]", shard_id, as_of.elements());
1215 let filter = FetchBatchFilter::Snapshot {
1216 as_of: as_of.clone(),
1217 };
1218
1219 let mut consolidator = Consolidator::new(
1220 context,
1221 FetchConfig::from_persist_config(persist_cfg),
1222 shard_id,
1223 StructuredSort::new(schemas.clone()),
1224 blob,
1225 metrics,
1226 shard_metrics,
1227 read_metrics,
1228 filter,
1229 None,
1230 memory_budget_bytes,
1231 );
1232 for batch in batches {
1233 for (meta, run) in batch.runs() {
1234 consolidator.enqueue_run(
1235 &batch.desc,
1236 meta,
1237 run.into_iter()
1238 .filter(|p| should_fetch_part(p.stats()))
1239 .cloned(),
1240 );
1241 }
1242 }
1243 let max_len = persist_cfg.compaction_yield_after_n_updates;
1247 let max_bytes = BLOB_TARGET_SIZE.get(persist_cfg).max(1);
1248
1249 Ok(Cursor {
1250 consolidator,
1251 max_len,
1252 max_bytes,
1253 _lease: lease,
1254 read_schemas: schemas,
1255 })
1256 }
1257
1258 pub fn snapshot_stats(
1270 &self,
1271 as_of: Option<Antichain<T>>,
1272 ) -> impl Future<Output = Result<SnapshotStats, Since<T>>> + Send + 'static {
1273 let machine = self.machine.clone();
1274 async move {
1275 let batches = match as_of {
1276 Some(as_of) => machine.unleased_snapshot(&as_of).await?,
1277 None => machine.applier.all_batches(),
1278 };
1279 let num_updates = batches.iter().map(|b| b.len).sum();
1280 Ok(SnapshotStats {
1281 shard_id: machine.shard_id(),
1282 num_updates,
1283 })
1284 }
1285 }
1286
1287 pub async fn snapshot_parts_stats(
1298 &self,
1299 as_of: Antichain<T>,
1300 ) -> Result<SnapshotPartsStats, Since<T>> {
1301 let batches = self.machine.unleased_snapshot(&as_of).await?;
1302 let parts = stream::iter(&batches)
1303 .flat_map(|b| b.part_stream(self.shard_id(), &*self.blob, &*self.metrics))
1304 .map(|p| {
1305 let p = p.expect("live batch");
1306 SnapshotPartStats {
1307 encoded_size_bytes: p.encoded_size_bytes(),
1308 stats: p.stats().cloned(),
1309 }
1310 })
1311 .collect()
1312 .await;
1313 Ok(SnapshotPartsStats {
1314 metrics: Arc::clone(&self.machine.applier.metrics),
1315 shard_id: self.machine.shard_id(),
1316 parts,
1317 })
1318 }
1319}
1320
1321impl<K, V, T, D> ReadHandle<K, V, T, D>
1322where
1323 K: Debug + Codec + Ord,
1324 V: Debug + Codec + Ord,
1325 T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
1326 D: Monoid + Codec64 + Send + Sync,
1327{
1328 pub async fn snapshot_and_stream(
1333 &mut self,
1334 as_of: Antichain<T>,
1335 ) -> Result<impl Stream<Item = ((K, V), T, D)> + use<K, V, T, D>, Since<T>> {
1336 let snap = self.snapshot(as_of).await?;
1337
1338 let blob = Arc::clone(&self.blob);
1339 let metrics = Arc::clone(&self.metrics);
1340 let snapshot_metrics = self.metrics.read.snapshot.clone();
1341 let shard_metrics = Arc::clone(&self.machine.applier.shard_metrics);
1342 let reader_id = self.reader_id.clone();
1343 let schemas = self.read_schemas.clone();
1344 let mut schema_cache = self.schema_cache.clone();
1345 let persist_cfg = self.cfg.clone();
1346 let stream = async_stream::stream! {
1347 for part in snap {
1348 let mut fetched_part = fetch_leased_part(
1349 &persist_cfg,
1350 &part,
1351 blob.as_ref(),
1352 Arc::clone(&metrics),
1353 &snapshot_metrics,
1354 &shard_metrics,
1355 &reader_id,
1356 schemas.clone(),
1357 &mut schema_cache,
1358 )
1359 .await;
1360
1361 while let Some(next) = fetched_part.next() {
1362 yield next;
1363 }
1364 }
1365 };
1366
1367 Ok(stream)
1368 }
1369}
1370
1371impl<K, V, T, D> ReadHandle<K, V, T, D>
1372where
1373 K: Debug + Codec + Ord,
1374 V: Debug + Codec + Ord,
1375 T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
1376 D: Monoid + Ord + Codec64 + Send + Sync,
1377{
1378 #[cfg(test)]
1381 #[track_caller]
1382 pub async fn expect_snapshot_and_fetch(&mut self, as_of: T) -> Vec<((K, V), T, D)> {
1383 let mut ret = self
1384 .snapshot_and_fetch(Antichain::from_elem(as_of))
1385 .await
1386 .expect("cannot serve requested as_of");
1387
1388 ret.sort();
1389 ret
1390 }
1391}
1392
1393impl<K: Codec, V: Codec, T, D> Drop for ReadHandle<K, V, T, D> {
1394 fn drop(&mut self) {
1395 self.hold_state.modify(|s| {
1396 s.expired = true;
1397 s.request_sync = true;
1398 });
1399 }
1400}
1401
#[cfg(test)]
mod tests {
    use std::pin;
    use std::str::FromStr;

    use mz_dyncfg::ConfigUpdates;
    use mz_ore::cast::CastFrom;
    use mz_ore::metrics::MetricsRegistry;
    use mz_persist::mem::{MemBlob, MemBlobConfig, MemConsensus};
    use mz_persist::unreliable::{UnreliableConsensus, UnreliableHandle};
    use serde::{Deserialize, Serialize};
    use serde_json::json;
    use tokio_stream::StreamExt;

    use crate::async_runtime::IsolatedRuntime;
    use crate::batch::BLOB_TARGET_SIZE;
    use crate::cache::StateCache;
    use crate::internal::metrics::Metrics;
    use crate::rpc::NoopPubSubSender;
    use crate::tests::{all_ok, new_test_client};
    use crate::{Diagnostics, PersistClient, PersistConfig, ShardId};

    use super::*;

    // A subscription whose snapshot parts were never consumed can be dropped
    // cleanly.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn drop_unused_subscribe(dyncfgs: ConfigUpdates) {
        let data = [
            (("0".to_owned(), "zero".to_owned()), 0, 1),
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
        ];

        let (mut write, read) = new_test_client(&dyncfgs)
            .await
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;

        write.expect_compare_and_append(&data[0..1], 0, 1).await;
        write.expect_compare_and_append(&data[1..2], 1, 2).await;
        write.expect_compare_and_append(&data[2..3], 2, 3).await;

        let subscribe = read.subscribe(Antichain::from_elem(2)).await.unwrap();
        assert!(
            !subscribe.snapshot.as_ref().unwrap().is_empty(),
            "snapshot must have batches for test to be meaningful"
        );
        drop(subscribe);
    }

    // Updates written before a subscription's as_of come out consolidated:
    // - the three `k` updates collapse to a single diff of 3 at time 4;
    // - the +1/-1 pair for `k2` cancels out entirely.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn streaming_consolidate(dyncfgs: ConfigUpdates) {
        let data = &[
            // Three updates to `k` that consolidate to a diff of 3.
            (("k".to_owned(), "v".to_owned()), 0, 1),
            (("k".to_owned(), "v".to_owned()), 1, 1),
            (("k".to_owned(), "v".to_owned()), 2, 1),
            // An insert and a retraction of `k2` that cancel out.
            (("k2".to_owned(), "v".to_owned()), 0, 1),
            (("k2".to_owned(), "v".to_owned()), 1, -1),
        ];

        let (mut write, read) = {
            let client = new_test_client(&dyncfgs).await;
            // NOTE(review): a small blob target size, presumably so the write
            // is split across multiple parts; confirm.
            client.cfg.set_config(&BLOB_TARGET_SIZE, 1000);
            client
                .expect_open::<String, String, u64, i64>(ShardId::new())
                .await
        };

        write.expect_compare_and_append(data, 0, 5).await;

        let mut snapshot = read.subscribe(Antichain::from_elem(4)).await.unwrap();

        let mut updates = vec![];
        // Drain events until the progress frontier is no longer less than 4.
        'outer: loop {
            for event in snapshot.fetch_next().await {
                match event {
                    ListenEvent::Progress(t) => {
                        if !t.less_than(&4) {
                            break 'outer;
                        }
                    }
                    ListenEvent::Updates(data) => {
                        updates.extend(data);
                    }
                }
            }
        }
        assert_eq!(updates, &[(("k".to_owned(), "v".to_owned()), 4u64, 3i64)]);
    }

    // Streaming a snapshot yields every written update, with its time advanced
    // by the as_of.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn snapshot_and_stream(dyncfgs: ConfigUpdates) {
        let data = &mut [
            (("k1".to_owned(), "v1".to_owned()), 0, 1),
            (("k2".to_owned(), "v2".to_owned()), 1, 1),
            (("k3".to_owned(), "v3".to_owned()), 2, 1),
            (("k4".to_owned(), "v4".to_owned()), 2, 1),
            (("k5".to_owned(), "v5".to_owned()), 3, 1),
        ];

        let (mut write, mut read) = {
            let client = new_test_client(&dyncfgs).await;
            // NOTE(review): a zero blob target size, presumably to force many
            // small parts; confirm.
            client.cfg.set_config(&BLOB_TARGET_SIZE, 0);
            client
                .expect_open::<String, String, u64, i64>(ShardId::new())
                .await
        };

        write.expect_compare_and_append(&data[0..2], 0, 2).await;
        write.expect_compare_and_append(&data[2..4], 2, 3).await;
        write.expect_compare_and_append(&data[4..], 3, 4).await;

        let as_of = Antichain::from_elem(3);
        let mut snapshot = pin::pin!(read.snapshot_and_stream(as_of.clone()).await.unwrap());

        let mut snapshot_rows = vec![];
        while let Some(((k, v), t, d)) = snapshot.next().await {
            snapshot_rows.push(((k, v), t, d));
        }

        // Advance the expected times by the same as_of before comparing.
        for ((_k, _v), t, _d) in data.as_mut_slice() {
            t.advance_by(as_of.borrow());
        }

        assert_eq!(data.as_slice(), snapshot_rows.as_slice());
    }

    // Exercises the reader's seqno hold.
    // - While listen parts are still held, `seqno_since` must stay at its
    //   original value.
    // - As those parts' leases are released, it may only advance when
    //   `outstanding_seqno()` has moved past the released part's seqno.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn seqno_leases(dyncfgs: ConfigUpdates) {
        let mut data = vec![];
        for i in 0..20 {
            data.push(((i.to_string(), i.to_string()), i, 1))
        }

        let shard_id = ShardId::new();

        let client = new_test_client(&dyncfgs).await;
        let (mut write, read) = client
            .expect_open::<String, String, u64, i64>(shard_id)
            .await;

        // Write the first `width` updates, one batch per update.
        let mut offset = 0;
        let mut width = 2;

        for i in offset..offset + width {
            write
                .expect_compare_and_append(
                    &data[i..i + 1],
                    u64::cast_from(i),
                    u64::cast_from(i) + 1,
                )
                .await;
        }
        offset += width;

        let mut fetcher = client
            .create_batch_fetcher::<String, String, u64, i64>(
                shard_id,
                Default::default(),
                Default::default(),
                false,
                Diagnostics::for_tests(),
            )
            .await
            .unwrap();

        let mut subscribe = read
            .subscribe(Antichain::from_elem(1))
            .await
            .expect("cannot serve requested as_of");

        // Every snapshot part's lease must be at or above the reader's hold.
        let original_seqno_since = subscribe.listen.handle.outstanding_seqno();
        if let Some(snapshot) = &subscribe.snapshot {
            for part in snapshot {
                assert!(
                    part.lease.seqno() >= original_seqno_since,
                    "our seqno hold must cover all parts"
                );
            }
        }

        let mut parts = vec![];

        // Collect listen parts (keeping their leases alive) while writing more
        // data. With those parts held, the shard's seqno_since must not move.
        width = 4;
        for i in offset..offset + width {
            for event in subscribe.next(None).await {
                if let ListenEvent::Updates(mut new_parts) = event {
                    parts.append(&mut new_parts);
                    subscribe
                        .listen
                        .handle
                        .downgrade_since(&subscribe.listen.since)
                        .await;
                }
            }

            write
                .expect_compare_and_append(
                    &data[i..i + 1],
                    u64::cast_from(i),
                    u64::cast_from(i) + 1,
                )
                .await;

            assert_eq!(
                subscribe.listen.handle.machine.applier.seqno_since(),
                original_seqno_since
            );
        }

        offset += width;

        let mut seqno_since = subscribe.listen.handle.machine.applier.seqno_since();

        assert_eq!(seqno_since, original_seqno_since);

        // Leases for parts received from here on, kept alive until the end.
        let mut subsequent_parts = vec![];

        let mut this_seqno = SeqNo::minimum();

        // Release the collected parts one at a time, in the order received.
        for (i, part) in parts.into_iter().enumerate() {
            let part_seqno = part.lease.seqno();
            let last_seqno = this_seqno;
            this_seqno = part_seqno;
            // Parts must have arrived in non-decreasing seqno order.
            assert!(this_seqno >= last_seqno);

            let (part, lease) = part.into_exchangeable_part();
            let _ = fetcher.fetch_leased_part(part).await;
            drop(lease);

            for event in subscribe.next(None).await {
                if let ListenEvent::Updates(parts) = event {
                    for part in parts {
                        let (_, lease) = part.into_exchangeable_part();
                        subsequent_parts.push(lease);
                    }
                }
            }

            subscribe
                .listen
                .handle
                .downgrade_since(&subscribe.listen.since)
                .await;

            let i = i + offset;
            write
                .expect_compare_and_append(
                    &data[i..i + 1],
                    u64::cast_from(i),
                    u64::cast_from(i) + 1,
                )
                .await;

            let expect_downgrade = subscribe.listen.handle.outstanding_seqno() > part_seqno;

            let new_seqno_since = subscribe.listen.handle.machine.applier.seqno_since();
            if expect_downgrade {
                assert!(new_seqno_since > seqno_since);
            } else {
                assert_eq!(new_seqno_since, seqno_since);
            }
            seqno_since = new_seqno_since;
        }

        // After releasing all collected parts, the hold must have advanced.
        assert!(seqno_since > original_seqno_since);

        drop(subsequent_parts);
        drop(subscribe);
    }

    // `LeasedReaderId` serializes as its human-readable string form, both
    // standalone and as a struct field, and deserializes back to the same id.
    #[mz_ore::test]
    fn reader_id_human_readable_serde() {
        #[derive(Debug, Serialize, Deserialize)]
        struct Container {
            reader_id: LeasedReaderId,
        }

        // Round-trip through `serde_json::Value`.
        let id =
            LeasedReaderId::from_str("r00000000-1234-5678-0000-000000000000").expect("valid id");
        assert_eq!(
            id,
            serde_json::from_value(serde_json::to_value(id.clone()).expect("serializable"))
                .expect("deserializable")
        );

        // Deserialize directly from a JSON string literal.
        assert_eq!(
            id,
            serde_json::from_str("\"r00000000-1234-5678-0000-000000000000\"")
                .expect("deserializable")
        );

        // Serialize as a struct field, check the text, and read it back.
        let json = json!({ "reader_id": id });
        assert_eq!(
            "{\"reader_id\":\"r00000000-1234-5678-0000-000000000000\"}",
            &json.to_string()
        );
        let container: Container = serde_json::from_value(json).expect("deserializable");
        assert_eq!(container.reader_id, id);
    }

    // Setup: fetch a snapshot and observe the first listen progress event, then
    // make consensus totally unavailable.
    // - The already-fetched snapshot has the expected contents.
    // - The listen can still read through time 3.
    // NOTE(review): presumably the reader's cached state lets it skip fetching
    // from consensus; confirm.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn skip_consensus_fetch_optimization() {
        let data = vec![
            (("0".to_owned(), "zero".to_owned()), 0, 1),
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
        ];

        let cfg = PersistConfig::new_for_tests();
        let blob = Arc::new(MemBlob::open(MemBlobConfig::default()));
        let consensus = Arc::new(MemConsensus::default());
        let unreliable = UnreliableHandle::default();
        unreliable.totally_available();
        let consensus = Arc::new(UnreliableConsensus::new(consensus, unreliable.clone()));
        let metrics = Arc::new(Metrics::new(&cfg, &MetricsRegistry::new()));
        let pubsub_sender = Arc::new(NoopPubSubSender);
        let (mut write, mut read) = PersistClient::new(
            cfg,
            blob,
            consensus,
            metrics,
            Arc::new(IsolatedRuntime::new_for_tests()),
            Arc::new(StateCache::new_no_metrics()),
            pubsub_sender,
        )
        .expect("client construction failed")
        .expect_open::<String, String, u64, i64>(ShardId::new())
        .await;

        write.expect_compare_and_append(&data[0..1], 0, 1).await;
        write.expect_compare_and_append(&data[1..2], 1, 2).await;
        write.expect_compare_and_append(&data[2..3], 2, 3).await;

        let snapshot = read.expect_snapshot_and_fetch(2).await;
        let mut listen = read.expect_listen(0).await;

        // The first listen step reports only progress.
        let listen_actual = listen.fetch_next().await;
        let expected_events = vec![ListenEvent::Progress(Antichain::from_elem(1))];
        assert_eq!(listen_actual, expected_events);

        // With consensus down, the listen must still read the rest of the data.
        unreliable.totally_unavailable();
        assert_eq!(snapshot, all_ok(&data, 2));
        assert_eq!(
            listen.read_until(&3).await,
            (all_ok(&data[1..], 1), Antichain::from_elem(3))
        );
    }
}