mz_persist_client/internal/state_versions.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A durable, truncatable log of versions of [State].
11
12#[cfg(debug_assertions)]
13use std::collections::BTreeSet;
14use std::fmt::Debug;
15use std::ops::ControlFlow::{Break, Continue};
16use std::sync::Arc;
17use std::time::SystemTime;
18
19use bytes::Bytes;
20use differential_dataflow::difference::Monoid;
21use differential_dataflow::lattice::Lattice;
22use differential_dataflow::trace::Description;
23use mz_ore::cast::CastFrom;
24use mz_persist::location::{
25    Blob, CaSResult, Consensus, Indeterminate, SCAN_ALL, SeqNo, VersionedData,
26};
27use mz_persist::retry::Retry;
28use mz_persist_types::{Codec, Codec64};
29use mz_proto::RustType;
30use prost::Message;
31use timely::progress::Timestamp;
32use tracing::{Instrument, debug, debug_span, trace, warn};
33
34use crate::cfg::STATE_VERSIONS_RECENT_LIVE_DIFFS_LIMIT;
35use crate::error::{CodecMismatch, CodecMismatchT};
36use crate::internal::encoding::{Rollup, UntypedState};
37use crate::internal::machine::{retry_determinate, retry_external};
38use crate::internal::metrics::ShardMetrics;
39use crate::internal::paths::{BlobKey, PartialBlobKey, PartialRollupKey, RollupId};
40#[cfg(debug_assertions)]
41use crate::internal::state::HollowBatch;
42use crate::internal::state::{
43    BatchPart, HollowBlobRef, HollowRollup, NoOpStateTransition, RunPart, State, TypedState,
44};
45use crate::internal::state_diff::{StateDiff, StateFieldValDiff};
46use crate::{Metrics, PersistConfig, ShardId};
47
/// A durable, truncatable log of versions of [State].
///
/// As persist metadata changes over time, we make its versions (each identified
/// by a [SeqNo]) durable in two ways:
/// - `rollups`: Periodic copies of the entirety of [State], written to [Blob].
/// - `diffs`: Incremental [StateDiff]s, written to [Consensus].
///
/// The following invariants are maintained at all times:
/// - A shard is initialized iff there is at least one version of it in
///   Consensus.
/// - The first version of state is written to `SeqNo(1)`. Each successive state
///   version is assigned its predecessor's SeqNo + 1.
/// - `current`: The latest version of state. By definition, the largest SeqNo
///   present in Consensus.
/// - As state changes over time, we keep a range of consecutive versions
///   available. These are periodically `truncated` to prune old versions that
///   are no longer necessary.
/// - `earliest`: The first version of state that it is possible to reconstruct.
///   - Invariant: `earliest <= current.seqno_since()` (we don't garbage collect
///     versions still being used by some reader).
///   - Invariant: `earliest` is always the smallest SeqNo present in Consensus.
///     - This doesn't have to be true, but we choose to enforce it.
///     - Because the data stored at that smallest SeqNo is an incremental diff,
///       to make this invariant work, there needs to be a rollup at either
///       `earliest-1` or `earliest`. We choose `earliest` because it seems to
///       make the code easier to reason about in practice.
///     - A consequence of the above is that when we garbage collect old
///       versions of state, we're only free to truncate ones that are `<` the
///       latest rollup that is `<= current.seqno_since`.
/// - `live diffs`: The set of SeqNos present in Consensus at any given time.
/// - `live states`: The range of state versions that it is possible to
///   reconstruct: `[earliest, current]` (see the reconstruction sketch below).
///   - Because of the earliest and current invariants above, the ranges of
///     `live diffs` and `live states` are the same.
/// - The set of known rollups is tracked in the shard state itself.
///   - For efficiency of common operations, the most recent rollup's Blob key
///     is always denormalized in each StateDiff written to Consensus. (As
///     described above, there is always a rollup at earliest, so we're
///     guaranteed that there is always at least one live rollup.)
///   - Invariant: The rollups in `current` exist in Blob.
///     - A consequence is that, if a rollup in a state you believe is `current`
///       doesn't exist, it's a guarantee that `current` has changed (or it's a
///       bug).
///   - Any rollup at a version `< earliest-1` is useless (we've lost the
///     incremental diffs between it and the live states). GC is tasked with
///     deleting these rollups from Blob before truncating diffs from Consensus.
///     Thus, any rollup at a seqno < earliest can be considered "leaked" and
///     deleted by the leaked blob detector.
///   - Note that this means that, while `current`'s rollups exist, it will be
///     common for other live states to reference rollups that no longer exist.
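///
/// # Reconstruction sketch
///
/// To make the layout above concrete, here is a rough, hypothetical sketch (not
/// an actual API of this module) of how a live state at some `seqno` could be
/// reconstructed from a rollup plus incremental diffs. The helper names
/// (`latest_rollup_at_or_before`, `fetch_rollup`, `fetch_diff`, `apply_diff`)
/// are illustrative only.
///
/// ```ignore
/// fn reconstruct(seqno: SeqNo) -> State {
///     // Invariant: a rollup exists at some seqno in `[earliest, seqno]`.
///     let rollup_seqno = latest_rollup_at_or_before(seqno);
///     let mut state = fetch_rollup(rollup_seqno);
///     // The diff at seqno `s` maps the state at `s - 1` to the state at `s`.
///     for s in (rollup_seqno.0 + 1)..=seqno.0 {
///         state = apply_diff(state, fetch_diff(SeqNo(s)));
///     }
///     state
/// }
/// ```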
98#[derive(Debug)]
99pub struct StateVersions {
100    pub(crate) cfg: PersistConfig,
101    pub(crate) consensus: Arc<dyn Consensus>,
102    pub(crate) blob: Arc<dyn Blob>,
103    pub(crate) metrics: Arc<Metrics>,
104}
105
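/// The "recent" live diffs for a shard, as returned by
/// [StateVersions::fetch_recent_live_diffs]: either all of the diffs known in
/// Consensus, or all of the diffs from the latest rollup onward.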
106#[derive(Debug, Clone)]
107pub struct RecentLiveDiffs(pub Vec<VersionedData>);
108
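/// Every live diff for a shard, as returned by
/// [StateVersions::fetch_all_live_diffs].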
109#[derive(Debug, Clone)]
110pub struct AllLiveDiffs(pub Vec<VersionedData>);
111
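/// A rollup of [State] (along with the diffs it inlines), encoded and ready to
/// be written to [Blob].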
112#[derive(Debug, Clone)]
113pub struct EncodedRollup {
114    pub(crate) shard_id: ShardId,
115    pub(crate) seqno: SeqNo,
116    pub(crate) key: PartialRollupKey,
117    pub(crate) _desc: Description<SeqNo>,
118    buf: Bytes,
119}
120
121impl EncodedRollup {
122    pub fn to_hollow(&self) -> HollowRollup {
123        HollowRollup {
124            key: self.key.clone(),
125            encoded_size_bytes: Some(self.buf.len()),
126        }
127    }
128}
129
130impl StateVersions {
131    pub fn new(
132        cfg: PersistConfig,
133        consensus: Arc<dyn Consensus>,
134        blob: Arc<dyn Blob>,
135        metrics: Arc<Metrics>,
136    ) -> Self {
137        StateVersions {
138            cfg,
139            consensus,
140            blob,
141            metrics,
142        }
143    }
144
145    /// Fetches the `current` state of the requested shard, or creates it if
146    /// uninitialized.
147    pub async fn maybe_init_shard<K, V, T, D>(
148        &self,
149        shard_metrics: &ShardMetrics,
150    ) -> Result<TypedState<K, V, T, D>, Box<CodecMismatch>>
151    where
152        K: Debug + Codec,
153        V: Debug + Codec,
154        T: Timestamp + Lattice + Codec64,
155        D: Monoid + Codec64,
156    {
157        let shard_id = shard_metrics.shard_id;
158
159        // The common case is that the shard is initialized, so try that first
160        let recent_live_diffs = self.fetch_recent_live_diffs::<T>(&shard_id).await;
161        if !recent_live_diffs.0.is_empty() {
162            return self
163                .fetch_current_state(&shard_id, recent_live_diffs.0)
164                .await
165                .check_codecs(&shard_id);
166        }
167
168        // Shard is not initialized, try initializing it.
169        let (initial_state, initial_diff) = self.write_initial_rollup(shard_metrics).await;
170        let (cas_res, _diff) =
171            retry_external(&self.metrics.retries.external.maybe_init_cas, || async {
172                self.try_compare_and_set_current(
173                    "maybe_init_shard",
174                    shard_metrics,
175                    None,
176                    &initial_state,
177                    &initial_diff,
178                )
179                .await
180                .map_err(|err| err.into())
181            })
182            .await;
183        match cas_res {
184            CaSResult::Committed => Ok(initial_state),
185            CaSResult::ExpectationMismatch => {
186                let recent_live_diffs = self.fetch_recent_live_diffs::<T>(&shard_id).await;
187                let state = self
188                    .fetch_current_state(&shard_id, recent_live_diffs.0)
189                    .await
190                    .check_codecs(&shard_id);
191
192                // Clean up the rollup blob that we were trying to reference.
193                //
194                // SUBTLE: If we got an Indeterminate error in the CaS above,
195                // but it actually went through, then we'll "contend" with
196                // ourselves and get an expectation mismatch. Use the actual
197                // fetched state to determine if our rollup actually made it in
198                // and decide whether to delete based on that.
199                let (_, rollup) = initial_state.latest_rollup();
200                let should_delete_rollup = match state.as_ref() {
201                    Ok(state) => !state
202                        .collections
203                        .rollups
204                        .values()
205                        .any(|x| &x.key == &rollup.key),
206                    // If the codecs don't match, then we definitely didn't
207                    // write the state.
208                    Err(_codec_mismatch) => true,
209                };
210                if should_delete_rollup {
211                    self.delete_rollup(&shard_id, &rollup.key).await;
212                }
213
214                state
215            }
216        }
217    }
218
219    /// Updates the state of a shard to a new `current` iff `expected` matches
220    /// `current`.
221    ///
222    /// May be called on uninitialized shards.
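    ///
    /// # Usage sketch
    ///
    /// A hypothetical caller loop, modeled on `maybe_init_shard` below; the
    /// helpers `propose_next_state` and `refetch_current` are invented for
    /// illustration:
    ///
    /// ```ignore
    /// let committed = loop {
    ///     // Derive a candidate new state and the diff from the current one.
    ///     let (new_state, diff) = propose_next_state(&current);
    ///     let (res, _data) = state_versions
    ///         .try_compare_and_set_current(
    ///             "my_cmd",
    ///             &shard_metrics,
    ///             Some(current.seqno()),
    ///             &new_state,
    ///             &diff,
    ///         )
    ///         .await?;
    ///     match res {
    ///         CaSResult::Committed => break new_state,
    ///         // Someone else won the race: refetch `current` and retry.
    ///         CaSResult::ExpectationMismatch => current = refetch_current().await,
    ///     }
    /// };
    /// ```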
223    pub async fn try_compare_and_set_current<K, V, T, D>(
224        &self,
225        cmd_name: &str,
226        shard_metrics: &ShardMetrics,
227        expected: Option<SeqNo>,
228        new_state: &TypedState<K, V, T, D>,
229        diff: &StateDiff<T>,
230    ) -> Result<(CaSResult, VersionedData), Indeterminate>
231    where
232        K: Debug + Codec,
233        V: Debug + Codec,
234        T: Timestamp + Lattice + Codec64,
235        D: Monoid + Codec64,
236    {
237        assert_eq!(shard_metrics.shard_id, new_state.shard_id);
238        let path = new_state.shard_id.to_string();
239
240        trace!(
241            "apply_unbatched_cmd {} attempting {}\n  new_state={:?}",
242            cmd_name,
243            new_state.seqno(),
244            new_state
245        );
246        let new = self.metrics.codecs.state_diff.encode(|| {
247            let mut buf = Vec::new();
248            diff.encode(&mut buf);
249            VersionedData {
250                seqno: new_state.seqno(),
251                data: Bytes::from(buf),
252            }
253        });
254        assert_eq!(new.seqno, diff.seqno_to);
255
256        let payload_len = new.data.len();
257        let cas_res = retry_determinate(
258            &self.metrics.retries.determinate.apply_unbatched_cmd_cas,
259            || async {
260                self.consensus
261                    .compare_and_set(&path, expected, new.clone())
262                    .await
263            },
264        )
265        .instrument(debug_span!("apply_unbatched_cmd::cas", payload_len))
266        .await
267        .map_err(|err| {
268            debug!("apply_unbatched_cmd {} errored: {}", cmd_name, err);
269            err
270        })?;
271
272        match cas_res {
273            CaSResult::Committed => {
274                trace!(
275                    "apply_unbatched_cmd {} succeeded {}\n  new_state={:?}",
276                    cmd_name,
277                    new_state.seqno(),
278                    new_state
279                );
280
281                shard_metrics.set_since(new_state.since());
282                shard_metrics.set_upper(new_state.upper());
283                shard_metrics.seqnos_since_last_rollup.set(
284                    new_state
285                        .seqno
286                        .0
287                        .saturating_sub(new_state.latest_rollup().0.0),
288                );
289                shard_metrics
290                    .spine_batch_count
291                    .set(u64::cast_from(new_state.spine_batch_count()));
292                let size_metrics = new_state.size_metrics();
293                shard_metrics
294                    .schema_registry_version_count
295                    .set(u64::cast_from(new_state.collections.schemas.len()));
296                shard_metrics
297                    .hollow_batch_count
298                    .set(u64::cast_from(size_metrics.hollow_batch_count));
299                shard_metrics
300                    .batch_part_count
301                    .set(u64::cast_from(size_metrics.batch_part_count));
302                shard_metrics
303                    .rewrite_part_count
304                    .set(u64::cast_from(size_metrics.rewrite_part_count));
305                shard_metrics
306                    .update_count
307                    .set(u64::cast_from(size_metrics.num_updates));
308                shard_metrics
309                    .rollup_count
310                    .set(u64::cast_from(size_metrics.state_rollup_count));
311                shard_metrics
312                    .largest_batch_size
313                    .set(u64::cast_from(size_metrics.largest_batch_bytes));
314                shard_metrics
315                    .usage_current_state_batches_bytes
316                    .set(u64::cast_from(size_metrics.state_batches_bytes));
317                shard_metrics
318                    .usage_current_state_rollups_bytes
319                    .set(u64::cast_from(size_metrics.state_rollups_bytes));
320                shard_metrics
321                    .seqnos_held
322                    .set(u64::cast_from(new_state.seqnos_held()));
323                shard_metrics
324                    .encoded_diff_size
325                    .inc_by(u64::cast_from(payload_len));
326                shard_metrics
327                    .live_writers
328                    .set(u64::cast_from(new_state.collections.writers.len()));
329                shard_metrics
330                    .rewrite_part_count
331                    .set(u64::cast_from(size_metrics.rewrite_part_count));
332                shard_metrics
333                    .inline_part_count
334                    .set(u64::cast_from(size_metrics.inline_part_count));
335                shard_metrics
336                    .inline_part_bytes
337                    .set(u64::cast_from(size_metrics.inline_part_bytes));
338                shard_metrics.stale_version.set(
339                    if new_state
340                        .state
341                        .collections
342                        .version
343                        .cmp_precedence(&self.cfg.build_version)
344                        .is_lt()
345                    {
346                        1
347                    } else {
348                        0
349                    },
350                );
351
352                let spine_metrics = new_state.collections.trace.spine_metrics();
353                shard_metrics
354                    .compact_batches
355                    .set(spine_metrics.compact_batches);
356                shard_metrics
357                    .compacting_batches
358                    .set(spine_metrics.compacting_batches);
359                shard_metrics
360                    .noncompact_batches
361                    .set(spine_metrics.noncompact_batches);
362
363                let batch_parts_by_version = new_state
364                    .collections
365                    .trace
366                    .batches()
367                    .flat_map(|x| x.parts.iter())
368                    .flat_map(|part| {
369                        let key = match part {
370                            RunPart::Many(x) => Some(&x.key),
371                            RunPart::Single(BatchPart::Hollow(x)) => Some(&x.key),
372                            // TODO: Would be nice to include these too, but we lose the info atm.
373                            RunPart::Single(BatchPart::Inline { .. }) => None,
374                        }?;
375                        // Carefully avoid any String allocs by splitting.
376                        let (writer_key, _) = key.0.split_once('/')?;
377                        match &writer_key[..1] {
378                            "w" => Some(("old", part.encoded_size_bytes())),
379                            "n" => Some((&writer_key[1..], part.encoded_size_bytes())),
380                            _ => None,
381                        }
382                    });
383                shard_metrics.set_batch_part_versions(batch_parts_by_version);
384
385                Ok((CaSResult::Committed, new))
386            }
387            CaSResult::ExpectationMismatch => {
388                debug!(
389                    "apply_unbatched_cmd {} {} lost the CaS race, retrying: {:?}",
390                    new_state.shard_id(),
391                    cmd_name,
392                    expected,
393                );
394                Ok((CaSResult::ExpectationMismatch, new))
395            }
396        }
397    }
398
399    /// Fetches the `current` state of the requested shard.
400    ///
401    /// Uses the provided hint (live_diffs), which is a possibly outdated
402    /// copy of all or recent live diffs, to avoid fetches where possible.
403    ///
404    /// Panics if called on an uninitialized shard.
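    ///
    /// A minimal sketch of the intended call pattern (the hint is typically the
    /// result of [Self::fetch_recent_live_diffs]):
    ///
    /// ```ignore
    /// let recent = state_versions.fetch_recent_live_diffs::<u64>(&shard_id).await;
    /// let state = state_versions
    ///     .fetch_current_state::<u64>(&shard_id, recent.0)
    ///     .await;
    /// ```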
405    pub async fn fetch_current_state<T>(
406        &self,
407        shard_id: &ShardId,
408        mut live_diffs: Vec<VersionedData>,
409    ) -> UntypedState<T>
410    where
411        T: Timestamp + Lattice + Codec64,
412    {
413        let retry = self
414            .metrics
415            .retries
416            .fetch_latest_state
417            .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
418        loop {
419            let latest_diff = live_diffs
420                .last()
421                .expect("initialized shard should have at least one diff");
422            let latest_diff = self
423                .metrics
424                .codecs
425                .state_diff
426                // Note: `latest_diff.data` is a `Bytes`, so cloning just increments a ref count
427                .decode(|| {
428                    StateDiff::<T>::decode(&self.cfg.build_version, latest_diff.data.clone())
429                });
430            let mut state = match self
431                .fetch_rollup_at_key(shard_id, &latest_diff.latest_rollup_key)
432                .await
433            {
434                Some(x) => x,
435                None => {
436                    // The rollup that this diff referenced is gone, so the diff
437                    // must be out of date. Try again. Intentionally don't sleep on retry.
438                    retry.retries.inc();
439                    let earliest_before_refetch = live_diffs
440                        .first()
441                        .expect("initialized shard should have at least one diff")
442                        .seqno;
443                    live_diffs = self.fetch_recent_live_diffs::<T>(shard_id).await.0;
444
445                    // We should only hit the race condition that leads to a
446                    // refetch if the set of live diffs changed out from under
447                    // us.
448                    //
449                    // TODO: Make this an assert once we're 100% sure the above
450                    // is always true.
451                    let earliest_after_refetch = live_diffs
452                        .first()
453                        .expect("initialized shard should have at least one diff")
454                        .seqno;
455                    if earliest_before_refetch >= earliest_after_refetch {
456                        warn!(
457                            concat!(
458                                "fetch_current_state refetch expects earliest live diff to advance: {} vs {}. ",
459                                "In dev and testing, this happens when persist's Blob (files in mzdata) ",
460                                "is deleted out from under it or when two processes are talking to ",
461                                "different Blobs (e.g. docker containers without it shared)."
462                            ),
463                            earliest_before_refetch, earliest_after_refetch
464                        )
465                    }
466                    continue;
467                }
468            };
469
470            state.apply_encoded_diffs(&self.cfg, &self.metrics, &live_diffs);
471            return state;
472        }
473    }
474
475    /// Returns an iterator over all live states for the requested shard.
476    ///
477    /// Returns None if called on an uninitialized shard.
478    pub async fn fetch_all_live_states<T>(
479        &self,
480        shard_id: ShardId,
481    ) -> Option<UntypedStateVersionsIter<T>>
482    where
483        T: Timestamp + Lattice + Codec64,
484    {
485        let retry = self
486            .metrics
487            .retries
488            .fetch_live_states
489            .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
490        let mut all_live_diffs = self.fetch_all_live_diffs(&shard_id).await;
491        loop {
492            let earliest_live_diff = match all_live_diffs.0.first() {
493                Some(x) => x,
494                None => return None,
495            };
496            let state = match self
497                .fetch_rollup_at_seqno(
498                    &shard_id,
499                    all_live_diffs.0.clone(),
500                    earliest_live_diff.seqno,
501                )
502                .await
503            {
504                Some(x) => x,
505                None => {
506                    // We maintain an invariant that a rollup always exists for
                    // the earliest live diff. Since we didn't find one, that
508                    // can only mean that the live_diffs we just fetched are
509                    // obsolete (there's a race condition with gc). This should
510                    // be rare in practice, so inc a counter and try again.
511                    // Intentionally don't sleep on retry.
512                    retry.retries.inc();
513                    let earliest_before_refetch = earliest_live_diff.seqno;
514                    all_live_diffs = self.fetch_all_live_diffs(&shard_id).await;
515
516                    // We should only hit the race condition that leads to a
517                    // refetch if the set of live diffs changed out from under
518                    // us.
519                    //
520                    // TODO: Make this an assert once we're 100% sure the above
521                    // is always true.
522                    let earliest_after_refetch = all_live_diffs
523                        .0
524                        .first()
525                        .expect("initialized shard should have at least one diff")
526                        .seqno;
527                    if earliest_before_refetch >= earliest_after_refetch {
528                        warn!(
529                            concat!(
530                                "fetch_all_live_states refetch expects earliest live diff to advance: {} vs {}. ",
531                                "In dev and testing, this happens when persist's Blob (files in mzdata) ",
532                                "is deleted out from under it or when two processes are talking to ",
533                                "different Blobs (e.g. docker containers without it shared)."
534                            ),
535                            earliest_before_refetch, earliest_after_refetch
536                        )
537                    }
538                    continue;
539                }
540            };
541            assert_eq!(earliest_live_diff.seqno, state.seqno());
542            return Some(UntypedStateVersionsIter {
543                shard_id,
544                cfg: self.cfg.clone(),
545                metrics: Arc::clone(&self.metrics),
546                state,
547                diffs: all_live_diffs.0,
548            });
549        }
550    }
551
552    /// Fetches all live_diffs for a shard. Intended only for when a caller needs to reconstruct
553    /// _all_ states still referenced by Consensus. Prefer [Self::fetch_recent_live_diffs] when
554    /// the caller simply needs to fetch the latest state.
555    ///
556    /// Returns an empty Vec iff called on an uninitialized shard.
557    pub async fn fetch_all_live_diffs(&self, shard_id: &ShardId) -> AllLiveDiffs {
558        let path = shard_id.to_string();
559        let diffs = retry_external(&self.metrics.retries.external.fetch_state_scan, || async {
560            self.consensus.scan(&path, SeqNo::minimum(), SCAN_ALL).await
561        })
562        .instrument(debug_span!("fetch_state::scan"))
563        .await;
564        AllLiveDiffs(diffs)
565    }
566
567    /// Fetches recent live_diffs for a shard. Intended for when a caller needs to fetch
568    /// the latest state in Consensus.
569    ///
570    /// "Recent" is defined as either:
571    /// * All of the diffs known in Consensus
572    /// * All of the diffs in Consensus after the latest rollup
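    ///
    /// A rough sketch of the fast/slow path decision made below, with
    /// `scan_limit` standing in for `STATE_VERSIONS_RECENT_LIVE_DIFFS_LIMIT` and
    /// `decode_latest_rollup_seqno` an invented stand-in for decoding the
    /// denormalized rollup key out of the latest diff (retries elided):
    ///
    /// ```ignore
    /// let oldest = consensus.scan(&path, SeqNo::minimum(), scan_limit).await;
    /// if oldest.len() < scan_limit {
    ///     // Fast path: a single page covered every live diff.
    ///     return RecentLiveDiffs(oldest);
    /// }
    /// // Slow path: jump to the head, find the seqno of its latest rollup, and
    /// // scan everything from that seqno onward.
    /// let head = consensus.head(&path).await.expect("initialized shard");
    /// let rollup_seqno = decode_latest_rollup_seqno(&head);
    /// RecentLiveDiffs(consensus.scan(&path, rollup_seqno, SCAN_ALL).await)
    /// ```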
573    pub async fn fetch_recent_live_diffs<T>(&self, shard_id: &ShardId) -> RecentLiveDiffs
574    where
575        T: Timestamp + Lattice + Codec64,
576    {
577        let path = shard_id.to_string();
578        let scan_limit = STATE_VERSIONS_RECENT_LIVE_DIFFS_LIMIT.get(&self.cfg);
579        let oldest_diffs =
580            retry_external(&self.metrics.retries.external.fetch_state_scan, || async {
581                self.consensus
582                    .scan(&path, SeqNo::minimum(), scan_limit)
583                    .await
584            })
585            .instrument(debug_span!("fetch_state::scan"))
586            .await;
587
588        // fast-path: we found all known diffs in a single page of our scan. we expect almost all
589        // calls to go down this path, unless a reader has a very long seqno-hold on the shard.
590        if oldest_diffs.len() < scan_limit {
591            self.metrics.state.fetch_recent_live_diffs_fast_path.inc();
592            return RecentLiveDiffs(oldest_diffs);
593        }
594
595        // slow-path: we could be arbitrarily far behind the head of Consensus (either intentionally
596        // due to a long seqno-hold from a reader, or unintentionally from a bug that's preventing
597        // a seqno-hold from advancing). rather than scanning a potentially unbounded number of old
598        // states in Consensus, we jump to the latest state, determine the seqno of the most recent
599        // rollup, and then fetch all the diffs from that point onward.
600        //
601        // this approach requires more network calls, but it should smooth out our access pattern
602        // and use only bounded calls to Consensus. additionally, if `limit` is adequately tuned,
603        // this path will only be invoked when there's an excess number of states in Consensus and
604        // it might be slower to do a single long scan over unneeded rows.
605        let head = retry_external(&self.metrics.retries.external.fetch_state_scan, || async {
606            self.consensus.head(&path).await
607        })
608        .instrument(debug_span!("fetch_state::slow_path::head"))
609        .await
610        .expect("initialized shard should have at least 1 diff");
611
612        let latest_diff = self
613            .metrics
614            .codecs
615            .state_diff
616            .decode(|| StateDiff::<T>::decode(&self.cfg.build_version, head.data));
617
618        match BlobKey::parse_ids(&latest_diff.latest_rollup_key.complete(shard_id)) {
619            Ok((_shard_id, PartialBlobKey::Rollup(seqno, _rollup))) => {
620                self.metrics.state.fetch_recent_live_diffs_slow_path.inc();
621                let diffs =
622                    retry_external(&self.metrics.retries.external.fetch_state_scan, || async {
623                        // (pedantry) this call is technically unbounded, but something very strange
624                        // would have had to happen to have accumulated so many states between our
625                        // call to `head` and this invocation for it to become problematic
626                        self.consensus.scan(&path, seqno, SCAN_ALL).await
627                    })
628                    .instrument(debug_span!("fetch_state::slow_path::scan"))
629                    .await;
630                RecentLiveDiffs(diffs)
631            }
632            Ok(_) => panic!(
633                "invalid state diff rollup key: {}",
634                latest_diff.latest_rollup_key
635            ),
636            Err(err) => panic!("unparseable state diff rollup key: {}", err),
637        }
638    }
639
640    /// Fetches all live diffs greater than the given SeqNo.
641    ///
642    /// TODO: Apply a limit to this scan. This could additionally be used as an internal
643    /// call within `fetch_recent_live_diffs`.
644    pub async fn fetch_all_live_diffs_gt_seqno<K, V, T, D>(
645        &self,
646        shard_id: &ShardId,
647        seqno: SeqNo,
648    ) -> Vec<VersionedData> {
649        let path = shard_id.to_string();
650        retry_external(&self.metrics.retries.external.fetch_state_scan, || async {
651            self.consensus.scan(&path, seqno.next(), SCAN_ALL).await
652        })
653        .instrument(debug_span!("fetch_state::scan"))
654        .await
655    }
656
657    /// Truncates any diffs in consensus less than the given seqno.
658    pub async fn truncate_diffs(&self, shard_id: &ShardId, seqno: SeqNo) {
659        let path = shard_id.to_string();
660        let _deleted_count = retry_external(&self.metrics.retries.external.gc_truncate, || async {
661            self.consensus.truncate(&path, seqno).await
662        })
663        .instrument(debug_span!("gc::truncate"))
664        .await;
665    }
666
667    // Writes a self-referential rollup to blob storage and returns the diff
668    // that should be compare_and_set into consensus to finish initializing the
669    // shard.
670    async fn write_initial_rollup<K, V, T, D>(
671        &self,
672        shard_metrics: &ShardMetrics,
673    ) -> (TypedState<K, V, T, D>, StateDiff<T>)
674    where
675        K: Debug + Codec,
676        V: Debug + Codec,
677        T: Timestamp + Lattice + Codec64,
678        D: Monoid + Codec64,
679    {
680        let empty_state = TypedState::new(
681            self.cfg.build_version.clone(),
682            shard_metrics.shard_id,
683            self.cfg.hostname.clone(),
684            (self.cfg.now)(),
685        );
686        let rollup_seqno = empty_state.seqno.next();
687        let rollup = HollowRollup {
688            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
689            // Chicken-and-egg problem here. We don't know the size of the
690            // rollup until we encode it, but it includes a reference back to
691            // itself.
692            encoded_size_bytes: None,
693        };
694        let (applied, initial_state) = match empty_state
695            .clone_apply(&self.cfg, &mut |_, _, state| {
696                state.add_rollup((rollup_seqno, &rollup))
697            }) {
698            Continue(x) => x,
699            Break(NoOpStateTransition(_)) => {
700                panic!("initial state transition should not be a no-op")
701            }
702        };
703        assert!(
704            applied,
705            "add_and_remove_rollups should apply to the empty state"
706        );
707
708        let rollup = self.encode_rollup_blob(
709            shard_metrics,
710            initial_state.clone_for_rollup(),
711            vec![],
712            rollup.key,
713        );
714        let () = self.write_rollup_blob(&rollup).await;
715        assert_eq!(initial_state.seqno, rollup.seqno);
716
717        let diff = StateDiff::from_diff(&empty_state.state, &initial_state.state);
718        (initial_state, diff)
719    }
720
721    pub async fn write_rollup_for_state<K, V, T, D>(
722        &self,
723        shard_metrics: &ShardMetrics,
724        state: TypedState<K, V, T, D>,
725        rollup_id: &RollupId,
726    ) -> Option<EncodedRollup>
727    where
728        K: Debug + Codec,
729        V: Debug + Codec,
730        T: Timestamp + Lattice + Codec64,
731        D: Monoid + Codec64,
732    {
733        let (latest_rollup_seqno, _rollup) = state.latest_rollup();
734        let seqno = state.seqno();
735
736        // TODO: maintain the diffs since the latest rollup in-memory rather than
737        // needing an additional API call here. This would reduce Consensus load
738        // / avoid races with Consensus truncation, but is trickier to write.
739        let diffs: Vec<_> = self
740            .fetch_all_live_diffs_gt_seqno::<K, V, T, D>(&state.shard_id, *latest_rollup_seqno)
741            .await;
742
743        match diffs.first() {
744            None => {
                // early-out because there are no diffs past our latest rollup.
746                //
747                // this should only occur in the initial state, but we can write a more
748                // general assertion: if no live diffs exist past this state's latest
749                // known rollup, then that rollup must be for the latest known state.
750                self.metrics.state.rollup_write_noop_latest.inc();
751                assert_eq!(seqno, *latest_rollup_seqno);
752                return None;
753            }
754            Some(first) => {
755                // early-out if it is no longer possible to inline all the diffs from
756                // the last known rollup to the current state. some or all of the diffs
757                // have already been truncated by another process.
758                //
759                // this can happen if one process gets told to write a rollup, the
760                // maintenance task falls arbitrarily behind, and another process writes
761                // a new rollup / GCs and truncates past the first process's rollup.
                if first.seqno != latest_rollup_seqno.next() {
                    // Only count the no-op metric when we actually early-out below.
                    self.metrics.state.rollup_write_noop_truncated.inc();
764                    assert!(
765                        first.seqno > latest_rollup_seqno.next(),
766                        "diff: {}, rollup: {}",
767                        first.seqno,
768                        latest_rollup_seqno,
769                    );
770                    return None;
771                }
772            }
773        }
774
775        // we may have fetched more diffs than we need: trim anything beyond the state's seqno
776        let diffs: Vec<_> = diffs.into_iter().filter(|x| x.seqno <= seqno).collect();
777
778        // verify that we've done all the filtering correctly and that our
779        // diffs have seqnos bounded by (last_rollup, current_state]
780        assert_eq!(
781            diffs.first().map(|x| x.seqno),
782            Some(latest_rollup_seqno.next())
783        );
784        assert_eq!(diffs.last().map(|x| x.seqno), Some(state.seqno));
785
786        let key = PartialRollupKey::new(state.seqno, rollup_id);
787        let rollup = self.encode_rollup_blob(shard_metrics, state, diffs, key);
788        let () = self.write_rollup_blob(&rollup).await;
789
790        self.metrics.state.rollup_write_success.inc();
791
792        Some(rollup)
793    }
794
795    /// Encodes the given state and diffs as a rollup to be written to the specified key.
796    ///
797    /// The diffs must span the seqno range `(state.last_rollup().seqno, state.seqno]`.
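    ///
    /// A sketch of the broader flow this participates in (mirroring
    /// [Self::write_rollup_for_state]): encode the rollup, write it to [Blob],
    /// and then record it in state via a separate command using
    /// [EncodedRollup::to_hollow]:
    ///
    /// ```ignore
    /// let key = PartialRollupKey::new(state.seqno, &RollupId::new());
    /// let rollup = state_versions.encode_rollup_blob(&shard_metrics, state, diffs, key);
    /// state_versions.write_rollup_blob(&rollup).await;
    /// let hollow = rollup.to_hollow(); // e.g. handed to an add-rollup command
    /// ```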
798    pub fn encode_rollup_blob<K, V, T, D>(
799        &self,
800        shard_metrics: &ShardMetrics,
801        state: TypedState<K, V, T, D>,
802        diffs: Vec<VersionedData>,
803        key: PartialRollupKey,
804    ) -> EncodedRollup
805    where
806        K: Debug + Codec,
807        V: Debug + Codec,
808        T: Timestamp + Lattice + Codec64,
809        D: Monoid + Codec64,
810    {
811        let shard_id = state.shard_id;
812        let rollup_seqno = state.seqno;
813
814        let rollup = Rollup::from(state.into(), diffs);
815        let desc = rollup.diffs.as_ref().expect("inlined diffs").description();
816
817        let buf = self.metrics.codecs.state.encode(|| {
818            let mut buf = Vec::new();
819            rollup
820                .into_proto()
821                .encode(&mut buf)
822                .expect("no required fields means no initialization errors");
823            Bytes::from(buf)
824        });
825        shard_metrics
826            .latest_rollup_size
827            .set(u64::cast_from(buf.len()));
828        EncodedRollup {
829            shard_id,
830            seqno: rollup_seqno,
831            key,
832            buf,
833            _desc: desc,
834        }
835    }
836
837    /// Writes the given state rollup out to blob.
838    pub async fn write_rollup_blob(&self, rollup: &EncodedRollup) {
839        let payload_len = rollup.buf.len();
840        retry_external(&self.metrics.retries.external.rollup_set, || async {
841            self.blob
842                .set(
843                    &rollup.key.complete(&rollup.shard_id),
844                    Bytes::clone(&rollup.buf),
845                )
846                .await
847        })
848        .instrument(debug_span!("rollup::set", payload_len))
849        .await;
850    }
851
852    /// Fetches a rollup for the given SeqNo, if it exists.
853    ///
854    /// Uses the provided hint, which is a possibly outdated copy of all
855    /// or recent live diffs, to avoid fetches where possible.
856    ///
857    /// Panics if called on an uninitialized shard.
858    async fn fetch_rollup_at_seqno<T>(
859        &self,
860        shard_id: &ShardId,
861        live_diffs: Vec<VersionedData>,
862        seqno: SeqNo,
863    ) -> Option<UntypedState<T>>
864    where
865        T: Timestamp + Lattice + Codec64,
866    {
867        let rollup_key_for_migration = live_diffs.iter().find_map(|x| {
868            let diff = self
869                .metrics
870                .codecs
871                .state_diff
872                // Note: `x.data` is a `Bytes`, so cloning just increments a ref count
873                .decode(|| StateDiff::<T>::decode(&self.cfg.build_version, x.data.clone()));
874            diff.rollups
875                .iter()
876                .find(|x| x.key == seqno)
877                .map(|x| match &x.val {
878                    StateFieldValDiff::Insert(x) => x.clone(),
879                    StateFieldValDiff::Update(_, x) => x.clone(),
880                    StateFieldValDiff::Delete(x) => x.clone(),
881                })
882        });
883
884        let state = self.fetch_current_state::<T>(shard_id, live_diffs).await;
885        if let Some(rollup) = state.rollups().get(&seqno) {
886            return self.fetch_rollup_at_key(shard_id, &rollup.key).await;
887        }
888
889        // MIGRATION: We maintain an invariant that the _current state_ contains
890        // a rollup for the _earliest live diff_ in consensus (and that the
891        // referenced rollup exists). At one point, we fixed a bug that could
892        // lead to that invariant being violated.
893        //
894        // If the earliest live diff is X and we receive a gc req for X+Y to
895        // X+Y+Z (this can happen e.g. if some cmd ignores an earlier req for X
896        // to X+Y, or if they're processing concurrently and the X to X+Y req
897        // loses the race), then the buggy version of gc would delete any
898        // rollups strictly less than old_seqno_since (X+Y in this example). But
899        // our invariant is that the rollup exists for the earliest live diff,
900        // in this case X. So if the first call to gc was interrupted after this
901        // but before truncate (when all the blob deletes happen), later calls
        // to gc would attempt to call `fetch_live_states` and end up spinning
        // forever in its loop.
        //
        // The fix was to base which rollups are deletable on the earliest live
        // diff, not old_seqno_since.
907        //
908        // Sadly, some envs in prod now violate this invariant. So, even with
909        // the fix, existing shards will never successfully run gc. We add a
910        // temporary migration to fix them in `fetch_rollup_at_seqno`. This
911        // method normally looks in the latest version of state for the
912        // specifically requested seqno. In the invariant violation case, some
913        // version of state in the range `[earliest, current]` has a rollup for
914        // earliest, but current doesn't. So, for the migration, if
915        // fetch_rollup_at_seqno doesn't find a rollup in current, then we fall
        // back to sniffing one out of raw diffs. If this succeeds, we increment
917        // a counter and log, so we can track how often this migration is
918        // bailing us out. After the next deploy, this should initially start at
919        // > 0 and then settle down to 0. After the next prod envs wipe, we can
920        // remove the migration.
921        let rollup = rollup_key_for_migration.expect("someone should have a key for this rollup");
922        tracing::info!("only found rollup for {} {} via migration", shard_id, seqno);
923        self.metrics.state.rollup_at_seqno_migration.inc();
924        self.fetch_rollup_at_key(shard_id, &rollup.key).await
925    }
926
927    /// Fetches the rollup at the given key, if it exists.
928    async fn fetch_rollup_at_key<T>(
929        &self,
930        shard_id: &ShardId,
931        rollup_key: &PartialRollupKey,
932    ) -> Option<UntypedState<T>>
933    where
934        T: Timestamp + Lattice + Codec64,
935    {
936        retry_external(&self.metrics.retries.external.rollup_get, || async {
937            self.blob.get(&rollup_key.complete(shard_id)).await
938        })
939        .instrument(debug_span!("rollup::get"))
940        .await
941        .map(|buf| {
942            self.metrics
943                .codecs
944                .state
945                .decode(|| UntypedState::decode(&self.cfg.build_version, buf))
946        })
947    }
948
949    /// Deletes the rollup at the given key, if it exists.
950    pub async fn delete_rollup(&self, shard_id: &ShardId, key: &PartialRollupKey) {
951        let _ = retry_external(&self.metrics.retries.external.rollup_delete, || async {
952            self.blob.delete(&key.complete(shard_id)).await
953        })
954        .await
955        .instrument(debug_span!("rollup::delete"));
956    }
957}
958
959pub struct UntypedStateVersionsIter<T> {
960    shard_id: ShardId,
961    cfg: PersistConfig,
962    metrics: Arc<Metrics>,
963    state: UntypedState<T>,
964    diffs: Vec<VersionedData>,
965}
966
967impl<T: Timestamp + Lattice + Codec64> UntypedStateVersionsIter<T> {
968    pub(crate) fn check_ts_codec(self) -> Result<StateVersionsIter<T>, CodecMismatchT> {
969        let key_codec = self.state.key_codec.clone();
970        let val_codec = self.state.val_codec.clone();
971        let diff_codec = self.state.diff_codec.clone();
972        let state = self.state.check_ts_codec(&self.shard_id)?;
973        Ok(StateVersionsIter::new(
974            self.cfg,
975            self.metrics,
976            state,
977            self.diffs,
978            key_codec,
979            val_codec,
980            diff_codec,
981        ))
982    }
983}
984
985/// An iterator over consecutive versions of [State].
986pub struct StateVersionsIter<T> {
987    cfg: PersistConfig,
988    metrics: Arc<Metrics>,
989    state: State<T>,
990    diffs: Vec<VersionedData>,
991    key_codec: String,
992    val_codec: String,
993    diff_codec: String,
994    #[cfg(debug_assertions)]
995    validator: ReferencedBlobValidator<T>,
996}
997
998impl<T: Timestamp + Lattice + Codec64> StateVersionsIter<T> {
999    fn new(
1000        cfg: PersistConfig,
1001        metrics: Arc<Metrics>,
1002        state: State<T>,
1003        // diffs is stored reversed so we can efficiently pop off the Vec.
1004        mut diffs: Vec<VersionedData>,
1005        key_codec: String,
1006        val_codec: String,
1007        diff_codec: String,
1008    ) -> Self {
1009        assert!(diffs.first().map_or(true, |x| x.seqno == state.seqno));
1010        diffs.reverse();
1011        StateVersionsIter {
1012            cfg,
1013            metrics,
1014            state,
1015            diffs,
1016            key_codec,
1017            val_codec,
1018            diff_codec,
1019            #[cfg(debug_assertions)]
1020            validator: ReferencedBlobValidator::default(),
1021        }
1022    }
1023
1024    pub fn len(&self) -> usize {
1025        self.diffs.len()
1026    }
1027
1028    /// Advances first to some starting state (in practice, usually the first
1029    /// live state), and then through each successive state, for as many diffs
1030    /// as this iterator was initialized with.
1031    ///
    /// The `inspect_diff_fn` callback can be used to inspect diffs directly as
    /// they are applied. On the first call to `next`, the callback is passed an
    /// [InspectDiff::FromInitial] representing a diff from the initial state.
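    ///
    /// A minimal usage sketch (here just counting the blobs referenced by each
    /// transition):
    ///
    /// ```ignore
    /// let mut referenced = 0;
    /// while let Some(state) = iter.next(|d| referenced += d.referenced_blobs().count()) {
    ///     // `state` is the reconstructed State as of the seqno just applied.
    ///     println!("advanced to {}", state.seqno);
    /// }
    /// ```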
1035    pub fn next<F: for<'a> FnMut(InspectDiff<'a, T>)>(
1036        &mut self,
1037        mut inspect_diff_fn: F,
1038    ) -> Option<&State<T>> {
1039        let diff = match self.diffs.pop() {
1040            Some(x) => x,
1041            None => return None,
1042        };
1043        let data = diff.data.clone();
1044        let diff = self
1045            .metrics
1046            .codecs
1047            .state_diff
1048            .decode(|| StateDiff::decode(&self.cfg.build_version, diff.data));
1049
1050        // A bit hacky, but the first diff in StateVersionsIter is always a
1051        // no-op.
1052        if diff.seqno_to == self.state.seqno {
1053            let inspect = InspectDiff::FromInitial(&self.state);
1054            #[cfg(debug_assertions)]
1055            {
1056                inspect
1057                    .referenced_blobs()
1058                    .for_each(|x| self.validator.add_inc_blob(x));
1059            }
1060            inspect_diff_fn(inspect);
1061        } else {
1062            let inspect = InspectDiff::Diff(&diff);
1063            #[cfg(debug_assertions)]
1064            {
1065                inspect
1066                    .referenced_blobs()
1067                    .for_each(|x| self.validator.add_inc_blob(x));
1068            }
1069            inspect_diff_fn(inspect);
1070        }
1071
1072        let diff_seqno_to = diff.seqno_to;
1073        self.state
1074            .apply_diffs(&self.metrics, std::iter::once((diff, data)));
1075        assert_eq!(self.state.seqno, diff_seqno_to);
1076        #[cfg(debug_assertions)]
1077        {
1078            self.validator.validate_against_state(&self.state);
1079        }
1080        Some(&self.state)
1081    }
1082
1083    pub fn state(&self) -> &State<T> {
1084        &self.state
1085    }
1086
1087    pub fn into_rollup_proto_without_diffs(&self) -> impl serde::Serialize + use<T> {
1088        Rollup::from_state_without_diffs(
1089            State {
1090                shard_id: self.state.shard_id.clone(),
1091                seqno: self.state.seqno.clone(),
1092                walltime_ms: self.state.walltime_ms.clone(),
1093                hostname: self.state.hostname.clone(),
1094                collections: self.state.collections.clone(),
1095            },
1096            self.key_codec.clone(),
1097            self.val_codec.clone(),
1098            T::codec_name(),
1099            self.diff_codec.clone(),
1100        )
1101        .into_proto()
1102    }
1103}
1104
1105/// This represents a diff, either directly or, in the case of the FromInitial
1106/// variant, a diff from the initial state. (We could instead compute the diff
1107/// from the initial state and replace this with only a `StateDiff<T>`, but don't
1108/// for efficiency.)
1109#[derive(Debug)]
1110pub enum InspectDiff<'a, T> {
1111    FromInitial(&'a State<T>),
1112    Diff(&'a StateDiff<T>),
1113}
1114
1115impl<T: Timestamp + Lattice + Codec64> InspectDiff<'_, T> {
    /// Returns an iterator over each blob added by this state transition.
    ///
    /// Blob removals, along with all other diffs, are ignored.
1119    pub fn referenced_blobs(&self) -> impl Iterator<Item = HollowBlobRef<'_, T>> {
1120        let (state, diff) = match self {
1121            InspectDiff::FromInitial(x) => (Some(x), None),
1122            InspectDiff::Diff(x) => (None, Some(x)),
1123        };
1124        let state_blobs = state.into_iter().flat_map(|s| s.blobs());
1125        let diff_blobs = diff.into_iter().flat_map(|d| d.blob_inserts());
1126        state_blobs.chain(diff_blobs)
1127    }
1128}
1129
1130#[cfg(debug_assertions)]
1131struct ReferencedBlobValidator<T> {
1132    // A copy of every batch and rollup referenced by some state iterator,
1133    // computed by scanning the full copy of state at each seqno.
1134    full_batches: BTreeSet<HollowBatch<T>>,
1135    full_rollups: BTreeSet<HollowRollup>,
1136    // A copy of every batch and rollup referenced by some state iterator,
1137    // computed incrementally.
1138    inc_batches: BTreeSet<HollowBatch<T>>,
1139    inc_rollups: BTreeSet<HollowRollup>,
1140}
1141
1142#[cfg(debug_assertions)]
1143impl<T> Default for ReferencedBlobValidator<T> {
1144    fn default() -> Self {
1145        Self {
1146            full_batches: Default::default(),
1147            full_rollups: Default::default(),
1148            inc_batches: Default::default(),
1149            inc_rollups: Default::default(),
1150        }
1151    }
1152}
1153
1154#[cfg(debug_assertions)]
1155impl<T: Timestamp + Lattice + Codec64> ReferencedBlobValidator<T> {
1156    fn add_inc_blob(&mut self, x: HollowBlobRef<'_, T>) {
1157        match x {
1158            HollowBlobRef::Batch(x) => assert!(
1159                self.inc_batches.insert(x.clone()) || x.desc.lower() == x.desc.upper(),
1160                "non-empty batches should only be appended once; duplicate: {x:?}"
1161            ),
1162            HollowBlobRef::Rollup(x) => assert!(self.inc_rollups.insert(x.clone())),
1163        }
1164    }
1165    fn validate_against_state(&mut self, x: &State<T>) {
1166        use std::hash::{DefaultHasher, Hash, Hasher};
1167
1168        use mz_ore::collections::HashSet;
1169        use timely::progress::Antichain;
1170
1171        use crate::internal::state::BatchPart;
1172
1173        x.blobs().for_each(|x| match x {
1174            HollowBlobRef::Batch(x) => {
1175                self.full_batches.insert(x.clone());
1176            }
1177            HollowBlobRef::Rollup(x) => {
1178                self.full_rollups.insert(x.clone());
1179            }
1180        });
1181
1182        // Check that the sets of batches overall cover the same pTVC.
1183        // Partial ordering means we can't just take the first and last batches; instead compute
1184        // bounds using the lattice operations.
1185        fn overall_desc<'a, T: Timestamp + Lattice>(
1186            iter: impl Iterator<Item = &'a Description<T>>,
1187        ) -> (Antichain<T>, Antichain<T>) {
1188            let mut lower = Antichain::new();
1189            let mut upper = Antichain::from_elem(T::minimum());
1190            for desc in iter {
1191                lower.meet_assign(desc.lower());
1192                upper.join_assign(desc.upper());
1193            }
1194            (lower, upper)
1195        }
1196        let (inc_lower, inc_upper) = overall_desc(self.inc_batches.iter().map(|a| &a.desc));
1197        let (full_lower, full_upper) = overall_desc(self.full_batches.iter().map(|a| &a.desc));
1198        assert_eq!(inc_lower, full_lower);
1199        assert_eq!(inc_upper, full_upper);
1200
1201        fn part_unique<T: Codec64>(x: &RunPart<T>) -> String {
1202            match x {
1203                RunPart::Single(BatchPart::Inline {
1204                    updates,
1205                    ts_rewrite,
1206                    ..
1207                }) => {
1208                    let mut h = DefaultHasher::new();
1209                    updates.hash(&mut h);
1210                    if let Some(frontier) = &ts_rewrite {
1211                        h.write_usize(frontier.len());
1212                        frontier.iter().for_each(|t| t.encode().hash(&mut h));
1213                    }
1214                    h.finish().to_string()
1215                }
1216                other => other.printable_name().to_string(),
1217            }
1218        }
1219
1220        // Check that the overall set of parts contained in both representations is the same.
1221        let inc_parts: HashSet<_> = self
1222            .inc_batches
1223            .iter()
1224            .flat_map(|x| x.parts.iter())
1225            .map(part_unique)
1226            .collect();
1227        let full_parts = self
1228            .full_batches
1229            .iter()
1230            .flat_map(|x| x.parts.iter())
1231            .map(part_unique)
1232            .collect();
1233        assert_eq!(inc_parts, full_parts);
1234
1235        // Check that both representations have the same rollups.
1236        assert_eq!(self.inc_rollups, self.full_rollups);
1237    }
1238}
1239
1240#[cfg(test)]
1241mod tests {
1242    use mz_dyncfg::ConfigUpdates;
1243
1244    use crate::tests::new_test_client;
1245
1246    use super::*;
1247
    /// Regression test for (part of) database-issues#5170, where an interrupted
    /// `bin/environmentd --reset` resulted in a panic in persist usage code.
1250    #[mz_persist_proc::test(tokio::test)]
1251    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1252    async fn fetch_all_live_states_regression_uninitialized(dyncfgs: ConfigUpdates) {
1253        let client = new_test_client(&dyncfgs).await;
1254        let state_versions = StateVersions::new(
1255            client.cfg.clone(),
1256            Arc::clone(&client.consensus),
1257            Arc::clone(&client.blob),
1258            Arc::clone(&client.metrics),
1259        );
1260        assert!(
1261            state_versions
1262                .fetch_all_live_states::<u64>(ShardId::new())
1263                .await
1264                .is_none()
1265        );
1266    }
1267}