mz_persist_client/internal/state_versions.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A durable, truncatable log of versions of [State].
11
12#[cfg(debug_assertions)]
13use std::collections::BTreeSet;
14use std::fmt::Debug;
15use std::ops::ControlFlow::{Break, Continue};
16use std::sync::Arc;
17use std::time::SystemTime;
18
19use bytes::Bytes;
20use differential_dataflow::difference::Semigroup;
21use differential_dataflow::lattice::Lattice;
22use differential_dataflow::trace::Description;
23use mz_ore::cast::CastFrom;
24use mz_persist::location::{
25    Blob, CaSResult, Consensus, Indeterminate, SCAN_ALL, SeqNo, VersionedData,
26};
27use mz_persist::retry::Retry;
28use mz_persist_types::{Codec, Codec64};
29use mz_proto::RustType;
30use prost::Message;
31use timely::progress::Timestamp;
32use tracing::{Instrument, debug, debug_span, trace, warn};
33
34use crate::cfg::STATE_VERSIONS_RECENT_LIVE_DIFFS_LIMIT;
35use crate::error::{CodecMismatch, CodecMismatchT};
36use crate::internal::encoding::{Rollup, UntypedState};
37use crate::internal::machine::{retry_determinate, retry_external};
38use crate::internal::metrics::ShardMetrics;
39use crate::internal::paths::{BlobKey, PartialBlobKey, PartialRollupKey, RollupId};
40#[cfg(debug_assertions)]
41use crate::internal::state::HollowBatch;
42use crate::internal::state::{
43    BatchPart, HollowBlobRef, HollowRollup, NoOpStateTransition, RunPart, State, TypedState,
44};
45use crate::internal::state_diff::{StateDiff, StateFieldValDiff};
46use crate::{Metrics, PersistConfig, ShardId};
47
48/// A durable, truncatable log of versions of [State].
49///
50/// As persist metadata changes over time, we make its versions (each identified
51/// by a [SeqNo]) durable in two ways:
52/// - `rollups`: Periodic copies of the entirety of [State], written to [Blob].
53/// - `diffs`: Incremental [StateDiff]s, written to [Consensus].
54///
55/// The following invariants are maintained at all times:
56/// - A shard is initialized iff there is at least one version of it in
57///   Consensus.
58/// - The first version of state is written to `SeqNo(1)`. Each successive state
59///   version is assigned its predecessor's SeqNo +1.
60/// - `current`: The latest version of state. By definition, the largest SeqNo
61///   present in Consensus.
62/// - As state changes over time, we keep a range of consecutive versions
63///   available. These are periodically `truncated` to prune old versions that
64///   are no longer necessary.
65/// - `earliest`: The first version of state that it is possible to reconstruct.
66///   - Invariant: `earliest <= current.seqno_since()` (we don't garbage collect
67///     versions still being used by some reader).
68///   - Invariant: `earliest` is always the smallest SeqNo present in Consensus.
69///     - This doesn't have to be true, but we choose to enforce it.
70///     - Because the data stored at that smallest SeqNo is an incremental diff,
71///       to make this invariant work, there needs to be a rollup at either
72///       `earliest-1` or `earliest`. We choose `earliest` because it seems to
73///       make the code easier to reason about in practice.
74///     - A consequence of the above is that when we garbage collect old versions of
75///       state, we're only free to truncate ones that are `<` the latest rollup
76///       that is `<= current.seqno_since`.
77/// - `live diffs`: The set of SeqNos present in Consensus at any given time.
78/// - `live states`: The range of state versions that it is possible to
79///   reconstruct: `[earliest,current]`.
80///   - Because of the earliest and current invariants above, the ranges of
81///     `live diffs` and `live states` are the same.
82/// - The set of known rollups is tracked in the shard state itself.
83///   - For efficiency of common operations, the most recent rollup's Blob key
84///     is always denormalized in each StateDiff written to Consensus. (As
85///     described above, there is always a rollup at earliest, so we're
86///     guaranteed that there is always at least one live rollup.)
87///   - Invariant: The rollups in `current` exist in Blob.
88///     - A consequence is that, if a rollup in a state you believe is `current`
89///       doesn't exist, it's a guarantee that `current` has changed (or it's a
90///       bug).
91///   - Any rollup at a version `< earliest-1` is useless (we've lost the
92///     incremental diffs between it and the live states). GC is tasked with
93///     deleting these rollups from Blob before truncating diffs from Consensus.
94///     Thus, any rollup at a seqno < earliest can be considered "leaked" and
95///     deleted by the leaked blob detector.
96///   - Note that this means, while `current`'s rollups exist, it will be common
97///     for other live states to reference rollups that no longer exist.
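///
/// For example, here's a minimal sketch (not a doctest; it assumes an async
/// context, a `shard_id`, and already-constructed `Consensus`/`Blob`/`Metrics`
/// handles) of how the pieces above fit together when reading state:
///
/// ```ignore
/// let state_versions = StateVersions::new(cfg, consensus, blob, metrics);
/// // `current` is reconstructed by fetching the rollup referenced by the most
/// // recent live diff from Blob and then applying the live diffs on top of it.
/// let recent = state_versions.fetch_recent_live_diffs::<u64>(&shard_id).await;
/// let current = state_versions
///     .fetch_current_state::<u64>(&shard_id, recent.0)
///     .await;
/// ```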
98#[derive(Debug)]
99pub struct StateVersions {
100    pub(crate) cfg: PersistConfig,
101    pub(crate) consensus: Arc<dyn Consensus>,
102    pub(crate) blob: Arc<dyn Blob>,
103    pub(crate) metrics: Arc<Metrics>,
104}
105
106#[derive(Debug, Clone)]
107pub struct RecentLiveDiffs(pub Vec<VersionedData>);
108
109#[derive(Debug, Clone)]
110pub struct AllLiveDiffs(pub Vec<VersionedData>);
111
112#[derive(Debug, Clone)]
113pub struct EncodedRollup {
114    pub(crate) shard_id: ShardId,
115    pub(crate) seqno: SeqNo,
116    pub(crate) key: PartialRollupKey,
117    pub(crate) _desc: Description<SeqNo>,
118    buf: Bytes,
119}
120
121impl EncodedRollup {
122    pub fn to_hollow(&self) -> HollowRollup {
123        HollowRollup {
124            key: self.key.clone(),
125            encoded_size_bytes: Some(self.buf.len()),
126        }
127    }
128}
129
130impl StateVersions {
131    pub fn new(
132        cfg: PersistConfig,
133        consensus: Arc<dyn Consensus>,
134        blob: Arc<dyn Blob>,
135        metrics: Arc<Metrics>,
136    ) -> Self {
137        StateVersions {
138            cfg,
139            consensus,
140            blob,
141            metrics,
142        }
143    }
144
145    /// Fetches the `current` state of the requested shard, or creates it if
146    /// uninitialized.
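    ///
    /// A minimal sketch of a call site (not a doctest; the codec type
    /// parameters are illustrative):
    ///
    /// ```ignore
    /// let state: TypedState<String, String, u64, i64> =
    ///     state_versions.maybe_init_shard(&shard_metrics).await?;
    /// ```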
147    pub async fn maybe_init_shard<K, V, T, D>(
148        &self,
149        shard_metrics: &ShardMetrics,
150    ) -> Result<TypedState<K, V, T, D>, Box<CodecMismatch>>
151    where
152        K: Debug + Codec,
153        V: Debug + Codec,
154        T: Timestamp + Lattice + Codec64,
155        D: Semigroup + Codec64,
156    {
157        let shard_id = shard_metrics.shard_id;
158
159        // The common case is that the shard is initialized, so try that first
160        let recent_live_diffs = self.fetch_recent_live_diffs::<T>(&shard_id).await;
161        if !recent_live_diffs.0.is_empty() {
162            return self
163                .fetch_current_state(&shard_id, recent_live_diffs.0)
164                .await
165                .check_codecs(&shard_id);
166        }
167
168        // Shard is not initialized, try initializing it.
169        let (initial_state, initial_diff) = self.write_initial_rollup(shard_metrics).await;
170        let (cas_res, _diff) =
171            retry_external(&self.metrics.retries.external.maybe_init_cas, || async {
172                self.try_compare_and_set_current(
173                    "maybe_init_shard",
174                    shard_metrics,
175                    None,
176                    &initial_state,
177                    &initial_diff,
178                )
179                .await
180                .map_err(|err| err.into())
181            })
182            .await;
183        match cas_res {
184            CaSResult::Committed => Ok(initial_state),
185            CaSResult::ExpectationMismatch => {
186                let recent_live_diffs = self.fetch_recent_live_diffs::<T>(&shard_id).await;
187                let state = self
188                    .fetch_current_state(&shard_id, recent_live_diffs.0)
189                    .await
190                    .check_codecs(&shard_id);
191
192                // Clean up the rollup blob that we were trying to reference.
193                //
194                // SUBTLE: If we got an Indeterminate error in the CaS above,
195                // but it actually went through, then we'll "contend" with
196                // ourselves and get an expectation mismatch. Use the actual
197                // fetched state to determine if our rollup actually made it in
198                // and decide whether to delete based on that.
199                let (_, rollup) = initial_state.latest_rollup();
200                let should_delete_rollup = match state.as_ref() {
201                    Ok(state) => !state
202                        .collections
203                        .rollups
204                        .values()
205                        .any(|x| &x.key == &rollup.key),
206                    // If the codecs don't match, then we definitely didn't
207                    // write the state.
208                    Err(_codec_mismatch) => true,
209                };
210                if should_delete_rollup {
211                    self.delete_rollup(&shard_id, &rollup.key).await;
212                }
213
214                state
215            }
216        }
217    }
218
219    /// Updates the state of a shard to a new `current` iff `expected` matches
220    /// `current`.
221    ///
222    /// May be called on uninitialized shards.
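    ///
    /// A rough sketch of the intended calling pattern (not a doctest;
    /// `current`, `new_state`, and `diff` are assumed to be produced by the
    /// caller, with `diff` encoding the transition from `current` to
    /// `new_state`):
    ///
    /// ```ignore
    /// let (cas_res, _versioned) = state_versions
    ///     .try_compare_and_set_current(
    ///         "my_cmd",
    ///         &shard_metrics,
    ///         Some(current.seqno()),
    ///         &new_state,
    ///         &diff,
    ///     )
    ///     .await?;
    /// match cas_res {
    ///     CaSResult::Committed => { /* `new_state` is now `current` */ }
    ///     CaSResult::ExpectationMismatch => { /* lost the race: refetch and retry */ }
    /// }
    /// ```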
223    pub async fn try_compare_and_set_current<K, V, T, D>(
224        &self,
225        cmd_name: &str,
226        shard_metrics: &ShardMetrics,
227        expected: Option<SeqNo>,
228        new_state: &TypedState<K, V, T, D>,
229        diff: &StateDiff<T>,
230    ) -> Result<(CaSResult, VersionedData), Indeterminate>
231    where
232        K: Debug + Codec,
233        V: Debug + Codec,
234        T: Timestamp + Lattice + Codec64,
235        D: Semigroup + Codec64,
236    {
237        assert_eq!(shard_metrics.shard_id, new_state.shard_id);
238        let path = new_state.shard_id.to_string();
239
240        trace!(
241            "apply_unbatched_cmd {} attempting {}\n  new_state={:?}",
242            cmd_name,
243            new_state.seqno(),
244            new_state
245        );
246        let new = self.metrics.codecs.state_diff.encode(|| {
247            let mut buf = Vec::new();
248            diff.encode(&mut buf);
249            VersionedData {
250                seqno: new_state.seqno(),
251                data: Bytes::from(buf),
252            }
253        });
254        assert_eq!(new.seqno, diff.seqno_to);
255
256        let payload_len = new.data.len();
257        let cas_res = retry_determinate(
258            &self.metrics.retries.determinate.apply_unbatched_cmd_cas,
259            || async {
260                self.consensus
261                    .compare_and_set(&path, expected, new.clone())
262                    .await
263            },
264        )
265        .instrument(debug_span!("apply_unbatched_cmd::cas", payload_len))
266        .await
267        .map_err(|err| {
268            debug!("apply_unbatched_cmd {} errored: {}", cmd_name, err);
269            err
270        })?;
271
272        match cas_res {
273            CaSResult::Committed => {
274                trace!(
275                    "apply_unbatched_cmd {} succeeded {}\n  new_state={:?}",
276                    cmd_name,
277                    new_state.seqno(),
278                    new_state
279                );
280
281                shard_metrics.set_since(new_state.since());
282                shard_metrics.set_upper(new_state.upper());
283                shard_metrics.seqnos_since_last_rollup.set(
284                    new_state
285                        .seqno
286                        .0
287                        .saturating_sub(new_state.latest_rollup().0.0),
288                );
289                shard_metrics
290                    .spine_batch_count
291                    .set(u64::cast_from(new_state.spine_batch_count()));
292                let size_metrics = new_state.size_metrics();
293                shard_metrics
294                    .schema_registry_version_count
295                    .set(u64::cast_from(new_state.collections.schemas.len()));
296                shard_metrics
297                    .hollow_batch_count
298                    .set(u64::cast_from(size_metrics.hollow_batch_count));
299                shard_metrics
300                    .batch_part_count
301                    .set(u64::cast_from(size_metrics.batch_part_count));
302                shard_metrics
303                    .rewrite_part_count
304                    .set(u64::cast_from(size_metrics.rewrite_part_count));
305                shard_metrics
306                    .update_count
307                    .set(u64::cast_from(size_metrics.num_updates));
308                shard_metrics
309                    .rollup_count
310                    .set(u64::cast_from(size_metrics.state_rollup_count));
311                shard_metrics
312                    .largest_batch_size
313                    .set(u64::cast_from(size_metrics.largest_batch_bytes));
314                shard_metrics
315                    .usage_current_state_batches_bytes
316                    .set(u64::cast_from(size_metrics.state_batches_bytes));
317                shard_metrics
318                    .usage_current_state_rollups_bytes
319                    .set(u64::cast_from(size_metrics.state_rollups_bytes));
320                shard_metrics
321                    .seqnos_held
322                    .set(u64::cast_from(new_state.seqnos_held()));
323                shard_metrics
324                    .encoded_diff_size
325                    .inc_by(u64::cast_from(payload_len));
326                shard_metrics
327                    .live_writers
328                    .set(u64::cast_from(new_state.collections.writers.len()));
329                shard_metrics
330                    .rewrite_part_count
331                    .set(u64::cast_from(size_metrics.rewrite_part_count));
332                shard_metrics
333                    .inline_part_count
334                    .set(u64::cast_from(size_metrics.inline_part_count));
335                shard_metrics
336                    .inline_part_bytes
337                    .set(u64::cast_from(size_metrics.inline_part_bytes));
338
339                let spine_metrics = new_state.collections.trace.spine_metrics();
340                shard_metrics
341                    .compact_batches
342                    .set(spine_metrics.compact_batches);
343                shard_metrics
344                    .compacting_batches
345                    .set(spine_metrics.compacting_batches);
346                shard_metrics
347                    .noncompact_batches
348                    .set(spine_metrics.noncompact_batches);
349
350                let batch_parts_by_version = new_state
351                    .collections
352                    .trace
353                    .batches()
354                    .flat_map(|x| x.parts.iter())
355                    .flat_map(|part| {
356                        let key = match part {
357                            RunPart::Many(x) => Some(&x.key),
358                            RunPart::Single(BatchPart::Hollow(x)) => Some(&x.key),
359                            // TODO: Would be nice to include these too, but we lose the info atm.
360                            RunPart::Single(BatchPart::Inline { .. }) => None,
361                        }?;
362                        // Carefully avoid any String allocs by splitting.
363                        let (writer_key, _) = key.0.split_once('/')?;
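                        // `writer_key` is the first path component of the blob
                        // key: `w...` keys get bucketed as "old", while the
                        // suffix of an `n...` key is used as the version bucket
                        // for the part sizes below.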
364                        match &writer_key[..1] {
365                            "w" => Some(("old", part.encoded_size_bytes())),
366                            "n" => Some((&writer_key[1..], part.encoded_size_bytes())),
367                            _ => None,
368                        }
369                    });
370                shard_metrics.set_batch_part_versions(batch_parts_by_version);
371
372                Ok((CaSResult::Committed, new))
373            }
374            CaSResult::ExpectationMismatch => {
375                debug!(
376                    "apply_unbatched_cmd {} {} lost the CaS race, retrying: {:?}",
377                    new_state.shard_id(),
378                    cmd_name,
379                    expected,
380                );
381                Ok((CaSResult::ExpectationMismatch, new))
382            }
383        }
384    }
385
386    /// Fetches the `current` state of the requested shard.
387    ///
388    /// Uses the provided hint (live_diffs), which is a possibly outdated
389    /// copy of all or recent live diffs, to avoid fetches where possible.
390    ///
391    /// Panics if called on an uninitialized shard.
392    pub async fn fetch_current_state<T>(
393        &self,
394        shard_id: &ShardId,
395        mut live_diffs: Vec<VersionedData>,
396    ) -> UntypedState<T>
397    where
398        T: Timestamp + Lattice + Codec64,
399    {
400        let retry = self
401            .metrics
402            .retries
403            .fetch_latest_state
404            .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
405        loop {
406            let latest_diff = live_diffs
407                .last()
408                .expect("initialized shard should have at least one diff");
409            let latest_diff = self
410                .metrics
411                .codecs
412                .state_diff
413                // Note: `latest_diff.data` is a `Bytes`, so cloning just increments a ref count
414                .decode(|| {
415                    StateDiff::<T>::decode(&self.cfg.build_version, latest_diff.data.clone())
416                });
417            let mut state = match self
418                .fetch_rollup_at_key(shard_id, &latest_diff.latest_rollup_key)
419                .await
420            {
421                Some(x) => x,
422                None => {
423                    // The rollup that this diff referenced is gone, so the diff
424                    // must be out of date. Try again. Intentionally don't sleep on retry.
425                    retry.retries.inc();
426                    let earliest_before_refetch = live_diffs
427                        .first()
428                        .expect("initialized shard should have at least one diff")
429                        .seqno;
430                    live_diffs = self.fetch_recent_live_diffs::<T>(shard_id).await.0;
431
432                    // We should only hit the race condition that leads to a
433                    // refetch if the set of live diffs changed out from under
434                    // us.
435                    //
436                    // TODO: Make this an assert once we're 100% sure the above
437                    // is always true.
438                    let earliest_after_refetch = live_diffs
439                        .first()
440                        .expect("initialized shard should have at least one diff")
441                        .seqno;
442                    if earliest_before_refetch >= earliest_after_refetch {
443                        warn!(
444                            concat!(
445                                "fetch_current_state refetch expects earliest live diff to advance: {} vs {}. ",
446                                "In dev and testing, this happens when persist's Blob (files in mzdata) ",
447                                "is deleted out from under it or when two processes are talking to ",
448                                "different Blobs (e.g. docker containers without it shared)."
449                            ),
450                            earliest_before_refetch, earliest_after_refetch
451                        )
452                    }
453                    continue;
454                }
455            };
456
457            state.apply_encoded_diffs(&self.cfg, &self.metrics, &live_diffs);
458            return state;
459        }
460    }
461
462    /// Returns an iterator over all live states for the requested shard.
463    ///
464    /// Returns None if called on an uninitialized shard.
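    ///
    /// A minimal sketch of walking every reconstructable version (not a
    /// doctest; error handling elided):
    ///
    /// ```ignore
    /// let mut states = state_versions
    ///     .fetch_all_live_states::<u64>(shard_id)
    ///     .await
    ///     .expect("shard is initialized")
    ///     .check_ts_codec()?;
    /// while let Some(state) = states.next(|_diff| {}) {
    ///     // inspect `state`: one reconstructed version per live diff
    /// }
    /// ```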
465    pub async fn fetch_all_live_states<T>(
466        &self,
467        shard_id: ShardId,
468    ) -> Option<UntypedStateVersionsIter<T>>
469    where
470        T: Timestamp + Lattice + Codec64,
471    {
472        let retry = self
473            .metrics
474            .retries
475            .fetch_live_states
476            .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
477        let mut all_live_diffs = self.fetch_all_live_diffs(&shard_id).await;
478        loop {
479            let earliest_live_diff = match all_live_diffs.0.first() {
480                Some(x) => x,
481                None => return None,
482            };
483            let state = match self
484                .fetch_rollup_at_seqno(
485                    &shard_id,
486                    all_live_diffs.0.clone(),
487                    earliest_live_diff.seqno,
488                )
489                .await
490            {
491                Some(x) => x,
492                None => {
493                    // We maintain an invariant that a rollup always exists for
494                    // the earliest live diff. Since we didn't find one, that
495                    // can only mean that the live_diffs we just fetched are
496                    // obsolete (there's a race condition with gc). This should
497                    // be rare in practice, so inc a counter and try again.
498                    // Intentionally don't sleep on retry.
499                    retry.retries.inc();
500                    let earliest_before_refetch = earliest_live_diff.seqno;
501                    all_live_diffs = self.fetch_all_live_diffs(&shard_id).await;
502
503                    // We should only hit the race condition that leads to a
504                    // refetch if the set of live diffs changed out from under
505                    // us.
506                    //
507                    // TODO: Make this an assert once we're 100% sure the above
508                    // is always true.
509                    let earliest_after_refetch = all_live_diffs
510                        .0
511                        .first()
512                        .expect("initialized shard should have at least one diff")
513                        .seqno;
514                    if earliest_before_refetch >= earliest_after_refetch {
515                        warn!(
516                            concat!(
517                                "fetch_all_live_states refetch expects earliest live diff to advance: {} vs {}. ",
518                                "In dev and testing, this happens when persist's Blob (files in mzdata) ",
519                                "is deleted out from under it or when two processes are talking to ",
520                                "different Blobs (e.g. docker containers without it shared)."
521                            ),
522                            earliest_before_refetch, earliest_after_refetch
523                        )
524                    }
525                    continue;
526                }
527            };
528            assert_eq!(earliest_live_diff.seqno, state.seqno());
529            return Some(UntypedStateVersionsIter {
530                shard_id,
531                cfg: self.cfg.clone(),
532                metrics: Arc::clone(&self.metrics),
533                state,
534                diffs: all_live_diffs.0,
535            });
536        }
537    }
538
539    /// Fetches all live_diffs for a shard. Intended only for when a caller needs to reconstruct
540    /// _all_ states still referenced by Consensus. Prefer [Self::fetch_recent_live_diffs] when
541    /// the caller simply needs to fetch the latest state.
542    ///
543    /// Returns an empty Vec iff called on an uninitialized shard.
544    pub async fn fetch_all_live_diffs(&self, shard_id: &ShardId) -> AllLiveDiffs {
545        let path = shard_id.to_string();
546        let diffs = retry_external(&self.metrics.retries.external.fetch_state_scan, || async {
547            self.consensus.scan(&path, SeqNo::minimum(), SCAN_ALL).await
548        })
549        .instrument(debug_span!("fetch_state::scan"))
550        .await;
551        AllLiveDiffs(diffs)
552    }
553
554    /// Fetches recent live_diffs for a shard. Intended for when a caller needs to fetch
555    /// the latest state in Consensus.
556    ///
557    /// "Recent" is defined as either:
558    /// * All of the diffs known in Consensus
559    /// * All of the diffs in Consensus after the latest rollup
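    ///
    /// For example (illustrative, made-up numbers): if the live diffs span
    /// seqnos 7 through 250, the scan limit is 100, and the latest rollup is at
    /// seqno 200, then the first page of the scan comes back full, so rather
    /// than paging through all 244 diffs we jump to `head`, read the latest
    /// rollup's seqno out of its diff, and scan forward from seqno 200.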
560    pub async fn fetch_recent_live_diffs<T>(&self, shard_id: &ShardId) -> RecentLiveDiffs
561    where
562        T: Timestamp + Lattice + Codec64,
563    {
564        let path = shard_id.to_string();
565        let scan_limit = STATE_VERSIONS_RECENT_LIVE_DIFFS_LIMIT.get(&self.cfg);
566        let oldest_diffs =
567            retry_external(&self.metrics.retries.external.fetch_state_scan, || async {
568                self.consensus
569                    .scan(&path, SeqNo::minimum(), scan_limit)
570                    .await
571            })
572            .instrument(debug_span!("fetch_state::scan"))
573            .await;
574
575        // fast-path: we found all known diffs in a single page of our scan. we expect almost all
576        // calls to go down this path, unless a reader has a very long seqno-hold on the shard.
577        if oldest_diffs.len() < scan_limit {
578            self.metrics.state.fetch_recent_live_diffs_fast_path.inc();
579            return RecentLiveDiffs(oldest_diffs);
580        }
581
582        // slow-path: we could be arbitrarily far behind the head of Consensus (either intentionally
583        // due to a long seqno-hold from a reader, or unintentionally from a bug that's preventing
584        // a seqno-hold from advancing). rather than scanning a potentially unbounded number of old
585        // states in Consensus, we jump to the latest state, determine the seqno of the most recent
586        // rollup, and then fetch all the diffs from that point onward.
587        //
588        // this approach requires more network calls, but it should smooth out our access pattern
589        // and use only bounded calls to Consensus. additionally, if `limit` is adequately tuned,
590        // this path will only be invoked when there's an excess number of states in Consensus and
591        // it might be slower to do a single long scan over unneeded rows.
592        let head = retry_external(&self.metrics.retries.external.fetch_state_scan, || async {
593            self.consensus.head(&path).await
594        })
595        .instrument(debug_span!("fetch_state::slow_path::head"))
596        .await
597        .expect("initialized shard should have at least 1 diff");
598
599        let latest_diff = self
600            .metrics
601            .codecs
602            .state_diff
603            .decode(|| StateDiff::<T>::decode(&self.cfg.build_version, head.data));
604
605        match BlobKey::parse_ids(&latest_diff.latest_rollup_key.complete(shard_id)) {
606            Ok((_shard_id, PartialBlobKey::Rollup(seqno, _rollup))) => {
607                self.metrics.state.fetch_recent_live_diffs_slow_path.inc();
608                let diffs =
609                    retry_external(&self.metrics.retries.external.fetch_state_scan, || async {
610                        // (pedantry) this call is technically unbounded, but something very strange
611                        // would have had to happen to have accumulated so many states between our
612                        // call to `head` and this invocation for it to become problematic
613                        self.consensus.scan(&path, seqno, SCAN_ALL).await
614                    })
615                    .instrument(debug_span!("fetch_state::slow_path::scan"))
616                    .await;
617                RecentLiveDiffs(diffs)
618            }
619            Ok(_) => panic!(
620                "invalid state diff rollup key: {}",
621                latest_diff.latest_rollup_key
622            ),
623            Err(err) => panic!("unparseable state diff rollup key: {}", err),
624        }
625    }
626
627    /// Fetches all live diffs greater than the given SeqNo.
628    ///
629    /// TODO: Apply a limit to this scan. This could additionally be used as an internal
630    /// call within `fetch_recent_live_diffs`.
631    pub async fn fetch_all_live_diffs_gt_seqno<K, V, T, D>(
632        &self,
633        shard_id: &ShardId,
634        seqno: SeqNo,
635    ) -> Vec<VersionedData> {
636        let path = shard_id.to_string();
637        retry_external(&self.metrics.retries.external.fetch_state_scan, || async {
638            self.consensus.scan(&path, seqno.next(), SCAN_ALL).await
639        })
640        .instrument(debug_span!("fetch_state::scan"))
641        .await
642    }
643
644    /// Truncates any diffs in consensus less than the given seqno.
645    pub async fn truncate_diffs(&self, shard_id: &ShardId, seqno: SeqNo) {
646        let path = shard_id.to_string();
647        let _deleted_count = retry_external(&self.metrics.retries.external.gc_truncate, || async {
648            self.consensus.truncate(&path, seqno).await
649        })
650        .instrument(debug_span!("gc::truncate"))
651        .await;
652    }
653
654    // Writes a self-referential rollup to blob storage and returns the diff
655    // that should be compare_and_set into consensus to finish initializing the
656    // shard.
657    async fn write_initial_rollup<K, V, T, D>(
658        &self,
659        shard_metrics: &ShardMetrics,
660    ) -> (TypedState<K, V, T, D>, StateDiff<T>)
661    where
662        K: Debug + Codec,
663        V: Debug + Codec,
664        T: Timestamp + Lattice + Codec64,
665        D: Semigroup + Codec64,
666    {
667        let empty_state = TypedState::new(
668            self.cfg.build_version.clone(),
669            shard_metrics.shard_id,
670            self.cfg.hostname.clone(),
671            (self.cfg.now)(),
672        );
673        let rollup_seqno = empty_state.seqno.next();
674        let rollup = HollowRollup {
675            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
676            // Chicken-and-egg problem here. We don't know the size of the
677            // rollup until we encode it, but it includes a reference back to
678            // itself.
679            encoded_size_bytes: None,
680        };
681        let (applied, initial_state) = match empty_state
682            .clone_apply(&self.cfg, &mut |_, _, state| {
683                state.add_rollup((rollup_seqno, &rollup))
684            }) {
685            Continue(x) => x,
686            Break(NoOpStateTransition(_)) => {
687                panic!("initial state transition should not be a no-op")
688            }
689        };
690        assert!(
691            applied,
692            "add_and_remove_rollups should apply to the empty state"
693        );
694
695        let rollup = self.encode_rollup_blob(
696            shard_metrics,
697            initial_state.clone_for_rollup(),
698            vec![],
699            rollup.key,
700        );
701        let () = self.write_rollup_blob(&rollup).await;
702        assert_eq!(initial_state.seqno, rollup.seqno);
703
704        let diff = StateDiff::from_diff(&empty_state.state, &initial_state.state);
705        (initial_state, diff)
706    }
707
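    /// Writes a rollup blob for the given state at `state.seqno`, inlining the
    /// diffs between the state's latest known rollup and the state itself.
    ///
    /// Returns `None` (without writing anything) if no diffs exist past the
    /// latest rollup, or if those diffs have already been truncated by another
    /// process.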
708    pub async fn write_rollup_for_state<K, V, T, D>(
709        &self,
710        shard_metrics: &ShardMetrics,
711        state: TypedState<K, V, T, D>,
712        rollup_id: &RollupId,
713    ) -> Option<EncodedRollup>
714    where
715        K: Debug + Codec,
716        V: Debug + Codec,
717        T: Timestamp + Lattice + Codec64,
718        D: Semigroup + Codec64,
719    {
720        let (latest_rollup_seqno, _rollup) = state.latest_rollup();
721        let seqno = state.seqno();
722
723        // TODO: maintain the diffs since the latest rollup in-memory rather than
724        // needing an additional API call here. This would reduce Consensus load
725        // / avoid races with Consensus truncation, but is trickier to write.
726        let diffs: Vec<_> = self
727            .fetch_all_live_diffs_gt_seqno::<K, V, T, D>(&state.shard_id, *latest_rollup_seqno)
728            .await;
729
730        match diffs.first() {
731            None => {
732                // early-out because there are no diffs past our latest rollup.
733                //
734                // this should only occur in the initial state, but we can write a more
735                // general assertion: if no live diffs exist past this state's latest
736                // known rollup, then that rollup must be for the latest known state.
737                self.metrics.state.rollup_write_noop_latest.inc();
738                assert_eq!(seqno, *latest_rollup_seqno);
739                return None;
740            }
741            Some(first) => {
742                // early-out if it is no longer possible to inline all the diffs from
743                // the last known rollup to the current state. some or all of the diffs
744                // have already been truncated by another process.
745                //
746                // this can happen if one process gets told to write a rollup, the
747                // maintenance task falls arbitrarily behind, and another process writes
748                // a new rollup / GCs and truncates past the first process's rollup.
749                self.metrics.state.rollup_write_noop_truncated.inc();
750                if first.seqno != latest_rollup_seqno.next() {
751                    assert!(
752                        first.seqno > latest_rollup_seqno.next(),
753                        "diff: {}, rollup: {}",
754                        first.seqno,
755                        latest_rollup_seqno,
756                    );
757                    return None;
758                }
759            }
760        }
761
762        // we may have fetched more diffs than we need: trim anything beyond the state's seqno
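        // (For example: if the latest rollup is at seqno 10 and this state is
        // at seqno 13, we keep diffs 11..=13 even if Consensus already contains
        // diffs past 13.)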
763        let diffs: Vec<_> = diffs.into_iter().filter(|x| x.seqno <= seqno).collect();
764
765        // verify that we've done all the filtering correctly and that our
766        // diffs have seqnos bounded by (last_rollup, current_state]
767        assert_eq!(
768            diffs.first().map(|x| x.seqno),
769            Some(latest_rollup_seqno.next())
770        );
771        assert_eq!(diffs.last().map(|x| x.seqno), Some(state.seqno));
772
773        let key = PartialRollupKey::new(state.seqno, rollup_id);
774        let rollup = self.encode_rollup_blob(shard_metrics, state, diffs, key);
775        let () = self.write_rollup_blob(&rollup).await;
776
777        self.metrics.state.rollup_write_success.inc();
778
779        Some(rollup)
780    }
781
782    /// Encodes the given state and diffs as a rollup to be written to the specified key.
783    ///
784    /// The diffs must span the seqno range `(state.last_rollup().seqno, state.seqno]`.
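    ///
    /// A rough sketch of the encode-then-write lifecycle (not a doctest; the
    /// `state`, `diffs`, and `shard_metrics` values are assumed to come from
    /// the caller):
    ///
    /// ```ignore
    /// let key = PartialRollupKey::new(state.seqno, &RollupId::new());
    /// let rollup = state_versions.encode_rollup_blob(&shard_metrics, state, diffs, key);
    /// state_versions.write_rollup_blob(&rollup).await;
    /// // `rollup.to_hollow()` can now be referenced from a new state version.
    /// ```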
785    pub fn encode_rollup_blob<K, V, T, D>(
786        &self,
787        shard_metrics: &ShardMetrics,
788        state: TypedState<K, V, T, D>,
789        diffs: Vec<VersionedData>,
790        key: PartialRollupKey,
791    ) -> EncodedRollup
792    where
793        K: Debug + Codec,
794        V: Debug + Codec,
795        T: Timestamp + Lattice + Codec64,
796        D: Semigroup + Codec64,
797    {
798        let shard_id = state.shard_id;
799        let rollup_seqno = state.seqno;
800
801        let rollup = Rollup::from(state.into(), diffs);
802        let desc = rollup.diffs.as_ref().expect("inlined diffs").description();
803
804        let buf = self.metrics.codecs.state.encode(|| {
805            let mut buf = Vec::new();
806            rollup
807                .into_proto()
808                .encode(&mut buf)
809                .expect("no required fields means no initialization errors");
810            Bytes::from(buf)
811        });
812        shard_metrics
813            .latest_rollup_size
814            .set(u64::cast_from(buf.len()));
815        EncodedRollup {
816            shard_id,
817            seqno: rollup_seqno,
818            key,
819            buf,
820            _desc: desc,
821        }
822    }
823
824    /// Writes the given state rollup out to blob.
825    pub async fn write_rollup_blob(&self, rollup: &EncodedRollup) {
826        let payload_len = rollup.buf.len();
827        retry_external(&self.metrics.retries.external.rollup_set, || async {
828            self.blob
829                .set(
830                    &rollup.key.complete(&rollup.shard_id),
831                    Bytes::clone(&rollup.buf),
832                )
833                .await
834        })
835        .instrument(debug_span!("rollup::set", payload_len))
836        .await;
837    }
838
839    /// Fetches a rollup for the given SeqNo, if it exists.
840    ///
841    /// Uses the provided hint, which is a possibly outdated copy of all
842    /// or recent live diffs, to avoid fetches where possible.
843    ///
844    /// Panics if called on an uninitialized shard.
845    async fn fetch_rollup_at_seqno<T>(
846        &self,
847        shard_id: &ShardId,
848        live_diffs: Vec<VersionedData>,
849        seqno: SeqNo,
850    ) -> Option<UntypedState<T>>
851    where
852        T: Timestamp + Lattice + Codec64,
853    {
854        let rollup_key_for_migration = live_diffs.iter().find_map(|x| {
855            let diff = self
856                .metrics
857                .codecs
858                .state_diff
859                // Note: `x.data` is a `Bytes`, so cloning just increments a ref count
860                .decode(|| StateDiff::<T>::decode(&self.cfg.build_version, x.data.clone()));
861            diff.rollups
862                .iter()
863                .find(|x| x.key == seqno)
864                .map(|x| match &x.val {
865                    StateFieldValDiff::Insert(x) => x.clone(),
866                    StateFieldValDiff::Update(_, x) => x.clone(),
867                    StateFieldValDiff::Delete(x) => x.clone(),
868                })
869        });
870
871        let state = self.fetch_current_state::<T>(shard_id, live_diffs).await;
872        if let Some(rollup) = state.rollups().get(&seqno) {
873            return self.fetch_rollup_at_key(shard_id, &rollup.key).await;
874        }
875
876        // MIGRATION: We maintain an invariant that the _current state_ contains
877        // a rollup for the _earliest live diff_ in consensus (and that the
878        // referenced rollup exists). At one point, we fixed a bug that could
879        // lead to that invariant being violated.
880        //
881        // If the earliest live diff is X and we receive a gc req for X+Y to
882        // X+Y+Z (this can happen e.g. if some cmd ignores an earlier req for X
883        // to X+Y, or if they're processing concurrently and the X to X+Y req
884        // loses the race), then the buggy version of gc would delete any
885        // rollups strictly less than old_seqno_since (X+Y in this example). But
886        // our invariant is that the rollup exists for the earliest live diff,
887        // in this case X. So if the first call to gc was interrupted after this
888        // but before truncate (when all the blob deletes happen), later calls
889        // to gc would attempt to call `fetch_live_states` and end up infinitely
890        // in its loop.
891        //
892        // The fix was to base which rollups are deleteable on the earliest live
893        // diff, not old_seqno_since.
894        //
895        // Sadly, some envs in prod now violate this invariant. So, even with
896        // the fix, existing shards will never successfully run gc. We add a
897        // temporary migration to fix them in `fetch_rollup_at_seqno`. This
898        // method normally looks in the latest version of state for the
899        // specifically requested seqno. In the invariant violation case, some
900        // version of state in the range `[earliest, current]` has a rollup for
901        // earliest, but current doesn't. So, for the migration, if
902        // fetch_rollup_at_seqno doesn't find a rollup in current, then we fall
903        // back to sniffing one out of raw diffs. If this success, we increment
904        // back to sniffing one out of raw diffs. If this succeeds, we increment
905        // bailing us out. After the next deploy, this should initially start at
906        // > 0 and then settle down to 0. After the next prod envs wipe, we can
907        // remove the migration.
908        let rollup = rollup_key_for_migration.expect("someone should have a key for this rollup");
909        tracing::info!("only found rollup for {} {} via migration", shard_id, seqno);
910        self.metrics.state.rollup_at_seqno_migration.inc();
911        self.fetch_rollup_at_key(shard_id, &rollup.key).await
912    }
913
914    /// Fetches the rollup at the given key, if it exists.
915    async fn fetch_rollup_at_key<T>(
916        &self,
917        shard_id: &ShardId,
918        rollup_key: &PartialRollupKey,
919    ) -> Option<UntypedState<T>>
920    where
921        T: Timestamp + Lattice + Codec64,
922    {
923        retry_external(&self.metrics.retries.external.rollup_get, || async {
924            self.blob.get(&rollup_key.complete(shard_id)).await
925        })
926        .instrument(debug_span!("rollup::get"))
927        .await
928        .map(|buf| {
929            self.metrics
930                .codecs
931                .state
932                .decode(|| UntypedState::decode(&self.cfg.build_version, buf))
933        })
934    }
935
936    /// Deletes the rollup at the given key, if it exists.
937    pub async fn delete_rollup(&self, shard_id: &ShardId, key: &PartialRollupKey) {
938        let _ = retry_external(&self.metrics.retries.external.rollup_delete, || async {
939            self.blob.delete(&key.complete(shard_id)).await
940        })
941        .instrument(debug_span!("rollup::delete"))
942        .await;
943    }
944}
945
946pub struct UntypedStateVersionsIter<T> {
947    shard_id: ShardId,
948    cfg: PersistConfig,
949    metrics: Arc<Metrics>,
950    state: UntypedState<T>,
951    diffs: Vec<VersionedData>,
952}
953
954impl<T: Timestamp + Lattice + Codec64> UntypedStateVersionsIter<T> {
955    pub(crate) fn check_ts_codec(self) -> Result<StateVersionsIter<T>, CodecMismatchT> {
956        let key_codec = self.state.key_codec.clone();
957        let val_codec = self.state.val_codec.clone();
958        let diff_codec = self.state.diff_codec.clone();
959        let state = self.state.check_ts_codec(&self.shard_id)?;
960        Ok(StateVersionsIter::new(
961            self.cfg,
962            self.metrics,
963            state,
964            self.diffs,
965            key_codec,
966            val_codec,
967            diff_codec,
968        ))
969    }
970}
971
972/// An iterator over consecutive versions of [State].
973pub struct StateVersionsIter<T> {
974    cfg: PersistConfig,
975    metrics: Arc<Metrics>,
976    state: State<T>,
977    diffs: Vec<VersionedData>,
978    key_codec: String,
979    val_codec: String,
980    diff_codec: String,
981    #[cfg(debug_assertions)]
982    validator: ReferencedBlobValidator<T>,
983}
984
985impl<T: Timestamp + Lattice + Codec64> StateVersionsIter<T> {
986    fn new(
987        cfg: PersistConfig,
988        metrics: Arc<Metrics>,
989        state: State<T>,
990        // diffs is stored reversed so we can efficiently pop off the Vec.
991        mut diffs: Vec<VersionedData>,
992        key_codec: String,
993        val_codec: String,
994        diff_codec: String,
995    ) -> Self {
996        assert!(diffs.first().map_or(true, |x| x.seqno == state.seqno));
997        diffs.reverse();
998        StateVersionsIter {
999            cfg,
1000            metrics,
1001            state,
1002            diffs,
1003            key_codec,
1004            val_codec,
1005            diff_codec,
1006            #[cfg(debug_assertions)]
1007            validator: ReferencedBlobValidator::default(),
1008        }
1009    }
1010
1011    pub fn len(&self) -> usize {
1012        self.diffs.len()
1013    }
1014
1015    /// Advances first to some starting state (in practice, usually the first
1016    /// live state), and then through each successive state, for as many diffs
1017    /// as this iterator was initialized with.
1018    ///
1019    /// The `inspect_diff_fn` callback can be used to inspect diffs directly as
1020    /// they are applied. The first call to `next` returns a
1021    /// [InspectDiff::FromInitial] representing a diff from the initial state.
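    ///
    /// A minimal sketch of replaying the diffs for a `StateVersionsIter` named
    /// `iter`, collecting the referenced blobs along the way (not a doctest):
    ///
    /// ```ignore
    /// while let Some(state) = iter.next(|diff| {
    ///     for blob in diff.referenced_blobs() {
    ///         // e.g. record `blob` for a leak check
    ///     }
    /// }) {
    ///     // `state` is the reconstructed State at this seqno
    /// }
    /// ```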
1022    pub fn next<F: for<'a> FnMut(InspectDiff<'a, T>)>(
1023        &mut self,
1024        mut inspect_diff_fn: F,
1025    ) -> Option<&State<T>> {
1026        let diff = match self.diffs.pop() {
1027            Some(x) => x,
1028            None => return None,
1029        };
1030        let data = diff.data.clone();
1031        let diff = self
1032            .metrics
1033            .codecs
1034            .state_diff
1035            .decode(|| StateDiff::decode(&self.cfg.build_version, diff.data));
1036
1037        // A bit hacky, but the first diff in StateVersionsIter is always a
1038        // no-op.
1039        if diff.seqno_to == self.state.seqno {
1040            let inspect = InspectDiff::FromInitial(&self.state);
1041            #[cfg(debug_assertions)]
1042            {
1043                inspect
1044                    .referenced_blobs()
1045                    .for_each(|x| self.validator.add_inc_blob(x));
1046            }
1047            inspect_diff_fn(inspect);
1048        } else {
1049            let inspect = InspectDiff::Diff(&diff);
1050            #[cfg(debug_assertions)]
1051            {
1052                inspect
1053                    .referenced_blobs()
1054                    .for_each(|x| self.validator.add_inc_blob(x));
1055            }
1056            inspect_diff_fn(inspect);
1057        }
1058
1059        let diff_seqno_to = diff.seqno_to;
1060        self.state
1061            .apply_diffs(&self.metrics, std::iter::once((diff, data)));
1062        assert_eq!(self.state.seqno, diff_seqno_to);
1063        #[cfg(debug_assertions)]
1064        {
1065            self.validator.validate_against_state(&self.state);
1066        }
1067        Some(&self.state)
1068    }
1069
1070    pub fn state(&self) -> &State<T> {
1071        &self.state
1072    }
1073
1074    pub fn into_rollup_proto_without_diffs(&self) -> impl serde::Serialize + use<T> {
1075        Rollup::from_state_without_diffs(
1076            State {
1077                applier_version: self.state.applier_version.clone(),
1078                shard_id: self.state.shard_id.clone(),
1079                seqno: self.state.seqno.clone(),
1080                walltime_ms: self.state.walltime_ms.clone(),
1081                hostname: self.state.hostname.clone(),
1082                collections: self.state.collections.clone(),
1083            },
1084            self.key_codec.clone(),
1085            self.val_codec.clone(),
1086            T::codec_name(),
1087            self.diff_codec.clone(),
1088        )
1089        .into_proto()
1090    }
1091}
1092
1093/// This represents a diff, either directly or, in the case of the FromInitial
1094/// variant, a diff from the initial state. (We could instead compute the diff
1095/// from the initial state and replace this with only a `StateDiff<T>`, but don't
1096/// for efficiency.)
1097#[derive(Debug)]
1098pub enum InspectDiff<'a, T> {
1099    FromInitial(&'a State<T>),
1100    Diff(&'a StateDiff<T>),
1101}
1102
1103impl<T: Timestamp + Lattice + Codec64> InspectDiff<'_, T> {
1104    /// Returns an iterator over every blob added in this state transition.
1105    ///
1106    /// Blob removals, along with all other diffs, are ignored.
1107    pub fn referenced_blobs(&self) -> impl Iterator<Item = HollowBlobRef<T>> {
1108        let (state, diff) = match self {
1109            InspectDiff::FromInitial(x) => (Some(x), None),
1110            InspectDiff::Diff(x) => (None, Some(x)),
1111        };
1112        let state_blobs = state.into_iter().flat_map(|s| s.blobs());
1113        let diff_blobs = diff.into_iter().flat_map(|d| d.blob_inserts());
1114        state_blobs.chain(diff_blobs)
1115    }
1116}
1117
1118#[cfg(debug_assertions)]
1119struct ReferencedBlobValidator<T> {
1120    // A copy of every batch and rollup referenced by some state iterator,
1121    // computed by scanning the full copy of state at each seqno.
1122    full_batches: BTreeSet<HollowBatch<T>>,
1123    full_rollups: BTreeSet<HollowRollup>,
1124    // A copy of every batch and rollup referenced by some state iterator,
1125    // computed incrementally.
1126    inc_batches: BTreeSet<HollowBatch<T>>,
1127    inc_rollups: BTreeSet<HollowRollup>,
1128}
1129
1130#[cfg(debug_assertions)]
1131impl<T> Default for ReferencedBlobValidator<T> {
1132    fn default() -> Self {
1133        Self {
1134            full_batches: Default::default(),
1135            full_rollups: Default::default(),
1136            inc_batches: Default::default(),
1137            inc_rollups: Default::default(),
1138        }
1139    }
1140}
1141
1142#[cfg(debug_assertions)]
1143impl<T: Timestamp + Lattice + Codec64> ReferencedBlobValidator<T> {
1144    fn add_inc_blob(&mut self, x: HollowBlobRef<'_, T>) {
1145        match x {
1146            HollowBlobRef::Batch(x) => assert!(
1147                self.inc_batches.insert(x.clone()) || x.desc.lower() == x.desc.upper(),
1148                "non-empty batches should only be appended once; duplicate: {x:?}"
1149            ),
1150            HollowBlobRef::Rollup(x) => assert!(self.inc_rollups.insert(x.clone())),
1151        }
1152    }
1153    fn validate_against_state(&mut self, x: &State<T>) {
1154        use std::hash::{DefaultHasher, Hash, Hasher};
1155
1156        use mz_ore::collections::HashSet;
1157        use timely::progress::Antichain;
1158
1159        use crate::internal::state::BatchPart;
1160
1161        x.blobs().for_each(|x| match x {
1162            HollowBlobRef::Batch(x) => {
1163                self.full_batches.insert(x.clone());
1164            }
1165            HollowBlobRef::Rollup(x) => {
1166                self.full_rollups.insert(x.clone());
1167            }
1168        });
1169
1170        // Check that the sets of batches overall cover the same pTVC.
1171        // Partial ordering means we can't just take the first and last batches; instead compute
1172        // bounds using the lattice operations.
1173        fn overall_desc<'a, T: Timestamp + Lattice>(
1174            iter: impl Iterator<Item = &'a Description<T>>,
1175        ) -> (Antichain<T>, Antichain<T>) {
1176            let mut lower = Antichain::new();
1177            let mut upper = Antichain::from_elem(T::minimum());
1178            for desc in iter {
1179                lower.meet_assign(desc.lower());
1180                upper.join_assign(desc.upper());
1181            }
1182            (lower, upper)
1183        }
1184        let (inc_lower, inc_upper) = overall_desc(self.inc_batches.iter().map(|a| &a.desc));
1185        let (full_lower, full_upper) = overall_desc(self.full_batches.iter().map(|a| &a.desc));
1186        assert_eq!(inc_lower, full_lower);
1187        assert_eq!(inc_upper, full_upper);
1188
1189        fn part_unique<T: Hash>(x: &RunPart<T>) -> String {
1190            match x {
1191                RunPart::Single(BatchPart::Inline {
1192                    updates,
1193                    ts_rewrite,
1194                    ..
1195                }) => {
1196                    let mut h = DefaultHasher::new();
1197                    updates.hash(&mut h);
1198                    ts_rewrite.as_ref().map(|x| x.elements()).hash(&mut h);
1199                    h.finish().to_string()
1200                }
1201                other => other.printable_name().to_string(),
1202            }
1203        }
1204
1205        // Check that the overall set of parts contained in both representations is the same.
1206        let inc_parts: HashSet<_> = self
1207            .inc_batches
1208            .iter()
1209            .flat_map(|x| x.parts.iter())
1210            .map(part_unique)
1211            .collect();
1212        let full_parts = self
1213            .full_batches
1214            .iter()
1215            .flat_map(|x| x.parts.iter())
1216            .map(part_unique)
1217            .collect();
1218        assert_eq!(inc_parts, full_parts);
1219
1220        // Check that both representations have the same rollups.
1221        assert_eq!(self.inc_rollups, self.full_rollups);
1222    }
1223}
1224
1225#[cfg(test)]
1226mod tests {
1227    use mz_dyncfg::ConfigUpdates;
1228
1229    use crate::tests::new_test_client;
1230
1231    use super::*;
1232
1233    /// Regression test for (part of) database-issues#5170, where an interrupted
1234    /// `bin/environmentd --reset` resulted in a panic in persist usage code.
1235    #[mz_persist_proc::test(tokio::test)]
1236    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1237    async fn fetch_all_live_states_regression_uninitialized(dyncfgs: ConfigUpdates) {
1238        let client = new_test_client(&dyncfgs).await;
1239        let state_versions = StateVersions::new(
1240            client.cfg.clone(),
1241            Arc::clone(&client.consensus),
1242            Arc::clone(&client.blob),
1243            Arc::clone(&client.metrics),
1244        );
1245        assert!(
1246            state_versions
1247                .fetch_all_live_states::<u64>(ShardId::new())
1248                .await
1249                .is_none()
1250        );
1251    }
1252}