use std::cmp::{max, min};
use std::collections::{BTreeMap, VecDeque};
use std::fmt::Debug;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;

use differential_dataflow::hashable::Hashable;
use differential_dataflow::lattice::Lattice;
use itertools::Itertools;
use mz_ore::cast::CastFrom;
use mz_ore::collections::HashMap;
use mz_ore::instrument;
use mz_persist_client::cfg::USE_CRITICAL_SINCE_TXN;
use mz_persist_client::fetch::LeasedBatchPart;
use mz_persist_client::metrics::encode_ts_metric;
use mz_persist_client::read::{ListenEvent, ReadHandle, Subscribe};
use mz_persist_client::write::WriteHandle;
use mz_persist_client::{Diagnostics, PersistClient, ShardId};
use mz_persist_types::txn::{TxnsCodec, TxnsEntry};
use mz_persist_types::{Codec64, StepForward};
use timely::order::TotalOrder;
use timely::progress::{Antichain, Timestamp};
use tracing::debug;

use crate::TxnsCodecDefault;
use crate::metrics::Metrics;
use crate::txn_read::{DataListenNext, DataRemapEntry, DataSnapshot, DataSubscribe};

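/// In-memory state for a cache of the txns shard.
///
/// This is updated with [TxnsEntry] events read from the shard. For each data
/// shard it tracks the registration intervals ([DataTimes]) plus any
/// not-yet-applied batches and register/forget operations, all correct as of
/// `progress_exclusive`.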
#[derive(Debug)]
pub struct TxnsCacheState<T> {
    txns_id: ShardId,
    pub(crate) init_ts: T,
    pub(crate) progress_exclusive: T,

    next_batch_id: usize,
    pub(crate) unapplied_batches: BTreeMap<usize, (ShardId, Vec<u8>, T)>,
    batch_idx: HashMap<Vec<u8>, usize>,
    pub(crate) datas: BTreeMap<ShardId, DataTimes<T>>,
    pub(crate) unapplied_registers: VecDeque<(ShardId, T)>,

    only_data_id: Option<ShardId>,
}

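/// A [TxnsCacheState] kept up to date by a persist [Subscribe] on the txns
/// shard.
///
/// A minimal usage sketch (hypothetical `client`, `txns_id`, `data_id`, and
/// `ts` bindings; not compiled):
///
/// ```ignore
/// let mut cache = TxnsCache::<u64>::open(&client, txns_id, None).await;
/// // Wait until the cache has seen all txns shard writes through `ts`.
/// let _ = cache.update_gt(&ts).await;
/// // Describe how to read a data shard as of `ts`.
/// let snap = cache.data_snapshot(data_id, ts);
/// ```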
#[derive(Debug)]
pub struct TxnsCache<T, C: TxnsCodec = TxnsCodecDefault> {
    pub(crate) txns_subscribe: Subscribe<C::Key, C::Val, T, i64>,
    pub(crate) buf: Vec<(TxnsEntry, T, i64)>,
    state: TxnsCacheState<T>,
}

impl<T: Timestamp + Lattice + TotalOrder + StepForward + Codec64 + Sync> TxnsCacheState<T> {
    fn new(txns_id: ShardId, init_ts: T, only_data_id: Option<ShardId>) -> Self {
        TxnsCacheState {
            txns_id,
            init_ts,
            progress_exclusive: T::minimum(),
            next_batch_id: 0,
            unapplied_batches: BTreeMap::new(),
            batch_idx: HashMap::new(),
            datas: BTreeMap::new(),
            unapplied_registers: VecDeque::new(),
            only_data_id,
        }
    }

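    /// Initializes the cache state from a txns shard [ReadHandle], returning
    /// it along with the [Subscribe] used to produce it.
    ///
    /// The cache is caught up to at least the handle's `since`, which also
    /// becomes `init_ts`.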
    pub(crate) async fn init<C: TxnsCodec>(
        only_data_id: Option<ShardId>,
        txns_read: ReadHandle<C::Key, C::Val, T, i64>,
    ) -> (Self, Subscribe<C::Key, C::Val, T, i64>) {
        let txns_id = txns_read.shard_id();
        let as_of = txns_read.since().clone();
        let since_ts = as_of.as_option().expect("txns shard is not closed").clone();
        let mut txns_subscribe = txns_read
            .subscribe(as_of)
            .await
            .expect("handle holds a capability");
        let mut state = Self::new(txns_id, since_ts.clone(), only_data_id.clone());
        let mut buf = Vec::new();
        TxnsCache::<T, C>::update(
            &mut state,
            &mut txns_subscribe,
            &mut buf,
            only_data_id,
            |progress_exclusive| progress_exclusive >= &since_ts,
        )
        .await;
        debug_assert_eq!(state.validate(), Ok(()));
        (state, txns_subscribe)
    }

    pub fn txns_id(&self) -> ShardId {
        self.txns_id
    }

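    /// Returns whether the data shard is registered to the txns set as of
    /// `progress_exclusive`.
    ///
    /// Panics if `ts` is not exactly `progress_exclusive`.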
    pub fn registered_at_progress(&self, data_id: &ShardId, ts: &T) -> bool {
        self.assert_only_data_id(data_id);
        assert_eq!(self.progress_exclusive, *ts);
        let Some(data_times) = self.datas.get(data_id) else {
            return false;
        };
        data_times.last_reg().forget_ts.is_none()
    }

    pub(crate) fn all_registered_at_progress(&self, ts: &T) -> Vec<ShardId> {
        assert_eq!(self.only_data_id, None);
        assert_eq!(self.progress_exclusive, *ts);
        self.datas
            .iter()
            .filter(|(_, data_times)| data_times.last_reg().forget_ts.is_none())
            .map(|(data_id, _)| *data_id)
            .collect()
    }

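    /// Returns a [DataSnapshot] describing how to read the data shard at
    /// `as_of`: the latest not-yet-applied write at or before `as_of` (if
    /// any) and an `empty_to` timestamp that is at least `as_of.step_forward()`
    /// and never before `init_ts`.
    ///
    /// Panics if `as_of` is not less than `progress_exclusive`.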
    pub fn data_snapshot(&self, data_id: ShardId, as_of: T) -> DataSnapshot<T> {
        self.assert_only_data_id(&data_id);
        assert!(self.progress_exclusive > as_of);
        let empty_to = max(as_of.step_forward(), self.init_ts.clone());
        let Some(all) = self.datas.get(&data_id) else {
            return DataSnapshot {
                data_id,
                latest_write: None,
                as_of,
                empty_to,
            };
        };

        let min_unapplied_ts = self
            .unapplied_batches
            .first_key_value()
            .map(|(_, (_, _, ts))| ts)
            .unwrap_or(&self.progress_exclusive);
        let latest_write = all
            .writes
            .iter()
            .rev()
            .find(|x| **x <= as_of && *x >= min_unapplied_ts)
            .cloned();

        debug!(
            "data_snapshot {:.9} latest_write={:?} as_of={:?} empty_to={:?}: all={:?}",
            data_id.to_string(),
            latest_write,
            as_of,
            empty_to,
            all,
        );
        let ret = DataSnapshot {
            data_id: data_id.clone(),
            latest_write,
            as_of,
            empty_to,
        };
        assert_eq!(ret.validate(), Ok(()));
        ret
    }

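    /// Given a data shard listen that has progressed up to (but not
    /// including) `ts`, returns what it should do next: read the data shard
    /// up to some timestamp, emit logical progress without reading, or wait
    /// for the txns shard to advance first.
    ///
    /// Panics if `ts` is beyond `progress_exclusive` or before `init_ts`.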
    pub fn data_listen_next(&self, data_id: &ShardId, ts: &T) -> DataListenNext<T> {
        self.assert_only_data_id(data_id);
        assert!(
            &self.progress_exclusive >= ts,
            "ts {:?} is past progress_exclusive {:?}",
            ts,
            self.progress_exclusive
        );
        assert!(
            ts >= &self.init_ts,
            "ts {:?} is not past initial since {:?}",
            ts,
            self.init_ts
        );
        use DataListenNext::*;
        let data_times = self.datas.get(data_id);
        debug!(
            "data_listen_next {:.9} {:?}: progress={:?} times={:?}",
            data_id.to_string(),
            ts,
            self.progress_exclusive,
            data_times,
        );
        let Some(data_times) = data_times else {
            if ts < &self.progress_exclusive {
                return ReadDataTo(self.progress_exclusive.clone());
            } else {
                return WaitForTxnsProgress;
            }
        };
        let physical_ts = data_times.latest_physical_ts();
        let last_reg = data_times.last_reg();
        if ts >= &self.progress_exclusive {
            WaitForTxnsProgress
        } else if ts <= physical_ts {
            ReadDataTo(physical_ts.step_forward())
        } else if last_reg.forget_ts.is_none() {
            assert!(last_reg.contains(ts));
            EmitLogicalProgress(self.progress_exclusive.clone())
        } else {
            assert!(ts > &last_reg.register_ts && last_reg.forget_ts.is_some());
            ReadDataTo(self.progress_exclusive.clone())
        }
    }

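    /// Returns a [DataSubscribe] for the data shard starting at `as_of`: the
    /// initial [DataSnapshot] plus a [DataRemapEntry] whose physical and
    /// logical uppers both start at the snapshot's `empty_to`.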
    pub(crate) fn data_subscribe(&self, data_id: ShardId, as_of: T) -> DataSubscribe<T> {
        self.assert_only_data_id(&data_id);
        assert!(self.progress_exclusive > as_of);
        let snapshot = self.data_snapshot(data_id, as_of);
        let remap = DataRemapEntry {
            physical_upper: snapshot.empty_to.clone(),
            logical_upper: snapshot.empty_to.clone(),
        };
        DataSubscribe {
            data_id,
            snapshot: Some(snapshot),
            remap,
        }
    }

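    /// Returns the minimum timestamp of any not-yet-applied batch or
    /// register/forget, or `progress_exclusive` if everything has been
    /// applied.
    ///
    /// Panics if this cache is filtered to a single data shard.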
    pub fn min_unapplied_ts(&self) -> &T {
        assert_eq!(self.only_data_id, None);
        self.min_unapplied_ts_inner()
    }

    fn min_unapplied_ts_inner(&self) -> &T {
        let min_batch_ts = self
            .unapplied_batches
            .first_key_value()
            .map(|(_, (_, _, ts))| ts)
            .unwrap_or(&self.progress_exclusive);
        let min_register_ts = self
            .unapplied_registers
            .front()
            .map(|(_, ts)| ts)
            .unwrap_or(&self.progress_exclusive);

        min(min_batch_ts, min_register_ts)
    }

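    /// Returns the not-yet-applied register/forget operations and batches in
    /// timestamp order, with batches for the same data shard and timestamp
    /// grouped into a single [Unapplied::Batch].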
    pub(crate) fn unapplied(&self) -> impl Iterator<Item = (&ShardId, Unapplied, &T)> {
        assert_eq!(self.only_data_id, None);
        let registers = self
            .unapplied_registers
            .iter()
            .map(|(data_id, ts)| (data_id, Unapplied::RegisterForget, ts));
        let batches = self
            .unapplied_batches
            .values()
            .fold(
                BTreeMap::new(),
                |mut accum: BTreeMap<_, Vec<_>>, (data_id, batch, ts)| {
                    accum.entry((ts, data_id)).or_default().push(batch);
                    accum
                },
            )
            .into_iter()
            .map(|((ts, data_id), batches)| (data_id, Unapplied::Batch(batches), ts));
        registers.merge_by(batches, |(_, _, ts1), (_, _, ts2)| ts1 <= ts2)
    }

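    /// Filters the given retractions down to batches that this cache still
    /// considers unapplied, asserting that the cache has caught up to
    /// `expected_txns_upper`.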
    pub(crate) fn filter_retractions<'a>(
        &'a self,
        expected_txns_upper: &T,
        retractions: impl Iterator<Item = (&'a Vec<u8>, &'a ([u8; 8], ShardId))>,
    ) -> impl Iterator<Item = (&'a Vec<u8>, &'a ([u8; 8], ShardId))> {
        assert_eq!(self.only_data_id, None);
        assert!(&self.progress_exclusive >= expected_txns_upper);
        retractions.filter(|(batch_raw, _)| self.batch_idx.contains_key(*batch_raw))
    }

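    /// Ingests a batch of txns shard updates and advances
    /// `progress_exclusive` to `progress`.
    ///
    /// Updates are applied in order of their encoded (uncompacted)
    /// timestamps, which may be earlier than the timestamps they were read
    /// at.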
    pub(crate) fn push_entries(&mut self, mut entries: Vec<(TxnsEntry, T, i64)>, progress: T) {
        entries.sort_by(|(a, _, _), (b, _, _)| a.ts::<T>().cmp(&b.ts::<T>()));
        for (e, t, d) in entries {
            match e {
                TxnsEntry::Register(data_id, ts) => {
                    let ts = T::decode(ts);
                    debug_assert!(ts <= t);
                    self.push_register(data_id, ts, d, t);
                }
                TxnsEntry::Append(data_id, ts, batch) => {
                    let ts = T::decode(ts);
                    debug_assert!(ts <= t);
                    self.push_append(data_id, batch, ts, d)
                }
            }
        }
        self.progress_exclusive = progress;
        debug_assert_eq!(self.validate(), Ok(()));
    }

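    /// Records a data shard registration (`diff == 1`) or forget
    /// (`diff == -1`) at `ts`, ignoring shards filtered out by
    /// `only_data_id`. Uncompacted entries (`ts == compacted_ts`) are also
    /// tracked as unapplied registers.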
    fn push_register(&mut self, data_id: ShardId, ts: T, diff: i64, compacted_ts: T) {
        self.assert_only_data_id(&data_id);
        debug_assert!(ts >= self.progress_exclusive || diff < 0);
        if let Some(only_data_id) = self.only_data_id.as_ref() {
            if only_data_id != &data_id {
                return;
            }
        }

        if ts == compacted_ts {
            self.unapplied_registers.push_back((data_id, ts.clone()));
        }

        if diff == 1 {
            debug!(
                "cache learned {:.9} registered t={:?}",
                data_id.to_string(),
                ts
            );
            let entry = self.datas.entry(data_id).or_default();
            if let Some(last_reg) = entry.registered.back() {
                assert!(last_reg.forget_ts.is_some())
            }
            entry.registered.push_back(DataRegistered {
                register_ts: ts,
                forget_ts: None,
            });
        } else if diff == -1 {
            debug!(
                "cache learned {:.9} forgotten t={:?}",
                data_id.to_string(),
                ts
            );
            let active_reg = self
                .datas
                .get_mut(&data_id)
                .and_then(|x| x.registered.back_mut())
                .expect("data shard should be registered before forget");
            assert_eq!(active_reg.forget_ts.replace(ts), None);
        } else {
            unreachable!("only +1/-1 diffs are used");
        }
        debug_assert_eq!(self.validate(), Ok(()));
    }

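    /// Records a committed batch (`diff == 1`) or its retraction once applied
    /// (`diff == -1`) at `ts`, ignoring shards filtered out by
    /// `only_data_id`, then compacts the per-shard bookkeeping.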
    fn push_append(&mut self, data_id: ShardId, batch: Vec<u8>, ts: T, diff: i64) {
        self.assert_only_data_id(&data_id);
        debug_assert!(ts >= self.progress_exclusive || diff < 0);
        if let Some(only_data_id) = self.only_data_id.as_ref() {
            if only_data_id != &data_id {
                return;
            }
        }

        if diff == 1 {
            debug!(
                "cache learned {:.9} committed t={:?} b={}",
                data_id.to_string(),
                ts,
                batch.hashed(),
            );
            let idx = self.next_batch_id;
            self.next_batch_id += 1;
            let prev = self.batch_idx.insert(batch.clone(), idx);
            assert_eq!(prev, None);
            let prev = self
                .unapplied_batches
                .insert(idx, (data_id, batch, ts.clone()));
            assert_eq!(prev, None);
            let times = self.datas.get_mut(&data_id).expect("data is initialized");
            assert_eq!(times.last_reg().forget_ts, None);

            if times.writes.back() != Some(&ts) {
                times.writes.push_back(ts);
            }
        } else if diff == -1 {
            debug!(
                "cache learned {:.9} applied t={:?} b={}",
                data_id.to_string(),
                ts,
                batch.hashed(),
            );
            let idx = self
                .batch_idx
                .remove(&batch)
                .expect("invariant violation: batch should exist");
            let prev = self
                .unapplied_batches
                .remove(&idx)
                .expect("invariant violation: batch index should exist");
            debug_assert_eq!(data_id, prev.0);
            debug_assert_eq!(batch, prev.1);
            debug_assert!(prev.2 <= ts);
        } else {
            unreachable!("only +1/-1 diffs are used");
        }
        self.compact_data_times(&data_id);
        debug_assert_eq!(self.validate(), Ok(()));
    }

    pub(crate) fn mark_register_applied(&mut self, ts: &T) {
        self.unapplied_registers
            .retain(|(_, register_ts)| ts < register_ts);
        debug_assert_eq!(self.validate(), Ok(()));
    }

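    /// Drops per-shard write and registration times that are no longer
    /// needed: everything before the minimum unapplied timestamp, while
    /// always keeping at least the most recent write and the most recent
    /// registration.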
    fn compact_data_times(&mut self, data_id: &ShardId) {
        let Some(times) = self.datas.get_mut(data_id) else {
            return;
        };

        debug!("cache compact {:.9} times={:?}", data_id.to_string(), times);

        if let Some(unapplied_write_ts) = self
            .unapplied_batches
            .first_key_value()
            .map(|(_, (_, _, ts))| ts)
        {
            debug!(
                "cache compact {:.9} unapplied_write_ts={:?}",
                data_id.to_string(),
                unapplied_write_ts,
            );
            while let Some(write_ts) = times.writes.front() {
                if times.writes.len() == 1 || write_ts >= unapplied_write_ts {
                    break;
                }
                times.writes.pop_front();
            }
        } else {
            times.writes.drain(..times.writes.len() - 1);
        }
        let unapplied_reg_ts = self.unapplied_registers.front().map(|(_, ts)| ts);
        let min_write_ts = times.writes.front();
        let min_reg_ts = [unapplied_reg_ts, min_write_ts].into_iter().flatten().min();
        if let Some(min_reg_ts) = min_reg_ts {
            debug!(
                "cache compact {:.9} unapplied_reg_ts={:?} min_write_ts={:?} min_reg_ts={:?}",
                data_id.to_string(),
                unapplied_reg_ts,
                min_write_ts,
                min_reg_ts,
            );
            while let Some(reg) = times.registered.front() {
                match &reg.forget_ts {
                    Some(forget_ts) if forget_ts >= min_reg_ts => break,
                    _ if times.registered.len() == 1 => break,
                    _ => {
                        assert!(
                            reg.forget_ts.is_some(),
                            "only the latest reg can have no forget ts"
                        );
                        times.registered.pop_front();
                    }
                }
            }
        } else {
            times.registered.drain(..times.registered.len() - 1);
        }

        debug!(
            "cache compact DONE {:.9} times={:?}",
            data_id.to_string(),
            times
        );
    }

    pub(crate) fn update_gauges(&self, metrics: &Metrics) {
        metrics
            .data_shard_count
            .set(u64::cast_from(self.datas.len()));
        metrics
            .batches
            .unapplied_count
            .set(u64::cast_from(self.unapplied_batches.len()));
        let unapplied_batches_bytes = self
            .unapplied_batches
            .values()
            .map(|(_, x, _)| x.len())
            .sum::<usize>();
        metrics
            .batches
            .unapplied_bytes
            .set(u64::cast_from(unapplied_batches_bytes));
        metrics
            .batches
            .unapplied_min_ts
            .set(encode_ts_metric(&Antichain::from_elem(
                self.min_unapplied_ts().clone(),
            )));
    }

    fn assert_only_data_id(&self, data_id: &ShardId) {
        if let Some(only_data_id) = self.only_data_id.as_ref() {
            assert_eq!(data_id, only_data_id);
        }
    }

    pub(crate) fn validate(&self) -> Result<(), String> {
        if self.batch_idx.len() != self.unapplied_batches.len() {
            return Err(format!(
                "expected index len {} to match what it's indexing {}",
                self.batch_idx.len(),
                self.unapplied_batches.len()
            ));
        }

        let mut prev_batch_ts = T::minimum();
        for (idx, (_, batch, ts)) in self.unapplied_batches.iter() {
            if self.batch_idx.get(batch) != Some(idx) {
                return Err(format!(
                    "expected batch to be indexed at {} got {:?}",
                    idx,
                    self.batch_idx.get(batch)
                ));
            }
            if ts < &prev_batch_ts {
                return Err(format!(
                    "unapplied batch timestamp {:?} out of order after {:?}",
                    ts, prev_batch_ts
                ));
            }
            prev_batch_ts = ts.clone();
        }

        let mut prev_register_ts = T::minimum();
        for (_, ts) in self.unapplied_registers.iter() {
            if ts < &prev_register_ts {
                return Err(format!(
                    "unapplied register timestamp {:?} out of order after {:?}",
                    ts, prev_register_ts
                ));
            }
            prev_register_ts = ts.clone();
        }

        let min_unapplied_ts = self.min_unapplied_ts_inner();

        for (data_id, data_times) in self.datas.iter() {
            let () = data_times.validate()?;

            if let Some(ts) = data_times.writes.front() {
                if min_unapplied_ts > ts && data_times.writes.len() > 1 {
                    return Err(format!(
                        "{:?} write ts {:?} not past min unapplied ts {:?}",
                        data_id, ts, min_unapplied_ts
                    ));
                }
            }

            if let Some((_, (_, _, unapplied_ts))) = self
                .unapplied_batches
                .iter()
                .find(|(_, (shard_id, _, _))| shard_id == data_id)
            {
                if let Some(write_ts) = data_times.writes.front() {
                    if write_ts > unapplied_ts {
                        return Err(format!(
                            "{:?} min write ts {:?} past min unapplied batch ts {:?}",
                            data_id, write_ts, unapplied_ts
                        ));
                    }
                }
            }

            if let Some((_, unapplied_ts)) = self
                .unapplied_registers
                .iter()
                .find(|(shard_id, _)| shard_id == data_id)
            {
                let register_ts = &data_times.first_reg().register_ts;
                if register_ts > unapplied_ts {
                    return Err(format!(
                        "{:?} min register ts {:?} past min unapplied register ts {:?}",
                        data_id, register_ts, unapplied_ts
                    ));
                }
            }
        }

        Ok(())
    }
}

impl<T, C> TxnsCache<T, C>
where
    T: Timestamp + Lattice + TotalOrder + StepForward + Codec64 + Sync,
    C: TxnsCodec,
{
    pub(crate) async fn init(
        init_ts: T,
        txns_read: ReadHandle<C::Key, C::Val, T, i64>,
        txns_write: &mut WriteHandle<C::Key, C::Val, T, i64>,
    ) -> Self {
        let () = crate::empty_caa(|| "txns init", txns_write, init_ts.clone()).await;
        let mut ret = Self::from_read(txns_read, None).await;
        let _ = ret.update_gt(&init_ts).await;
        ret
    }

    pub async fn open(
        client: &PersistClient,
        txns_id: ShardId,
        only_data_id: Option<ShardId>,
    ) -> Self {
        let (txns_key_schema, txns_val_schema) = C::schemas();
        let txns_read = client
            .open_leased_reader(
                txns_id,
                Arc::new(txns_key_schema),
                Arc::new(txns_val_schema),
                Diagnostics {
                    shard_name: "txns".to_owned(),
                    handle_purpose: "read txns".to_owned(),
                },
                USE_CRITICAL_SINCE_TXN.get(client.dyncfgs()),
            )
            .await
            .expect("txns schema shouldn't change");
        Self::from_read(txns_read, only_data_id).await
    }

    async fn from_read(
        txns_read: ReadHandle<C::Key, C::Val, T, i64>,
        only_data_id: Option<ShardId>,
    ) -> Self {
        let (state, txns_subscribe) = TxnsCacheState::init::<C>(only_data_id, txns_read).await;
        TxnsCache {
            txns_subscribe,
            buf: Vec::new(),
            state,
        }
    }

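    /// Advances the cache to a `progress_exclusive` strictly greater than
    /// `ts` and returns it.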
    #[must_use]
    #[instrument(level = "debug", fields(ts = ?ts))]
    pub async fn update_gt(&mut self, ts: &T) -> &T {
        let only_data_id = self.only_data_id.clone();
        Self::update(
            &mut self.state,
            &mut self.txns_subscribe,
            &mut self.buf,
            only_data_id,
            |progress_exclusive| progress_exclusive > ts,
        )
        .await;
        debug_assert!(&self.progress_exclusive > ts);
        debug_assert_eq!(self.validate(), Ok(()));
        &self.progress_exclusive
    }

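    /// Advances the cache to a `progress_exclusive` of at least `ts` and
    /// returns it.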
    #[must_use]
    #[instrument(level = "debug", fields(ts = ?ts))]
    pub async fn update_ge(&mut self, ts: &T) -> &T {
        let only_data_id = self.only_data_id.clone();
        Self::update(
            &mut self.state,
            &mut self.txns_subscribe,
            &mut self.buf,
            only_data_id,
            |progress_exclusive| progress_exclusive >= ts,
        )
        .await;
        debug_assert!(&self.progress_exclusive >= ts);
        debug_assert_eq!(self.validate(), Ok(()));
        &self.progress_exclusive
    }

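    /// Drives `txns_subscribe` forward, buffering fetched updates and pushing
    /// them into `state` at each progress event, until `done` returns true
    /// for `state.progress_exclusive`.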
    async fn update<F: Fn(&T) -> bool>(
        state: &mut TxnsCacheState<T>,
        txns_subscribe: &mut Subscribe<C::Key, C::Val, T, i64>,
        buf: &mut Vec<(TxnsEntry, T, i64)>,
        only_data_id: Option<ShardId>,
        done: F,
    ) {
        while !done(&state.progress_exclusive) {
            let events = txns_subscribe.next(None).await;
            for event in events {
                match event {
                    ListenEvent::Progress(frontier) => {
                        let progress = frontier
                            .into_option()
                            .expect("nothing should close the txns shard");
                        state.push_entries(std::mem::take(buf), progress);
                    }
                    ListenEvent::Updates(parts) => {
                        Self::fetch_parts(only_data_id, txns_subscribe, parts, buf).await;
                    }
                };
            }
        }
        debug_assert_eq!(state.validate(), Ok(()));
        debug!(
            "cache correct before {:?} len={} least_ts={:?}",
            state.progress_exclusive,
            state.unapplied_batches.len(),
            state
                .unapplied_batches
                .first_key_value()
                .map(|(_, (_, _, ts))| ts),
        );
    }

    pub(crate) async fn fetch_parts(
        only_data_id: Option<ShardId>,
        txns_subscribe: &mut Subscribe<C::Key, C::Val, T, i64>,
        parts: Vec<LeasedBatchPart<T>>,
        updates: &mut Vec<(TxnsEntry, T, i64)>,
    ) {
        for part in parts {
            let should_fetch_part = Self::should_fetch_part(only_data_id.as_ref(), &part);
            debug!(
                "should_fetch_part={} for {:?} {:?}",
                should_fetch_part,
                only_data_id,
                part.stats()
            );
            if !should_fetch_part {
                drop(part);
                continue;
            }
            let part_updates = txns_subscribe.fetch_batch_part(part).await;
            let part_updates = part_updates.map(|((k, v), t, d)| {
                let (k, v) = (k.expect("valid key"), v.expect("valid val"));
                (C::decode(k, v), t, d)
            });
            if let Some(only_data_id) = only_data_id.as_ref() {
                updates.extend(part_updates.filter(|(x, _, _)| x.data_id() == only_data_id));
            } else {
                updates.extend(part_updates);
            }
        }
    }

    fn should_fetch_part(only_data_id: Option<&ShardId>, part: &LeasedBatchPart<T>) -> bool {
        let Some(only_data_id) = only_data_id else {
            return true;
        };
        let Some(stats) = part.stats() else {
            return true;
        };
        C::should_fetch_part(only_data_id, &stats).unwrap_or(true)
    }
}

impl<T, C: TxnsCodec> Deref for TxnsCache<T, C> {
    type Target = TxnsCacheState<T>;
    fn deref(&self) -> &Self::Target {
        &self.state
    }
}

impl<T, C: TxnsCodec> DerefMut for TxnsCache<T, C> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.state
    }
}

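/// Per-data-shard bookkeeping: the shard's registration intervals and the
/// write timestamps the cache still tracks, both in increasing timestamp
/// order.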
#[derive(Debug)]
pub(crate) struct DataTimes<T> {
    pub(crate) registered: VecDeque<DataRegistered<T>>,
    pub(crate) writes: VecDeque<T>,
}

impl<T> Default for DataTimes<T> {
    fn default() -> Self {
        Self {
            registered: Default::default(),
            writes: Default::default(),
        }
    }
}

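/// A single registration interval for a data shard: registered at
/// `register_ts` and, if it has since been forgotten, forgotten at
/// `forget_ts`. Both endpoints are inclusive.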
#[derive(Debug)]
pub(crate) struct DataRegistered<T> {
    pub(crate) register_ts: T,
    pub(crate) forget_ts: Option<T>,
}

impl<T: Timestamp + TotalOrder> DataRegistered<T> {
    pub(crate) fn contains(&self, ts: &T) -> bool {
        &self.register_ts <= ts && self.forget_ts.as_ref().map_or(true, |x| ts <= x)
    }
}

impl<T: Timestamp + TotalOrder> DataTimes<T> {
    pub(crate) fn last_reg(&self) -> &DataRegistered<T> {
        self.registered.back().expect("at least one registration")
    }

    fn first_reg(&self) -> &DataRegistered<T> {
        self.registered.front().expect("at least one registration")
    }

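    /// Returns the latest timestamp at which this data shard is known to have
    /// been physically written: the max of the last registration's register
    /// and forget timestamps and the most recent tracked write.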
    fn latest_physical_ts(&self) -> &T {
        let last_reg = self.last_reg();
        let mut physical_ts = &last_reg.register_ts;
        if let Some(forget_ts) = &last_reg.forget_ts {
            physical_ts = max(physical_ts, forget_ts);
        }
        if let Some(latest_write) = self.writes.back() {
            physical_ts = max(physical_ts, latest_write);
        }
        physical_ts
    }

    pub(crate) fn validate(&self) -> Result<(), String> {
        let mut prev_ts = T::minimum();
        for ts in self.writes.iter() {
            if ts < &prev_ts {
                return Err(format!(
                    "write ts {:?} out of order after {:?}",
                    ts, prev_ts
                ));
            }
            prev_ts = ts.clone();
        }

        let mut prev_ts = T::minimum();
        let mut writes_idx = 0;
        for x in self.registered.iter() {
            if x.register_ts < prev_ts {
                return Err(format!(
                    "register ts {:?} out of order after {:?}",
                    x.register_ts, prev_ts
                ));
            }
            if let Some(forget_ts) = x.forget_ts.as_ref() {
                if !(&x.register_ts <= forget_ts) {
                    return Err(format!(
                        "register ts {:?} not less_equal forget ts {:?}",
                        x.register_ts, forget_ts
                    ));
                }
                prev_ts.clone_from(forget_ts);
            }
            while let Some(write_ts) = self.writes.get(writes_idx) {
                if write_ts < &x.register_ts {
                    return Err(format!(
                        "write ts {:?} not in any register interval {:?}",
                        write_ts, self.registered
                    ));
                }
                if let Some(forget_ts) = x.forget_ts.as_ref() {
                    if write_ts <= forget_ts {
                        writes_idx += 1;
                        continue;
                    }
                }
                break;
            }
        }

        let Some(reg_back) = self.registered.back() else {
            return Err("registered was empty".into());
        };
        if writes_idx != self.writes.len() && reg_back.forget_ts.is_some() {
            return Err(format!(
                "write ts {:?} not in any register interval {:?}",
                self.writes, self.registered
            ));
        }

        Ok(())
    }
}

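/// A not-yet-applied operation from the txns shard: either a data shard
/// register/forget or the batches committed to a data shard at a single
/// timestamp.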
#[derive(Debug)]
pub(crate) enum Unapplied<'a> {
    RegisterForget,
    Batch(Vec<&'a Vec<u8>>),
}

#[cfg(test)]
mod tests {
    use DataListenNext::*;
    use mz_ore::assert_err;
    use mz_persist_client::PersistClient;
    use mz_persist_types::codec_impls::{ShardIdSchema, VecU8Schema};

    use crate::operator::DataSubscribe;
    use crate::tests::reader;
    use crate::txns::TxnsHandle;

    use super::*;

    impl TxnsCache<u64, TxnsCodecDefault> {
        pub(crate) async fn expect_open(
            init_ts: u64,
            txns: &TxnsHandle<String, (), u64, i64>,
        ) -> Self {
            let mut ret = TxnsCache::open(&txns.datas.client, txns.txns_id(), None).await;
            let _ = ret.update_gt(&init_ts).await;
            ret
        }

        pub(crate) async fn expect_snapshot(
            &mut self,
            client: &PersistClient,
            data_id: ShardId,
            as_of: u64,
        ) -> Vec<String> {
            let mut data_read = reader(client, data_id).await;
            let _ = self.update_gt(&as_of).await;
            let mut snapshot = self
                .data_snapshot(data_read.shard_id(), as_of)
                .snapshot_and_fetch(&mut data_read)
                .await
                .unwrap();
            snapshot.sort();
            snapshot
                .into_iter()
                .flat_map(|((k, v), _t, d)| {
                    let (k, ()) = (k.unwrap(), v.unwrap());
                    std::iter::repeat(k).take(usize::try_from(d).unwrap())
                })
                .collect()
        }

        pub(crate) fn expect_subscribe(
            &self,
            client: &PersistClient,
            data_id: ShardId,
            as_of: u64,
        ) -> DataSubscribe {
            DataSubscribe::new(
                "test",
                client.clone(),
                self.txns_id,
                data_id,
                as_of,
                Antichain::new(),
                true,
            )
        }
    }

    #[mz_ore::test]
    fn txns_cache_data_snapshot_and_listen_next() {
        let d0 = ShardId::new();
        let ds = |latest_write: Option<u64>, as_of: u64, empty_to: u64| -> DataSnapshot<u64> {
            DataSnapshot {
                data_id: d0,
                latest_write,
                as_of,
                empty_to,
            }
        };
        #[track_caller]
        fn testcase(
            cache: &mut TxnsCacheState<u64>,
            ts: u64,
            data_id: ShardId,
            snap_expected: DataSnapshot<u64>,
            listen_expected: DataListenNext<u64>,
        ) {
            cache.progress_exclusive = ts + 1;
            assert_eq!(cache.data_snapshot(data_id, ts), snap_expected);
            assert_eq!(cache.data_listen_next(&data_id, &ts), listen_expected);
            assert_eq!(
                cache.data_listen_next(&data_id, &(ts + 1)),
                WaitForTxnsProgress
            );
        }

        let mut c = TxnsCacheState::new(ShardId::new(), 0, None);

        assert_eq!(c.progress_exclusive, 0);
        #[allow(clippy::disallowed_methods)]
        let result = std::panic::catch_unwind(|| c.data_snapshot(d0, 0));
        assert_err!(result);
        assert_eq!(c.data_listen_next(&d0, &0), WaitForTxnsProgress);

        testcase(&mut c, 0, d0, ds(None, 0, 1), ReadDataTo(1));

        testcase(&mut c, 1, d0, ds(None, 1, 2), ReadDataTo(2));

        c.push_register(d0, 2, 1, 2);
        testcase(&mut c, 2, d0, ds(None, 2, 3), ReadDataTo(3));

        testcase(&mut c, 3, d0, ds(None, 3, 4), EmitLogicalProgress(4));

        c.push_append(d0, vec![4], 4, 1);
        testcase(&mut c, 4, d0, ds(Some(4), 4, 5), ReadDataTo(5));

        c.push_append(d0, vec![5], 5, 1);
        testcase(&mut c, 5, d0, ds(Some(5), 5, 6), ReadDataTo(6));

        testcase(&mut c, 6, d0, ds(Some(5), 6, 7), EmitLogicalProgress(7));

        c.push_append(d0, vec![7], 7, 1);
        testcase(&mut c, 7, d0, ds(Some(7), 7, 8), ReadDataTo(8));

        c.push_append(d0, vec![4], 8, -1);
        testcase(&mut c, 8, d0, ds(Some(7), 8, 9), EmitLogicalProgress(9));

        c.push_append(d0, vec![5], 9, -1);
        testcase(&mut c, 9, d0, ds(Some(7), 9, 10), EmitLogicalProgress(10));

        c.push_append(d0, vec![7], 10, -1);
        testcase(&mut c, 10, d0, ds(None, 10, 11), EmitLogicalProgress(11));

        c.push_register(d0, 11, -1, 11);
        testcase(&mut c, 11, d0, ds(None, 11, 12), ReadDataTo(12));

        testcase(&mut c, 12, d0, ds(None, 12, 13), ReadDataTo(13));

        testcase(&mut c, 13, d0, ds(None, 13, 14), ReadDataTo(14));

        testcase(&mut c, 14, d0, ds(None, 14, 15), ReadDataTo(15));

        c.push_register(d0, 15, 1, 15);
        testcase(&mut c, 15, d0, ds(None, 15, 16), ReadDataTo(16));

        c.push_register(d0, 16, -1, 16);
        testcase(&mut c, 16, d0, ds(None, 16, 17), ReadDataTo(17));

        assert_eq!(c.data_snapshot(d0, 0), ds(None, 0, 1));
        assert_eq!(c.data_snapshot(d0, 1), ds(None, 1, 2));
        assert_eq!(c.data_snapshot(d0, 2), ds(None, 2, 3));
        assert_eq!(c.data_snapshot(d0, 3), ds(None, 3, 4));
        assert_eq!(c.data_snapshot(d0, 4), ds(None, 4, 5));
        assert_eq!(c.data_snapshot(d0, 5), ds(None, 5, 6));
        assert_eq!(c.data_snapshot(d0, 6), ds(None, 6, 7));
        assert_eq!(c.data_snapshot(d0, 7), ds(None, 7, 8));
        assert_eq!(c.data_snapshot(d0, 8), ds(None, 8, 9));
        assert_eq!(c.data_snapshot(d0, 9), ds(None, 9, 10));
        assert_eq!(c.data_snapshot(d0, 10), ds(None, 10, 11));
        assert_eq!(c.data_snapshot(d0, 11), ds(None, 11, 12));
        assert_eq!(c.data_snapshot(d0, 12), ds(None, 12, 13));
        assert_eq!(c.data_snapshot(d0, 13), ds(None, 13, 14));
        assert_eq!(c.data_snapshot(d0, 14), ds(None, 14, 15));
        assert_eq!(c.data_snapshot(d0, 15), ds(None, 15, 16));
        assert_eq!(c.data_snapshot(d0, 16), ds(None, 16, 17));

        assert_eq!(c.data_listen_next(&d0, &0), ReadDataTo(17));
        assert_eq!(c.data_listen_next(&d0, &1), ReadDataTo(17));
        assert_eq!(c.data_listen_next(&d0, &2), ReadDataTo(17));
        assert_eq!(c.data_listen_next(&d0, &3), ReadDataTo(17));
        assert_eq!(c.data_listen_next(&d0, &4), ReadDataTo(17));
        assert_eq!(c.data_listen_next(&d0, &5), ReadDataTo(17));
        assert_eq!(c.data_listen_next(&d0, &6), ReadDataTo(17));
        assert_eq!(c.data_listen_next(&d0, &7), ReadDataTo(17));
        assert_eq!(c.data_listen_next(&d0, &8), ReadDataTo(17));
        assert_eq!(c.data_listen_next(&d0, &9), ReadDataTo(17));
        assert_eq!(c.data_listen_next(&d0, &10), ReadDataTo(17));
        assert_eq!(c.data_listen_next(&d0, &11), ReadDataTo(17));
        assert_eq!(c.data_listen_next(&d0, &12), ReadDataTo(17));
        assert_eq!(c.data_listen_next(&d0, &13), ReadDataTo(17));
        assert_eq!(c.data_listen_next(&d0, &14), ReadDataTo(17));
        assert_eq!(c.data_listen_next(&d0, &15), ReadDataTo(17));
        assert_eq!(c.data_listen_next(&d0, &16), ReadDataTo(17));
        assert_eq!(c.data_listen_next(&d0, &17), WaitForTxnsProgress);
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn empty_to() {
        let client = PersistClient::new_for_tests().await;
        let mut txns = TxnsHandle::expect_open(client.clone()).await;
        let d0 = txns.expect_register(1).await;

        for ts in [3, 5] {
            let mut txn = txns.begin();
            txn.write(&d0, "3".into(), (), 1).await;
            let _apply = txn.commit_at(&mut txns, ts).await.unwrap();
        }
        let _ = txns.txns_cache.update_gt(&5).await;
        txns.apply_le(&4).await;
        let snap = txns.txns_cache.data_snapshot(d0, 4);
        let mut data_read = reader(&client, d0).await;
        let contents = snap.snapshot_and_fetch(&mut data_read).await.unwrap();
        assert_eq!(contents.len(), 1);

        assert_eq!(snap.empty_to, 5);
    }

    #[mz_ore::test]
    fn data_times_validate() {
        fn dt(register_forget_ts: &[u64], write_ts: &[u64]) -> Result<(), ()> {
            let mut dt = DataTimes::default();
            for x in register_forget_ts {
                if let Some(back) = dt.registered.back_mut() {
                    if back.forget_ts == None {
                        back.forget_ts = Some(*x);
                        continue;
                    }
                }
                dt.registered.push_back(DataRegistered {
                    register_ts: *x,
                    forget_ts: None,
                })
            }
            dt.writes = write_ts.into_iter().cloned().collect();
            dt.validate().map_err(|_| ())
        }

        assert_eq!(dt(&[1], &[2, 3]), Ok(()));
        assert_eq!(dt(&[1, 3], &[2]), Ok(()));
        assert_eq!(dt(&[1, 3, 5], &[2, 6, 7]), Ok(()));
        assert_eq!(dt(&[1, 3, 5], &[2, 6, 7]), Ok(()));
        assert_eq!(dt(&[1, 1], &[1]), Ok(()));

        assert_eq!(dt(&[], &[]), Err(()));
        assert_eq!(dt(&[1], &[0]), Err(()));
        assert_eq!(dt(&[1, 3], &[4]), Err(()));
        assert_eq!(dt(&[1, 3, 5], &[4]), Err(()));
        assert_eq!(dt(&[1, 4], &[3, 2]), Err(()));
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn regression_compact_latest_write() {
        let client = PersistClient::new_for_tests().await;
        let mut txns = TxnsHandle::expect_open(client.clone()).await;
        let log = txns.new_log();
        let d0 = txns.expect_register(1).await;

        let tidy_5 = txns.expect_commit_at(5, d0, &["5"], &log).await;
        let _ = txns.expect_commit_at(15, d0, &["15"], &log).await;
        txns.tidy_at(20, tidy_5).await.unwrap();
        let _ = txns.txns_cache.update_gt(&20).await;
        assert_eq!(txns.txns_cache.min_unapplied_ts(), &15);
        txns.compact_to(10).await;

        let mut txns_read = client
            .open_leased_reader(
                txns.txns_id(),
                Arc::new(ShardIdSchema),
                Arc::new(VecU8Schema),
                Diagnostics::for_tests(),
                true,
            )
            .await
            .expect("txns schema shouldn't change");
        txns_read.downgrade_since(&Antichain::from_elem(10)).await;
        let mut cache = TxnsCache::<_, TxnsCodecDefault>::from_read(txns_read, None).await;
        let _ = cache.update_gt(&15).await;
        let snap = cache.data_snapshot(d0, 12);
        assert_eq!(snap.latest_write, Some(5));
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn regression_ts_sort() {
        let client = PersistClient::new_for_tests().await;
        let txns = TxnsHandle::expect_open(client.clone()).await;
        let mut cache = TxnsCache::expect_open(0, &txns).await;
        let d0 = ShardId::new();

        cache.push_entries(
            vec![
                (TxnsEntry::Register(d0, u64::encode(&2)), 2, -1),
                (TxnsEntry::Register(d0, u64::encode(&1)), 2, 1),
            ],
            3,
        );
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn data_compacted() {
        let d0 = ShardId::new();
        let mut c = TxnsCacheState::new(ShardId::new(), 10, None);
        c.progress_exclusive = 20;

        #[allow(clippy::disallowed_methods)]
        let result = std::panic::catch_unwind(|| c.data_listen_next(&d0, &0));
        assert_err!(result);

        let ds = c.data_snapshot(d0, 0);
        assert_eq!(
            ds,
            DataSnapshot {
                data_id: d0,
                latest_write: None,
                as_of: 0,
                empty_to: 10,
            }
        );
    }
}