1use async_stream::stream;
13use std::collections::BTreeMap;
14use std::fmt::Debug;
15use std::future::Future;
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18
19use differential_dataflow::Hashable;
20use differential_dataflow::consolidation::consolidate_updates;
21use differential_dataflow::difference::Monoid;
22use differential_dataflow::lattice::Lattice;
23use futures::Stream;
24use futures_util::{StreamExt, stream};
25use mz_dyncfg::Config;
26use mz_ore::cast::CastLossy;
27use mz_ore::halt;
28use mz_ore::instrument;
29use mz_ore::task::JoinHandle;
30use mz_persist::location::{Blob, SeqNo};
31use mz_persist_types::columnar::{ColumnDecoder, Schema};
32use mz_persist_types::{Codec, Codec64};
33use proptest_derive::Arbitrary;
34use serde::{Deserialize, Serialize};
35use timely::PartialOrder;
36use timely::order::TotalOrder;
37use timely::progress::{Antichain, Timestamp};
38use tracing::warn;
39use uuid::Uuid;
40
41use crate::batch::BLOB_TARGET_SIZE;
42use crate::cfg::{COMPACTION_MEMORY_BOUND_BYTES, RetryParameters};
43use crate::fetch::FetchConfig;
44use crate::fetch::{FetchBatchFilter, FetchedPart, Lease, LeasedBatchPart, fetch_leased_part};
45use crate::internal::encoding::Schemas;
46use crate::internal::machine::{Machine, next_listen_batch_retry_params};
47use crate::internal::metrics::{Metrics, ReadMetrics, ShardMetrics};
48use crate::internal::state::{HollowBatch, LeasedReaderState, SnapshotErr};
49use crate::internal::watch::{AwaitableState, StateWatch};
50use crate::iter::{Consolidator, StructuredSort};
51use crate::schema::SchemaCache;
52use crate::stats::{SnapshotPartStats, SnapshotPartsStats, SnapshotStats};
53use crate::{GarbageCollector, PersistConfig, ShardId, parse_id};
54
55pub use crate::internal::encoding::LazyPartStats;
56pub use crate::internal::state::Since;
57
/// An opaque identifier for a leased reader of a persist shard.
///
/// Wraps a 16-byte (UUID) payload. Renders as `r{uuid}` via the `Display`
/// impl and round-trips through `String` for serde (note the
/// `try_from = "String", into = "String"` attributes).
#[derive(
    Arbitrary,
    Clone,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
    Hash,
    Serialize,
    Deserialize
)]
#[serde(try_from = "String", into = "String")]
pub struct LeasedReaderId(pub(crate) [u8; 16]);
72
73impl std::fmt::Display for LeasedReaderId {
74 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75 write!(f, "r{}", Uuid::from_bytes(self.0))
76 }
77}
78
79impl std::fmt::Debug for LeasedReaderId {
80 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
81 write!(f, "LeasedReaderId({})", Uuid::from_bytes(self.0))
82 }
83}
84
85impl std::str::FromStr for LeasedReaderId {
86 type Err = String;
87
88 fn from_str(s: &str) -> Result<Self, Self::Err> {
89 parse_id("r", "LeasedReaderId", s).map(LeasedReaderId)
90 }
91}
92
93impl From<LeasedReaderId> for String {
94 fn from(reader_id: LeasedReaderId) -> Self {
95 reader_id.to_string()
96 }
97}
98
99impl TryFrom<String> for LeasedReaderId {
100 type Error = String;
101
102 fn try_from(s: String) -> Result<Self, Self::Error> {
103 s.parse()
104 }
105}
106
107impl LeasedReaderId {
108 pub(crate) fn new() -> Self {
109 LeasedReaderId(*Uuid::new_v4().as_bytes())
110 }
111}
112
/// A snapshot of a shard's contents plus an ongoing [Listen], bundled so the
/// snapshot parts are emitted once before listen events take over.
#[derive(Debug)]
pub struct Subscribe<K: Codec, V: Codec, T, D> {
    // Leased parts of the initial snapshot; `Some` until the first call to
    // `next` drains it.
    snapshot: Option<Vec<LeasedBatchPart<T>>>,
    listen: Listen<K, V, T, D>,
}
122
123impl<K, V, T, D> Subscribe<K, V, T, D>
124where
125 K: Debug + Codec,
126 V: Debug + Codec,
127 T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
128 D: Monoid + Codec64 + Send + Sync,
129{
130 fn new(snapshot_parts: Vec<LeasedBatchPart<T>>, listen: Listen<K, V, T, D>) -> Self {
131 Subscribe {
132 snapshot: Some(snapshot_parts),
133 listen,
134 }
135 }
136
137 #[instrument(level = "debug", fields(shard = %self.listen.handle.machine.shard_id()))]
145 pub async fn next(
146 &mut self,
147 listen_retry: Option<RetryParameters>,
149 ) -> Vec<ListenEvent<T, LeasedBatchPart<T>>> {
150 match self.snapshot.take() {
151 Some(parts) => vec![ListenEvent::Updates(parts)],
152 None => {
153 let (parts, upper) = self.listen.next(listen_retry).await;
154 vec![ListenEvent::Updates(parts), ListenEvent::Progress(upper)]
155 }
156 }
157 }
158}
159
160impl<K, V, T, D> Subscribe<K, V, T, D>
161where
162 K: Debug + Codec,
163 V: Debug + Codec,
164 T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
165 D: Monoid + Codec64 + Send + Sync,
166{
167 #[instrument(level = "debug", fields(shard = %self.listen.handle.machine.shard_id()))]
170 pub async fn fetch_next(&mut self) -> Vec<ListenEvent<T, ((K, V), T, D)>> {
171 let events = self.next(None).await;
172 let new_len = events
173 .iter()
174 .map(|event| match event {
175 ListenEvent::Updates(parts) => parts.len(),
176 ListenEvent::Progress(_) => 1,
177 })
178 .sum();
179 let mut ret = Vec::with_capacity(new_len);
180 for event in events {
181 match event {
182 ListenEvent::Updates(parts) => {
183 for part in parts {
184 let fetched_part = self.listen.fetch_batch_part(part).await;
185 let updates = fetched_part.collect::<Vec<_>>();
186 if !updates.is_empty() {
187 ret.push(ListenEvent::Updates(updates));
188 }
189 }
190 }
191 ListenEvent::Progress(progress) => ret.push(ListenEvent::Progress(progress)),
192 }
193 }
194 ret
195 }
196
197 pub async fn fetch_batch_part(&mut self, part: LeasedBatchPart<T>) -> FetchedPart<K, V, T, D> {
199 self.listen.fetch_batch_part(part).await
200 }
201}
202
203impl<K, V, T, D> Subscribe<K, V, T, D>
204where
205 K: Debug + Codec,
206 V: Debug + Codec,
207 T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
208 D: Monoid + Codec64 + Send + Sync,
209{
210 pub async fn expire(mut self) {
217 let _ = self.snapshot.take(); self.listen.expire().await;
219 }
220}
221
/// Data and progress events of an ongoing shard subscription.
#[derive(Debug, PartialEq)]
pub enum ListenEvent<T, D> {
    /// Progress of the shard: all updates at times not beyond this frontier
    /// have been emitted.
    Progress(Antichain<T>),
    /// Updates of the shard.
    Updates(Vec<D>),
}
232
/// An ongoing subscription of updates to a shard.
#[derive(Debug)]
pub struct Listen<K: Codec, V: Codec, T, D> {
    handle: ReadHandle<K, V, T, D>,
    // The as_of this listen was created at; passed to the fetch filter for
    // every emitted batch (see `FetchBatchFilter::Listen`).
    as_of: Antichain<T>,
    // The since capability this listen maintains; joined forward as the
    // frontier advances (see `Listen::next`).
    since: Antichain<T>,
    // Updates at times strictly less than this frontier have already been
    // emitted.
    frontier: Antichain<T>,
}
241
impl<K, V, T, D> Listen<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
    D: Monoid + Codec64 + Send + Sync,
{
    /// Creates a new [Listen] at `as_of`, consuming `handle`.
    ///
    /// Errors with the handle's current since if `as_of` cannot be served.
    async fn new(
        mut handle: ReadHandle<K, V, T, D>,
        as_of: Antichain<T>,
    ) -> Result<Self, Since<T>> {
        let () = handle.machine.verify_listen(&as_of)?;

        // Hold the since capability exactly at as_of; bail if the handle's
        // since has already advanced past it.
        let since = as_of.clone();
        if !PartialOrder::less_equal(handle.since(), &since) {
            return Err(Since(handle.since().clone()));
        }
        // Durably downgrade the handle's since to our hold before returning.
        handle.downgrade_since(&since).await;
        Ok(Listen {
            handle,
            since,
            // The frontier starts at as_of: nothing has been emitted yet.
            frontier: as_of.clone(),
            as_of,
        })
    }

    /// The frontier of this listen: all updates at times strictly less than
    /// it have already been emitted.
    pub fn frontier(&self) -> &Antichain<T> {
        &self.frontier
    }

    /// Attempts to pull out the next values of this subscription as leased
    /// batch parts, along with the new progress frontier.
    pub async fn next(
        &mut self,
        retry: Option<RetryParameters>,
    ) -> (Vec<LeasedBatchPart<T>>, Antichain<T>) {
        let retry = retry
            .unwrap_or_else(|| next_listen_batch_retry_params(&self.handle.machine.applier.cfg));
        // Block until the shard's upper has advanced past our frontier, so a
        // listen batch is guaranteed to exist.
        self.handle
            .machine
            .wait_for_upper_past(
                &self.frontier,
                &mut self.handle.watch,
                Some(&self.handle.reader_id),
                &self.handle.metrics.retries.next_listen_batch,
                retry,
            )
            .await;

        // Take the seqno lease *before* reading state, so the batch we're
        // handed below stays fetchable.
        let lease = self.handle.lease_seqno().await;
        let batch = match self
            .handle
            .machine
            .applier
            .next_listen_batch(&self.frontier)
        {
            Ok(batch) => batch,
            Err(seqno) => {
                // We just waited for the upper to pass the frontier, so a
                // missing batch indicates a bug.
                panic!(
                    "waited for upper past {frontier:?}, but no listen batch was available at {seqno:?}!",
                    frontier = self.frontier.elements()
                );
            }
        };

        // The batch's since must not have advanced past our frontier (at
        // as_of itself, equality is also fine); otherwise our since hold was
        // not respected, which can only legitimately happen if the lease
        // expired.
        let acceptable_desc = PartialOrder::less_than(batch.desc.since(), &self.frontier)
            || (self.frontier == self.as_of
                && PartialOrder::less_equal(batch.desc.since(), &self.frontier));
        if !acceptable_desc {
            let lease_state = self
                .handle
                .machine
                .applier
                .reader_lease(self.handle.reader_id.clone());
            if let Some(lease) = lease_state {
                // Lease is still live but the hold was violated: a bug.
                panic!(
                    "Listen on {} received a batch {:?} advanced past the listen frontier {:?}, but the lease has not expired: {:?}",
                    self.handle.machine.shard_id(),
                    batch.desc,
                    self.frontier,
                    lease
                )
            } else {
                // Lease expired (e.g. the process was suspended); halt rather
                // than serve incorrect results.
                halt!(
                    "Listen on {} received a batch {:?} advanced past the listen frontier {:?} after the reader has expired. \
                    This can happen in exceptional cases: a machine goes to sleep or is running out of memory or CPU, for example.",
                    self.handle.machine.shard_id(),
                    batch.desc,
                    self.frontier
                )
            }
        }

        let new_frontier = batch.desc.upper().clone();

        // Advance our since hold to the frontier elements that this batch
        // moves past, keeping the hold as tight as possible.
        for x in self.frontier.elements().iter() {
            let less_than_upper = batch.desc.upper().elements().iter().any(|u| x.less_than(u));
            if less_than_upper {
                self.since.join_assign(&Antichain::from_elem(x.clone()));
            }
        }

        // When fetched, parts are filtered/advanced relative to the listen's
        // as_of and the frontier we're emitting from.
        let filter = FetchBatchFilter::Listen {
            as_of: self.as_of.clone(),
            lower: self.frontier.clone(),
        };
        let parts = self
            .handle
            .lease_batch_parts(lease, batch, filter)
            .collect()
            .await;

        // Opportunistically (without blocking) record the since downgrade.
        self.handle.maybe_downgrade_since(&self.since).await;

        self.frontier = new_frontier;

        (parts, self.frontier.clone())
    }
}
418
419impl<K, V, T, D> Listen<K, V, T, D>
420where
421 K: Debug + Codec,
422 V: Debug + Codec,
423 T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
424 D: Monoid + Codec64 + Send + Sync,
425{
426 #[instrument(level = "debug", name = "listen::next", fields(shard = %self.handle.machine.shard_id()))]
437 pub async fn fetch_next(&mut self) -> Vec<ListenEvent<T, ((K, V), T, D)>> {
438 let (parts, progress) = self.next(None).await;
439 let mut ret = Vec::with_capacity(parts.len() + 1);
440 for part in parts {
441 let fetched_part = self.fetch_batch_part(part).await;
442 let updates = fetched_part.collect::<Vec<_>>();
443 if !updates.is_empty() {
444 ret.push(ListenEvent::Updates(updates));
445 }
446 }
447 ret.push(ListenEvent::Progress(progress));
448 ret
449 }
450
451 pub fn into_stream(mut self) -> impl Stream<Item = ListenEvent<T, ((K, V), T, D)>> {
453 async_stream::stream!({
454 loop {
455 for msg in self.fetch_next().await {
456 yield msg;
457 }
458 }
459 })
460 }
461
462 #[cfg(test)]
466 #[track_caller]
467 pub async fn read_until(&mut self, ts: &T) -> (Vec<((K, V), T, D)>, Antichain<T>) {
468 let mut updates = Vec::new();
469 let mut frontier = Antichain::from_elem(T::minimum());
470 while self.frontier.less_than(ts) {
471 for event in self.fetch_next().await {
472 match event {
473 ListenEvent::Updates(mut x) => updates.append(&mut x),
474 ListenEvent::Progress(x) => frontier = x,
475 }
476 }
477 }
478 (updates, frontier)
481 }
482}
483
484impl<K, V, T, D> Listen<K, V, T, D>
485where
486 K: Debug + Codec,
487 V: Debug + Codec,
488 T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
489 D: Monoid + Codec64 + Send + Sync,
490{
491 async fn fetch_batch_part(&mut self, part: LeasedBatchPart<T>) -> FetchedPart<K, V, T, D> {
496 let fetched_part = fetch_leased_part(
497 &self.handle.cfg,
498 &part,
499 self.handle.blob.as_ref(),
500 Arc::clone(&self.handle.metrics),
501 &self.handle.metrics.read.listen,
502 &self.handle.machine.applier.shard_metrics,
503 &self.handle.reader_id,
504 self.handle.read_schemas.clone(),
505 &mut self.handle.schema_cache,
506 )
507 .await;
508 fetched_part
509 }
510
511 pub async fn expire(self) {
518 self.handle.expire().await
519 }
520}
521
/// Shared, mutable state coordinating a [ReadHandle]'s seqno/since holds
/// between the handle and its background heartbeat task.
#[derive(Debug)]
pub(crate) struct ReadHolds<T> {
    // The since frontier the handle wants held (joined forward on downgrade).
    held_since: Antichain<T>,
    // The since frontier the heartbeat task has actually applied durably.
    applied_since: Antichain<T>,
    // The largest seqno this handle has observed from the machine.
    recent_seqno: SeqNo,
    // Outstanding seqno leases, keyed by seqno (see `lease_seqno` and
    // `outstanding_seqno`).
    leases: BTreeMap<SeqNo, Lease>,
    // Set when the handle is expired or dropped; tells the heartbeat task to
    // expire the reader and exit.
    expired: bool,
    // Set when the handle wants the heartbeat task to sync promptly instead
    // of waiting for its next tick.
    request_sync: bool,
}
542
543impl<T> ReadHolds<T>
544where
545 T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
546{
547 pub fn downgrade_since(&mut self, since: &Antichain<T>) {
548 self.held_since.join_assign(since);
549 }
550
551 pub fn observe_seqno(&mut self, seqno: SeqNo) {
552 self.recent_seqno = seqno.max(self.recent_seqno);
553 }
554
555 pub fn lease_seqno(&mut self) -> Lease {
556 let seqno = self.recent_seqno;
557 let lease = self
558 .leases
559 .entry(seqno)
560 .or_insert_with(|| Lease::new(seqno));
561 lease.clone()
562 }
563
564 pub fn outstanding_seqno(&mut self) -> SeqNo {
565 while let Some(first) = self.leases.first_entry() {
566 if first.get().count() <= 1 {
567 first.remove();
568 } else {
569 return *first.key();
570 }
571 }
572 self.recent_seqno
573 }
574}
575
/// A "capability" granting the ability to read the state of some shard at
/// times greater or equal to `self.since()`.
///
/// Callers should [Self::expire] the handle when finished; `Drop` only flags
/// the shared state so the background heartbeat task can clean up.
#[derive(Debug)]
pub struct ReadHandle<K: Codec, V: Codec, T, D> {
    pub(crate) cfg: PersistConfig,
    pub(crate) metrics: Arc<Metrics>,
    pub(crate) machine: Machine<K, V, T, D>,
    pub(crate) gc: GarbageCollector<K, V, T, D>,
    pub(crate) blob: Arc<dyn Blob>,
    // Watch on the machine's state, used to wait for seqnos/uppers to advance.
    watch: StateWatch<K, V, T, D>,

    pub(crate) reader_id: LeasedReaderId,
    pub(crate) read_schemas: Schemas<K, V>,
    pub(crate) schema_cache: SchemaCache<K, V, T, D>,

    // The handle's local view of its since capability; the authoritative
    // downgrade is applied asynchronously by the heartbeat task.
    since: Antichain<T>,
    // State shared with the heartbeat task (holds, leases, expiry flags).
    pub(crate) hold_state: AwaitableState<ReadHolds<T>>,
    // `Some` until `expire` is called; owns the heartbeat task's join handle.
    pub(crate) unexpired_state: Option<UnexpiredReadHandleState>,
}
613
/// Length of time after which an idle reader's lease may be cleaned up
/// (heartbeats refresh it; see `reader_heartbeat_task`).
pub(crate) const READER_LEASE_DURATION: Config<Duration> = Config::new(
    "persist_reader_lease_duration",
    Duration::from_secs(60 * 15),
    "The time after which we'll clean up stale read leases",
);
621
impl<K, V, T, D> ReadHandle<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
    D: Monoid + Codec64 + Send + Sync,
{
    /// Creates a [ReadHandle] from an already-registered reader's `state`,
    /// spawning the background heartbeat task that keeps the lease alive and
    /// applies since downgrades.
    #[allow(clippy::unused_async)]
    pub(crate) async fn new(
        cfg: PersistConfig,
        metrics: Arc<Metrics>,
        machine: Machine<K, V, T, D>,
        gc: GarbageCollector<K, V, T, D>,
        blob: Arc<dyn Blob>,
        reader_id: LeasedReaderId,
        read_schemas: Schemas<K, V>,
        state: LeasedReaderState<T>,
    ) -> Self {
        let schema_cache = machine.applier.schema_cache();
        // Both the requested and durably-applied sinces start at the
        // registered reader's since; no leases are outstanding yet.
        let hold_state = AwaitableState::new(ReadHolds {
            held_since: state.since.clone(),
            applied_since: state.since.clone(),
            recent_seqno: state.seqno,
            leases: Default::default(),
            expired: false,
            request_sync: false,
        });
        ReadHandle {
            cfg,
            metrics: Arc::clone(&metrics),
            machine: machine.clone(),
            gc: gc.clone(),
            blob,
            watch: machine.applier.watch(),
            reader_id: reader_id.clone(),
            read_schemas,
            schema_cache,
            since: state.since,
            hold_state: hold_state.clone(),
            // The heartbeat task gets its own clones of machine/gc/hold_state.
            unexpired_state: Some(UnexpiredReadHandleState {
                heartbeat_task: Self::start_reader_heartbeat_task(
                    machine, reader_id, gc, hold_state,
                ),
            }),
        }
    }
668
669 fn start_reader_heartbeat_task(
670 machine: Machine<K, V, T, D>,
671 reader_id: LeasedReaderId,
672 gc: GarbageCollector<K, V, T, D>,
673 leased_seqnos: AwaitableState<ReadHolds<T>>,
674 ) -> JoinHandle<()> {
675 let metrics = Arc::clone(&machine.applier.metrics);
676 let name = format!(
677 "persist::heartbeat_read({},{})",
678 machine.shard_id(),
679 reader_id
680 );
681 mz_ore::task::spawn(|| name, {
682 metrics.tasks.heartbeat_read.instrument_task(async move {
683 Self::reader_heartbeat_task(machine, reader_id, gc, leased_seqnos).await
684 })
685 })
686 }
687
    /// Background task: periodically heartbeats the reader's lease and applies
    /// requested since downgrades, exiting once the handle is expired or the
    /// lease is lost.
    async fn reader_heartbeat_task(
        machine: Machine<K, V, T, D>,
        reader_id: LeasedReaderId,
        gc: GarbageCollector<K, V, T, D>,
        leased_seqnos: AwaitableState<ReadHolds<T>>,
    ) {
        // Heartbeat at a fraction of the lease duration so several attempts
        // fit within one lease.
        let sleep_duration = READER_LEASE_DURATION.get(&machine.applier.cfg) / 4;
        // Jitter the first tick by a reader-id-derived fraction of the period
        // so many readers' heartbeats don't align.
        let jitter: f64 = f64::cast_lossy(reader_id.hashed()) / f64::cast_lossy(u64::MAX);
        let mut interval = tokio::time::interval_at(
            tokio::time::Instant::now() + sleep_duration.mul_f64(jitter),
            sleep_duration,
        );
        let mut held_since = leased_seqnos.read(|s| s.held_since.clone());
        loop {
            let before_sleep = Instant::now();
            // Wake on the regular tick, or early when the handle requests a
            // prompt sync (downgrade_since/expire set `request_sync`).
            let _woke_by_tick = tokio::select! {
                _tick = interval.tick() => {
                    true
                }
                _whatever = leased_seqnos.wait_while(|s| !s.request_sync) => {
                    false
                }
            };

            // A long gap between heartbeats risks lease expiry; make it loud.
            let elapsed_since_before_sleeping = before_sleep.elapsed();
            if elapsed_since_before_sleeping > sleep_duration + Duration::from_secs(60) {
                warn!(
                    "reader ({}) of shard ({}) went {}s between heartbeats",
                    reader_id,
                    machine.shard_id(),
                    elapsed_since_before_sleeping.as_secs_f64()
                );
            }

            let before_heartbeat = Instant::now();
            let heartbeat_ms = (machine.applier.cfg.now)();
            let current_seqno = machine.seqno();
            // Snapshot what we need from the shared state under the lock.
            // Err(()) means the handle was expired/dropped, so we should
            // expire the reader and exit.
            let result = leased_seqnos.modify(|s| {
                if s.expired {
                    Err(())
                } else {
                    s.observe_seqno(current_seqno);
                    s.request_sync = false;
                    held_since.join_assign(&s.held_since);
                    Ok(s.outstanding_seqno())
                }
            });
            let actual_since = match result {
                Ok(held_seqno) => {
                    let (seqno, actual_since, maintenance) = machine
                        .downgrade_since(&reader_id, held_seqno, &held_since, heartbeat_ms)
                        .await;
                    // Record what was durably applied so waiters in
                    // `ReadHandle::downgrade_since` can make progress.
                    leased_seqnos.modify(|s| {
                        s.applied_since.clone_from(&actual_since.0);
                        s.observe_seqno(seqno)
                    });
                    maintenance.start_performing(&machine, &gc);
                    actual_since
                }
                Err(()) => {
                    let (seqno, maintenance) = machine.expire_leased_reader(&reader_id).await;
                    leased_seqnos.modify(|s| s.observe_seqno(seqno));
                    maintenance.start_performing(&machine, &gc);
                    break;
                }
            };

            let elapsed_since_heartbeat = before_heartbeat.elapsed();
            if elapsed_since_heartbeat > Duration::from_secs(60) {
                warn!(
                    "reader ({}) of shard ({}) heartbeat call took {}s",
                    reader_id,
                    machine.shard_id(),
                    elapsed_since_heartbeat.as_secs_f64(),
                );
            }

            // If the durable since advanced past what we asked to hold, our
            // lease effectively expired out from under the live handle; warn
            // and stop heartbeating.
            if PartialOrder::less_than(&held_since, &actual_since.0) {
                warn!(
                    "heartbeat task for reader ({}) of shard ({}) exiting due to expired lease \
                    while read handle is live",
                    reader_id,
                    machine.shard_id(),
                );
                return;
            }
        }
    }
785
    /// This handle's shard id.
    pub fn shard_id(&self) -> ShardId {
        self.machine.shard_id()
    }

    /// This handle's `since` frontier; reads are possible at times greater or
    /// equal to it.
    pub fn since(&self) -> &Antichain<T> {
        &self.since
    }

    /// Test helper: the smallest seqno with an outstanding lease, after
    /// observing the machine's current seqno.
    #[cfg(test)]
    fn outstanding_seqno(&self) -> SeqNo {
        let current_seqno = self.machine.seqno();
        self.hold_state.modify(|s| {
            s.observe_seqno(current_seqno);
            s.outstanding_seqno()
        })
    }

    /// Forwards the since frontier of this handle, giving up the ability to
    /// read at times not greater or equal to `new_since`.
    ///
    /// Waits until the heartbeat task has durably applied (at least) the
    /// requested downgrade before returning.
    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
    pub async fn downgrade_since(&mut self, new_since: &Antichain<T>) {
        self.since = new_since.clone();
        // Request a prompt sync so the heartbeat task applies the downgrade
        // without waiting for its next tick.
        self.hold_state.modify(|s| {
            s.downgrade_since(new_since);
            s.request_sync = true;
        });
        self.hold_state
            .wait_while(|s| PartialOrder::less_than(&s.applied_since, new_since))
            .await;
    }
824
    /// Returns an ongoing subscription of updates to the shard, starting at
    /// `as_of`. Consumes the handle; errors with the current since if `as_of`
    /// can no longer be served.
    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
    pub async fn listen(self, as_of: Antichain<T>) -> Result<Listen<K, V, T, D>, Since<T>> {
        Listen::new(self, as_of).await
    }

    /// Waits until `as_of` is available, then returns the hollow batches that
    /// make up a snapshot at it, along with a seqno lease taken before the
    /// state was read (so the referenced data stays fetchable).
    async fn snapshot_batches(
        &mut self,
        as_of: Antichain<T>,
    ) -> Result<(Lease, Vec<HollowBatch<T>>), Since<T>> {
        self.machine
            .wait_for_upper_past(
                &as_of,
                &mut self.watch,
                Some(&self.reader_id),
                &self.metrics.retries.snapshot,
                RetryParameters::persist_defaults(),
            )
            .await;
        let lease = self.lease_seqno().await;
        let batches = match self.machine.applier.snapshot(&as_of) {
            Ok(data) => data,
            Err(SnapshotErr::AsOfHistoricalDistinctionsLost(since)) => return Err(since),
            Err(SnapshotErr::AsOfNotYetAvailable(seqno, upper)) => {
                // We just waited for the upper to pass as_of, so this should
                // be impossible.
                panic!(
                    "waited for upper past {as_of:?}, but at latest seqno {seqno:?} the frontier was only {upper:?}",
                    as_of = as_of.elements(),
                    upper = upper.0.elements(),
                )
            }
        };
        Ok((lease, batches))
    }
873
    /// Returns all of the contents of the shard at `as_of` as leased batch
    /// parts.
    ///
    /// Errors with this handle's current since if `as_of` can no longer be
    /// served.
    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
    pub async fn snapshot(
        &mut self,
        as_of: Antichain<T>,
    ) -> Result<Vec<LeasedBatchPart<T>>, Since<T>> {
        let (lease, batches) = self.snapshot_batches(as_of.clone()).await?;

        // Also check against this handle's own since (not just the shard's):
        // our capability must still cover as_of.
        if !PartialOrder::less_equal(self.since(), &as_of) {
            return Err(Since(self.since().clone()));
        }

        let filter = FetchBatchFilter::Snapshot { as_of };
        let mut leased_parts = Vec::new();
        for batch in batches {
            // All parts share the same seqno lease.
            leased_parts.extend(
                self.lease_batch_parts(lease.clone(), batch, filter.clone())
                    .collect::<Vec<_>>()
                    .await,
            );
        }
        Ok(leased_parts)
    }

    /// Returns a snapshot at `as_of` followed by an ongoing listen from there,
    /// bundled as a single [Subscribe]. Consumes the handle.
    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
    pub async fn subscribe(
        mut self,
        as_of: Antichain<T>,
    ) -> Result<Subscribe<K, V, T, D>, Since<T>> {
        let snapshot_parts = self.snapshot(as_of.clone()).await?;
        // `listen` consumes the handle, so it must come second.
        let listen = self.listen(as_of.clone()).await?;
        Ok(Subscribe::new(snapshot_parts, listen))
    }
928
    /// Turns a [HollowBatch] into a stream of [LeasedBatchPart]s, each
    /// carrying a clone of `lease` and `filter`.
    fn lease_batch_parts(
        &mut self,
        lease: Lease,
        batch: HollowBatch<T>,
        filter: FetchBatchFilter<T>,
    ) -> impl Stream<Item = LeasedBatchPart<T>> + '_ {
        stream! {
            let blob = Arc::clone(&self.blob);
            let metrics = Arc::clone(&self.metrics);
            let desc = batch.desc.clone();
            for await part in batch.part_stream(self.shard_id(), &*blob, &*metrics) {
                yield LeasedBatchPart {
                    metrics: Arc::clone(&self.metrics),
                    shard_id: self.machine.shard_id(),
                    filter: filter.clone(),
                    desc: desc.clone(),
                    part: part.expect("leased part").into_owned(),
                    lease: lease.clone(),
                    filter_pushdown_audit: false,
                }
            }
        }
    }

    /// Returns a lease on the machine's current seqno, waiting until our
    /// state watch has caught up to (at least) that seqno before returning.
    async fn lease_seqno(&mut self) -> Lease {
        let current_seqno = self.machine.seqno();
        let lease = self.hold_state.modify(|s| {
            s.observe_seqno(current_seqno);
            s.lease_seqno()
        });
        self.watch.wait_for_seqno_ge(lease.seqno()).await;
        lease
    }
975
976 #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
979 pub async fn clone(&self, purpose: &str) -> Self {
980 let new_reader_id = LeasedReaderId::new();
981 let machine = self.machine.clone();
982 let gc = self.gc.clone();
983 let heartbeat_ts = (self.cfg.now)();
984 let (reader_state, maintenance) = machine
985 .register_leased_reader(
986 &new_reader_id,
987 purpose,
988 READER_LEASE_DURATION.get(&self.cfg),
989 heartbeat_ts,
990 false,
991 )
992 .await;
993 maintenance.start_performing(&machine, &gc);
994 assert!(PartialOrder::less_equal(&reader_state.since, &self.since));
998 let new_reader = ReadHandle::new(
999 self.cfg.clone(),
1000 Arc::clone(&self.metrics),
1001 machine,
1002 gc,
1003 Arc::clone(&self.blob),
1004 new_reader_id,
1005 self.read_schemas.clone(),
1006 reader_state,
1007 )
1008 .await;
1009 new_reader
1010 }
1011
    /// Records a since downgrade to be applied opportunistically by the
    /// heartbeat task on its next tick. Unlike [Self::downgrade_since], this
    /// does not wait for the downgrade to be durably applied.
    #[allow(clippy::unused_async)]
    pub async fn maybe_downgrade_since(&mut self, new_since: &Antichain<T>) {
        self.since = new_since.clone();
        self.hold_state.modify(|s| {
            s.downgrade_since(new_since);
        });
    }

    /// Politely expires this reader, releasing its since hold.
    ///
    /// Flags the shared state as expired (waking the heartbeat task promptly
    /// via `request_sync`), then waits for the heartbeat task to expire the
    /// reader and exit.
    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
    pub async fn expire(mut self) {
        self.hold_state.modify(|s| {
            s.expired = true;
            s.request_sync = true;
        });
        let Some(unexpired_state) = self.unexpired_state.take() else {
            return;
        };
        unexpired_state.heartbeat_task.await;
    }

    /// Test helper for a [Self::listen] at `as_of` that is expected to
    /// succeed.
    #[cfg(test)]
    #[track_caller]
    pub async fn expect_listen(self, as_of: T) -> Listen<K, V, T, D> {
        self.listen(Antichain::from_elem(as_of))
            .await
            .expect("cannot serve requested as_of")
    }
}
1050
/// State of a [ReadHandle] that has not yet been expired: the join handle of
/// its background heartbeat task (awaited by `ReadHandle::expire`).
#[derive(Debug)]
pub(crate) struct UnexpiredReadHandleState {
    pub(crate) heartbeat_task: JoinHandle<()>,
}
1056
/// Capable of streaming out a snapshot of a shard's data chunk by chunk (see
/// `Cursor::next`).
#[derive(Debug)]
pub struct Cursor<K: Codec, V: Codec, T: Timestamp + Codec64, D: Codec64, L = Lease> {
    consolidator: Consolidator<T, D, StructuredSort<K, V, T, D>>,
    // Per-chunk limits passed to the consolidator in `next`.
    max_len: usize,
    max_bytes: usize,
    // Kept alive for the cursor's lifetime but otherwise unused (hence the
    // underscore); a seqno [Lease] by default.
    _lease: L,
    read_schemas: Schemas<K, V>,
}
1070
1071impl<K: Codec, V: Codec, T: Timestamp + Codec64, D: Codec64, L> Cursor<K, V, T, D, L> {
1072 pub fn into_lease(self: Self) -> L {
1075 self._lease
1076 }
1077}
1078
impl<K, V, T, D, L> Cursor<K, V, T, D, L>
where
    K: Debug + Codec + Ord,
    V: Debug + Codec + Ord,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Monoid + Ord + Codec64 + Send + Sync,
{
    /// Grabs the next chunk of consolidated data, returning `None` when the
    /// snapshot is exhausted. Rows are decoded lazily as the returned
    /// iterator is consumed.
    pub async fn next(&mut self) -> Option<impl Iterator<Item = ((K, V), T, D)> + '_> {
        // Destructure so the borrows below are of disjoint fields.
        let Self {
            consolidator,
            max_len,
            max_bytes,
            _lease,
            read_schemas: _,
        } = self;

        // `?` propagates the None when the consolidator has no more chunks.
        let part = consolidator
            .next_chunk(*max_len, *max_bytes)
            .await
            .expect("fetching a leased part")?;
        let key_decoder = self
            .read_schemas
            .key
            .decoder_any(part.key.as_ref())
            .expect("ok");
        let val_decoder = self
            .read_schemas
            .val
            .decoder_any(part.val.as_ref())
            .expect("ok");
        let iter = (0..part.len()).map(move |i| {
            // Decode into default-initialized values in place.
            let mut k = K::default();
            let mut v = V::default();
            key_decoder.decode(i, &mut k);
            val_decoder.decode(i, &mut v);
            let t = T::decode(part.time.value(i).to_le_bytes());
            let d = D::decode(part.diff.value(i).to_le_bytes());
            ((k, v), t, d)
        });

        Some(iter)
    }
}
1123
impl<K, V, T, D> ReadHandle<K, V, T, D>
where
    K: Debug + Codec + Ord,
    V: Debug + Codec + Ord,
    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
    D: Monoid + Ord + Codec64 + Send + Sync,
{
    /// Generates a snapshot at `as_of`, fetches everything it contains, and
    /// returns the consolidated contents.
    ///
    /// Errors with this handle's current since if `as_of` can no longer be
    /// served.
    pub async fn snapshot_and_fetch(
        &mut self,
        as_of: Antichain<T>,
    ) -> Result<Vec<((K, V), T, D)>, Since<T>> {
        let mut cursor = self.snapshot_cursor(as_of, |_| true).await?;
        let mut contents = Vec::new();
        while let Some(iter) = cursor.next().await {
            contents.extend(iter);
        }

        // Consolidate here regardless, and keep a metric of how often the
        // fetched data actually needed it.
        let unconsolidated_len = contents.len();
        consolidate_updates(&mut contents);
        if unconsolidated_len != contents.len() {
            self.machine
                .applier
                .shard_metrics
                .unconsolidated_snapshot
                .inc();
        }

        Ok(contents)
    }
1169
    /// Generates a snapshot at `as_of` and returns a [Cursor] that streams
    /// out its contents in consolidated chunks within a bounded memory
    /// budget. Parts whose stats fail `should_fetch_part` are skipped.
    pub async fn snapshot_cursor(
        &mut self,
        as_of: Antichain<T>,
        should_fetch_part: impl for<'a> Fn(Option<&'a LazyPartStats>) -> bool,
    ) -> Result<Cursor<K, V, T, D>, Since<T>> {
        let (lease, batches) = self.snapshot_batches(as_of.clone()).await?;

        Self::read_batches_consolidated(
            &self.cfg,
            Arc::clone(&self.metrics),
            Arc::clone(&self.machine.applier.shard_metrics),
            self.metrics.read.snapshot.clone(),
            Arc::clone(&self.blob),
            self.shard_id(),
            as_of,
            self.read_schemas.clone(),
            &batches,
            lease,
            should_fetch_part,
            COMPACTION_MEMORY_BOUND_BYTES.get(&self.cfg),
        )
    }

    /// Builds a [Cursor] that consolidates the given hollow batches at
    /// `as_of`, filtering parts by `should_fetch_part`. `lease` is held by
    /// the returned cursor for its lifetime.
    pub(crate) fn read_batches_consolidated<L>(
        persist_cfg: &PersistConfig,
        metrics: Arc<Metrics>,
        shard_metrics: Arc<ShardMetrics>,
        read_metrics: ReadMetrics,
        blob: Arc<dyn Blob>,
        shard_id: ShardId,
        as_of: Antichain<T>,
        schemas: Schemas<K, V>,
        batches: &[HollowBatch<T>],
        lease: L,
        should_fetch_part: impl for<'a> Fn(Option<&'a LazyPartStats>) -> bool,
        memory_budget_bytes: usize,
    ) -> Result<Cursor<K, V, T, D, L>, Since<T>> {
        let context = format!("{}[as_of={:?}]", shard_id, as_of.elements());
        let filter = FetchBatchFilter::Snapshot {
            as_of: as_of.clone(),
        };

        let mut consolidator = Consolidator::new(
            context,
            FetchConfig::from_persist_config(persist_cfg),
            shard_id,
            StructuredSort::new(schemas.clone()),
            blob,
            metrics,
            shard_metrics,
            read_metrics,
            filter,
            None,
            memory_budget_bytes,
        );
        // Enqueue every run of every batch, dropping parts the caller's
        // predicate rejects based on their (optional) stats.
        for batch in batches {
            for (meta, run) in batch.runs() {
                consolidator.enqueue_run(
                    &batch.desc,
                    meta,
                    run.into_iter()
                        .filter(|p| should_fetch_part(p.stats()))
                        .cloned(),
                );
            }
        }
        // Chunk limits for `Cursor::next`: yield after the configured update
        // count, and cap bytes at the blob target size (at least 1).
        let max_len = persist_cfg.compaction_yield_after_n_updates;
        let max_bytes = BLOB_TARGET_SIZE.get(persist_cfg).max(1);

        Ok(Cursor {
            consolidator,
            max_len,
            max_bytes,
            _lease: lease,
            read_schemas: schemas,
        })
    }
1256
    /// Returns aggregate statistics about the contents of the shard at
    /// `as_of` (or over all current batches, if `as_of` is `None`).
    ///
    /// The returned future is `'static`, so it can be polled independently of
    /// this handle.
    pub fn snapshot_stats(
        &self,
        as_of: Option<Antichain<T>>,
    ) -> impl Future<Output = Result<SnapshotStats, Since<T>>> + Send + 'static {
        let machine = self.machine.clone();
        async move {
            let batches = match as_of {
                Some(as_of) => machine.unleased_snapshot(&as_of).await?,
                None => machine.applier.all_batches(),
            };
            // Sum of raw batch lengths; NOTE(review): presumably an upper
            // bound if the data is not consolidated — confirm.
            let num_updates = batches.iter().map(|b| b.len).sum();
            Ok(SnapshotStats {
                shard_id: machine.shard_id(),
                num_updates,
            })
        }
    }

    /// Returns per-part statistics (encoded sizes and lazy part stats) about
    /// the contents of the shard at `as_of`.
    pub async fn snapshot_parts_stats(
        &self,
        as_of: Antichain<T>,
    ) -> Result<SnapshotPartsStats, Since<T>> {
        let batches = self.machine.unleased_snapshot(&as_of).await?;
        let parts = stream::iter(&batches)
            .flat_map(|b| b.part_stream(self.shard_id(), &*self.blob, &*self.metrics))
            .map(|p| {
                let p = p.expect("live batch");
                SnapshotPartStats {
                    encoded_size_bytes: p.encoded_size_bytes(),
                    stats: p.stats().cloned(),
                }
            })
            .collect()
            .await;
        Ok(SnapshotPartsStats {
            metrics: Arc::clone(&self.machine.applier.metrics),
            shard_id: self.machine.shard_id(),
            parts,
        })
    }
}
1319
impl<K, V, T, D> ReadHandle<K, V, T, D>
where
    K: Debug + Codec + Ord,
    V: Debug + Codec + Ord,
    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
    D: Monoid + Codec64 + Send + Sync,
{
    /// Generates a snapshot at `as_of`, then fetches the parts one at a time
    /// and streams out each decoded update.
    pub async fn snapshot_and_stream(
        &mut self,
        as_of: Antichain<T>,
    ) -> Result<impl Stream<Item = ((K, V), T, D)> + use<K, V, T, D>, Since<T>> {
        let snap = self.snapshot(as_of).await?;

        // Clone everything the stream needs up front so the returned stream
        // does not borrow `self`.
        let blob = Arc::clone(&self.blob);
        let metrics = Arc::clone(&self.metrics);
        let snapshot_metrics = self.metrics.read.snapshot.clone();
        let shard_metrics = Arc::clone(&self.machine.applier.shard_metrics);
        let reader_id = self.reader_id.clone();
        let schemas = self.read_schemas.clone();
        let mut schema_cache = self.schema_cache.clone();
        let persist_cfg = self.cfg.clone();
        let stream = async_stream::stream! {
            for part in snap {
                let mut fetched_part = fetch_leased_part(
                    &persist_cfg,
                    &part,
                    blob.as_ref(),
                    Arc::clone(&metrics),
                    &snapshot_metrics,
                    &shard_metrics,
                    &reader_id,
                    schemas.clone(),
                    &mut schema_cache,
                )
                .await;

                while let Some(next) = fetched_part.next() {
                    yield next;
                }
            }
        };

        Ok(stream)
    }
}
1369
impl<K, V, T, D> ReadHandle<K, V, T, D>
where
    K: Debug + Codec + Ord,
    V: Debug + Codec + Ord,
    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
    D: Monoid + Ord + Codec64 + Send + Sync,
{
    /// Test helper: [Self::snapshot_and_fetch] at `as_of`, expecting success
    /// and returning the results sorted for deterministic comparisons.
    #[cfg(test)]
    #[track_caller]
    pub async fn expect_snapshot_and_fetch(&mut self, as_of: T) -> Vec<((K, V), T, D)> {
        let mut ret = self
            .snapshot_and_fetch(Antichain::from_elem(as_of))
            .await
            .expect("cannot serve requested as_of");

        ret.sort();
        ret
    }
}
1391
impl<K: Codec, V: Codec, T, D> Drop for ReadHandle<K, V, T, D> {
    // Drop can't be async, so just flag the shared state as expired; the
    // background heartbeat task will expire the reader and exit on its next
    // wakeup (`request_sync` wakes it promptly).
    fn drop(&mut self) {
        self.hold_state.modify(|s| {
            s.expired = true;
            s.request_sync = true;
        });
    }
}
1400
1401#[cfg(test)]
1402mod tests {
1403 use std::pin;
1404 use std::str::FromStr;
1405
1406 use mz_dyncfg::ConfigUpdates;
1407 use mz_ore::cast::CastFrom;
1408 use mz_ore::metrics::MetricsRegistry;
1409 use mz_persist::mem::{MemBlob, MemBlobConfig, MemConsensus};
1410 use mz_persist::unreliable::{UnreliableConsensus, UnreliableHandle};
1411 use serde::{Deserialize, Serialize};
1412 use serde_json::json;
1413 use tokio_stream::StreamExt;
1414
1415 use crate::async_runtime::IsolatedRuntime;
1416 use crate::batch::BLOB_TARGET_SIZE;
1417 use crate::cache::StateCache;
1418 use crate::internal::metrics::Metrics;
1419 use crate::rpc::NoopPubSubSender;
1420 use crate::tests::{all_ok, new_test_client};
1421 use crate::{Diagnostics, PersistClient, PersistConfig, ShardId};
1422
1423 use super::*;
1424
1425 #[mz_persist_proc::test(tokio::test)]
1427 #[cfg_attr(miri, ignore)] async fn drop_unused_subscribe(dyncfgs: ConfigUpdates) {
1429 let data = [
1430 (("0".to_owned(), "zero".to_owned()), 0, 1),
1431 (("1".to_owned(), "one".to_owned()), 1, 1),
1432 (("2".to_owned(), "two".to_owned()), 2, 1),
1433 ];
1434
1435 let (mut write, read) = new_test_client(&dyncfgs)
1436 .await
1437 .expect_open::<String, String, u64, i64>(crate::ShardId::new())
1438 .await;
1439
1440 write.expect_compare_and_append(&data[0..1], 0, 1).await;
1441 write.expect_compare_and_append(&data[1..2], 1, 2).await;
1442 write.expect_compare_and_append(&data[2..3], 2, 3).await;
1443
1444 let subscribe = read
1445 .subscribe(timely::progress::Antichain::from_elem(2))
1446 .await
1447 .unwrap();
1448 assert!(
1449 !subscribe.snapshot.as_ref().unwrap().is_empty(),
1450 "snapshot must have batches for test to be meaningful"
1451 );
1452 drop(subscribe);
1453 }
1454
1455 #[mz_persist_proc::test(tokio::test)]
1457 #[cfg_attr(miri, ignore)] async fn streaming_consolidate(dyncfgs: ConfigUpdates) {
1459 let data = &[
1460 (("k".to_owned(), "v".to_owned()), 0, 1),
1462 (("k".to_owned(), "v".to_owned()), 1, 1),
1463 (("k".to_owned(), "v".to_owned()), 2, 1),
1464 (("k2".to_owned(), "v".to_owned()), 0, 1),
1466 (("k2".to_owned(), "v".to_owned()), 1, -1),
1467 ];
1468
1469 let (mut write, read) = {
1470 let client = new_test_client(&dyncfgs).await;
1471 client.cfg.set_config(&BLOB_TARGET_SIZE, 1000); client
1473 .expect_open::<String, String, u64, i64>(crate::ShardId::new())
1474 .await
1475 };
1476
1477 write.expect_compare_and_append(data, 0, 5).await;
1478
1479 let mut snapshot = read
1480 .subscribe(timely::progress::Antichain::from_elem(4))
1481 .await
1482 .unwrap();
1483
1484 let mut updates = vec![];
1485 'outer: loop {
1486 for event in snapshot.fetch_next().await {
1487 match event {
1488 ListenEvent::Progress(t) => {
1489 if !t.less_than(&4) {
1490 break 'outer;
1491 }
1492 }
1493 ListenEvent::Updates(data) => {
1494 updates.extend(data);
1495 }
1496 }
1497 }
1498 }
1499 assert_eq!(updates, &[(("k".to_owned(), "v".to_owned()), 4u64, 3i64)],)
1500 }
1501
1502 #[mz_persist_proc::test(tokio::test)]
1503 #[cfg_attr(miri, ignore)] async fn snapshot_and_stream(dyncfgs: ConfigUpdates) {
1505 let data = &mut [
1506 (("k1".to_owned(), "v1".to_owned()), 0, 1),
1507 (("k2".to_owned(), "v2".to_owned()), 1, 1),
1508 (("k3".to_owned(), "v3".to_owned()), 2, 1),
1509 (("k4".to_owned(), "v4".to_owned()), 2, 1),
1510 (("k5".to_owned(), "v5".to_owned()), 3, 1),
1511 ];
1512
1513 let (mut write, mut read) = {
1514 let client = new_test_client(&dyncfgs).await;
1515 client.cfg.set_config(&BLOB_TARGET_SIZE, 0); client
1517 .expect_open::<String, String, u64, i64>(crate::ShardId::new())
1518 .await
1519 };
1520
1521 write.expect_compare_and_append(&data[0..2], 0, 2).await;
1522 write.expect_compare_and_append(&data[2..4], 2, 3).await;
1523 write.expect_compare_and_append(&data[4..], 3, 4).await;
1524
1525 let as_of = Antichain::from_elem(3);
1526 let mut snapshot = pin::pin!(read.snapshot_and_stream(as_of.clone()).await.unwrap());
1527
1528 let mut snapshot_rows = vec![];
1529 while let Some(((k, v), t, d)) = snapshot.next().await {
1530 snapshot_rows.push(((k, v), t, d));
1531 }
1532
1533 for ((_k, _v), t, _d) in data.as_mut_slice() {
1534 t.advance_by(as_of.borrow());
1535 }
1536
1537 assert_eq!(data.as_slice(), snapshot_rows.as_slice());
1538 }
1539
    // Verifies that leases on fetched-but-unreleased batch parts hold back
    // the shard's seqno_since: while parts are outstanding, seqno_since must
    // not advance past the original hold, and once leases are dropped (and
    // since is downgraded) it must eventually advance.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] async fn seqno_leases(dyncfgs: ConfigUpdates) {
        // 20 single-update batches: ((i, i), time i, diff 1).
        let mut data = vec![];
        for i in 0..20 {
            data.push(((i.to_string(), i.to_string()), i, 1))
        }

        let shard_id = ShardId::new();

        let client = new_test_client(&dyncfgs).await;
        let (mut write, read) = client
            .expect_open::<String, String, u64, i64>(shard_id)
            .await;

        // `offset`/`width` track which slice of `data` has been appended so
        // far; each append below writes exactly one update at time i.
        let mut offset = 0;
        let mut width = 2;

        for i in offset..offset + width {
            write
                .expect_compare_and_append(
                    &data[i..i + 1],
                    u64::cast_from(i),
                    u64::cast_from(i) + 1,
                )
                .await;
        }
        offset += width;

        let mut fetcher = client
            .create_batch_fetcher::<String, String, u64, i64>(
                shard_id,
                Default::default(),
                Default::default(),
                false,
                Diagnostics::for_tests(),
            )
            .await
            .unwrap();

        let mut subscribe = read
            .subscribe(timely::progress::Antichain::from_elem(1))
            .await
            .expect("cannot serve requested as_of");

        // The subscribe's outstanding seqno hold must cover every snapshot
        // part's lease.
        let original_seqno_since = subscribe.listen.handle.outstanding_seqno();
        if let Some(snapshot) = &subscribe.snapshot {
            for part in snapshot {
                assert!(
                    part.lease.seqno() >= original_seqno_since,
                    "our seqno hold must cover all parts"
                );
            }
        }

        // Leased parts accumulated (but deliberately not yet fetched).
        let mut parts = vec![];

        // Collect parts from four more appends. Even though we downgrade
        // since after each step, the unfetched leases must pin seqno_since
        // at its original value.
        width = 4;
        for i in offset..offset + width {
            for event in subscribe.next(None).await {
                if let ListenEvent::Updates(mut new_parts) = event {
                    parts.append(&mut new_parts);
                    subscribe
                        .listen
                        .handle
                        .downgrade_since(&subscribe.listen.since)
                        .await;
                }
            }

            write
                .expect_compare_and_append(
                    &data[i..i + 1],
                    u64::cast_from(i),
                    u64::cast_from(i) + 1,
                )
                .await;

            // Outstanding leases must prevent seqno_since from moving.
            assert_eq!(
                subscribe.listen.handle.machine.applier.seqno_since(),
                original_seqno_since
            );
        }

        offset += width;

        let mut seqno_since = subscribe.listen.handle.machine.applier.seqno_since();

        assert_eq!(seqno_since, original_seqno_since);

        // Leases from later events that we intentionally keep alive until the
        // end of the test.
        let mut subsequent_parts = vec![];

        // Tracks the previous part's seqno to assert leases arrive in
        // non-decreasing seqno order.
        let mut this_seqno = SeqNo::minimum();

        // Now fetch and release each accumulated part one at a time, checking
        // after each release whether seqno_since was allowed to advance.
        for (mut i, part) in parts.into_iter().enumerate() {
            let part_seqno = part.lease.seqno();
            let last_seqno = this_seqno;
            this_seqno = part_seqno;
            assert!(this_seqno >= last_seqno);

            // Fetch the part, then drop its lease, releasing this hold.
            let (part, lease) = part.into_exchangeable_part();
            let _ = fetcher.fetch_leased_part(part).await;
            drop(lease);

            // Keep the leases of any newly delivered parts alive so they keep
            // contributing their own holds.
            for event in subscribe.next(None).await {
                if let ListenEvent::Updates(parts) = event {
                    for part in parts {
                        let (_, lease) = part.into_exchangeable_part();
                        subsequent_parts.push(lease);
                    }
                }
            }

            subscribe
                .listen
                .handle
                .downgrade_since(&subscribe.listen.since)
                .await;

            // `parts` was enumerated from 0, but the corresponding data
            // indices continue from where the earlier appends stopped.
            i += offset;
            write
                .expect_compare_and_append(
                    &data[i..i + 1],
                    u64::cast_from(i),
                    u64::cast_from(i) + 1,
                )
                .await;

            // seqno_since may only advance once the earliest outstanding
            // lease is newer than the part we just released.
            let expect_downgrade = subscribe.listen.handle.outstanding_seqno() > part_seqno;

            let new_seqno_since = subscribe.listen.handle.machine.applier.seqno_since();
            if expect_downgrade {
                assert!(new_seqno_since > seqno_since);
            } else {
                assert_eq!(new_seqno_since, seqno_since);
            }
            seqno_since = new_seqno_since;
        }

        // By the end, releasing the original snapshot leases must have let
        // seqno_since move forward.
        assert!(seqno_since > original_seqno_since);

        drop(subsequent_parts);
        drop(subscribe);
    }
1705
1706 #[mz_ore::test]
1707 fn reader_id_human_readable_serde() {
1708 #[derive(Debug, Serialize, Deserialize)]
1709 struct Container {
1710 reader_id: LeasedReaderId,
1711 }
1712
1713 let id =
1715 LeasedReaderId::from_str("r00000000-1234-5678-0000-000000000000").expect("valid id");
1716 assert_eq!(
1717 id,
1718 serde_json::from_value(serde_json::to_value(id.clone()).expect("serializable"))
1719 .expect("deserializable")
1720 );
1721
1722 assert_eq!(
1724 id,
1725 serde_json::from_str("\"r00000000-1234-5678-0000-000000000000\"")
1726 .expect("deserializable")
1727 );
1728
1729 let json = json!({ "reader_id": id });
1731 assert_eq!(
1732 "{\"reader_id\":\"r00000000-1234-5678-0000-000000000000\"}",
1733 &json.to_string()
1734 );
1735 let container: Container = serde_json::from_value(json).expect("deserializable");
1736 assert_eq!(container.reader_id, id);
1737 }
1738
    // After a snapshot's parts and a listen's progress have been obtained,
    // reading the actual data must not require any further consensus
    // traffic: we cut consensus off entirely and the reads still succeed.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] async fn skip_consensus_fetch_optimization() {
        let data = vec![
            (("0".to_owned(), "zero".to_owned()), 0, 1),
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
        ];

        // Build a client by hand so consensus can be wrapped in an
        // UnreliableHandle we can flip to unavailable mid-test. Blob stays
        // fully available throughout.
        let cfg = PersistConfig::new_for_tests();
        let blob = Arc::new(MemBlob::open(MemBlobConfig::default()));
        let consensus = Arc::new(MemConsensus::default());
        let unreliable = UnreliableHandle::default();
        unreliable.totally_available();
        let consensus = Arc::new(UnreliableConsensus::new(consensus, unreliable.clone()));
        let metrics = Arc::new(Metrics::new(&cfg, &MetricsRegistry::new()));
        let pubsub_sender = Arc::new(NoopPubSubSender);
        let (mut write, mut read) = PersistClient::new(
            cfg,
            blob,
            consensus,
            metrics,
            Arc::new(IsolatedRuntime::new_for_tests()),
            Arc::new(StateCache::new_no_metrics()),
            pubsub_sender,
        )
        .expect("client construction failed")
        .expect_open::<String, String, u64, i64>(ShardId::new())
        .await;

        // One batch per update, all while consensus is still reachable.
        write.expect_compare_and_append(&data[0..1], 0, 1).await;
        write.expect_compare_and_append(&data[1..2], 1, 2).await;
        write.expect_compare_and_append(&data[2..3], 2, 3).await;

        let snapshot = read.expect_snapshot_and_fetch(2).await;
        let mut listen = read.expect_listen(0).await;

        // Prime the listen so it has already observed state; the first event
        // is just a progress update to time 1.
        let listen_actual = listen.fetch_next().await;
        let expected_events = vec![ListenEvent::Progress(Antichain::from_elem(1))];
        assert_eq!(listen_actual, expected_events);

        // Cut off consensus entirely. Both the already-fetched snapshot and
        // the remaining listen reads must still produce the expected data.
        unreliable.totally_unavailable();
        assert_eq!(snapshot, all_ok(&data, 2));
        assert_eq!(
            listen.read_until(&3).await,
            (all_ok(&data[1..], 1), Antichain::from_elem(3))
        );
    }
1796}