#[cfg(debug_assertions)]
use std::collections::BTreeSet;
use std::fmt::Debug;
use std::ops::ControlFlow::{Break, Continue};
use std::sync::Arc;
use std::time::SystemTime;

use bytes::Bytes;
use differential_dataflow::difference::Monoid;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::Description;
use mz_ore::cast::CastFrom;
use mz_persist::location::{
    Blob, CaSResult, Consensus, Indeterminate, SCAN_ALL, SeqNo, VersionedData,
};
use mz_persist::retry::Retry;
use mz_persist_types::{Codec, Codec64};
use mz_proto::RustType;
use prost::Message;
use timely::progress::Timestamp;
use tracing::{Instrument, debug, debug_span, trace, warn};

use crate::cfg::STATE_VERSIONS_RECENT_LIVE_DIFFS_LIMIT;
use crate::error::{CodecMismatch, CodecMismatchT};
use crate::internal::encoding::{Rollup, UntypedState};
use crate::internal::machine::{retry_determinate, retry_external};
use crate::internal::metrics::ShardMetrics;
use crate::internal::paths::{BlobKey, PartialBlobKey, PartialRollupKey, RollupId};
#[cfg(debug_assertions)]
use crate::internal::state::HollowBatch;
use crate::internal::state::{
    BatchPart, HollowBlobRef, HollowRollup, NoOpStateTransition, RunPart, State, TypedState,
};
use crate::internal::state_diff::{StateDiff, StateFieldValDiff};
use crate::{Metrics, PersistConfig, ShardId};

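/// Wraps a [Consensus] (the durable, truncatable log of encoded [StateDiff]s
/// for each shard) and a [Blob] (the store for encoded [State] rollups), and
/// provides the operations used to initialize, fetch, evolve, and roll up a
/// shard's state.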
#[derive(Debug)]
pub struct StateVersions {
    pub(crate) cfg: PersistConfig,
    pub(crate) consensus: Arc<dyn Consensus>,
    pub(crate) blob: Arc<dyn Blob>,
    pub(crate) metrics: Arc<Metrics>,
}

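/// The results of fetching the most recent live diffs of a shard: guaranteed
/// to contain the latest diff, but not necessarily every diff back to the
/// earliest live one (see [StateVersions::fetch_recent_live_diffs]).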
#[derive(Debug, Clone)]
pub struct RecentLiveDiffs(pub Vec<VersionedData>);

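/// A pending rollup of [State]: the proto-encoded bytes of the rollup plus
/// the metadata ([ShardId], [SeqNo], [PartialRollupKey]) needed to write it
/// to [Blob].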
#[derive(Debug, Clone)]
pub struct EncodedRollup {
    pub(crate) shard_id: ShardId,
    pub(crate) seqno: SeqNo,
    pub(crate) key: PartialRollupKey,
    pub(crate) _desc: Description<SeqNo>,
    buf: Bytes,
}

impl EncodedRollup {
    pub fn to_hollow(&self) -> HollowRollup {
        HollowRollup {
            key: self.key.clone(),
            encoded_size_bytes: Some(self.buf.len()),
        }
    }
}

impl StateVersions {
    pub fn new(
        cfg: PersistConfig,
        consensus: Arc<dyn Consensus>,
        blob: Arc<dyn Blob>,
        metrics: Arc<Metrics>,
    ) -> Self {
        StateVersions {
            cfg,
            consensus,
            blob,
            metrics,
        }
    }

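    /// Fetches the current state of the shard, or initializes it with an
    /// initial rollup and diff if no live diffs exist yet.
    ///
    /// If we lose the initialization CaS race to a concurrent writer, we
    /// fetch the winner's state and delete our now-unreferenced rollup blob.
    ///
    /// A hypothetical usage sketch (`state_versions` and `shard_metrics` are
    /// assumed to exist; the key/val/time/diff codecs are illustrative):
    ///
    /// ```ignore
    /// let state = state_versions
    ///     .maybe_init_shard::<String, String, u64, i64>(&shard_metrics)
    ///     .await
    ///     .expect("codecs match");
    /// assert_eq!(state.shard_id, shard_metrics.shard_id);
    /// ```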
    pub async fn maybe_init_shard<K, V, T, D>(
        &self,
        shard_metrics: &ShardMetrics,
    ) -> Result<TypedState<K, V, T, D>, Box<CodecMismatch>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64,
        D: Monoid + Codec64,
    {
        let shard_id = shard_metrics.shard_id;

        let recent_live_diffs = self.fetch_recent_live_diffs::<T>(&shard_id).await;
        if !recent_live_diffs.0.is_empty() {
            return self
                .fetch_current_state(&shard_id, recent_live_diffs.0)
                .await
                .check_codecs(&shard_id);
        }

        let (initial_state, initial_diff) = self.write_initial_rollup(shard_metrics).await;
        let (cas_res, _diff) =
            retry_external(&self.metrics.retries.external.maybe_init_cas, || async {
                self.try_compare_and_set_current(
                    "maybe_init_shard",
                    shard_metrics,
                    None,
                    &initial_state,
                    &initial_diff,
                )
                .await
                .map_err(|err| err.into())
            })
            .await;
        match cas_res {
            CaSResult::Committed => Ok(initial_state),
            CaSResult::ExpectationMismatch => {
                let recent_live_diffs = self.fetch_recent_live_diffs::<T>(&shard_id).await;
                let state = self
                    .fetch_current_state(&shard_id, recent_live_diffs.0)
                    .await
                    .check_codecs(&shard_id);

                let (_, rollup) = initial_state.latest_rollup();
                let should_delete_rollup = match state.as_ref() {
                    Ok(state) => !state
                        .collections
                        .rollups
                        .values()
                        .any(|x| x.key == rollup.key),
                    Err(_codec_mismatch) => true,
                };
                if should_delete_rollup {
                    self.delete_rollup(&shard_id, &rollup.key).await;
                }

                state
            }
        }
    }

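    /// Encodes `diff` and attempts to compare-and-set it into [Consensus] at
    /// the `expected` [SeqNo]. On commit, refreshes the shard metrics derived
    /// from `new_state`; on expectation mismatch, returns the encoded diff so
    /// the caller can retry against a refreshed state.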
    pub async fn try_compare_and_set_current<K, V, T, D>(
        &self,
        cmd_name: &str,
        shard_metrics: &ShardMetrics,
        expected: Option<SeqNo>,
        new_state: &TypedState<K, V, T, D>,
        diff: &StateDiff<T>,
    ) -> Result<(CaSResult, VersionedData), Indeterminate>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64,
        D: Monoid + Codec64,
    {
        assert_eq!(shard_metrics.shard_id, new_state.shard_id);
        let path = new_state.shard_id.to_string();

        trace!(
            "apply_unbatched_cmd {} attempting {}\n new_state={:?}",
            cmd_name,
            new_state.seqno(),
            new_state
        );
        let new = self.metrics.codecs.state_diff.encode(|| {
            let mut buf = Vec::new();
            diff.encode(&mut buf);
            VersionedData {
                seqno: new_state.seqno(),
                data: Bytes::from(buf),
            }
        });
        assert_eq!(new.seqno, diff.seqno_to);

        let payload_len = new.data.len();
        let cas_res = retry_determinate(
            &self.metrics.retries.determinate.apply_unbatched_cmd_cas,
            || async {
                self.consensus
                    .compare_and_set(&path, expected, new.clone())
                    .await
            },
        )
        .instrument(debug_span!("apply_unbatched_cmd::cas", payload_len))
        .await
        .map_err(|err| {
            debug!("apply_unbatched_cmd {} errored: {}", cmd_name, err);
            err
        })?;

        match cas_res {
            CaSResult::Committed => {
                trace!(
                    "apply_unbatched_cmd {} succeeded {}\n new_state={:?}",
                    cmd_name,
                    new_state.seqno(),
                    new_state
                );

                shard_metrics.set_since(new_state.since());
                shard_metrics.set_upper(new_state.upper());
                shard_metrics.seqnos_since_last_rollup.set(
                    new_state
                        .seqno
                        .0
                        .saturating_sub(new_state.latest_rollup().0.0),
                );
                shard_metrics
                    .spine_batch_count
                    .set(u64::cast_from(new_state.spine_batch_count()));
                let size_metrics = new_state.size_metrics();
                shard_metrics
                    .schema_registry_version_count
                    .set(u64::cast_from(new_state.collections.schemas.len()));
                shard_metrics
                    .hollow_batch_count
                    .set(u64::cast_from(size_metrics.hollow_batch_count));
                shard_metrics
                    .batch_part_count
                    .set(u64::cast_from(size_metrics.batch_part_count));
                shard_metrics
                    .rewrite_part_count
                    .set(u64::cast_from(size_metrics.rewrite_part_count));
                shard_metrics
                    .update_count
                    .set(u64::cast_from(size_metrics.num_updates));
                shard_metrics
                    .rollup_count
                    .set(u64::cast_from(size_metrics.state_rollup_count));
                shard_metrics
                    .largest_batch_size
                    .set(u64::cast_from(size_metrics.largest_batch_bytes));
                shard_metrics
                    .usage_current_state_batches_bytes
                    .set(u64::cast_from(size_metrics.state_batches_bytes));
                shard_metrics
                    .usage_current_state_rollups_bytes
                    .set(u64::cast_from(size_metrics.state_rollups_bytes));
                shard_metrics
                    .seqnos_held
                    .set(u64::cast_from(new_state.seqnos_held()));
                shard_metrics
                    .encoded_diff_size
                    .inc_by(u64::cast_from(payload_len));
                shard_metrics
                    .live_writers
                    .set(u64::cast_from(new_state.collections.writers.len()));
                shard_metrics
                    .inline_part_count
                    .set(u64::cast_from(size_metrics.inline_part_count));
                shard_metrics
                    .inline_part_bytes
                    .set(u64::cast_from(size_metrics.inline_part_bytes));
                shard_metrics.stale_version.set(
                    if new_state
                        .state
                        .collections
                        .version
                        .cmp_precedence(&self.cfg.build_version)
                        .is_lt()
                    {
                        1
                    } else {
                        0
                    },
                );

                let spine_metrics = new_state.collections.trace.spine_metrics();
                shard_metrics
                    .compact_batches
                    .set(spine_metrics.compact_batches);
                shard_metrics
                    .compacting_batches
                    .set(spine_metrics.compacting_batches);
                shard_metrics
                    .noncompact_batches
                    .set(spine_metrics.noncompact_batches);

                let batch_parts_by_version = new_state
                    .collections
                    .trace
                    .batches()
                    .flat_map(|x| x.parts.iter())
                    .flat_map(|part| {
                        let key = match part {
                            RunPart::Many(x) => Some(&x.key),
                            RunPart::Single(BatchPart::Hollow(x)) => Some(&x.key),
                            RunPart::Single(BatchPart::Inline { .. }) => None,
                        }?;
                        let (writer_key, _) = key.0.split_once('/')?;
                        match &writer_key[..1] {
                            "w" => Some(("old", part.encoded_size_bytes())),
                            "n" => Some((&writer_key[1..], part.encoded_size_bytes())),
                            _ => None,
                        }
                    });
                shard_metrics.set_batch_part_versions(batch_parts_by_version);

                Ok((CaSResult::Committed, new))
            }
            CaSResult::ExpectationMismatch => {
                debug!(
                    "apply_unbatched_cmd {} {} lost the CaS race, retrying: {:?}",
                    new_state.shard_id(),
                    cmd_name,
                    expected,
                );
                Ok((CaSResult::ExpectationMismatch, new))
            }
        }
    }

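    /// Fetches the current state of the shard by fetching the rollup
    /// referenced by the latest live diff and applying all live diffs on top
    /// of it.
    ///
    /// If the rollup is deleted out from under us (e.g. by a concurrent GC),
    /// refetches the live diffs and retries, warning if the earliest live
    /// diff did not advance between attempts.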
    pub async fn fetch_current_state<T>(
        &self,
        shard_id: &ShardId,
        mut live_diffs: Vec<VersionedData>,
    ) -> UntypedState<T>
    where
        T: Timestamp + Lattice + Codec64,
    {
        let retry = self
            .metrics
            .retries
            .fetch_latest_state
            .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
        loop {
            let latest_diff = live_diffs
                .last()
                .expect("initialized shard should have at least one diff");
            let latest_diff = self
                .metrics
                .codecs
                .state_diff
                .decode(|| {
                    StateDiff::<T>::decode(&self.cfg.build_version, latest_diff.data.clone())
                });
            let mut state = match self
                .fetch_rollup_at_key(shard_id, &latest_diff.latest_rollup_key)
                .await
            {
                Some(x) => x,
                None => {
                    retry.retries.inc();
                    let earliest_before_refetch = live_diffs
                        .first()
                        .expect("initialized shard should have at least one diff")
                        .seqno;
                    live_diffs = self.fetch_recent_live_diffs::<T>(shard_id).await.0;

                    let earliest_after_refetch = live_diffs
                        .first()
                        .expect("initialized shard should have at least one diff")
                        .seqno;
                    if earliest_before_refetch >= earliest_after_refetch {
                        warn!(
                            concat!(
                                "fetch_current_state refetch expects earliest live diff to advance: {} vs {}. ",
                                "In dev and testing, this happens when persist's Blob (files in mzdata) ",
                                "is deleted out from under it or when two processes are talking to ",
                                "different Blobs (e.g. docker containers without it shared)."
                            ),
                            earliest_before_refetch, earliest_after_refetch
                        )
                    }
                    continue;
                }
            };

            state.apply_encoded_diffs(&self.cfg, &self.metrics, &live_diffs);
            return state;
        }
    }

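    /// Returns an iterator over all live states of the shard, starting from
    /// the rollup at the earliest live diff, or `None` if the shard has not
    /// been initialized. Like [Self::fetch_current_state], retries if the
    /// rollup it needs is swept away by a concurrent GC before we can read
    /// it.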
    pub async fn fetch_all_live_states<T>(
        &self,
        shard_id: ShardId,
    ) -> Option<UntypedStateVersionsIter<T>>
    where
        T: Timestamp + Lattice + Codec64,
    {
        let retry = self
            .metrics
            .retries
            .fetch_live_states
            .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
        let mut all_live_diffs = self.fetch_all_live_diffs(&shard_id).await;
        loop {
            let earliest_live_diff = match all_live_diffs.first() {
                Some(x) => x,
                None => return None,
            };
            let state = match self
                .fetch_rollup_at_seqno(&shard_id, all_live_diffs.clone(), earliest_live_diff.seqno)
                .await
            {
                Some(x) => x,
                None => {
                    retry.retries.inc();
                    let earliest_before_refetch = earliest_live_diff.seqno;
                    all_live_diffs = self.fetch_all_live_diffs(&shard_id).await;

                    let earliest_after_refetch = all_live_diffs
                        .first()
                        .expect("initialized shard should have at least one diff")
                        .seqno;
                    if earliest_before_refetch >= earliest_after_refetch {
                        warn!(
                            concat!(
                                "fetch_all_live_states refetch expects earliest live diff to advance: {} vs {}. ",
                                "In dev and testing, this happens when persist's Blob (files in mzdata) ",
                                "is deleted out from under it or when two processes are talking to ",
                                "different Blobs (e.g. docker containers without it shared)."
                            ),
                            earliest_before_refetch, earliest_after_refetch
                        )
                    }
                    continue;
                }
            };
            assert_eq!(earliest_live_diff.seqno, state.seqno());
            return Some(UntypedStateVersionsIter {
                shard_id,
                cfg: self.cfg.clone(),
                metrics: Arc::clone(&self.metrics),
                state,
                diffs: all_live_diffs,
            });
        }
    }

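    /// Fetches all live diffs for the shard from [Consensus], i.e. all diffs
    /// not yet truncated by GC.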
    pub async fn fetch_all_live_diffs(&self, shard_id: &ShardId) -> Vec<VersionedData> {
        let path = shard_id.to_string();
        retry_external(&self.metrics.retries.external.fetch_state_scan, || async {
            self.consensus.scan(&path, SeqNo::minimum(), SCAN_ALL).await
        })
        .instrument(debug_span!("fetch_state::scan"))
        .await
    }

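    /// Fetches up to `limit` live diffs for the shard, starting at `from`.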
    async fn fetch_live_diffs(
        &self,
        shard_id: &ShardId,
        from: SeqNo,
        limit: usize,
    ) -> Vec<VersionedData> {
        let path = shard_id.to_string();
        retry_external(&self.metrics.retries.external.fetch_state_scan, || async {
            self.consensus.scan(&path, from, limit).await
        })
        .instrument(debug_span!("fetch_state::scan"))
        .await
    }

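    /// Fetches all live diffs with seqnos up to and including `through`,
    /// scanning in chunks: an initial scan of `scan_limit` diffs from the
    /// minimum seqno, then follow-up scans sized to the remaining seqno gap,
    /// until `through` is covered or a scan comes up short.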
    pub async fn fetch_live_diffs_through(
        &self,
        shard_id: &ShardId,
        through: SeqNo,
    ) -> Vec<VersionedData> {
        let scan_limit = STATE_VERSIONS_RECENT_LIVE_DIFFS_LIMIT.get(&self.cfg);
        let mut versions = self
            .fetch_live_diffs(shard_id, SeqNo::minimum(), scan_limit)
            .await;

        if versions.len() == scan_limit {
            loop {
                let Some(last_seqno) = versions.last().map(|v| v.seqno) else {
                    break;
                };
                if through <= last_seqno {
                    break;
                }
                let from = last_seqno.next();
                let limit = usize::cast_from(through.0 - last_seqno.0).clamp(1, 10 * scan_limit);
                let more_versions = self.fetch_live_diffs(shard_id, from, limit).await;
                let more_versions_len = more_versions.len();
                if let Some(first) = more_versions.first() {
                    assert!(last_seqno < first.seqno);
                }
                versions.extend(more_versions);
                if more_versions_len < limit {
                    break;
                }
            }
        }
        let partition_index = versions.partition_point(|v| v.seqno <= through);
        versions.truncate(partition_index);
        versions
    }

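    /// Fetches recent live diffs for the shard.
    ///
    /// Fast path: a single scan of at most `scan_limit` diffs from the
    /// minimum seqno; if it returns fewer than `scan_limit` diffs, it
    /// necessarily contains all live diffs. Slow path: read the head diff,
    /// parse the seqno of the rollup it references, and scan everything from
    /// that seqno on, which is guaranteed to include the most recent diffs.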
    pub async fn fetch_recent_live_diffs<T>(&self, shard_id: &ShardId) -> RecentLiveDiffs
    where
        T: Timestamp + Lattice + Codec64,
    {
        let path = shard_id.to_string();
        let scan_limit = STATE_VERSIONS_RECENT_LIVE_DIFFS_LIMIT.get(&self.cfg);
        let oldest_diffs =
            retry_external(&self.metrics.retries.external.fetch_state_scan, || async {
                self.consensus
                    .scan(&path, SeqNo::minimum(), scan_limit)
                    .await
            })
            .instrument(debug_span!("fetch_state::scan"))
            .await;

        if oldest_diffs.len() < scan_limit {
            self.metrics.state.fetch_recent_live_diffs_fast_path.inc();
            return RecentLiveDiffs(oldest_diffs);
        }

        let head = retry_external(&self.metrics.retries.external.fetch_state_scan, || async {
            self.consensus.head(&path).await
        })
        .instrument(debug_span!("fetch_state::slow_path::head"))
        .await
        .expect("initialized shard should have at least 1 diff");

        let latest_diff = self
            .metrics
            .codecs
            .state_diff
            .decode(|| StateDiff::<T>::decode(&self.cfg.build_version, head.data));

        match BlobKey::parse_ids(&latest_diff.latest_rollup_key.complete(shard_id)) {
            Ok((_shard_id, PartialBlobKey::Rollup(seqno, _rollup))) => {
                self.metrics.state.fetch_recent_live_diffs_slow_path.inc();
                let diffs =
                    retry_external(&self.metrics.retries.external.fetch_state_scan, || async {
                        self.consensus.scan(&path, seqno, SCAN_ALL).await
                    })
                    .instrument(debug_span!("fetch_state::slow_path::scan"))
                    .await;
                RecentLiveDiffs(diffs)
            }
            Ok(_) => panic!(
                "invalid state diff rollup key: {}",
                latest_diff.latest_rollup_key
            ),
            Err(err) => panic!("unparseable state diff rollup key: {}", err),
        }
    }

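    /// Fetches all live diffs with seqnos strictly greater than `seqno`.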
    pub async fn fetch_all_live_diffs_gt_seqno<K, V, T, D>(
        &self,
        shard_id: &ShardId,
        seqno: SeqNo,
    ) -> Vec<VersionedData> {
        let path = shard_id.to_string();
        retry_external(&self.metrics.retries.external.fetch_state_scan, || async {
            self.consensus.scan(&path, seqno.next(), SCAN_ALL).await
        })
        .instrument(debug_span!("fetch_state::scan"))
        .await
    }

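    /// Truncates all diffs with seqnos strictly less than `seqno` from
    /// [Consensus].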
    pub async fn truncate_diffs(&self, shard_id: &ShardId, seqno: SeqNo) {
        let path = shard_id.to_string();
        let _deleted_count = retry_external(&self.metrics.retries.external.gc_truncate, || async {
            self.consensus.truncate(&path, seqno).await
        })
        .instrument(debug_span!("gc::truncate"))
        .await;
    }

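    /// Constructs the initial state for an uninitialized shard: an empty
    /// state plus its first rollup. The rollup blob is written to [Blob]
    /// before the caller CaS's the state into [Consensus], so a committed
    /// state never references a rollup that doesn't exist.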
    async fn write_initial_rollup<K, V, T, D>(
        &self,
        shard_metrics: &ShardMetrics,
    ) -> (TypedState<K, V, T, D>, StateDiff<T>)
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64,
        D: Monoid + Codec64,
    {
        let empty_state = TypedState::new(
            self.cfg.build_version.clone(),
            shard_metrics.shard_id,
            self.cfg.hostname.clone(),
            (self.cfg.now)(),
        );
        let rollup_seqno = empty_state.seqno.next();
        let rollup = HollowRollup {
            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
            encoded_size_bytes: None,
        };
        let (applied, initial_state) = match empty_state
            .clone_apply(&self.cfg, &mut |_, _, state| {
                state.add_rollup((rollup_seqno, &rollup))
            }) {
            Continue(x) => x,
            Break(NoOpStateTransition(_)) => {
                panic!("initial state transition should not be a no-op")
            }
        };
        assert!(applied, "add_rollup should apply to the empty state");

        let rollup = self.encode_rollup_blob(
            shard_metrics,
            initial_state.clone_for_rollup(),
            vec![],
            rollup.key,
        );
        let () = self.write_rollup_blob(&rollup).await;
        assert_eq!(initial_state.seqno, rollup.seqno);

        let diff = StateDiff::from_diff(&empty_state.state, &initial_state.state);
        (initial_state, diff)
    }

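    /// Writes a rollup of the given state to [Blob], inlining all diffs
    /// between the state's previous rollup and `state.seqno`. Returns `None`
    /// (a no-op) if the state is already rolled up at its latest seqno or if
    /// the needed diffs have been truncated out from under us.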
    pub async fn write_rollup_for_state<K, V, T, D>(
        &self,
        shard_metrics: &ShardMetrics,
        state: TypedState<K, V, T, D>,
        rollup_id: &RollupId,
    ) -> Option<EncodedRollup>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64,
        D: Monoid + Codec64,
    {
        let (latest_rollup_seqno, _rollup) = state.latest_rollup();
        let seqno = state.seqno();

        let diffs: Vec<_> = self
            .fetch_all_live_diffs_gt_seqno::<K, V, T, D>(&state.shard_id, *latest_rollup_seqno)
            .await;

        match diffs.first() {
            None => {
                self.metrics.state.rollup_write_noop_latest.inc();
                assert_eq!(seqno, *latest_rollup_seqno);
                return None;
            }
            Some(first) => {
                if first.seqno != latest_rollup_seqno.next() {
                    self.metrics.state.rollup_write_noop_truncated.inc();
                    assert!(
                        first.seqno > latest_rollup_seqno.next(),
                        "diff: {}, rollup: {}",
                        first.seqno,
                        latest_rollup_seqno,
                    );
                    return None;
                }
            }
        }

        let diffs: Vec<_> = diffs.into_iter().filter(|x| x.seqno <= seqno).collect();

        assert_eq!(
            diffs.first().map(|x| x.seqno),
            Some(latest_rollup_seqno.next())
        );
        assert_eq!(diffs.last().map(|x| x.seqno), Some(state.seqno));

        let key = PartialRollupKey::new(state.seqno, rollup_id);
        let rollup = self.encode_rollup_blob(shard_metrics, state, diffs, key);
        let () = self.write_rollup_blob(&rollup).await;

        self.metrics.state.rollup_write_success.inc();

        Some(rollup)
    }

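    /// Proto-encodes a rollup of `state`, with the given diffs inlined, into
    /// an [EncodedRollup] suitable for [Self::write_rollup_blob].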
    pub fn encode_rollup_blob<K, V, T, D>(
        &self,
        shard_metrics: &ShardMetrics,
        state: TypedState<K, V, T, D>,
        diffs: Vec<VersionedData>,
        key: PartialRollupKey,
    ) -> EncodedRollup
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64,
        D: Monoid + Codec64,
    {
        let shard_id = state.shard_id;
        let rollup_seqno = state.seqno;

        let rollup = Rollup::from(state.into(), diffs);
        let desc = rollup.diffs.as_ref().expect("inlined diffs").description();

        let buf = self.metrics.codecs.state.encode(|| {
            let mut buf = Vec::new();
            rollup
                .into_proto()
                .encode(&mut buf)
                .expect("no required fields means no initialization errors");
            Bytes::from(buf)
        });
        shard_metrics
            .latest_rollup_size
            .set(u64::cast_from(buf.len()));
        EncodedRollup {
            shard_id,
            seqno: rollup_seqno,
            key,
            buf,
            _desc: desc,
        }
    }

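    /// Writes an encoded rollup to [Blob] at its [PartialRollupKey],
    /// retrying external errors.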
    pub async fn write_rollup_blob(&self, rollup: &EncodedRollup) {
        let payload_len = rollup.buf.len();
        retry_external(&self.metrics.retries.external.rollup_set, || async {
            self.blob
                .set(
                    &rollup.key.complete(&rollup.shard_id),
                    Bytes::clone(&rollup.buf),
                )
                .await
        })
        .instrument(debug_span!("rollup::set", payload_len))
        .await;
    }

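    /// Fetches the rollup at the given seqno, if the shard has one.
    ///
    /// Prefers the rollup key recorded in the current state; otherwise falls
    /// back to a key recovered from the live diffs themselves (the
    /// `rollup_at_seqno_migration` path).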
    async fn fetch_rollup_at_seqno<T>(
        &self,
        shard_id: &ShardId,
        live_diffs: Vec<VersionedData>,
        seqno: SeqNo,
    ) -> Option<UntypedState<T>>
    where
        T: Timestamp + Lattice + Codec64,
    {
        let rollup_key_for_migration = live_diffs.iter().find_map(|x| {
            let diff = self
                .metrics
                .codecs
                .state_diff
                .decode(|| StateDiff::<T>::decode(&self.cfg.build_version, x.data.clone()));
            diff.rollups
                .iter()
                .find(|x| x.key == seqno)
                .map(|x| match &x.val {
                    StateFieldValDiff::Insert(x) => x.clone(),
                    StateFieldValDiff::Update(_, x) => x.clone(),
                    StateFieldValDiff::Delete(x) => x.clone(),
                })
        });

        let state = self.fetch_current_state::<T>(shard_id, live_diffs).await;
        if let Some(rollup) = state.rollups().get(&seqno) {
            return self.fetch_rollup_at_key(shard_id, &rollup.key).await;
        }

        let rollup = rollup_key_for_migration.expect("someone should have a key for this rollup");
        tracing::info!("only found rollup for {} {} via migration", shard_id, seqno);
        self.metrics.state.rollup_at_seqno_migration.inc();
        self.fetch_rollup_at_key(shard_id, &rollup.key).await
    }

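    /// Fetches and decodes the rollup in [Blob] at the given key, or `None`
    /// if the blob is missing.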
    pub async fn fetch_rollup_at_key<T>(
        &self,
        shard_id: &ShardId,
        rollup_key: &PartialRollupKey,
    ) -> Option<UntypedState<T>>
    where
        T: Timestamp + Lattice + Codec64,
    {
        retry_external(&self.metrics.retries.external.rollup_get, || async {
            self.blob.get(&rollup_key.complete(shard_id)).await
        })
        .instrument(debug_span!("rollup::get"))
        .await
        .map(|buf| {
            self.metrics
                .codecs
                .state
                .decode(|| UntypedState::decode(&self.cfg.build_version, buf))
        })
    }

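    /// Deletes the rollup blob at the given key, retrying external errors.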
    pub async fn delete_rollup(&self, shard_id: &ShardId, key: &PartialRollupKey) {
        let _ = retry_external(&self.metrics.retries.external.rollup_delete, || async {
            self.blob.delete(&key.complete(shard_id)).await
        })
        .instrument(debug_span!("rollup::delete"))
        .await;
    }
}

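/// An [UntypedState] plus the live diffs that follow it, waiting on a
/// timestamp codec check to become a usable [StateVersionsIter].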
pub struct UntypedStateVersionsIter<T> {
    shard_id: ShardId,
    cfg: PersistConfig,
    metrics: Arc<Metrics>,
    state: UntypedState<T>,
    diffs: Vec<VersionedData>,
}

impl<T: Timestamp + Lattice + Codec64> UntypedStateVersionsIter<T> {
    pub(crate) fn new(
        shard_id: ShardId,
        cfg: PersistConfig,
        metrics: Arc<Metrics>,
        state: UntypedState<T>,
        diffs: Vec<VersionedData>,
    ) -> Self {
        Self {
            shard_id,
            cfg,
            metrics,
            state,
            diffs,
        }
    }

    pub(crate) fn check_ts_codec(self) -> Result<StateVersionsIter<T>, CodecMismatchT> {
        let key_codec = self.state.key_codec.clone();
        let val_codec = self.state.val_codec.clone();
        let diff_codec = self.state.diff_codec.clone();
        let state = self.state.check_ts_codec(&self.shard_id)?;
        Ok(StateVersionsIter::new(
            self.cfg,
            self.metrics,
            state,
            self.diffs,
            key_codec,
            val_codec,
            diff_codec,
        ))
    }
}

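/// An iterator over consecutive versions of a shard's [State], produced by
/// applying one live diff per call to [Self::next].
///
/// A minimal usage sketch, assuming an initialized shard with a `u64`
/// timestamp (`state_versions` and `shard_id` are placeholders):
///
/// ```ignore
/// let mut iter = state_versions
///     .fetch_all_live_states::<u64>(shard_id)
///     .await
///     .expect("shard is initialized")
///     .check_ts_codec()
///     .expect("ts codec matches");
/// while let Some(state) = iter.next(|_| {}) {
///     println!("state as of {}", state.seqno);
/// }
/// ```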
pub struct StateVersionsIter<T> {
    cfg: PersistConfig,
    metrics: Arc<Metrics>,
    state: State<T>,
    diffs: Vec<VersionedData>,
    key_codec: String,
    val_codec: String,
    diff_codec: String,
    #[cfg(debug_assertions)]
    validator: ReferencedBlobValidator<T>,
}

impl<T: Timestamp + Lattice + Codec64> StateVersionsIter<T> {
    fn new(
        cfg: PersistConfig,
        metrics: Arc<Metrics>,
        state: State<T>,
        mut diffs: Vec<VersionedData>,
        key_codec: String,
        val_codec: String,
        diff_codec: String,
    ) -> Self {
        assert!(diffs.first().map_or(true, |x| x.seqno == state.seqno));
        diffs.reverse();
        StateVersionsIter {
            cfg,
            metrics,
            state,
            diffs,
            key_codec,
            val_codec,
            diff_codec,
            #[cfg(debug_assertions)]
            validator: ReferencedBlobValidator::default(),
        }
    }

    pub fn len(&self) -> usize {
        self.diffs.len()
    }

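    /// Pops the next live diff, hands it to `inspect_diff_fn` (as
    /// [InspectDiff::FromInitial] when its application is already reflected
    /// in the iterator's initial state, otherwise as [InspectDiff::Diff]),
    /// applies it to the internal state, and returns a reference to the
    /// resulting state. Returns `None` once all diffs are consumed.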
    pub fn next<F: for<'a> FnMut(InspectDiff<'a, T>)>(
        &mut self,
        mut inspect_diff_fn: F,
    ) -> Option<&State<T>> {
        let diff = self.diffs.pop()?;
        let data = diff.data.clone();
        let diff = self
            .metrics
            .codecs
            .state_diff
            .decode(|| StateDiff::decode(&self.cfg.build_version, diff.data));

        let inspect = if diff.seqno_to == self.state.seqno {
            InspectDiff::FromInitial(&self.state)
        } else {
            InspectDiff::Diff(&diff)
        };
        #[cfg(debug_assertions)]
        {
            inspect
                .referenced_blobs()
                .for_each(|x| self.validator.add_inc_blob(x));
        }
        inspect_diff_fn(inspect);

        let diff_seqno_to = diff.seqno_to;
        self.state
            .apply_diffs(&self.metrics, std::iter::once((diff, data)));
        assert_eq!(self.state.seqno, diff_seqno_to);
        #[cfg(debug_assertions)]
        {
            self.validator.validate_against_state(&self.state);
        }
        Some(&self.state)
    }

    pub fn state(&self) -> &State<T> {
        &self.state
    }

    pub fn into_rollup_proto_without_diffs(&self) -> impl serde::Serialize + use<T> {
        Rollup::from_state_without_diffs(
            State {
                shard_id: self.state.shard_id.clone(),
                seqno: self.state.seqno.clone(),
                walltime_ms: self.state.walltime_ms.clone(),
                hostname: self.state.hostname.clone(),
                collections: self.state.collections.clone(),
            },
            self.key_codec.clone(),
            self.val_codec.clone(),
            T::codec_name(),
            self.diff_codec.clone(),
        )
        .into_proto()
    }
}

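/// A diff as presented to the closure passed to [StateVersionsIter::next]:
/// either the iterator's initial [State] (for the diff whose application
/// produced it) or the decoded [StateDiff] about to be applied.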
#[derive(Debug)]
pub enum InspectDiff<'a, T> {
    FromInitial(&'a State<T>),
    Diff(&'a StateDiff<T>),
}

impl<T: Timestamp + Lattice + Codec64> InspectDiff<'_, T> {
    pub fn referenced_blobs(&self) -> impl Iterator<Item = HollowBlobRef<'_, T>> {
        let (state, diff) = match self {
            InspectDiff::FromInitial(x) => (Some(x), None),
            InspectDiff::Diff(x) => (None, Some(x)),
        };
        let state_blobs = state.into_iter().flat_map(|s| s.blobs());
        let diff_blobs = diff.into_iter().flat_map(|d| d.blob_inserts());
        state_blobs.chain(diff_blobs)
    }
}

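/// Debug-only validation that the blobs referenced incrementally by each
/// inspected diff match the blobs referenced by the full state once the
/// diffs are applied: same overall batch bounds, same set of batch parts,
/// and same set of rollups.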
#[cfg(debug_assertions)]
struct ReferencedBlobValidator<T> {
    full_batches: BTreeSet<HollowBatch<T>>,
    full_rollups: BTreeSet<HollowRollup>,
    inc_batches: BTreeSet<HollowBatch<T>>,
    inc_rollups: BTreeSet<HollowRollup>,
}

#[cfg(debug_assertions)]
impl<T> Default for ReferencedBlobValidator<T> {
    fn default() -> Self {
        Self {
            full_batches: Default::default(),
            full_rollups: Default::default(),
            inc_batches: Default::default(),
            inc_rollups: Default::default(),
        }
    }
}

#[cfg(debug_assertions)]
impl<T: Timestamp + Lattice + Codec64> ReferencedBlobValidator<T> {
    fn add_inc_blob(&mut self, x: HollowBlobRef<'_, T>) {
        match x {
            HollowBlobRef::Batch(x) => assert!(
                self.inc_batches.insert(x.clone()) || x.desc.lower() == x.desc.upper(),
                "non-empty batches should only be appended once; duplicate: {x:?}"
            ),
            HollowBlobRef::Rollup(x) => assert!(self.inc_rollups.insert(x.clone())),
        }
    }

    fn validate_against_state(&mut self, x: &State<T>) {
        use std::hash::{DefaultHasher, Hash, Hasher};

        use mz_ore::collections::HashSet;
        use timely::progress::Antichain;

        use crate::internal::state::BatchPart;

        x.blobs().for_each(|x| match x {
            HollowBlobRef::Batch(x) => {
                self.full_batches.insert(x.clone());
            }
            HollowBlobRef::Rollup(x) => {
                self.full_rollups.insert(x.clone());
            }
        });

        fn overall_desc<'a, T: Timestamp + Lattice>(
            iter: impl Iterator<Item = &'a Description<T>>,
        ) -> (Antichain<T>, Antichain<T>) {
            let mut lower = Antichain::new();
            let mut upper = Antichain::from_elem(T::minimum());
            for desc in iter {
                lower.meet_assign(desc.lower());
                upper.join_assign(desc.upper());
            }
            (lower, upper)
        }
        let (inc_lower, inc_upper) = overall_desc(self.inc_batches.iter().map(|a| &a.desc));
        let (full_lower, full_upper) = overall_desc(self.full_batches.iter().map(|a| &a.desc));
        assert_eq!(inc_lower, full_lower);
        assert_eq!(inc_upper, full_upper);

        fn part_unique<T: Codec64>(x: &RunPart<T>) -> String {
            match x {
                RunPart::Single(BatchPart::Inline {
                    updates,
                    ts_rewrite,
                    ..
                }) => {
                    let mut h = DefaultHasher::new();
                    updates.hash(&mut h);
                    if let Some(frontier) = &ts_rewrite {
                        h.write_usize(frontier.len());
                        frontier.iter().for_each(|t| t.encode().hash(&mut h));
                    }
                    h.finish().to_string()
                }
                other => other.printable_name().to_string(),
            }
        }

        let inc_parts: HashSet<_> = self
            .inc_batches
            .iter()
            .flat_map(|x| x.parts.iter())
            .map(part_unique)
            .collect();
        let full_parts = self
            .full_batches
            .iter()
            .flat_map(|x| x.parts.iter())
            .map(part_unique)
            .collect();
        assert_eq!(inc_parts, full_parts);

        assert_eq!(self.inc_rollups, self.full_rollups);
    }
}

#[cfg(test)]
mod tests {
    use mz_dyncfg::ConfigUpdates;

    use crate::tests::new_test_client;

    use super::*;

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn fetch_all_live_states_regression_uninitialized(dyncfgs: ConfigUpdates) {
        let client = new_test_client(&dyncfgs).await;
        let state_versions = StateVersions::new(
            client.cfg.clone(),
            Arc::clone(&client.consensus),
            Arc::clone(&client.blob),
            Arc::clone(&client.metrics),
        );
        assert!(
            state_versions
                .fetch_all_live_states::<u64>(ShardId::new())
                .await
                .is_none()
        );
    }
}