1#[cfg(debug_assertions)]
13use std::collections::BTreeSet;
14use std::fmt::Debug;
15use std::ops::ControlFlow::{Break, Continue};
16use std::sync::Arc;
17use std::time::SystemTime;
18
19use bytes::Bytes;
20use differential_dataflow::difference::Monoid;
21use differential_dataflow::lattice::Lattice;
22use differential_dataflow::trace::Description;
23use mz_ore::cast::CastFrom;
24use mz_persist::location::{
25 Blob, CaSResult, Consensus, Indeterminate, SCAN_ALL, SeqNo, VersionedData,
26};
27use mz_persist::retry::Retry;
28use mz_persist_types::{Codec, Codec64};
29use mz_proto::RustType;
30use prost::Message;
31use timely::progress::Timestamp;
32use tracing::{Instrument, debug, debug_span, trace, warn};
33
34use crate::cfg::STATE_VERSIONS_RECENT_LIVE_DIFFS_LIMIT;
35use crate::error::{CodecMismatch, CodecMismatchT};
36use crate::internal::encoding::{Rollup, UntypedState};
37use crate::internal::machine::{retry_determinate, retry_external};
38use crate::internal::metrics::ShardMetrics;
39use crate::internal::paths::{BlobKey, PartialBlobKey, PartialRollupKey, RollupId};
40#[cfg(debug_assertions)]
41use crate::internal::state::HollowBatch;
42use crate::internal::state::{
43 BatchPart, HollowBlobRef, HollowRollup, NoOpStateTransition, RunPart, State, TypedState,
44};
45use crate::internal::state_diff::{StateDiff, StateFieldValDiff};
46use crate::{Metrics, PersistConfig, ShardId};
47
/// A durable, truncatable log of versions of [State].
///
/// Wraps the external `Consensus` (per-shard log of state diffs) and `Blob`
/// (rollup blobs) handles, instrumented with metrics. All consensus entries
/// for a shard are keyed by `shard_id.to_string()`.
#[derive(Debug)]
pub struct StateVersions {
    /// Runtime configuration (e.g. build version, dyncfg-backed limits).
    pub(crate) cfg: PersistConfig,
    /// Linearizable log of encoded state diffs, one log per shard.
    pub(crate) consensus: Arc<dyn Consensus>,
    /// Blob storage; used here for rollup get/set/delete.
    pub(crate) blob: Arc<dyn Blob>,
    /// Metrics for retries, codec timings, and per-shard gauges.
    pub(crate) metrics: Arc<Metrics>,
}
105
/// The recent live diffs of a shard, as returned by
/// [StateVersions::fetch_recent_live_diffs]. Guaranteed to contain the latest
/// live diff when non-empty; may not contain all live diffs for the shard.
#[derive(Debug, Clone)]
pub struct RecentLiveDiffs(pub Vec<VersionedData>);
108
/// A rollup of state that has been encoded to proto bytes, ready to be written
/// to blob storage at `key` (see [StateVersions::write_rollup_blob]).
#[derive(Debug, Clone)]
pub struct EncodedRollup {
    /// The shard this rollup belongs to.
    pub(crate) shard_id: ShardId,
    /// The seqno of the state captured by this rollup.
    pub(crate) seqno: SeqNo,
    /// The partial blob key the rollup will be written under.
    pub(crate) key: PartialRollupKey,
    /// Seqno description of the diffs inlined in the rollup; currently unused
    /// at runtime (kept for debugging), hence the underscore.
    pub(crate) _desc: Description<SeqNo>,
    /// The encoded proto bytes of the rollup.
    buf: Bytes,
}
117
118impl EncodedRollup {
119 pub fn to_hollow(&self) -> HollowRollup {
120 HollowRollup {
121 key: self.key.clone(),
122 encoded_size_bytes: Some(self.buf.len()),
123 }
124 }
125}
126
127impl StateVersions {
128 pub fn new(
129 cfg: PersistConfig,
130 consensus: Arc<dyn Consensus>,
131 blob: Arc<dyn Blob>,
132 metrics: Arc<Metrics>,
133 ) -> Self {
134 StateVersions {
135 cfg,
136 consensus,
137 blob,
138 metrics,
139 }
140 }
141
    /// Fetches the current state of the given shard, initializing it (writing
    /// an initial rollup and CaS-ing the initial state into consensus) if it
    /// doesn't yet exist.
    ///
    /// Returns a [CodecMismatch] error if the shard was previously registered
    /// with different K/V/T/D codecs than the ones requested here.
    pub async fn maybe_init_shard<K, V, T, D>(
        &self,
        shard_metrics: &ShardMetrics,
    ) -> Result<TypedState<K, V, T, D>, Box<CodecMismatch>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64,
        D: Monoid + Codec64,
    {
        let shard_id = shard_metrics.shard_id;

        // Fast path: the shard is already initialized, so reconstruct current
        // state from its recent live diffs.
        let recent_live_diffs = self.fetch_recent_live_diffs::<T>(&shard_id).await;
        if !recent_live_diffs.0.is_empty() {
            return self
                .fetch_current_state(&shard_id, recent_live_diffs.0)
                .await
                .check_codecs(&shard_id);
        }

        // Slow path: no live diffs, so the shard looks uninitialized. Write
        // the initial rollup blob first, then race other writers to CaS the
        // initial state into consensus.
        let (initial_state, initial_diff) = self.write_initial_rollup(shard_metrics).await;
        assert_eq!(
            initial_state.seqno(),
            SeqNo::minimum(),
            "initial state should have the initial seqno"
        );
        // The CaS can only fail-fast with Indeterminate; retry it as external
        // work since initialization is idempotent.
        let (cas_res, _diff) =
            retry_external(&self.metrics.retries.external.maybe_init_cas, || async {
                self.try_compare_and_set_current(
                    "maybe_init_shard",
                    shard_metrics,
                    &initial_state,
                    &initial_diff,
                )
                .await
                .map_err(|err| err.into())
            })
            .await;
        match cas_res {
            CaSResult::Committed => Ok(initial_state),
            CaSResult::ExpectationMismatch => {
                // Someone else initialized the shard first; fetch the state
                // they wrote.
                let recent_live_diffs = self.fetch_recent_live_diffs::<T>(&shard_id).await;
                let state = self
                    .fetch_current_state(&shard_id, recent_live_diffs.0)
                    .await
                    .check_codecs(&shard_id);

                // We wrote a rollup blob above that lost the race. Clean it
                // up unless (surprisingly) the winning state references the
                // very same rollup key.
                let (_, rollup) = initial_state.latest_rollup();
                let should_delete_rollup = match state.as_ref() {
                    Ok(state) => !state
                        .collections
                        .rollups
                        .values()
                        .any(|x| &x.key == &rollup.key),
                    // If the codecs don't match, we definitely lost the race,
                    // so our rollup is unreferenced garbage.
                    Err(_codec_mismatch) => true,
                };
                if should_delete_rollup {
                    self.delete_rollup(&shard_id, &rollup.key).await;
                }

                state
            }
        }
    }
219
220 pub async fn try_compare_and_set_current<K, V, T, D>(
225 &self,
226 cmd_name: &str,
227 shard_metrics: &ShardMetrics,
228 new_state: &TypedState<K, V, T, D>,
229 diff: &StateDiff<T>,
230 ) -> Result<(CaSResult, VersionedData), Indeterminate>
231 where
232 K: Debug + Codec,
233 V: Debug + Codec,
234 T: Timestamp + Lattice + Codec64,
235 D: Monoid + Codec64,
236 {
237 assert_eq!(shard_metrics.shard_id, new_state.shard_id);
238 let path = new_state.shard_id.to_string();
239
240 trace!(
241 "apply_unbatched_cmd {} attempting {}\n new_state={:?}",
242 cmd_name,
243 new_state.seqno(),
244 new_state
245 );
246 let new = self.metrics.codecs.state_diff.encode(|| {
247 let mut buf = Vec::new();
248 diff.encode(&mut buf);
249 VersionedData {
250 seqno: new_state.seqno(),
251 data: Bytes::from(buf),
252 }
253 });
254 assert_eq!(new.seqno, diff.seqno_to);
255
256 let payload_len = new.data.len();
257 let cas_res = retry_determinate(
258 &self.metrics.retries.determinate.apply_unbatched_cmd_cas,
259 || async { self.consensus.compare_and_set(&path, new.clone()).await },
260 )
261 .instrument(debug_span!("apply_unbatched_cmd::cas", payload_len))
262 .await
263 .map_err(|err| {
264 debug!("apply_unbatched_cmd {} errored: {}", cmd_name, err);
265 err
266 })?;
267
268 match cas_res {
269 CaSResult::Committed => {
270 trace!(
271 "apply_unbatched_cmd {} succeeded {}\n new_state={:?}",
272 cmd_name,
273 new_state.seqno(),
274 new_state
275 );
276
277 shard_metrics.set_since(new_state.since());
278 shard_metrics.set_upper(new_state.upper());
279 shard_metrics.seqnos_since_last_rollup.set(
280 new_state
281 .seqno
282 .0
283 .saturating_sub(new_state.latest_rollup().0.0),
284 );
285 shard_metrics
286 .spine_batch_count
287 .set(u64::cast_from(new_state.spine_batch_count()));
288 let size_metrics = new_state.size_metrics();
289 shard_metrics
290 .schema_registry_version_count
291 .set(u64::cast_from(new_state.collections.schemas.len()));
292 shard_metrics
293 .hollow_batch_count
294 .set(u64::cast_from(size_metrics.hollow_batch_count));
295 shard_metrics
296 .batch_part_count
297 .set(u64::cast_from(size_metrics.batch_part_count));
298 shard_metrics
299 .rewrite_part_count
300 .set(u64::cast_from(size_metrics.rewrite_part_count));
301 shard_metrics
302 .update_count
303 .set(u64::cast_from(size_metrics.num_updates));
304 shard_metrics
305 .rollup_count
306 .set(u64::cast_from(size_metrics.state_rollup_count));
307 shard_metrics
308 .largest_batch_size
309 .set(u64::cast_from(size_metrics.largest_batch_bytes));
310 shard_metrics
311 .usage_current_state_batches_bytes
312 .set(u64::cast_from(size_metrics.state_batches_bytes));
313 shard_metrics
314 .usage_current_state_rollups_bytes
315 .set(u64::cast_from(size_metrics.state_rollups_bytes));
316 shard_metrics
317 .seqnos_held
318 .set(u64::cast_from(new_state.seqnos_held()));
319 shard_metrics
320 .encoded_diff_size
321 .inc_by(u64::cast_from(payload_len));
322 shard_metrics
323 .live_writers
324 .set(u64::cast_from(new_state.collections.writers.len()));
325 shard_metrics
326 .rewrite_part_count
327 .set(u64::cast_from(size_metrics.rewrite_part_count));
328 shard_metrics
329 .inline_part_count
330 .set(u64::cast_from(size_metrics.inline_part_count));
331 shard_metrics
332 .inline_part_bytes
333 .set(u64::cast_from(size_metrics.inline_part_bytes));
334 shard_metrics.stale_version.set(
335 if new_state
336 .state
337 .collections
338 .version
339 .cmp_precedence(&self.cfg.build_version)
340 .is_lt()
341 {
342 1
343 } else {
344 0
345 },
346 );
347
348 let spine_metrics = new_state.collections.trace.spine_metrics();
349 shard_metrics
350 .compact_batches
351 .set(spine_metrics.compact_batches);
352 shard_metrics
353 .compacting_batches
354 .set(spine_metrics.compacting_batches);
355 shard_metrics
356 .noncompact_batches
357 .set(spine_metrics.noncompact_batches);
358
359 let batch_parts_by_version = new_state
360 .collections
361 .trace
362 .batches()
363 .flat_map(|x| x.parts.iter())
364 .flat_map(|part| {
365 let key = match part {
366 RunPart::Many(x) => Some(&x.key),
367 RunPart::Single(BatchPart::Hollow(x)) => Some(&x.key),
368 RunPart::Single(BatchPart::Inline { .. }) => None,
370 }?;
371 let (writer_key, _) = key.0.split_once('/')?;
373 match &writer_key[..1] {
374 "w" => Some(("old", part.encoded_size_bytes())),
375 "n" => Some((&writer_key[1..], part.encoded_size_bytes())),
376 _ => None,
377 }
378 });
379 shard_metrics.set_batch_part_versions(batch_parts_by_version);
380
381 Ok((CaSResult::Committed, new))
382 }
383 CaSResult::ExpectationMismatch => {
384 debug!(
385 "apply_unbatched_cmd {} {} lost the CaS race, retrying: {:?}",
386 new_state.shard_id(),
387 cmd_name,
388 new_state.seqno.previous(),
389 );
390 Ok((CaSResult::ExpectationMismatch, new))
391 }
392 }
393 }
394
    /// Fetches the current state of the requested shard, given its live diffs
    /// (which must be non-empty and include the most recent diff).
    ///
    /// Decodes the latest diff to find the latest rollup key, fetches that
    /// rollup, and applies the live diffs on top of it. If the rollup blob is
    /// missing (e.g. GC deleted it after we fetched the diffs), refetches the
    /// live diffs and retries.
    pub async fn fetch_current_state<T>(
        &self,
        shard_id: &ShardId,
        mut live_diffs: Vec<VersionedData>,
    ) -> UntypedState<T>
    where
        T: Timestamp + Lattice + Codec64,
    {
        let retry = self
            .metrics
            .retries
            .fetch_latest_state
            .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
        loop {
            let latest_diff = live_diffs
                .last()
                .expect("initialized shard should have at least one diff");
            let latest_diff = self
                .metrics
                .codecs
                .state_diff
                .decode(|| {
                    StateDiff::<T>::decode(&self.cfg.build_version, latest_diff.data.clone())
                });
            let mut state = match self
                .fetch_rollup_at_key(shard_id, &latest_diff.latest_rollup_key)
                .await
            {
                Some(x) => x,
                None => {
                    // Rollup blob not found: our view of the live diffs is
                    // likely stale, refetch and try again.
                    retry.retries.inc();
                    let earliest_before_refetch = live_diffs
                        .first()
                        .expect("initialized shard should have at least one diff")
                        .seqno;
                    live_diffs = self.fetch_recent_live_diffs::<T>(shard_id).await.0;

                    // If the refetch didn't advance the earliest live diff,
                    // we'd expect to hit the same missing rollup again; warn
                    // (but keep retrying) since this typically means blob
                    // storage was deleted or mismatched.
                    let earliest_after_refetch = live_diffs
                        .first()
                        .expect("initialized shard should have at least one diff")
                        .seqno;
                    if earliest_before_refetch >= earliest_after_refetch {
                        warn!(
                            concat!(
                                "fetch_current_state refetch expects earliest live diff to advance: {} vs {}. ",
                                "In dev and testing, this happens when persist's Blob (files in mzdata) ",
                                "is deleted out from under it or when two processes are talking to ",
                                "different Blobs (e.g. docker containers without it shared)."
                            ),
                            earliest_before_refetch, earliest_after_refetch
                        )
                    }
                    continue;
                }
            };

            // Roll the live diffs forward on top of the rollup; diffs at or
            // below the rollup's seqno are skipped by apply_encoded_diffs.
            state.apply_encoded_diffs(&self.cfg, &self.metrics, &live_diffs);
            return state;
        }
    }
470
    /// Returns an iterator over all live states of the requested shard,
    /// starting at the earliest live state, or `None` if the shard is
    /// uninitialized (no live diffs).
    pub async fn fetch_all_live_states<T>(
        &self,
        shard_id: ShardId,
    ) -> Option<UntypedStateVersionsIter<T>>
    where
        T: Timestamp + Lattice + Codec64,
    {
        let retry = self
            .metrics
            .retries
            .fetch_live_states
            .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
        let mut all_live_diffs = self.fetch_all_live_diffs(&shard_id).await;
        loop {
            let earliest_live_diff = match all_live_diffs.first() {
                Some(x) => x,
                None => return None,
            };
            let state = match self
                .fetch_rollup_at_seqno(&shard_id, all_live_diffs.clone(), earliest_live_diff.seqno)
                .await
            {
                Some(x) => x,
                None => {
                    // The rollup at the earliest live seqno is gone: GC has
                    // likely raced us and truncated diffs; refetch and retry.
                    retry.retries.inc();
                    let earliest_before_refetch = earliest_live_diff.seqno;
                    all_live_diffs = self.fetch_all_live_diffs(&shard_id).await;

                    // As in fetch_current_state: if the earliest diff didn't
                    // advance, the retry is unlikely to make progress; warn
                    // about a probably-broken Blob but keep trying.
                    let earliest_after_refetch = all_live_diffs
                        .first()
                        .expect("initialized shard should have at least one diff")
                        .seqno;
                    if earliest_before_refetch >= earliest_after_refetch {
                        warn!(
                            concat!(
                                "fetch_all_live_states refetch expects earliest live diff to advance: {} vs {}. ",
                                "In dev and testing, this happens when persist's Blob (files in mzdata) ",
                                "is deleted out from under it or when two processes are talking to ",
                                "different Blobs (e.g. docker containers without it shared)."
                            ),
                            earliest_before_refetch, earliest_after_refetch
                        )
                    }
                    continue;
                }
            };
            // The rollup we start from must line up with the earliest diff.
            assert_eq!(earliest_live_diff.seqno, state.seqno());
            return Some(UntypedStateVersionsIter {
                shard_id,
                cfg: self.cfg.clone(),
                metrics: Arc::clone(&self.metrics),
                state,
                diffs: all_live_diffs,
            });
        }
    }
542
543 pub async fn fetch_all_live_diffs(&self, shard_id: &ShardId) -> Vec<VersionedData> {
549 let path = shard_id.to_string();
550 let diffs = retry_external(&self.metrics.retries.external.fetch_state_scan, || async {
551 self.consensus.scan(&path, SeqNo::minimum(), SCAN_ALL).await
552 })
553 .instrument(debug_span!("fetch_state::scan"))
554 .await;
555 diffs
556 }
557
    /// Fetches at most `limit` live diffs for the given shard from consensus,
    /// in seqno order, starting at `from` (inclusive).
    ///
    /// Externally retried; a short (or empty) result means there were fewer
    /// than `limit` live diffs at or after `from`.
    async fn fetch_live_diffs(
        &self,
        shard_id: &ShardId,
        from: SeqNo,
        limit: usize,
    ) -> Vec<VersionedData> {
        let path = shard_id.to_string();
        retry_external(&self.metrics.retries.external.fetch_state_scan, || async {
            self.consensus.scan(&path, from, limit).await
        })
        .instrument(debug_span!("fetch_state::scan"))
        .await
    }
573
    /// Fetches all live diffs with seqnos up to and including `through`,
    /// starting from the minimum seqno, paginating through consensus as
    /// needed.
    ///
    /// If `through` is past the last live diff, returns everything available
    /// (the result may end before `through`).
    pub async fn fetch_live_diffs_through(
        &self,
        shard_id: &ShardId,
        through: SeqNo,
    ) -> Vec<VersionedData> {
        let scan_limit = STATE_VERSIONS_RECENT_LIVE_DIFFS_LIMIT.get(&self.cfg);
        // First page: scan from the beginning.
        let mut versions = self
            .fetch_live_diffs(shard_id, SeqNo::minimum(), scan_limit)
            .await;

        // A full first page means there may be more; keep paginating until we
        // reach `through` or run out of diffs.
        if versions.len() == scan_limit {
            loop {
                let Some(last_seqno) = versions.last().map(|v| v.seqno) else {
                    break;
                };
                if through <= last_seqno {
                    break;
                }
                let from = last_seqno.next();
                // Size the next page by the remaining seqno distance, clamped
                // to a sane range (at least 1, at most 10x the scan limit).
                let limit = usize::cast_from(through.0 - last_seqno.0).clamp(1, 10 * scan_limit);
                let more_versions = self.fetch_live_diffs(shard_id, from, limit).await;
                let more_versions_len = more_versions.len();
                if let Some(first) = more_versions.first() {
                    // Pages must not overlap or go backwards.
                    assert!(last_seqno < first.seqno);
                }
                versions.extend(more_versions);
                // A short page means we've exhausted the live diffs.
                if more_versions_len < limit {
                    break;
                }
            }
        }
        // Drop anything past `through` (seqnos are sorted, so this is a
        // simple prefix).
        let partition_index = versions.partition_point(|v| v.seqno <= through);
        versions.truncate(partition_index);
        versions
    }
615
    /// Fetches recent live diffs for the given shard: always enough to
    /// reconstruct current state (i.e. everything at or after the latest
    /// rollup's seqno), possibly more.
    ///
    /// Fast path: if a single limited scan from the minimum seqno returns
    /// fewer than the scan limit, those are all the live diffs. Slow path:
    /// decode the head diff to learn the latest rollup's seqno and scan from
    /// there.
    pub async fn fetch_recent_live_diffs<T>(&self, shard_id: &ShardId) -> RecentLiveDiffs
    where
        T: Timestamp + Lattice + Codec64,
    {
        let path = shard_id.to_string();
        let scan_limit = STATE_VERSIONS_RECENT_LIVE_DIFFS_LIMIT.get(&self.cfg);
        let oldest_diffs =
            retry_external(&self.metrics.retries.external.fetch_state_scan, || async {
                self.consensus
                    .scan(&path, SeqNo::minimum(), scan_limit)
                    .await
            })
            .instrument(debug_span!("fetch_state::scan"))
            .await;

        // Fast-path: a partial page means we got every live diff.
        if oldest_diffs.len() < scan_limit {
            self.metrics.state.fetch_recent_live_diffs_fast_path.inc();
            return RecentLiveDiffs(oldest_diffs);
        }

        // Slow-path: too many live diffs to be sure the page covers the
        // latest rollup. Get the most recent diff from consensus...
        let head = retry_external(&self.metrics.retries.external.fetch_state_scan, || async {
            self.consensus.head(&path).await
        })
        .instrument(debug_span!("fetch_state::slow_path::head"))
        .await
        .expect("initialized shard should have at least 1 diff");

        // ...decode it to find the key of the latest rollup...
        let latest_diff = self
            .metrics
            .codecs
            .state_diff
            .decode(|| StateDiff::<T>::decode(&self.cfg.build_version, head.data));

        // ...and scan everything from that rollup's seqno onward, which is
        // sufficient to reconstruct current state.
        match BlobKey::parse_ids(&latest_diff.latest_rollup_key.complete(shard_id)) {
            Ok((_shard_id, PartialBlobKey::Rollup(seqno, _rollup))) => {
                self.metrics.state.fetch_recent_live_diffs_slow_path.inc();
                let diffs =
                    retry_external(&self.metrics.retries.external.fetch_state_scan, || async {
                        self.consensus.scan(&path, seqno, SCAN_ALL).await
                    })
                    .instrument(debug_span!("fetch_state::slow_path::scan"))
                    .await;
                RecentLiveDiffs(diffs)
            }
            // A diff's latest_rollup_key must always parse as a rollup key;
            // anything else indicates corrupt state.
            Ok(_) => panic!(
                "invalid state diff rollup key: {}",
                latest_diff.latest_rollup_key
            ),
            Err(err) => panic!("unparseable state diff rollup key: {}", err),
        }
    }
688
    /// Fetches all live diffs for the given shard with seqnos strictly
    /// greater than `seqno`, in seqno order.
    ///
    /// NOTE(review): the `K, V, T, D` type parameters are unused in the body;
    /// callers invoke this with an explicit turbofish, so removing them would
    /// be an interface change — confirm before cleaning up.
    pub async fn fetch_all_live_diffs_gt_seqno<K, V, T, D>(
        &self,
        shard_id: &ShardId,
        seqno: SeqNo,
    ) -> Vec<VersionedData> {
        let path = shard_id.to_string();
        retry_external(&self.metrics.retries.external.fetch_state_scan, || async {
            self.consensus.scan(&path, seqno.next(), SCAN_ALL).await
        })
        .instrument(debug_span!("fetch_state::scan"))
        .await
    }
705
706 pub async fn truncate_diffs(&self, shard_id: &ShardId, seqno: SeqNo) {
708 let path = shard_id.to_string();
709 let _deleted_count = retry_external(&self.metrics.retries.external.gc_truncate, || async {
710 self.consensus.truncate(&path, seqno).await
711 })
712 .instrument(debug_span!("gc::truncate"))
713 .await;
714 }
715
    /// Constructs the initial (seqno-minimum) state for a shard, registers a
    /// rollup of it in the state, writes that rollup blob out, and returns
    /// the state plus the diff from the truly-empty state.
    ///
    /// Note this only writes the blob; committing the state into consensus is
    /// the caller's job (see [Self::maybe_init_shard]).
    async fn write_initial_rollup<K, V, T, D>(
        &self,
        shard_metrics: &ShardMetrics,
    ) -> (TypedState<K, V, T, D>, StateDiff<T>)
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64,
        D: Monoid + Codec64,
    {
        let empty_state = TypedState::new(
            self.cfg.build_version.clone(),
            shard_metrics.shard_id,
            self.cfg.hostname.clone(),
            (self.cfg.now)(),
        );
        let mut initial_state = empty_state.clone_for_rollup();
        let rollup_seqno = initial_state.seqno();
        let rollup = HollowRollup {
            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
            // Size isn't known until after encoding below; left unset here.
            encoded_size_bytes: None,
        };
        // Record the rollup in the state itself; on the empty state this must
        // be a real (applied) transition.
        let applied = match initial_state
            .collections
            .add_rollup((rollup_seqno, &rollup))
        {
            Continue(x) => x,
            Break(NoOpStateTransition(_)) => {
                panic!("initial state transition should not be a no-op")
            }
        };
        assert!(
            applied,
            "add_and_remove_rollups should apply to the empty state"
        );

        // Encode and write the rollup blob (with no inlined diffs).
        let rollup = self.encode_rollup_blob(
            shard_metrics,
            initial_state.clone_for_rollup(),
            vec![],
            rollup.key,
        );
        let () = self.write_rollup_blob(&rollup).await;
        assert_eq!(initial_state.seqno, rollup.seqno);

        let diff = StateDiff::from_diff(&empty_state.state, &initial_state.state);
        (initial_state, diff)
    }
770
    /// Encodes and writes a rollup blob for the given state (inlining the
    /// diffs between the previous rollup and this state), returning the
    /// rollup handle, or `None` if the write would be a no-op.
    ///
    /// No-op cases: the state is already at its latest rollup's seqno, or the
    /// diffs between the latest rollup and this state have already been
    /// truncated out of consensus.
    pub async fn write_rollup_for_state<K, V, T, D>(
        &self,
        shard_metrics: &ShardMetrics,
        state: TypedState<K, V, T, D>,
        rollup_id: &RollupId,
    ) -> Option<EncodedRollup>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64,
        D: Monoid + Codec64,
    {
        let (latest_rollup_seqno, _rollup) = state.latest_rollup();
        let seqno = state.seqno();

        // All live diffs strictly after the latest rollup.
        let diffs: Vec<_> = self
            .fetch_all_live_diffs_gt_seqno::<K, V, T, D>(&state.shard_id, *latest_rollup_seqno)
            .await;

        match diffs.first() {
            None => {
                // No diffs since the latest rollup: the state must be exactly
                // at that rollup, so there's nothing new to roll up.
                self.metrics.state.rollup_write_noop_latest.inc();
                assert_eq!(seqno, *latest_rollup_seqno);
                return None;
            }
            Some(first) => {
                // NOTE(review): this counter is incremented even when we go
                // on to successfully write below — confirm whether it should
                // only be bumped inside the early-return branch.
                self.metrics.state.rollup_write_noop_truncated.inc();
                if first.seqno != latest_rollup_seqno.next() {
                    // A gap after the latest rollup means truncation has
                    // already advanced past it (it can never be behind).
                    assert!(
                        first.seqno > latest_rollup_seqno.next(),
                        "diff: {}, rollup: {}",
                        first.seqno,
                        latest_rollup_seqno,
                    );
                    return None;
                }
            }
        }

        // Only inline diffs up to this state's seqno (the scan may have
        // raced with newer writes).
        let diffs: Vec<_> = diffs.into_iter().filter(|x| x.seqno <= seqno).collect();

        // The inlined diffs must exactly span (latest_rollup, state.seqno].
        assert_eq!(
            diffs.first().map(|x| x.seqno),
            Some(latest_rollup_seqno.next())
        );
        assert_eq!(diffs.last().map(|x| x.seqno), Some(state.seqno));

        let key = PartialRollupKey::new(state.seqno, rollup_id);
        let rollup = self.encode_rollup_blob(shard_metrics, state, diffs, key);
        let () = self.write_rollup_blob(&rollup).await;

        self.metrics.state.rollup_write_success.inc();

        Some(rollup)
    }
844
    /// Encodes the given state (plus the provided inlined diffs) as a proto
    /// rollup, ready to be written under `key` by
    /// [Self::write_rollup_blob].
    pub fn encode_rollup_blob<K, V, T, D>(
        &self,
        shard_metrics: &ShardMetrics,
        state: TypedState<K, V, T, D>,
        diffs: Vec<VersionedData>,
        key: PartialRollupKey,
    ) -> EncodedRollup
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64,
        D: Monoid + Codec64,
    {
        let shard_id = state.shard_id;
        let rollup_seqno = state.seqno;

        let rollup = Rollup::from(state.into(), diffs);
        // Rollup::from always inlines the diffs we passed, so this is Some.
        let desc = rollup.diffs.as_ref().expect("inlined diffs").description();

        let buf = self.metrics.codecs.state.encode(|| {
            let mut buf = Vec::new();
            rollup
                .into_proto()
                .encode(&mut buf)
                // prost encoding into a Vec can only fail on missing required
                // fields, of which there are none.
                .expect("no required fields means no initialization errors");
            Bytes::from(buf)
        });
        shard_metrics
            .latest_rollup_size
            .set(u64::cast_from(buf.len()));
        EncodedRollup {
            shard_id,
            seqno: rollup_seqno,
            key,
            buf,
            _desc: desc,
        }
    }
886
887 pub async fn write_rollup_blob(&self, rollup: &EncodedRollup) {
889 let payload_len = rollup.buf.len();
890 retry_external(&self.metrics.retries.external.rollup_set, || async {
891 self.blob
892 .set(
893 &rollup.key.complete(&rollup.shard_id),
894 Bytes::clone(&rollup.buf),
895 )
896 .await
897 })
898 .instrument(debug_span!("rollup::set", payload_len))
899 .await;
900 }
901
    /// Fetches the rollup at the given seqno, if it exists.
    ///
    /// Looks up the rollup key in the current state's rollups map; if absent
    /// there, falls back to scanning the provided diffs for a rollup entry at
    /// that seqno (a migration path), panicking if neither source knows the
    /// key.
    async fn fetch_rollup_at_seqno<T>(
        &self,
        shard_id: &ShardId,
        live_diffs: Vec<VersionedData>,
        seqno: SeqNo,
    ) -> Option<UntypedState<T>>
    where
        T: Timestamp + Lattice + Codec64,
    {
        // Scan the diffs for any insert/update/delete of a rollup keyed by
        // `seqno`; used only as a fallback below.
        let rollup_key_for_migration = live_diffs.iter().find_map(|x| {
            let diff = self
                .metrics
                .codecs
                .state_diff
                .decode(|| StateDiff::<T>::decode(&self.cfg.build_version, x.data.clone()));
            diff.rollups
                .iter()
                .find(|x| x.key == seqno)
                .map(|x| match &x.val {
                    StateFieldValDiff::Insert(x) => x.clone(),
                    StateFieldValDiff::Update(_, x) => x.clone(),
                    StateFieldValDiff::Delete(x) => x.clone(),
                })
        });

        // Preferred path: current state still references the rollup.
        let state = self.fetch_current_state::<T>(shard_id, live_diffs).await;
        if let Some(rollup) = state.rollups().get(&seqno) {
            return self.fetch_rollup_at_key(shard_id, &rollup.key).await;
        }

        // Fallback: use the key recovered from the diffs above.
        let rollup = rollup_key_for_migration.expect("someone should have a key for this rollup");
        tracing::info!("only found rollup for {} {} via migration", shard_id, seqno);
        self.metrics.state.rollup_at_seqno_migration.inc();
        self.fetch_rollup_at_key(shard_id, &rollup.key).await
    }
976
    /// Fetches and decodes the rollup at the given key, returning `None` if
    /// no blob exists at that key (externally retried).
    pub async fn fetch_rollup_at_key<T>(
        &self,
        shard_id: &ShardId,
        rollup_key: &PartialRollupKey,
    ) -> Option<UntypedState<T>>
    where
        T: Timestamp + Lattice + Codec64,
    {
        retry_external(&self.metrics.retries.external.rollup_get, || async {
            self.blob.get(&rollup_key.complete(shard_id)).await
        })
        .instrument(debug_span!("rollup::get"))
        .await
        .map(|buf| {
            self.metrics
                .codecs
                .state
                .decode(|| UntypedState::decode(&self.cfg.build_version, buf))
        })
    }
998
999 pub async fn delete_rollup(&self, shard_id: &ShardId, key: &PartialRollupKey) {
1001 let _ = retry_external(&self.metrics.retries.external.rollup_delete, || async {
1002 self.blob.delete(&key.complete(shard_id)).await
1003 })
1004 .await
1005 .instrument(debug_span!("rollup::delete"));
1006 }
1007}
1008
/// An iterator-like holder over a shard's live states before the timestamp
/// codec has been checked; see [UntypedStateVersionsIter::check_ts_codec].
pub struct UntypedStateVersionsIter<T> {
    shard_id: ShardId,
    cfg: PersistConfig,
    metrics: Arc<Metrics>,
    // The earliest live state (codecs not yet verified).
    state: UntypedState<T>,
    // All live diffs, earliest first, to be applied on top of `state`.
    diffs: Vec<VersionedData>,
}
1016
1017impl<T: Timestamp + Lattice + Codec64> UntypedStateVersionsIter<T> {
1018 pub(crate) fn new(
1019 shard_id: ShardId,
1020 cfg: PersistConfig,
1021 metrics: Arc<Metrics>,
1022 state: UntypedState<T>,
1023 diffs: Vec<VersionedData>,
1024 ) -> Self {
1025 Self {
1026 shard_id,
1027 cfg,
1028 metrics,
1029 state,
1030 diffs,
1031 }
1032 }
1033
1034 pub(crate) fn check_ts_codec(self) -> Result<StateVersionsIter<T>, CodecMismatchT> {
1035 let key_codec = self.state.key_codec.clone();
1036 let val_codec = self.state.val_codec.clone();
1037 let diff_codec = self.state.diff_codec.clone();
1038 let state = self.state.check_ts_codec(&self.shard_id)?;
1039 Ok(StateVersionsIter::new(
1040 self.cfg,
1041 self.metrics,
1042 state,
1043 self.diffs,
1044 key_codec,
1045 val_codec,
1046 diff_codec,
1047 ))
1048 }
1049}
1050
/// An iterator over a shard's live states, produced by applying each live
/// diff in turn to the starting state (see [StateVersionsIter::next]).
pub struct StateVersionsIter<T> {
    cfg: PersistConfig,
    metrics: Arc<Metrics>,
    // The most recently yielded state; advanced in place by `next`.
    state: State<T>,
    // Remaining diffs, stored in reverse so `next` can pop from the end.
    diffs: Vec<VersionedData>,
    // Codec names carried along for re-encoding state (see
    // `into_rollup_proto_without_diffs`).
    key_codec: String,
    val_codec: String,
    diff_codec: String,
    // Debug-only cross-check that incrementally-referenced blobs match the
    // blobs referenced by the full state.
    #[cfg(debug_assertions)]
    validator: ReferencedBlobValidator<T>,
}
1063
impl<T: Timestamp + Lattice + Codec64> StateVersionsIter<T> {
    /// Builds an iterator from the starting state and its live diffs.
    ///
    /// If `diffs` is non-empty, its first diff must be at `state.seqno` (the
    /// first call to `next` re-yields the initial state; see `next`).
    fn new(
        cfg: PersistConfig,
        metrics: Arc<Metrics>,
        state: State<T>,
        mut diffs: Vec<VersionedData>,
        key_codec: String,
        val_codec: String,
        diff_codec: String,
    ) -> Self {
        assert!(diffs.first().map_or(true, |x| x.seqno == state.seqno));
        // Reversed so that `next` can pop the earliest diff off the end.
        diffs.reverse();
        StateVersionsIter {
            cfg,
            metrics,
            state,
            diffs,
            key_codec,
            val_codec,
            diff_codec,
            #[cfg(debug_assertions)]
            validator: ReferencedBlobValidator::default(),
        }
    }

    /// Returns the number of diffs not yet applied.
    pub fn len(&self) -> usize {
        self.diffs.len()
    }

    /// Advances to the next state by applying one diff, returning `None` when
    /// the diffs are exhausted.
    ///
    /// `inspect_diff_fn` is called once per step with either the initial
    /// state (for the very first diff, whose `seqno_to` equals the starting
    /// seqno) or the decoded diff, letting callers observe referenced blobs
    /// without re-decoding.
    pub fn next<F: for<'a> FnMut(InspectDiff<'a, T>)>(
        &mut self,
        mut inspect_diff_fn: F,
    ) -> Option<&State<T>> {
        let diff = match self.diffs.pop() {
            Some(x) => x,
            None => return None,
        };
        // Keep the raw bytes: apply_diffs wants both the decoded diff and
        // its original encoding.
        let data = diff.data.clone();
        let diff = self
            .metrics
            .codecs
            .state_diff
            .decode(|| StateDiff::decode(&self.cfg.build_version, diff.data));

        // The first diff lands exactly on the starting state's seqno, so we
        // present the whole initial state rather than the diff itself.
        if diff.seqno_to == self.state.seqno {
            let inspect = InspectDiff::FromInitial(&self.state);
            #[cfg(debug_assertions)]
            {
                inspect
                    .referenced_blobs()
                    .for_each(|x| self.validator.add_inc_blob(x));
            }
            inspect_diff_fn(inspect);
        } else {
            let inspect = InspectDiff::Diff(&diff);
            #[cfg(debug_assertions)]
            {
                inspect
                    .referenced_blobs()
                    .for_each(|x| self.validator.add_inc_blob(x));
            }
            inspect_diff_fn(inspect);
        }

        let diff_seqno_to = diff.seqno_to;
        self.state
            .apply_diffs(&self.metrics, std::iter::once((diff, data)));
        assert_eq!(self.state.seqno, diff_seqno_to);
        // In debug builds, verify that the incrementally-collected blob refs
        // agree with the full state's refs after each step.
        #[cfg(debug_assertions)]
        {
            self.validator.validate_against_state(&self.state);
        }
        Some(&self.state)
    }

    /// Returns the most recently yielded state.
    pub fn state(&self) -> &State<T> {
        &self.state
    }

    /// Renders the current state as a serializable rollup proto with no
    /// inlined diffs (e.g. for inspection/debugging output).
    pub fn into_rollup_proto_without_diffs(&self) -> impl serde::Serialize + use<T> {
        Rollup::from_state_without_diffs(
            State {
                shard_id: self.state.shard_id.clone(),
                seqno: self.state.seqno.clone(),
                walltime_ms: self.state.walltime_ms.clone(),
                hostname: self.state.hostname.clone(),
                collections: self.state.collections.clone(),
            },
            self.key_codec.clone(),
            self.val_codec.clone(),
            T::codec_name(),
            self.diff_codec.clone(),
        )
        .into_proto()
    }
}
1170
/// What [StateVersionsIter::next] hands to its `inspect_diff_fn` callback:
/// either the whole initial state (on the first step) or a single decoded
/// diff (on subsequent steps).
#[derive(Debug)]
pub enum InspectDiff<'a, T> {
    /// The very first "diff", presented as the complete starting state.
    FromInitial(&'a State<T>),
    /// A single decoded state diff.
    Diff(&'a StateDiff<T>),
}
1180
impl<T: Timestamp + Lattice + Codec64> InspectDiff<'_, T> {
    /// Returns the blobs (batches/rollups) referenced by this step: all blobs
    /// of the full state for `FromInitial`, or only the blob inserts for
    /// `Diff`.
    pub fn referenced_blobs(&self) -> impl Iterator<Item = HollowBlobRef<'_, T>> {
        // The two variants yield different concrete iterator types, so each
        // is wrapped in an Option and flattened; exactly one side is Some.
        let (state, diff) = match self {
            InspectDiff::FromInitial(x) => (Some(x), None),
            InspectDiff::Diff(x) => (None, Some(x)),
        };
        let state_blobs = state.into_iter().flat_map(|s| s.blobs());
        let diff_blobs = diff.into_iter().flat_map(|d| d.blob_inserts());
        state_blobs.chain(diff_blobs)
    }
}
1195
/// Debug-only helper that cross-checks blob references collected
/// incrementally from diffs against the references of the full state (see
/// [StateVersionsIter::next]).
#[cfg(debug_assertions)]
struct ReferencedBlobValidator<T> {
    // Batches referenced by the full state, as of the last validation.
    full_batches: BTreeSet<HollowBatch<T>>,
    // Rollups referenced by the full state, as of the last validation.
    full_rollups: BTreeSet<HollowRollup>,
    // Batches accumulated incrementally from diffs/initial state.
    inc_batches: BTreeSet<HollowBatch<T>>,
    // Rollups accumulated incrementally from diffs/initial state.
    inc_rollups: BTreeSet<HollowRollup>,
}
1207
1208#[cfg(debug_assertions)]
1209impl<T> Default for ReferencedBlobValidator<T> {
1210 fn default() -> Self {
1211 Self {
1212 full_batches: Default::default(),
1213 full_rollups: Default::default(),
1214 inc_batches: Default::default(),
1215 inc_rollups: Default::default(),
1216 }
1217 }
1218}
1219
#[cfg(debug_assertions)]
impl<T: Timestamp + Lattice + Codec64> ReferencedBlobValidator<T> {
    /// Records a blob observed incrementally (from a diff or the initial
    /// state).
    ///
    /// Panics if a non-empty batch or any rollup is recorded twice: each
    /// should only ever be appended once. Empty batches (lower == upper) are
    /// exempt from the uniqueness check.
    fn add_inc_blob(&mut self, x: HollowBlobRef<'_, T>) {
        match x {
            HollowBlobRef::Batch(x) => assert!(
                self.inc_batches.insert(x.clone()) || x.desc.lower() == x.desc.upper(),
                "non-empty batches should only be appended once; duplicate: {x:?}"
            ),
            HollowBlobRef::Rollup(x) => assert!(self.inc_rollups.insert(x.clone())),
        }
    }
    /// Asserts that the incrementally-collected blobs are consistent with the
    /// blobs referenced by the full state `x`.
    ///
    /// Batches can be split/merged between the incremental and full views, so
    /// instead of set equality this checks (a) the overall time bounds match
    /// and (b) the multiset of parts matches. Rollups are compared for exact
    /// set equality.
    fn validate_against_state(&mut self, x: &State<T>) {
        use std::hash::{DefaultHasher, Hash, Hasher};

        use mz_ore::collections::HashSet;
        use timely::progress::Antichain;

        use crate::internal::state::BatchPart;

        // Refresh the "full" view from the current state.
        x.blobs().for_each(|x| match x {
            HollowBlobRef::Batch(x) => {
                self.full_batches.insert(x.clone());
            }
            HollowBlobRef::Rollup(x) => {
                self.full_rollups.insert(x.clone());
            }
        });

        // Computes the meet of lowers and join of uppers across a set of
        // batch descriptions, i.e. the overall covered time bounds.
        fn overall_desc<'a, T: Timestamp + Lattice>(
            iter: impl Iterator<Item = &'a Description<T>>,
        ) -> (Antichain<T>, Antichain<T>) {
            let mut lower = Antichain::new();
            let mut upper = Antichain::from_elem(T::minimum());
            for desc in iter {
                lower.meet_assign(desc.lower());
                upper.join_assign(desc.upper());
            }
            (lower, upper)
        }
        let (inc_lower, inc_upper) = overall_desc(self.inc_batches.iter().map(|a| &a.desc));
        let (full_lower, full_upper) = overall_desc(self.full_batches.iter().map(|a| &a.desc));
        assert_eq!(inc_lower, full_lower);
        assert_eq!(inc_upper, full_upper);

        // Produces a stable identity per part: the key name for keyed parts,
        // or a hash of the contents (plus any ts_rewrite) for inline parts,
        // which have no key.
        fn part_unique<T: Codec64>(x: &RunPart<T>) -> String {
            match x {
                RunPart::Single(BatchPart::Inline {
                    updates,
                    ts_rewrite,
                    ..
                }) => {
                    let mut h = DefaultHasher::new();
                    updates.hash(&mut h);
                    if let Some(frontier) = &ts_rewrite {
                        h.write_usize(frontier.len());
                        frontier.iter().for_each(|t| t.encode().hash(&mut h));
                    }
                    h.finish().to_string()
                }
                other => other.printable_name().to_string(),
            }
        }

        // The parts referenced must agree even when batch boundaries differ.
        let inc_parts: HashSet<_> = self
            .inc_batches
            .iter()
            .flat_map(|x| x.parts.iter())
            .map(part_unique)
            .collect();
        let full_parts = self
            .full_batches
            .iter()
            .flat_map(|x| x.parts.iter())
            .map(part_unique)
            .collect();
        assert_eq!(inc_parts, full_parts);

        // Rollups are never split/merged, so exact equality is expected.
        assert_eq!(self.inc_rollups, self.full_rollups);
    }
}
1305
#[cfg(test)]
mod tests {
    use mz_dyncfg::ConfigUpdates;

    use super::*;
    use crate::tests::new_test_client;

    /// Regression test: fetching all live states of a shard that was never
    /// initialized must return `None` rather than looping or panicking.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn fetch_all_live_states_regression_uninitialized(dyncfgs: ConfigUpdates) {
        let client = new_test_client(&dyncfgs).await;
        let state_versions = StateVersions::new(
            client.cfg.clone(),
            Arc::clone(&client.consensus),
            Arc::clone(&client.blob),
            Arc::clone(&client.metrics),
        );
        let fetched = state_versions
            .fetch_all_live_states::<u64>(ShardId::new())
            .await;
        assert!(fetched.is_none());
    }
}