use async_stream::stream;
use std::collections::BTreeMap;
use std::fmt::Debug;
use std::future::Future;
use std::sync::Arc;
use std::time::{Duration, Instant};

use differential_dataflow::Hashable;
use differential_dataflow::consolidation::consolidate_updates;
use differential_dataflow::difference::Monoid;
use differential_dataflow::lattice::Lattice;
use futures::Stream;
use futures_util::{StreamExt, stream};
use mz_dyncfg::Config;
use mz_ore::cast::CastLossy;
use mz_ore::halt;
use mz_ore::instrument;
use mz_ore::task::JoinHandle;
use mz_persist::location::{Blob, SeqNo};
use mz_persist_types::columnar::{ColumnDecoder, Schema};
use mz_persist_types::{Codec, Codec64};
use proptest_derive::Arbitrary;
use serde::{Deserialize, Serialize};
use timely::PartialOrder;
use timely::order::TotalOrder;
use timely::progress::{Antichain, Timestamp};
use tracing::warn;
use uuid::Uuid;

use crate::batch::BLOB_TARGET_SIZE;
use crate::cfg::{COMPACTION_MEMORY_BOUND_BYTES, RetryParameters};
use crate::fetch::FetchConfig;
use crate::fetch::{FetchBatchFilter, FetchedPart, Lease, LeasedBatchPart, fetch_leased_part};
use crate::internal::encoding::Schemas;
use crate::internal::machine::Machine;
use crate::internal::metrics::{Metrics, ReadMetrics, ShardMetrics};
use crate::internal::state::{HollowBatch, LeasedReaderState};
use crate::internal::watch::{AwaitableState, StateWatch};
use crate::iter::{Consolidator, StructuredSort};
use crate::schema::SchemaCache;
use crate::stats::{SnapshotPartStats, SnapshotPartsStats, SnapshotStats};
use crate::{GarbageCollector, PersistConfig, ShardId, parse_id};

pub use crate::internal::encoding::LazyPartStats;
pub use crate::internal::state::Since;

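/// An opaque identifier for a leased reader of a persist shard.
///
/// Renders (and parses) as the letter `r` followed by the underlying UUID,
/// e.g. `r00000000-0000-0000-0000-000000000000`.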
#[derive(
    Arbitrary,
    Clone,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
    Hash,
    Serialize,
    Deserialize
)]
#[serde(try_from = "String", into = "String")]
pub struct LeasedReaderId(pub(crate) [u8; 16]);

impl std::fmt::Display for LeasedReaderId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "r{}", Uuid::from_bytes(self.0))
    }
}

impl std::fmt::Debug for LeasedReaderId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "LeasedReaderId({})", Uuid::from_bytes(self.0))
    }
}

impl std::str::FromStr for LeasedReaderId {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        parse_id("r", "LeasedReaderId", s).map(LeasedReaderId)
    }
}

impl From<LeasedReaderId> for String {
    fn from(reader_id: LeasedReaderId) -> Self {
        reader_id.to_string()
    }
}

impl TryFrom<String> for LeasedReaderId {
    type Error = String;

    fn try_from(s: String) -> Result<Self, Self::Error> {
        s.parse()
    }
}

impl LeasedReaderId {
    pub(crate) fn new() -> Self {
        LeasedReaderId(*Uuid::new_v4().as_bytes())
    }
}

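/// A combined snapshot-then-listen over a shard: the leased parts of a
/// snapshot at the subscription's `as_of`, followed by an ongoing [Listen]
/// for everything beyond it.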
#[derive(Debug)]
pub struct Subscribe<K: Codec, V: Codec, T, D> {
    snapshot: Option<Vec<LeasedBatchPart<T>>>,
    listen: Listen<K, V, T, D>,
}

impl<K, V, T, D> Subscribe<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
    D: Monoid + Codec64 + Send + Sync,
{
    fn new(snapshot_parts: Vec<LeasedBatchPart<T>>, listen: Listen<K, V, T, D>) -> Self {
        Subscribe {
            snapshot: Some(snapshot_parts),
            listen,
        }
    }

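    /// Returns the next events in the subscription: the snapshot's leased
    /// parts on the first call, then the parts and new progress frontier of
    /// each subsequent [Listen::next] batch.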
    #[instrument(level = "debug", fields(shard = %self.listen.handle.machine.shard_id()))]
    pub async fn next(
        &mut self,
        listen_retry: Option<RetryParameters>,
    ) -> Vec<ListenEvent<T, LeasedBatchPart<T>>> {
        match self.snapshot.take() {
            Some(parts) => vec![ListenEvent::Updates(parts)],
            None => {
                let (parts, upper) = self.listen.next(listen_retry).await;
                vec![ListenEvent::Updates(parts), ListenEvent::Progress(upper)]
            }
        }
    }
}

impl<K, V, T, D> Subscribe<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
    D: Monoid + Codec64 + Send + Sync,
{
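    /// Like [Self::next], but fetches the leased parts and returns their
    /// decoded updates (dropping empty parts) instead of the parts
    /// themselves.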
    #[instrument(level = "debug", fields(shard = %self.listen.handle.machine.shard_id()))]
    pub async fn fetch_next(&mut self) -> Vec<ListenEvent<T, ((K, V), T, D)>> {
        let events = self.next(None).await;
        let new_len = events
            .iter()
            .map(|event| match event {
                ListenEvent::Updates(parts) => parts.len(),
                ListenEvent::Progress(_) => 1,
            })
            .sum();
        let mut ret = Vec::with_capacity(new_len);
        for event in events {
            match event {
                ListenEvent::Updates(parts) => {
                    for part in parts {
                        let fetched_part = self.listen.fetch_batch_part(part).await;
                        let updates = fetched_part.collect::<Vec<_>>();
                        if !updates.is_empty() {
                            ret.push(ListenEvent::Updates(updates));
                        }
                    }
                }
                ListenEvent::Progress(progress) => ret.push(ListenEvent::Progress(progress)),
            }
        }
        ret
    }

    pub async fn fetch_batch_part(&mut self, part: LeasedBatchPart<T>) -> FetchedPart<K, V, T, D> {
        self.listen.fetch_batch_part(part).await
    }
}

impl<K, V, T, D> Subscribe<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
    D: Monoid + Codec64 + Send + Sync,
{
    pub async fn expire(mut self) {
        let _ = self.snapshot.take();
        self.listen.expire().await;
    }
}

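/// A data or progress event emitted by a [Listen] or [Subscribe]: either a
/// new progress frontier or a batch of updates.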
#[derive(Debug, PartialEq)]
pub enum ListenEvent<T, D> {
    Progress(Antichain<T>),
    Updates(Vec<D>),
}

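/// An ongoing subscription to updates of a shard at times beyond some
/// `as_of` frontier, backed by a [ReadHandle] whose since is held back for as
/// long as the listen is live.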
#[derive(Debug)]
pub struct Listen<K: Codec, V: Codec, T, D> {
    handle: ReadHandle<K, V, T, D>,
    as_of: Antichain<T>,
    since: Antichain<T>,
    frontier: Antichain<T>,
}

impl<K, V, T, D> Listen<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
    D: Monoid + Codec64 + Send + Sync,
{
    async fn new(
        mut handle: ReadHandle<K, V, T, D>,
        as_of: Antichain<T>,
    ) -> Result<Self, Since<T>> {
        let () = handle.machine.verify_listen(&as_of)?;

        let since = as_of.clone();
        if !PartialOrder::less_equal(handle.since(), &since) {
            return Err(Since(handle.since().clone()));
        }
        handle.downgrade_since(&since).await;
        Ok(Listen {
            handle,
            since,
            frontier: as_of.clone(),
            as_of,
        })
    }

    pub fn frontier(&self) -> &Antichain<T> {
        &self.frontier
    }

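    /// Attempts to get the next batch of updates: the leased parts with
    /// times in `[self.frontier, batch.desc.upper())` plus the new progress
    /// frontier. While waiting for the next batch it periodically re-applies
    /// the current since hold so the reader's lease stays fresh.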
    pub async fn next(
        &mut self,
        retry: Option<RetryParameters>,
    ) -> (Vec<LeasedBatchPart<T>>, Antichain<T>) {
        let batch = loop {
            let min_elapsed = self.handle.heartbeat_duration();
            let next_batch = self.handle.machine.next_listen_batch(
                &self.frontier,
                &mut self.handle.watch,
                Some(&self.handle.reader_id),
                retry,
            );
            match tokio::time::timeout(min_elapsed, next_batch).await {
                Ok(batch) => break batch,
                Err(_elapsed) => {
                    self.handle.maybe_downgrade_since(&self.since).await;
                }
            }
        };

        let acceptable_desc = PartialOrder::less_than(batch.desc.since(), &self.frontier)
            || (self.frontier == self.as_of
                && PartialOrder::less_equal(batch.desc.since(), &self.frontier));
        if !acceptable_desc {
            let lease_state = self
                .handle
                .machine
                .applier
                .reader_lease(self.handle.reader_id.clone());
            if let Some(lease) = lease_state {
                panic!(
                    "Listen on {} received a batch {:?} advanced past the listen frontier {:?}, but the lease has not expired: {:?}",
                    self.handle.machine.shard_id(),
                    batch.desc,
                    self.frontier,
                    lease
                )
            } else {
                halt!(
                    "Listen on {} received a batch {:?} advanced past the listen frontier {:?} after the reader has expired. \
                    This can happen in exceptional cases: a machine goes to sleep or is running out of memory or CPU, for example.",
                    self.handle.machine.shard_id(),
                    batch.desc,
                    self.frontier
                )
            }
        }

        let new_frontier = batch.desc.upper().clone();

        for x in self.frontier.elements().iter() {
            let less_than_upper = batch.desc.upper().elements().iter().any(|u| x.less_than(u));
            if less_than_upper {
                self.since.join_assign(&Antichain::from_elem(x.clone()));
            }
        }

        let filter = FetchBatchFilter::Listen {
            as_of: self.as_of.clone(),
            lower: self.frontier.clone(),
        };
        let parts = self.handle.lease_batch_parts(batch, filter).collect().await;

        self.handle.maybe_downgrade_since(&self.since).await;

        self.frontier = new_frontier;

        (parts, self.frontier.clone())
    }
}

impl<K, V, T, D> Listen<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
    D: Monoid + Codec64 + Send + Sync,
{
    #[instrument(level = "debug", name = "listen::next", fields(shard = %self.handle.machine.shard_id()))]
    pub async fn fetch_next(&mut self) -> Vec<ListenEvent<T, ((K, V), T, D)>> {
        let (parts, progress) = self.next(None).await;
        let mut ret = Vec::with_capacity(parts.len() + 1);
        for part in parts {
            let fetched_part = self.fetch_batch_part(part).await;
            let updates = fetched_part.collect::<Vec<_>>();
            if !updates.is_empty() {
                ret.push(ListenEvent::Updates(updates));
            }
        }
        ret.push(ListenEvent::Progress(progress));
        ret
    }

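    /// Converts this [Listen] into a `futures::Stream` of fetched events.
    ///
    /// A rough usage sketch (not a doctest; assumes an existing `listen` and
    /// a `use futures::StreamExt` in scope):
    ///
    /// ```ignore
    /// let mut events = std::pin::pin!(listen.into_stream());
    /// while let Some(event) = events.next().await {
    ///     match event {
    ///         ListenEvent::Updates(updates) => { /* apply the updates */ }
    ///         ListenEvent::Progress(frontier) => { /* note the new frontier */ }
    ///     }
    /// }
    /// ```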
    pub fn into_stream(mut self) -> impl Stream<Item = ListenEvent<T, ((K, V), T, D)>> {
        async_stream::stream!({
            loop {
                for msg in self.fetch_next().await {
                    yield msg;
                }
            }
        })
    }

    #[cfg(test)]
    #[track_caller]
    pub async fn read_until(&mut self, ts: &T) -> (Vec<((K, V), T, D)>, Antichain<T>) {
        let mut updates = Vec::new();
        let mut frontier = Antichain::from_elem(T::minimum());
        while self.frontier.less_than(ts) {
            for event in self.fetch_next().await {
                match event {
                    ListenEvent::Updates(mut x) => updates.append(&mut x),
                    ListenEvent::Progress(x) => frontier = x,
                }
            }
        }
        (updates, frontier)
    }
}

impl<K, V, T, D> Listen<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
    D: Monoid + Codec64 + Send + Sync,
{
    async fn fetch_batch_part(&mut self, part: LeasedBatchPart<T>) -> FetchedPart<K, V, T, D> {
        let fetched_part = fetch_leased_part(
            &self.handle.cfg,
            &part,
            self.handle.blob.as_ref(),
            Arc::clone(&self.handle.metrics),
            &self.handle.metrics.read.listen,
            &self.handle.machine.applier.shard_metrics,
            &self.handle.reader_id,
            self.handle.read_schemas.clone(),
            &mut self.handle.schema_cache,
        )
        .await;
        fetched_part
    }

    pub async fn expire(self) {
        self.handle.expire().await
    }
}

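/// Internal, shared bookkeeping for a [ReadHandle]'s holds: the since the
/// handle wants to hold (`held_since`), the since most recently applied by
/// the heartbeat task (`applied_since`), and the outstanding seqno leases
/// handed out to in-flight batch parts.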
#[derive(Debug)]
pub(crate) struct ReadHolds<T> {
    held_since: Antichain<T>,
    applied_since: Antichain<T>,
    recent_seqno: SeqNo,
    leases: BTreeMap<SeqNo, Lease>,
    expired: bool,
    request_sync: bool,
}

impl<T> ReadHolds<T>
where
    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
{
    pub fn downgrade_since(&mut self, since: &Antichain<T>) {
        self.held_since.join_assign(since);
    }

    pub fn observe_seqno(&mut self, seqno: SeqNo) {
        self.recent_seqno = seqno.max(self.recent_seqno);
    }

    pub fn lease_seqno(&mut self) -> Lease {
        let seqno = self.recent_seqno;
        let lease = self
            .leases
            .entry(seqno)
            .or_insert_with(|| Lease::new(seqno));
        lease.clone()
    }

    pub fn outstanding_seqno(&mut self) -> SeqNo {
        while let Some(first) = self.leases.first_entry() {
            if first.get().count() <= 1 {
                first.remove();
            } else {
                return *first.key();
            }
        }
        self.recent_seqno
    }
}

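/// A "capability" granting the ability to read the state of some shard at
/// times greater or equal to `self.since()`.
///
/// Dropping the handle marks its lease expired for the background heartbeat
/// task to clean up; calling [Self::expire] additionally waits for that
/// expiration to be applied.
///
/// A rough usage sketch (not a doctest; assumes an already-opened
/// `read: ReadHandle<String, String, u64, i64>` like the tests below build):
///
/// ```ignore
/// // Read a snapshot as of time 2, then follow along for later updates.
/// let updates = read.snapshot_and_fetch(Antichain::from_elem(2)).await.unwrap();
/// let mut listen = read.listen(Antichain::from_elem(2)).await.unwrap();
/// loop {
///     for event in listen.fetch_next().await {
///         // ListenEvent::Updates(..) or ListenEvent::Progress(..)
///     }
/// }
/// ```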
#[derive(Debug)]
pub struct ReadHandle<K: Codec, V: Codec, T, D> {
    pub(crate) cfg: PersistConfig,
    pub(crate) metrics: Arc<Metrics>,
    pub(crate) machine: Machine<K, V, T, D>,
    pub(crate) gc: GarbageCollector<K, V, T, D>,
    pub(crate) blob: Arc<dyn Blob>,
    watch: StateWatch<K, V, T, D>,

    pub(crate) reader_id: LeasedReaderId,
    pub(crate) read_schemas: Schemas<K, V>,
    pub(crate) schema_cache: SchemaCache<K, V, T, D>,

    since: Antichain<T>,
    pub(crate) hold_state: AwaitableState<ReadHolds<T>>,
    pub(crate) unexpired_state: Option<UnexpiredReadHandleState>,
}

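/// Dyncfg controlling the length of reader leases: the time after which a
/// reader that has stopped heartbeating may have its lease cleaned up as
/// stale. The heartbeat task below fires at a quarter of this interval.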
pub(crate) const READER_LEASE_DURATION: Config<Duration> = Config::new(
    "persist_reader_lease_duration",
    Duration::from_secs(60 * 15),
    "The time after which we'll clean up stale read leases",
);

impl<K, V, T, D> ReadHandle<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
    D: Monoid + Codec64 + Send + Sync,
{
    #[allow(clippy::unused_async)]
    pub(crate) async fn new(
        cfg: PersistConfig,
        metrics: Arc<Metrics>,
        machine: Machine<K, V, T, D>,
        gc: GarbageCollector<K, V, T, D>,
        blob: Arc<dyn Blob>,
        reader_id: LeasedReaderId,
        read_schemas: Schemas<K, V>,
        state: LeasedReaderState<T>,
    ) -> Self {
        let schema_cache = machine.applier.schema_cache();
        let hold_state = AwaitableState::new(ReadHolds {
            held_since: state.since.clone(),
            applied_since: state.since.clone(),
            recent_seqno: state.seqno,
            leases: Default::default(),
            expired: false,
            request_sync: false,
        });
        ReadHandle {
            cfg,
            metrics: Arc::clone(&metrics),
            machine: machine.clone(),
            gc: gc.clone(),
            blob,
            watch: machine.applier.watch(),
            reader_id: reader_id.clone(),
            read_schemas,
            schema_cache,
            since: state.since,
            hold_state: hold_state.clone(),
            unexpired_state: Some(UnexpiredReadHandleState {
                heartbeat_task: Self::start_reader_heartbeat_task(
                    machine, reader_id, gc, hold_state,
                ),
            }),
        }
    }

    fn start_reader_heartbeat_task(
        machine: Machine<K, V, T, D>,
        reader_id: LeasedReaderId,
        gc: GarbageCollector<K, V, T, D>,
        leased_seqnos: AwaitableState<ReadHolds<T>>,
    ) -> JoinHandle<()> {
        let metrics = Arc::clone(&machine.applier.metrics);
        let name = format!(
            "persist::heartbeat_read({},{})",
            machine.shard_id(),
            reader_id
        );
        mz_ore::task::spawn(|| name, {
            metrics.tasks.heartbeat_read.instrument_task(async move {
                Self::reader_heartbeat_task(machine, reader_id, gc, leased_seqnos).await
            })
        })
    }

    async fn reader_heartbeat_task(
        machine: Machine<K, V, T, D>,
        reader_id: LeasedReaderId,
        gc: GarbageCollector<K, V, T, D>,
        leased_seqnos: AwaitableState<ReadHolds<T>>,
    ) {
        let sleep_duration = READER_LEASE_DURATION.get(&machine.applier.cfg) / 4;
        let jitter: f64 = f64::cast_lossy(reader_id.hashed()) / f64::cast_lossy(u64::MAX);
        let mut interval = tokio::time::interval_at(
            tokio::time::Instant::now() + sleep_duration.mul_f64(jitter),
            sleep_duration,
        );
        let mut held_since = leased_seqnos.read(|s| s.held_since.clone());
        loop {
            let before_sleep = Instant::now();
            let _woke_by_tick = tokio::select! {
                _tick = interval.tick() => {
                    true
                }
                _whatever = leased_seqnos.wait_while(|s| !s.request_sync) => {
                    false
                }
            };

            let elapsed_since_before_sleeping = before_sleep.elapsed();
            if elapsed_since_before_sleeping > sleep_duration + Duration::from_secs(60) {
                warn!(
                    "reader ({}) of shard ({}) went {}s between heartbeats",
                    reader_id,
                    machine.shard_id(),
                    elapsed_since_before_sleeping.as_secs_f64()
                );
            }

            let before_heartbeat = Instant::now();
            let heartbeat_ms = (machine.applier.cfg.now)();
            let current_seqno = machine.seqno();
            let result = leased_seqnos.modify(|s| {
                if s.expired {
                    Err(())
                } else {
                    s.observe_seqno(current_seqno);
                    s.request_sync = false;
                    held_since.join_assign(&s.held_since);
                    Ok(s.outstanding_seqno())
                }
            });
            let actual_since = match result {
                Ok(held_seqno) => {
                    let (seqno, actual_since, maintenance) = machine
                        .downgrade_since(&reader_id, held_seqno, &held_since, heartbeat_ms)
                        .await;
                    leased_seqnos.modify(|s| {
                        s.applied_since.clone_from(&actual_since.0);
                        s.observe_seqno(seqno)
                    });
                    maintenance.start_performing(&machine, &gc);
                    actual_since
                }
                Err(()) => {
                    let (seqno, maintenance) = machine.expire_leased_reader(&reader_id).await;
                    leased_seqnos.modify(|s| s.observe_seqno(seqno));
                    maintenance.start_performing(&machine, &gc);
                    break;
                }
            };

            let elapsed_since_heartbeat = before_heartbeat.elapsed();
            if elapsed_since_heartbeat > Duration::from_secs(60) {
                warn!(
                    "reader ({}) of shard ({}) heartbeat call took {}s",
                    reader_id,
                    machine.shard_id(),
                    elapsed_since_heartbeat.as_secs_f64(),
                );
            }

            if PartialOrder::less_than(&held_since, &actual_since.0) {
                warn!(
                    "heartbeat task for reader ({}) of shard ({}) exiting due to expired lease \
                    while read handle is live",
                    reader_id,
                    machine.shard_id(),
                );
                return;
            }
        }
    }

    pub fn shard_id(&self) -> ShardId {
        self.machine.shard_id()
    }

    pub fn since(&self) -> &Antichain<T> {
        &self.since
    }

    #[cfg(test)]
    fn outstanding_seqno(&self) -> SeqNo {
        let current_seqno = self.machine.seqno();
        self.hold_state.modify(|s| {
            s.observe_seqno(current_seqno);
            s.outstanding_seqno()
        })
    }

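    /// Advances this handle's since to `new_since`, giving up the ability to
    /// read at earlier times, and waits until the background heartbeat task
    /// has applied the new hold to the shard's state.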
    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
    pub async fn downgrade_since(&mut self, new_since: &Antichain<T>) {
        self.since = new_since.clone();
        self.hold_state.modify(|s| {
            s.downgrade_since(new_since);
            s.request_sync = true;
        });
        self.hold_state
            .wait_while(|s| PartialOrder::less_than(&s.applied_since, new_since))
            .await;
    }

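    /// Returns an ongoing [Listen] of the updates at times beyond `as_of`,
    /// consuming this handle. Returns an `Err` containing the current since
    /// if the requested `as_of` is no longer readable.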
    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
    pub async fn listen(self, as_of: Antichain<T>) -> Result<Listen<K, V, T, D>, Since<T>> {
        Listen::new(self, as_of).await
    }

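    /// Returns leased batch parts covering a snapshot of the shard at
    /// `as_of`, or an `Err` containing the current since if `as_of` is no
    /// longer readable.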
    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
    pub async fn snapshot(
        &mut self,
        as_of: Antichain<T>,
    ) -> Result<Vec<LeasedBatchPart<T>>, Since<T>> {
        let batches = loop {
            let min_elapsed = self.heartbeat_duration();
            match tokio::time::timeout(min_elapsed, self.machine.snapshot(&as_of)).await {
                Ok(Ok(batches)) => break batches,
                Ok(Err(since)) => return Err(since),
                Err(_timeout) => {
                    let since = self.since().clone();
                    self.maybe_downgrade_since(&since).await;
                }
            }
        };

        if !PartialOrder::less_equal(self.since(), &as_of) {
            return Err(Since(self.since().clone()));
        }

        let filter = FetchBatchFilter::Snapshot { as_of };
        let mut leased_parts = Vec::new();
        for batch in batches {
            leased_parts.extend(
                self.lease_batch_parts(batch, filter.clone())
                    .collect::<Vec<_>>()
                    .await,
            );
        }
        Ok(leased_parts)
    }

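    /// Returns a [Subscribe], which combines a snapshot at `as_of` with a
    /// [Listen] for updates beyond it, consuming this handle.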
    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
    pub async fn subscribe(
        mut self,
        as_of: Antichain<T>,
    ) -> Result<Subscribe<K, V, T, D>, Since<T>> {
        let snapshot_parts = self.snapshot(as_of.clone()).await?;
        let listen = self.listen(as_of.clone()).await?;
        Ok(Subscribe::new(snapshot_parts, listen))
    }

    fn lease_batch_parts(
        &mut self,
        batch: HollowBatch<T>,
        filter: FetchBatchFilter<T>,
    ) -> impl Stream<Item = LeasedBatchPart<T>> + '_ {
        stream! {
            let blob = Arc::clone(&self.blob);
            let metrics = Arc::clone(&self.metrics);
            let desc = batch.desc.clone();
            let lease = self.lease_seqno().await;
            for await part in batch.part_stream(self.shard_id(), &*blob, &*metrics) {
                yield LeasedBatchPart {
                    metrics: Arc::clone(&self.metrics),
                    shard_id: self.machine.shard_id(),
                    filter: filter.clone(),
                    desc: desc.clone(),
                    part: part.expect("leased part").into_owned(),
                    lease: lease.clone(),
                    filter_pushdown_audit: false,
                }
            }
        }
    }

    async fn lease_seqno(&mut self) -> Lease {
        let current_seqno = self.machine.seqno();
        let lease = self.hold_state.modify(|s| {
            s.observe_seqno(current_seqno);
            s.lease_seqno()
        });
        self.watch.wait_for_seqno_ge(lease.seqno()).await;
        lease
    }

    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
    pub async fn clone(&self, purpose: &str) -> Self {
        let new_reader_id = LeasedReaderId::new();
        let machine = self.machine.clone();
        let gc = self.gc.clone();
        let heartbeat_ts = (self.cfg.now)();
        let (reader_state, maintenance) = machine
            .register_leased_reader(
                &new_reader_id,
                purpose,
                READER_LEASE_DURATION.get(&self.cfg),
                heartbeat_ts,
                false,
            )
            .await;
        maintenance.start_performing(&machine, &gc);
        assert!(PartialOrder::less_equal(&reader_state.since, &self.since));
        let new_reader = ReadHandle::new(
            self.cfg.clone(),
            Arc::clone(&self.metrics),
            machine,
            gc,
            Arc::clone(&self.blob),
            new_reader_id,
            self.read_schemas.clone(),
            reader_state,
        )
        .await;
        new_reader
    }

    fn heartbeat_duration(&self) -> Duration {
        READER_LEASE_DURATION.get(&self.cfg) / 4
    }

    #[allow(clippy::unused_async)]
    pub async fn maybe_downgrade_since(&mut self, new_since: &Antichain<T>) {
        self.since = new_since.clone();
        self.hold_state.modify(|s| {
            s.downgrade_since(new_since);
        });
    }

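    /// Politely expires this reader, marking its lease expired and waiting
    /// for the background heartbeat task to finish applying the expiration.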
    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
    pub async fn expire(mut self) {
        self.hold_state.modify(|s| {
            s.expired = true;
            s.request_sync = true;
        });
        let Some(unexpired_state) = self.unexpired_state.take() else {
            return;
        };
        unexpired_state.heartbeat_task.await;
    }

    #[cfg(test)]
    #[track_caller]
    pub async fn expect_listen(self, as_of: T) -> Listen<K, V, T, D> {
        self.listen(Antichain::from_elem(as_of))
            .await
            .expect("cannot serve requested as_of")
    }
}

#[derive(Debug)]
pub(crate) struct UnexpiredReadHandleState {
    pub(crate) heartbeat_task: JoinHandle<()>,
}

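/// A consolidating iterator over the contents of a snapshot, as returned by
/// [ReadHandle::snapshot_cursor]. Holds on to `_lease` (a seqno [Lease] by
/// default) for as long as it is live; [Self::into_lease] recovers it.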
#[derive(Debug)]
pub struct Cursor<K: Codec, V: Codec, T: Timestamp + Codec64, D: Codec64, L = Lease> {
    consolidator: Consolidator<T, D, StructuredSort<K, V, T, D>>,
    max_len: usize,
    max_bytes: usize,
    _lease: L,
    read_schemas: Schemas<K, V>,
}

impl<K: Codec, V: Codec, T: Timestamp + Codec64, D: Codec64, L> Cursor<K, V, T, D, L> {
    pub fn into_lease(self) -> L {
        self._lease
    }
}

impl<K, V, T, D, L> Cursor<K, V, T, D, L>
where
    K: Debug + Codec + Ord,
    V: Debug + Codec + Ord,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Monoid + Ord + Codec64 + Send + Sync,
{
    pub async fn next(&mut self) -> Option<impl Iterator<Item = ((K, V), T, D)> + '_> {
        let Self {
            consolidator,
            max_len,
            max_bytes,
            _lease,
            read_schemas: _,
        } = self;

        let part = consolidator
            .next_chunk(*max_len, *max_bytes)
            .await
            .expect("fetching a leased part")?;
        let key_decoder = self
            .read_schemas
            .key
            .decoder_any(part.key.as_ref())
            .expect("ok");
        let val_decoder = self
            .read_schemas
            .val
            .decoder_any(part.val.as_ref())
            .expect("ok");
        let iter = (0..part.len()).map(move |i| {
            let mut k = K::default();
            let mut v = V::default();
            key_decoder.decode(i, &mut k);
            val_decoder.decode(i, &mut v);
            let t = T::decode(part.time.value(i).to_le_bytes());
            let d = D::decode(part.diff.value(i).to_le_bytes());
            ((k, v), t, d)
        });

        Some(iter)
    }
}

impl<K, V, T, D> ReadHandle<K, V, T, D>
where
    K: Debug + Codec + Ord,
    V: Debug + Codec + Ord,
    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
    D: Monoid + Ord + Codec64 + Send + Sync,
{
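    /// Fetches and returns a consolidated snapshot of the shard at `as_of`:
    /// a convenience wrapper around [Self::snapshot_cursor] that collects all
    /// chunks, consolidates them, and bumps a metric if the fetched data was
    /// not already consolidated.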
    pub async fn snapshot_and_fetch(
        &mut self,
        as_of: Antichain<T>,
    ) -> Result<Vec<((K, V), T, D)>, Since<T>> {
        let mut cursor = self.snapshot_cursor(as_of, |_| true).await?;
        let mut contents = Vec::new();
        while let Some(iter) = cursor.next().await {
            contents.extend(iter);
        }

        let old_len = contents.len();
        consolidate_updates(&mut contents);
        if old_len != contents.len() {
            self.machine
                .applier
                .shard_metrics
                .unconsolidated_snapshot
                .inc();
        }

        Ok(contents)
    }

    pub async fn snapshot_cursor(
        &mut self,
        as_of: Antichain<T>,
        should_fetch_part: impl for<'a> Fn(Option<&'a LazyPartStats>) -> bool,
    ) -> Result<Cursor<K, V, T, D>, Since<T>> {
        let batches = self.machine.snapshot(&as_of).await?;
        let lease = self.lease_seqno().await;

        Self::read_batches_consolidated(
            &self.cfg,
            Arc::clone(&self.metrics),
            Arc::clone(&self.machine.applier.shard_metrics),
            self.metrics.read.snapshot.clone(),
            Arc::clone(&self.blob),
            self.shard_id(),
            as_of,
            self.read_schemas.clone(),
            &batches,
            lease,
            should_fetch_part,
            COMPACTION_MEMORY_BOUND_BYTES.get(&self.cfg),
        )
    }

    pub(crate) fn read_batches_consolidated<L>(
        persist_cfg: &PersistConfig,
        metrics: Arc<Metrics>,
        shard_metrics: Arc<ShardMetrics>,
        read_metrics: ReadMetrics,
        blob: Arc<dyn Blob>,
        shard_id: ShardId,
        as_of: Antichain<T>,
        schemas: Schemas<K, V>,
        batches: &[HollowBatch<T>],
        lease: L,
        should_fetch_part: impl for<'a> Fn(Option<&'a LazyPartStats>) -> bool,
        memory_budget_bytes: usize,
    ) -> Result<Cursor<K, V, T, D, L>, Since<T>> {
        let context = format!("{}[as_of={:?}]", shard_id, as_of.elements());
        let filter = FetchBatchFilter::Snapshot {
            as_of: as_of.clone(),
        };

        let mut consolidator = Consolidator::new(
            context,
            FetchConfig::from_persist_config(persist_cfg),
            shard_id,
            StructuredSort::new(schemas.clone()),
            blob,
            metrics,
            shard_metrics,
            read_metrics,
            filter,
            None,
            memory_budget_bytes,
        );
        for batch in batches {
            for (meta, run) in batch.runs() {
                consolidator.enqueue_run(
                    &batch.desc,
                    meta,
                    run.into_iter()
                        .filter(|p| should_fetch_part(p.stats()))
                        .cloned(),
                );
            }
        }
        let max_len = persist_cfg.compaction_yield_after_n_updates;
        let max_bytes = BLOB_TARGET_SIZE.get(persist_cfg).max(1);

        Ok(Cursor {
            consolidator,
            max_len,
            max_bytes,
            _lease: lease,
            read_schemas: schemas,
        })
    }

    pub fn snapshot_stats(
        &self,
        as_of: Option<Antichain<T>>,
    ) -> impl Future<Output = Result<SnapshotStats, Since<T>>> + Send + 'static {
        let machine = self.machine.clone();
        async move {
            let batches = match as_of {
                Some(as_of) => machine.snapshot(&as_of).await?,
                None => machine.applier.all_batches(),
            };
            let num_updates = batches.iter().map(|b| b.len).sum();
            Ok(SnapshotStats {
                shard_id: machine.shard_id(),
                num_updates,
            })
        }
    }

    pub async fn snapshot_parts_stats(
        &self,
        as_of: Antichain<T>,
    ) -> Result<SnapshotPartsStats, Since<T>> {
        let batches = self.machine.snapshot(&as_of).await?;
        let parts = stream::iter(&batches)
            .flat_map(|b| b.part_stream(self.shard_id(), &*self.blob, &*self.metrics))
            .map(|p| {
                let p = p.expect("live batch");
                SnapshotPartStats {
                    encoded_size_bytes: p.encoded_size_bytes(),
                    stats: p.stats().cloned(),
                }
            })
            .collect()
            .await;
        Ok(SnapshotPartsStats {
            metrics: Arc::clone(&self.machine.applier.metrics),
            shard_id: self.machine.shard_id(),
            parts,
        })
    }
}

impl<K, V, T, D> ReadHandle<K, V, T, D>
where
    K: Debug + Codec + Ord,
    V: Debug + Codec + Ord,
    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
    D: Monoid + Codec64 + Send + Sync,
{
    pub async fn snapshot_and_stream(
        &mut self,
        as_of: Antichain<T>,
    ) -> Result<impl Stream<Item = ((K, V), T, D)> + use<K, V, T, D>, Since<T>> {
        let snap = self.snapshot(as_of).await?;

        let blob = Arc::clone(&self.blob);
        let metrics = Arc::clone(&self.metrics);
        let snapshot_metrics = self.metrics.read.snapshot.clone();
        let shard_metrics = Arc::clone(&self.machine.applier.shard_metrics);
        let reader_id = self.reader_id.clone();
        let schemas = self.read_schemas.clone();
        let mut schema_cache = self.schema_cache.clone();
        let persist_cfg = self.cfg.clone();
        let stream = async_stream::stream! {
            for part in snap {
                let mut fetched_part = fetch_leased_part(
                    &persist_cfg,
                    &part,
                    blob.as_ref(),
                    Arc::clone(&metrics),
                    &snapshot_metrics,
                    &shard_metrics,
                    &reader_id,
                    schemas.clone(),
                    &mut schema_cache,
                )
                .await;

                while let Some(next) = fetched_part.next() {
                    yield next;
                }
            }
        };

        Ok(stream)
    }
}

impl<K, V, T, D> ReadHandle<K, V, T, D>
where
    K: Debug + Codec + Ord,
    V: Debug + Codec + Ord,
    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
    D: Monoid + Ord + Codec64 + Send + Sync,
{
    #[cfg(test)]
    #[track_caller]
    pub async fn expect_snapshot_and_fetch(&mut self, as_of: T) -> Vec<((K, V), T, D)> {
        let mut ret = self
            .snapshot_and_fetch(Antichain::from_elem(as_of))
            .await
            .expect("cannot serve requested as_of");

        ret.sort();
        ret
    }
}

impl<K: Codec, V: Codec, T, D> Drop for ReadHandle<K, V, T, D> {
    fn drop(&mut self) {
        self.hold_state.modify(|s| {
            s.expired = true;
            s.request_sync = true;
        });
    }
}

#[cfg(test)]
mod tests {
    use std::pin;
    use std::str::FromStr;

    use mz_dyncfg::ConfigUpdates;
    use mz_ore::cast::CastFrom;
    use mz_ore::metrics::MetricsRegistry;
    use mz_persist::mem::{MemBlob, MemBlobConfig, MemConsensus};
    use mz_persist::unreliable::{UnreliableConsensus, UnreliableHandle};
    use serde::{Deserialize, Serialize};
    use serde_json::json;
    use tokio_stream::StreamExt;

    use crate::async_runtime::IsolatedRuntime;
    use crate::batch::BLOB_TARGET_SIZE;
    use crate::cache::StateCache;
    use crate::internal::metrics::Metrics;
    use crate::rpc::NoopPubSubSender;
    use crate::tests::{all_ok, new_test_client};
    use crate::{Diagnostics, PersistClient, PersistConfig, ShardId};

    use super::*;

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn drop_unused_subscribe(dyncfgs: ConfigUpdates) {
        let data = [
            (("0".to_owned(), "zero".to_owned()), 0, 1),
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
        ];

        let (mut write, read) = new_test_client(&dyncfgs)
            .await
            .expect_open::<String, String, u64, i64>(crate::ShardId::new())
            .await;

        write.expect_compare_and_append(&data[0..1], 0, 1).await;
        write.expect_compare_and_append(&data[1..2], 1, 2).await;
        write.expect_compare_and_append(&data[2..3], 2, 3).await;

        let subscribe = read
            .subscribe(timely::progress::Antichain::from_elem(2))
            .await
            .unwrap();
        assert!(
            !subscribe.snapshot.as_ref().unwrap().is_empty(),
            "snapshot must have batches for test to be meaningful"
        );
        drop(subscribe);
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn streaming_consolidate(dyncfgs: ConfigUpdates) {
        let data = &[
            (("k".to_owned(), "v".to_owned()), 0, 1),
            (("k".to_owned(), "v".to_owned()), 1, 1),
            (("k".to_owned(), "v".to_owned()), 2, 1),
            (("k2".to_owned(), "v".to_owned()), 0, 1),
            (("k2".to_owned(), "v".to_owned()), 1, -1),
        ];

        let (mut write, read) = {
            let client = new_test_client(&dyncfgs).await;
            client.cfg.set_config(&BLOB_TARGET_SIZE, 1000);
            client
                .expect_open::<String, String, u64, i64>(crate::ShardId::new())
                .await
        };

        write.expect_compare_and_append(data, 0, 5).await;

        let mut snapshot = read
            .subscribe(timely::progress::Antichain::from_elem(4))
            .await
            .unwrap();

        let mut updates = vec![];
        'outer: loop {
            for event in snapshot.fetch_next().await {
                match event {
                    ListenEvent::Progress(t) => {
                        if !t.less_than(&4) {
                            break 'outer;
                        }
                    }
                    ListenEvent::Updates(data) => {
                        updates.extend(data);
                    }
                }
            }
        }
        assert_eq!(updates, &[(("k".to_owned(), "v".to_owned()), 4u64, 3i64)])
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn snapshot_and_stream(dyncfgs: ConfigUpdates) {
        let data = &mut [
            (("k1".to_owned(), "v1".to_owned()), 0, 1),
            (("k2".to_owned(), "v2".to_owned()), 1, 1),
            (("k3".to_owned(), "v3".to_owned()), 2, 1),
            (("k4".to_owned(), "v4".to_owned()), 2, 1),
            (("k5".to_owned(), "v5".to_owned()), 3, 1),
        ];

        let (mut write, mut read) = {
            let client = new_test_client(&dyncfgs).await;
            client.cfg.set_config(&BLOB_TARGET_SIZE, 0);
            client
                .expect_open::<String, String, u64, i64>(crate::ShardId::new())
                .await
        };

        write.expect_compare_and_append(&data[0..2], 0, 2).await;
        write.expect_compare_and_append(&data[2..4], 2, 3).await;
        write.expect_compare_and_append(&data[4..], 3, 4).await;

        let as_of = Antichain::from_elem(3);
        let mut snapshot = pin::pin!(read.snapshot_and_stream(as_of.clone()).await.unwrap());

        let mut snapshot_rows = vec![];
        while let Some(((k, v), t, d)) = snapshot.next().await {
            snapshot_rows.push(((k, v), t, d));
        }

        for ((_k, _v), t, _d) in data.as_mut_slice() {
            t.advance_by(as_of.borrow());
        }

        assert_eq!(data.as_slice(), snapshot_rows.as_slice());
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn seqno_leases(dyncfgs: ConfigUpdates) {
        let mut data = vec![];
        for i in 0..20 {
            data.push(((i.to_string(), i.to_string()), i, 1))
        }

        let shard_id = ShardId::new();

        let client = new_test_client(&dyncfgs).await;
        let (mut write, read) = client
            .expect_open::<String, String, u64, i64>(shard_id)
            .await;

        let mut offset = 0;
        let mut width = 2;

        for i in offset..offset + width {
            write
                .expect_compare_and_append(
                    &data[i..i + 1],
                    u64::cast_from(i),
                    u64::cast_from(i) + 1,
                )
                .await;
        }
        offset += width;

        let mut fetcher = client
            .create_batch_fetcher::<String, String, u64, i64>(
                shard_id,
                Default::default(),
                Default::default(),
                false,
                Diagnostics::for_tests(),
            )
            .await
            .unwrap();

        let mut subscribe = read
            .subscribe(timely::progress::Antichain::from_elem(1))
            .await
            .expect("cannot serve requested as_of");

        let original_seqno_since = subscribe.listen.handle.outstanding_seqno();
        if let Some(snapshot) = &subscribe.snapshot {
            for part in snapshot {
                assert!(
                    part.lease.seqno() >= original_seqno_since,
                    "our seqno hold must cover all parts"
                );
            }
        }

        let mut parts = vec![];

        width = 4;
        for i in offset..offset + width {
            for event in subscribe.next(None).await {
                if let ListenEvent::Updates(mut new_parts) = event {
                    parts.append(&mut new_parts);
                    subscribe
                        .listen
                        .handle
                        .downgrade_since(&subscribe.listen.since)
                        .await;
                }
            }

            write
                .expect_compare_and_append(
                    &data[i..i + 1],
                    u64::cast_from(i),
                    u64::cast_from(i) + 1,
                )
                .await;

            assert_eq!(
                subscribe.listen.handle.machine.applier.seqno_since(),
                original_seqno_since
            );
        }

        offset += width;

        let mut seqno_since = subscribe.listen.handle.machine.applier.seqno_since();

        assert_eq!(seqno_since, original_seqno_since);

        let mut subsequent_parts = vec![];

        let mut this_seqno = SeqNo::minimum();

        for (mut i, part) in parts.into_iter().enumerate() {
            let part_seqno = part.lease.seqno();
            let last_seqno = this_seqno;
            this_seqno = part_seqno;
            assert!(this_seqno >= last_seqno);

            let (part, lease) = part.into_exchangeable_part();
            let _ = fetcher.fetch_leased_part(part).await;
            drop(lease);

            for event in subscribe.next(None).await {
                if let ListenEvent::Updates(parts) = event {
                    for part in parts {
                        let (_, lease) = part.into_exchangeable_part();
                        subsequent_parts.push(lease);
                    }
                }
            }

            subscribe
                .listen
                .handle
                .downgrade_since(&subscribe.listen.since)
                .await;

            i += offset;
            write
                .expect_compare_and_append(
                    &data[i..i + 1],
                    u64::cast_from(i),
                    u64::cast_from(i) + 1,
                )
                .await;

            let expect_downgrade = subscribe.listen.handle.outstanding_seqno() > part_seqno;

            let new_seqno_since = subscribe.listen.handle.machine.applier.seqno_since();
            if expect_downgrade {
                assert!(new_seqno_since > seqno_since);
            } else {
                assert_eq!(new_seqno_since, seqno_since);
            }
            seqno_since = new_seqno_since;
        }

        assert!(seqno_since > original_seqno_since);

        drop(subsequent_parts);
        drop(subscribe);
    }

    #[mz_ore::test]
    fn reader_id_human_readable_serde() {
        #[derive(Debug, Serialize, Deserialize)]
        struct Container {
            reader_id: LeasedReaderId,
        }

        let id =
            LeasedReaderId::from_str("r00000000-1234-5678-0000-000000000000").expect("valid id");
        assert_eq!(
            id,
            serde_json::from_value(serde_json::to_value(id.clone()).expect("serializable"))
                .expect("deserializable")
        );

        assert_eq!(
            id,
            serde_json::from_str("\"r00000000-1234-5678-0000-000000000000\"")
                .expect("deserializable")
        );

        let json = json!({ "reader_id": id });
        assert_eq!(
            "{\"reader_id\":\"r00000000-1234-5678-0000-000000000000\"}",
            &json.to_string()
        );
        let container: Container = serde_json::from_value(json).expect("deserializable");
        assert_eq!(container.reader_id, id);
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn skip_consensus_fetch_optimization() {
        let data = vec![
            (("0".to_owned(), "zero".to_owned()), 0, 1),
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
        ];

        let cfg = PersistConfig::new_for_tests();
        let blob = Arc::new(MemBlob::open(MemBlobConfig::default()));
        let consensus = Arc::new(MemConsensus::default());
        let unreliable = UnreliableHandle::default();
        unreliable.totally_available();
        let consensus = Arc::new(UnreliableConsensus::new(consensus, unreliable.clone()));
        let metrics = Arc::new(Metrics::new(&cfg, &MetricsRegistry::new()));
        let pubsub_sender = Arc::new(NoopPubSubSender);
        let (mut write, mut read) = PersistClient::new(
            cfg,
            blob,
            consensus,
            metrics,
            Arc::new(IsolatedRuntime::new_for_tests()),
            Arc::new(StateCache::new_no_metrics()),
            pubsub_sender,
        )
        .expect("client construction failed")
        .expect_open::<String, String, u64, i64>(ShardId::new())
        .await;

        write.expect_compare_and_append(&data[0..1], 0, 1).await;
        write.expect_compare_and_append(&data[1..2], 1, 2).await;
        write.expect_compare_and_append(&data[2..3], 2, 3).await;

        let snapshot = read.expect_snapshot_and_fetch(2).await;
        let mut listen = read.expect_listen(0).await;

        let listen_actual = listen.fetch_next().await;
        let expected_events = vec![ListenEvent::Progress(Antichain::from_elem(1))];
        assert_eq!(listen_actual, expected_events);

        unreliable.totally_unavailable();
        assert_eq!(snapshot, all_ok(&data, 2));
        assert_eq!(
            listen.read_until(&3).await,
            (all_ok(&data[1..], 1), Antichain::from_elem(3))
        );
    }
}