use std::cmp::{max, min};
use std::collections::{BTreeMap, VecDeque};
use std::fmt::Debug;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;

use differential_dataflow::consolidation::consolidate_updates;
use differential_dataflow::hashable::Hashable;
use differential_dataflow::lattice::Lattice;
use itertools::Itertools;
use mz_ore::cast::CastFrom;
use mz_ore::collections::HashMap;
use mz_ore::instrument;
use mz_persist_client::cfg::USE_CRITICAL_SINCE_TXN;
use mz_persist_client::fetch::LeasedBatchPart;
use mz_persist_client::metrics::encode_ts_metric;
use mz_persist_client::read::{ListenEvent, ReadHandle, Subscribe};
use mz_persist_client::write::WriteHandle;
use mz_persist_client::{Diagnostics, PersistClient, ShardId};
use mz_persist_types::txn::{TxnsCodec, TxnsEntry};
use mz_persist_types::{Codec64, StepForward};
use timely::order::TotalOrder;
use timely::progress::{Antichain, Timestamp};
use tracing::debug;

use crate::TxnsCodecDefault;
use crate::metrics::Metrics;
use crate::txn_read::{DataListenNext, DataRemapEntry, DataSnapshot, DataSubscribe};

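/// In-memory cache of the txns shard's contents, tracking enough state to
/// answer questions about individual data shards.
///
/// Per data shard, `datas` records the registration intervals and the write
/// timestamps still needed to serve reads; `unapplied_batches` and
/// `unapplied_registers` track entries that have not yet been applied
/// (retracted) in the txns shard. `progress_exclusive` is the upper bound of
/// txns shard updates reflected here. When `only_data_id` is set, entries for
/// all other data shards are ignored.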
#[derive(Debug)]
pub struct TxnsCacheState<T> {
    txns_id: ShardId,
    pub(crate) init_ts: T,
    pub(crate) progress_exclusive: T,

    next_batch_id: usize,
    pub(crate) unapplied_batches: BTreeMap<usize, (ShardId, Vec<u8>, T)>,
    batch_idx: HashMap<Vec<u8>, usize>,
    pub(crate) datas: BTreeMap<ShardId, DataTimes<T>>,
    pub(crate) unapplied_registers: VecDeque<(ShardId, T)>,

    only_data_id: Option<ShardId>,
}

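/// A [TxnsCacheState] kept up to date by a persist [Subscribe] on the txns
/// shard. Fetched updates are buffered in `buf` and applied to `state` each
/// time the subscription's progress frontier advances.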
#[derive(Debug)]
pub struct TxnsCache<T, C: TxnsCodec = TxnsCodecDefault> {
    pub(crate) txns_subscribe: Subscribe<C::Key, C::Val, T, i64>,
    pub(crate) buf: Vec<(TxnsEntry, T, i64)>,
    state: TxnsCacheState<T>,
}

impl<T: Timestamp + Lattice + TotalOrder + StepForward + Codec64 + Sync> TxnsCacheState<T> {
    fn new(txns_id: ShardId, init_ts: T, only_data_id: Option<ShardId>) -> Self {
        TxnsCacheState {
            txns_id,
            init_ts,
            progress_exclusive: T::minimum(),
            next_batch_id: 0,
            unapplied_batches: BTreeMap::new(),
            batch_idx: HashMap::new(),
            datas: BTreeMap::new(),
            unapplied_registers: VecDeque::new(),
            only_data_id,
        }
    }

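    /// Initializes the cache state by subscribing to the txns shard at its
    /// current since and replaying updates until the cache has caught up to
    /// that timestamp. Returns the state along with the still-open
    /// subscription so the caller can continue to drive it forward.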
    pub(crate) async fn init<C: TxnsCodec>(
        only_data_id: Option<ShardId>,
        txns_read: ReadHandle<C::Key, C::Val, T, i64>,
    ) -> (Self, Subscribe<C::Key, C::Val, T, i64>) {
        let txns_id = txns_read.shard_id();
        let as_of = txns_read.since().clone();
        let since_ts = as_of.as_option().expect("txns shard is not closed").clone();
        let mut txns_subscribe = txns_read
            .subscribe(as_of)
            .await
            .expect("handle holds a capability");
        let mut state = Self::new(txns_id, since_ts.clone(), only_data_id.clone());
        let mut buf = Vec::new();
        TxnsCache::<T, C>::update(
            &mut state,
            &mut txns_subscribe,
            &mut buf,
            only_data_id,
            |progress_exclusive| progress_exclusive >= &since_ts,
        )
        .await;
        debug_assert_eq!(state.validate(), Ok(()));
        (state, txns_subscribe)
    }

    pub fn txns_id(&self) -> ShardId {
        self.txns_id
    }

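    /// Returns whether the data shard is registered in the txns set as of the
    /// current progress, i.e. whether its most recent registration has no
    /// forget.
    ///
    /// Panics if `ts` is not exactly the cache's `progress_exclusive`.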
    pub fn registered_at_progress(&self, data_id: &ShardId, ts: &T) -> bool {
        self.assert_only_data_id(data_id);
        assert_eq!(self.progress_exclusive, *ts);
        let Some(data_times) = self.datas.get(data_id) else {
            return false;
        };
        data_times.last_reg().forget_ts.is_none()
    }

    pub(crate) fn all_registered_at_progress(&self, ts: &T) -> Vec<ShardId> {
        assert_eq!(self.only_data_id, None);
        assert_eq!(self.progress_exclusive, *ts);
        self.datas
            .iter()
            .filter(|(_, data_times)| data_times.last_reg().forget_ts.is_none())
            .map(|(data_id, _)| *data_id)
            .collect()
    }

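    /// Returns a [DataSnapshot] describing how to read the data shard at
    /// `as_of`: the most recent not-yet-applied write at or before `as_of`
    /// (if any), plus an `empty_to` bound derived from `as_of` and the
    /// initial cache timestamp.
    ///
    /// Panics unless `as_of` is strictly less than `progress_exclusive`.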
    pub fn data_snapshot(&self, data_id: ShardId, as_of: T) -> DataSnapshot<T> {
        self.assert_only_data_id(&data_id);
        assert!(self.progress_exclusive > as_of);
        let empty_to = max(as_of.step_forward(), self.init_ts.clone());
        let Some(all) = self.datas.get(&data_id) else {
            return DataSnapshot {
                data_id,
                latest_write: None,
                as_of,
                empty_to,
            };
        };

        let min_unapplied_ts = self
            .unapplied_batches
            .first_key_value()
            .map(|(_, (_, _, ts))| ts)
            .unwrap_or(&self.progress_exclusive);
        let latest_write = all
            .writes
            .iter()
            .rev()
            .find(|x| **x <= as_of && *x >= min_unapplied_ts)
            .cloned();

        debug!(
            "data_snapshot {:.9} latest_write={:?} as_of={:?} empty_to={:?}: all={:?}",
            data_id.to_string(),
            latest_write,
            as_of,
            empty_to,
            all,
        );
        let ret = DataSnapshot {
            data_id: data_id.clone(),
            latest_write,
            as_of,
            empty_to,
        };
        assert_eq!(ret.validate(), Ok(()));
        ret
    }

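    /// Returns what a listener on the data shard at `ts` should do next:
    /// wait for the txns shard to advance, read the data shard up to some
    /// timestamp, or simply emit logical progress up to `progress_exclusive`.
    ///
    /// Panics if `ts` is beyond `progress_exclusive` or before the initial
    /// cache timestamp.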
    pub fn data_listen_next(&self, data_id: &ShardId, ts: &T) -> DataListenNext<T> {
        self.assert_only_data_id(data_id);
        assert!(
            &self.progress_exclusive >= ts,
            "ts {:?} is past progress_exclusive {:?}",
            ts,
            self.progress_exclusive
        );
        assert!(
            ts >= &self.init_ts,
            "ts {:?} is not past initial since {:?}",
            ts,
            self.init_ts
        );
        use DataListenNext::*;
        let data_times = self.datas.get(data_id);
        debug!(
            "data_listen_next {:.9} {:?}: progress={:?} times={:?}",
            data_id.to_string(),
            ts,
            self.progress_exclusive,
            data_times,
        );
        let Some(data_times) = data_times else {
            if ts < &self.progress_exclusive {
                return ReadDataTo(self.progress_exclusive.clone());
            } else {
                return WaitForTxnsProgress;
            }
        };
        let physical_ts = data_times.latest_physical_ts();
        let last_reg = data_times.last_reg();
        if ts >= &self.progress_exclusive {
            WaitForTxnsProgress
        } else if ts <= physical_ts {
            ReadDataTo(physical_ts.step_forward())
        } else if last_reg.forget_ts.is_none() {
            assert!(last_reg.contains(ts));
            EmitLogicalProgress(self.progress_exclusive.clone())
        } else {
            assert!(ts > &last_reg.register_ts && last_reg.forget_ts.is_some());
            ReadDataTo(self.progress_exclusive.clone())
        }
    }

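    /// Builds a [DataSubscribe] for the data shard starting at `as_of`: the
    /// corresponding [DataSnapshot] plus a [DataRemapEntry] whose physical
    /// and logical uppers both start at the snapshot's `empty_to`.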
    pub(crate) fn data_subscribe(&self, data_id: ShardId, as_of: T) -> DataSubscribe<T> {
        self.assert_only_data_id(&data_id);
        assert!(self.progress_exclusive > as_of);
        let snapshot = self.data_snapshot(data_id, as_of);
        let remap = DataRemapEntry {
            physical_upper: snapshot.empty_to.clone(),
            logical_upper: snapshot.empty_to.clone(),
        };
        DataSubscribe {
            data_id,
            snapshot: Some(snapshot),
            remap,
        }
    }

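    /// Returns the minimum timestamp not known to have been applied to the
    /// txns shard: the smaller of the earliest unapplied batch and the
    /// earliest unapplied register/forget, or `progress_exclusive` if there
    /// are none.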
    pub fn min_unapplied_ts(&self) -> &T {
        assert_eq!(self.only_data_id, None);
        self.min_unapplied_ts_inner()
    }

    fn min_unapplied_ts_inner(&self) -> &T {
        let min_batch_ts = self
            .unapplied_batches
            .first_key_value()
            .map(|(_, (_, _, ts))| ts)
            .unwrap_or(&self.progress_exclusive);
        let min_register_ts = self
            .unapplied_registers
            .front()
            .map(|(_, ts)| ts)
            .unwrap_or(&self.progress_exclusive);

        min(min_batch_ts, min_register_ts)
    }

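    /// Returns the unapplied registers/forgets and batches in timestamp
    /// order, grouping batches committed to the same data shard at the same
    /// timestamp. Registers/forgets sort before batches at equal timestamps.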
    pub(crate) fn unapplied(&self) -> impl Iterator<Item = (&ShardId, Unapplied<'_>, &T)> {
        assert_eq!(self.only_data_id, None);
        let registers = self
            .unapplied_registers
            .iter()
            .map(|(data_id, ts)| (data_id, Unapplied::RegisterForget, ts));
        let batches = self
            .unapplied_batches
            .values()
            .fold(
                BTreeMap::new(),
                |mut accum: BTreeMap<_, Vec<_>>, (data_id, batch, ts)| {
                    accum.entry((ts, data_id)).or_default().push(batch);
                    accum
                },
            )
            .into_iter()
            .map(|((ts, data_id), batches)| (data_id, Unapplied::Batch(batches), ts));
        registers.merge_by(batches, |(_, _, ts1), (_, _, ts2)| ts1 <= ts2)
    }

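    /// Filters the given retractions down to those whose batches this cache
    /// still tracks as unapplied.
    ///
    /// The caller must have already updated the cache to at least
    /// `expected_txns_upper`.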
    pub(crate) fn filter_retractions<'a>(
        &'a self,
        expected_txns_upper: &T,
        retractions: impl Iterator<Item = (&'a Vec<u8>, &'a ([u8; 8], ShardId))>,
    ) -> impl Iterator<Item = (&'a Vec<u8>, &'a ([u8; 8], ShardId))> {
        assert_eq!(self.only_data_id, None);
        assert!(&self.progress_exclusive >= expected_txns_upper);
        retractions.filter(|(batch_raw, _)| self.batch_idx.contains_key(*batch_raw))
    }

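    /// Applies a chunk of txns shard updates to the cache and advances
    /// `progress_exclusive` to `progress`.
    ///
    /// Updates are consolidated and then applied in order of the timestamp
    /// encoded in each entry, so that (for example) a registration is
    /// processed before a later forget of the same shard even when both land
    /// in the same chunk (see the `regression_ts_sort` test below).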
    pub(crate) fn push_entries(&mut self, mut entries: Vec<(TxnsEntry, T, i64)>, progress: T) {
        consolidate_updates(&mut entries);
        entries.sort_by(|(a, _, _), (b, _, _)| a.ts::<T>().cmp(&b.ts::<T>()));
        for (e, t, d) in entries {
            match e {
                TxnsEntry::Register(data_id, ts) => {
                    let ts = T::decode(ts);
                    debug_assert!(ts <= t);
                    self.push_register(data_id, ts, d, t);
                }
                TxnsEntry::Append(data_id, ts, batch) => {
                    let ts = T::decode(ts);
                    debug_assert!(ts <= t);
                    self.push_append(data_id, batch, ts, d)
                }
            }
        }
        self.progress_exclusive = progress;
        debug_assert_eq!(self.validate(), Ok(()));
    }

    fn push_register(&mut self, data_id: ShardId, ts: T, diff: i64, compacted_ts: T) {
        self.assert_only_data_id(&data_id);
        debug_assert!(ts >= self.progress_exclusive || diff < 0);
        if let Some(only_data_id) = self.only_data_id.as_ref() {
            if only_data_id != &data_id {
                return;
            }
        }

        if ts == compacted_ts {
            self.unapplied_registers.push_back((data_id, ts.clone()));
        }

        if diff == 1 {
            debug!(
                "cache learned {:.9} registered t={:?}",
                data_id.to_string(),
                ts
            );
            let entry = self.datas.entry(data_id).or_default();
            if let Some(last_reg) = entry.registered.back() {
                assert!(last_reg.forget_ts.is_some())
            }
            entry.registered.push_back(DataRegistered {
                register_ts: ts,
                forget_ts: None,
            });
        } else if diff == -1 {
            debug!(
                "cache learned {:.9} forgotten t={:?}",
                data_id.to_string(),
                ts
            );
            let active_reg = self
                .datas
                .get_mut(&data_id)
                .and_then(|x| x.registered.back_mut())
                .expect("data shard should be registered before forget");
            assert_eq!(active_reg.forget_ts.replace(ts), None);
        } else {
            unreachable!("only +1/-1 diffs are used");
        }
        debug_assert_eq!(self.validate(), Ok(()));
    }

    fn push_append(&mut self, data_id: ShardId, batch: Vec<u8>, ts: T, diff: i64) {
        self.assert_only_data_id(&data_id);
        debug_assert!(ts >= self.progress_exclusive || diff < 0);
        if let Some(only_data_id) = self.only_data_id.as_ref() {
            if only_data_id != &data_id {
                return;
            }
        }

        if diff == 1 {
            debug!(
                "cache learned {:.9} committed t={:?} b={}",
                data_id.to_string(),
                ts,
                batch.hashed(),
            );
            let idx = self.next_batch_id;
            self.next_batch_id += 1;
            let prev = self.batch_idx.insert(batch.clone(), idx);
            assert_eq!(prev, None);
            let prev = self
                .unapplied_batches
                .insert(idx, (data_id, batch, ts.clone()));
            assert_eq!(prev, None);
            let times = self.datas.get_mut(&data_id).expect("data is initialized");
            assert_eq!(times.last_reg().forget_ts, None);

            if times.writes.back() != Some(&ts) {
                times.writes.push_back(ts);
            }
        } else if diff == -1 {
            debug!(
                "cache learned {:.9} applied t={:?} b={}",
                data_id.to_string(),
                ts,
                batch.hashed(),
            );
            let idx = self
                .batch_idx
                .remove(&batch)
                .expect("invariant violation: batch should exist");
            let prev = self
                .unapplied_batches
                .remove(&idx)
                .expect("invariant violation: batch index should exist");
            debug_assert_eq!(data_id, prev.0);
            debug_assert_eq!(batch, prev.1);
            debug_assert!(prev.2 <= ts);
        } else {
            unreachable!("only +1/-1 diffs are used");
        }
        self.compact_data_times(&data_id);
        debug_assert_eq!(self.validate(), Ok(()));
    }

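    /// Informs the cache that all registers and forgets at timestamps less
    /// than or equal to `ts` have been applied to the txns shard.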
    pub(crate) fn mark_register_applied(&mut self, ts: &T) {
        self.unapplied_registers
            .retain(|(_, register_ts)| ts < register_ts);
        debug_assert_eq!(self.validate(), Ok(()));
    }

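    /// Compacts the cached [DataTimes] for `data_id`: drops writes earlier
    /// than the earliest unapplied batch and registration intervals that
    /// ended before both the earliest unapplied register and the earliest
    /// retained write, while always keeping the most recent write and the
    /// most recent registration.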
    fn compact_data_times(&mut self, data_id: &ShardId) {
        let Some(times) = self.datas.get_mut(data_id) else {
            return;
        };

        debug!("cache compact {:.9} times={:?}", data_id.to_string(), times);

        if let Some(unapplied_write_ts) = self
            .unapplied_batches
            .first_key_value()
            .map(|(_, (_, _, ts))| ts)
        {
            debug!(
                "cache compact {:.9} unapplied_write_ts={:?}",
                data_id.to_string(),
                unapplied_write_ts,
            );
            while let Some(write_ts) = times.writes.front() {
                if times.writes.len() == 1 || write_ts >= unapplied_write_ts {
                    break;
                }
                times.writes.pop_front();
            }
        } else {
            times.writes.drain(..times.writes.len() - 1);
        }
        let unapplied_reg_ts = self.unapplied_registers.front().map(|(_, ts)| ts);
        let min_write_ts = times.writes.front();
        let min_reg_ts = [unapplied_reg_ts, min_write_ts].into_iter().flatten().min();
        if let Some(min_reg_ts) = min_reg_ts {
            debug!(
                "cache compact {:.9} unapplied_reg_ts={:?} min_write_ts={:?} min_reg_ts={:?}",
                data_id.to_string(),
                unapplied_reg_ts,
                min_write_ts,
                min_reg_ts,
            );
            while let Some(reg) = times.registered.front() {
                match &reg.forget_ts {
                    Some(forget_ts) if forget_ts >= min_reg_ts => break,
                    _ if times.registered.len() == 1 => break,
                    _ => {
                        assert!(
                            reg.forget_ts.is_some(),
                            "only the latest reg can have no forget ts"
                        );
                        times.registered.pop_front();
                    }
                }
            }
        } else {
            times.registered.drain(..times.registered.len() - 1);
        }

        debug!(
            "cache compact DONE {:.9} times={:?}",
            data_id.to_string(),
            times
        );
    }

    pub(crate) fn update_gauges(&self, metrics: &Metrics) {
        metrics
            .data_shard_count
            .set(u64::cast_from(self.datas.len()));
        metrics
            .batches
            .unapplied_count
            .set(u64::cast_from(self.unapplied_batches.len()));
        let unapplied_batches_bytes = self
            .unapplied_batches
            .values()
            .map(|(_, x, _)| x.len())
            .sum::<usize>();
        metrics
            .batches
            .unapplied_bytes
            .set(u64::cast_from(unapplied_batches_bytes));
        metrics
            .batches
            .unapplied_min_ts
            .set(encode_ts_metric(&Antichain::from_elem(
                self.min_unapplied_ts().clone(),
            )));
    }

    fn assert_only_data_id(&self, data_id: &ShardId) {
        if let Some(only_data_id) = self.only_data_id.as_ref() {
            assert_eq!(data_id, only_data_id);
        }
    }

    pub(crate) fn validate(&self) -> Result<(), String> {
        if self.batch_idx.len() != self.unapplied_batches.len() {
            return Err(format!(
                "expected index len {} to match what it's indexing {}",
                self.batch_idx.len(),
                self.unapplied_batches.len()
            ));
        }

        let mut prev_batch_ts = T::minimum();
        for (idx, (_, batch, ts)) in self.unapplied_batches.iter() {
            if self.batch_idx.get(batch) != Some(idx) {
                return Err(format!(
                    "expected batch to be indexed at {} got {:?}",
                    idx,
                    self.batch_idx.get(batch)
                ));
            }
            if ts < &prev_batch_ts {
                return Err(format!(
                    "unapplied batch timestamp {:?} out of order after {:?}",
                    ts, prev_batch_ts
                ));
            }
            prev_batch_ts = ts.clone();
        }

        let mut prev_register_ts = T::minimum();
        for (_, ts) in self.unapplied_registers.iter() {
            if ts < &prev_register_ts {
                return Err(format!(
                    "unapplied register timestamp {:?} out of order after {:?}",
                    ts, prev_register_ts
                ));
            }
            prev_register_ts = ts.clone();
        }

        let min_unapplied_ts = self.min_unapplied_ts_inner();

        for (data_id, data_times) in self.datas.iter() {
            let () = data_times.validate()?;

            if let Some(ts) = data_times.writes.front() {
                if min_unapplied_ts > ts && data_times.writes.len() > 1 {
                    return Err(format!(
                        "{:?} write ts {:?} not past min unapplied ts {:?}",
                        data_id, ts, min_unapplied_ts
                    ));
                }
            }

            if let Some((_, (_, _, unapplied_ts))) = self
                .unapplied_batches
                .iter()
                .find(|(_, (shard_id, _, _))| shard_id == data_id)
            {
                if let Some(write_ts) = data_times.writes.front() {
                    if write_ts > unapplied_ts {
                        return Err(format!(
                            "{:?} min write ts {:?} past min unapplied batch ts {:?}",
                            data_id, write_ts, unapplied_ts
                        ));
                    }
                }
            }

            if let Some((_, unapplied_ts)) = self
                .unapplied_registers
                .iter()
                .find(|(shard_id, _)| shard_id == data_id)
            {
                let register_ts = &data_times.first_reg().register_ts;
                if register_ts > unapplied_ts {
                    return Err(format!(
                        "{:?} min register ts {:?} past min unapplied register ts {:?}",
                        data_id, register_ts, unapplied_ts
                    ));
                }
            }
        }

        Ok(())
    }
}

impl<T, C> TxnsCache<T, C>
where
    T: Timestamp + Lattice + TotalOrder + StepForward + Codec64 + Sync,
    C: TxnsCodec,
{
    pub(crate) async fn init(
        init_ts: T,
        txns_read: ReadHandle<C::Key, C::Val, T, i64>,
        txns_write: &mut WriteHandle<C::Key, C::Val, T, i64>,
    ) -> Self {
        let () = crate::empty_caa(|| "txns init", txns_write, init_ts.clone()).await;
        let mut ret = Self::from_read(txns_read, None).await;
        let _ = ret.update_gt(&init_ts).await;
        ret
    }

    pub async fn open(
        client: &PersistClient,
        txns_id: ShardId,
        only_data_id: Option<ShardId>,
    ) -> Self {
        let (txns_key_schema, txns_val_schema) = C::schemas();
        let txns_read = client
            .open_leased_reader(
                txns_id,
                Arc::new(txns_key_schema),
                Arc::new(txns_val_schema),
                Diagnostics {
                    shard_name: "txns".to_owned(),
                    handle_purpose: "read txns".to_owned(),
                },
                USE_CRITICAL_SINCE_TXN.get(client.dyncfgs()),
            )
            .await
            .expect("txns schema shouldn't change");
        Self::from_read(txns_read, only_data_id).await
    }

    async fn from_read(
        txns_read: ReadHandle<C::Key, C::Val, T, i64>,
        only_data_id: Option<ShardId>,
    ) -> Self {
        let (state, txns_subscribe) = TxnsCacheState::init::<C>(only_data_id, txns_read).await;
        TxnsCache {
            txns_subscribe,
            buf: Vec::new(),
            state,
        }
    }

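    /// Advances the cache so that `progress_exclusive` is strictly greater
    /// than `ts`, then returns the new progress.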
    #[must_use]
    #[instrument(level = "debug", fields(ts = ?ts))]
    pub async fn update_gt(&mut self, ts: &T) -> &T {
        let only_data_id = self.only_data_id.clone();
        Self::update(
            &mut self.state,
            &mut self.txns_subscribe,
            &mut self.buf,
            only_data_id,
            |progress_exclusive| progress_exclusive > ts,
        )
        .await;
        debug_assert!(&self.progress_exclusive > ts);
        debug_assert_eq!(self.validate(), Ok(()));
        &self.progress_exclusive
    }

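    /// Advances the cache so that `progress_exclusive` is greater than or
    /// equal to `ts`, then returns the new progress.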
    #[must_use]
    #[instrument(level = "debug", fields(ts = ?ts))]
    pub async fn update_ge(&mut self, ts: &T) -> &T {
        let only_data_id = self.only_data_id.clone();
        Self::update(
            &mut self.state,
            &mut self.txns_subscribe,
            &mut self.buf,
            only_data_id,
            |progress_exclusive| progress_exclusive >= ts,
        )
        .await;
        debug_assert!(&self.progress_exclusive >= ts);
        debug_assert_eq!(self.validate(), Ok(()));
        &self.progress_exclusive
    }

    async fn update<F: Fn(&T) -> bool>(
        state: &mut TxnsCacheState<T>,
        txns_subscribe: &mut Subscribe<C::Key, C::Val, T, i64>,
        buf: &mut Vec<(TxnsEntry, T, i64)>,
        only_data_id: Option<ShardId>,
        done: F,
    ) {
        while !done(&state.progress_exclusive) {
            let events = txns_subscribe.next(None).await;
            for event in events {
                match event {
                    ListenEvent::Progress(frontier) => {
                        let progress = frontier
                            .into_option()
                            .expect("nothing should close the txns shard");
                        state.push_entries(std::mem::take(buf), progress);
                    }
                    ListenEvent::Updates(parts) => {
                        Self::fetch_parts(only_data_id, txns_subscribe, parts, buf).await;
                    }
                };
            }
        }
        debug_assert_eq!(state.validate(), Ok(()));
        debug!(
            "cache correct before {:?} len={} least_ts={:?}",
            state.progress_exclusive,
            state.unapplied_batches.len(),
            state
                .unapplied_batches
                .first_key_value()
                .map(|(_, (_, _, ts))| ts),
        );
    }

    pub(crate) async fn fetch_parts(
        only_data_id: Option<ShardId>,
        txns_subscribe: &mut Subscribe<C::Key, C::Val, T, i64>,
        parts: Vec<LeasedBatchPart<T>>,
        updates: &mut Vec<(TxnsEntry, T, i64)>,
    ) {
        for part in parts {
            let should_fetch_part = Self::should_fetch_part(only_data_id.as_ref(), &part);
            debug!(
                "should_fetch_part={} for {:?} {:?}",
                should_fetch_part,
                only_data_id,
                part.stats()
            );
            if !should_fetch_part {
                drop(part);
                continue;
            }
            let part_updates = txns_subscribe.fetch_batch_part(part).await;
            let part_updates = part_updates.map(|((k, v), t, d)| {
                let (k, v) = (k.expect("valid key"), v.expect("valid val"));
                (C::decode(k, v), t, d)
            });
            if let Some(only_data_id) = only_data_id.as_ref() {
                updates.extend(part_updates.filter(|(x, _, _)| x.data_id() == only_data_id));
            } else {
                updates.extend(part_updates);
            }
        }
    }

    fn should_fetch_part(only_data_id: Option<&ShardId>, part: &LeasedBatchPart<T>) -> bool {
        let Some(only_data_id) = only_data_id else {
            return true;
        };
        let Some(stats) = part.stats() else {
            return true;
        };
        C::should_fetch_part(only_data_id, &stats).unwrap_or(true)
    }
}

impl<T, C: TxnsCodec> Deref for TxnsCache<T, C> {
    type Target = TxnsCacheState<T>;
    fn deref(&self) -> &Self::Target {
        &self.state
    }
}

impl<T, C: TxnsCodec> DerefMut for TxnsCache<T, C> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.state
    }
}

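/// The timestamps the cache tracks for a single data shard: the intervals
/// during which it has been registered in the txns set and the timestamps of
/// txn writes that have not yet been compacted away.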
#[derive(Debug)]
pub(crate) struct DataTimes<T> {
    pub(crate) registered: VecDeque<DataRegistered<T>>,
    pub(crate) writes: VecDeque<T>,
}

impl<T> Default for DataTimes<T> {
    fn default() -> Self {
        Self {
            registered: Default::default(),
            writes: Default::default(),
        }
    }
}

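/// A single registration interval for a data shard: registered at
/// `register_ts` and, if `forget_ts` is set, forgotten at that timestamp.
/// Both endpoints are inclusive (see `contains`).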
#[derive(Debug)]
pub(crate) struct DataRegistered<T> {
    pub(crate) register_ts: T,
    pub(crate) forget_ts: Option<T>,
}

impl<T: Timestamp + TotalOrder> DataRegistered<T> {
    pub(crate) fn contains(&self, ts: &T) -> bool {
        &self.register_ts <= ts && self.forget_ts.as_ref().map_or(true, |x| ts <= x)
    }
}

impl<T: Timestamp + TotalOrder> DataTimes<T> {
    pub(crate) fn last_reg(&self) -> &DataRegistered<T> {
        self.registered.back().expect("at least one registration")
    }

    fn first_reg(&self) -> &DataRegistered<T> {
        self.registered.front().expect("at least one registration")
    }

    fn latest_physical_ts(&self) -> &T {
        let last_reg = self.last_reg();
        let mut physical_ts = &last_reg.register_ts;
        if let Some(forget_ts) = &last_reg.forget_ts {
            physical_ts = max(physical_ts, forget_ts);
        }
        if let Some(latest_write) = self.writes.back() {
            physical_ts = max(physical_ts, latest_write);
        }
        physical_ts
    }

    pub(crate) fn validate(&self) -> Result<(), String> {
        let mut prev_ts = T::minimum();
        for ts in self.writes.iter() {
            if ts < &prev_ts {
                return Err(format!(
                    "write ts {:?} out of order after {:?}",
                    ts, prev_ts
                ));
            }
            prev_ts = ts.clone();
        }

        let mut prev_ts = T::minimum();
        let mut writes_idx = 0;
        for x in self.registered.iter() {
            if x.register_ts < prev_ts {
                return Err(format!(
                    "register ts {:?} out of order after {:?}",
                    x.register_ts, prev_ts
                ));
            }
            if let Some(forget_ts) = x.forget_ts.as_ref() {
                if !(&x.register_ts <= forget_ts) {
                    return Err(format!(
                        "register ts {:?} not less_equal forget ts {:?}",
                        x.register_ts, forget_ts
                    ));
                }
                prev_ts.clone_from(forget_ts);
            }
            while let Some(write_ts) = self.writes.get(writes_idx) {
                if write_ts < &x.register_ts {
                    return Err(format!(
                        "write ts {:?} not in any register interval {:?}",
                        write_ts, self.registered
                    ));
                }
                if let Some(forget_ts) = x.forget_ts.as_ref() {
                    if write_ts <= forget_ts {
                        writes_idx += 1;
                        continue;
                    }
                }
                break;
            }
        }

        let Some(reg_back) = self.registered.back() else {
            return Err("registered was empty".into());
        };
        if writes_idx != self.writes.len() && reg_back.forget_ts.is_some() {
            return Err(format!(
                "write ts {:?} not in any register interval {:?}",
                self.writes, self.registered
            ));
        }

        Ok(())
    }
}

#[derive(Debug)]
pub(crate) enum Unapplied<'a> {
    RegisterForget,
    Batch(Vec<&'a Vec<u8>>),
}

#[cfg(test)]
mod tests {
    use DataListenNext::*;
    use mz_ore::assert_err;
    use mz_persist_client::PersistClient;
    use mz_persist_types::codec_impls::{ShardIdSchema, VecU8Schema};

    use crate::operator::DataSubscribe;
    use crate::tests::reader;
    use crate::txns::TxnsHandle;

    use super::*;

    impl TxnsCache<u64, TxnsCodecDefault> {
        pub(crate) async fn expect_open(
            init_ts: u64,
            txns: &TxnsHandle<String, (), u64, i64>,
        ) -> Self {
            let mut ret = TxnsCache::open(&txns.datas.client, txns.txns_id(), None).await;
            let _ = ret.update_gt(&init_ts).await;
            ret
        }

        pub(crate) async fn expect_snapshot(
            &mut self,
            client: &PersistClient,
            data_id: ShardId,
            as_of: u64,
        ) -> Vec<String> {
            let mut data_read = reader(client, data_id).await;
            let _ = self.update_gt(&as_of).await;
            let mut snapshot = self
                .data_snapshot(data_read.shard_id(), as_of)
                .snapshot_and_fetch(&mut data_read)
                .await
                .unwrap();
            snapshot.sort();
            snapshot
                .into_iter()
                .flat_map(|((k, v), _t, d)| {
                    let (k, ()) = (k.unwrap(), v.unwrap());
                    std::iter::repeat(k).take(usize::try_from(d).unwrap())
                })
                .collect()
        }

        pub(crate) fn expect_subscribe(
            &self,
            client: &PersistClient,
            data_id: ShardId,
            as_of: u64,
        ) -> DataSubscribe {
            DataSubscribe::new(
                "test",
                client.clone(),
                self.txns_id,
                data_id,
                as_of,
                Antichain::new(),
                true,
            )
        }
    }

    #[mz_ore::test]
    fn txns_cache_data_snapshot_and_listen_next() {
        let d0 = ShardId::new();
        let ds = |latest_write: Option<u64>, as_of: u64, empty_to: u64| -> DataSnapshot<u64> {
            DataSnapshot {
                data_id: d0,
                latest_write,
                as_of,
                empty_to,
            }
        };
        #[track_caller]
        fn testcase(
            cache: &mut TxnsCacheState<u64>,
            ts: u64,
            data_id: ShardId,
            snap_expected: DataSnapshot<u64>,
            listen_expected: DataListenNext<u64>,
        ) {
            cache.progress_exclusive = ts + 1;
            assert_eq!(cache.data_snapshot(data_id, ts), snap_expected);
            assert_eq!(cache.data_listen_next(&data_id, &ts), listen_expected);
            assert_eq!(
                cache.data_listen_next(&data_id, &(ts + 1)),
                WaitForTxnsProgress
            );
        }

        let mut c = TxnsCacheState::new(ShardId::new(), 0, None);

        assert_eq!(c.progress_exclusive, 0);
        #[allow(clippy::disallowed_methods)]
        let result = std::panic::catch_unwind(|| c.data_snapshot(d0, 0));
        assert_err!(result);
        assert_eq!(c.data_listen_next(&d0, &0), WaitForTxnsProgress);

        testcase(&mut c, 0, d0, ds(None, 0, 1), ReadDataTo(1));

        testcase(&mut c, 1, d0, ds(None, 1, 2), ReadDataTo(2));

        c.push_register(d0, 2, 1, 2);
        testcase(&mut c, 2, d0, ds(None, 2, 3), ReadDataTo(3));

        testcase(&mut c, 3, d0, ds(None, 3, 4), EmitLogicalProgress(4));

        c.push_append(d0, vec![4], 4, 1);
        testcase(&mut c, 4, d0, ds(Some(4), 4, 5), ReadDataTo(5));

        c.push_append(d0, vec![5], 5, 1);
        testcase(&mut c, 5, d0, ds(Some(5), 5, 6), ReadDataTo(6));

        testcase(&mut c, 6, d0, ds(Some(5), 6, 7), EmitLogicalProgress(7));

        c.push_append(d0, vec![7], 7, 1);
        testcase(&mut c, 7, d0, ds(Some(7), 7, 8), ReadDataTo(8));

        c.push_append(d0, vec![4], 8, -1);
        testcase(&mut c, 8, d0, ds(Some(7), 8, 9), EmitLogicalProgress(9));

        c.push_append(d0, vec![5], 9, -1);
        testcase(&mut c, 9, d0, ds(Some(7), 9, 10), EmitLogicalProgress(10));

        c.push_append(d0, vec![7], 10, -1);
        testcase(&mut c, 10, d0, ds(None, 10, 11), EmitLogicalProgress(11));

        c.push_register(d0, 11, -1, 11);
        testcase(&mut c, 11, d0, ds(None, 11, 12), ReadDataTo(12));

        testcase(&mut c, 12, d0, ds(None, 12, 13), ReadDataTo(13));

        testcase(&mut c, 13, d0, ds(None, 13, 14), ReadDataTo(14));

        testcase(&mut c, 14, d0, ds(None, 14, 15), ReadDataTo(15));

        c.push_register(d0, 15, 1, 15);
        testcase(&mut c, 15, d0, ds(None, 15, 16), ReadDataTo(16));

        c.push_register(d0, 16, -1, 16);
        testcase(&mut c, 16, d0, ds(None, 16, 17), ReadDataTo(17));

        assert_eq!(c.data_snapshot(d0, 0), ds(None, 0, 1));
        assert_eq!(c.data_snapshot(d0, 1), ds(None, 1, 2));
        assert_eq!(c.data_snapshot(d0, 2), ds(None, 2, 3));
        assert_eq!(c.data_snapshot(d0, 3), ds(None, 3, 4));
        assert_eq!(c.data_snapshot(d0, 4), ds(None, 4, 5));
        assert_eq!(c.data_snapshot(d0, 5), ds(None, 5, 6));
        assert_eq!(c.data_snapshot(d0, 6), ds(None, 6, 7));
        assert_eq!(c.data_snapshot(d0, 7), ds(None, 7, 8));
        assert_eq!(c.data_snapshot(d0, 8), ds(None, 8, 9));
        assert_eq!(c.data_snapshot(d0, 9), ds(None, 9, 10));
        assert_eq!(c.data_snapshot(d0, 10), ds(None, 10, 11));
        assert_eq!(c.data_snapshot(d0, 11), ds(None, 11, 12));
        assert_eq!(c.data_snapshot(d0, 12), ds(None, 12, 13));
        assert_eq!(c.data_snapshot(d0, 13), ds(None, 13, 14));
        assert_eq!(c.data_snapshot(d0, 14), ds(None, 14, 15));
        assert_eq!(c.data_snapshot(d0, 15), ds(None, 15, 16));
        assert_eq!(c.data_snapshot(d0, 16), ds(None, 16, 17));

        assert_eq!(c.data_listen_next(&d0, &0), ReadDataTo(17));
        assert_eq!(c.data_listen_next(&d0, &1), ReadDataTo(17));
        assert_eq!(c.data_listen_next(&d0, &2), ReadDataTo(17));
        assert_eq!(c.data_listen_next(&d0, &3), ReadDataTo(17));
        assert_eq!(c.data_listen_next(&d0, &4), ReadDataTo(17));
        assert_eq!(c.data_listen_next(&d0, &5), ReadDataTo(17));
        assert_eq!(c.data_listen_next(&d0, &6), ReadDataTo(17));
        assert_eq!(c.data_listen_next(&d0, &7), ReadDataTo(17));
        assert_eq!(c.data_listen_next(&d0, &8), ReadDataTo(17));
        assert_eq!(c.data_listen_next(&d0, &9), ReadDataTo(17));
        assert_eq!(c.data_listen_next(&d0, &10), ReadDataTo(17));
        assert_eq!(c.data_listen_next(&d0, &11), ReadDataTo(17));
        assert_eq!(c.data_listen_next(&d0, &12), ReadDataTo(17));
        assert_eq!(c.data_listen_next(&d0, &13), ReadDataTo(17));
        assert_eq!(c.data_listen_next(&d0, &14), ReadDataTo(17));
        assert_eq!(c.data_listen_next(&d0, &15), ReadDataTo(17));
        assert_eq!(c.data_listen_next(&d0, &16), ReadDataTo(17));
        assert_eq!(c.data_listen_next(&d0, &17), WaitForTxnsProgress);
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn empty_to() {
        let client = PersistClient::new_for_tests().await;
        let mut txns = TxnsHandle::expect_open(client.clone()).await;
        let d0 = txns.expect_register(1).await;

        for ts in [3, 5] {
            let mut txn = txns.begin();
            txn.write(&d0, "3".into(), (), 1).await;
            let _apply = txn.commit_at(&mut txns, ts).await.unwrap();
        }
        let _ = txns.txns_cache.update_gt(&5).await;
        txns.apply_le(&4).await;
        let snap = txns.txns_cache.data_snapshot(d0, 4);
        let mut data_read = reader(&client, d0).await;
        let contents = snap.snapshot_and_fetch(&mut data_read).await.unwrap();
        assert_eq!(contents.len(), 1);

        assert_eq!(snap.empty_to, 5);
    }

    #[mz_ore::test]
    fn data_times_validate() {
        fn dt(register_forget_ts: &[u64], write_ts: &[u64]) -> Result<(), ()> {
            let mut dt = DataTimes::default();
            for x in register_forget_ts {
                if let Some(back) = dt.registered.back_mut() {
                    if back.forget_ts == None {
                        back.forget_ts = Some(*x);
                        continue;
                    }
                }
                dt.registered.push_back(DataRegistered {
                    register_ts: *x,
                    forget_ts: None,
                })
            }
            dt.writes = write_ts.into_iter().cloned().collect();
            dt.validate().map_err(|_| ())
        }

        assert_eq!(dt(&[1], &[2, 3]), Ok(()));
        assert_eq!(dt(&[1, 3], &[2]), Ok(()));
        assert_eq!(dt(&[1, 3, 5], &[2, 6, 7]), Ok(()));
        assert_eq!(dt(&[1, 3, 5], &[2, 6, 7]), Ok(()));
        assert_eq!(dt(&[1, 1], &[1]), Ok(()));

        assert_eq!(dt(&[], &[]), Err(()));
        assert_eq!(dt(&[1], &[0]), Err(()));
        assert_eq!(dt(&[1, 3], &[4]), Err(()));
        assert_eq!(dt(&[1, 3, 5], &[4]), Err(()));
        assert_eq!(dt(&[1, 4], &[3, 2]), Err(()));
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn regression_compact_latest_write() {
        let client = PersistClient::new_for_tests().await;
        let mut txns = TxnsHandle::expect_open(client.clone()).await;
        let log = txns.new_log();
        let d0 = txns.expect_register(1).await;

        let tidy_5 = txns.expect_commit_at(5, d0, &["5"], &log).await;
        let _ = txns.expect_commit_at(15, d0, &["15"], &log).await;
        txns.tidy_at(20, tidy_5).await.unwrap();
        let _ = txns.txns_cache.update_gt(&20).await;
        assert_eq!(txns.txns_cache.min_unapplied_ts(), &15);
        txns.compact_to(10).await;

        let mut txns_read = client
            .open_leased_reader(
                txns.txns_id(),
                Arc::new(ShardIdSchema),
                Arc::new(VecU8Schema),
                Diagnostics::for_tests(),
                true,
            )
            .await
            .expect("txns schema shouldn't change");
        txns_read.downgrade_since(&Antichain::from_elem(10)).await;
        let mut cache = TxnsCache::<_, TxnsCodecDefault>::from_read(txns_read, None).await;
        let _ = cache.update_gt(&15).await;
        let snap = cache.data_snapshot(d0, 12);
        assert_eq!(snap.latest_write, Some(5));
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn regression_ts_sort() {
        let client = PersistClient::new_for_tests().await;
        let txns = TxnsHandle::expect_open(client.clone()).await;
        let mut cache = TxnsCache::expect_open(0, &txns).await;
        let d0 = ShardId::new();

        cache.push_entries(
            vec![
                (TxnsEntry::Register(d0, u64::encode(&2)), 2, -1),
                (TxnsEntry::Register(d0, u64::encode(&1)), 2, 1),
            ],
            3,
        );
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn data_compacted() {
        let d0 = ShardId::new();
        let mut c = TxnsCacheState::new(ShardId::new(), 10, None);
        c.progress_exclusive = 20;

        #[allow(clippy::disallowed_methods)]
        let result = std::panic::catch_unwind(|| c.data_listen_next(&d0, &0));
        assert_err!(result);

        let ds = c.data_snapshot(d0, 0);
        assert_eq!(
            ds,
            DataSnapshot {
                data_id: d0,
                latest_write: None,
                as_of: 0,
                empty_to: 10,
            }
        );
    }
}