mz_persist_client/internal/state_versions.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A durable, truncatable log of versions of [State].

#[cfg(debug_assertions)]
use std::collections::BTreeSet;
use std::fmt::Debug;
use std::ops::ControlFlow::{Break, Continue};
use std::sync::Arc;
use std::time::SystemTime;

use bytes::Bytes;
use differential_dataflow::difference::Monoid;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::Description;
use mz_ore::cast::CastFrom;
use mz_persist::location::{
    Blob, CaSResult, Consensus, Indeterminate, SCAN_ALL, SeqNo, VersionedData,
};
use mz_persist::retry::Retry;
use mz_persist_types::{Codec, Codec64};
use mz_proto::RustType;
use prost::Message;
use timely::progress::Timestamp;
use tracing::{Instrument, debug, debug_span, trace, warn};

use crate::cfg::STATE_VERSIONS_RECENT_LIVE_DIFFS_LIMIT;
use crate::error::{CodecMismatch, CodecMismatchT};
use crate::internal::encoding::{Rollup, UntypedState};
use crate::internal::machine::{retry_determinate, retry_external};
use crate::internal::metrics::ShardMetrics;
use crate::internal::paths::{BlobKey, PartialBlobKey, PartialRollupKey, RollupId};
#[cfg(debug_assertions)]
use crate::internal::state::HollowBatch;
use crate::internal::state::{
    BatchPart, HollowBlobRef, HollowRollup, NoOpStateTransition, RunPart, State, TypedState,
};
use crate::internal::state_diff::{StateDiff, StateFieldValDiff};
use crate::{Metrics, PersistConfig, ShardId};

/// A durable, truncatable log of versions of [State].
///
/// As persist metadata changes over time, we make its versions (each identified
/// by a [SeqNo]) durable in two ways:
/// - `rollups`: Periodic copies of the entirety of [State], written to [Blob].
/// - `diffs`: Incremental [StateDiff]s, written to [Consensus].
///
/// The following invariants are maintained at all times:
/// - A shard is initialized iff there is at least one version of it in
///   Consensus.
/// - The first version of state is written to `SeqNo(1)`. Each successive state
///   version is assigned its predecessor's SeqNo +1.
/// - `current`: The latest version of state. By definition, the largest SeqNo
///   present in Consensus.
/// - As state changes over time, we keep a range of consecutive versions
///   available. These are periodically `truncated` to prune old versions that
///   are no longer necessary.
/// - `earliest`: The first version of state that it is possible to reconstruct.
///   - Invariant: `earliest <= current.seqno_since()` (we don't garbage collect
///     versions still being used by some reader).
///   - Invariant: `earliest` is always the smallest Seqno present in Consensus.
///     - This doesn't have to be true, but we choose to enforce it.
///     - Because the data stored at that smallest Seqno is an incremental diff,
///       to make this invariant work, there needs to be a rollup at either
///       `earliest-1` or `earliest`. We choose `earliest` because it seems to
///       make the code easier to reason about in practice.
///     - A consequence of the above is that when we garbage collect old
///       versions of state, we're only free to truncate ones that are `<` the
///       latest rollup that is `<= current.seqno_since`.
/// - `live diffs`: The set of SeqNos present in Consensus at any given time.
/// - `live states`: The range of state versions that it is possible to
///   reconstruct: `[earliest,current]`.
///   - Because of the earliest and current invariants above, the ranges of
///     `live diffs` and `live states` are the same.
/// - The set of known rollups is tracked in the shard state itself.
///   - For efficiency of common operations, the most recent rollup's Blob key
///     is always denormalized in each StateDiff written to Consensus. (As
///     described above, there is always a rollup at earliest, so we're
///     guaranteed that there is always at least one live rollup.)
///   - Invariant: The rollups in `current` exist in Blob.
///     - A consequence is that, if a rollup in a state you believe is `current`
///       doesn't exist, it's a guarantee that `current` has changed (or it's a
///       bug).
///   - Any rollup at a version `< earliest-1` is useless (we've lost the
///     incremental diffs between it and the live states). GC is tasked with
///     deleting these rollups from Blob before truncating diffs from Consensus.
///     Thus, any rollup at a seqno < earliest can be considered "leaked" and
///     deleted by the leaked blob detector.
///   - Note that this means, while `current`'s rollups exist, it will be common
///     for other live states to reference rollups that no longer exist.
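///
/// # Illustrative usage (sketch)
///
/// A minimal, hedged sketch of the read path using only methods defined below,
/// assuming `cfg`, `consensus`, `blob`, `metrics`, and `shard_id` (hypothetical
/// bindings) were constructed by the caller:
///
/// ```ignore
/// let versions = StateVersions::new(cfg, consensus, blob, metrics);
/// // Grab a recent window of live diffs, then reconstruct the current state
/// // from the rollup they reference plus the diffs themselves.
/// let recent = versions.fetch_recent_live_diffs::<u64>(&shard_id).await;
/// let state = versions.fetch_current_state::<u64>(&shard_id, recent.0).await;
/// ```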
#[derive(Debug)]
pub struct StateVersions {
    pub(crate) cfg: PersistConfig,
    pub(crate) consensus: Arc<dyn Consensus>,
    pub(crate) blob: Arc<dyn Blob>,
    pub(crate) metrics: Arc<Metrics>,
}

#[derive(Debug, Clone)]
pub struct RecentLiveDiffs(pub Vec<VersionedData>);

#[derive(Debug, Clone)]
pub struct EncodedRollup {
    pub(crate) shard_id: ShardId,
    pub(crate) seqno: SeqNo,
    pub(crate) key: PartialRollupKey,
    pub(crate) _desc: Description<SeqNo>,
    buf: Bytes,
}

impl EncodedRollup {
    pub fn to_hollow(&self) -> HollowRollup {
        HollowRollup {
            key: self.key.clone(),
            encoded_size_bytes: Some(self.buf.len()),
        }
    }
}

impl StateVersions {
    pub fn new(
        cfg: PersistConfig,
        consensus: Arc<dyn Consensus>,
        blob: Arc<dyn Blob>,
        metrics: Arc<Metrics>,
    ) -> Self {
        StateVersions {
            cfg,
            consensus,
            blob,
            metrics,
        }
    }

    /// Fetches the `current` state of the requested shard, or creates it if
    /// uninitialized.
    pub async fn maybe_init_shard<K, V, T, D>(
        &self,
        shard_metrics: &ShardMetrics,
    ) -> Result<TypedState<K, V, T, D>, Box<CodecMismatch>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64,
        D: Monoid + Codec64,
    {
        let shard_id = shard_metrics.shard_id;

        // The common case is that the shard is initialized, so try that first.
        let recent_live_diffs = self.fetch_recent_live_diffs::<T>(&shard_id).await;
        if !recent_live_diffs.0.is_empty() {
            return self
                .fetch_current_state(&shard_id, recent_live_diffs.0)
                .await
                .check_codecs(&shard_id);
        }

        // Shard is not initialized, try initializing it.
        let (initial_state, initial_diff) = self.write_initial_rollup(shard_metrics).await;
        let (cas_res, _diff) =
            retry_external(&self.metrics.retries.external.maybe_init_cas, || async {
                self.try_compare_and_set_current(
                    "maybe_init_shard",
                    shard_metrics,
                    None,
                    &initial_state,
                    &initial_diff,
                )
                .await
                .map_err(|err| err.into())
            })
            .await;
        match cas_res {
            CaSResult::Committed => Ok(initial_state),
            CaSResult::ExpectationMismatch => {
                let recent_live_diffs = self.fetch_recent_live_diffs::<T>(&shard_id).await;
                let state = self
                    .fetch_current_state(&shard_id, recent_live_diffs.0)
                    .await
                    .check_codecs(&shard_id);

                // Clean up the rollup blob that we were trying to reference.
                //
                // SUBTLE: If we got an Indeterminate error in the CaS above,
                // but it actually went through, then we'll "contend" with
                // ourselves and get an expectation mismatch. Use the actual
                // fetched state to determine if our rollup actually made it in
                // and decide whether to delete based on that.
                let (_, rollup) = initial_state.latest_rollup();
                let should_delete_rollup = match state.as_ref() {
                    Ok(state) => !state
                        .collections
                        .rollups
                        .values()
                        .any(|x| &x.key == &rollup.key),
                    // If the codecs don't match, then we definitely didn't
                    // write the state.
                    Err(_codec_mismatch) => true,
                };
                if should_delete_rollup {
                    self.delete_rollup(&shard_id, &rollup.key).await;
                }

                state
            }
        }
    }

    /// Updates the state of a shard to a new `current` iff `expected` matches
    /// `current`.
    ///
    /// May be called on uninitialized shards.
    pub async fn try_compare_and_set_current<K, V, T, D>(
        &self,
        cmd_name: &str,
        shard_metrics: &ShardMetrics,
        expected: Option<SeqNo>,
        new_state: &TypedState<K, V, T, D>,
        diff: &StateDiff<T>,
    ) -> Result<(CaSResult, VersionedData), Indeterminate>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64,
        D: Monoid + Codec64,
    {
        assert_eq!(shard_metrics.shard_id, new_state.shard_id);
        let path = new_state.shard_id.to_string();

        trace!(
            "apply_unbatched_cmd {} attempting {}\n  new_state={:?}",
            cmd_name,
            new_state.seqno(),
            new_state
        );
        let new = self.metrics.codecs.state_diff.encode(|| {
            let mut buf = Vec::new();
            diff.encode(&mut buf);
            VersionedData {
                seqno: new_state.seqno(),
                data: Bytes::from(buf),
            }
        });
        assert_eq!(new.seqno, diff.seqno_to);

        let payload_len = new.data.len();
        let cas_res = retry_determinate(
            &self.metrics.retries.determinate.apply_unbatched_cmd_cas,
            || async {
                self.consensus
                    .compare_and_set(&path, expected, new.clone())
                    .await
            },
        )
        .instrument(debug_span!("apply_unbatched_cmd::cas", payload_len))
        .await
        .map_err(|err| {
            debug!("apply_unbatched_cmd {} errored: {}", cmd_name, err);
            err
        })?;

        match cas_res {
            CaSResult::Committed => {
                trace!(
                    "apply_unbatched_cmd {} succeeded {}\n  new_state={:?}",
                    cmd_name,
                    new_state.seqno(),
                    new_state
                );

                shard_metrics.set_since(new_state.since());
                shard_metrics.set_upper(new_state.upper());
                shard_metrics.seqnos_since_last_rollup.set(
                    new_state
                        .seqno
                        .0
                        .saturating_sub(new_state.latest_rollup().0.0),
                );
                shard_metrics
                    .spine_batch_count
                    .set(u64::cast_from(new_state.spine_batch_count()));
                let size_metrics = new_state.size_metrics();
                shard_metrics
                    .schema_registry_version_count
                    .set(u64::cast_from(new_state.collections.schemas.len()));
                shard_metrics
                    .hollow_batch_count
                    .set(u64::cast_from(size_metrics.hollow_batch_count));
                shard_metrics
                    .batch_part_count
                    .set(u64::cast_from(size_metrics.batch_part_count));
                shard_metrics
                    .rewrite_part_count
                    .set(u64::cast_from(size_metrics.rewrite_part_count));
                shard_metrics
                    .update_count
                    .set(u64::cast_from(size_metrics.num_updates));
                shard_metrics
                    .rollup_count
                    .set(u64::cast_from(size_metrics.state_rollup_count));
                shard_metrics
                    .largest_batch_size
                    .set(u64::cast_from(size_metrics.largest_batch_bytes));
                shard_metrics
                    .usage_current_state_batches_bytes
                    .set(u64::cast_from(size_metrics.state_batches_bytes));
                shard_metrics
                    .usage_current_state_rollups_bytes
                    .set(u64::cast_from(size_metrics.state_rollups_bytes));
                shard_metrics
                    .seqnos_held
                    .set(u64::cast_from(new_state.seqnos_held()));
                shard_metrics
                    .encoded_diff_size
                    .inc_by(u64::cast_from(payload_len));
                shard_metrics
                    .live_writers
                    .set(u64::cast_from(new_state.collections.writers.len()));
                shard_metrics
                    .rewrite_part_count
                    .set(u64::cast_from(size_metrics.rewrite_part_count));
                shard_metrics
                    .inline_part_count
                    .set(u64::cast_from(size_metrics.inline_part_count));
                shard_metrics
                    .inline_part_bytes
                    .set(u64::cast_from(size_metrics.inline_part_bytes));
                shard_metrics.stale_version.set(
                    if new_state
                        .state
                        .collections
                        .version
                        .cmp_precedence(&self.cfg.build_version)
                        .is_lt()
                    {
                        1
                    } else {
                        0
                    },
                );

                let spine_metrics = new_state.collections.trace.spine_metrics();
                shard_metrics
                    .compact_batches
                    .set(spine_metrics.compact_batches);
                shard_metrics
                    .compacting_batches
                    .set(spine_metrics.compacting_batches);
                shard_metrics
                    .noncompact_batches
                    .set(spine_metrics.noncompact_batches);

                let batch_parts_by_version = new_state
                    .collections
                    .trace
                    .batches()
                    .flat_map(|x| x.parts.iter())
                    .flat_map(|part| {
                        let key = match part {
                            RunPart::Many(x) => Some(&x.key),
                            RunPart::Single(BatchPart::Hollow(x)) => Some(&x.key),
                            // TODO: Would be nice to include these too, but we lose the info atm.
                            RunPart::Single(BatchPart::Inline { .. }) => None,
                        }?;
                        // Carefully avoid any String allocs by splitting.
                        let (writer_key, _) = key.0.split_once('/')?;
                        match &writer_key[..1] {
                            "w" => Some(("old", part.encoded_size_bytes())),
                            "n" => Some((&writer_key[1..], part.encoded_size_bytes())),
                            _ => None,
                        }
                    });
                shard_metrics.set_batch_part_versions(batch_parts_by_version);

                Ok((CaSResult::Committed, new))
            }
            CaSResult::ExpectationMismatch => {
                debug!(
                    "apply_unbatched_cmd {} {} lost the CaS race, retrying: {:?}",
                    new_state.shard_id(),
                    cmd_name,
                    expected,
                );
                Ok((CaSResult::ExpectationMismatch, new))
            }
        }
    }

    /// Fetches the `current` state of the requested shard.
    ///
    /// Uses the provided hint (live_diffs), which is a possibly outdated
    /// copy of all or recent live diffs, to avoid fetches where possible.
    ///
    /// Panics if called on an uninitialized shard.
    pub async fn fetch_current_state<T>(
        &self,
        shard_id: &ShardId,
        mut live_diffs: Vec<VersionedData>,
    ) -> UntypedState<T>
    where
        T: Timestamp + Lattice + Codec64,
    {
        let retry = self
            .metrics
            .retries
            .fetch_latest_state
            .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
        loop {
            let latest_diff = live_diffs
                .last()
                .expect("initialized shard should have at least one diff");
            let latest_diff = self
                .metrics
                .codecs
                .state_diff
                // Note: `latest_diff.data` is a `Bytes`, so cloning just increments a ref count
                .decode(|| {
                    StateDiff::<T>::decode(&self.cfg.build_version, latest_diff.data.clone())
                });
            let mut state = match self
                .fetch_rollup_at_key(shard_id, &latest_diff.latest_rollup_key)
                .await
            {
                Some(x) => x,
                None => {
                    // The rollup that this diff referenced is gone, so the diff
                    // must be out of date. Try again. Intentionally don't sleep on retry.
                    retry.retries.inc();
                    let earliest_before_refetch = live_diffs
                        .first()
                        .expect("initialized shard should have at least one diff")
                        .seqno;
                    live_diffs = self.fetch_recent_live_diffs::<T>(shard_id).await.0;

                    // We should only hit the race condition that leads to a
                    // refetch if the set of live diffs changed out from under
                    // us.
                    //
                    // TODO: Make this an assert once we're 100% sure the above
                    // is always true.
                    let earliest_after_refetch = live_diffs
                        .first()
                        .expect("initialized shard should have at least one diff")
                        .seqno;
                    if earliest_before_refetch >= earliest_after_refetch {
                        warn!(
                            concat!(
                                "fetch_current_state refetch expects earliest live diff to advance: {} vs {}. ",
                                "In dev and testing, this happens when persist's Blob (files in mzdata) ",
                                "is deleted out from under it or when two processes are talking to ",
                                "different Blobs (e.g. docker containers without it shared)."
                            ),
                            earliest_before_refetch, earliest_after_refetch
                        )
                    }
                    continue;
                }
            };

            state.apply_encoded_diffs(&self.cfg, &self.metrics, &live_diffs);
            return state;
        }
    }

    /// Returns an iterator over all live states for the requested shard.
    ///
    /// Returns None if called on an uninitialized shard.
    pub async fn fetch_all_live_states<T>(
        &self,
        shard_id: ShardId,
    ) -> Option<UntypedStateVersionsIter<T>>
    where
        T: Timestamp + Lattice + Codec64,
    {
        let retry = self
            .metrics
            .retries
            .fetch_live_states
            .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
        let mut all_live_diffs = self.fetch_all_live_diffs(&shard_id).await;
        loop {
            let earliest_live_diff = match all_live_diffs.first() {
                Some(x) => x,
                None => return None,
            };
            let state = match self
                .fetch_rollup_at_seqno(&shard_id, all_live_diffs.clone(), earliest_live_diff.seqno)
                .await
            {
                Some(x) => x,
                None => {
                    // We maintain an invariant that a rollup always exists for
                    // the earliest live diff. Since we didn't find one, that
                    // can only mean that the live_diffs we just fetched are
                    // obsolete (there's a race condition with gc). This should
                    // be rare in practice, so inc a counter and try again.
                    // Intentionally don't sleep on retry.
                    retry.retries.inc();
                    let earliest_before_refetch = earliest_live_diff.seqno;
                    all_live_diffs = self.fetch_all_live_diffs(&shard_id).await;

                    // We should only hit the race condition that leads to a
                    // refetch if the set of live diffs changed out from under
                    // us.
                    //
                    // TODO: Make this an assert once we're 100% sure the above
                    // is always true.
                    let earliest_after_refetch = all_live_diffs
                        .first()
                        .expect("initialized shard should have at least one diff")
                        .seqno;
                    if earliest_before_refetch >= earliest_after_refetch {
                        warn!(
                            concat!(
                                "fetch_all_live_states refetch expects earliest live diff to advance: {} vs {}. ",
                                "In dev and testing, this happens when persist's Blob (files in mzdata) ",
                                "is deleted out from under it or when two processes are talking to ",
                                "different Blobs (e.g. docker containers without it shared)."
                            ),
                            earliest_before_refetch, earliest_after_refetch
                        )
                    }
                    continue;
                }
            };
            assert_eq!(earliest_live_diff.seqno, state.seqno());
            return Some(UntypedStateVersionsIter {
                shard_id,
                cfg: self.cfg.clone(),
                metrics: Arc::clone(&self.metrics),
                state,
                diffs: all_live_diffs,
            });
        }
    }

    /// Fetches all live_diffs for a shard. Intended only for when a caller needs to reconstruct
    /// _all_ states still referenced by Consensus. Prefer [Self::fetch_recent_live_diffs] when
    /// the caller simply needs to fetch the latest state.
    ///
    /// Returns an empty Vec iff called on an uninitialized shard.
    pub async fn fetch_all_live_diffs(&self, shard_id: &ShardId) -> Vec<VersionedData> {
        let path = shard_id.to_string();
        let diffs = retry_external(&self.metrics.retries.external.fetch_state_scan, || async {
            self.consensus.scan(&path, SeqNo::minimum(), SCAN_ALL).await
        })
        .instrument(debug_span!("fetch_state::scan"))
        .await;
        diffs
    }

    /// Fetches live diffs for a shard. This is a thin wrapper around [Consensus::scan] with the
    /// right retry policy and instrumentation.
    async fn fetch_live_diffs(
        &self,
        shard_id: &ShardId,
        from: SeqNo,
        limit: usize,
    ) -> Vec<VersionedData> {
        let path = shard_id.to_string();
        retry_external(&self.metrics.retries.external.fetch_state_scan, || async {
            self.consensus.scan(&path, from, limit).await
        })
        .instrument(debug_span!("fetch_state::scan"))
        .await
    }

    /// Fetches all live_diffs for a shard up to and including a given threshold, allowing us to
    /// reconstruct states up to and including that version.
    pub async fn fetch_live_diffs_through(
        &self,
        shard_id: &ShardId,
        through: SeqNo,
    ) -> Vec<VersionedData> {
        // Get an initial set of versions from consensus.
        let scan_limit = STATE_VERSIONS_RECENT_LIVE_DIFFS_LIMIT.get(&self.cfg);
        let mut versions = self
            .fetch_live_diffs(shard_id, SeqNo::minimum(), scan_limit)
            .await;

        if versions.len() == scan_limit {
            // Loop until our version range either covers the full set, or we stop getting data.
            loop {
                let Some(last_seqno) = versions.last().map(|v| v.seqno) else {
                    break;
                };
                if through <= last_seqno {
                    break;
                }
                let from = last_seqno.next();
                let limit = usize::cast_from(through.0 - last_seqno.0).clamp(1, 10 * scan_limit);
                let more_versions = self.fetch_live_diffs(shard_id, from, limit).await;
                let more_versions_len = more_versions.len();
                if let Some(first) = more_versions.first() {
                    assert!(last_seqno < first.seqno);
                }
                versions.extend(more_versions);
                if more_versions_len < limit {
                    break;
                }
            }
        }
        // We may have fetched more versions than requested; find the index past the last
        // requested version and truncate there.
        let partition_index = versions.partition_point(|v| v.seqno <= through);
        versions.truncate(partition_index);
        versions
    }

    /// Fetches recent live_diffs for a shard. Intended for when a caller needs to fetch
    /// the latest state in Consensus.
    ///
    /// "Recent" is defined as either:
    /// * All of the diffs known in Consensus
    /// * All of the diffs in Consensus after the latest rollup
    pub async fn fetch_recent_live_diffs<T>(&self, shard_id: &ShardId) -> RecentLiveDiffs
    where
        T: Timestamp + Lattice + Codec64,
    {
        let path = shard_id.to_string();
        let scan_limit = STATE_VERSIONS_RECENT_LIVE_DIFFS_LIMIT.get(&self.cfg);
        let oldest_diffs =
            retry_external(&self.metrics.retries.external.fetch_state_scan, || async {
                self.consensus
                    .scan(&path, SeqNo::minimum(), scan_limit)
                    .await
            })
            .instrument(debug_span!("fetch_state::scan"))
            .await;

        // fast-path: we found all known diffs in a single page of our scan. we expect almost all
        // calls to go down this path, unless a reader has a very long seqno-hold on the shard.
        if oldest_diffs.len() < scan_limit {
            self.metrics.state.fetch_recent_live_diffs_fast_path.inc();
            return RecentLiveDiffs(oldest_diffs);
        }

        // slow-path: we could be arbitrarily far behind the head of Consensus (either intentionally
        // due to a long seqno-hold from a reader, or unintentionally from a bug that's preventing
        // a seqno-hold from advancing). rather than scanning a potentially unbounded number of old
        // states in Consensus, we jump to the latest state, determine the seqno of the most recent
        // rollup, and then fetch all the diffs from that point onward.
        //
        // this approach requires more network calls, but it should smooth out our access pattern
        // and use only bounded calls to Consensus. additionally, if `limit` is adequately tuned,
        // this path will only be invoked when there's an excess number of states in Consensus and
        // it might be slower to do a single long scan over unneeded rows.
        let head = retry_external(&self.metrics.retries.external.fetch_state_scan, || async {
            self.consensus.head(&path).await
        })
        .instrument(debug_span!("fetch_state::slow_path::head"))
        .await
        .expect("initialized shard should have at least 1 diff");

        let latest_diff = self
            .metrics
            .codecs
            .state_diff
            .decode(|| StateDiff::<T>::decode(&self.cfg.build_version, head.data));

        match BlobKey::parse_ids(&latest_diff.latest_rollup_key.complete(shard_id)) {
            Ok((_shard_id, PartialBlobKey::Rollup(seqno, _rollup))) => {
                self.metrics.state.fetch_recent_live_diffs_slow_path.inc();
                let diffs =
                    retry_external(&self.metrics.retries.external.fetch_state_scan, || async {
                        // (pedantry) this call is technically unbounded, but something very strange
                        // would have had to happen to have accumulated so many states between our
                        // call to `head` and this invocation for it to become problematic
                        self.consensus.scan(&path, seqno, SCAN_ALL).await
                    })
                    .instrument(debug_span!("fetch_state::slow_path::scan"))
                    .await;
                RecentLiveDiffs(diffs)
            }
            Ok(_) => panic!(
                "invalid state diff rollup key: {}",
                latest_diff.latest_rollup_key
            ),
            Err(err) => panic!("unparseable state diff rollup key: {}", err),
        }
    }

    /// Fetches all live diffs greater than the given SeqNo.
    ///
    /// TODO: Apply a limit to this scan. This could additionally be used as an internal
    /// call within `fetch_recent_live_diffs`.
    pub async fn fetch_all_live_diffs_gt_seqno<K, V, T, D>(
        &self,
        shard_id: &ShardId,
        seqno: SeqNo,
    ) -> Vec<VersionedData> {
        let path = shard_id.to_string();
        retry_external(&self.metrics.retries.external.fetch_state_scan, || async {
            self.consensus.scan(&path, seqno.next(), SCAN_ALL).await
        })
        .instrument(debug_span!("fetch_state::scan"))
        .await
    }

    /// Truncates any diffs in consensus less than the given seqno.
    pub async fn truncate_diffs(&self, shard_id: &ShardId, seqno: SeqNo) {
        let path = shard_id.to_string();
        let _deleted_count = retry_external(&self.metrics.retries.external.gc_truncate, || async {
            self.consensus.truncate(&path, seqno).await
        })
        .instrument(debug_span!("gc::truncate"))
        .await;
    }

    // Writes a self-referential rollup to blob storage and returns the diff
    // that should be compare_and_set into consensus to finish initializing the
    // shard.
    async fn write_initial_rollup<K, V, T, D>(
        &self,
        shard_metrics: &ShardMetrics,
    ) -> (TypedState<K, V, T, D>, StateDiff<T>)
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64,
        D: Monoid + Codec64,
    {
        let empty_state = TypedState::new(
            self.cfg.build_version.clone(),
            shard_metrics.shard_id,
            self.cfg.hostname.clone(),
            (self.cfg.now)(),
        );
        let rollup_seqno = empty_state.seqno.next();
        let rollup = HollowRollup {
            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
            // Chicken-and-egg problem here. We don't know the size of the
            // rollup until we encode it, but it includes a reference back to
            // itself.
            encoded_size_bytes: None,
        };
        let (applied, initial_state) = match empty_state
            .clone_apply(&self.cfg, &mut |_, _, state| {
                state.add_rollup((rollup_seqno, &rollup))
            }) {
            Continue(x) => x,
            Break(NoOpStateTransition(_)) => {
                panic!("initial state transition should not be a no-op")
            }
        };
        assert!(
            applied,
            "add_and_remove_rollups should apply to the empty state"
        );

        let rollup = self.encode_rollup_blob(
            shard_metrics,
            initial_state.clone_for_rollup(),
            vec![],
            rollup.key,
        );
        let () = self.write_rollup_blob(&rollup).await;
        assert_eq!(initial_state.seqno, rollup.seqno);

        let diff = StateDiff::from_diff(&empty_state.state, &initial_state.state);
        (initial_state, diff)
    }

    pub async fn write_rollup_for_state<K, V, T, D>(
        &self,
        shard_metrics: &ShardMetrics,
        state: TypedState<K, V, T, D>,
        rollup_id: &RollupId,
    ) -> Option<EncodedRollup>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64,
        D: Monoid + Codec64,
    {
        let (latest_rollup_seqno, _rollup) = state.latest_rollup();
        let seqno = state.seqno();

        // TODO: maintain the diffs since the latest rollup in-memory rather than
        // needing an additional API call here. This would reduce Consensus load
        // / avoid races with Consensus truncation, but is trickier to write.
        let diffs: Vec<_> = self
            .fetch_all_live_diffs_gt_seqno::<K, V, T, D>(&state.shard_id, *latest_rollup_seqno)
            .await;

        match diffs.first() {
            None => {
                // early-out because there are no diffs past our latest rollup.
                //
                // this should only occur in the initial state, but we can write a more
                // general assertion: if no live diffs exist past this state's latest
                // known rollup, then that rollup must be for the latest known state.
                self.metrics.state.rollup_write_noop_latest.inc();
                assert_eq!(seqno, *latest_rollup_seqno);
                return None;
            }
            Some(first) => {
                // early-out if it is no longer possible to inline all the diffs from
                // the last known rollup to the current state. some or all of the diffs
                // have already been truncated by another process.
                //
                // this can happen if one process gets told to write a rollup, the
                // maintenance task falls arbitrarily behind, and another process writes
                // a new rollup / GCs and truncates past the first process's rollup.
                self.metrics.state.rollup_write_noop_truncated.inc();
                if first.seqno != latest_rollup_seqno.next() {
                    assert!(
                        first.seqno > latest_rollup_seqno.next(),
                        "diff: {}, rollup: {}",
                        first.seqno,
                        latest_rollup_seqno,
                    );
                    return None;
                }
            }
        }

        // we may have fetched more diffs than we need: trim anything beyond the state's seqno
        let diffs: Vec<_> = diffs.into_iter().filter(|x| x.seqno <= seqno).collect();

        // verify that we've done all the filtering correctly and that our
        // diffs have seqnos bounded by (last_rollup, current_state]
        assert_eq!(
            diffs.first().map(|x| x.seqno),
            Some(latest_rollup_seqno.next())
        );
        assert_eq!(diffs.last().map(|x| x.seqno), Some(state.seqno));

        let key = PartialRollupKey::new(state.seqno, rollup_id);
        let rollup = self.encode_rollup_blob(shard_metrics, state, diffs, key);
        let () = self.write_rollup_blob(&rollup).await;

        self.metrics.state.rollup_write_success.inc();

        Some(rollup)
    }

    /// Encodes the given state and diffs as a rollup to be written to the specified key.
    ///
    /// The diffs must span the seqno range `(state.last_rollup().seqno, state.seqno]`.
    pub fn encode_rollup_blob<K, V, T, D>(
        &self,
        shard_metrics: &ShardMetrics,
        state: TypedState<K, V, T, D>,
        diffs: Vec<VersionedData>,
        key: PartialRollupKey,
    ) -> EncodedRollup
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64,
        D: Monoid + Codec64,
    {
        let shard_id = state.shard_id;
        let rollup_seqno = state.seqno;

        let rollup = Rollup::from(state.into(), diffs);
        let desc = rollup.diffs.as_ref().expect("inlined diffs").description();

        let buf = self.metrics.codecs.state.encode(|| {
            let mut buf = Vec::new();
            rollup
                .into_proto()
                .encode(&mut buf)
                .expect("no required fields means no initialization errors");
            Bytes::from(buf)
        });
        shard_metrics
            .latest_rollup_size
            .set(u64::cast_from(buf.len()));
        EncodedRollup {
            shard_id,
            seqno: rollup_seqno,
            key,
            buf,
            _desc: desc,
        }
    }

    /// Writes the given state rollup out to blob.
    pub async fn write_rollup_blob(&self, rollup: &EncodedRollup) {
        let payload_len = rollup.buf.len();
        retry_external(&self.metrics.retries.external.rollup_set, || async {
            self.blob
                .set(
                    &rollup.key.complete(&rollup.shard_id),
                    Bytes::clone(&rollup.buf),
                )
                .await
        })
        .instrument(debug_span!("rollup::set", payload_len))
        .await;
    }

    /// Fetches a rollup for the given SeqNo, if it exists.
    ///
    /// Uses the provided hint, which is a possibly outdated copy of all
    /// or recent live diffs, to avoid fetches where possible.
    ///
    /// Panics if called on an uninitialized shard.
    async fn fetch_rollup_at_seqno<T>(
        &self,
        shard_id: &ShardId,
        live_diffs: Vec<VersionedData>,
        seqno: SeqNo,
    ) -> Option<UntypedState<T>>
    where
        T: Timestamp + Lattice + Codec64,
    {
        let rollup_key_for_migration = live_diffs.iter().find_map(|x| {
            let diff = self
                .metrics
                .codecs
                .state_diff
                // Note: `x.data` is a `Bytes`, so cloning just increments a ref count
                .decode(|| StateDiff::<T>::decode(&self.cfg.build_version, x.data.clone()));
            diff.rollups
                .iter()
                .find(|x| x.key == seqno)
                .map(|x| match &x.val {
                    StateFieldValDiff::Insert(x) => x.clone(),
                    StateFieldValDiff::Update(_, x) => x.clone(),
                    StateFieldValDiff::Delete(x) => x.clone(),
                })
        });

        let state = self.fetch_current_state::<T>(shard_id, live_diffs).await;
        if let Some(rollup) = state.rollups().get(&seqno) {
            return self.fetch_rollup_at_key(shard_id, &rollup.key).await;
        }

        // MIGRATION: We maintain an invariant that the _current state_ contains
        // a rollup for the _earliest live diff_ in consensus (and that the
        // referenced rollup exists). At one point, we fixed a bug that could
        // lead to that invariant being violated.
        //
        // If the earliest live diff is X and we receive a gc req for X+Y to
        // X+Y+Z (this can happen e.g. if some cmd ignores an earlier req for X
        // to X+Y, or if they're processing concurrently and the X to X+Y req
        // loses the race), then the buggy version of gc would delete any
        // rollups strictly less than old_seqno_since (X+Y in this example). But
        // our invariant is that the rollup exists for the earliest live diff,
        // in this case X. So if the first call to gc was interrupted after this
        // but before truncate (when all the blob deletes happen), later calls
        // to gc would attempt to call `fetch_live_states` and end up looping
        // in it forever.
        //
        // The fix was to base which rollups are deleteable on the earliest live
        // diff, not old_seqno_since.
        //
        // Sadly, some envs in prod now violate this invariant. So, even with
        // the fix, existing shards will never successfully run gc. We add a
        // temporary migration to fix them in `fetch_rollup_at_seqno`. This
        // method normally looks in the latest version of state for the
        // specifically requested seqno. In the invariant violation case, some
        // version of state in the range `[earliest, current]` has a rollup for
        // earliest, but current doesn't. So, for the migration, if
        // fetch_rollup_at_seqno doesn't find a rollup in current, then we fall
        // back to sniffing one out of raw diffs. If this succeeds, we increment
        // a counter and log, so we can track how often this migration is
        // bailing us out. After the next deploy, this should initially start at
        // > 0 and then settle down to 0. After the next prod envs wipe, we can
        // remove the migration.
        let rollup = rollup_key_for_migration.expect("someone should have a key for this rollup");
        tracing::info!("only found rollup for {} {} via migration", shard_id, seqno);
        self.metrics.state.rollup_at_seqno_migration.inc();
        self.fetch_rollup_at_key(shard_id, &rollup.key).await
    }

    /// Fetches the rollup at the given key, if it exists.
    pub async fn fetch_rollup_at_key<T>(
        &self,
        shard_id: &ShardId,
        rollup_key: &PartialRollupKey,
    ) -> Option<UntypedState<T>>
    where
        T: Timestamp + Lattice + Codec64,
    {
        retry_external(&self.metrics.retries.external.rollup_get, || async {
            self.blob.get(&rollup_key.complete(shard_id)).await
        })
        .instrument(debug_span!("rollup::get"))
        .await
        .map(|buf| {
            self.metrics
                .codecs
                .state
                .decode(|| UntypedState::decode(&self.cfg.build_version, buf))
        })
    }

    /// Deletes the rollup at the given key, if it exists.
    pub async fn delete_rollup(&self, shard_id: &ShardId, key: &PartialRollupKey) {
        let _ = retry_external(&self.metrics.retries.external.rollup_delete, || async {
            self.blob.delete(&key.complete(shard_id)).await
        })
        .await
        .instrument(debug_span!("rollup::delete"));
    }
}

pub struct UntypedStateVersionsIter<T> {
    shard_id: ShardId,
    cfg: PersistConfig,
    metrics: Arc<Metrics>,
    state: UntypedState<T>,
    diffs: Vec<VersionedData>,
}

impl<T: Timestamp + Lattice + Codec64> UntypedStateVersionsIter<T> {
    pub(crate) fn new(
        shard_id: ShardId,
        cfg: PersistConfig,
        metrics: Arc<Metrics>,
        state: UntypedState<T>,
        diffs: Vec<VersionedData>,
    ) -> Self {
        Self {
            shard_id,
            cfg,
            metrics,
            state,
            diffs,
        }
    }

    pub(crate) fn check_ts_codec(self) -> Result<StateVersionsIter<T>, CodecMismatchT> {
        let key_codec = self.state.key_codec.clone();
        let val_codec = self.state.val_codec.clone();
        let diff_codec = self.state.diff_codec.clone();
        let state = self.state.check_ts_codec(&self.shard_id)?;
        Ok(StateVersionsIter::new(
            self.cfg,
            self.metrics,
            state,
            self.diffs,
            key_codec,
            val_codec,
            diff_codec,
        ))
    }
}

/// An iterator over consecutive versions of [State].
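///
/// # Illustrative usage (sketch)
///
/// A hedged sketch of walking every reconstructable version, assuming an `iter`
/// (hypothetical binding) obtained from `StateVersions::fetch_all_live_states`
/// and checked with `check_ts_codec`:
///
/// ```ignore
/// while let Some(state) = iter.next(|diff| {
///     // Count the blobs referenced by each applied diff (or the initial state).
///     let _referenced = diff.referenced_blobs().count();
/// }) {
///     // `state` is the reconstructed State at the seqno the diff advanced to.
///     let _seqno = state.seqno;
/// }
/// ```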
pub struct StateVersionsIter<T> {
    cfg: PersistConfig,
    metrics: Arc<Metrics>,
    state: State<T>,
    diffs: Vec<VersionedData>,
    key_codec: String,
    val_codec: String,
    diff_codec: String,
    #[cfg(debug_assertions)]
    validator: ReferencedBlobValidator<T>,
}

impl<T: Timestamp + Lattice + Codec64> StateVersionsIter<T> {
    fn new(
        cfg: PersistConfig,
        metrics: Arc<Metrics>,
        state: State<T>,
        // diffs is stored reversed so we can efficiently pop off the Vec.
        mut diffs: Vec<VersionedData>,
        key_codec: String,
        val_codec: String,
        diff_codec: String,
    ) -> Self {
        assert!(diffs.first().map_or(true, |x| x.seqno == state.seqno));
        diffs.reverse();
        StateVersionsIter {
            cfg,
            metrics,
            state,
            diffs,
            key_codec,
            val_codec,
            diff_codec,
            #[cfg(debug_assertions)]
            validator: ReferencedBlobValidator::default(),
        }
    }

    pub fn len(&self) -> usize {
        self.diffs.len()
    }

    /// Advances first to some starting state (in practice, usually the first
    /// live state), and then through each successive state, for as many diffs
    /// as this iterator was initialized with.
    ///
    /// The `inspect_diff_fn` callback can be used to inspect diffs directly as
    /// they are applied. The first call to `next` returns a
    /// [InspectDiff::FromInitial] representing a diff from the initial state.
    pub fn next<F: for<'a> FnMut(InspectDiff<'a, T>)>(
        &mut self,
        mut inspect_diff_fn: F,
    ) -> Option<&State<T>> {
        let diff = match self.diffs.pop() {
            Some(x) => x,
            None => return None,
        };
        let data = diff.data.clone();
        let diff = self
            .metrics
            .codecs
            .state_diff
            .decode(|| StateDiff::decode(&self.cfg.build_version, diff.data));

        // A bit hacky, but the first diff in StateVersionsIter is always a
        // no-op.
        if diff.seqno_to == self.state.seqno {
            let inspect = InspectDiff::FromInitial(&self.state);
            #[cfg(debug_assertions)]
            {
                inspect
                    .referenced_blobs()
                    .for_each(|x| self.validator.add_inc_blob(x));
            }
            inspect_diff_fn(inspect);
        } else {
            let inspect = InspectDiff::Diff(&diff);
            #[cfg(debug_assertions)]
            {
                inspect
                    .referenced_blobs()
                    .for_each(|x| self.validator.add_inc_blob(x));
            }
            inspect_diff_fn(inspect);
        }

        let diff_seqno_to = diff.seqno_to;
        self.state
            .apply_diffs(&self.metrics, std::iter::once((diff, data)));
        assert_eq!(self.state.seqno, diff_seqno_to);
        #[cfg(debug_assertions)]
        {
            self.validator.validate_against_state(&self.state);
        }
        Some(&self.state)
    }

    pub fn state(&self) -> &State<T> {
        &self.state
    }

    pub fn into_rollup_proto_without_diffs(&self) -> impl serde::Serialize + use<T> {
        Rollup::from_state_without_diffs(
            State {
                shard_id: self.state.shard_id.clone(),
                seqno: self.state.seqno.clone(),
                walltime_ms: self.state.walltime_ms.clone(),
                hostname: self.state.hostname.clone(),
                collections: self.state.collections.clone(),
            },
            self.key_codec.clone(),
            self.val_codec.clone(),
            T::codec_name(),
            self.diff_codec.clone(),
        )
        .into_proto()
    }
}

/// This represents a diff, either directly or, in the case of the FromInitial
/// variant, a diff from the initial state. (We could instead compute the diff
/// from the initial state and replace this with only a `StateDiff<T>`, but don't
/// for efficiency.)
#[derive(Debug)]
pub enum InspectDiff<'a, T> {
    FromInitial(&'a State<T>),
    Diff(&'a StateDiff<T>),
}

impl<T: Timestamp + Lattice + Codec64> InspectDiff<'_, T> {
    /// An iterator over each blob added by this state transition.
    ///
    /// Blob removals, along with all other diffs, are ignored.
    pub fn referenced_blobs(&self) -> impl Iterator<Item = HollowBlobRef<'_, T>> {
        let (state, diff) = match self {
            InspectDiff::FromInitial(x) => (Some(x), None),
            InspectDiff::Diff(x) => (None, Some(x)),
        };
        let state_blobs = state.into_iter().flat_map(|s| s.blobs());
        let diff_blobs = diff.into_iter().flat_map(|d| d.blob_inserts());
        state_blobs.chain(diff_blobs)
    }
}

#[cfg(debug_assertions)]
struct ReferencedBlobValidator<T> {
    // A copy of every batch and rollup referenced by some state iterator,
    // computed by scanning the full copy of state at each seqno.
    full_batches: BTreeSet<HollowBatch<T>>,
    full_rollups: BTreeSet<HollowRollup>,
    // A copy of every batch and rollup referenced by some state iterator,
    // computed incrementally.
    inc_batches: BTreeSet<HollowBatch<T>>,
    inc_rollups: BTreeSet<HollowRollup>,
}

#[cfg(debug_assertions)]
impl<T> Default for ReferencedBlobValidator<T> {
    fn default() -> Self {
        Self {
            full_batches: Default::default(),
            full_rollups: Default::default(),
            inc_batches: Default::default(),
            inc_rollups: Default::default(),
        }
    }
}

#[cfg(debug_assertions)]
impl<T: Timestamp + Lattice + Codec64> ReferencedBlobValidator<T> {
    fn add_inc_blob(&mut self, x: HollowBlobRef<'_, T>) {
        match x {
            HollowBlobRef::Batch(x) => assert!(
                self.inc_batches.insert(x.clone()) || x.desc.lower() == x.desc.upper(),
                "non-empty batches should only be appended once; duplicate: {x:?}"
            ),
            HollowBlobRef::Rollup(x) => assert!(self.inc_rollups.insert(x.clone())),
        }
    }
    fn validate_against_state(&mut self, x: &State<T>) {
        use std::hash::{DefaultHasher, Hash, Hasher};

        use mz_ore::collections::HashSet;
        use timely::progress::Antichain;

        use crate::internal::state::BatchPart;

        x.blobs().for_each(|x| match x {
            HollowBlobRef::Batch(x) => {
                self.full_batches.insert(x.clone());
            }
            HollowBlobRef::Rollup(x) => {
                self.full_rollups.insert(x.clone());
            }
        });

        // Check that the sets of batches overall cover the same pTVC.
        // Partial ordering means we can't just take the first and last batches; instead compute
        // bounds using the lattice operations.
        fn overall_desc<'a, T: Timestamp + Lattice>(
            iter: impl Iterator<Item = &'a Description<T>>,
        ) -> (Antichain<T>, Antichain<T>) {
            let mut lower = Antichain::new();
            let mut upper = Antichain::from_elem(T::minimum());
            for desc in iter {
                lower.meet_assign(desc.lower());
                upper.join_assign(desc.upper());
            }
            (lower, upper)
        }
        let (inc_lower, inc_upper) = overall_desc(self.inc_batches.iter().map(|a| &a.desc));
        let (full_lower, full_upper) = overall_desc(self.full_batches.iter().map(|a| &a.desc));
        assert_eq!(inc_lower, full_lower);
        assert_eq!(inc_upper, full_upper);

        fn part_unique<T: Codec64>(x: &RunPart<T>) -> String {
            match x {
                RunPart::Single(BatchPart::Inline {
                    updates,
                    ts_rewrite,
                    ..
                }) => {
                    let mut h = DefaultHasher::new();
                    updates.hash(&mut h);
                    if let Some(frontier) = &ts_rewrite {
                        h.write_usize(frontier.len());
                        frontier.iter().for_each(|t| t.encode().hash(&mut h));
                    }
                    h.finish().to_string()
                }
                other => other.printable_name().to_string(),
            }
        }

        // Check that the overall set of parts contained in both representations is the same.
        let inc_parts: HashSet<_> = self
            .inc_batches
            .iter()
            .flat_map(|x| x.parts.iter())
            .map(part_unique)
            .collect();
        let full_parts = self
            .full_batches
            .iter()
            .flat_map(|x| x.parts.iter())
            .map(part_unique)
            .collect();
        assert_eq!(inc_parts, full_parts);

        // Check that both representations have the same rollups.
        assert_eq!(self.inc_rollups, self.full_rollups);
    }
}

#[cfg(test)]
mod tests {
    use mz_dyncfg::ConfigUpdates;

    use crate::tests::new_test_client;

    use super::*;

    /// Regression test for (part of) database-issues#5170, where an interrupted
    /// `bin/environmentd --reset` resulted in a panic in persist usage code.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn fetch_all_live_states_regression_uninitialized(dyncfgs: ConfigUpdates) {
        let client = new_test_client(&dyncfgs).await;
        let state_versions = StateVersions::new(
            client.cfg.clone(),
            Arc::clone(&client.consensus),
            Arc::clone(&client.blob),
            Arc::clone(&client.metrics),
        );
        assert!(
            state_versions
                .fetch_all_live_states::<u64>(ShardId::new())
                .await
                .is_none()
        );
    }
}