1#[cfg(debug_assertions)]
13use std::collections::BTreeSet;
14use std::fmt::Debug;
15use std::ops::ControlFlow::{Break, Continue};
16use std::sync::Arc;
17use std::time::SystemTime;
18
19use bytes::Bytes;
20use differential_dataflow::difference::Semigroup;
21use differential_dataflow::lattice::Lattice;
22use differential_dataflow::trace::Description;
23use mz_ore::cast::CastFrom;
24use mz_persist::location::{
25 Blob, CaSResult, Consensus, Indeterminate, SCAN_ALL, SeqNo, VersionedData,
26};
27use mz_persist::retry::Retry;
28use mz_persist_types::{Codec, Codec64};
29use mz_proto::RustType;
30use prost::Message;
31use timely::progress::Timestamp;
32use tracing::{Instrument, debug, debug_span, trace, warn};
33
34use crate::cfg::STATE_VERSIONS_RECENT_LIVE_DIFFS_LIMIT;
35use crate::error::{CodecMismatch, CodecMismatchT};
36use crate::internal::encoding::{Rollup, UntypedState};
37use crate::internal::machine::{retry_determinate, retry_external};
38use crate::internal::metrics::ShardMetrics;
39use crate::internal::paths::{BlobKey, PartialBlobKey, PartialRollupKey, RollupId};
40#[cfg(debug_assertions)]
41use crate::internal::state::HollowBatch;
42use crate::internal::state::{
43 BatchPart, HollowBlobRef, HollowRollup, NoOpStateTransition, RunPart, State, TypedState,
44};
45use crate::internal::state_diff::{StateDiff, StateFieldValDiff};
46use crate::{Metrics, PersistConfig, ShardId};
47
48#[derive(Debug)]
99pub struct StateVersions {
100 pub(crate) cfg: PersistConfig,
101 pub(crate) consensus: Arc<dyn Consensus>,
102 pub(crate) blob: Arc<dyn Blob>,
103 pub(crate) metrics: Arc<Metrics>,
104}
105
106#[derive(Debug, Clone)]
107pub struct RecentLiveDiffs(pub Vec<VersionedData>);
108
109#[derive(Debug, Clone)]
110pub struct AllLiveDiffs(pub Vec<VersionedData>);
111
112#[derive(Debug, Clone)]
113pub struct EncodedRollup {
114 pub(crate) shard_id: ShardId,
115 pub(crate) seqno: SeqNo,
116 pub(crate) key: PartialRollupKey,
117 pub(crate) _desc: Description<SeqNo>,
118 buf: Bytes,
119}
120
121impl EncodedRollup {
122 pub fn to_hollow(&self) -> HollowRollup {
123 HollowRollup {
124 key: self.key.clone(),
125 encoded_size_bytes: Some(self.buf.len()),
126 }
127 }
128}
129
130impl StateVersions {
131 pub fn new(
132 cfg: PersistConfig,
133 consensus: Arc<dyn Consensus>,
134 blob: Arc<dyn Blob>,
135 metrics: Arc<Metrics>,
136 ) -> Self {
137 StateVersions {
138 cfg,
139 consensus,
140 blob,
141 metrics,
142 }
143 }
144
145 pub async fn maybe_init_shard<K, V, T, D>(
148 &self,
149 shard_metrics: &ShardMetrics,
150 ) -> Result<TypedState<K, V, T, D>, Box<CodecMismatch>>
151 where
152 K: Debug + Codec,
153 V: Debug + Codec,
154 T: Timestamp + Lattice + Codec64,
155 D: Semigroup + Codec64,
156 {
157 let shard_id = shard_metrics.shard_id;
158
159 let recent_live_diffs = self.fetch_recent_live_diffs::<T>(&shard_id).await;
161 if !recent_live_diffs.0.is_empty() {
162 return self
163 .fetch_current_state(&shard_id, recent_live_diffs.0)
164 .await
165 .check_codecs(&shard_id);
166 }
167
168 let (initial_state, initial_diff) = self.write_initial_rollup(shard_metrics).await;
170 let (cas_res, _diff) =
171 retry_external(&self.metrics.retries.external.maybe_init_cas, || async {
172 self.try_compare_and_set_current(
173 "maybe_init_shard",
174 shard_metrics,
175 None,
176 &initial_state,
177 &initial_diff,
178 )
179 .await
180 .map_err(|err| err.into())
181 })
182 .await;
183 match cas_res {
184 CaSResult::Committed => Ok(initial_state),
185 CaSResult::ExpectationMismatch => {
186 let recent_live_diffs = self.fetch_recent_live_diffs::<T>(&shard_id).await;
187 let state = self
188 .fetch_current_state(&shard_id, recent_live_diffs.0)
189 .await
190 .check_codecs(&shard_id);
191
192 let (_, rollup) = initial_state.latest_rollup();
200 let should_delete_rollup = match state.as_ref() {
201 Ok(state) => !state
202 .collections
203 .rollups
204 .values()
205 .any(|x| &x.key == &rollup.key),
206 Err(_codec_mismatch) => true,
209 };
210 if should_delete_rollup {
211 self.delete_rollup(&shard_id, &rollup.key).await;
212 }
213
214 state
215 }
216 }
217 }
218
219 pub async fn try_compare_and_set_current<K, V, T, D>(
224 &self,
225 cmd_name: &str,
226 shard_metrics: &ShardMetrics,
227 expected: Option<SeqNo>,
228 new_state: &TypedState<K, V, T, D>,
229 diff: &StateDiff<T>,
230 ) -> Result<(CaSResult, VersionedData), Indeterminate>
231 where
232 K: Debug + Codec,
233 V: Debug + Codec,
234 T: Timestamp + Lattice + Codec64,
235 D: Semigroup + Codec64,
236 {
237 assert_eq!(shard_metrics.shard_id, new_state.shard_id);
238 let path = new_state.shard_id.to_string();
239
240 trace!(
241 "apply_unbatched_cmd {} attempting {}\n new_state={:?}",
242 cmd_name,
243 new_state.seqno(),
244 new_state
245 );
246 let new = self.metrics.codecs.state_diff.encode(|| {
247 let mut buf = Vec::new();
248 diff.encode(&mut buf);
249 VersionedData {
250 seqno: new_state.seqno(),
251 data: Bytes::from(buf),
252 }
253 });
254 assert_eq!(new.seqno, diff.seqno_to);
255
256 let payload_len = new.data.len();
257 let cas_res = retry_determinate(
258 &self.metrics.retries.determinate.apply_unbatched_cmd_cas,
259 || async {
260 self.consensus
261 .compare_and_set(&path, expected, new.clone())
262 .await
263 },
264 )
265 .instrument(debug_span!("apply_unbatched_cmd::cas", payload_len))
266 .await
267 .map_err(|err| {
268 debug!("apply_unbatched_cmd {} errored: {}", cmd_name, err);
269 err
270 })?;
271
272 match cas_res {
273 CaSResult::Committed => {
274 trace!(
275 "apply_unbatched_cmd {} succeeded {}\n new_state={:?}",
276 cmd_name,
277 new_state.seqno(),
278 new_state
279 );
280
281 shard_metrics.set_since(new_state.since());
282 shard_metrics.set_upper(new_state.upper());
283 shard_metrics.seqnos_since_last_rollup.set(
284 new_state
285 .seqno
286 .0
287 .saturating_sub(new_state.latest_rollup().0.0),
288 );
289 shard_metrics
290 .spine_batch_count
291 .set(u64::cast_from(new_state.spine_batch_count()));
292 let size_metrics = new_state.size_metrics();
293 shard_metrics
294 .schema_registry_version_count
295 .set(u64::cast_from(new_state.collections.schemas.len()));
296 shard_metrics
297 .hollow_batch_count
298 .set(u64::cast_from(size_metrics.hollow_batch_count));
299 shard_metrics
300 .batch_part_count
301 .set(u64::cast_from(size_metrics.batch_part_count));
302 shard_metrics
303 .rewrite_part_count
304 .set(u64::cast_from(size_metrics.rewrite_part_count));
305 shard_metrics
306 .update_count
307 .set(u64::cast_from(size_metrics.num_updates));
308 shard_metrics
309 .rollup_count
310 .set(u64::cast_from(size_metrics.state_rollup_count));
311 shard_metrics
312 .largest_batch_size
313 .set(u64::cast_from(size_metrics.largest_batch_bytes));
314 shard_metrics
315 .usage_current_state_batches_bytes
316 .set(u64::cast_from(size_metrics.state_batches_bytes));
317 shard_metrics
318 .usage_current_state_rollups_bytes
319 .set(u64::cast_from(size_metrics.state_rollups_bytes));
320 shard_metrics
321 .seqnos_held
322 .set(u64::cast_from(new_state.seqnos_held()));
323 shard_metrics
324 .encoded_diff_size
325 .inc_by(u64::cast_from(payload_len));
326 shard_metrics
327 .live_writers
328 .set(u64::cast_from(new_state.collections.writers.len()));
329 shard_metrics
330 .rewrite_part_count
331 .set(u64::cast_from(size_metrics.rewrite_part_count));
332 shard_metrics
333 .inline_part_count
334 .set(u64::cast_from(size_metrics.inline_part_count));
335 shard_metrics
336 .inline_part_bytes
337 .set(u64::cast_from(size_metrics.inline_part_bytes));
338
339 let spine_metrics = new_state.collections.trace.spine_metrics();
340 shard_metrics
341 .compact_batches
342 .set(spine_metrics.compact_batches);
343 shard_metrics
344 .compacting_batches
345 .set(spine_metrics.compacting_batches);
346 shard_metrics
347 .noncompact_batches
348 .set(spine_metrics.noncompact_batches);
349
350 let batch_parts_by_version = new_state
351 .collections
352 .trace
353 .batches()
354 .flat_map(|x| x.parts.iter())
355 .flat_map(|part| {
356 let key = match part {
357 RunPart::Many(x) => Some(&x.key),
358 RunPart::Single(BatchPart::Hollow(x)) => Some(&x.key),
359 RunPart::Single(BatchPart::Inline { .. }) => None,
361 }?;
362 let (writer_key, _) = key.0.split_once('/')?;
364 match &writer_key[..1] {
365 "w" => Some(("old", part.encoded_size_bytes())),
366 "n" => Some((&writer_key[1..], part.encoded_size_bytes())),
367 _ => None,
368 }
369 });
370 shard_metrics.set_batch_part_versions(batch_parts_by_version);
371
372 Ok((CaSResult::Committed, new))
373 }
374 CaSResult::ExpectationMismatch => {
375 debug!(
376 "apply_unbatched_cmd {} {} lost the CaS race, retrying: {:?}",
377 new_state.shard_id(),
378 cmd_name,
379 expected,
380 );
381 Ok((CaSResult::ExpectationMismatch, new))
382 }
383 }
384 }
385
386 pub async fn fetch_current_state<T>(
393 &self,
394 shard_id: &ShardId,
395 mut live_diffs: Vec<VersionedData>,
396 ) -> UntypedState<T>
397 where
398 T: Timestamp + Lattice + Codec64,
399 {
400 let retry = self
401 .metrics
402 .retries
403 .fetch_latest_state
404 .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
405 loop {
406 let latest_diff = live_diffs
407 .last()
408 .expect("initialized shard should have at least one diff");
409 let latest_diff = self
410 .metrics
411 .codecs
412 .state_diff
413 .decode(|| {
415 StateDiff::<T>::decode(&self.cfg.build_version, latest_diff.data.clone())
416 });
417 let mut state = match self
418 .fetch_rollup_at_key(shard_id, &latest_diff.latest_rollup_key)
419 .await
420 {
421 Some(x) => x,
422 None => {
423 retry.retries.inc();
426 let earliest_before_refetch = live_diffs
427 .first()
428 .expect("initialized shard should have at least one diff")
429 .seqno;
430 live_diffs = self.fetch_recent_live_diffs::<T>(shard_id).await.0;
431
432 let earliest_after_refetch = live_diffs
439 .first()
440 .expect("initialized shard should have at least one diff")
441 .seqno;
442 if earliest_before_refetch >= earliest_after_refetch {
443 warn!(
444 concat!(
445 "fetch_current_state refetch expects earliest live diff to advance: {} vs {}. ",
446 "In dev and testing, this happens when persist's Blob (files in mzdata) ",
447 "is deleted out from under it or when two processes are talking to ",
448 "different Blobs (e.g. docker containers without it shared)."
449 ),
450 earliest_before_refetch, earliest_after_refetch
451 )
452 }
453 continue;
454 }
455 };
456
457 state.apply_encoded_diffs(&self.cfg, &self.metrics, &live_diffs);
458 return state;
459 }
460 }
461
462 pub async fn fetch_all_live_states<T>(
466 &self,
467 shard_id: ShardId,
468 ) -> Option<UntypedStateVersionsIter<T>>
469 where
470 T: Timestamp + Lattice + Codec64,
471 {
472 let retry = self
473 .metrics
474 .retries
475 .fetch_live_states
476 .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
477 let mut all_live_diffs = self.fetch_all_live_diffs(&shard_id).await;
478 loop {
479 let earliest_live_diff = match all_live_diffs.0.first() {
480 Some(x) => x,
481 None => return None,
482 };
483 let state = match self
484 .fetch_rollup_at_seqno(
485 &shard_id,
486 all_live_diffs.0.clone(),
487 earliest_live_diff.seqno,
488 )
489 .await
490 {
491 Some(x) => x,
492 None => {
493 retry.retries.inc();
500 let earliest_before_refetch = earliest_live_diff.seqno;
501 all_live_diffs = self.fetch_all_live_diffs(&shard_id).await;
502
503 let earliest_after_refetch = all_live_diffs
510 .0
511 .first()
512 .expect("initialized shard should have at least one diff")
513 .seqno;
514 if earliest_before_refetch >= earliest_after_refetch {
515 warn!(
516 concat!(
517 "fetch_all_live_states refetch expects earliest live diff to advance: {} vs {}. ",
518 "In dev and testing, this happens when persist's Blob (files in mzdata) ",
519 "is deleted out from under it or when two processes are talking to ",
520 "different Blobs (e.g. docker containers without it shared)."
521 ),
522 earliest_before_refetch, earliest_after_refetch
523 )
524 }
525 continue;
526 }
527 };
528 assert_eq!(earliest_live_diff.seqno, state.seqno());
529 return Some(UntypedStateVersionsIter {
530 shard_id,
531 cfg: self.cfg.clone(),
532 metrics: Arc::clone(&self.metrics),
533 state,
534 diffs: all_live_diffs.0,
535 });
536 }
537 }
538
539 pub async fn fetch_all_live_diffs(&self, shard_id: &ShardId) -> AllLiveDiffs {
545 let path = shard_id.to_string();
546 let diffs = retry_external(&self.metrics.retries.external.fetch_state_scan, || async {
547 self.consensus.scan(&path, SeqNo::minimum(), SCAN_ALL).await
548 })
549 .instrument(debug_span!("fetch_state::scan"))
550 .await;
551 AllLiveDiffs(diffs)
552 }
553
554 pub async fn fetch_recent_live_diffs<T>(&self, shard_id: &ShardId) -> RecentLiveDiffs
561 where
562 T: Timestamp + Lattice + Codec64,
563 {
564 let path = shard_id.to_string();
565 let scan_limit = STATE_VERSIONS_RECENT_LIVE_DIFFS_LIMIT.get(&self.cfg);
566 let oldest_diffs =
567 retry_external(&self.metrics.retries.external.fetch_state_scan, || async {
568 self.consensus
569 .scan(&path, SeqNo::minimum(), scan_limit)
570 .await
571 })
572 .instrument(debug_span!("fetch_state::scan"))
573 .await;
574
575 if oldest_diffs.len() < scan_limit {
578 self.metrics.state.fetch_recent_live_diffs_fast_path.inc();
579 return RecentLiveDiffs(oldest_diffs);
580 }
581
582 let head = retry_external(&self.metrics.retries.external.fetch_state_scan, || async {
593 self.consensus.head(&path).await
594 })
595 .instrument(debug_span!("fetch_state::slow_path::head"))
596 .await
597 .expect("initialized shard should have at least 1 diff");
598
599 let latest_diff = self
600 .metrics
601 .codecs
602 .state_diff
603 .decode(|| StateDiff::<T>::decode(&self.cfg.build_version, head.data));
604
605 match BlobKey::parse_ids(&latest_diff.latest_rollup_key.complete(shard_id)) {
606 Ok((_shard_id, PartialBlobKey::Rollup(seqno, _rollup))) => {
607 self.metrics.state.fetch_recent_live_diffs_slow_path.inc();
608 let diffs =
609 retry_external(&self.metrics.retries.external.fetch_state_scan, || async {
610 self.consensus.scan(&path, seqno, SCAN_ALL).await
614 })
615 .instrument(debug_span!("fetch_state::slow_path::scan"))
616 .await;
617 RecentLiveDiffs(diffs)
618 }
619 Ok(_) => panic!(
620 "invalid state diff rollup key: {}",
621 latest_diff.latest_rollup_key
622 ),
623 Err(err) => panic!("unparseable state diff rollup key: {}", err),
624 }
625 }
626
627 pub async fn fetch_all_live_diffs_gt_seqno<K, V, T, D>(
632 &self,
633 shard_id: &ShardId,
634 seqno: SeqNo,
635 ) -> Vec<VersionedData> {
636 let path = shard_id.to_string();
637 retry_external(&self.metrics.retries.external.fetch_state_scan, || async {
638 self.consensus.scan(&path, seqno.next(), SCAN_ALL).await
639 })
640 .instrument(debug_span!("fetch_state::scan"))
641 .await
642 }
643
644 pub async fn truncate_diffs(&self, shard_id: &ShardId, seqno: SeqNo) {
646 let path = shard_id.to_string();
647 let _deleted_count = retry_external(&self.metrics.retries.external.gc_truncate, || async {
648 self.consensus.truncate(&path, seqno).await
649 })
650 .instrument(debug_span!("gc::truncate"))
651 .await;
652 }
653
654 async fn write_initial_rollup<K, V, T, D>(
658 &self,
659 shard_metrics: &ShardMetrics,
660 ) -> (TypedState<K, V, T, D>, StateDiff<T>)
661 where
662 K: Debug + Codec,
663 V: Debug + Codec,
664 T: Timestamp + Lattice + Codec64,
665 D: Semigroup + Codec64,
666 {
667 let empty_state = TypedState::new(
668 self.cfg.build_version.clone(),
669 shard_metrics.shard_id,
670 self.cfg.hostname.clone(),
671 (self.cfg.now)(),
672 );
673 let rollup_seqno = empty_state.seqno.next();
674 let rollup = HollowRollup {
675 key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
676 encoded_size_bytes: None,
680 };
681 let (applied, initial_state) = match empty_state
682 .clone_apply(&self.cfg, &mut |_, _, state| {
683 state.add_rollup((rollup_seqno, &rollup))
684 }) {
685 Continue(x) => x,
686 Break(NoOpStateTransition(_)) => {
687 panic!("initial state transition should not be a no-op")
688 }
689 };
690 assert!(
691 applied,
692 "add_and_remove_rollups should apply to the empty state"
693 );
694
695 let rollup = self.encode_rollup_blob(
696 shard_metrics,
697 initial_state.clone_for_rollup(),
698 vec![],
699 rollup.key,
700 );
701 let () = self.write_rollup_blob(&rollup).await;
702 assert_eq!(initial_state.seqno, rollup.seqno);
703
704 let diff = StateDiff::from_diff(&empty_state.state, &initial_state.state);
705 (initial_state, diff)
706 }
707
708 pub async fn write_rollup_for_state<K, V, T, D>(
709 &self,
710 shard_metrics: &ShardMetrics,
711 state: TypedState<K, V, T, D>,
712 rollup_id: &RollupId,
713 ) -> Option<EncodedRollup>
714 where
715 K: Debug + Codec,
716 V: Debug + Codec,
717 T: Timestamp + Lattice + Codec64,
718 D: Semigroup + Codec64,
719 {
720 let (latest_rollup_seqno, _rollup) = state.latest_rollup();
721 let seqno = state.seqno();
722
723 let diffs: Vec<_> = self
727 .fetch_all_live_diffs_gt_seqno::<K, V, T, D>(&state.shard_id, *latest_rollup_seqno)
728 .await;
729
730 match diffs.first() {
731 None => {
732 self.metrics.state.rollup_write_noop_latest.inc();
738 assert_eq!(seqno, *latest_rollup_seqno);
739 return None;
740 }
741 Some(first) => {
742 self.metrics.state.rollup_write_noop_truncated.inc();
750 if first.seqno != latest_rollup_seqno.next() {
751 assert!(
752 first.seqno > latest_rollup_seqno.next(),
753 "diff: {}, rollup: {}",
754 first.seqno,
755 latest_rollup_seqno,
756 );
757 return None;
758 }
759 }
760 }
761
762 let diffs: Vec<_> = diffs.into_iter().filter(|x| x.seqno <= seqno).collect();
764
765 assert_eq!(
768 diffs.first().map(|x| x.seqno),
769 Some(latest_rollup_seqno.next())
770 );
771 assert_eq!(diffs.last().map(|x| x.seqno), Some(state.seqno));
772
773 let key = PartialRollupKey::new(state.seqno, rollup_id);
774 let rollup = self.encode_rollup_blob(shard_metrics, state, diffs, key);
775 let () = self.write_rollup_blob(&rollup).await;
776
777 self.metrics.state.rollup_write_success.inc();
778
779 Some(rollup)
780 }
781
782 pub fn encode_rollup_blob<K, V, T, D>(
786 &self,
787 shard_metrics: &ShardMetrics,
788 state: TypedState<K, V, T, D>,
789 diffs: Vec<VersionedData>,
790 key: PartialRollupKey,
791 ) -> EncodedRollup
792 where
793 K: Debug + Codec,
794 V: Debug + Codec,
795 T: Timestamp + Lattice + Codec64,
796 D: Semigroup + Codec64,
797 {
798 let shard_id = state.shard_id;
799 let rollup_seqno = state.seqno;
800
801 let rollup = Rollup::from(state.into(), diffs);
802 let desc = rollup.diffs.as_ref().expect("inlined diffs").description();
803
804 let buf = self.metrics.codecs.state.encode(|| {
805 let mut buf = Vec::new();
806 rollup
807 .into_proto()
808 .encode(&mut buf)
809 .expect("no required fields means no initialization errors");
810 Bytes::from(buf)
811 });
812 shard_metrics
813 .latest_rollup_size
814 .set(u64::cast_from(buf.len()));
815 EncodedRollup {
816 shard_id,
817 seqno: rollup_seqno,
818 key,
819 buf,
820 _desc: desc,
821 }
822 }
823
824 pub async fn write_rollup_blob(&self, rollup: &EncodedRollup) {
826 let payload_len = rollup.buf.len();
827 retry_external(&self.metrics.retries.external.rollup_set, || async {
828 self.blob
829 .set(
830 &rollup.key.complete(&rollup.shard_id),
831 Bytes::clone(&rollup.buf),
832 )
833 .await
834 })
835 .instrument(debug_span!("rollup::set", payload_len))
836 .await;
837 }
838
839 async fn fetch_rollup_at_seqno<T>(
846 &self,
847 shard_id: &ShardId,
848 live_diffs: Vec<VersionedData>,
849 seqno: SeqNo,
850 ) -> Option<UntypedState<T>>
851 where
852 T: Timestamp + Lattice + Codec64,
853 {
854 let rollup_key_for_migration = live_diffs.iter().find_map(|x| {
855 let diff = self
856 .metrics
857 .codecs
858 .state_diff
859 .decode(|| StateDiff::<T>::decode(&self.cfg.build_version, x.data.clone()));
861 diff.rollups
862 .iter()
863 .find(|x| x.key == seqno)
864 .map(|x| match &x.val {
865 StateFieldValDiff::Insert(x) => x.clone(),
866 StateFieldValDiff::Update(_, x) => x.clone(),
867 StateFieldValDiff::Delete(x) => x.clone(),
868 })
869 });
870
871 let state = self.fetch_current_state::<T>(shard_id, live_diffs).await;
872 if let Some(rollup) = state.rollups().get(&seqno) {
873 return self.fetch_rollup_at_key(shard_id, &rollup.key).await;
874 }
875
876 let rollup = rollup_key_for_migration.expect("someone should have a key for this rollup");
909 tracing::info!("only found rollup for {} {} via migration", shard_id, seqno);
910 self.metrics.state.rollup_at_seqno_migration.inc();
911 self.fetch_rollup_at_key(shard_id, &rollup.key).await
912 }
913
914 async fn fetch_rollup_at_key<T>(
916 &self,
917 shard_id: &ShardId,
918 rollup_key: &PartialRollupKey,
919 ) -> Option<UntypedState<T>>
920 where
921 T: Timestamp + Lattice + Codec64,
922 {
923 retry_external(&self.metrics.retries.external.rollup_get, || async {
924 self.blob.get(&rollup_key.complete(shard_id)).await
925 })
926 .instrument(debug_span!("rollup::get"))
927 .await
928 .map(|buf| {
929 self.metrics
930 .codecs
931 .state
932 .decode(|| UntypedState::decode(&self.cfg.build_version, buf))
933 })
934 }
935
936 pub async fn delete_rollup(&self, shard_id: &ShardId, key: &PartialRollupKey) {
938 let _ = retry_external(&self.metrics.retries.external.rollup_delete, || async {
939 self.blob.delete(&key.complete(shard_id)).await
940 })
941 .await
942 .instrument(debug_span!("rollup::delete"));
943 }
944}
945
946pub struct UntypedStateVersionsIter<T> {
947 shard_id: ShardId,
948 cfg: PersistConfig,
949 metrics: Arc<Metrics>,
950 state: UntypedState<T>,
951 diffs: Vec<VersionedData>,
952}
953
954impl<T: Timestamp + Lattice + Codec64> UntypedStateVersionsIter<T> {
955 pub(crate) fn check_ts_codec(self) -> Result<StateVersionsIter<T>, CodecMismatchT> {
956 let key_codec = self.state.key_codec.clone();
957 let val_codec = self.state.val_codec.clone();
958 let diff_codec = self.state.diff_codec.clone();
959 let state = self.state.check_ts_codec(&self.shard_id)?;
960 Ok(StateVersionsIter::new(
961 self.cfg,
962 self.metrics,
963 state,
964 self.diffs,
965 key_codec,
966 val_codec,
967 diff_codec,
968 ))
969 }
970}
971
972pub struct StateVersionsIter<T> {
974 cfg: PersistConfig,
975 metrics: Arc<Metrics>,
976 state: State<T>,
977 diffs: Vec<VersionedData>,
978 key_codec: String,
979 val_codec: String,
980 diff_codec: String,
981 #[cfg(debug_assertions)]
982 validator: ReferencedBlobValidator<T>,
983}
984
985impl<T: Timestamp + Lattice + Codec64> StateVersionsIter<T> {
986 fn new(
987 cfg: PersistConfig,
988 metrics: Arc<Metrics>,
989 state: State<T>,
990 mut diffs: Vec<VersionedData>,
992 key_codec: String,
993 val_codec: String,
994 diff_codec: String,
995 ) -> Self {
996 assert!(diffs.first().map_or(true, |x| x.seqno == state.seqno));
997 diffs.reverse();
998 StateVersionsIter {
999 cfg,
1000 metrics,
1001 state,
1002 diffs,
1003 key_codec,
1004 val_codec,
1005 diff_codec,
1006 #[cfg(debug_assertions)]
1007 validator: ReferencedBlobValidator::default(),
1008 }
1009 }
1010
1011 pub fn len(&self) -> usize {
1012 self.diffs.len()
1013 }
1014
1015 pub fn next<F: for<'a> FnMut(InspectDiff<'a, T>)>(
1023 &mut self,
1024 mut inspect_diff_fn: F,
1025 ) -> Option<&State<T>> {
1026 let diff = match self.diffs.pop() {
1027 Some(x) => x,
1028 None => return None,
1029 };
1030 let data = diff.data.clone();
1031 let diff = self
1032 .metrics
1033 .codecs
1034 .state_diff
1035 .decode(|| StateDiff::decode(&self.cfg.build_version, diff.data));
1036
1037 if diff.seqno_to == self.state.seqno {
1040 let inspect = InspectDiff::FromInitial(&self.state);
1041 #[cfg(debug_assertions)]
1042 {
1043 inspect
1044 .referenced_blobs()
1045 .for_each(|x| self.validator.add_inc_blob(x));
1046 }
1047 inspect_diff_fn(inspect);
1048 } else {
1049 let inspect = InspectDiff::Diff(&diff);
1050 #[cfg(debug_assertions)]
1051 {
1052 inspect
1053 .referenced_blobs()
1054 .for_each(|x| self.validator.add_inc_blob(x));
1055 }
1056 inspect_diff_fn(inspect);
1057 }
1058
1059 let diff_seqno_to = diff.seqno_to;
1060 self.state
1061 .apply_diffs(&self.metrics, std::iter::once((diff, data)));
1062 assert_eq!(self.state.seqno, diff_seqno_to);
1063 #[cfg(debug_assertions)]
1064 {
1065 self.validator.validate_against_state(&self.state);
1066 }
1067 Some(&self.state)
1068 }
1069
1070 pub fn state(&self) -> &State<T> {
1071 &self.state
1072 }
1073
1074 pub fn into_rollup_proto_without_diffs(&self) -> impl serde::Serialize + use<T> {
1075 Rollup::from_state_without_diffs(
1076 State {
1077 applier_version: self.state.applier_version.clone(),
1078 shard_id: self.state.shard_id.clone(),
1079 seqno: self.state.seqno.clone(),
1080 walltime_ms: self.state.walltime_ms.clone(),
1081 hostname: self.state.hostname.clone(),
1082 collections: self.state.collections.clone(),
1083 },
1084 self.key_codec.clone(),
1085 self.val_codec.clone(),
1086 T::codec_name(),
1087 self.diff_codec.clone(),
1088 )
1089 .into_proto()
1090 }
1091}
1092
1093#[derive(Debug)]
1098pub enum InspectDiff<'a, T> {
1099 FromInitial(&'a State<T>),
1100 Diff(&'a StateDiff<T>),
1101}
1102
1103impl<T: Timestamp + Lattice + Codec64> InspectDiff<'_, T> {
1104 pub fn referenced_blobs(&self) -> impl Iterator<Item = HollowBlobRef<T>> {
1108 let (state, diff) = match self {
1109 InspectDiff::FromInitial(x) => (Some(x), None),
1110 InspectDiff::Diff(x) => (None, Some(x)),
1111 };
1112 let state_blobs = state.into_iter().flat_map(|s| s.blobs());
1113 let diff_blobs = diff.into_iter().flat_map(|d| d.blob_inserts());
1114 state_blobs.chain(diff_blobs)
1115 }
1116}
1117
1118#[cfg(debug_assertions)]
1119struct ReferencedBlobValidator<T> {
1120 full_batches: BTreeSet<HollowBatch<T>>,
1123 full_rollups: BTreeSet<HollowRollup>,
1124 inc_batches: BTreeSet<HollowBatch<T>>,
1127 inc_rollups: BTreeSet<HollowRollup>,
1128}
1129
1130#[cfg(debug_assertions)]
1131impl<T> Default for ReferencedBlobValidator<T> {
1132 fn default() -> Self {
1133 Self {
1134 full_batches: Default::default(),
1135 full_rollups: Default::default(),
1136 inc_batches: Default::default(),
1137 inc_rollups: Default::default(),
1138 }
1139 }
1140}
1141
1142#[cfg(debug_assertions)]
1143impl<T: Timestamp + Lattice + Codec64> ReferencedBlobValidator<T> {
1144 fn add_inc_blob(&mut self, x: HollowBlobRef<'_, T>) {
1145 match x {
1146 HollowBlobRef::Batch(x) => assert!(
1147 self.inc_batches.insert(x.clone()) || x.desc.lower() == x.desc.upper(),
1148 "non-empty batches should only be appended once; duplicate: {x:?}"
1149 ),
1150 HollowBlobRef::Rollup(x) => assert!(self.inc_rollups.insert(x.clone())),
1151 }
1152 }
1153 fn validate_against_state(&mut self, x: &State<T>) {
1154 use std::hash::{DefaultHasher, Hash, Hasher};
1155
1156 use mz_ore::collections::HashSet;
1157 use timely::progress::Antichain;
1158
1159 use crate::internal::state::BatchPart;
1160
1161 x.blobs().for_each(|x| match x {
1162 HollowBlobRef::Batch(x) => {
1163 self.full_batches.insert(x.clone());
1164 }
1165 HollowBlobRef::Rollup(x) => {
1166 self.full_rollups.insert(x.clone());
1167 }
1168 });
1169
1170 fn overall_desc<'a, T: Timestamp + Lattice>(
1174 iter: impl Iterator<Item = &'a Description<T>>,
1175 ) -> (Antichain<T>, Antichain<T>) {
1176 let mut lower = Antichain::new();
1177 let mut upper = Antichain::from_elem(T::minimum());
1178 for desc in iter {
1179 lower.meet_assign(desc.lower());
1180 upper.join_assign(desc.upper());
1181 }
1182 (lower, upper)
1183 }
1184 let (inc_lower, inc_upper) = overall_desc(self.inc_batches.iter().map(|a| &a.desc));
1185 let (full_lower, full_upper) = overall_desc(self.full_batches.iter().map(|a| &a.desc));
1186 assert_eq!(inc_lower, full_lower);
1187 assert_eq!(inc_upper, full_upper);
1188
1189 fn part_unique<T: Hash>(x: &RunPart<T>) -> String {
1190 match x {
1191 RunPart::Single(BatchPart::Inline {
1192 updates,
1193 ts_rewrite,
1194 ..
1195 }) => {
1196 let mut h = DefaultHasher::new();
1197 updates.hash(&mut h);
1198 ts_rewrite.as_ref().map(|x| x.elements()).hash(&mut h);
1199 h.finish().to_string()
1200 }
1201 other => other.printable_name().to_string(),
1202 }
1203 }
1204
1205 let inc_parts: HashSet<_> = self
1207 .inc_batches
1208 .iter()
1209 .flat_map(|x| x.parts.iter())
1210 .map(part_unique)
1211 .collect();
1212 let full_parts = self
1213 .full_batches
1214 .iter()
1215 .flat_map(|x| x.parts.iter())
1216 .map(part_unique)
1217 .collect();
1218 assert_eq!(inc_parts, full_parts);
1219
1220 assert_eq!(self.inc_rollups, self.full_rollups);
1222 }
1223}
1224
#[cfg(test)]
mod tests {
    use mz_dyncfg::ConfigUpdates;

    use crate::tests::new_test_client;

    use super::*;

    /// `fetch_all_live_states` on a freshly generated shard id, for which
    /// nothing has been written, must return `None`.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn fetch_all_live_states_regression_uninitialized(dyncfgs: ConfigUpdates) {
        let client = new_test_client(&dyncfgs).await;
        let cfg = client.cfg.clone();
        let consensus = Arc::clone(&client.consensus);
        let blob = Arc::clone(&client.blob);
        let metrics = Arc::clone(&client.metrics);
        let state_versions = StateVersions::new(cfg, consensus, blob, metrics);
        assert!(
            state_versions
                .fetch_all_live_states::<u64>(ShardId::new())
                .await
                .is_none()
        );
    }
}