Skip to main content

mz_persist_client/internal/
state_versions.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A durable, truncatable log of versions of [State].
11
12#[cfg(debug_assertions)]
13use std::collections::BTreeSet;
14use std::fmt::Debug;
15use std::ops::ControlFlow::{Break, Continue};
16use std::sync::Arc;
17use std::time::SystemTime;
18
19use bytes::Bytes;
20use differential_dataflow::difference::Monoid;
21use differential_dataflow::lattice::Lattice;
22use differential_dataflow::trace::Description;
23use mz_ore::cast::CastFrom;
24use mz_persist::location::{
25    Blob, CaSResult, Consensus, Indeterminate, SCAN_ALL, SeqNo, VersionedData,
26};
27use mz_persist::retry::Retry;
28use mz_persist_types::{Codec, Codec64};
29use mz_proto::RustType;
30use prost::Message;
31use timely::progress::Timestamp;
32use tracing::{Instrument, debug, debug_span, trace, warn};
33
34use crate::cfg::STATE_VERSIONS_RECENT_LIVE_DIFFS_LIMIT;
35use crate::error::{CodecMismatch, CodecMismatchT};
36use crate::internal::encoding::{Rollup, UntypedState};
37use crate::internal::machine::{retry_determinate, retry_external};
38use crate::internal::metrics::ShardMetrics;
39use crate::internal::paths::{BlobKey, PartialBlobKey, PartialRollupKey, RollupId};
40#[cfg(debug_assertions)]
41use crate::internal::state::HollowBatch;
42use crate::internal::state::{
43    BatchPart, HollowBlobRef, HollowRollup, NoOpStateTransition, RunPart, State, TypedState,
44};
45use crate::internal::state_diff::{StateDiff, StateFieldValDiff};
46use crate::{Metrics, PersistConfig, ShardId};
47
48/// A durable, truncatable log of versions of [State].
49///
50/// As persist metadata changes over time, we make its versions (each identified
51/// by a [SeqNo]) durable in two ways:
52/// - `rollups`: Periodic copies of the entirety of [State], written to [Blob].
53/// - `diffs`: Incremental [StateDiff]s, written to [Consensus].
54///
55/// The following invariants are maintained at all times:
56/// - A shard is initialized iff there is at least one version of it in
57///   Consensus.
58/// - The first version of state is written to `SeqNo(1)`. Each successive state
59///   version is assigned its predecessor's SeqNo +1.
60/// - `current`: The latest version of state. By definition, the largest SeqNo
61///   present in Consensus.
62/// - As state changes over time, we keep a range of consecutive versions
63///   available. These are periodically `truncated` to prune old versions that
64///   are no longer necessary.
65/// - `earliest`: The first version of state that it is possible to reconstruct.
66///   - Invariant: `earliest <= current.seqno_since()` (we don't garbage collect
67///     versions still being used by some reader).
68///   - Invariant: `earliest` is always the smallest Seqno present in Consensus.
69///     - This doesn't have to be true, but we select to enforce it.
70///     - Because the data stored at that smallest Seqno is an incremental diff,
71///       to make this invariant work, there needs to be a rollup at either
72///       `earliest-1` or `earliest`. We choose `earliest` because it seems to
73///       make the code easier to reason about in practice.
74///     - A consequence of the above is when we garbage collect old versions of
75///       state, we're only free to truncate ones that are `<` the latest rollup
76///       that is `<= current.seqno_since`.
77/// - `live diffs`: The set of SeqNos present in Consensus at any given time.
78/// - `live states`: The range of state versions that it is possible to
79///   reconstruct: `[earliest,current]`.
80///   - Because of earliest and current invariants above, the range of `live
81///     diffs` and `live states` are the same.
82/// - The set of known rollups are tracked in the shard state itself.
83///   - For efficiency of common operations, the most recent rollup's Blob key
84///     is always denormalized in each StateDiff written to Consensus. (As
85///     described above, there is always a rollup at earliest, so we're
86///     guaranteed that there is always at least one live rollup.)
87///   - Invariant: The rollups in `current` exist in Blob.
88///     - A consequence is that, if a rollup in a state you believe is `current`
89///       doesn't exist, it's a guarantee that `current` has changed (or it's a
90///       bug).
91///   - Any rollup at a version `< earliest-1` is useless (we've lost the
92///     incremental diffs between it and the live states). GC is tasked with
93///     deleting these rollups from Blob before truncating diffs from Consensus.
94///     Thus, any rollup at a seqno < earliest can be considered "leaked" and
95///     deleted by the leaked blob detector.
96///   - Note that this means, while `current`'s rollups exist, it will be common
97///     for other live states to reference rollups that no longer exist.
#[derive(Debug)]
pub struct StateVersions {
    /// Persist configuration: scan limits, build version, hostname, now fn.
    pub(crate) cfg: PersistConfig,
    /// Durable, linearizable store for the incremental [StateDiff]s.
    pub(crate) consensus: Arc<dyn Consensus>,
    /// Durable blob store holding the periodic [State] rollups.
    pub(crate) blob: Arc<dyn Blob>,
    /// Metrics for retries, codec encode/decode, and per-path counters.
    pub(crate) metrics: Arc<Metrics>,
}
105
/// The diffs returned by a scan of "recent" live diffs; see
/// [StateVersions::fetch_recent_live_diffs] for what "recent" means.
#[derive(Debug, Clone)]
pub struct RecentLiveDiffs(pub Vec<VersionedData>);
108
/// A rollup of [State] that has been encoded for writing to [Blob].
#[derive(Debug, Clone)]
pub struct EncodedRollup {
    /// The shard this rollup belongs to.
    pub(crate) shard_id: ShardId,
    /// The seqno of the state captured by this rollup.
    pub(crate) seqno: SeqNo,
    /// The (partial) blob key at which this rollup is (to be) written.
    pub(crate) key: PartialRollupKey,
    // Retained for debugging only; the leading underscore marks it unused at
    // runtime.
    pub(crate) _desc: Description<SeqNo>,
    /// The encoded rollup bytes; its length feeds [Self::to_hollow].
    buf: Bytes,
}
117
impl EncodedRollup {
    /// Returns a [HollowRollup] pointer to this rollup: its blob key plus the
    /// size of the encoded bytes.
    pub fn to_hollow(&self) -> HollowRollup {
        HollowRollup {
            key: self.key.clone(),
            encoded_size_bytes: Some(self.buf.len()),
        }
    }
}
126
127impl StateVersions {
128    pub fn new(
129        cfg: PersistConfig,
130        consensus: Arc<dyn Consensus>,
131        blob: Arc<dyn Blob>,
132        metrics: Arc<Metrics>,
133    ) -> Self {
134        StateVersions {
135            cfg,
136            consensus,
137            blob,
138            metrics,
139        }
140    }
141
    /// Fetches the `current` state of the requested shard, or creates it if
    /// uninitialized.
    ///
    /// # Errors
    ///
    /// Returns a [CodecMismatch] if the shard exists but was initialized with
    /// codecs other than `K, V, T, D`.
    pub async fn maybe_init_shard<K, V, T, D>(
        &self,
        shard_metrics: &ShardMetrics,
    ) -> Result<TypedState<K, V, T, D>, Box<CodecMismatch>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64,
        D: Monoid + Codec64,
    {
        let shard_id = shard_metrics.shard_id;

        // The common case is that the shard is initialized, so try that first
        let recent_live_diffs = self.fetch_recent_live_diffs::<T>(&shard_id).await;
        if !recent_live_diffs.0.is_empty() {
            return self
                .fetch_current_state(&shard_id, recent_live_diffs.0)
                .await
                .check_codecs(&shard_id);
        }

        // Shard is not initialized, try initializing it.
        let (initial_state, initial_diff) = self.write_initial_rollup(shard_metrics).await;
        assert_eq!(
            initial_state.seqno(),
            SeqNo::minimum(),
            "initial state should have the initial seqno"
        );
        // Race to install the initial state. If we lose, some other process
        // initialized the shard concurrently; fall through to fetch theirs.
        let (cas_res, _diff) =
            retry_external(&self.metrics.retries.external.maybe_init_cas, || async {
                self.try_compare_and_set_current(
                    "maybe_init_shard",
                    shard_metrics,
                    &initial_state,
                    &initial_diff,
                )
                .await
                .map_err(|err| err.into())
            })
            .await;
        match cas_res {
            CaSResult::Committed => Ok(initial_state),
            CaSResult::ExpectationMismatch => {
                // Someone else initialized the shard out from under us; use
                // whatever state is now current.
                let recent_live_diffs = self.fetch_recent_live_diffs::<T>(&shard_id).await;
                let state = self
                    .fetch_current_state(&shard_id, recent_live_diffs.0)
                    .await
                    .check_codecs(&shard_id);

                // Clean up the rollup blob that we were trying to reference.
                //
                // SUBTLE: If we got an Indeterminate error in the CaS above,
                // but it actually went through, then we'll "contend" with
                // ourselves and get an expectation mismatch. Use the actual
                // fetched state to determine if our rollup actually made it in
                // and decide whether to delete based on that.
                let (_, rollup) = initial_state.latest_rollup();
                let should_delete_rollup = match state.as_ref() {
                    Ok(state) => !state
                        .collections
                        .rollups
                        .values()
                        .any(|x| &x.key == &rollup.key),
                    // If the codecs don't match, then we definitely didn't
                    // write the state.
                    Err(_codec_mismatch) => true,
                };
                if should_delete_rollup {
                    self.delete_rollup(&shard_id, &rollup.key).await;
                }

                state
            }
        }
    }
219
220    /// Updates the state of a shard to a new `current` iff `expected` matches
221    /// `current`.
222    ///
223    /// May be called on uninitialized shards.
224    pub async fn try_compare_and_set_current<K, V, T, D>(
225        &self,
226        cmd_name: &str,
227        shard_metrics: &ShardMetrics,
228        new_state: &TypedState<K, V, T, D>,
229        diff: &StateDiff<T>,
230    ) -> Result<(CaSResult, VersionedData), Indeterminate>
231    where
232        K: Debug + Codec,
233        V: Debug + Codec,
234        T: Timestamp + Lattice + Codec64,
235        D: Monoid + Codec64,
236    {
237        assert_eq!(shard_metrics.shard_id, new_state.shard_id);
238        let path = new_state.shard_id.to_string();
239
240        trace!(
241            "apply_unbatched_cmd {} attempting {}\n  new_state={:?}",
242            cmd_name,
243            new_state.seqno(),
244            new_state
245        );
246        let new = self.metrics.codecs.state_diff.encode(|| {
247            let mut buf = Vec::new();
248            diff.encode(&mut buf);
249            VersionedData {
250                seqno: new_state.seqno(),
251                data: Bytes::from(buf),
252            }
253        });
254        assert_eq!(new.seqno, diff.seqno_to);
255
256        let payload_len = new.data.len();
257        let cas_res = retry_determinate(
258            &self.metrics.retries.determinate.apply_unbatched_cmd_cas,
259            || async { self.consensus.compare_and_set(&path, new.clone()).await },
260        )
261        .instrument(debug_span!("apply_unbatched_cmd::cas", payload_len))
262        .await
263        .map_err(|err| {
264            debug!("apply_unbatched_cmd {} errored: {}", cmd_name, err);
265            err
266        })?;
267
268        match cas_res {
269            CaSResult::Committed => {
270                trace!(
271                    "apply_unbatched_cmd {} succeeded {}\n  new_state={:?}",
272                    cmd_name,
273                    new_state.seqno(),
274                    new_state
275                );
276
277                shard_metrics.set_since(new_state.since());
278                shard_metrics.set_upper(new_state.upper());
279                shard_metrics.seqnos_since_last_rollup.set(
280                    new_state
281                        .seqno
282                        .0
283                        .saturating_sub(new_state.latest_rollup().0.0),
284                );
285                shard_metrics
286                    .spine_batch_count
287                    .set(u64::cast_from(new_state.spine_batch_count()));
288                let size_metrics = new_state.size_metrics();
289                shard_metrics
290                    .schema_registry_version_count
291                    .set(u64::cast_from(new_state.collections.schemas.len()));
292                shard_metrics
293                    .hollow_batch_count
294                    .set(u64::cast_from(size_metrics.hollow_batch_count));
295                shard_metrics
296                    .batch_part_count
297                    .set(u64::cast_from(size_metrics.batch_part_count));
298                shard_metrics
299                    .rewrite_part_count
300                    .set(u64::cast_from(size_metrics.rewrite_part_count));
301                shard_metrics
302                    .update_count
303                    .set(u64::cast_from(size_metrics.num_updates));
304                shard_metrics
305                    .rollup_count
306                    .set(u64::cast_from(size_metrics.state_rollup_count));
307                shard_metrics
308                    .largest_batch_size
309                    .set(u64::cast_from(size_metrics.largest_batch_bytes));
310                shard_metrics
311                    .usage_current_state_batches_bytes
312                    .set(u64::cast_from(size_metrics.state_batches_bytes));
313                shard_metrics
314                    .usage_current_state_rollups_bytes
315                    .set(u64::cast_from(size_metrics.state_rollups_bytes));
316                shard_metrics
317                    .seqnos_held
318                    .set(u64::cast_from(new_state.seqnos_held()));
319                shard_metrics
320                    .encoded_diff_size
321                    .inc_by(u64::cast_from(payload_len));
322                shard_metrics
323                    .live_writers
324                    .set(u64::cast_from(new_state.collections.writers.len()));
325                shard_metrics
326                    .rewrite_part_count
327                    .set(u64::cast_from(size_metrics.rewrite_part_count));
328                shard_metrics
329                    .inline_part_count
330                    .set(u64::cast_from(size_metrics.inline_part_count));
331                shard_metrics
332                    .inline_part_bytes
333                    .set(u64::cast_from(size_metrics.inline_part_bytes));
334                shard_metrics.stale_version.set(
335                    if new_state
336                        .state
337                        .collections
338                        .version
339                        .cmp_precedence(&self.cfg.build_version)
340                        .is_lt()
341                    {
342                        1
343                    } else {
344                        0
345                    },
346                );
347
348                let spine_metrics = new_state.collections.trace.spine_metrics();
349                shard_metrics
350                    .compact_batches
351                    .set(spine_metrics.compact_batches);
352                shard_metrics
353                    .compacting_batches
354                    .set(spine_metrics.compacting_batches);
355                shard_metrics
356                    .noncompact_batches
357                    .set(spine_metrics.noncompact_batches);
358
359                let batch_parts_by_version = new_state
360                    .collections
361                    .trace
362                    .batches()
363                    .flat_map(|x| x.parts.iter())
364                    .flat_map(|part| {
365                        let key = match part {
366                            RunPart::Many(x) => Some(&x.key),
367                            RunPart::Single(BatchPart::Hollow(x)) => Some(&x.key),
368                            // TODO: Would be nice to include these too, but we lose the info atm.
369                            RunPart::Single(BatchPart::Inline { .. }) => None,
370                        }?;
371                        // Carefully avoid any String allocs by splitting.
372                        let (writer_key, _) = key.0.split_once('/')?;
373                        match &writer_key[..1] {
374                            "w" => Some(("old", part.encoded_size_bytes())),
375                            "n" => Some((&writer_key[1..], part.encoded_size_bytes())),
376                            _ => None,
377                        }
378                    });
379                shard_metrics.set_batch_part_versions(batch_parts_by_version);
380
381                Ok((CaSResult::Committed, new))
382            }
383            CaSResult::ExpectationMismatch => {
384                debug!(
385                    "apply_unbatched_cmd {} {} lost the CaS race, retrying: {:?}",
386                    new_state.shard_id(),
387                    cmd_name,
388                    new_state.seqno.previous(),
389                );
390                Ok((CaSResult::ExpectationMismatch, new))
391            }
392        }
393    }
394
    /// Fetches the `current` state of the requested shard.
    ///
    /// Uses the provided hint (live_diffs), which is a possibly outdated
    /// copy of all or recent live diffs, to avoid fetches where possible.
    ///
    /// Panics if called on an uninitialized shard.
    pub async fn fetch_current_state<T>(
        &self,
        shard_id: &ShardId,
        mut live_diffs: Vec<VersionedData>,
    ) -> UntypedState<T>
    where
        T: Timestamp + Lattice + Codec64,
    {
        let retry = self
            .metrics
            .retries
            .fetch_latest_state
            .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
        loop {
            // The latest diff denormalizes the key of the latest rollup; use
            // that rollup as the base state to roll forward from.
            let latest_diff = live_diffs
                .last()
                .expect("initialized shard should have at least one diff");
            let latest_diff = self
                .metrics
                .codecs
                .state_diff
                // Note: `latest_diff.data` is a `Bytes`, so cloning just increments a ref count
                .decode(|| {
                    StateDiff::<T>::decode(&self.cfg.build_version, latest_diff.data.clone())
                });
            let mut state = match self
                .fetch_rollup_at_key(shard_id, &latest_diff.latest_rollup_key)
                .await
            {
                Some(x) => x,
                None => {
                    // The rollup that this diff referenced is gone, so the diff
                    // must be out of date. Try again. Intentionally don't sleep on retry.
                    retry.retries.inc();
                    let earliest_before_refetch = live_diffs
                        .first()
                        .expect("initialized shard should have at least one diff")
                        .seqno;
                    live_diffs = self.fetch_recent_live_diffs::<T>(shard_id).await.0;

                    // We should only hit the race condition that leads to a
                    // refetch if the set of live diffs changed out from under
                    // us.
                    //
                    // TODO: Make this an assert once we're 100% sure the above
                    // is always true.
                    let earliest_after_refetch = live_diffs
                        .first()
                        .expect("initialized shard should have at least one diff")
                        .seqno;
                    if earliest_before_refetch >= earliest_after_refetch {
                        warn!(
                            concat!(
                                "fetch_current_state refetch expects earliest live diff to advance: {} vs {}. ",
                                "In dev and testing, this happens when persist's Blob (files in mzdata) ",
                                "is deleted out from under it or when two processes are talking to ",
                                "different Blobs (e.g. docker containers without it shared)."
                            ),
                            earliest_before_refetch, earliest_after_refetch
                        )
                    }
                    continue;
                }
            };

            // Roll the rollup forward by applying the live diffs (presumably
            // diffs at or before the rollup's seqno are no-ops — verify in
            // apply_encoded_diffs).
            state.apply_encoded_diffs(&self.cfg, &self.metrics, &live_diffs);
            return state;
        }
    }
470
    /// Returns an iterator over all live states for the requested shard.
    ///
    /// Returns None if called on an uninitialized shard.
    pub async fn fetch_all_live_states<T>(
        &self,
        shard_id: ShardId,
    ) -> Option<UntypedStateVersionsIter<T>>
    where
        T: Timestamp + Lattice + Codec64,
    {
        let retry = self
            .metrics
            .retries
            .fetch_live_states
            .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
        let mut all_live_diffs = self.fetch_all_live_diffs(&shard_id).await;
        loop {
            // An uninitialized shard has no live diffs at all.
            let earliest_live_diff = match all_live_diffs.first() {
                Some(x) => x,
                None => return None,
            };
            let state = match self
                .fetch_rollup_at_seqno(&shard_id, all_live_diffs.clone(), earliest_live_diff.seqno)
                .await
            {
                Some(x) => x,
                None => {
                    // We maintain an invariant that a rollup always exists for
                    // the earliest live diff. Since we didn't find out, that
                    // can only mean that the live_diffs we just fetched are
                    // obsolete (there's a race condition with gc). This should
                    // be rare in practice, so inc a counter and try again.
                    // Intentionally don't sleep on retry.
                    retry.retries.inc();
                    let earliest_before_refetch = earliest_live_diff.seqno;
                    all_live_diffs = self.fetch_all_live_diffs(&shard_id).await;

                    // We should only hit the race condition that leads to a
                    // refetch if the set of live diffs changed out from under
                    // us.
                    //
                    // TODO: Make this an assert once we're 100% sure the above
                    // is always true.
                    let earliest_after_refetch = all_live_diffs
                        .first()
                        .expect("initialized shard should have at least one diff")
                        .seqno;
                    if earliest_before_refetch >= earliest_after_refetch {
                        warn!(
                            concat!(
                                "fetch_all_live_states refetch expects earliest live diff to advance: {} vs {}. ",
                                "In dev and testing, this happens when persist's Blob (files in mzdata) ",
                                "is deleted out from under it or when two processes are talking to ",
                                "different Blobs (e.g. docker containers without it shared)."
                            ),
                            earliest_before_refetch, earliest_after_refetch
                        )
                    }
                    continue;
                }
            };
            // Per the invariant above, the rollup at the earliest live diff
            // must describe exactly that seqno.
            assert_eq!(earliest_live_diff.seqno, state.seqno());
            return Some(UntypedStateVersionsIter {
                shard_id,
                cfg: self.cfg.clone(),
                metrics: Arc::clone(&self.metrics),
                state,
                diffs: all_live_diffs,
            });
        }
    }
542
543    /// Fetches all live_diffs for a shard. Intended only for when a caller needs to reconstruct
544    /// _all_ states still referenced by Consensus. Prefer [Self::fetch_recent_live_diffs] when
545    /// the caller simply needs to fetch the latest state.
546    ///
547    /// Returns an empty Vec iff called on an uninitialized shard.
548    pub async fn fetch_all_live_diffs(&self, shard_id: &ShardId) -> Vec<VersionedData> {
549        let path = shard_id.to_string();
550        let diffs = retry_external(&self.metrics.retries.external.fetch_state_scan, || async {
551            self.consensus.scan(&path, SeqNo::minimum(), SCAN_ALL).await
552        })
553        .instrument(debug_span!("fetch_state::scan"))
554        .await;
555        diffs
556    }
557
558    /// Fetches live diffs for a shard. This is a thin wrapper around [Consensus::scan] with the
559    /// right retry policy and instrumentation.
560    async fn fetch_live_diffs(
561        &self,
562        shard_id: &ShardId,
563        from: SeqNo,
564        limit: usize,
565    ) -> Vec<VersionedData> {
566        let path = shard_id.to_string();
567        retry_external(&self.metrics.retries.external.fetch_state_scan, || async {
568            self.consensus.scan(&path, from, limit).await
569        })
570        .instrument(debug_span!("fetch_state::scan"))
571        .await
572    }
573
    /// Fetches all live_diffs for a shard up to and including a given threshold, allowing us to
    /// reconstruct states up to and including that version.
    pub async fn fetch_live_diffs_through(
        &self,
        shard_id: &ShardId,
        through: SeqNo,
    ) -> Vec<VersionedData> {
        // Get an initial set of versions from consensus.
        let scan_limit = STATE_VERSIONS_RECENT_LIVE_DIFFS_LIMIT.get(&self.cfg);
        let mut versions = self
            .fetch_live_diffs(shard_id, SeqNo::minimum(), scan_limit)
            .await;

        // A full first page means there may be more diffs beyond it.
        if versions.len() == scan_limit {
            // Loop until our version range either covers the full set, or we stop getting data.
            loop {
                let Some(last_seqno) = versions.last().map(|v| v.seqno) else {
                    break;
                };
                if through <= last_seqno {
                    break;
                }
                let from = last_seqno.next();
                // Bound each page at 10x the configured scan limit, but always
                // request at least one entry so every iteration can progress.
                let limit = usize::cast_from(through.0 - last_seqno.0).clamp(1, 10 * scan_limit);
                let more_versions = self.fetch_live_diffs(shard_id, from, limit).await;
                let more_versions_len = more_versions.len();
                if let Some(first) = more_versions.first() {
                    // Pages must be strictly increasing in seqno.
                    assert!(last_seqno < first.seqno);
                }
                versions.extend(more_versions);
                // A short page means we've reached the end of the live diffs.
                if more_versions_len < limit {
                    break;
                }
            }
        }
        // We may have fetched more versions than requested; find the index past the last
        // requested version and truncate there.
        let partition_index = versions.partition_point(|v| v.seqno <= through);
        versions.truncate(partition_index);
        versions
    }
615
    /// Fetches recent live_diffs for a shard. Intended for when a caller needs to fetch
    /// the latest state in Consensus.
    ///
    /// "Recent" is defined as either:
    /// * All of the diffs known in Consensus
    /// * All of the diffs in Consensus after the latest rollup
    ///
    /// `T` is only needed to decode the latest [StateDiff] on the slow path.
    pub async fn fetch_recent_live_diffs<T>(&self, shard_id: &ShardId) -> RecentLiveDiffs
    where
        T: Timestamp + Lattice + Codec64,
    {
        let path = shard_id.to_string();
        let scan_limit = STATE_VERSIONS_RECENT_LIVE_DIFFS_LIMIT.get(&self.cfg);
        let oldest_diffs =
            retry_external(&self.metrics.retries.external.fetch_state_scan, || async {
                self.consensus
                    .scan(&path, SeqNo::minimum(), scan_limit)
                    .await
            })
            .instrument(debug_span!("fetch_state::scan"))
            .await;

        // fast-path: we found all known diffs in a single page of our scan. we expect almost all
        // calls to go down this path, unless a reader has a very long seqno-hold on the shard.
        if oldest_diffs.len() < scan_limit {
            self.metrics.state.fetch_recent_live_diffs_fast_path.inc();
            return RecentLiveDiffs(oldest_diffs);
        }

        // slow-path: we could be arbitrarily far behind the head of Consensus (either intentionally
        // due to a long seqno-hold from a reader, or unintentionally from a bug that's preventing
        // a seqno-hold from advancing). rather than scanning a potentially unbounded number of old
        // states in Consensus, we jump to the latest state, determine the seqno of the most recent
        // rollup, and then fetch all the diffs from that point onward.
        //
        // this approach requires more network calls, but it should smooth out our access pattern
        // and use only bounded calls to Consensus. additionally, if `limit` is adequately tuned,
        // this path will only be invoked when there's an excess number of states in Consensus and
        // it might be slower to do a single long scan over unneeded rows.
        let head = retry_external(&self.metrics.retries.external.fetch_state_scan, || async {
            self.consensus.head(&path).await
        })
        .instrument(debug_span!("fetch_state::slow_path::head"))
        .await
        .expect("initialized shard should have at least 1 diff");

        // The latest diff denormalizes the key of the most recent rollup;
        // parse its seqno out of the blob key and scan from there.
        let latest_diff = self
            .metrics
            .codecs
            .state_diff
            .decode(|| StateDiff::<T>::decode(&self.cfg.build_version, head.data));

        match BlobKey::parse_ids(&latest_diff.latest_rollup_key.complete(shard_id)) {
            Ok((_shard_id, PartialBlobKey::Rollup(seqno, _rollup))) => {
                self.metrics.state.fetch_recent_live_diffs_slow_path.inc();
                let diffs =
                    retry_external(&self.metrics.retries.external.fetch_state_scan, || async {
                        // (pedantry) this call is technically unbounded, but something very strange
                        // would have had to happen to have accumulated so many states between our
                        // call to `head` and this invocation for it to become problematic
                        self.consensus.scan(&path, seqno, SCAN_ALL).await
                    })
                    .instrument(debug_span!("fetch_state::slow_path::scan"))
                    .await;
                RecentLiveDiffs(diffs)
            }
            Ok(_) => panic!(
                "invalid state diff rollup key: {}",
                latest_diff.latest_rollup_key
            ),
            Err(err) => panic!("unparseable state diff rollup key: {}", err),
        }
    }
688
689    /// Fetches all live diffs greater than the given SeqNo.
690    ///
691    /// TODO: Apply a limit to this scan. This could additionally be used as an internal
692    /// call within `fetch_recent_live_diffs`.
693    pub async fn fetch_all_live_diffs_gt_seqno<K, V, T, D>(
694        &self,
695        shard_id: &ShardId,
696        seqno: SeqNo,
697    ) -> Vec<VersionedData> {
698        let path = shard_id.to_string();
699        retry_external(&self.metrics.retries.external.fetch_state_scan, || async {
700            self.consensus.scan(&path, seqno.next(), SCAN_ALL).await
701        })
702        .instrument(debug_span!("fetch_state::scan"))
703        .await
704    }
705
706    /// Truncates any diffs in consensus less than the given seqno.
707    pub async fn truncate_diffs(&self, shard_id: &ShardId, seqno: SeqNo) {
708        let path = shard_id.to_string();
709        let _deleted_count = retry_external(&self.metrics.retries.external.gc_truncate, || async {
710            self.consensus.truncate(&path, seqno).await
711        })
712        .instrument(debug_span!("gc::truncate"))
713        .await;
714    }
715
716    // Writes a self-referential rollup to blob storage and returns the diff
717    // that should be compare_and_set into consensus to finish initializing the
718    // shard.
719    async fn write_initial_rollup<K, V, T, D>(
720        &self,
721        shard_metrics: &ShardMetrics,
722    ) -> (TypedState<K, V, T, D>, StateDiff<T>)
723    where
724        K: Debug + Codec,
725        V: Debug + Codec,
726        T: Timestamp + Lattice + Codec64,
727        D: Monoid + Codec64,
728    {
729        let empty_state = TypedState::new(
730            self.cfg.build_version.clone(),
731            shard_metrics.shard_id,
732            self.cfg.hostname.clone(),
733            (self.cfg.now)(),
734        );
735        let mut initial_state = empty_state.clone_for_rollup();
736        let rollup_seqno = initial_state.seqno();
737        let rollup = HollowRollup {
738            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
739            // Chicken-and-egg problem here. We don't know the size of the
740            // rollup until we encode it, but it includes a reference back to
741            // itself.
742            encoded_size_bytes: None,
743        };
744        let applied = match initial_state
745            .collections
746            .add_rollup((rollup_seqno, &rollup))
747        {
748            Continue(x) => x,
749            Break(NoOpStateTransition(_)) => {
750                panic!("initial state transition should not be a no-op")
751            }
752        };
753        assert!(
754            applied,
755            "add_and_remove_rollups should apply to the empty state"
756        );
757
758        let rollup = self.encode_rollup_blob(
759            shard_metrics,
760            initial_state.clone_for_rollup(),
761            vec![],
762            rollup.key,
763        );
764        let () = self.write_rollup_blob(&rollup).await;
765        assert_eq!(initial_state.seqno, rollup.seqno);
766
767        let diff = StateDiff::from_diff(&empty_state.state, &initial_state.state);
768        (initial_state, diff)
769    }
770
771    pub async fn write_rollup_for_state<K, V, T, D>(
772        &self,
773        shard_metrics: &ShardMetrics,
774        state: TypedState<K, V, T, D>,
775        rollup_id: &RollupId,
776    ) -> Option<EncodedRollup>
777    where
778        K: Debug + Codec,
779        V: Debug + Codec,
780        T: Timestamp + Lattice + Codec64,
781        D: Monoid + Codec64,
782    {
783        let (latest_rollup_seqno, _rollup) = state.latest_rollup();
784        let seqno = state.seqno();
785
786        // TODO: maintain the diffs since the latest rollup in-memory rather than
787        // needing an additional API call here. This would reduce Consensus load
788        // / avoid races with Consensus truncation, but is trickier to write.
789        let diffs: Vec<_> = self
790            .fetch_all_live_diffs_gt_seqno::<K, V, T, D>(&state.shard_id, *latest_rollup_seqno)
791            .await;
792
793        match diffs.first() {
794            None => {
795                // early-out because these are no diffs past our latest rollup.
796                //
797                // this should only occur in the initial state, but we can write a more
798                // general assertion: if no live diffs exist past this state's latest
799                // known rollup, then that rollup must be for the latest known state.
800                self.metrics.state.rollup_write_noop_latest.inc();
801                assert_eq!(seqno, *latest_rollup_seqno);
802                return None;
803            }
804            Some(first) => {
805                // early-out if it is no longer possible to inline all the diffs from
806                // the last known rollup to the current state. some or all of the diffs
807                // have already been truncated by another process.
808                //
809                // this can happen if one process gets told to write a rollup, the
810                // maintenance task falls arbitrarily behind, and another process writes
811                // a new rollup / GCs and truncates past the first process's rollup.
812                self.metrics.state.rollup_write_noop_truncated.inc();
813                if first.seqno != latest_rollup_seqno.next() {
814                    assert!(
815                        first.seqno > latest_rollup_seqno.next(),
816                        "diff: {}, rollup: {}",
817                        first.seqno,
818                        latest_rollup_seqno,
819                    );
820                    return None;
821                }
822            }
823        }
824
825        // we may have fetched more diffs than we need: trim anything beyond the state's seqno
826        let diffs: Vec<_> = diffs.into_iter().filter(|x| x.seqno <= seqno).collect();
827
828        // verify that we've done all the filtering correctly and that our
829        // diffs have seqnos bounded by (last_rollup, current_state]
830        assert_eq!(
831            diffs.first().map(|x| x.seqno),
832            Some(latest_rollup_seqno.next())
833        );
834        assert_eq!(diffs.last().map(|x| x.seqno), Some(state.seqno));
835
836        let key = PartialRollupKey::new(state.seqno, rollup_id);
837        let rollup = self.encode_rollup_blob(shard_metrics, state, diffs, key);
838        let () = self.write_rollup_blob(&rollup).await;
839
840        self.metrics.state.rollup_write_success.inc();
841
842        Some(rollup)
843    }
844
845    /// Encodes the given state and diffs as a rollup to be written to the specified key.
846    ///
847    /// The diffs must span the seqno range `(state.last_rollup().seqno, state.seqno]`.
848    pub fn encode_rollup_blob<K, V, T, D>(
849        &self,
850        shard_metrics: &ShardMetrics,
851        state: TypedState<K, V, T, D>,
852        diffs: Vec<VersionedData>,
853        key: PartialRollupKey,
854    ) -> EncodedRollup
855    where
856        K: Debug + Codec,
857        V: Debug + Codec,
858        T: Timestamp + Lattice + Codec64,
859        D: Monoid + Codec64,
860    {
861        let shard_id = state.shard_id;
862        let rollup_seqno = state.seqno;
863
864        let rollup = Rollup::from(state.into(), diffs);
865        let desc = rollup.diffs.as_ref().expect("inlined diffs").description();
866
867        let buf = self.metrics.codecs.state.encode(|| {
868            let mut buf = Vec::new();
869            rollup
870                .into_proto()
871                .encode(&mut buf)
872                .expect("no required fields means no initialization errors");
873            Bytes::from(buf)
874        });
875        shard_metrics
876            .latest_rollup_size
877            .set(u64::cast_from(buf.len()));
878        EncodedRollup {
879            shard_id,
880            seqno: rollup_seqno,
881            key,
882            buf,
883            _desc: desc,
884        }
885    }
886
887    /// Writes the given state rollup out to blob.
888    pub async fn write_rollup_blob(&self, rollup: &EncodedRollup) {
889        let payload_len = rollup.buf.len();
890        retry_external(&self.metrics.retries.external.rollup_set, || async {
891            self.blob
892                .set(
893                    &rollup.key.complete(&rollup.shard_id),
894                    Bytes::clone(&rollup.buf),
895                )
896                .await
897        })
898        .instrument(debug_span!("rollup::set", payload_len))
899        .await;
900    }
901
    /// Fetches a rollup for the given SeqNo, if it exists.
    ///
    /// Uses the provided hint, which is a possibly outdated copy of all
    /// or recent live diffs, to avoid fetches where possible.
    ///
    /// Panics if called on an uninitialized shard.
    async fn fetch_rollup_at_seqno<T>(
        &self,
        shard_id: &ShardId,
        live_diffs: Vec<VersionedData>,
        seqno: SeqNo,
    ) -> Option<UntypedState<T>>
    where
        T: Timestamp + Lattice + Codec64,
    {
        // Computed up front because `fetch_current_state` below consumes
        // `live_diffs` by value. Only used by the migration fallback; the
        // common path returns out of the `if let` that follows.
        let rollup_key_for_migration = live_diffs.iter().find_map(|x| {
            let diff = self
                .metrics
                .codecs
                .state_diff
                // Note: `x.data` is a `Bytes`, so cloning just increments a ref count
                .decode(|| StateDiff::<T>::decode(&self.cfg.build_version, x.data.clone()));
            // Any diff kind (insert/update/delete) that mentions the requested
            // seqno carries a usable copy of the rollup reference.
            diff.rollups
                .iter()
                .find(|x| x.key == seqno)
                .map(|x| match &x.val {
                    StateFieldValDiff::Insert(x) => x.clone(),
                    StateFieldValDiff::Update(_, x) => x.clone(),
                    StateFieldValDiff::Delete(x) => x.clone(),
                })
        });

        // Common path: the current state knows about a rollup at this seqno.
        let state = self.fetch_current_state::<T>(shard_id, live_diffs).await;
        if let Some(rollup) = state.rollups().get(&seqno) {
            return self.fetch_rollup_at_key(shard_id, &rollup.key).await;
        }

        // MIGRATION: We maintain an invariant that the _current state_ contains
        // a rollup for the _earliest live diff_ in consensus (and that the
        // referenced rollup exists). At one point, we fixed a bug that could
        // lead to that invariant being violated.
        //
        // If the earliest live diff is X and we receive a gc req for X+Y to
        // X+Y+Z (this can happen e.g. if some cmd ignores an earlier req for X
        // to X+Y, or if they're processing concurrently and the X to X+Y req
        // loses the race), then the buggy version of gc would delete any
        // rollups strictly less than old_seqno_since (X+Y in this example). But
        // our invariant is that the rollup exists for the earliest live diff,
        // in this case X. So if the first call to gc was interrupted after this
        // but before truncate (when all the blob deletes happen), later calls
        // to gc would attempt to call `fetch_live_states` and end up infinitely
        // in its loop.
        //
        // The fix was to base which rollups are deleteable on the earliest live
        // diff, not old_seqno_since.
        //
        // Sadly, some envs in prod now violate this invariant. So, even with
        // the fix, existing shards will never successfully run gc. We add a
        // temporary migration to fix them in `fetch_rollup_at_seqno`. This
        // method normally looks in the latest version of state for the
        // specifically requested seqno. In the invariant violation case, some
        // version of state in the range `[earliest, current]` has a rollup for
        // earliest, but current doesn't. So, for the migration, if
        // fetch_rollup_at_seqno doesn't find a rollup in current, then we fall
        // back to sniffing one out of raw diffs. If this succeeds, we increment
        // a counter and log, so we can track how often this migration is
        // bailing us out. After the next deploy, this should initially start at
        // > 0 and then settle down to 0. After the next prod envs wipe, we can
        // remove the migration.
        let rollup = rollup_key_for_migration.expect("someone should have a key for this rollup");
        tracing::info!("only found rollup for {} {} via migration", shard_id, seqno);
        self.metrics.state.rollup_at_seqno_migration.inc();
        self.fetch_rollup_at_key(shard_id, &rollup.key).await
    }
976
977    /// Fetches the rollup at the given key, if it exists.
978    pub async fn fetch_rollup_at_key<T>(
979        &self,
980        shard_id: &ShardId,
981        rollup_key: &PartialRollupKey,
982    ) -> Option<UntypedState<T>>
983    where
984        T: Timestamp + Lattice + Codec64,
985    {
986        retry_external(&self.metrics.retries.external.rollup_get, || async {
987            self.blob.get(&rollup_key.complete(shard_id)).await
988        })
989        .instrument(debug_span!("rollup::get"))
990        .await
991        .map(|buf| {
992            self.metrics
993                .codecs
994                .state
995                .decode(|| UntypedState::decode(&self.cfg.build_version, buf))
996        })
997    }
998
999    /// Deletes the rollup at the given key, if it exists.
1000    pub async fn delete_rollup(&self, shard_id: &ShardId, key: &PartialRollupKey) {
1001        let _ = retry_external(&self.metrics.retries.external.rollup_delete, || async {
1002            self.blob.delete(&key.complete(shard_id)).await
1003        })
1004        .await
1005        .instrument(debug_span!("rollup::delete"));
1006    }
1007}
1008
/// A [StateVersionsIter] whose timestamp codec has not yet been checked
/// against the shard; see [UntypedStateVersionsIter::check_ts_codec].
pub struct UntypedStateVersionsIter<T> {
    shard_id: ShardId,
    cfg: PersistConfig,
    metrics: Arc<Metrics>,
    // State as decoded from durable storage, codecs not yet validated.
    state: UntypedState<T>,
    // Live diffs to iterate through, in seqno order as fetched.
    diffs: Vec<VersionedData>,
}
1016
1017impl<T: Timestamp + Lattice + Codec64> UntypedStateVersionsIter<T> {
1018    pub(crate) fn new(
1019        shard_id: ShardId,
1020        cfg: PersistConfig,
1021        metrics: Arc<Metrics>,
1022        state: UntypedState<T>,
1023        diffs: Vec<VersionedData>,
1024    ) -> Self {
1025        Self {
1026            shard_id,
1027            cfg,
1028            metrics,
1029            state,
1030            diffs,
1031        }
1032    }
1033
1034    pub(crate) fn check_ts_codec(self) -> Result<StateVersionsIter<T>, CodecMismatchT> {
1035        let key_codec = self.state.key_codec.clone();
1036        let val_codec = self.state.val_codec.clone();
1037        let diff_codec = self.state.diff_codec.clone();
1038        let state = self.state.check_ts_codec(&self.shard_id)?;
1039        Ok(StateVersionsIter::new(
1040            self.cfg,
1041            self.metrics,
1042            state,
1043            self.diffs,
1044            key_codec,
1045            val_codec,
1046            diff_codec,
1047        ))
1048    }
1049}
1050
/// An iterator over consecutive versions of [State].
pub struct StateVersionsIter<T> {
    cfg: PersistConfig,
    metrics: Arc<Metrics>,
    // The state as of the most recently applied diff.
    state: State<T>,
    // Remaining diffs. Stored in reverse seqno order (see `new`) so `next`
    // can cheaply pop the next one off the back.
    diffs: Vec<VersionedData>,
    // Codec names carried along for rendering rollup protos
    // (see `into_rollup_proto_without_diffs`).
    key_codec: String,
    val_codec: String,
    diff_codec: String,
    // Debug-only cross-check that incrementally reported blobs match the
    // blobs visible in each full state.
    #[cfg(debug_assertions)]
    validator: ReferencedBlobValidator<T>,
}
1063
impl<T: Timestamp + Lattice + Codec64> StateVersionsIter<T> {
    fn new(
        cfg: PersistConfig,
        metrics: Arc<Metrics>,
        state: State<T>,
        // diffs is stored reversed so we can efficiently pop off the Vec.
        mut diffs: Vec<VersionedData>,
        key_codec: String,
        val_codec: String,
        diff_codec: String,
    ) -> Self {
        // The first diff (when present) must line up with the starting
        // state's seqno; `next` relies on this to detect the initial no-op.
        assert!(diffs.first().map_or(true, |x| x.seqno == state.seqno));
        diffs.reverse();
        StateVersionsIter {
            cfg,
            metrics,
            state,
            diffs,
            key_codec,
            val_codec,
            diff_codec,
            #[cfg(debug_assertions)]
            validator: ReferencedBlobValidator::default(),
        }
    }

    /// Returns the number of diffs not yet applied by [Self::next].
    pub fn len(&self) -> usize {
        self.diffs.len()
    }

    /// Advances first to some starting state (in practice, usually the first
    /// live state), and then through each successive state, for as many diffs
    /// as this iterator was initialized with.
    ///
    /// The `inspect_diff_fn` callback can be used to inspect diffs directly as
    /// they are applied. The first call to `next` returns a
    /// [InspectDiff::FromInitial] representing a diff from the initial state.
    pub fn next<F: for<'a> FnMut(InspectDiff<'a, T>)>(
        &mut self,
        mut inspect_diff_fn: F,
    ) -> Option<&State<T>> {
        let diff = match self.diffs.pop() {
            Some(x) => x,
            None => return None,
        };
        // Keep the encoded bytes: `apply_diffs` below takes both the decoded
        // diff and its raw form. (`Bytes` clone is just a refcount bump.)
        let data = diff.data.clone();
        let diff = self
            .metrics
            .codecs
            .state_diff
            .decode(|| StateDiff::decode(&self.cfg.build_version, diff.data));

        // A bit hacky, but the first diff in StateVersionsIter is always a
        // no-op.
        if diff.seqno_to == self.state.seqno {
            let inspect = InspectDiff::FromInitial(&self.state);
            #[cfg(debug_assertions)]
            {
                // Feed the initial state's blobs into the incremental tracker.
                inspect
                    .referenced_blobs()
                    .for_each(|x| self.validator.add_inc_blob(x));
            }
            inspect_diff_fn(inspect);
        } else {
            let inspect = InspectDiff::Diff(&diff);
            #[cfg(debug_assertions)]
            {
                // Feed this diff's blob inserts into the incremental tracker.
                inspect
                    .referenced_blobs()
                    .for_each(|x| self.validator.add_inc_blob(x));
            }
            inspect_diff_fn(inspect);
        }

        let diff_seqno_to = diff.seqno_to;
        self.state
            .apply_diffs(&self.metrics, std::iter::once((diff, data)));
        // Applying the diff must land us exactly at its target seqno.
        assert_eq!(self.state.seqno, diff_seqno_to);
        #[cfg(debug_assertions)]
        {
            // Cross-check the incremental blob set against a full scan of the
            // state at this seqno.
            self.validator.validate_against_state(&self.state);
        }
        Some(&self.state)
    }

    /// Returns the state at the iterator's current position.
    pub fn state(&self) -> &State<T> {
        &self.state
    }

    /// Renders the current state as a serializable rollup proto with no
    /// inlined diffs.
    pub fn into_rollup_proto_without_diffs(&self) -> impl serde::Serialize + use<T> {
        Rollup::from_state_without_diffs(
            State {
                shard_id: self.state.shard_id.clone(),
                seqno: self.state.seqno.clone(),
                walltime_ms: self.state.walltime_ms.clone(),
                hostname: self.state.hostname.clone(),
                collections: self.state.collections.clone(),
            },
            self.key_codec.clone(),
            self.val_codec.clone(),
            T::codec_name(),
            self.diff_codec.clone(),
        )
        .into_proto()
    }
}
1170
/// This represents a diff, either directly or, in the case of the FromInitial
/// variant, a diff from the initial state. (We could instead compute the diff
/// from the initial state and replace this with only a `StateDiff<T>`, but don't
/// for efficiency.)
#[derive(Debug)]
pub enum InspectDiff<'a, T> {
    /// The initial state, standing in for a diff from nothing to that state.
    FromInitial(&'a State<T>),
    /// An ordinary diff between two consecutive states.
    Diff(&'a StateDiff<T>),
}
1180
1181impl<T: Timestamp + Lattice + Codec64> InspectDiff<'_, T> {
1182    /// A callback invoked for each blob added this state transition.
1183    ///
1184    /// Blob removals, along with all other diffs, are ignored.
1185    pub fn referenced_blobs(&self) -> impl Iterator<Item = HollowBlobRef<'_, T>> {
1186        let (state, diff) = match self {
1187            InspectDiff::FromInitial(x) => (Some(x), None),
1188            InspectDiff::Diff(x) => (None, Some(x)),
1189        };
1190        let state_blobs = state.into_iter().flat_map(|s| s.blobs());
1191        let diff_blobs = diff.into_iter().flat_map(|d| d.blob_inserts());
1192        state_blobs.chain(diff_blobs)
1193    }
1194}
1195
// Debug-only helper that tracks blob references computed two different ways
// so `validate_against_state` can assert they agree.
#[cfg(debug_assertions)]
struct ReferencedBlobValidator<T> {
    // A copy of every batch and rollup referenced by some state iterator,
    // computed by scanning the full copy of state at each seqno.
    full_batches: BTreeSet<HollowBatch<T>>,
    full_rollups: BTreeSet<HollowRollup>,
    // A copy of every batch and rollup referenced by some state iterator,
    // computed incrementally.
    inc_batches: BTreeSet<HollowBatch<T>>,
    inc_rollups: BTreeSet<HollowRollup>,
}
1207
1208#[cfg(debug_assertions)]
1209impl<T> Default for ReferencedBlobValidator<T> {
1210    fn default() -> Self {
1211        Self {
1212            full_batches: Default::default(),
1213            full_rollups: Default::default(),
1214            inc_batches: Default::default(),
1215            inc_rollups: Default::default(),
1216        }
1217    }
1218}
1219
#[cfg(debug_assertions)]
impl<T: Timestamp + Lattice + Codec64> ReferencedBlobValidator<T> {
    /// Records a blob reported incrementally by a state transition.
    ///
    /// Panics if a non-empty batch or any rollup is reported twice.
    fn add_inc_blob(&mut self, x: HollowBlobRef<'_, T>) {
        match x {
            HollowBlobRef::Batch(x) => assert!(
                // Empty batches (lower == upper) are allowed to repeat.
                self.inc_batches.insert(x.clone()) || x.desc.lower() == x.desc.upper(),
                "non-empty batches should only be appended once; duplicate: {x:?}"
            ),
            HollowBlobRef::Rollup(x) => assert!(self.inc_rollups.insert(x.clone())),
        }
    }
    /// Asserts that the incrementally-collected blob references agree with a
    /// full scan of the given state's blobs.
    fn validate_against_state(&mut self, x: &State<T>) {
        use std::hash::{DefaultHasher, Hash, Hasher};

        use mz_ore::collections::HashSet;
        use timely::progress::Antichain;

        use crate::internal::state::BatchPart;

        // Fold this state's full blob set into the accumulated "full" view.
        x.blobs().for_each(|x| match x {
            HollowBlobRef::Batch(x) => {
                self.full_batches.insert(x.clone());
            }
            HollowBlobRef::Rollup(x) => {
                self.full_rollups.insert(x.clone());
            }
        });

        // Check that the sets of batches overall cover the same pTVC.
        // Partial ordering means we can't just take the first and last batches; instead compute
        // bounds using the lattice operations.
        fn overall_desc<'a, T: Timestamp + Lattice>(
            iter: impl Iterator<Item = &'a Description<T>>,
        ) -> (Antichain<T>, Antichain<T>) {
            let mut lower = Antichain::new();
            let mut upper = Antichain::from_elem(T::minimum());
            for desc in iter {
                lower.meet_assign(desc.lower());
                upper.join_assign(desc.upper());
            }
            (lower, upper)
        }
        let (inc_lower, inc_upper) = overall_desc(self.inc_batches.iter().map(|a| &a.desc));
        let (full_lower, full_upper) = overall_desc(self.full_batches.iter().map(|a| &a.desc));
        assert_eq!(inc_lower, full_lower);
        assert_eq!(inc_upper, full_upper);

        // Derives a stable identifier for a part: inline parts are identified
        // by hashing their updates (and any ts rewrite); all other parts by
        // their printable name.
        fn part_unique<T: Codec64>(x: &RunPart<T>) -> String {
            match x {
                RunPart::Single(BatchPart::Inline {
                    updates,
                    ts_rewrite,
                    ..
                }) => {
                    let mut h = DefaultHasher::new();
                    updates.hash(&mut h);
                    if let Some(frontier) = &ts_rewrite {
                        h.write_usize(frontier.len());
                        frontier.iter().for_each(|t| t.encode().hash(&mut h));
                    }
                    h.finish().to_string()
                }
                other => other.printable_name().to_string(),
            }
        }

        // Check that the overall set of parts contained in both representations is the same.
        let inc_parts: HashSet<_> = self
            .inc_batches
            .iter()
            .flat_map(|x| x.parts.iter())
            .map(part_unique)
            .collect();
        let full_parts = self
            .full_batches
            .iter()
            .flat_map(|x| x.parts.iter())
            .map(part_unique)
            .collect();
        assert_eq!(inc_parts, full_parts);

        // Check that both representations have the same rollups.
        assert_eq!(self.inc_rollups, self.full_rollups);
    }
}
1305
#[cfg(test)]
mod tests {
    use mz_dyncfg::ConfigUpdates;

    use crate::tests::new_test_client;

    use super::*;

    /// Regression test for (part of) database-issues#5170, where an interrupted
    /// `bin/environmentd --reset` resulted in panic in persist usage code.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn fetch_all_live_states_regression_uninitialized(dyncfgs: ConfigUpdates) {
        let client = new_test_client(&dyncfgs).await;
        let state_versions = StateVersions::new(
            client.cfg.clone(),
            Arc::clone(&client.consensus),
            Arc::clone(&client.blob),
            Arc::clone(&client.metrics),
        );
        // A freshly-generated ShardId was never initialized, so there are no
        // live states for it; this must return None rather than panic.
        assert!(
            state_versions
                .fetch_all_live_states::<u64>(ShardId::new())
                .await
                .is_none()
        );
    }
}