1#[cfg(debug_assertions)]
13use std::collections::BTreeSet;
14use std::fmt::Debug;
15use std::ops::ControlFlow::{Break, Continue};
16use std::sync::Arc;
17use std::time::SystemTime;
18
19use bytes::Bytes;
20use differential_dataflow::difference::Monoid;
21use differential_dataflow::lattice::Lattice;
22use differential_dataflow::trace::Description;
23use mz_ore::cast::CastFrom;
24use mz_persist::location::{
25 Blob, CaSResult, Consensus, Indeterminate, SCAN_ALL, SeqNo, VersionedData,
26};
27use mz_persist::retry::Retry;
28use mz_persist_types::{Codec, Codec64};
29use mz_proto::RustType;
30use prost::Message;
31use timely::progress::Timestamp;
32use tracing::{Instrument, debug, debug_span, trace, warn};
33
34use crate::cfg::STATE_VERSIONS_RECENT_LIVE_DIFFS_LIMIT;
35use crate::error::{CodecMismatch, CodecMismatchT};
36use crate::internal::encoding::{Rollup, UntypedState};
37use crate::internal::machine::{retry_determinate, retry_external};
38use crate::internal::metrics::ShardMetrics;
39use crate::internal::paths::{BlobKey, PartialBlobKey, PartialRollupKey, RollupId};
40#[cfg(debug_assertions)]
41use crate::internal::state::HollowBatch;
42use crate::internal::state::{
43 BatchPart, HollowBlobRef, HollowRollup, NoOpStateTransition, RunPart, State, TypedState,
44};
45use crate::internal::state_diff::{StateDiff, StateFieldValDiff};
46use crate::{Metrics, PersistConfig, ShardId};
47
48#[derive(Debug)]
99pub struct StateVersions {
100 pub(crate) cfg: PersistConfig,
101 pub(crate) consensus: Arc<dyn Consensus>,
102 pub(crate) blob: Arc<dyn Blob>,
103 pub(crate) metrics: Arc<Metrics>,
104}
105
106#[derive(Debug, Clone)]
107pub struct RecentLiveDiffs(pub Vec<VersionedData>);
108
109#[derive(Debug, Clone)]
110pub struct AllLiveDiffs(pub Vec<VersionedData>);
111
112#[derive(Debug, Clone)]
113pub struct EncodedRollup {
114 pub(crate) shard_id: ShardId,
115 pub(crate) seqno: SeqNo,
116 pub(crate) key: PartialRollupKey,
117 pub(crate) _desc: Description<SeqNo>,
118 buf: Bytes,
119}
120
121impl EncodedRollup {
122 pub fn to_hollow(&self) -> HollowRollup {
123 HollowRollup {
124 key: self.key.clone(),
125 encoded_size_bytes: Some(self.buf.len()),
126 }
127 }
128}
129
130impl StateVersions {
131 pub fn new(
132 cfg: PersistConfig,
133 consensus: Arc<dyn Consensus>,
134 blob: Arc<dyn Blob>,
135 metrics: Arc<Metrics>,
136 ) -> Self {
137 StateVersions {
138 cfg,
139 consensus,
140 blob,
141 metrics,
142 }
143 }
144
145 pub async fn maybe_init_shard<K, V, T, D>(
148 &self,
149 shard_metrics: &ShardMetrics,
150 ) -> Result<TypedState<K, V, T, D>, Box<CodecMismatch>>
151 where
152 K: Debug + Codec,
153 V: Debug + Codec,
154 T: Timestamp + Lattice + Codec64,
155 D: Monoid + Codec64,
156 {
157 let shard_id = shard_metrics.shard_id;
158
159 let recent_live_diffs = self.fetch_recent_live_diffs::<T>(&shard_id).await;
161 if !recent_live_diffs.0.is_empty() {
162 return self
163 .fetch_current_state(&shard_id, recent_live_diffs.0)
164 .await
165 .check_codecs(&shard_id);
166 }
167
168 let (initial_state, initial_diff) = self.write_initial_rollup(shard_metrics).await;
170 let (cas_res, _diff) =
171 retry_external(&self.metrics.retries.external.maybe_init_cas, || async {
172 self.try_compare_and_set_current(
173 "maybe_init_shard",
174 shard_metrics,
175 None,
176 &initial_state,
177 &initial_diff,
178 )
179 .await
180 .map_err(|err| err.into())
181 })
182 .await;
183 match cas_res {
184 CaSResult::Committed => Ok(initial_state),
185 CaSResult::ExpectationMismatch => {
186 let recent_live_diffs = self.fetch_recent_live_diffs::<T>(&shard_id).await;
187 let state = self
188 .fetch_current_state(&shard_id, recent_live_diffs.0)
189 .await
190 .check_codecs(&shard_id);
191
192 let (_, rollup) = initial_state.latest_rollup();
200 let should_delete_rollup = match state.as_ref() {
201 Ok(state) => !state
202 .collections
203 .rollups
204 .values()
205 .any(|x| &x.key == &rollup.key),
206 Err(_codec_mismatch) => true,
209 };
210 if should_delete_rollup {
211 self.delete_rollup(&shard_id, &rollup.key).await;
212 }
213
214 state
215 }
216 }
217 }
218
219 pub async fn try_compare_and_set_current<K, V, T, D>(
224 &self,
225 cmd_name: &str,
226 shard_metrics: &ShardMetrics,
227 expected: Option<SeqNo>,
228 new_state: &TypedState<K, V, T, D>,
229 diff: &StateDiff<T>,
230 ) -> Result<(CaSResult, VersionedData), Indeterminate>
231 where
232 K: Debug + Codec,
233 V: Debug + Codec,
234 T: Timestamp + Lattice + Codec64,
235 D: Monoid + Codec64,
236 {
237 assert_eq!(shard_metrics.shard_id, new_state.shard_id);
238 let path = new_state.shard_id.to_string();
239
240 trace!(
241 "apply_unbatched_cmd {} attempting {}\n new_state={:?}",
242 cmd_name,
243 new_state.seqno(),
244 new_state
245 );
246 let new = self.metrics.codecs.state_diff.encode(|| {
247 let mut buf = Vec::new();
248 diff.encode(&mut buf);
249 VersionedData {
250 seqno: new_state.seqno(),
251 data: Bytes::from(buf),
252 }
253 });
254 assert_eq!(new.seqno, diff.seqno_to);
255
256 let payload_len = new.data.len();
257 let cas_res = retry_determinate(
258 &self.metrics.retries.determinate.apply_unbatched_cmd_cas,
259 || async {
260 self.consensus
261 .compare_and_set(&path, expected, new.clone())
262 .await
263 },
264 )
265 .instrument(debug_span!("apply_unbatched_cmd::cas", payload_len))
266 .await
267 .map_err(|err| {
268 debug!("apply_unbatched_cmd {} errored: {}", cmd_name, err);
269 err
270 })?;
271
272 match cas_res {
273 CaSResult::Committed => {
274 trace!(
275 "apply_unbatched_cmd {} succeeded {}\n new_state={:?}",
276 cmd_name,
277 new_state.seqno(),
278 new_state
279 );
280
281 shard_metrics.set_since(new_state.since());
282 shard_metrics.set_upper(new_state.upper());
283 shard_metrics.seqnos_since_last_rollup.set(
284 new_state
285 .seqno
286 .0
287 .saturating_sub(new_state.latest_rollup().0.0),
288 );
289 shard_metrics
290 .spine_batch_count
291 .set(u64::cast_from(new_state.spine_batch_count()));
292 let size_metrics = new_state.size_metrics();
293 shard_metrics
294 .schema_registry_version_count
295 .set(u64::cast_from(new_state.collections.schemas.len()));
296 shard_metrics
297 .hollow_batch_count
298 .set(u64::cast_from(size_metrics.hollow_batch_count));
299 shard_metrics
300 .batch_part_count
301 .set(u64::cast_from(size_metrics.batch_part_count));
302 shard_metrics
303 .rewrite_part_count
304 .set(u64::cast_from(size_metrics.rewrite_part_count));
305 shard_metrics
306 .update_count
307 .set(u64::cast_from(size_metrics.num_updates));
308 shard_metrics
309 .rollup_count
310 .set(u64::cast_from(size_metrics.state_rollup_count));
311 shard_metrics
312 .largest_batch_size
313 .set(u64::cast_from(size_metrics.largest_batch_bytes));
314 shard_metrics
315 .usage_current_state_batches_bytes
316 .set(u64::cast_from(size_metrics.state_batches_bytes));
317 shard_metrics
318 .usage_current_state_rollups_bytes
319 .set(u64::cast_from(size_metrics.state_rollups_bytes));
320 shard_metrics
321 .seqnos_held
322 .set(u64::cast_from(new_state.seqnos_held()));
323 shard_metrics
324 .encoded_diff_size
325 .inc_by(u64::cast_from(payload_len));
326 shard_metrics
327 .live_writers
328 .set(u64::cast_from(new_state.collections.writers.len()));
329 shard_metrics
330 .rewrite_part_count
331 .set(u64::cast_from(size_metrics.rewrite_part_count));
332 shard_metrics
333 .inline_part_count
334 .set(u64::cast_from(size_metrics.inline_part_count));
335 shard_metrics
336 .inline_part_bytes
337 .set(u64::cast_from(size_metrics.inline_part_bytes));
338 shard_metrics.stale_version.set(
339 if new_state
340 .state
341 .collections
342 .version
343 .cmp_precedence(&self.cfg.build_version)
344 .is_lt()
345 {
346 1
347 } else {
348 0
349 },
350 );
351
352 let spine_metrics = new_state.collections.trace.spine_metrics();
353 shard_metrics
354 .compact_batches
355 .set(spine_metrics.compact_batches);
356 shard_metrics
357 .compacting_batches
358 .set(spine_metrics.compacting_batches);
359 shard_metrics
360 .noncompact_batches
361 .set(spine_metrics.noncompact_batches);
362
363 let batch_parts_by_version = new_state
364 .collections
365 .trace
366 .batches()
367 .flat_map(|x| x.parts.iter())
368 .flat_map(|part| {
369 let key = match part {
370 RunPart::Many(x) => Some(&x.key),
371 RunPart::Single(BatchPart::Hollow(x)) => Some(&x.key),
372 RunPart::Single(BatchPart::Inline { .. }) => None,
374 }?;
375 let (writer_key, _) = key.0.split_once('/')?;
377 match &writer_key[..1] {
378 "w" => Some(("old", part.encoded_size_bytes())),
379 "n" => Some((&writer_key[1..], part.encoded_size_bytes())),
380 _ => None,
381 }
382 });
383 shard_metrics.set_batch_part_versions(batch_parts_by_version);
384
385 Ok((CaSResult::Committed, new))
386 }
387 CaSResult::ExpectationMismatch => {
388 debug!(
389 "apply_unbatched_cmd {} {} lost the CaS race, retrying: {:?}",
390 new_state.shard_id(),
391 cmd_name,
392 expected,
393 );
394 Ok((CaSResult::ExpectationMismatch, new))
395 }
396 }
397 }
398
399 pub async fn fetch_current_state<T>(
406 &self,
407 shard_id: &ShardId,
408 mut live_diffs: Vec<VersionedData>,
409 ) -> UntypedState<T>
410 where
411 T: Timestamp + Lattice + Codec64,
412 {
413 let retry = self
414 .metrics
415 .retries
416 .fetch_latest_state
417 .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
418 loop {
419 let latest_diff = live_diffs
420 .last()
421 .expect("initialized shard should have at least one diff");
422 let latest_diff = self
423 .metrics
424 .codecs
425 .state_diff
426 .decode(|| {
428 StateDiff::<T>::decode(&self.cfg.build_version, latest_diff.data.clone())
429 });
430 let mut state = match self
431 .fetch_rollup_at_key(shard_id, &latest_diff.latest_rollup_key)
432 .await
433 {
434 Some(x) => x,
435 None => {
436 retry.retries.inc();
439 let earliest_before_refetch = live_diffs
440 .first()
441 .expect("initialized shard should have at least one diff")
442 .seqno;
443 live_diffs = self.fetch_recent_live_diffs::<T>(shard_id).await.0;
444
445 let earliest_after_refetch = live_diffs
452 .first()
453 .expect("initialized shard should have at least one diff")
454 .seqno;
455 if earliest_before_refetch >= earliest_after_refetch {
456 warn!(
457 concat!(
458 "fetch_current_state refetch expects earliest live diff to advance: {} vs {}. ",
459 "In dev and testing, this happens when persist's Blob (files in mzdata) ",
460 "is deleted out from under it or when two processes are talking to ",
461 "different Blobs (e.g. docker containers without it shared)."
462 ),
463 earliest_before_refetch, earliest_after_refetch
464 )
465 }
466 continue;
467 }
468 };
469
470 state.apply_encoded_diffs(&self.cfg, &self.metrics, &live_diffs);
471 return state;
472 }
473 }
474
475 pub async fn fetch_all_live_states<T>(
479 &self,
480 shard_id: ShardId,
481 ) -> Option<UntypedStateVersionsIter<T>>
482 where
483 T: Timestamp + Lattice + Codec64,
484 {
485 let retry = self
486 .metrics
487 .retries
488 .fetch_live_states
489 .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
490 let mut all_live_diffs = self.fetch_all_live_diffs(&shard_id).await;
491 loop {
492 let earliest_live_diff = match all_live_diffs.0.first() {
493 Some(x) => x,
494 None => return None,
495 };
496 let state = match self
497 .fetch_rollup_at_seqno(
498 &shard_id,
499 all_live_diffs.0.clone(),
500 earliest_live_diff.seqno,
501 )
502 .await
503 {
504 Some(x) => x,
505 None => {
506 retry.retries.inc();
513 let earliest_before_refetch = earliest_live_diff.seqno;
514 all_live_diffs = self.fetch_all_live_diffs(&shard_id).await;
515
516 let earliest_after_refetch = all_live_diffs
523 .0
524 .first()
525 .expect("initialized shard should have at least one diff")
526 .seqno;
527 if earliest_before_refetch >= earliest_after_refetch {
528 warn!(
529 concat!(
530 "fetch_all_live_states refetch expects earliest live diff to advance: {} vs {}. ",
531 "In dev and testing, this happens when persist's Blob (files in mzdata) ",
532 "is deleted out from under it or when two processes are talking to ",
533 "different Blobs (e.g. docker containers without it shared)."
534 ),
535 earliest_before_refetch, earliest_after_refetch
536 )
537 }
538 continue;
539 }
540 };
541 assert_eq!(earliest_live_diff.seqno, state.seqno());
542 return Some(UntypedStateVersionsIter {
543 shard_id,
544 cfg: self.cfg.clone(),
545 metrics: Arc::clone(&self.metrics),
546 state,
547 diffs: all_live_diffs.0,
548 });
549 }
550 }
551
552 pub async fn fetch_all_live_diffs(&self, shard_id: &ShardId) -> AllLiveDiffs {
558 let path = shard_id.to_string();
559 let diffs = retry_external(&self.metrics.retries.external.fetch_state_scan, || async {
560 self.consensus.scan(&path, SeqNo::minimum(), SCAN_ALL).await
561 })
562 .instrument(debug_span!("fetch_state::scan"))
563 .await;
564 AllLiveDiffs(diffs)
565 }
566
567 pub async fn fetch_recent_live_diffs<T>(&self, shard_id: &ShardId) -> RecentLiveDiffs
574 where
575 T: Timestamp + Lattice + Codec64,
576 {
577 let path = shard_id.to_string();
578 let scan_limit = STATE_VERSIONS_RECENT_LIVE_DIFFS_LIMIT.get(&self.cfg);
579 let oldest_diffs =
580 retry_external(&self.metrics.retries.external.fetch_state_scan, || async {
581 self.consensus
582 .scan(&path, SeqNo::minimum(), scan_limit)
583 .await
584 })
585 .instrument(debug_span!("fetch_state::scan"))
586 .await;
587
588 if oldest_diffs.len() < scan_limit {
591 self.metrics.state.fetch_recent_live_diffs_fast_path.inc();
592 return RecentLiveDiffs(oldest_diffs);
593 }
594
595 let head = retry_external(&self.metrics.retries.external.fetch_state_scan, || async {
606 self.consensus.head(&path).await
607 })
608 .instrument(debug_span!("fetch_state::slow_path::head"))
609 .await
610 .expect("initialized shard should have at least 1 diff");
611
612 let latest_diff = self
613 .metrics
614 .codecs
615 .state_diff
616 .decode(|| StateDiff::<T>::decode(&self.cfg.build_version, head.data));
617
618 match BlobKey::parse_ids(&latest_diff.latest_rollup_key.complete(shard_id)) {
619 Ok((_shard_id, PartialBlobKey::Rollup(seqno, _rollup))) => {
620 self.metrics.state.fetch_recent_live_diffs_slow_path.inc();
621 let diffs =
622 retry_external(&self.metrics.retries.external.fetch_state_scan, || async {
623 self.consensus.scan(&path, seqno, SCAN_ALL).await
627 })
628 .instrument(debug_span!("fetch_state::slow_path::scan"))
629 .await;
630 RecentLiveDiffs(diffs)
631 }
632 Ok(_) => panic!(
633 "invalid state diff rollup key: {}",
634 latest_diff.latest_rollup_key
635 ),
636 Err(err) => panic!("unparseable state diff rollup key: {}", err),
637 }
638 }
639
640 pub async fn fetch_all_live_diffs_gt_seqno<K, V, T, D>(
645 &self,
646 shard_id: &ShardId,
647 seqno: SeqNo,
648 ) -> Vec<VersionedData> {
649 let path = shard_id.to_string();
650 retry_external(&self.metrics.retries.external.fetch_state_scan, || async {
651 self.consensus.scan(&path, seqno.next(), SCAN_ALL).await
652 })
653 .instrument(debug_span!("fetch_state::scan"))
654 .await
655 }
656
657 pub async fn truncate_diffs(&self, shard_id: &ShardId, seqno: SeqNo) {
659 let path = shard_id.to_string();
660 let _deleted_count = retry_external(&self.metrics.retries.external.gc_truncate, || async {
661 self.consensus.truncate(&path, seqno).await
662 })
663 .instrument(debug_span!("gc::truncate"))
664 .await;
665 }
666
667 async fn write_initial_rollup<K, V, T, D>(
671 &self,
672 shard_metrics: &ShardMetrics,
673 ) -> (TypedState<K, V, T, D>, StateDiff<T>)
674 where
675 K: Debug + Codec,
676 V: Debug + Codec,
677 T: Timestamp + Lattice + Codec64,
678 D: Monoid + Codec64,
679 {
680 let empty_state = TypedState::new(
681 self.cfg.build_version.clone(),
682 shard_metrics.shard_id,
683 self.cfg.hostname.clone(),
684 (self.cfg.now)(),
685 );
686 let rollup_seqno = empty_state.seqno.next();
687 let rollup = HollowRollup {
688 key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
689 encoded_size_bytes: None,
693 };
694 let (applied, initial_state) = match empty_state
695 .clone_apply(&self.cfg, &mut |_, _, state| {
696 state.add_rollup((rollup_seqno, &rollup))
697 }) {
698 Continue(x) => x,
699 Break(NoOpStateTransition(_)) => {
700 panic!("initial state transition should not be a no-op")
701 }
702 };
703 assert!(
704 applied,
705 "add_and_remove_rollups should apply to the empty state"
706 );
707
708 let rollup = self.encode_rollup_blob(
709 shard_metrics,
710 initial_state.clone_for_rollup(),
711 vec![],
712 rollup.key,
713 );
714 let () = self.write_rollup_blob(&rollup).await;
715 assert_eq!(initial_state.seqno, rollup.seqno);
716
717 let diff = StateDiff::from_diff(&empty_state.state, &initial_state.state);
718 (initial_state, diff)
719 }
720
721 pub async fn write_rollup_for_state<K, V, T, D>(
722 &self,
723 shard_metrics: &ShardMetrics,
724 state: TypedState<K, V, T, D>,
725 rollup_id: &RollupId,
726 ) -> Option<EncodedRollup>
727 where
728 K: Debug + Codec,
729 V: Debug + Codec,
730 T: Timestamp + Lattice + Codec64,
731 D: Monoid + Codec64,
732 {
733 let (latest_rollup_seqno, _rollup) = state.latest_rollup();
734 let seqno = state.seqno();
735
736 let diffs: Vec<_> = self
740 .fetch_all_live_diffs_gt_seqno::<K, V, T, D>(&state.shard_id, *latest_rollup_seqno)
741 .await;
742
743 match diffs.first() {
744 None => {
745 self.metrics.state.rollup_write_noop_latest.inc();
751 assert_eq!(seqno, *latest_rollup_seqno);
752 return None;
753 }
754 Some(first) => {
755 self.metrics.state.rollup_write_noop_truncated.inc();
763 if first.seqno != latest_rollup_seqno.next() {
764 assert!(
765 first.seqno > latest_rollup_seqno.next(),
766 "diff: {}, rollup: {}",
767 first.seqno,
768 latest_rollup_seqno,
769 );
770 return None;
771 }
772 }
773 }
774
775 let diffs: Vec<_> = diffs.into_iter().filter(|x| x.seqno <= seqno).collect();
777
778 assert_eq!(
781 diffs.first().map(|x| x.seqno),
782 Some(latest_rollup_seqno.next())
783 );
784 assert_eq!(diffs.last().map(|x| x.seqno), Some(state.seqno));
785
786 let key = PartialRollupKey::new(state.seqno, rollup_id);
787 let rollup = self.encode_rollup_blob(shard_metrics, state, diffs, key);
788 let () = self.write_rollup_blob(&rollup).await;
789
790 self.metrics.state.rollup_write_success.inc();
791
792 Some(rollup)
793 }
794
795 pub fn encode_rollup_blob<K, V, T, D>(
799 &self,
800 shard_metrics: &ShardMetrics,
801 state: TypedState<K, V, T, D>,
802 diffs: Vec<VersionedData>,
803 key: PartialRollupKey,
804 ) -> EncodedRollup
805 where
806 K: Debug + Codec,
807 V: Debug + Codec,
808 T: Timestamp + Lattice + Codec64,
809 D: Monoid + Codec64,
810 {
811 let shard_id = state.shard_id;
812 let rollup_seqno = state.seqno;
813
814 let rollup = Rollup::from(state.into(), diffs);
815 let desc = rollup.diffs.as_ref().expect("inlined diffs").description();
816
817 let buf = self.metrics.codecs.state.encode(|| {
818 let mut buf = Vec::new();
819 rollup
820 .into_proto()
821 .encode(&mut buf)
822 .expect("no required fields means no initialization errors");
823 Bytes::from(buf)
824 });
825 shard_metrics
826 .latest_rollup_size
827 .set(u64::cast_from(buf.len()));
828 EncodedRollup {
829 shard_id,
830 seqno: rollup_seqno,
831 key,
832 buf,
833 _desc: desc,
834 }
835 }
836
837 pub async fn write_rollup_blob(&self, rollup: &EncodedRollup) {
839 let payload_len = rollup.buf.len();
840 retry_external(&self.metrics.retries.external.rollup_set, || async {
841 self.blob
842 .set(
843 &rollup.key.complete(&rollup.shard_id),
844 Bytes::clone(&rollup.buf),
845 )
846 .await
847 })
848 .instrument(debug_span!("rollup::set", payload_len))
849 .await;
850 }
851
852 async fn fetch_rollup_at_seqno<T>(
859 &self,
860 shard_id: &ShardId,
861 live_diffs: Vec<VersionedData>,
862 seqno: SeqNo,
863 ) -> Option<UntypedState<T>>
864 where
865 T: Timestamp + Lattice + Codec64,
866 {
867 let rollup_key_for_migration = live_diffs.iter().find_map(|x| {
868 let diff = self
869 .metrics
870 .codecs
871 .state_diff
872 .decode(|| StateDiff::<T>::decode(&self.cfg.build_version, x.data.clone()));
874 diff.rollups
875 .iter()
876 .find(|x| x.key == seqno)
877 .map(|x| match &x.val {
878 StateFieldValDiff::Insert(x) => x.clone(),
879 StateFieldValDiff::Update(_, x) => x.clone(),
880 StateFieldValDiff::Delete(x) => x.clone(),
881 })
882 });
883
884 let state = self.fetch_current_state::<T>(shard_id, live_diffs).await;
885 if let Some(rollup) = state.rollups().get(&seqno) {
886 return self.fetch_rollup_at_key(shard_id, &rollup.key).await;
887 }
888
889 let rollup = rollup_key_for_migration.expect("someone should have a key for this rollup");
922 tracing::info!("only found rollup for {} {} via migration", shard_id, seqno);
923 self.metrics.state.rollup_at_seqno_migration.inc();
924 self.fetch_rollup_at_key(shard_id, &rollup.key).await
925 }
926
927 async fn fetch_rollup_at_key<T>(
929 &self,
930 shard_id: &ShardId,
931 rollup_key: &PartialRollupKey,
932 ) -> Option<UntypedState<T>>
933 where
934 T: Timestamp + Lattice + Codec64,
935 {
936 retry_external(&self.metrics.retries.external.rollup_get, || async {
937 self.blob.get(&rollup_key.complete(shard_id)).await
938 })
939 .instrument(debug_span!("rollup::get"))
940 .await
941 .map(|buf| {
942 self.metrics
943 .codecs
944 .state
945 .decode(|| UntypedState::decode(&self.cfg.build_version, buf))
946 })
947 }
948
949 pub async fn delete_rollup(&self, shard_id: &ShardId, key: &PartialRollupKey) {
951 let _ = retry_external(&self.metrics.retries.external.rollup_delete, || async {
952 self.blob.delete(&key.complete(shard_id)).await
953 })
954 .await
955 .instrument(debug_span!("rollup::delete"));
956 }
957}
958
959pub struct UntypedStateVersionsIter<T> {
960 shard_id: ShardId,
961 cfg: PersistConfig,
962 metrics: Arc<Metrics>,
963 state: UntypedState<T>,
964 diffs: Vec<VersionedData>,
965}
966
967impl<T: Timestamp + Lattice + Codec64> UntypedStateVersionsIter<T> {
968 pub(crate) fn check_ts_codec(self) -> Result<StateVersionsIter<T>, CodecMismatchT> {
969 let key_codec = self.state.key_codec.clone();
970 let val_codec = self.state.val_codec.clone();
971 let diff_codec = self.state.diff_codec.clone();
972 let state = self.state.check_ts_codec(&self.shard_id)?;
973 Ok(StateVersionsIter::new(
974 self.cfg,
975 self.metrics,
976 state,
977 self.diffs,
978 key_codec,
979 val_codec,
980 diff_codec,
981 ))
982 }
983}
984
985pub struct StateVersionsIter<T> {
987 cfg: PersistConfig,
988 metrics: Arc<Metrics>,
989 state: State<T>,
990 diffs: Vec<VersionedData>,
991 key_codec: String,
992 val_codec: String,
993 diff_codec: String,
994 #[cfg(debug_assertions)]
995 validator: ReferencedBlobValidator<T>,
996}
997
998impl<T: Timestamp + Lattice + Codec64> StateVersionsIter<T> {
999 fn new(
1000 cfg: PersistConfig,
1001 metrics: Arc<Metrics>,
1002 state: State<T>,
1003 mut diffs: Vec<VersionedData>,
1005 key_codec: String,
1006 val_codec: String,
1007 diff_codec: String,
1008 ) -> Self {
1009 assert!(diffs.first().map_or(true, |x| x.seqno == state.seqno));
1010 diffs.reverse();
1011 StateVersionsIter {
1012 cfg,
1013 metrics,
1014 state,
1015 diffs,
1016 key_codec,
1017 val_codec,
1018 diff_codec,
1019 #[cfg(debug_assertions)]
1020 validator: ReferencedBlobValidator::default(),
1021 }
1022 }
1023
1024 pub fn len(&self) -> usize {
1025 self.diffs.len()
1026 }
1027
1028 pub fn next<F: for<'a> FnMut(InspectDiff<'a, T>)>(
1036 &mut self,
1037 mut inspect_diff_fn: F,
1038 ) -> Option<&State<T>> {
1039 let diff = match self.diffs.pop() {
1040 Some(x) => x,
1041 None => return None,
1042 };
1043 let data = diff.data.clone();
1044 let diff = self
1045 .metrics
1046 .codecs
1047 .state_diff
1048 .decode(|| StateDiff::decode(&self.cfg.build_version, diff.data));
1049
1050 if diff.seqno_to == self.state.seqno {
1053 let inspect = InspectDiff::FromInitial(&self.state);
1054 #[cfg(debug_assertions)]
1055 {
1056 inspect
1057 .referenced_blobs()
1058 .for_each(|x| self.validator.add_inc_blob(x));
1059 }
1060 inspect_diff_fn(inspect);
1061 } else {
1062 let inspect = InspectDiff::Diff(&diff);
1063 #[cfg(debug_assertions)]
1064 {
1065 inspect
1066 .referenced_blobs()
1067 .for_each(|x| self.validator.add_inc_blob(x));
1068 }
1069 inspect_diff_fn(inspect);
1070 }
1071
1072 let diff_seqno_to = diff.seqno_to;
1073 self.state
1074 .apply_diffs(&self.metrics, std::iter::once((diff, data)));
1075 assert_eq!(self.state.seqno, diff_seqno_to);
1076 #[cfg(debug_assertions)]
1077 {
1078 self.validator.validate_against_state(&self.state);
1079 }
1080 Some(&self.state)
1081 }
1082
1083 pub fn state(&self) -> &State<T> {
1084 &self.state
1085 }
1086
1087 pub fn into_rollup_proto_without_diffs(&self) -> impl serde::Serialize + use<T> {
1088 Rollup::from_state_without_diffs(
1089 State {
1090 shard_id: self.state.shard_id.clone(),
1091 seqno: self.state.seqno.clone(),
1092 walltime_ms: self.state.walltime_ms.clone(),
1093 hostname: self.state.hostname.clone(),
1094 collections: self.state.collections.clone(),
1095 },
1096 self.key_codec.clone(),
1097 self.val_codec.clone(),
1098 T::codec_name(),
1099 self.diff_codec.clone(),
1100 )
1101 .into_proto()
1102 }
1103}
1104
1105#[derive(Debug)]
1110pub enum InspectDiff<'a, T> {
1111 FromInitial(&'a State<T>),
1112 Diff(&'a StateDiff<T>),
1113}
1114
1115impl<T: Timestamp + Lattice + Codec64> InspectDiff<'_, T> {
1116 pub fn referenced_blobs(&self) -> impl Iterator<Item = HollowBlobRef<'_, T>> {
1120 let (state, diff) = match self {
1121 InspectDiff::FromInitial(x) => (Some(x), None),
1122 InspectDiff::Diff(x) => (None, Some(x)),
1123 };
1124 let state_blobs = state.into_iter().flat_map(|s| s.blobs());
1125 let diff_blobs = diff.into_iter().flat_map(|d| d.blob_inserts());
1126 state_blobs.chain(diff_blobs)
1127 }
1128}
1129
1130#[cfg(debug_assertions)]
1131struct ReferencedBlobValidator<T> {
1132 full_batches: BTreeSet<HollowBatch<T>>,
1135 full_rollups: BTreeSet<HollowRollup>,
1136 inc_batches: BTreeSet<HollowBatch<T>>,
1139 inc_rollups: BTreeSet<HollowRollup>,
1140}
1141
1142#[cfg(debug_assertions)]
1143impl<T> Default for ReferencedBlobValidator<T> {
1144 fn default() -> Self {
1145 Self {
1146 full_batches: Default::default(),
1147 full_rollups: Default::default(),
1148 inc_batches: Default::default(),
1149 inc_rollups: Default::default(),
1150 }
1151 }
1152}
1153
1154#[cfg(debug_assertions)]
1155impl<T: Timestamp + Lattice + Codec64> ReferencedBlobValidator<T> {
1156 fn add_inc_blob(&mut self, x: HollowBlobRef<'_, T>) {
1157 match x {
1158 HollowBlobRef::Batch(x) => assert!(
1159 self.inc_batches.insert(x.clone()) || x.desc.lower() == x.desc.upper(),
1160 "non-empty batches should only be appended once; duplicate: {x:?}"
1161 ),
1162 HollowBlobRef::Rollup(x) => assert!(self.inc_rollups.insert(x.clone())),
1163 }
1164 }
1165 fn validate_against_state(&mut self, x: &State<T>) {
1166 use std::hash::{DefaultHasher, Hash, Hasher};
1167
1168 use mz_ore::collections::HashSet;
1169 use timely::progress::Antichain;
1170
1171 use crate::internal::state::BatchPart;
1172
1173 x.blobs().for_each(|x| match x {
1174 HollowBlobRef::Batch(x) => {
1175 self.full_batches.insert(x.clone());
1176 }
1177 HollowBlobRef::Rollup(x) => {
1178 self.full_rollups.insert(x.clone());
1179 }
1180 });
1181
1182 fn overall_desc<'a, T: Timestamp + Lattice>(
1186 iter: impl Iterator<Item = &'a Description<T>>,
1187 ) -> (Antichain<T>, Antichain<T>) {
1188 let mut lower = Antichain::new();
1189 let mut upper = Antichain::from_elem(T::minimum());
1190 for desc in iter {
1191 lower.meet_assign(desc.lower());
1192 upper.join_assign(desc.upper());
1193 }
1194 (lower, upper)
1195 }
1196 let (inc_lower, inc_upper) = overall_desc(self.inc_batches.iter().map(|a| &a.desc));
1197 let (full_lower, full_upper) = overall_desc(self.full_batches.iter().map(|a| &a.desc));
1198 assert_eq!(inc_lower, full_lower);
1199 assert_eq!(inc_upper, full_upper);
1200
1201 fn part_unique<T: Codec64>(x: &RunPart<T>) -> String {
1202 match x {
1203 RunPart::Single(BatchPart::Inline {
1204 updates,
1205 ts_rewrite,
1206 ..
1207 }) => {
1208 let mut h = DefaultHasher::new();
1209 updates.hash(&mut h);
1210 if let Some(frontier) = &ts_rewrite {
1211 h.write_usize(frontier.len());
1212 frontier.iter().for_each(|t| t.encode().hash(&mut h));
1213 }
1214 h.finish().to_string()
1215 }
1216 other => other.printable_name().to_string(),
1217 }
1218 }
1219
1220 let inc_parts: HashSet<_> = self
1222 .inc_batches
1223 .iter()
1224 .flat_map(|x| x.parts.iter())
1225 .map(part_unique)
1226 .collect();
1227 let full_parts = self
1228 .full_batches
1229 .iter()
1230 .flat_map(|x| x.parts.iter())
1231 .map(part_unique)
1232 .collect();
1233 assert_eq!(inc_parts, full_parts);
1234
1235 assert_eq!(self.inc_rollups, self.full_rollups);
1237 }
1238}
1239
#[cfg(test)]
mod tests {
    use mz_dyncfg::ConfigUpdates;

    use crate::tests::new_test_client;

    use super::*;

    /// Regression test: `fetch_all_live_states` on a freshly generated shard
    /// id (for which nothing has been written) must return `None`.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn fetch_all_live_states_regression_uninitialized(dyncfgs: ConfigUpdates) {
        let client = new_test_client(&dyncfgs).await;
        let cfg = client.cfg.clone();
        let consensus = Arc::clone(&client.consensus);
        let blob = Arc::clone(&client.blob);
        let metrics = Arc::clone(&client.metrics);
        let state_versions = StateVersions::new(cfg, consensus, blob, metrics);

        let live_states = state_versions
            .fetch_all_live_states::<u64>(ShardId::new())
            .await;
        assert!(live_states.is_none());
    }
}