// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

10#[cfg(test)]
11mod tests;
12
13use std::cmp::max;
14use std::collections::{BTreeMap, VecDeque};
15use std::fmt::Debug;
16use std::str::FromStr;
17use std::sync::{Arc, LazyLock};
18use std::time::{Duration, Instant};
19
20use async_trait::async_trait;
21use differential_dataflow::lattice::Lattice;
22use futures::{FutureExt, StreamExt};
23use itertools::Itertools;
24use mz_audit_log::VersionedEvent;
25use mz_ore::metrics::MetricsFutureExt;
26use mz_ore::now::EpochMillis;
27use mz_ore::{
28    soft_assert_eq_no_log, soft_assert_eq_or_log, soft_assert_ne_or_log, soft_assert_no_log,
29    soft_assert_or_log, soft_panic_or_log,
30};
31use mz_persist_client::cfg::USE_CRITICAL_SINCE_CATALOG;
32use mz_persist_client::cli::admin::{CATALOG_FORCE_COMPACTION_FUEL, CATALOG_FORCE_COMPACTION_WAIT};
33use mz_persist_client::critical::{CriticalReaderId, Opaque, SinceHandle};
34use mz_persist_client::error::UpperMismatch;
35use mz_persist_client::read::{Listen, ListenEvent, ReadHandle};
36use mz_persist_client::write::WriteHandle;
37use mz_persist_client::{Diagnostics, PersistClient, ShardId};
38use mz_persist_types::codec_impls::UnitSchema;
39use mz_proto::{RustType, TryFromProtoError};
40use mz_repr::Diff;
41use mz_storage_client::controller::PersistEpoch;
42use mz_storage_types::StorageDiff;
43use mz_storage_types::sources::SourceData;
44use sha2::Digest;
45use timely::progress::{Antichain, Timestamp as TimelyTimestamp};
46use tracing::{debug, info, warn};
47use uuid::Uuid;
48
49use crate::durable::debug::{Collection, CollectionType, DebugCatalogState, Trace};
50use crate::durable::error::FenceError;
51use crate::durable::initialize::{
52    ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, SYSTEM_CONFIG_SYNCED_KEY, USER_VERSION_KEY,
53    WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL, WITH_0DT_DEPLOYMENT_MAX_WAIT,
54};
55use crate::durable::metrics::Metrics;
56use crate::durable::objects::state_update::{
57    IntoStateUpdateKindJson, StateUpdate, StateUpdateKind, StateUpdateKindJson,
58    TryIntoStateUpdateKind,
59};
60use crate::durable::objects::{AuditLogKey, FenceToken, Snapshot};
61use crate::durable::transaction::TransactionBatch;
62use crate::durable::upgrade::upgrade;
63use crate::durable::{
64    AuditLogIterator, BootstrapArgs, CATALOG_CONTENT_VERSION_KEY, CatalogError,
65    DurableCatalogError, DurableCatalogState, Epoch, OpenableDurableCatalogState,
66    ReadOnlyDurableCatalogState, Transaction, initialize, persist_desc,
67};
68use crate::memory;
69
70/// New-type used to represent timestamps in persist.
71pub(crate) type Timestamp = mz_repr::Timestamp;
72
73/// The minimum value of an epoch.
74///
75/// # Safety
76/// `new_unchecked` is safe to call with a non-zero value.
77const MIN_EPOCH: Epoch = unsafe { Epoch::new_unchecked(1) };
78
79/// Human readable catalog shard name.
80const CATALOG_SHARD_NAME: &str = "catalog";
81
82/// [`CriticalReaderId`] for the catalog shard's own since hold, separate from
83/// [`PersistClient::CONTROLLER_CRITICAL_SINCE`].
84static CATALOG_CRITICAL_SINCE: LazyLock<CriticalReaderId> = LazyLock::new(|| {
85    "c55555555-6666-7777-8888-999999999999"
86        .parse()
87        .expect("valid CriticalReaderId")
88});
89
90/// Seed used to generate the persist shard ID for the catalog.
91const CATALOG_SEED: usize = 1;
92/// Legacy seed used to generate the persist shard ID for the upgrade shard. DO NOT REUSE.
93const _UPGRADE_SEED: usize = 2;
94/// Legacy seed used to generate the persist shard ID for builtin table migrations. DO NOT REUSE.
95pub const _BUILTIN_MIGRATION_SEED: usize = 3;
96/// Legacy seed used to generate the persist shard ID for the expression cache. DO NOT REUSE.
97pub const _EXPRESSION_CACHE_SEED: usize = 4;
98
99/// Durable catalog mode that dictates the effect of mutable operations.
100#[derive(Debug, Copy, Clone, Eq, PartialEq)]
101pub(crate) enum Mode {
102    /// Mutable operations are prohibited.
103    Readonly,
104    /// Mutable operations have an effect in-memory, but aren't persisted durably.
105    Savepoint,
106    /// Mutable operations have an effect in-memory and durably.
107    Writable,
108}
109
110/// Enum representing the fenced state of the catalog.
111#[derive(Debug)]
112pub(crate) enum FenceableToken {
113    /// The catalog is still initializing and learning about previously written fence tokens. This
114    /// state can be fenced if it encounters a larger deploy generation.
115    Initializing {
116        /// The largest fence token durably written to the catalog, if any.
117        durable_token: Option<FenceToken>,
118        /// This process's deploy generation.
119        current_deploy_generation: Option<u64>,
120    },
121    /// The current token has not been fenced.
122    Unfenced { current_token: FenceToken },
123    /// The current token has been fenced.
124    Fenced {
125        current_token: FenceToken,
126        fence_token: FenceToken,
127    },
128}
129
130impl FenceableToken {
131    /// Returns a new token.
132    fn new(current_deploy_generation: Option<u64>) -> Self {
133        Self::Initializing {
134            durable_token: None,
135            current_deploy_generation,
136        }
137    }
138
139    /// Returns the current token if it is not fenced, otherwise returns an error.
140    fn validate(&self) -> Result<Option<FenceToken>, FenceError> {
141        match self {
142            FenceableToken::Initializing { durable_token, .. } => Ok(durable_token.clone()),
143            FenceableToken::Unfenced { current_token, .. } => Ok(Some(current_token.clone())),
144            FenceableToken::Fenced {
145                current_token,
146                fence_token,
147            } => {
148                assert!(
149                    fence_token > current_token,
150                    "must be fenced by higher token; current={current_token:?}, fence={fence_token:?}"
151                );
152                if fence_token.deploy_generation > current_token.deploy_generation {
153                    Err(FenceError::DeployGeneration {
154                        current_generation: current_token.deploy_generation,
155                        fence_generation: fence_token.deploy_generation,
156                    })
157                } else {
158                    assert!(
159                        fence_token.epoch > current_token.epoch,
160                        "must be fenced by higher token; current={current_token:?}, fence={fence_token:?}"
161                    );
162                    Err(FenceError::Epoch {
163                        current_epoch: current_token.epoch,
164                        fence_epoch: fence_token.epoch,
165                    })
166                }
167            }
168        }
169    }
170
171    /// Returns the current token.
172    fn token(&self) -> Option<FenceToken> {
173        match self {
174            FenceableToken::Initializing { durable_token, .. } => durable_token.clone(),
175            FenceableToken::Unfenced { current_token, .. } => Some(current_token.clone()),
176            FenceableToken::Fenced { current_token, .. } => Some(current_token.clone()),
177        }
178    }
179
180    /// Returns `Err` if `token` fences out `self`, `Ok` otherwise.
181    fn maybe_fence(&mut self, token: FenceToken) -> Result<(), FenceError> {
182        match self {
183            FenceableToken::Initializing {
184                durable_token,
185                current_deploy_generation,
186                ..
187            } => {
188                match durable_token {
189                    Some(durable_token) => {
190                        *durable_token = max(durable_token.clone(), token.clone());
191                    }
192                    None => {
193                        *durable_token = Some(token.clone());
194                    }
195                }
196                if let Some(current_deploy_generation) = current_deploy_generation {
197                    if *current_deploy_generation < token.deploy_generation {
198                        *self = FenceableToken::Fenced {
199                            current_token: FenceToken {
200                                deploy_generation: *current_deploy_generation,
201                                epoch: token.epoch,
202                            },
203                            fence_token: token,
204                        };
205                        self.validate()?;
206                    }
207                }
208            }
209            FenceableToken::Unfenced { current_token } => {
210                if *current_token < token {
211                    *self = FenceableToken::Fenced {
212                        current_token: current_token.clone(),
213                        fence_token: token,
214                    };
215                    self.validate()?;
216                }
217            }
218            FenceableToken::Fenced { .. } => {
219                self.validate()?;
220            }
221        }
222
223        Ok(())
224    }
225
226    /// Returns a [`FenceableToken::Unfenced`] token and the updates to the catalog required to
227    /// transition to the `Unfenced` state if `self` is [`FenceableToken::Initializing`], otherwise
228    /// returns `None`.
229    fn generate_unfenced_token(
230        &self,
231        mode: Mode,
232    ) -> Result<Option<(Vec<(StateUpdateKind, Diff)>, FenceableToken)>, DurableCatalogError> {
233        let (durable_token, current_deploy_generation) = match self {
234            FenceableToken::Initializing {
235                durable_token,
236                current_deploy_generation,
237            } => (durable_token.clone(), current_deploy_generation.clone()),
238            FenceableToken::Unfenced { .. } | FenceableToken::Fenced { .. } => return Ok(None),
239        };
240
241        let mut fence_updates = Vec::with_capacity(2);
242
243        if let Some(durable_token) = &durable_token {
244            fence_updates.push((
245                StateUpdateKind::FenceToken(durable_token.clone()),
246                Diff::MINUS_ONE,
247            ));
248        }
249
250        let current_deploy_generation = current_deploy_generation
251            .or_else(|| durable_token.as_ref().map(|token| token.deploy_generation))
252            // We cannot initialize a catalog without a deploy generation.
253            .ok_or(DurableCatalogError::Uninitialized)?;
254        let mut current_epoch = durable_token
255            .map(|token| token.epoch)
256            .unwrap_or(MIN_EPOCH)
257            .get();
258        // Only writable catalogs attempt to increment the epoch.
259        if matches!(mode, Mode::Writable) {
260            current_epoch = current_epoch + 1;
261        }
262        let current_epoch = Epoch::new(current_epoch).expect("known to be non-zero");
263        let current_token = FenceToken {
264            deploy_generation: current_deploy_generation,
265            epoch: current_epoch,
266        };
267
268        fence_updates.push((
269            StateUpdateKind::FenceToken(current_token.clone()),
270            Diff::ONE,
271        ));
272
273        let current_fenceable_token = FenceableToken::Unfenced { current_token };
274
275        Ok(Some((fence_updates, current_fenceable_token)))
276    }
277}
278
279/// An error that can occur while executing [`PersistHandle::compare_and_append`].
280#[derive(Debug, thiserror::Error)]
281pub(crate) enum CompareAndAppendError {
282    #[error(transparent)]
283    Fence(#[from] FenceError),
284    /// Catalog encountered an upper mismatch when trying to write to the catalog. This should only
285    /// happen while trying to fence out other catalogs.
286    #[error(
287        "expected catalog upper {expected_upper:?} did not match actual catalog upper {actual_upper:?}"
288    )]
289    UpperMismatch {
290        expected_upper: Timestamp,
291        actual_upper: Timestamp,
292    },
293}
294
295impl CompareAndAppendError {
296    pub(crate) fn unwrap_fence_error(self) -> FenceError {
297        match self {
298            CompareAndAppendError::Fence(e) => e,
299            e @ CompareAndAppendError::UpperMismatch { .. } => {
300                panic!("unexpected upper mismatch: {e:?}")
301            }
302        }
303    }
304}
305
306impl From<UpperMismatch<Timestamp>> for CompareAndAppendError {
307    fn from(upper_mismatch: UpperMismatch<Timestamp>) -> Self {
308        Self::UpperMismatch {
309            expected_upper: antichain_to_timestamp(upper_mismatch.expected),
310            actual_upper: antichain_to_timestamp(upper_mismatch.current),
311        }
312    }
313}
314
315pub(crate) trait ApplyUpdate<T: IntoStateUpdateKindJson> {
316    /// Process and apply `update`.
317    ///
318    /// Returns `Some` if `update` should be cached in memory and `None` otherwise.
319    fn apply_update(
320        &mut self,
321        update: StateUpdate<T>,
322        current_fence_token: &mut FenceableToken,
323        metrics: &Arc<Metrics>,
324    ) -> Result<Option<StateUpdate<T>>, FenceError>;
325}
326
327/// A handle for interacting with the persist catalog shard.
328///
329/// The catalog shard is used in multiple different contexts, for example pre-open and post-open,
330/// but for all contexts the majority of the durable catalog's behavior is identical. This struct
331/// implements those behaviors that are identical while allowing the user to specify the different
332/// behaviors via generic parameters.
333///
334/// The behavior of the durable catalog can be different along one of two axes. The first is the
335/// format of each individual update, i.e. raw binary, the current protobuf version, previous
336/// protobuf versions, etc. The second axis is what to do with each individual update, for example
337/// before opening we cache all config updates but don't cache them after opening. These behaviors
338/// are customizable via the `T: TryIntoStateUpdateKind` and `U: ApplyUpdate<T>` generic parameters
339/// respectively.
340#[derive(Debug)]
341pub(crate) struct PersistHandle<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> {
342    /// The [`Mode`] that this catalog was opened in.
343    pub(crate) mode: Mode,
344    /// Since handle to control compaction.
345    since_handle: SinceHandle<SourceData, (), Timestamp, StorageDiff>,
346    /// Write handle to persist.
347    write_handle: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
348    /// Listener to catalog changes.
349    listen: Listen<SourceData, (), Timestamp, StorageDiff>,
350    /// Handle for connecting to persist.
351    persist_client: PersistClient,
352    /// Catalog shard ID.
353    shard_id: ShardId,
354    /// Cache of the most recent catalog snapshot.
355    ///
356    /// We use a tuple instead of [`StateUpdate`] to make consolidation easier.
357    pub(crate) snapshot: Vec<(T, Timestamp, Diff)>,
358    /// Applies custom processing, filtering, and fencing for each individual update.
359    update_applier: U,
360    /// The current upper of the persist shard.
361    pub(crate) upper: Timestamp,
362    /// The fence token of the catalog, if one exists.
363    fenceable_token: FenceableToken,
364    /// The semantic version of the current binary.
365    catalog_content_version: semver::Version,
366    /// Flag to indicate if bootstrap is complete.
367    bootstrap_complete: bool,
368    /// Metrics for the persist catalog.
369    metrics: Arc<Metrics>,
370}
371
372impl<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> PersistHandle<T, U> {
373    /// Fetch the current upper of the catalog state.
374    #[mz_ore::instrument]
375    async fn current_upper(&mut self) -> Timestamp {
376        match self.mode {
377            Mode::Writable | Mode::Readonly => {
378                let upper = self.write_handle.fetch_recent_upper().await;
379                antichain_to_timestamp(upper.clone())
380            }
381            Mode::Savepoint => self.upper,
382        }
383    }
384
385    /// Appends `updates` iff the current global upper of the catalog is `self.upper`.
386    ///
387    /// Returns the next upper used to commit the transaction.
388    #[mz_ore::instrument]
389    pub(crate) async fn compare_and_append<S: IntoStateUpdateKindJson>(
390        &mut self,
391        updates: Vec<(S, Diff)>,
392        commit_ts: Timestamp,
393    ) -> Result<Timestamp, CompareAndAppendError> {
394        // The fencing check is expensive, so run it only with soft assertions enabled.
395        let contains_fence = if mz_ore::assert::soft_assertions_enabled() {
396            let parsed_updates: Vec<_> = updates
397                .clone()
398                .into_iter()
399                .filter_map(|(update, diff)| {
400                    let update: StateUpdateKindJson = update.into();
401                    let update = TryIntoStateUpdateKind::try_into(update).ok()?;
402                    Some((update, diff))
403                })
404                .collect();
405            let contains_retraction = parsed_updates.iter().any(|(update, diff)| {
406                matches!(update, StateUpdateKind::FenceToken(..)) && *diff == Diff::MINUS_ONE
407            });
408            let contains_addition = parsed_updates.iter().any(|(update, diff)| {
409                matches!(update, StateUpdateKind::FenceToken(..)) && *diff == Diff::ONE
410            });
411            Some(contains_retraction && contains_addition)
412        } else {
413            None
414        };
415
416        let updates = updates.into_iter().map(|(kind, diff)| {
417            let kind: StateUpdateKindJson = kind.into();
418            (
419                (Into::<SourceData>::into(kind), ()),
420                commit_ts,
421                diff.into_inner(),
422            )
423        });
424        let next_upper = commit_ts.step_forward();
425        self.compare_and_append_inner(updates, next_upper)
426            .await
427            .inspect_err(|e| {
428                // A compare-and-append failure means someone else must have written to the
429                // catalog. We expect to have been fenced out, since writing to the catalog without
430                // fencing other catalogs should be impossible. The one exception is if we are
431                // trying to fence other catalogs with this write.
432                if let Some(contains_fence) = contains_fence {
433                    soft_assert_or_log!(
434                        matches!(e, CompareAndAppendError::Fence(_)) || contains_fence,
435                        "encountered an upper mismatch on a non-fencing write"
436                    );
437                }
438            })?;
439
440        self.sync(next_upper).await?;
441        Ok(next_upper)
442    }
443
444    /// Compare-and-append `updates` to the catalog shard, advancing the upper to `next_upper`.
445    ///
446    /// On success, updating `self.upper` is left to the caller. The caller can thus decide whether
447    /// or not it needs to sync the catalog.
448    ///
449    /// # Panics
450    ///
451    /// Panics if not in `Writable` mode.
452    /// Panics if `next_upper` is not greater than `self.upper`.
453    async fn compare_and_append_inner(
454        &mut self,
455        updates: impl IntoIterator<Item = ((SourceData, ()), Timestamp, StorageDiff)>,
456        next_upper: Timestamp,
457    ) -> Result<(), CompareAndAppendError> {
458        assert_eq!(self.mode, Mode::Writable);
459        assert!(
460            next_upper > self.upper,
461            "next_upper ({next_upper}) not greater than current upper ({})",
462            self.upper,
463        );
464
465        let res = self
466            .write_handle
467            .compare_and_append(
468                updates,
469                Antichain::from_elem(self.upper),
470                Antichain::from_elem(next_upper),
471            )
472            .await
473            .expect("invalid usage");
474
475        if let Err(e @ UpperMismatch { .. }) = res {
476            // Most likely we were fenced out.
477            // Sync to the current upper to detect that.
478            self.sync_to_current_upper().await?;
479            return Err(e.into());
480        }
481
482        // Lag the shard's upper by 1 to keep it readable.
483        let downgrade_to = Antichain::from_elem(next_upper.saturating_sub(1));
484
485        // The since handle gives us the ability to fence out other downgraders using an opaque token.
486        // (See the method documentation for details.)
487        // That's not needed here, so we use the since handle's opaque token to avoid any comparison
488        // failures.
489        let opaque = self.since_handle.opaque().clone();
490        let downgrade = self
491            .since_handle
492            .maybe_compare_and_downgrade_since(&opaque, (&opaque, &downgrade_to))
493            .await;
494        if let Some(Err(e)) = downgrade {
495            soft_panic_or_log!("found opaque value {e:?}, but expected {opaque:?}");
496        }
497
498        Ok(())
499    }
500
501    /// Generates an iterator of [`StateUpdate`] that contain all unconsolidated updates to the
502    /// catalog state up to, and including, `as_of`.
503    #[mz_ore::instrument]
504    async fn snapshot_unconsolidated(&mut self) -> Vec<StateUpdate<StateUpdateKind>> {
505        let current_upper = self.current_upper().await;
506
507        let mut snapshot = Vec::new();
508        let mut read_handle = self.read_handle().await;
509        let as_of = as_of(&read_handle, current_upper);
510        let mut stream = Box::pin(
511            // We use `snapshot_and_stream` because it guarantees unconsolidated output.
512            read_handle
513                .snapshot_and_stream(Antichain::from_elem(as_of))
514                .await
515                .expect("we have advanced the restart_as_of by the since"),
516        );
517        while let Some(update) = stream.next().await {
518            snapshot.push(update)
519        }
520        read_handle.expire().await;
521        snapshot
522            .into_iter()
523            .map(Into::<StateUpdate<StateUpdateKindJson>>::into)
524            .map(|state_update| state_update.try_into().expect("kind decoding error"))
525            .collect()
526    }
527
528    /// Listen and apply all updates that are currently in persist.
529    ///
530    /// Returns an error if this instance has been fenced out.
531    #[mz_ore::instrument]
532    pub(crate) async fn sync_to_current_upper(&mut self) -> Result<(), FenceError> {
533        let upper = self.current_upper().await;
534        self.sync(upper).await
535    }
536
537    /// Listen and apply all updates up to `target_upper`.
538    ///
539    /// Returns an error if this instance has been fenced out.
540    #[mz_ore::instrument(level = "debug")]
541    pub(crate) async fn sync(&mut self, target_upper: Timestamp) -> Result<(), FenceError> {
542        self.metrics.syncs.inc();
543        let counter = self.metrics.sync_latency_seconds.clone();
544        self.sync_inner(target_upper)
545            .wall_time()
546            .inc_by(counter)
547            .await
548    }
549
550    #[mz_ore::instrument(level = "debug")]
551    async fn sync_inner(&mut self, target_upper: Timestamp) -> Result<(), FenceError> {
552        self.fenceable_token.validate()?;
553
554        // Savepoint catalogs do not yet know how to update themselves in response to concurrent
555        // writes from writer catalogs.
556        if self.mode == Mode::Savepoint {
557            self.upper = max(self.upper, target_upper);
558            return Ok(());
559        }
560
561        let mut updates: BTreeMap<_, Vec<_>> = BTreeMap::new();
562
563        while self.upper < target_upper {
564            let listen_events = self.listen.fetch_next().await;
565            for listen_event in listen_events {
566                match listen_event {
567                    ListenEvent::Progress(upper) => {
568                        debug!("synced up to {upper:?}");
569                        self.upper = antichain_to_timestamp(upper);
570                        // Attempt to apply updates in batches of a single timestamp. If another
571                        // catalog wrote a fence token at one timestamp and then updates in a new
572                        // format at a later timestamp, then we want to apply the fence token
573                        // before attempting to deserialize the new updates.
574                        while let Some((ts, updates)) = updates.pop_first() {
575                            assert!(ts < self.upper, "expected {} < {}", ts, self.upper);
576                            let updates = updates.into_iter().map(
577                                |update: StateUpdate<StateUpdateKindJson>| {
578                                    let kind =
579                                        T::try_from(update.kind).expect("kind decoding error");
580                                    StateUpdate {
581                                        kind,
582                                        ts: update.ts,
583                                        diff: update.diff,
584                                    }
585                                },
586                            );
587                            self.apply_updates(updates)?;
588                        }
589                    }
590                    ListenEvent::Updates(batch_updates) => {
591                        for update in batch_updates {
592                            let update: StateUpdate<StateUpdateKindJson> = update.into();
593                            updates.entry(update.ts).or_default().push(update);
594                        }
595                    }
596                }
597            }
598        }
599        assert_eq!(updates, BTreeMap::new(), "all updates should be applied");
600        Ok(())
601    }
602
603    #[mz_ore::instrument(level = "debug")]
604    pub(crate) fn apply_updates(
605        &mut self,
606        updates: impl IntoIterator<Item = StateUpdate<T>>,
607    ) -> Result<(), FenceError> {
608        let mut updates: Vec<_> = updates
609            .into_iter()
610            .map(|StateUpdate { kind, ts, diff }| (kind, ts, diff))
611            .collect();
612
613        // This helps guarantee that for a single key, there is at most a single retraction and a
614        // single insertion per timestamp. Otherwise, we would need to match the retractions and
615        // insertions up by value and manually figure out what the end value should be.
616        differential_dataflow::consolidation::consolidate_updates(&mut updates);
617
618        // Updates must be applied in timestamp order. Within a timestamp retractions must be
619        // applied before insertions, or we might end up retracting the wrong value.
620        updates.sort_by(|(_, ts1, diff1), (_, ts2, diff2)| ts1.cmp(ts2).then(diff1.cmp(diff2)));
621
622        let mut errors = Vec::new();
623
624        for (kind, ts, diff) in updates {
625            if diff != Diff::ONE && diff != Diff::MINUS_ONE {
626                panic!("invalid update in consolidated trace: ({kind:?}, {ts:?}, {diff:?})");
627            }
628
629            match self.update_applier.apply_update(
630                StateUpdate { kind, ts, diff },
631                &mut self.fenceable_token,
632                &self.metrics,
633            ) {
634                Ok(Some(StateUpdate { kind, ts, diff })) => self.snapshot.push((kind, ts, diff)),
635                Ok(None) => {}
636                // Instead of returning immediately, we accumulate all the errors and return the one
637                // with the most information.
638                Err(err) => errors.push(err),
639            }
640        }
641
642        errors.sort();
643        if let Some(err) = errors.into_iter().next() {
644            return Err(err);
645        }
646
647        self.consolidate();
648
649        Ok(())
650    }
651
652    #[mz_ore::instrument]
653    pub(crate) fn consolidate(&mut self) {
654        soft_assert_no_log!(
655            self.snapshot
656                .windows(2)
657                .all(|updates| updates[0].1 <= updates[1].1),
658            "snapshot should be sorted by timestamp, {:#?}",
659            self.snapshot
660        );
661
662        let new_ts = self
663            .snapshot
664            .last()
665            .map(|(_, ts, _)| *ts)
666            .unwrap_or_else(Timestamp::minimum);
667        for (_, ts, _) in &mut self.snapshot {
668            *ts = new_ts;
669        }
670        differential_dataflow::consolidation::consolidate_updates(&mut self.snapshot);
671    }
672
673    /// Execute and return the results of `f` on the current catalog trace.
674    ///
675    /// Will return an error if the catalog has been fenced out.
676    async fn with_trace<R>(
677        &mut self,
678        f: impl FnOnce(&Vec<(T, Timestamp, Diff)>) -> Result<R, CatalogError>,
679    ) -> Result<R, CatalogError> {
680        self.sync_to_current_upper().await?;
681        f(&self.snapshot)
682    }
683
684    /// Open a read handle to the catalog.
685    async fn read_handle(&self) -> ReadHandle<SourceData, (), Timestamp, StorageDiff> {
686        self.persist_client
687            .open_leased_reader(
688                self.shard_id,
689                Arc::new(persist_desc()),
690                Arc::new(UnitSchema::default()),
691                Diagnostics {
692                    shard_name: CATALOG_SHARD_NAME.to_string(),
693                    handle_purpose: "openable durable catalog state temporary reader".to_string(),
694                },
695                USE_CRITICAL_SINCE_CATALOG.get(self.persist_client.dyncfgs()),
696            )
697            .await
698            .expect("invalid usage")
699    }
700
701    /// Politely releases all external resources that can only be released in an async context.
702    async fn expire(self: Box<Self>) {
703        self.write_handle.expire().await;
704        self.listen.expire().await;
705    }
706}
707
708impl<U: ApplyUpdate<StateUpdateKind>> PersistHandle<StateUpdateKind, U> {
709    /// Execute and return the results of `f` on the current catalog snapshot.
710    ///
711    /// Will return an error if the catalog has been fenced out.
712    async fn with_snapshot<T>(
713        &mut self,
714        f: impl FnOnce(Snapshot) -> Result<T, CatalogError>,
715    ) -> Result<T, CatalogError> {
716        fn apply<K, V>(map: &mut BTreeMap<K, V>, key: &K, value: &V, diff: Diff)
717        where
718            K: Ord + Clone,
719            V: Ord + Clone + Debug,
720        {
721            let key = key.clone();
722            let value = value.clone();
723            if diff == Diff::ONE {
724                let prev = map.insert(key, value);
725                assert_eq!(
726                    prev, None,
727                    "values must be explicitly retracted before inserting a new value"
728                );
729            } else if diff == Diff::MINUS_ONE {
730                let prev = map.remove(&key);
731                assert_eq!(
732                    prev,
733                    Some(value),
734                    "retraction does not match existing value"
735                );
736            }
737        }
738
739        self.with_trace(|trace| {
740            let mut snapshot = Snapshot::empty();
741            for (kind, ts, diff) in trace {
742                let diff = *diff;
743                if diff != Diff::ONE && diff != Diff::MINUS_ONE {
744                    panic!("invalid update in consolidated trace: ({kind:?}, {ts:?}, {diff:?})");
745                }
746
747                match kind {
748                    StateUpdateKind::AuditLog(_key, ()) => {
749                        // Ignore for snapshots.
750                    }
751                    StateUpdateKind::Cluster(key, value) => {
752                        apply(&mut snapshot.clusters, key, value, diff);
753                    }
754                    StateUpdateKind::ClusterReplica(key, value) => {
755                        apply(&mut snapshot.cluster_replicas, key, value, diff);
756                    }
757                    StateUpdateKind::Comment(key, value) => {
758                        apply(&mut snapshot.comments, key, value, diff);
759                    }
760                    StateUpdateKind::Config(key, value) => {
761                        apply(&mut snapshot.configs, key, value, diff);
762                    }
763                    StateUpdateKind::Database(key, value) => {
764                        apply(&mut snapshot.databases, key, value, diff);
765                    }
766                    StateUpdateKind::DefaultPrivilege(key, value) => {
767                        apply(&mut snapshot.default_privileges, key, value, diff);
768                    }
769                    StateUpdateKind::FenceToken(_token) => {
770                        // Ignore for snapshots.
771                    }
772                    StateUpdateKind::IdAllocator(key, value) => {
773                        apply(&mut snapshot.id_allocator, key, value, diff);
774                    }
775                    StateUpdateKind::IntrospectionSourceIndex(key, value) => {
776                        apply(&mut snapshot.introspection_sources, key, value, diff);
777                    }
778                    StateUpdateKind::Item(key, value) => {
779                        apply(&mut snapshot.items, key, value, diff);
780                    }
781                    StateUpdateKind::NetworkPolicy(key, value) => {
782                        apply(&mut snapshot.network_policies, key, value, diff);
783                    }
784                    StateUpdateKind::Role(key, value) => {
785                        apply(&mut snapshot.roles, key, value, diff);
786                    }
787                    StateUpdateKind::Schema(key, value) => {
788                        apply(&mut snapshot.schemas, key, value, diff);
789                    }
790                    StateUpdateKind::Setting(key, value) => {
791                        apply(&mut snapshot.settings, key, value, diff);
792                    }
793                    StateUpdateKind::SourceReferences(key, value) => {
794                        apply(&mut snapshot.source_references, key, value, diff);
795                    }
796                    StateUpdateKind::SystemConfiguration(key, value) => {
797                        apply(&mut snapshot.system_configurations, key, value, diff);
798                    }
799                    StateUpdateKind::SystemObjectMapping(key, value) => {
800                        apply(&mut snapshot.system_object_mappings, key, value, diff);
801                    }
802                    StateUpdateKind::SystemPrivilege(key, value) => {
803                        apply(&mut snapshot.system_privileges, key, value, diff);
804                    }
805                    StateUpdateKind::StorageCollectionMetadata(key, value) => {
806                        apply(&mut snapshot.storage_collection_metadata, key, value, diff);
807                    }
808                    StateUpdateKind::UnfinalizedShard(key, ()) => {
809                        apply(&mut snapshot.unfinalized_shards, key, &(), diff);
810                    }
811                    StateUpdateKind::TxnWalShard((), value) => {
812                        apply(&mut snapshot.txn_wal_shard, &(), value, diff);
813                    }
814                    StateUpdateKind::RoleAuth(key, value) => {
815                        apply(&mut snapshot.role_auth, key, value, diff);
816                    }
817                }
818            }
819            f(snapshot)
820        })
821        .await
822    }
823
824    /// Generates an iterator of [`StateUpdate`] that contain all updates to the catalog
825    /// state.
826    ///
827    /// The output is fetched directly from persist instead of the in-memory cache.
828    ///
829    /// The output is consolidated and sorted by timestamp in ascending order.
830    #[mz_ore::instrument(level = "debug")]
831    async fn persist_snapshot(&self) -> impl Iterator<Item = StateUpdate> + DoubleEndedIterator {
832        let mut read_handle = self.read_handle().await;
833        let as_of = as_of(&read_handle, self.upper);
834        let snapshot = snapshot_binary(&mut read_handle, as_of, &self.metrics)
835            .await
836            .map(|update| update.try_into().expect("kind decoding error"));
837        read_handle.expire().await;
838        snapshot
839    }
840}
841
/// Applies updates for an unopened catalog.
///
/// Caches only the subset of catalog state that may be served before the catalog is opened:
/// the `Config` and `Setting` collections.
#[derive(Debug)]
pub(crate) struct UnopenedCatalogStateInner {
    /// A cache of the config collection of the catalog.
    configs: BTreeMap<String, u64>,
    /// A cache of the settings collection of the catalog.
    settings: BTreeMap<String, String>,
}
850
851impl UnopenedCatalogStateInner {
852    fn new() -> UnopenedCatalogStateInner {
853        UnopenedCatalogStateInner {
854            configs: BTreeMap::new(),
855            settings: BTreeMap::new(),
856        }
857    }
858}
859
impl ApplyUpdate<StateUpdateKindJson> for UnopenedCatalogStateInner {
    /// Applies a single raw update to the unopened catalog's caches.
    ///
    /// Only audit-log-free, always-deserializable updates are inspected; of those, only
    /// `Config`, `Setting`, and `FenceToken` updates affect state. Every update (inspected or
    /// not) is passed through unchanged in the `Ok(Some(..))` return value.
    ///
    /// Returns a [`FenceError`] if a newer fence token is observed.
    fn apply_update(
        &mut self,
        update: StateUpdate<StateUpdateKindJson>,
        current_fence_token: &mut FenceableToken,
        _metrics: &Arc<Metrics>,
    ) -> Result<Option<StateUpdate<StateUpdateKindJson>>, FenceError> {
        // Audit log entries are skipped entirely; other kinds are only decoded when they are
        // guaranteed to deserialize regardless of catalog version.
        if !update.kind.is_audit_log() && update.kind.is_always_deserializable() {
            let kind = TryInto::try_into(&update.kind).expect("kind is known to be deserializable");
            match (kind, update.diff) {
                (StateUpdateKind::Config(key, value), Diff::ONE) => {
                    let prev = self.configs.insert(key.key, value.value);
                    assert_eq!(
                        prev, None,
                        "values must be explicitly retracted before inserting a new value"
                    );
                }
                (StateUpdateKind::Config(key, value), Diff::MINUS_ONE) => {
                    let prev = self.configs.remove(&key.key);
                    assert_eq!(
                        prev,
                        Some(value.value),
                        "retraction does not match existing value"
                    );
                }
                (StateUpdateKind::Setting(key, value), Diff::ONE) => {
                    let prev = self.settings.insert(key.name, value.value);
                    assert_eq!(
                        prev, None,
                        "values must be explicitly retracted before inserting a new value"
                    );
                }
                (StateUpdateKind::Setting(key, value), Diff::MINUS_ONE) => {
                    let prev = self.settings.remove(&key.name);
                    assert_eq!(
                        prev,
                        Some(value.value),
                        "retraction does not match existing value"
                    );
                }
                (StateUpdateKind::FenceToken(fence_token), Diff::ONE) => {
                    // May return a fence error, which propagates to the caller via `?`.
                    current_fence_token.maybe_fence(fence_token)?;
                }
                // All other (kind, diff) combinations don't affect the unopened caches.
                _ => {}
            }
        }

        Ok(Some(update))
    }
}
910
/// A Handle to an unopened catalog stored in persist. The unopened catalog can serve `Config` data,
/// `Setting` data, or the current epoch. All other catalog data may be un-migrated and should not
/// be read until the catalog has been opened. The [`UnopenedPersistCatalogState`] is responsible
/// for opening the catalog, see [`OpenableDurableCatalogState::open`] for more details.
///
/// While unopened, updates are applied via [`UnopenedCatalogStateInner`], which caches only the
/// config and setting collections.
///
/// Production users should call [`Self::expire`] before dropping an [`UnopenedPersistCatalogState`]
/// so that it can expire its leases. If/when rust gets AsyncDrop, this will be done automatically.
pub(crate) type UnopenedPersistCatalogState =
    PersistHandle<StateUpdateKindJson, UnopenedCatalogStateInner>;
920
921impl UnopenedPersistCatalogState {
    /// Create a new [`UnopenedPersistCatalogState`] to the catalog state associated with
    /// `organization_id`.
    ///
    /// All usages of the persist catalog must go through this function. That includes the
    /// catalog-debug tool, the adapter's catalog, etc.
    ///
    /// Returns [`DurableCatalogError::IncompatiblePersistVersion`] when `version` cannot write
    /// the persist data format already recorded in the shard, or when a strictly newer binary
    /// version has already written to the catalog contents.
    #[mz_ore::instrument]
    pub(crate) async fn new(
        persist_client: PersistClient,
        organization_id: Uuid,
        version: semver::Version,
        deploy_generation: Option<u64>,
        metrics: Arc<Metrics>,
    ) -> Result<UnopenedPersistCatalogState, DurableCatalogError> {
        // The shard ID is derived deterministically from the organization ID and a fixed seed.
        let catalog_shard_id = shard_id(organization_id, CATALOG_SEED);
        debug!(?catalog_shard_id, "new persist backed catalog state");

        // Check the catalog shard version to ensure that we are compatible with the persist
        // data format. This lets us return an error gracefully, rather than panicking later in
        // persist.
        let version_in_catalog_shard =
            fetch_catalog_shard_version(&persist_client, catalog_shard_id).await;
        if let Some(version_in_catalog_shard) = version_in_catalog_shard {
            if !mz_persist_client::cfg::code_can_write_data(&version, &version_in_catalog_shard) {
                return Err(DurableCatalogError::IncompatiblePersistVersion {
                    found_version: version_in_catalog_shard,
                    catalog_version: version,
                });
            }
        }

        let open_handles_start = Instant::now();
        info!("startup: envd serve: catalog init: open handles beginning");
        let since_handle = persist_client
            .open_critical_since(
                catalog_shard_id,
                CATALOG_CRITICAL_SINCE.clone(),
                Opaque::encode(&i64::MIN),
                Diagnostics {
                    shard_name: CATALOG_SHARD_NAME.to_string(),
                    handle_purpose: "durable catalog state critical since".to_string(),
                },
            )
            .await
            .expect("invalid usage");

        let (mut write_handle, mut read_handle) = persist_client
            .open(
                catalog_shard_id,
                Arc::new(persist_desc()),
                Arc::new(UnitSchema::default()),
                Diagnostics {
                    shard_name: CATALOG_SHARD_NAME.to_string(),
                    handle_purpose: "durable catalog state handles".to_string(),
                },
                USE_CRITICAL_SINCE_CATALOG.get(persist_client.dyncfgs()),
            )
            .await
            .expect("invalid usage");
        info!(
            "startup: envd serve: catalog init: open handles complete in {:?}",
            open_handles_start.elapsed()
        );

        // Commit an empty write at the minimum timestamp so the catalog is always readable.
        let upper = {
            const EMPTY_UPDATES: &[((SourceData, ()), Timestamp, StorageDiff)] = &[];
            let upper = Antichain::from_elem(Timestamp::minimum());
            let next_upper = Timestamp::minimum().step_forward();
            // On success the shard was empty and our write advanced the upper; on an upper
            // mismatch the shard already had data and we adopt its current upper instead.
            match write_handle
                .compare_and_append(EMPTY_UPDATES, upper, Antichain::from_elem(next_upper))
                .await
                .expect("invalid usage")
            {
                Ok(()) => next_upper,
                Err(mismatch) => antichain_to_timestamp(mismatch.current),
            }
        };

        let snapshot_start = Instant::now();
        info!("startup: envd serve: catalog init: snapshot beginning");
        let as_of = as_of(&read_handle, upper);
        let snapshot: Vec<_> = snapshot_binary(&mut read_handle, as_of, &metrics)
            .await
            .map(|StateUpdate { kind, ts, diff }| (kind, ts, diff))
            .collect();
        // Subscribe to subsequent updates starting at the snapshot's as_of so no update is
        // missed between the snapshot and the listen stream.
        let listen = read_handle
            .listen(Antichain::from_elem(as_of))
            .await
            .expect("invalid usage");
        info!(
            "startup: envd serve: catalog init: snapshot complete in {:?}",
            snapshot_start.elapsed()
        );

        let mut handle = UnopenedPersistCatalogState {
            // Unopened catalogs are always writeable until they're opened in an explicit mode.
            mode: Mode::Writable,
            since_handle,
            write_handle,
            listen,
            persist_client,
            shard_id: catalog_shard_id,
            // Initialize empty in-memory state.
            snapshot: Vec::new(),
            update_applier: UnopenedCatalogStateInner::new(),
            upper,
            fenceable_token: FenceableToken::new(deploy_generation),
            catalog_content_version: version,
            bootstrap_complete: false,
            metrics,
        };
        // If the snapshot is not consolidated, and we see multiple epoch values while applying the
        // updates, then we might accidentally fence ourselves out.
        soft_assert_no_log!(
            snapshot.iter().all(|(_, _, diff)| *diff == Diff::ONE),
            "snapshot should be consolidated: {snapshot:#?}"
        );

        let apply_start = Instant::now();
        info!("startup: envd serve: catalog init: apply updates beginning");
        let updates = snapshot
            .into_iter()
            .map(|(kind, ts, diff)| StateUpdate { kind, ts, diff });
        handle.apply_updates(updates)?;
        info!(
            "startup: envd serve: catalog init: apply updates complete in {:?}",
            apply_start.elapsed()
        );

        // Validate that the binary version of the current process is not less than any binary
        // version that has written to the catalog.
        // This condition is only checked once, right here. If a new process comes along with a
        // higher version, it must fence this process out with one of the existing fencing
        // mechanisms.
        if let Some(found_version) = handle.get_catalog_content_version().await? {
            // Use cmp_precedence() to ignore build metadata per SemVer 2.0.0 spec
            if handle
                .catalog_content_version
                .cmp_precedence(&found_version)
                == std::cmp::Ordering::Less
            {
                return Err(DurableCatalogError::IncompatiblePersistVersion {
                    found_version,
                    catalog_version: handle.catalog_content_version,
                });
            }
        }

        Ok(handle)
    }
1072
    /// Opens the catalog in `mode`: fences out previous catalogs, runs upgrades or initial
    /// bootstrap as needed, and returns the opened [`DurableCatalogState`] together with an
    /// iterator over the (removed) audit log entries.
    ///
    /// `initial_ts` is only used when the catalog is being initialized for the first time.
    #[mz_ore::instrument]
    async fn open_inner(
        mut self,
        mode: Mode,
        initial_ts: Timestamp,
        bootstrap_args: &BootstrapArgs,
    ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError> {
        // It would be nice to use `initial_ts` here, but it comes from the system clock, not the
        // timestamp oracle.
        let mut commit_ts = self.upper;
        self.mode = mode;

        // Validate the current deploy generation.
        match (&self.mode, &self.fenceable_token) {
            (_, FenceableToken::Unfenced { .. } | FenceableToken::Fenced { .. }) => {
                return Err(DurableCatalogError::Internal(
                    "catalog should not have fenced before opening".to_string(),
                )
                .into());
            }
            (
                Mode::Writable | Mode::Savepoint,
                FenceableToken::Initializing {
                    current_deploy_generation: None,
                    ..
                },
            ) => {
                return Err(DurableCatalogError::Internal(format!(
                    "cannot open in mode '{:?}' without a deploy generation",
                    self.mode,
                ))
                .into());
            }
            _ => {}
        }

        let read_only = matches!(self.mode, Mode::Readonly);

        // Fence out previous catalogs.
        // Loop until we either successfully append our fence updates or hit a fence error;
        // upper mismatches are retried after re-syncing.
        loop {
            self.sync_to_current_upper().await?;
            commit_ts = max(commit_ts, self.upper);
            let (fence_updates, current_fenceable_token) = self
                .fenceable_token
                .generate_unfenced_token(self.mode)?
                .ok_or_else(|| {
                    DurableCatalogError::Internal(
                        "catalog should not have fenced before opening".to_string(),
                    )
                })?;
            debug!(
                ?self.upper,
                ?self.fenceable_token,
                ?current_fenceable_token,
                "fencing previous catalogs"
            );
            // Only writable catalogs durably append the fence; other modes just adopt the
            // new token in memory.
            if matches!(self.mode, Mode::Writable) {
                match self
                    .compare_and_append(fence_updates.clone(), commit_ts)
                    .await
                {
                    Ok(upper) => {
                        commit_ts = upper;
                    }
                    Err(CompareAndAppendError::Fence(e)) => return Err(e.into()),
                    Err(e @ CompareAndAppendError::UpperMismatch { .. }) => {
                        warn!("catalog write failed due to upper mismatch, retrying: {e:?}");
                        continue;
                    }
                }
            }
            self.fenceable_token = current_fenceable_token;
            break;
        }

        if matches!(self.mode, Mode::Writable) {
            // One-time migration: The catalog previously used `CONTROLLER_CRITICAL_SINCE` for its
            // since handle. Now it uses its own `CATALOG_CRITICAL_SINCE`, to free
            // `CONTROLLER_CRITICAL_SINCE` up for the storage controller. The catalog and
            // controller handles differ in the `Opaque` codec, so we need a migration.
            //
            // TODO: Remove this once we don't support upgrading from v26 anymore.
            let mut controller_handle = self
                .persist_client
                .open_critical_since::<SourceData, (), Timestamp, StorageDiff>(
                    self.shard_id,
                    PersistClient::CONTROLLER_CRITICAL_SINCE,
                    Opaque::encode(&i64::MIN),
                    Diagnostics {
                        shard_name: CATALOG_SHARD_NAME.to_string(),
                        handle_purpose: "durable catalog state critical since (migration)"
                            .to_string(),
                    },
                )
                .await
                .expect("invalid usage");

            let since = controller_handle.since().clone();
            let res = controller_handle
                .compare_and_downgrade_since(
                    &Opaque::encode(&i64::MIN),
                    (&Opaque::encode(&PersistEpoch::default()), &since),
                )
                .await;
            match res {
                Ok(_) => info!("migrated Opaque of catalog since handle"),
                Err(_) => { /* critical since was already migrated */ }
            }
        }

        let is_initialized = self.is_initialized_inner();
        if !matches!(self.mode, Mode::Writable) && !is_initialized {
            return Err(CatalogError::Durable(DurableCatalogError::NotWritable(
                format!(
                    "catalog tables do not exist; will not create in {:?} mode",
                    self.mode
                ),
            )));
        }
        soft_assert_ne_or_log!(self.upper, Timestamp::minimum());

        // Remove all audit log entries.
        let (audit_logs, snapshot): (Vec<_>, Vec<_>) = self
            .snapshot
            .into_iter()
            .partition(|(update, _, _)| update.is_audit_log());
        self.snapshot = snapshot;
        let audit_log_count = audit_logs.iter().map(|(_, _, diff)| diff).sum::<Diff>();
        let audit_log_handle = AuditLogIterator::new(audit_logs);

        // Perform data migrations.
        if is_initialized && !read_only {
            commit_ts = upgrade(&mut self, commit_ts).await?;
        }

        debug!(
            ?is_initialized,
            ?self.upper,
            "initializing catalog state"
        );
        // Move all handles and state from the unopened handle into the opened catalog state.
        let mut catalog = PersistCatalogState {
            mode: self.mode,
            since_handle: self.since_handle,
            write_handle: self.write_handle,
            listen: self.listen,
            persist_client: self.persist_client,
            shard_id: self.shard_id,
            upper: self.upper,
            fenceable_token: self.fenceable_token,
            // Initialize empty in-memory state.
            snapshot: Vec::new(),
            update_applier: CatalogStateInner::new(),
            catalog_content_version: self.catalog_content_version,
            bootstrap_complete: false,
            metrics: self.metrics,
        };
        catalog.metrics.collection_entries.reset();
        // Normally, `collection_entries` is updated in `apply_updates`. The audit log updates skip
        // over that function so we manually update it here.
        catalog
            .metrics
            .collection_entries
            .with_label_values(&[&CollectionType::AuditLog.to_string()])
            .add(audit_log_count.into_inner());
        let updates = self.snapshot.into_iter().map(|(kind, ts, diff)| {
            let kind = TryIntoStateUpdateKind::try_into(kind).expect("kind decoding error");
            StateUpdate { kind, ts, diff }
        });
        catalog.apply_updates(updates)?;

        let catalog_content_version = catalog.catalog_content_version.to_string();
        let txn = if is_initialized {
            let mut txn = catalog.transaction().await?;

            // Ad-hoc migration: Initialize the `migration_version` expected by adapter to be
            // present in existing catalogs.
            //
            // Note: Need to exclude read-only catalog mode here, because in that mode all
            // transactions are expected to be no-ops.
            // TODO: remove this once we only support upgrades from version >= 0.164
            if txn.get_setting("migration_version".into()).is_none() && mode != Mode::Readonly {
                let old_version = txn.get_catalog_content_version();
                txn.set_setting("migration_version".into(), old_version.map(Into::into))?;
            }

            txn.set_catalog_content_version(catalog_content_version)?;
            txn
        } else {
            // An uninitialized catalog must contain nothing but fence tokens.
            soft_assert_eq_no_log!(
                catalog
                    .snapshot
                    .iter()
                    .filter(|(kind, _, _)| !matches!(kind, StateUpdateKind::FenceToken(_)))
                    .count(),
                0,
                "trace should not contain any updates for an uninitialized catalog: {:#?}",
                catalog.snapshot
            );

            let mut txn = catalog.transaction().await?;
            initialize::initialize(
                &mut txn,
                bootstrap_args,
                initial_ts.into(),
                catalog_content_version,
            )
            .await?;
            txn
        };

        if read_only {
            let (txn_batch, _) = txn.into_parts();
            // The upper here doesn't matter because we are only applying the updates in memory.
            let updates = StateUpdate::from_txn_batch_ts(txn_batch, catalog.upper);
            catalog.apply_updates(updates)?;
        } else {
            txn.commit_internal(commit_ts).await?;
        }

        if matches!(catalog.mode, Mode::Writable) {
            // Open a dedicated writer for background compaction of the catalog shard.
            let write_handle = catalog
                .persist_client
                .open_writer::<SourceData, (), Timestamp, i64>(
                    catalog.write_handle.shard_id(),
                    Arc::new(persist_desc()),
                    Arc::new(UnitSchema::default()),
                    Diagnostics {
                        shard_name: CATALOG_SHARD_NAME.to_string(),
                        handle_purpose: "compact catalog".to_string(),
                    },
                )
                .await
                .expect("invalid usage");
            let fuel = CATALOG_FORCE_COMPACTION_FUEL.handle(catalog.persist_client.dyncfgs());
            let wait = CATALOG_FORCE_COMPACTION_WAIT.handle(catalog.persist_client.dyncfgs());
            // We're going to gradually turn this on via dyncfgs. Run it in a task so that it
            // doesn't block startup.
            let _task = mz_ore::task::spawn(|| "catalog::force_shard_compaction", async move {
                let () =
                    mz_persist_client::cli::admin::dangerous_force_compaction_and_break_pushdown(
                        &write_handle,
                        || fuel.get(),
                        || wait.get(),
                    )
                    .await;
            });
        }

        Ok((Box::new(catalog), audit_log_handle))
    }
1323
1324    /// Reports if the catalog state has been initialized.
1325    ///
1326    /// NOTE: This is the answer as of the last call to [`PersistHandle::sync`] or [`PersistHandle::sync_to_current_upper`],
1327    /// not necessarily what is currently in persist.
1328    #[mz_ore::instrument]
1329    fn is_initialized_inner(&self) -> bool {
1330        !self.update_applier.configs.is_empty()
1331    }
1332
1333    /// Get the current value of config `key`.
1334    ///
1335    /// Some configs need to be read before the catalog is opened for bootstrapping.
1336    #[mz_ore::instrument]
1337    async fn get_current_config(&mut self, key: &str) -> Result<Option<u64>, DurableCatalogError> {
1338        self.sync_to_current_upper().await?;
1339        Ok(self.update_applier.configs.get(key).cloned())
1340    }
1341
1342    /// Get the user version of this instance.
1343    ///
1344    /// The user version is used to determine if a migration is needed.
1345    #[mz_ore::instrument]
1346    pub(crate) async fn get_user_version(&mut self) -> Result<Option<u64>, DurableCatalogError> {
1347        self.get_current_config(USER_VERSION_KEY).await
1348    }
1349
1350    /// Get the current value of setting `name`.
1351    ///
1352    /// Some settings need to be read before the catalog is opened for bootstrapping.
1353    #[mz_ore::instrument]
1354    async fn get_current_setting(
1355        &mut self,
1356        name: &str,
1357    ) -> Result<Option<String>, DurableCatalogError> {
1358        self.sync_to_current_upper().await?;
1359        Ok(self.update_applier.settings.get(name).cloned())
1360    }
1361
1362    /// Get the catalog content version.
1363    ///
1364    /// The catalog content version is the semantic version of the most recent binary that wrote to
1365    /// the catalog.
1366    #[mz_ore::instrument]
1367    async fn get_catalog_content_version(
1368        &mut self,
1369    ) -> Result<Option<semver::Version>, DurableCatalogError> {
1370        let version = self
1371            .get_current_setting(CATALOG_CONTENT_VERSION_KEY)
1372            .await?;
1373        let version = version.map(|version| version.parse().expect("invalid version persisted"));
1374        Ok(version)
1375    }
1376}
1377
1378#[async_trait]
1379impl OpenableDurableCatalogState for UnopenedPersistCatalogState {
1380    #[mz_ore::instrument]
1381    async fn open_savepoint(
1382        mut self: Box<Self>,
1383        initial_ts: Timestamp,
1384        bootstrap_args: &BootstrapArgs,
1385    ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError> {
1386        self.open_inner(Mode::Savepoint, initial_ts, bootstrap_args)
1387            .boxed()
1388            .await
1389    }
1390
1391    #[mz_ore::instrument]
1392    async fn open_read_only(
1393        mut self: Box<Self>,
1394        bootstrap_args: &BootstrapArgs,
1395    ) -> Result<Box<dyn DurableCatalogState>, CatalogError> {
1396        self.open_inner(Mode::Readonly, EpochMillis::MIN.into(), bootstrap_args)
1397            .boxed()
1398            .await
1399            .map(|(catalog, _)| catalog)
1400    }
1401
1402    #[mz_ore::instrument]
1403    async fn open(
1404        mut self: Box<Self>,
1405        initial_ts: Timestamp,
1406        bootstrap_args: &BootstrapArgs,
1407    ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError> {
1408        self.open_inner(Mode::Writable, initial_ts, bootstrap_args)
1409            .boxed()
1410            .await
1411    }
1412
1413    #[mz_ore::instrument(level = "debug")]
1414    async fn open_debug(mut self: Box<Self>) -> Result<DebugCatalogState, CatalogError> {
1415        Ok(DebugCatalogState(*self))
1416    }
1417
1418    #[mz_ore::instrument]
1419    async fn is_initialized(&mut self) -> Result<bool, CatalogError> {
1420        self.sync_to_current_upper().await?;
1421        Ok(self.is_initialized_inner())
1422    }
1423
1424    #[mz_ore::instrument]
1425    async fn epoch(&mut self) -> Result<Epoch, CatalogError> {
1426        self.sync_to_current_upper().await?;
1427        self.fenceable_token
1428            .validate()?
1429            .map(|token| token.epoch)
1430            .ok_or(CatalogError::Durable(DurableCatalogError::Uninitialized))
1431    }
1432
1433    #[mz_ore::instrument]
1434    async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError> {
1435        self.sync_to_current_upper().await?;
1436        self.fenceable_token
1437            .token()
1438            .map(|token| token.deploy_generation)
1439            .ok_or(CatalogError::Durable(DurableCatalogError::Uninitialized))
1440    }
1441
1442    #[mz_ore::instrument(level = "debug")]
1443    async fn get_0dt_deployment_max_wait(&mut self) -> Result<Option<Duration>, CatalogError> {
1444        let value = self
1445            .get_current_config(WITH_0DT_DEPLOYMENT_MAX_WAIT)
1446            .await?;
1447        match value {
1448            None => Ok(None),
1449            Some(millis) => Ok(Some(Duration::from_millis(millis))),
1450        }
1451    }
1452
1453    #[mz_ore::instrument(level = "debug")]
1454    async fn get_0dt_deployment_ddl_check_interval(
1455        &mut self,
1456    ) -> Result<Option<Duration>, CatalogError> {
1457        let value = self
1458            .get_current_config(WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL)
1459            .await?;
1460        match value {
1461            None => Ok(None),
1462            Some(millis) => Ok(Some(Duration::from_millis(millis))),
1463        }
1464    }
1465
1466    #[mz_ore::instrument(level = "debug")]
1467    async fn get_enable_0dt_deployment_panic_after_timeout(
1468        &mut self,
1469    ) -> Result<Option<bool>, CatalogError> {
1470        let value = self
1471            .get_current_config(ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT)
1472            .await?;
1473        match value {
1474            None => Ok(None),
1475            Some(0) => Ok(Some(false)),
1476            Some(1) => Ok(Some(true)),
1477            Some(v) => Err(
1478                DurableCatalogError::from(TryFromProtoError::UnknownEnumVariant(format!(
1479                    "{v} is not a valid boolean value"
1480                )))
1481                .into(),
1482            ),
1483        }
1484    }
1485
1486    #[mz_ore::instrument]
1487    async fn has_system_config_synced_once(&mut self) -> Result<bool, DurableCatalogError> {
1488        self.get_current_config(SYSTEM_CONFIG_SYNCED_KEY)
1489            .await
1490            .map(|value| value.map(|value| value > 0).unwrap_or(false))
1491    }
1492
1493    #[mz_ore::instrument]
1494    async fn trace_unconsolidated(&mut self) -> Result<Trace, CatalogError> {
1495        self.sync_to_current_upper().await?;
1496        if self.is_initialized_inner() {
1497            let snapshot = self.snapshot_unconsolidated().await;
1498            Ok(Trace::from_snapshot(snapshot))
1499        } else {
1500            Err(CatalogError::Durable(DurableCatalogError::Uninitialized))
1501        }
1502    }
1503
1504    #[mz_ore::instrument]
1505    async fn trace_consolidated(&mut self) -> Result<Trace, CatalogError> {
1506        self.sync_to_current_upper().await?;
1507        if self.is_initialized_inner() {
1508            let snapshot = self.current_snapshot().await?;
1509            Ok(Trace::from_snapshot(snapshot))
1510        } else {
1511            Err(CatalogError::Durable(DurableCatalogError::Uninitialized))
1512        }
1513    }
1514
    #[mz_ore::instrument(level = "debug")]
    async fn expire(self: Box<Self>) {
        // Resolves to the inherent `expire` on the handle (inherent methods take
        // precedence over trait methods), releasing persist leases.
        self.expire().await
    }
1519}
1520
/// Applies updates for an opened catalog.
///
/// Acts as the update applier for an opened [`PersistCatalogState`], buffering
/// converted in-memory updates until a higher layer drains them.
#[derive(Debug)]
struct CatalogStateInner {
    /// A trace of all catalog updates that can be consumed by some higher layer.
    updates: VecDeque<memory::objects::StateUpdate>,
}
1527
1528impl CatalogStateInner {
1529    fn new() -> CatalogStateInner {
1530        CatalogStateInner {
1531            updates: VecDeque::new(),
1532        }
1533    }
1534}
1535
1536impl ApplyUpdate<StateUpdateKind> for CatalogStateInner {
1537    fn apply_update(
1538        &mut self,
1539        update: StateUpdate<StateUpdateKind>,
1540        current_fence_token: &mut FenceableToken,
1541        metrics: &Arc<Metrics>,
1542    ) -> Result<Option<StateUpdate<StateUpdateKind>>, FenceError> {
1543        if let Some(collection_type) = update.kind.collection_type() {
1544            metrics
1545                .collection_entries
1546                .with_label_values(&[&collection_type.to_string()])
1547                .add(update.diff.into_inner());
1548        }
1549
1550        {
1551            let update: Option<memory::objects::StateUpdate> = (&update)
1552                .try_into()
1553                .expect("invalid persisted update: {update:#?}");
1554            if let Some(update) = update {
1555                self.updates.push_back(update);
1556            }
1557        }
1558
1559        match (update.kind, update.diff) {
1560            (StateUpdateKind::AuditLog(_, ()), _) => Ok(None),
1561            // Nothing to due for fence token retractions but wait for the next insertion.
1562            (StateUpdateKind::FenceToken(_), Diff::MINUS_ONE) => Ok(None),
1563            (StateUpdateKind::FenceToken(token), Diff::ONE) => {
1564                current_fence_token.maybe_fence(token)?;
1565                Ok(None)
1566            }
1567            (kind, diff) => Ok(Some(StateUpdate {
1568                kind,
1569                ts: update.ts,
1570                diff,
1571            })),
1572        }
1573    }
1574}
1575
/// A durable store of the catalog state using Persist as an implementation. The durable store can
/// serve any catalog data and transactionally modify catalog data.
///
/// This is a [`PersistHandle`] specialized with [`CatalogStateInner`] as the update applier.
///
/// Production users should call [`Self::expire`] before dropping a [`PersistCatalogState`]
/// so that it can expire its leases. If/when rust gets AsyncDrop, this will be done automatically.
type PersistCatalogState = PersistHandle<StateUpdateKind, CatalogStateInner>;
1582
#[async_trait]
impl ReadOnlyDurableCatalogState for PersistCatalogState {
    fn epoch(&self) -> Epoch {
        // Opened catalogs are guaranteed to have written a fence token, so a
        // missing token here is a bug.
        self.fenceable_token
            .token()
            .expect("opened catalog state must have an epoch")
            .epoch
    }

    fn metrics(&self) -> &Metrics {
        &self.metrics
    }

    #[mz_ore::instrument(level = "debug")]
    async fn expire(self: Box<Self>) {
        // Resolves to the inherent `expire` on the handle (inherent methods take
        // precedence over trait methods), releasing persist leases.
        self.expire().await
    }

    fn is_bootstrap_complete(&self) -> bool {
        self.bootstrap_complete
    }

    // Returns all audit log events, decoded and sorted by their sortable ID.
    async fn get_audit_logs(&mut self) -> Result<Vec<VersionedEvent>, CatalogError> {
        self.sync_to_current_upper().await?;
        // Audit logs are not retained in the in-memory state (see
        // `apply_update`, which filters them out), so read them directly from a
        // fresh persist snapshot.
        let audit_logs: Vec<_> = self
            .persist_snapshot()
            .await
            .filter_map(
                |StateUpdate {
                     kind,
                     ts: _,
                     diff: _,
                 }| match kind {
                    StateUpdateKind::AuditLog(key, ()) => Some(key),
                    _ => None,
                },
            )
            .collect();
        // Decode the proto representation; fail on the first corrupt entry.
        let mut audit_logs: Vec<_> = audit_logs
            .into_iter()
            .map(RustType::from_proto)
            .map_ok(|key: AuditLogKey| key.event)
            .collect::<Result<_, _>>()?;
        audit_logs.sort_by(|a, b| a.sortable_id().cmp(&b.sortable_id()));
        Ok(audit_logs)
    }

    #[mz_ore::instrument(level = "debug")]
    async fn get_next_id(&mut self, id_type: &str) -> Result<u64, CatalogError> {
        // Scan the trace newest-first and take the most recent value for this
        // allocator. NOTE(review): assumes an allocator entry named `id_type`
        // always exists in an opened catalog — `expect` panics otherwise.
        self.with_trace(|trace| {
            Ok(trace
                .into_iter()
                .rev()
                .filter_map(|(kind, _, _)| match kind {
                    StateUpdateKind::IdAllocator(key, value) if key.name == id_type => {
                        Some(value.next_id)
                    }
                    _ => None,
                })
                .next()
                .expect("must exist"))
        })
        .await
    }

    #[mz_ore::instrument(level = "debug")]
    async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError> {
        self.sync_to_current_upper().await?;
        Ok(self
            .fenceable_token
            .token()
            .expect("opened catalogs must have a token")
            .deploy_generation)
    }

    #[mz_ore::instrument(level = "debug")]
    async fn snapshot(&mut self) -> Result<Snapshot, CatalogError> {
        // `Ok` as the closure: return the snapshot unchanged.
        self.with_snapshot(Ok).await
    }

    #[mz_ore::instrument(level = "debug")]
    async fn sync_to_current_updates(
        &mut self,
    ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError> {
        let upper = self.current_upper().await;
        self.sync_updates(upper).await
    }

    #[mz_ore::instrument(level = "debug")]
    async fn sync_updates(
        &mut self,
        target_upper: mz_repr::Timestamp,
    ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError> {
        self.sync(target_upper).await?;
        let mut updates = Vec::new();
        // Drain only updates strictly before `target_upper`; later updates stay
        // queued for a future call.
        while let Some(update) = self.update_applier.updates.front() {
            if update.ts >= target_upper {
                break;
            }

            let update = self
                .update_applier
                .updates
                .pop_front()
                .expect("peeked above");
            updates.push(update);
        }
        Ok(updates)
    }

    async fn current_upper(&mut self) -> Timestamp {
        // Resolves to the inherent `current_upper` on the handle.
        self.current_upper().await
    }
}
1697
#[async_trait]
#[allow(mismatched_lifetime_syntaxes)]
impl DurableCatalogState for PersistCatalogState {
    fn is_read_only(&self) -> bool {
        matches!(self.mode, Mode::Readonly)
    }

    fn is_savepoint(&self) -> bool {
        matches!(self.mode, Mode::Savepoint)
    }

    async fn mark_bootstrap_complete(&mut self) {
        self.bootstrap_complete = true;
        // Only writable catalogs may bump the shard's version; read-only and
        // savepoint catalogs must not mutate shared persist state.
        if matches!(self.mode, Mode::Writable) {
            self.since_handle
                .upgrade_version()
                .await
                .expect("invalid usage")
        }
    }

    #[mz_ore::instrument(level = "debug")]
    async fn transaction(&mut self) -> Result<Transaction, CatalogError> {
        self.metrics.transactions_started.inc();
        let snapshot = self.snapshot().await?;
        // The transaction commits at the current upper; a concurrent write will
        // move the upper and invalidate the transaction (see
        // `commit_transaction`).
        let commit_ts = self.upper.clone();
        Transaction::new(self, snapshot, commit_ts)
    }

    fn transaction_from_snapshot(
        &mut self,
        snapshot: Snapshot,
    ) -> Result<Transaction, CatalogError> {
        let commit_ts = self.upper.clone();
        Transaction::new(self, snapshot, commit_ts)
    }

    #[mz_ore::instrument(level = "debug")]
    async fn commit_transaction(
        &mut self,
        txn_batch: TransactionBatch,
        commit_ts: Timestamp,
    ) -> Result<Timestamp, CatalogError> {
        // Inner helper exists so the whole body can be wrapped in the
        // wall-time latency metric below.
        async fn commit_transaction_inner(
            catalog: &mut PersistCatalogState,
            txn_batch: TransactionBatch,
            commit_ts: Timestamp,
        ) -> Result<Timestamp, CatalogError> {
            // If the transaction is empty then we don't error, even in read-only mode.
            // This is mostly for legacy reasons (i.e. with enough elbow grease this
            // behavior can be changed without breaking any fundamental assumptions).
            if catalog.mode == Mode::Readonly {
                let updates: Vec<_> = StateUpdate::from_txn_batch(txn_batch).collect();
                if !updates.is_empty() {
                    return Err(DurableCatalogError::NotWritable(format!(
                        "cannot commit a transaction in a read-only catalog: {updates:#?}"
                    ))
                    .into());
                }
                return Ok(catalog.upper);
            }

            // If the current upper does not match the transaction's commit timestamp, then the
            // catalog must have changed since the transaction was started, making the transaction
            // invalid. When/if we want a multi-writer catalog, this will likely have to change
            // from an assert to a retry.
            assert_eq!(
                catalog.upper, txn_batch.upper,
                "only one transaction at a time is supported"
            );

            assert!(
                commit_ts >= catalog.upper,
                "expected commit ts, {}, to be greater than or equal to upper, {}",
                commit_ts,
                catalog.upper
            );

            let updates = StateUpdate::from_txn_batch(txn_batch).collect();
            debug!("committing updates: {updates:?}");

            let next_upper = match catalog.mode {
                // Writable: durably append; fence errors are fatal, upper
                // mismatches are converted by `unwrap_fence_error`.
                Mode::Writable => catalog
                    .compare_and_append(updates, commit_ts)
                    .await
                    .map_err(|e| e.unwrap_fence_error())?,
                // Savepoint: apply only in memory, never writing to persist.
                Mode::Savepoint => {
                    let updates = updates.into_iter().map(|(kind, diff)| StateUpdate {
                        kind,
                        ts: commit_ts,
                        diff,
                    });
                    catalog.apply_updates(updates)?;
                    catalog.upper = commit_ts.step_forward();
                    catalog.upper
                }
                Mode::Readonly => unreachable!("handled above"),
            };

            Ok(next_upper)
        }
        self.metrics.transaction_commits.inc();
        let counter = self.metrics.transaction_commit_latency_seconds.clone();
        commit_transaction_inner(self, txn_batch, commit_ts)
            .wall_time()
            .inc_by(counter)
            .await
    }

    #[mz_ore::instrument(level = "debug")]
    async fn advance_upper(&mut self, new_upper: Timestamp) -> Result<(), CatalogError> {
        if self.upper >= new_upper {
            // We don't expect a no-op advancement, but if we are wrong we'd crash the process.
            // Seems safer to only soft-assert and return gracefully in production. If we get here
            // that means we tried to make the catalog shard readable at a time it was already
            // readable, which likely means we are violating linearizability. That's not great, but
            // crashing (or even crash-looping) is worse.
            //
            // TODO: Consider removing this once we have built some confidence.
            soft_panic_or_log!(
                "new_upper ({new_upper}) not greater than current upper ({})",
                self.upper
            );
            return Ok(());
        }

        match self.mode {
            // Writable: append an empty batch to durably advance the upper.
            Mode::Writable => self
                .compare_and_append_inner([], new_upper)
                .await
                .map_err(|e| e.unwrap_fence_error())?,
            // Savepoint: only the in-memory upper moves (below).
            Mode::Savepoint => (),
            Mode::Readonly => {
                return Err(DurableCatalogError::NotWritable(
                    "cannot advance upper of a read-only catalog".into(),
                )
                .into());
            }
        }

        self.upper = new_upper;
        // No sync needed since no data was written.

        Ok(())
    }

    fn shard_id(&self) -> ShardId {
        self.shard_id
    }
}
1848
1849/// Deterministically generate a shard ID for the given `organization_id` and `seed`.
1850pub fn shard_id(organization_id: Uuid, seed: usize) -> ShardId {
1851    let hash = sha2::Sha256::digest(format!("{organization_id}{seed}")).to_vec();
1852    soft_assert_eq_or_log!(hash.len(), 32, "SHA256 returns 32 bytes (256 bits)");
1853    let uuid = Uuid::from_slice(&hash[0..16]).expect("from_slice accepts exactly 16 bytes");
1854    ShardId::from_str(&format!("s{uuid}")).expect("known to be valid")
1855}
1856
1857/// Generates a timestamp for reading from `read_handle` that is as fresh as possible, given
1858/// `upper`.
1859fn as_of(
1860    read_handle: &ReadHandle<SourceData, (), Timestamp, StorageDiff>,
1861    upper: Timestamp,
1862) -> Timestamp {
1863    let since = read_handle.since().clone();
1864    let mut as_of = upper.checked_sub(1).unwrap_or_else(|| {
1865        panic!("catalog persist shard should be initialize, found upper: {upper:?}")
1866    });
1867    // We only downgrade the since after writing, and we always set the since to one less than the
1868    // upper.
1869    soft_assert_or_log!(
1870        since.less_equal(&as_of),
1871        "since={since:?}, as_of={as_of:?}; since must be less than or equal to as_of"
1872    );
1873    // This should be a no-op if the assert above passes, however if it doesn't then we'd like to
1874    // continue with a correct timestamp instead of entering a panic loop.
1875    as_of.advance_by(since.borrow());
1876    as_of
1877}
1878
/// Fetch the persist version of the catalog shard, if one exists. A version will not
/// exist if we are creating a brand-new environment.
///
/// Returns `None` when the shard cannot be inspected (e.g. it does not exist yet); panics if the
/// shard state exists but cannot be (de)serialized, since that indicates a corrupt or
/// incompatible shard.
async fn fetch_catalog_shard_version(
    persist_client: &PersistClient,
    catalog_shard_id: ShardId,
) -> Option<semver::Version> {
    // `ok()?` maps any inspection error to "no version", covering the brand-new
    // environment case.
    let shard_state = persist_client
        .inspect_shard::<Timestamp>(&catalog_shard_id)
        .await
        .ok()?;
    // The shard state is only exposed as opaque JSON; extract the
    // `applier_version` field from it.
    let json_state = serde_json::to_value(shard_state).expect("state serialization error");
    let json_version = json_state
        .get("applier_version")
        .cloned()
        .expect("missing applier_version");
    let version = serde_json::from_value(json_version).expect("version deserialization error");
    Some(version)
}
1897
/// Generates an iterator of [`StateUpdate`] that contain all updates to the catalog
/// state up to, and including, `as_of`.
///
/// The output is consolidated and sorted by timestamp.
///
/// NOTE(review): the inner sort comparator (`Ord::cmp(&b.ts, &a.ts)` in
/// `snapshot_binary_inner`) yields *descending* timestamps, though this comment
/// previously claimed ascending — confirm which order callers depend on.
#[mz_ore::instrument(level = "debug")]
async fn snapshot_binary(
    read_handle: &mut ReadHandle<SourceData, (), Timestamp, StorageDiff>,
    as_of: Timestamp,
    metrics: &Arc<Metrics>,
) -> impl Iterator<Item = StateUpdate<StateUpdateKindJson>> + DoubleEndedIterator + use<> {
    // Record snapshot count and wall-clock latency before delegating.
    metrics.snapshots_taken.inc();
    let counter = metrics.snapshot_latency_seconds.clone();
    snapshot_binary_inner(read_handle, as_of)
        .wall_time()
        .inc_by(counter)
        .await
}
1915
/// Generates an iterator of [`StateUpdate`] that contain all updates to the catalog
/// state up to, and including, `as_of`.
///
/// The output is consolidated and sorted by timestamp.
///
/// NOTE(review): the comparator `Ord::cmp(&b.ts, &a.ts)` reverses its arguments
/// and therefore sorts timestamps in *descending* order, though this comment
/// previously claimed ascending — confirm which order callers depend on before
/// changing either.
#[mz_ore::instrument(level = "debug")]
async fn snapshot_binary_inner(
    read_handle: &mut ReadHandle<SourceData, (), Timestamp, StorageDiff>,
    as_of: Timestamp,
) -> impl Iterator<Item = StateUpdate<StateUpdateKindJson>> + DoubleEndedIterator + use<> {
    let snapshot = read_handle
        .snapshot_and_fetch(Antichain::from_elem(as_of))
        .await
        .expect("we have advanced the restart_as_of by the since");
    // Consolidation means every surviving update has diff == 1.
    soft_assert_no_log!(
        snapshot.iter().all(|(_, _, diff)| *diff == 1),
        "snapshot_and_fetch guarantees a consolidated result: {snapshot:#?}"
    );
    snapshot
        .into_iter()
        .map(Into::<StateUpdate<StateUpdateKindJson>>::into)
        .sorted_by(|a, b| Ord::cmp(&b.ts, &a.ts))
}
1938
/// Convert an [`Antichain<Timestamp>`] to a [`Timestamp`].
///
/// The correctness of this function relies on [`Timestamp`] being totally ordered and never
/// finalizing the catalog shard.
///
/// # Panics
///
/// Panics if `antichain` is empty, which would only happen if the shard were finalized.
pub(crate) fn antichain_to_timestamp(antichain: Antichain<Timestamp>) -> Timestamp {
    antichain
        .into_option()
        .expect("we use a totally ordered time and never finalize the shard")
}
1948
1949// Debug methods used by the catalog-debug tool.
1950
impl Trace {
    /// Generates a [`Trace`] from snapshot.
    ///
    /// Each update is routed to the sub-trace for its collection; fence tokens
    /// are the only kind deliberately dropped.
    fn from_snapshot(snapshot: impl IntoIterator<Item = StateUpdate>) -> Trace {
        let mut trace = Trace::new();
        // Exhaustive match: adding a new `StateUpdateKind` variant forces a
        // decision about where (or whether) it appears in the trace.
        for StateUpdate { kind, ts, diff } in snapshot {
            match kind {
                StateUpdateKind::AuditLog(k, v) => trace.audit_log.values.push(((k, v), ts, diff)),
                StateUpdateKind::Cluster(k, v) => trace.clusters.values.push(((k, v), ts, diff)),
                StateUpdateKind::ClusterReplica(k, v) => {
                    trace.cluster_replicas.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::Comment(k, v) => trace.comments.values.push(((k, v), ts, diff)),
                StateUpdateKind::Config(k, v) => trace.configs.values.push(((k, v), ts, diff)),
                StateUpdateKind::Database(k, v) => trace.databases.values.push(((k, v), ts, diff)),
                StateUpdateKind::DefaultPrivilege(k, v) => {
                    trace.default_privileges.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::FenceToken(_) => {
                    // Fence token not included in trace.
                }
                StateUpdateKind::IdAllocator(k, v) => {
                    trace.id_allocator.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::IntrospectionSourceIndex(k, v) => {
                    trace.introspection_sources.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::Item(k, v) => trace.items.values.push(((k, v), ts, diff)),
                StateUpdateKind::NetworkPolicy(k, v) => {
                    trace.network_policies.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::Role(k, v) => trace.roles.values.push(((k, v), ts, diff)),
                StateUpdateKind::Schema(k, v) => trace.schemas.values.push(((k, v), ts, diff)),
                StateUpdateKind::Setting(k, v) => trace.settings.values.push(((k, v), ts, diff)),
                StateUpdateKind::SourceReferences(k, v) => {
                    trace.source_references.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::SystemConfiguration(k, v) => {
                    trace.system_configurations.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::SystemObjectMapping(k, v) => {
                    trace.system_object_mappings.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::SystemPrivilege(k, v) => {
                    trace.system_privileges.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::StorageCollectionMetadata(k, v) => trace
                    .storage_collection_metadata
                    .values
                    .push(((k, v), ts, diff)),
                StateUpdateKind::UnfinalizedShard(k, ()) => {
                    trace.unfinalized_shards.values.push(((k, ()), ts, diff))
                }
                StateUpdateKind::TxnWalShard((), v) => {
                    trace.txn_wal_shard.values.push((((), v), ts, diff))
                }
                StateUpdateKind::RoleAuth(k, v) => trace.role_auth.values.push(((k, v), ts, diff)),
            }
        }
        trace
    }
}
2012
impl UnopenedPersistCatalogState {
    /// Manually update value of `key` in collection `T` to `value`.
    ///
    /// Returns the previous value for `key`, if any. Retries on upper
    /// mismatches (another writer raced us) and returns an error if this
    /// process gets fenced.
    #[mz_ore::instrument]
    pub(crate) async fn debug_edit<T: Collection>(
        &mut self,
        key: T::Key,
        value: T::Value,
    ) -> Result<Option<T::Value>, CatalogError>
    where
        T::Key: PartialEq + Eq + Debug + Clone,
        T::Value: Debug + Clone,
    {
        let prev_value = loop {
            // Re-clone per attempt: each retry rebuilds the updates from a
            // fresh snapshot.
            let key = key.clone();
            let value = value.clone();
            let snapshot = self.current_snapshot().await?;
            let trace = Trace::from_snapshot(snapshot);
            let collection_trace = T::collection_trace(trace);
            let prev_values: Vec<_> = collection_trace
                .values
                .into_iter()
                .filter(|((k, _), _, diff)| {
                    soft_assert_eq_or_log!(*diff, Diff::ONE, "trace is consolidated");
                    &key == k
                })
                .collect();

            // A consolidated trace can have at most one value per key; more
            // than one indicates corruption.
            let prev_value = match &prev_values[..] {
                [] => None,
                [((_, v), _, _)] => Some(v.clone()),
                prev_values => panic!("multiple values found for key {key:?}: {prev_values:?}"),
            };

            // Retract any previous value, then insert the new one.
            let mut updates: Vec<_> = prev_values
                .into_iter()
                .map(|((k, v), _, _)| (T::update(k, v), Diff::MINUS_ONE))
                .collect();
            updates.push((T::update(key, value), Diff::ONE));
            // We must fence out all other catalogs, if we haven't already, since we are writing.
            match self.fenceable_token.generate_unfenced_token(self.mode)? {
                Some((fence_updates, current_fenceable_token)) => {
                    updates.extend(fence_updates.clone());
                    match self.compare_and_append(updates, self.upper).await {
                        Ok(_) => {
                            // Only adopt the new token once the write that
                            // contains it has durably succeeded.
                            self.fenceable_token = current_fenceable_token;
                            break prev_value;
                        }
                        Err(CompareAndAppendError::Fence(e)) => return Err(e.into()),
                        Err(e @ CompareAndAppendError::UpperMismatch { .. }) => {
                            warn!("catalog write failed due to upper mismatch, retrying: {e:?}");
                            continue;
                        }
                    }
                }
                None => {
                    // Already fenced; upper mismatches become errors here since
                    // `unwrap_fence_error` treats non-fence errors as fatal.
                    self.compare_and_append(updates, self.upper)
                        .await
                        .map_err(|e| e.unwrap_fence_error())?;
                    break prev_value;
                }
            }
        };
        Ok(prev_value)
    }

    /// Manually delete `key` from collection `T`.
    ///
    /// Retries on upper mismatches (another writer raced us) and returns an
    /// error if this process gets fenced. Deleting an absent key is a no-op.
    #[mz_ore::instrument]
    pub(crate) async fn debug_delete<T: Collection>(
        &mut self,
        key: T::Key,
    ) -> Result<(), CatalogError>
    where
        T::Key: PartialEq + Eq + Debug + Clone,
        T::Value: Debug,
    {
        loop {
            // Re-clone per attempt: each retry rebuilds the retractions from a
            // fresh snapshot.
            let key = key.clone();
            let snapshot = self.current_snapshot().await?;
            let trace = Trace::from_snapshot(snapshot);
            let collection_trace = T::collection_trace(trace);
            let mut retractions: Vec<_> = collection_trace
                .values
                .into_iter()
                .filter(|((k, _), _, diff)| {
                    soft_assert_eq_or_log!(*diff, Diff::ONE, "trace is consolidated");
                    &key == k
                })
                .map(|((k, v), _, _)| (T::update(k, v), Diff::MINUS_ONE))
                .collect();

            // We must fence out all other catalogs, if we haven't already, since we are writing.
            match self.fenceable_token.generate_unfenced_token(self.mode)? {
                Some((fence_updates, current_fenceable_token)) => {
                    retractions.extend(fence_updates.clone());
                    match self.compare_and_append(retractions, self.upper).await {
                        Ok(_) => {
                            // Only adopt the new token once the write that
                            // contains it has durably succeeded.
                            self.fenceable_token = current_fenceable_token;
                            break;
                        }
                        Err(CompareAndAppendError::Fence(e)) => return Err(e.into()),
                        Err(e @ CompareAndAppendError::UpperMismatch { .. }) => {
                            warn!("catalog write failed due to upper mismatch, retrying: {e:?}");
                            continue;
                        }
                    }
                }
                None => {
                    self.compare_and_append(retractions, self.upper)
                        .await
                        .map_err(|e| e.unwrap_fence_error())?;
                    break;
                }
            }
        }
        Ok(())
    }

    /// Generates a [`Vec<StateUpdate>`] that contain all updates to the catalog
    /// state.
    ///
    /// The output is consolidated and sorted by timestamp in ascending order and the current upper.
    async fn current_snapshot(
        &mut self,
    ) -> Result<impl IntoIterator<Item = StateUpdate> + '_, CatalogError> {
        self.sync_to_current_upper().await?;
        // Consolidate first so each (key, value) appears at most once.
        self.consolidate();
        Ok(self.snapshot.iter().cloned().map(|(kind, ts, diff)| {
            let kind = TryIntoStateUpdateKind::try_into(kind).expect("kind decoding error");
            StateUpdate { kind, ts, diff }
        }))
    }
}