Skip to main content

mz_catalog/durable/
persist.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10#[cfg(test)]
11mod tests;
12
13use std::cmp::max;
14use std::collections::{BTreeMap, VecDeque};
15use std::fmt::Debug;
16use std::str::FromStr;
17use std::sync::{Arc, LazyLock};
18use std::time::{Duration, Instant};
19
20use async_trait::async_trait;
21use differential_dataflow::lattice::Lattice;
22use futures::{FutureExt, StreamExt};
23use itertools::Itertools;
24use mz_audit_log::VersionedEvent;
25use mz_ore::metrics::MetricsFutureExt;
26use mz_ore::now::EpochMillis;
27use mz_ore::{
28    soft_assert_eq_no_log, soft_assert_eq_or_log, soft_assert_ne_or_log, soft_assert_no_log,
29    soft_assert_or_log, soft_panic_or_log,
30};
31use mz_persist_client::cfg::USE_CRITICAL_SINCE_CATALOG;
32use mz_persist_client::cli::admin::{CATALOG_FORCE_COMPACTION_FUEL, CATALOG_FORCE_COMPACTION_WAIT};
33use mz_persist_client::critical::{CriticalReaderId, Opaque, SinceHandle};
34use mz_persist_client::error::UpperMismatch;
35use mz_persist_client::read::{Listen, ListenEvent, ReadHandle};
36use mz_persist_client::write::WriteHandle;
37use mz_persist_client::{Diagnostics, PersistClient, ShardId};
38use mz_persist_types::codec_impls::UnitSchema;
39use mz_proto::{RustType, TryFromProtoError};
40use mz_repr::Diff;
41use mz_storage_client::controller::PersistEpoch;
42use mz_storage_types::StorageDiff;
43use mz_storage_types::sources::SourceData;
44use sha2::Digest;
45use timely::progress::{Antichain, Timestamp as TimelyTimestamp};
46use tracing::{debug, info, warn};
47use uuid::Uuid;
48
49use crate::durable::debug::{Collection, CollectionType, DebugCatalogState, Trace};
50use crate::durable::error::FenceError;
51use crate::durable::initialize::{
52    ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, SYSTEM_CONFIG_SYNCED_KEY, USER_VERSION_KEY,
53    WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL, WITH_0DT_DEPLOYMENT_MAX_WAIT,
54};
55use crate::durable::metrics::Metrics;
56use crate::durable::objects::state_update::{
57    IntoStateUpdateKindJson, StateUpdate, StateUpdateKind, StateUpdateKindJson,
58    TryIntoStateUpdateKind,
59};
60use crate::durable::objects::{AuditLogKey, FenceToken, Snapshot};
61use crate::durable::transaction::TransactionBatch;
62use crate::durable::upgrade::upgrade;
63use crate::durable::{
64    AuditLogIterator, BootstrapArgs, CATALOG_CONTENT_VERSION_KEY, CatalogError,
65    DurableCatalogError, DurableCatalogState, Epoch, OpenableDurableCatalogState,
66    ReadOnlyDurableCatalogState, Transaction, initialize, persist_desc,
67};
68use crate::memory;
69
70/// New-type used to represent timestamps in persist.
71pub(crate) type Timestamp = mz_repr::Timestamp;
72
73/// The minimum value of an epoch.
74const MIN_EPOCH: Epoch = Epoch::new(1).expect("1 is non-zero");
75
76/// Human readable catalog shard name.
77const CATALOG_SHARD_NAME: &str = "catalog";
78
79/// [`CriticalReaderId`] for the catalog shard's own since hold, separate from
80/// [`PersistClient::CONTROLLER_CRITICAL_SINCE`].
81static CATALOG_CRITICAL_SINCE: LazyLock<CriticalReaderId> = LazyLock::new(|| {
82    "c55555555-6666-7777-8888-999999999999"
83        .parse()
84        .expect("valid CriticalReaderId")
85});
86
87/// Seed used to generate the persist shard ID for the catalog.
88const CATALOG_SEED: usize = 1;
89/// Legacy seed used to generate the persist shard ID for the upgrade shard. DO NOT REUSE.
90const _UPGRADE_SEED: usize = 2;
91/// Legacy seed used to generate the persist shard ID for builtin table migrations. DO NOT REUSE.
92pub const _BUILTIN_MIGRATION_SEED: usize = 3;
93/// Legacy seed used to generate the persist shard ID for the expression cache. DO NOT REUSE.
94pub const _EXPRESSION_CACHE_SEED: usize = 4;
95
/// Governs what mutating operations on the durable catalog actually do.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum Mode {
    /// Mutating operations are prohibited altogether.
    Readonly,
    /// Mutations change in-memory state only; nothing is persisted durably.
    Savepoint,
    /// Mutations change both in-memory state and durable state.
    Writable,
}
106
107/// Enum representing the fenced state of the catalog.
108#[derive(Debug)]
109pub(crate) enum FenceableToken {
110    /// The catalog is still initializing and learning about previously written fence tokens. This
111    /// state can be fenced if it encounters a larger deploy generation.
112    Initializing {
113        /// The largest fence token durably written to the catalog, if any.
114        durable_token: Option<FenceToken>,
115        /// This process's deploy generation.
116        current_deploy_generation: Option<u64>,
117    },
118    /// The current token has not been fenced.
119    Unfenced { current_token: FenceToken },
120    /// The current token has been fenced.
121    Fenced {
122        current_token: FenceToken,
123        fence_token: FenceToken,
124    },
125}
126
127impl FenceableToken {
128    /// Returns a new token.
129    fn new(current_deploy_generation: Option<u64>) -> Self {
130        Self::Initializing {
131            durable_token: None,
132            current_deploy_generation,
133        }
134    }
135
136    /// Returns the current token if it is not fenced, otherwise returns an error.
137    fn validate(&self) -> Result<Option<FenceToken>, FenceError> {
138        match self {
139            FenceableToken::Initializing { durable_token, .. } => Ok(durable_token.clone()),
140            FenceableToken::Unfenced { current_token, .. } => Ok(Some(current_token.clone())),
141            FenceableToken::Fenced {
142                current_token,
143                fence_token,
144            } => {
145                assert!(
146                    fence_token > current_token,
147                    "must be fenced by higher token; current={current_token:?}, fence={fence_token:?}"
148                );
149                if fence_token.deploy_generation > current_token.deploy_generation {
150                    Err(FenceError::DeployGeneration {
151                        current_generation: current_token.deploy_generation,
152                        fence_generation: fence_token.deploy_generation,
153                    })
154                } else {
155                    assert!(
156                        fence_token.epoch > current_token.epoch,
157                        "must be fenced by higher token; current={current_token:?}, fence={fence_token:?}"
158                    );
159                    Err(FenceError::Epoch {
160                        current_epoch: current_token.epoch,
161                        fence_epoch: fence_token.epoch,
162                    })
163                }
164            }
165        }
166    }
167
168    /// Returns the current token.
169    fn token(&self) -> Option<FenceToken> {
170        match self {
171            FenceableToken::Initializing { durable_token, .. } => durable_token.clone(),
172            FenceableToken::Unfenced { current_token, .. } => Some(current_token.clone()),
173            FenceableToken::Fenced { current_token, .. } => Some(current_token.clone()),
174        }
175    }
176
177    /// Returns `Err` if `token` fences out `self`, `Ok` otherwise.
178    fn maybe_fence(&mut self, token: FenceToken) -> Result<(), FenceError> {
179        match self {
180            FenceableToken::Initializing {
181                durable_token,
182                current_deploy_generation,
183                ..
184            } => {
185                match durable_token {
186                    Some(durable_token) => {
187                        *durable_token = max(durable_token.clone(), token.clone());
188                    }
189                    None => {
190                        *durable_token = Some(token.clone());
191                    }
192                }
193                if let Some(current_deploy_generation) = current_deploy_generation {
194                    if *current_deploy_generation < token.deploy_generation {
195                        *self = FenceableToken::Fenced {
196                            current_token: FenceToken {
197                                deploy_generation: *current_deploy_generation,
198                                epoch: token.epoch,
199                            },
200                            fence_token: token,
201                        };
202                        self.validate()?;
203                    }
204                }
205            }
206            FenceableToken::Unfenced { current_token } => {
207                if *current_token < token {
208                    *self = FenceableToken::Fenced {
209                        current_token: current_token.clone(),
210                        fence_token: token,
211                    };
212                    self.validate()?;
213                }
214            }
215            FenceableToken::Fenced { .. } => {
216                self.validate()?;
217            }
218        }
219
220        Ok(())
221    }
222
223    /// Returns a [`FenceableToken::Unfenced`] token and the updates to the catalog required to
224    /// transition to the `Unfenced` state if `self` is [`FenceableToken::Initializing`], otherwise
225    /// returns `None`.
226    fn generate_unfenced_token(
227        &self,
228        mode: Mode,
229    ) -> Result<Option<(Vec<(StateUpdateKind, Diff)>, FenceableToken)>, DurableCatalogError> {
230        let (durable_token, current_deploy_generation) = match self {
231            FenceableToken::Initializing {
232                durable_token,
233                current_deploy_generation,
234            } => (durable_token.clone(), current_deploy_generation.clone()),
235            FenceableToken::Unfenced { .. } | FenceableToken::Fenced { .. } => return Ok(None),
236        };
237
238        let mut fence_updates = Vec::with_capacity(2);
239
240        if let Some(durable_token) = &durable_token {
241            fence_updates.push((
242                StateUpdateKind::FenceToken(durable_token.clone()),
243                Diff::MINUS_ONE,
244            ));
245        }
246
247        let current_deploy_generation = current_deploy_generation
248            .or_else(|| durable_token.as_ref().map(|token| token.deploy_generation))
249            // We cannot initialize a catalog without a deploy generation.
250            .ok_or(DurableCatalogError::Uninitialized)?;
251        let mut current_epoch = durable_token
252            .map(|token| token.epoch)
253            .unwrap_or(MIN_EPOCH)
254            .get();
255        // Only writable catalogs attempt to increment the epoch.
256        if matches!(mode, Mode::Writable) {
257            current_epoch = current_epoch + 1;
258        }
259        let current_epoch = Epoch::new(current_epoch).expect("known to be non-zero");
260        let current_token = FenceToken {
261            deploy_generation: current_deploy_generation,
262            epoch: current_epoch,
263        };
264
265        fence_updates.push((
266            StateUpdateKind::FenceToken(current_token.clone()),
267            Diff::ONE,
268        ));
269
270        let current_fenceable_token = FenceableToken::Unfenced { current_token };
271
272        Ok(Some((fence_updates, current_fenceable_token)))
273    }
274}
275
276/// An error that can occur while executing [`PersistHandle::compare_and_append`].
277#[derive(Debug, thiserror::Error)]
278pub(crate) enum CompareAndAppendError {
279    #[error(transparent)]
280    Fence(#[from] FenceError),
281    /// Catalog encountered an upper mismatch when trying to write to the catalog. This should only
282    /// happen while trying to fence out other catalogs.
283    #[error(
284        "expected catalog upper {expected_upper:?} did not match actual catalog upper {actual_upper:?}"
285    )]
286    UpperMismatch {
287        expected_upper: Timestamp,
288        actual_upper: Timestamp,
289    },
290}
291
292impl CompareAndAppendError {
293    pub(crate) fn unwrap_fence_error(self) -> FenceError {
294        match self {
295            CompareAndAppendError::Fence(e) => e,
296            e @ CompareAndAppendError::UpperMismatch { .. } => {
297                panic!("unexpected upper mismatch: {e:?}")
298            }
299        }
300    }
301}
302
303impl From<UpperMismatch<Timestamp>> for CompareAndAppendError {
304    fn from(upper_mismatch: UpperMismatch<Timestamp>) -> Self {
305        Self::UpperMismatch {
306            expected_upper: antichain_to_timestamp(upper_mismatch.expected),
307            actual_upper: antichain_to_timestamp(upper_mismatch.current),
308        }
309    }
310}
311
312pub(crate) trait ApplyUpdate<T: IntoStateUpdateKindJson> {
313    /// Process and apply `update`.
314    ///
315    /// Returns `Some` if `update` should be cached in memory and `None` otherwise.
316    fn apply_update(
317        &mut self,
318        update: StateUpdate<T>,
319        current_fence_token: &mut FenceableToken,
320        metrics: &Arc<Metrics>,
321    ) -> Result<Option<StateUpdate<T>>, FenceError>;
322}
323
324/// A handle for interacting with the persist catalog shard.
325///
326/// The catalog shard is used in multiple different contexts, for example pre-open and post-open,
327/// but for all contexts the majority of the durable catalog's behavior is identical. This struct
328/// implements those behaviors that are identical while allowing the user to specify the different
329/// behaviors via generic parameters.
330///
331/// The behavior of the durable catalog can be different along one of two axes. The first is the
332/// format of each individual update, i.e. raw binary, the current protobuf version, previous
333/// protobuf versions, etc. The second axis is what to do with each individual update, for example
334/// before opening we cache all config updates but don't cache them after opening. These behaviors
335/// are customizable via the `T: TryIntoStateUpdateKind` and `U: ApplyUpdate<T>` generic parameters
336/// respectively.
337#[derive(Debug)]
338pub(crate) struct PersistHandle<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> {
339    /// The [`Mode`] that this catalog was opened in.
340    pub(crate) mode: Mode,
341    /// Since handle to control compaction.
342    since_handle: SinceHandle<SourceData, (), Timestamp, StorageDiff>,
343    /// Write handle to persist.
344    write_handle: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
345    /// Listener to catalog changes.
346    listen: Listen<SourceData, (), Timestamp, StorageDiff>,
347    /// Handle for connecting to persist.
348    persist_client: PersistClient,
349    /// Catalog shard ID.
350    shard_id: ShardId,
351    /// Cache of the most recent catalog snapshot.
352    ///
353    /// We use a tuple instead of [`StateUpdate`] to make consolidation easier.
354    pub(crate) snapshot: Vec<(T, Timestamp, Diff)>,
355    /// Applies custom processing, filtering, and fencing for each individual update.
356    update_applier: U,
357    /// The current upper of the persist shard.
358    pub(crate) upper: Timestamp,
359    /// The fence token of the catalog, if one exists.
360    fenceable_token: FenceableToken,
361    /// The semantic version of the current binary.
362    catalog_content_version: semver::Version,
363    /// Flag to indicate if bootstrap is complete.
364    bootstrap_complete: bool,
365    /// Metrics for the persist catalog.
366    metrics: Arc<Metrics>,
367    /// Snapshot size at the last amortized consolidation, used by
368    /// [`Self::maybe_consolidate`] to decide when to consolidate. Initialized
369    /// lazily on the first call.
370    size_at_last_consolidation: Option<usize>,
371}
372
373impl<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> PersistHandle<T, U> {
374    /// Fetch the current upper of the catalog state.
375    #[mz_ore::instrument]
376    async fn current_upper(&mut self) -> Timestamp {
377        match self.mode {
378            Mode::Writable | Mode::Readonly => {
379                let upper = self.write_handle.fetch_recent_upper().await;
380                antichain_to_timestamp(upper.clone())
381            }
382            Mode::Savepoint => self.upper,
383        }
384    }
385
386    /// Appends `updates` iff the current global upper of the catalog is `self.upper`.
387    ///
388    /// Returns the next upper used to commit the transaction.
389    #[mz_ore::instrument]
390    pub(crate) async fn compare_and_append<S: IntoStateUpdateKindJson>(
391        &mut self,
392        updates: Vec<(S, Diff)>,
393        commit_ts: Timestamp,
394    ) -> Result<Timestamp, CompareAndAppendError> {
395        // The fencing check is expensive, so run it only with soft assertions enabled.
396        let contains_fence = if mz_ore::assert::soft_assertions_enabled() {
397            let parsed_updates: Vec<_> = updates
398                .clone()
399                .into_iter()
400                .filter_map(|(update, diff)| {
401                    let update: StateUpdateKindJson = update.into();
402                    let update = TryIntoStateUpdateKind::try_into(update).ok()?;
403                    Some((update, diff))
404                })
405                .collect();
406            let contains_retraction = parsed_updates.iter().any(|(update, diff)| {
407                matches!(update, StateUpdateKind::FenceToken(..)) && *diff == Diff::MINUS_ONE
408            });
409            let contains_addition = parsed_updates.iter().any(|(update, diff)| {
410                matches!(update, StateUpdateKind::FenceToken(..)) && *diff == Diff::ONE
411            });
412            Some(contains_retraction && contains_addition)
413        } else {
414            None
415        };
416
417        let updates = updates.into_iter().map(|(kind, diff)| {
418            let kind: StateUpdateKindJson = kind.into();
419            (
420                (Into::<SourceData>::into(kind), ()),
421                commit_ts,
422                diff.into_inner(),
423            )
424        });
425        let next_upper = commit_ts.step_forward();
426        self.compare_and_append_inner(updates, next_upper)
427            .await
428            .inspect_err(|e| {
429                // A compare-and-append failure means someone else must have written to the
430                // catalog. We expect to have been fenced out, since writing to the catalog without
431                // fencing other catalogs should be impossible. The one exception is if we are
432                // trying to fence other catalogs with this write.
433                if let Some(contains_fence) = contains_fence {
434                    soft_assert_or_log!(
435                        matches!(e, CompareAndAppendError::Fence(_)) || contains_fence,
436                        "encountered an upper mismatch on a non-fencing write"
437                    );
438                }
439            })?;
440
441        self.sync(next_upper).await?;
442        Ok(next_upper)
443    }
444
445    /// Compare-and-append `updates` to the catalog shard, advancing the upper to `next_upper`.
446    ///
447    /// On success, updating `self.upper` is left to the caller. The caller can thus decide whether
448    /// or not it needs to sync the catalog.
449    ///
450    /// # Panics
451    ///
452    /// Panics if not in `Writable` mode.
453    /// Panics if `next_upper` is not greater than `self.upper`.
454    async fn compare_and_append_inner(
455        &mut self,
456        updates: impl IntoIterator<Item = ((SourceData, ()), Timestamp, StorageDiff)>,
457        next_upper: Timestamp,
458    ) -> Result<(), CompareAndAppendError> {
459        assert_eq!(self.mode, Mode::Writable);
460        assert!(
461            next_upper > self.upper,
462            "next_upper ({next_upper}) not greater than current upper ({})",
463            self.upper,
464        );
465
466        let res = self
467            .write_handle
468            .compare_and_append(
469                updates,
470                Antichain::from_elem(self.upper),
471                Antichain::from_elem(next_upper),
472            )
473            .await
474            .expect("invalid usage");
475
476        if let Err(e @ UpperMismatch { .. }) = res {
477            // Most likely we were fenced out.
478            // Sync to the current upper to detect that.
479            self.sync_to_current_upper().await?;
480            return Err(e.into());
481        }
482
483        // Lag the shard's upper by 1 to keep it readable.
484        let downgrade_to = Antichain::from_elem(next_upper.saturating_sub(1));
485
486        // The since handle gives us the ability to fence out other downgraders using an opaque token.
487        // (See the method documentation for details.)
488        // That's not needed here, so we use the since handle's opaque token to avoid any comparison
489        // failures.
490        let opaque = self.since_handle.opaque().clone();
491        let downgrade = self
492            .since_handle
493            .maybe_compare_and_downgrade_since(&opaque, (&opaque, &downgrade_to))
494            .await;
495        if let Some(Err(e)) = downgrade {
496            soft_panic_or_log!("found opaque value {e:?}, but expected {opaque:?}");
497        }
498
499        Ok(())
500    }
501
502    /// Generates an iterator of [`StateUpdate`] that contain all unconsolidated updates to the
503    /// catalog state up to, and including, `as_of`.
504    #[mz_ore::instrument]
505    async fn snapshot_unconsolidated(&mut self) -> Vec<StateUpdate<StateUpdateKind>> {
506        let current_upper = self.current_upper().await;
507
508        let mut snapshot = Vec::new();
509        let mut read_handle = self.read_handle().await;
510        let as_of = as_of(&read_handle, current_upper);
511        let mut stream = Box::pin(
512            // We use `snapshot_and_stream` because it guarantees unconsolidated output.
513            read_handle
514                .snapshot_and_stream(Antichain::from_elem(as_of))
515                .await
516                .expect("we have advanced the restart_as_of by the since"),
517        );
518        while let Some(update) = stream.next().await {
519            snapshot.push(update)
520        }
521        read_handle.expire().await;
522        snapshot
523            .into_iter()
524            .map(Into::<StateUpdate<StateUpdateKindJson>>::into)
525            .map(|state_update| state_update.try_into().expect("kind decoding error"))
526            .collect()
527    }
528
529    /// Listen and apply all updates that are currently in persist.
530    ///
531    /// Returns an error if this instance has been fenced out.
532    #[mz_ore::instrument]
533    pub(crate) async fn sync_to_current_upper(&mut self) -> Result<(), FenceError> {
534        let upper = self.current_upper().await;
535        self.sync(upper).await
536    }
537
538    /// Listen and apply all updates up to `target_upper`.
539    ///
540    /// Returns an error if this instance has been fenced out.
541    #[mz_ore::instrument(level = "debug")]
542    pub(crate) async fn sync(&mut self, target_upper: Timestamp) -> Result<(), FenceError> {
543        self.metrics.syncs.inc();
544        let counter = self.metrics.sync_latency_seconds.clone();
545        self.sync_inner(target_upper)
546            .wall_time()
547            .inc_by(counter)
548            .await
549    }
550
551    #[mz_ore::instrument(level = "debug")]
552    async fn sync_inner(&mut self, target_upper: Timestamp) -> Result<(), FenceError> {
553        self.fenceable_token.validate()?;
554
555        // Savepoint catalogs do not yet know how to update themselves in response to concurrent
556        // writes from writer catalogs.
557        if self.mode == Mode::Savepoint {
558            self.upper = max(self.upper, target_upper);
559            return Ok(());
560        }
561
562        let mut updates: BTreeMap<_, Vec<_>> = BTreeMap::new();
563
564        // Reset the amortized consolidation tracker so it picks up the
565        // current snapshot size as its baseline.
566        self.size_at_last_consolidation = None;
567
568        while self.upper < target_upper {
569            let listen_events = self.listen.fetch_next().await;
570            for listen_event in listen_events {
571                match listen_event {
572                    ListenEvent::Progress(upper) => {
573                        debug!("synced up to {upper:?}");
574                        self.upper = antichain_to_timestamp(upper);
575                        // Attempt to apply updates in batches of a single timestamp. If another
576                        // catalog wrote a fence token at one timestamp and then updates in a new
577                        // format at a later timestamp, then we want to apply the fence token
578                        // before attempting to deserialize the new updates.
579                        while let Some((ts, updates)) = updates.pop_first() {
580                            assert!(ts < self.upper, "expected {} < {}", ts, self.upper);
581                            let updates = updates.into_iter().map(
582                                |update: StateUpdate<StateUpdateKindJson>| {
583                                    let kind =
584                                        T::try_from(update.kind).expect("kind decoding error");
585                                    StateUpdate {
586                                        kind,
587                                        ts: update.ts,
588                                        diff: update.diff,
589                                    }
590                                },
591                            );
592                            self.apply_updates(updates)?;
593                            self.maybe_consolidate();
594                        }
595                    }
596                    ListenEvent::Updates(batch_updates) => {
597                        for update in batch_updates {
598                            let update: StateUpdate<StateUpdateKindJson> = update.into();
599                            updates.entry(update.ts).or_default().push(update);
600                        }
601                    }
602                }
603            }
604        }
605        assert_eq!(updates, BTreeMap::new(), "all updates should be applied");
606        // Always consolidate at the end to ensure the snapshot is clean.
607        self.consolidate();
608        Ok(())
609    }
610
611    /// Apply a batch of updates and then consolidate the snapshot. This is the
612    /// typical entry point for callers that apply updates in a single batch.
613    ///
614    /// For hot loops that apply updates across many timestamps (e.g., `sync_inner`),
615    /// use `apply_updates` directly and call `consolidate()` periodically (e.g.,
616    /// on snapshot doubling) to bound memory while staying amortized O(N log N).
617    pub(crate) fn apply_updates_and_consolidate(
618        &mut self,
619        updates: impl IntoIterator<Item = StateUpdate<T>>,
620    ) -> Result<(), FenceError> {
621        self.apply_updates(updates)?;
622        self.consolidate();
623        Ok(())
624    }
625
626    /// Apply a batch of updates to the catalog state without consolidating.
627    ///
628    /// Does NOT consolidate the snapshot afterward. If you are calling this once,
629    /// prefer `apply_updates_and_consolidate`. This method exists for loops that
630    /// call it many times — consolidating per call would be O(K * N log N) instead
631    /// of O(N log N). Callers should consolidate periodically (e.g., on snapshot
632    /// doubling) to bound memory.
633    #[mz_ore::instrument(level = "debug")]
634    fn apply_updates(
635        &mut self,
636        updates: impl IntoIterator<Item = StateUpdate<T>>,
637    ) -> Result<(), FenceError> {
638        let mut updates: Vec<_> = updates
639            .into_iter()
640            .map(|StateUpdate { kind, ts, diff }| (kind, ts, diff))
641            .collect();
642
643        // This helps guarantee that for a single key, there is at most a single retraction and a
644        // single insertion per timestamp. Otherwise, we would need to match the retractions and
645        // insertions up by value and manually figure out what the end value should be.
646        differential_dataflow::consolidation::consolidate_updates(&mut updates);
647
648        // Updates must be applied in timestamp order. Within a timestamp retractions must be
649        // applied before insertions, or we might end up retracting the wrong value.
650        updates.sort_by(|(_, ts1, diff1), (_, ts2, diff2)| ts1.cmp(ts2).then(diff1.cmp(diff2)));
651
652        let mut errors = Vec::new();
653
654        for (kind, ts, diff) in updates {
655            if diff != Diff::ONE && diff != Diff::MINUS_ONE {
656                panic!("invalid update in consolidated trace: ({kind:?}, {ts:?}, {diff:?})");
657            }
658
659            match self.update_applier.apply_update(
660                StateUpdate { kind, ts, diff },
661                &mut self.fenceable_token,
662                &self.metrics,
663            ) {
664                Ok(Some(StateUpdate { kind, ts, diff })) => self.snapshot.push((kind, ts, diff)),
665                Ok(None) => {}
666                // Instead of returning immediately, we accumulate all the errors and return the one
667                // with the most information.
668                Err(err) => errors.push(err),
669            }
670        }
671
672        // Track the high-water mark of the unconsolidated snapshot size.
673        let len = i64::try_from(self.snapshot.len()).unwrap_or(i64::MAX);
674        if len > self.metrics.snapshot_max_entries.get() {
675            self.metrics.snapshot_max_entries.set(len);
676        }
677
678        errors.sort();
679        if let Some(err) = errors.into_iter().next() {
680            return Err(err);
681        }
682
683        Ok(())
684    }
685
686    /// Consolidate the snapshot if it has at least doubled in size since the
687    /// last consolidation. This amortizes the O(N log N) consolidation cost
688    /// over many small updates, keeping the total work O(N log N) rather than
689    /// O(K * N log N) for K timestamps.
690    fn maybe_consolidate(&mut self) {
691        let threshold = *self
692            .size_at_last_consolidation
693            // Use a minimum of 8 to avoid consolidating on every update when
694            // the snapshot is small or empty (since 0 * 2 = 0).
695            .get_or_insert_with(|| max(self.snapshot.len(), 8));
696        if self.snapshot.len() >= threshold * 2 {
697            self.consolidate();
698            self.size_at_last_consolidation = Some(self.snapshot.len());
699        }
700    }
701
702    #[mz_ore::instrument]
703    pub(crate) fn consolidate(&mut self) {
704        self.metrics.snapshot_consolidations.inc();
705        soft_assert_no_log!(
706            self.snapshot
707                .windows(2)
708                .all(|updates| updates[0].1 <= updates[1].1),
709            "snapshot should be sorted by timestamp, {:#?}",
710            self.snapshot
711        );
712
713        let new_ts = self
714            .snapshot
715            .last()
716            .map(|(_, ts, _)| *ts)
717            .unwrap_or_else(Timestamp::minimum);
718        for (_, ts, _) in &mut self.snapshot {
719            *ts = new_ts;
720        }
721        differential_dataflow::consolidation::consolidate_updates(&mut self.snapshot);
722    }
723
724    /// Execute and return the results of `f` on the current catalog trace.
725    ///
726    /// Will return an error if the catalog has been fenced out.
727    async fn with_trace<R>(
728        &mut self,
729        f: impl FnOnce(&Vec<(T, Timestamp, Diff)>) -> Result<R, CatalogError>,
730    ) -> Result<R, CatalogError> {
731        self.sync_to_current_upper().await?;
732        f(&self.snapshot)
733    }
734
735    /// Open a read handle to the catalog.
736    async fn read_handle(&self) -> ReadHandle<SourceData, (), Timestamp, StorageDiff> {
737        self.persist_client
738            .open_leased_reader(
739                self.shard_id,
740                Arc::new(persist_desc()),
741                Arc::new(UnitSchema::default()),
742                Diagnostics {
743                    shard_name: CATALOG_SHARD_NAME.to_string(),
744                    handle_purpose: "openable durable catalog state temporary reader".to_string(),
745                },
746                USE_CRITICAL_SINCE_CATALOG.get(self.persist_client.dyncfgs()),
747            )
748            .await
749            .expect("invalid usage")
750    }
751
752    /// Politely releases all external resources that can only be released in an async context.
753    async fn expire(self: Box<Self>) {
754        self.write_handle.expire().await;
755        self.listen.expire().await;
756    }
757}
758
759impl<U: ApplyUpdate<StateUpdateKind>> PersistHandle<StateUpdateKind, U> {
760    /// Execute and return the results of `f` on the current catalog snapshot.
761    ///
762    /// Will return an error if the catalog has been fenced out.
763    async fn with_snapshot<T>(
764        &mut self,
765        f: impl FnOnce(Snapshot) -> Result<T, CatalogError>,
766    ) -> Result<T, CatalogError> {
767        fn apply<K, V>(map: &mut BTreeMap<K, V>, key: &K, value: &V, diff: Diff)
768        where
769            K: Ord + Clone,
770            V: Ord + Clone + Debug,
771        {
772            let key = key.clone();
773            let value = value.clone();
774            if diff == Diff::ONE {
775                let prev = map.insert(key, value);
776                assert_eq!(
777                    prev, None,
778                    "values must be explicitly retracted before inserting a new value"
779                );
780            } else if diff == Diff::MINUS_ONE {
781                let prev = map.remove(&key);
782                assert_eq!(
783                    prev,
784                    Some(value),
785                    "retraction does not match existing value"
786                );
787            }
788        }
789
790        self.with_trace(|trace| {
791            let mut snapshot = Snapshot::empty();
792            for (kind, ts, diff) in trace {
793                let diff = *diff;
794                if diff != Diff::ONE && diff != Diff::MINUS_ONE {
795                    panic!("invalid update in consolidated trace: ({kind:?}, {ts:?}, {diff:?})");
796                }
797
798                match kind {
799                    StateUpdateKind::AuditLog(_key, ()) => {
800                        // Ignore for snapshots.
801                    }
802                    StateUpdateKind::Cluster(key, value) => {
803                        apply(&mut snapshot.clusters, key, value, diff);
804                    }
805                    StateUpdateKind::ClusterReplica(key, value) => {
806                        apply(&mut snapshot.cluster_replicas, key, value, diff);
807                    }
808                    StateUpdateKind::Comment(key, value) => {
809                        apply(&mut snapshot.comments, key, value, diff);
810                    }
811                    StateUpdateKind::Config(key, value) => {
812                        apply(&mut snapshot.configs, key, value, diff);
813                    }
814                    StateUpdateKind::Database(key, value) => {
815                        apply(&mut snapshot.databases, key, value, diff);
816                    }
817                    StateUpdateKind::DefaultPrivilege(key, value) => {
818                        apply(&mut snapshot.default_privileges, key, value, diff);
819                    }
820                    StateUpdateKind::FenceToken(_token) => {
821                        // Ignore for snapshots.
822                    }
823                    StateUpdateKind::IdAllocator(key, value) => {
824                        apply(&mut snapshot.id_allocator, key, value, diff);
825                    }
826                    StateUpdateKind::IntrospectionSourceIndex(key, value) => {
827                        apply(&mut snapshot.introspection_sources, key, value, diff);
828                    }
829                    StateUpdateKind::Item(key, value) => {
830                        apply(&mut snapshot.items, key, value, diff);
831                    }
832                    StateUpdateKind::NetworkPolicy(key, value) => {
833                        apply(&mut snapshot.network_policies, key, value, diff);
834                    }
835                    StateUpdateKind::Role(key, value) => {
836                        apply(&mut snapshot.roles, key, value, diff);
837                    }
838                    StateUpdateKind::Schema(key, value) => {
839                        apply(&mut snapshot.schemas, key, value, diff);
840                    }
841                    StateUpdateKind::Setting(key, value) => {
842                        apply(&mut snapshot.settings, key, value, diff);
843                    }
844                    StateUpdateKind::SourceReferences(key, value) => {
845                        apply(&mut snapshot.source_references, key, value, diff);
846                    }
847                    StateUpdateKind::SystemConfiguration(key, value) => {
848                        apply(&mut snapshot.system_configurations, key, value, diff);
849                    }
850                    StateUpdateKind::ClusterSystemConfiguration(key, value) => {
851                        apply(
852                            &mut snapshot.cluster_system_configurations,
853                            key,
854                            value,
855                            diff,
856                        );
857                    }
858                    StateUpdateKind::ReplicaSystemConfiguration(key, value) => {
859                        apply(
860                            &mut snapshot.replica_system_configurations,
861                            key,
862                            value,
863                            diff,
864                        );
865                    }
866                    StateUpdateKind::SystemObjectMapping(key, value) => {
867                        apply(&mut snapshot.system_object_mappings, key, value, diff);
868                    }
869                    StateUpdateKind::SystemPrivilege(key, value) => {
870                        apply(&mut snapshot.system_privileges, key, value, diff);
871                    }
872                    StateUpdateKind::StorageCollectionMetadata(key, value) => {
873                        apply(&mut snapshot.storage_collection_metadata, key, value, diff);
874                    }
875                    StateUpdateKind::UnfinalizedShard(key, ()) => {
876                        apply(&mut snapshot.unfinalized_shards, key, &(), diff);
877                    }
878                    StateUpdateKind::TxnWalShard((), value) => {
879                        apply(&mut snapshot.txn_wal_shard, &(), value, diff);
880                    }
881                    StateUpdateKind::RoleAuth(key, value) => {
882                        apply(&mut snapshot.role_auth, key, value, diff);
883                    }
884                }
885            }
886            f(snapshot)
887        })
888        .await
889    }
890
891    /// Generates an iterator of [`StateUpdate`] that contain all updates to the catalog
892    /// state.
893    ///
894    /// The output is fetched directly from persist instead of the in-memory cache.
895    ///
896    /// The output is consolidated and sorted by timestamp in ascending order.
897    #[mz_ore::instrument(level = "debug")]
898    async fn persist_snapshot(&self) -> impl Iterator<Item = StateUpdate> + DoubleEndedIterator {
899        let mut read_handle = self.read_handle().await;
900        let as_of = as_of(&read_handle, self.upper);
901        let snapshot = snapshot_binary(&mut read_handle, as_of, &self.metrics)
902            .await
903            .map(|update| update.try_into().expect("kind decoding error"));
904        read_handle.expire().await;
905        snapshot
906    }
907}
908
/// Applies updates for an unopened catalog.
///
/// Only the `Config` and `Setting` collections are cached. All other updates are passed
/// through without being interpreted, apart from fence token checks.
#[derive(Debug)]
pub(crate) struct UnopenedCatalogStateInner {
    /// A cache of the config collection of the catalog, keyed by config key.
    configs: BTreeMap<String, u64>,
    /// A cache of the settings collection of the catalog, keyed by setting name.
    settings: BTreeMap<String, String>,
}
917
918impl UnopenedCatalogStateInner {
919    fn new() -> UnopenedCatalogStateInner {
920        UnopenedCatalogStateInner {
921            configs: BTreeMap::new(),
922            settings: BTreeMap::new(),
923        }
924    }
925}
926
927impl ApplyUpdate<StateUpdateKindJson> for UnopenedCatalogStateInner {
928    fn apply_update(
929        &mut self,
930        update: StateUpdate<StateUpdateKindJson>,
931        current_fence_token: &mut FenceableToken,
932        _metrics: &Arc<Metrics>,
933    ) -> Result<Option<StateUpdate<StateUpdateKindJson>>, FenceError> {
934        if !update.kind.is_audit_log() && update.kind.is_always_deserializable() {
935            let kind = TryInto::try_into(&update.kind).expect("kind is known to be deserializable");
936            match (kind, update.diff) {
937                (StateUpdateKind::Config(key, value), Diff::ONE) => {
938                    let prev = self.configs.insert(key.key, value.value);
939                    assert_eq!(
940                        prev, None,
941                        "values must be explicitly retracted before inserting a new value"
942                    );
943                }
944                (StateUpdateKind::Config(key, value), Diff::MINUS_ONE) => {
945                    let prev = self.configs.remove(&key.key);
946                    assert_eq!(
947                        prev,
948                        Some(value.value),
949                        "retraction does not match existing value"
950                    );
951                }
952                (StateUpdateKind::Setting(key, value), Diff::ONE) => {
953                    let prev = self.settings.insert(key.name, value.value);
954                    assert_eq!(
955                        prev, None,
956                        "values must be explicitly retracted before inserting a new value"
957                    );
958                }
959                (StateUpdateKind::Setting(key, value), Diff::MINUS_ONE) => {
960                    let prev = self.settings.remove(&key.name);
961                    assert_eq!(
962                        prev,
963                        Some(value.value),
964                        "retraction does not match existing value"
965                    );
966                }
967                (StateUpdateKind::FenceToken(fence_token), Diff::ONE) => {
968                    current_fence_token.maybe_fence(fence_token)?;
969                }
970                _ => {}
971            }
972        }
973
974        Ok(Some(update))
975    }
976}
977
/// A handle to an unopened catalog stored in persist. The unopened catalog can serve `Config` data,
/// `Setting` data, or the current epoch. All other catalog data may be un-migrated and should not
/// be read until the catalog has been opened. The [`UnopenedPersistCatalogState`] is responsible
/// for opening the catalog; see [`OpenableDurableCatalogState::open`] for more details.
///
/// Production users should call [`Self::expire`] before dropping an [`UnopenedPersistCatalogState`]
/// so that it can expire its leases. If/when Rust gets AsyncDrop, this will be done automatically.
pub(crate) type UnopenedPersistCatalogState =
    PersistHandle<StateUpdateKindJson, UnopenedCatalogStateInner>;
987
988impl UnopenedPersistCatalogState {
    /// Create a new [`UnopenedPersistCatalogState`] for the catalog state associated with
    /// `organization_id`.
    ///
    /// All usages of the persist catalog must go through this function. That includes the
    /// catalog-debug tool, the adapter's catalog, etc.
    ///
    /// This opens the critical since handle and the write/read handles on the catalog shard,
    /// ensures the shard is readable by appending an empty batch at the minimum timestamp, and
    /// loads a snapshot of the shard into memory.
    ///
    /// # Errors
    ///
    /// Returns [`DurableCatalogError::IncompatiblePersistVersion`] in two cases:
    /// - the persist version recorded in the shard cannot be written by `version`;
    /// - the catalog content version stored in the catalog is newer than `version`.
    ///
    /// It also propagates errors from applying the initial snapshot and from reading the
    /// catalog content version.
    #[mz_ore::instrument]
    pub(crate) async fn new(
        persist_client: PersistClient,
        organization_id: Uuid,
        version: semver::Version,
        deploy_generation: Option<u64>,
        metrics: Arc<Metrics>,
    ) -> Result<UnopenedPersistCatalogState, DurableCatalogError> {
        let catalog_shard_id = shard_id(organization_id, CATALOG_SEED);
        debug!(?catalog_shard_id, "new persist backed catalog state");

        // Check the catalog shard version to ensure that we are compatible with the persist
        // data format. This lets us return an error gracefully, rather than panicking later in
        // persist.
        let version_in_catalog_shard =
            fetch_catalog_shard_version(&persist_client, catalog_shard_id).await;
        if let Some(version_in_catalog_shard) = version_in_catalog_shard {
            if !mz_persist_client::cfg::code_can_write_data(&version, &version_in_catalog_shard) {
                return Err(DurableCatalogError::IncompatiblePersistVersion {
                    found_version: version_in_catalog_shard,
                    catalog_version: version,
                });
            }
        }

        let open_handles_start = Instant::now();
        info!("startup: envd serve: catalog init: open handles beginning");
        // The catalog's critical since handle; `i64::MIN` is passed as the initial opaque value.
        let since_handle = persist_client
            .open_critical_since(
                catalog_shard_id,
                CATALOG_CRITICAL_SINCE.clone(),
                Opaque::encode(&i64::MIN),
                Diagnostics {
                    shard_name: CATALOG_SHARD_NAME.to_string(),
                    handle_purpose: "durable catalog state critical since".to_string(),
                },
            )
            .await
            .expect("invalid usage");

        let (mut write_handle, mut read_handle) = persist_client
            .open(
                catalog_shard_id,
                Arc::new(persist_desc()),
                Arc::new(UnitSchema::default()),
                Diagnostics {
                    shard_name: CATALOG_SHARD_NAME.to_string(),
                    handle_purpose: "durable catalog state handles".to_string(),
                },
                USE_CRITICAL_SINCE_CATALOG.get(persist_client.dyncfgs()),
            )
            .await
            .expect("invalid usage");
        info!(
            "startup: envd serve: catalog init: open handles complete in {:?}",
            open_handles_start.elapsed()
        );

        // Commit an empty write at the minimum timestamp so the catalog is always readable.
        // If the upper has already advanced past the minimum (an upper mismatch), the shard
        // has been written to before, so its current upper is used instead.
        let upper = {
            const EMPTY_UPDATES: &[((SourceData, ()), Timestamp, StorageDiff)] = &[];
            let upper = Antichain::from_elem(Timestamp::minimum());
            let next_upper = Timestamp::minimum().step_forward();
            match write_handle
                .compare_and_append(EMPTY_UPDATES, upper, Antichain::from_elem(next_upper))
                .await
                .expect("invalid usage")
            {
                Ok(()) => next_upper,
                Err(mismatch) => antichain_to_timestamp(mismatch.current),
            }
        };

        let snapshot_start = Instant::now();
        info!("startup: envd serve: catalog init: snapshot beginning");
        let as_of = as_of(&read_handle, upper);
        let snapshot: Vec<_> = snapshot_binary(&mut read_handle, as_of, &metrics)
            .await
            .map(|StateUpdate { kind, ts, diff }| (kind, ts, diff))
            .collect();
        // Start listening at the same `as_of`, so later syncs pick up from this snapshot.
        let listen = read_handle
            .listen(Antichain::from_elem(as_of))
            .await
            .expect("invalid usage");
        info!(
            "startup: envd serve: catalog init: snapshot complete in {:?}",
            snapshot_start.elapsed()
        );

        let mut handle = UnopenedPersistCatalogState {
            // Unopened catalogs are always writeable until they're opened in an explicit mode.
            mode: Mode::Writable,
            since_handle,
            write_handle,
            listen,
            persist_client,
            shard_id: catalog_shard_id,
            // Initialize empty in-memory state.
            snapshot: Vec::new(),
            update_applier: UnopenedCatalogStateInner::new(),
            upper,
            fenceable_token: FenceableToken::new(deploy_generation),
            catalog_content_version: version,
            bootstrap_complete: false,
            metrics,
            size_at_last_consolidation: None,
        };
        // If the snapshot is not consolidated, and we see multiple epoch values while applying the
        // updates, then we might accidentally fence ourselves out.
        soft_assert_no_log!(
            snapshot.iter().all(|(_, _, diff)| *diff == Diff::ONE),
            "snapshot should be consolidated: {snapshot:#?}"
        );

        let apply_start = Instant::now();
        info!("startup: envd serve: catalog init: apply updates beginning");
        let updates = snapshot
            .into_iter()
            .map(|(kind, ts, diff)| StateUpdate { kind, ts, diff });
        handle.apply_updates_and_consolidate(updates)?;
        info!(
            "startup: envd serve: catalog init: apply updates complete in {:?}",
            apply_start.elapsed()
        );

        // Validate that the binary version of the current process is not less than any binary
        // version that has written to the catalog.
        // This condition is only checked once, right here. If a new process comes along with a
        // higher version, it must fence this process out with one of the existing fencing
        // mechanisms.
        if let Some(found_version) = handle.get_catalog_content_version().await? {
            // Use cmp_precedence() to ignore build metadata per SemVer 2.0.0 spec
            if handle
                .catalog_content_version
                .cmp_precedence(&found_version)
                == std::cmp::Ordering::Less
            {
                return Err(DurableCatalogError::IncompatiblePersistVersion {
                    found_version,
                    catalog_version: handle.catalog_content_version,
                });
            }
        }

        Ok(handle)
    }
1135            }
1136        }
1137
1138        Ok(handle)
1139    }
1140
1141    #[mz_ore::instrument]
1142    async fn open_inner(
1143        mut self,
1144        mode: Mode,
1145        initial_ts: Timestamp,
1146        bootstrap_args: &BootstrapArgs,
1147    ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError> {
1148        // It would be nice to use `initial_ts` here, but it comes from the system clock, not the
1149        // timestamp oracle.
1150        let mut commit_ts = self.upper;
1151        self.mode = mode;
1152
1153        // Validate the current deploy generation.
1154        match (&self.mode, &self.fenceable_token) {
1155            (_, FenceableToken::Unfenced { .. } | FenceableToken::Fenced { .. }) => {
1156                return Err(DurableCatalogError::Internal(
1157                    "catalog should not have fenced before opening".to_string(),
1158                )
1159                .into());
1160            }
1161            (
1162                Mode::Writable | Mode::Savepoint,
1163                FenceableToken::Initializing {
1164                    current_deploy_generation: None,
1165                    ..
1166                },
1167            ) => {
1168                return Err(DurableCatalogError::Internal(format!(
1169                    "cannot open in mode '{:?}' without a deploy generation",
1170                    self.mode,
1171                ))
1172                .into());
1173            }
1174            _ => {}
1175        }
1176
1177        let read_only = matches!(self.mode, Mode::Readonly);
1178
1179        // Fence out previous catalogs.
1180        loop {
1181            self.sync_to_current_upper().await?;
1182            commit_ts = max(commit_ts, self.upper);
1183            let (fence_updates, current_fenceable_token) = self
1184                .fenceable_token
1185                .generate_unfenced_token(self.mode)?
1186                .ok_or_else(|| {
1187                    DurableCatalogError::Internal(
1188                        "catalog should not have fenced before opening".to_string(),
1189                    )
1190                })?;
1191            debug!(
1192                ?self.upper,
1193                ?self.fenceable_token,
1194                ?current_fenceable_token,
1195                "fencing previous catalogs"
1196            );
1197            if matches!(self.mode, Mode::Writable) {
1198                match self
1199                    .compare_and_append(fence_updates.clone(), commit_ts)
1200                    .await
1201                {
1202                    Ok(upper) => {
1203                        commit_ts = upper;
1204                    }
1205                    Err(CompareAndAppendError::Fence(e)) => return Err(e.into()),
1206                    Err(e @ CompareAndAppendError::UpperMismatch { .. }) => {
1207                        warn!("catalog write failed due to upper mismatch, retrying: {e:?}");
1208                        continue;
1209                    }
1210                }
1211            }
1212            self.fenceable_token = current_fenceable_token;
1213            break;
1214        }
1215
1216        if matches!(self.mode, Mode::Writable) {
1217            // One-time migration: The catalog previously used `CONTROLLER_CRITICAL_SINCE` for its
1218            // since handle. Now it uses its own `CATALOG_CRITICAL_SINCE`, to free
1219            // `CONTROLLER_CRITICAL_SINCE` up for the storage controller. The catalog and
1220            // controller handles differ in the `Opaque` codec, so we need a migration.
1221            //
1222            // TODO: Remove this once we don't support upgrading from v26 anymore.
1223            let mut controller_handle = self
1224                .persist_client
1225                .open_critical_since::<SourceData, (), Timestamp, StorageDiff>(
1226                    self.shard_id,
1227                    PersistClient::CONTROLLER_CRITICAL_SINCE,
1228                    Opaque::encode(&i64::MIN),
1229                    Diagnostics {
1230                        shard_name: CATALOG_SHARD_NAME.to_string(),
1231                        handle_purpose: "durable catalog state critical since (migration)"
1232                            .to_string(),
1233                    },
1234                )
1235                .await
1236                .expect("invalid usage");
1237
1238            let since = controller_handle.since().clone();
1239            let res = controller_handle
1240                .compare_and_downgrade_since(
1241                    &Opaque::encode(&i64::MIN),
1242                    (&Opaque::encode(&PersistEpoch::default()), &since),
1243                )
1244                .await;
1245            match res {
1246                Ok(_) => info!("migrated Opaque of catalog since handle"),
1247                Err(_) => { /* critical since was already migrated */ }
1248            }
1249        }
1250
1251        let is_initialized = self.is_initialized_inner();
1252        if !matches!(self.mode, Mode::Writable) && !is_initialized {
1253            return Err(CatalogError::Durable(DurableCatalogError::NotWritable(
1254                format!(
1255                    "catalog tables do not exist; will not create in {:?} mode",
1256                    self.mode
1257                ),
1258            )));
1259        }
1260        soft_assert_ne_or_log!(self.upper, Timestamp::minimum());
1261
1262        // Remove all audit log entries.
1263        let (audit_logs, snapshot): (Vec<_>, Vec<_>) = self
1264            .snapshot
1265            .into_iter()
1266            .partition(|(update, _, _)| update.is_audit_log());
1267        self.snapshot = snapshot;
1268        let audit_log_count = audit_logs.iter().map(|(_, _, diff)| diff).sum::<Diff>();
1269        let audit_log_handle = AuditLogIterator::new(audit_logs);
1270
1271        // Perform data migrations.
1272        if is_initialized && !read_only {
1273            commit_ts = upgrade(&mut self, commit_ts).await?;
1274        }
1275
1276        debug!(
1277            ?is_initialized,
1278            ?self.upper,
1279            "initializing catalog state"
1280        );
1281        let mut catalog = PersistCatalogState {
1282            mode: self.mode,
1283            since_handle: self.since_handle,
1284            write_handle: self.write_handle,
1285            listen: self.listen,
1286            persist_client: self.persist_client,
1287            shard_id: self.shard_id,
1288            upper: self.upper,
1289            fenceable_token: self.fenceable_token,
1290            // Initialize empty in-memory state.
1291            snapshot: Vec::new(),
1292            update_applier: CatalogStateInner::new(),
1293            catalog_content_version: self.catalog_content_version,
1294            bootstrap_complete: false,
1295            metrics: self.metrics,
1296            size_at_last_consolidation: None,
1297        };
1298        catalog.metrics.collection_entries.reset();
1299        // Normally, `collection_entries` is updated in `apply_updates`. The audit log updates skip
1300        // over that function so we manually update it here.
1301        catalog
1302            .metrics
1303            .collection_entries
1304            .with_label_values(&[&CollectionType::AuditLog.to_string()])
1305            .add(audit_log_count.into_inner());
1306        let updates = self.snapshot.into_iter().map(|(kind, ts, diff)| {
1307            let kind = TryIntoStateUpdateKind::try_into(kind).expect("kind decoding error");
1308            StateUpdate { kind, ts, diff }
1309        });
1310        catalog.apply_updates_and_consolidate(updates)?;
1311
1312        let catalog_content_version = catalog.catalog_content_version.to_string();
1313        let txn = if is_initialized {
1314            let mut txn = catalog.transaction().await?;
1315
1316            // Ad-hoc migration: Initialize the `migration_version` expected by adapter to be
1317            // present in existing catalogs.
1318            //
1319            // Note: Need to exclude read-only catalog mode here, because in that mode all
1320            // transactions are expected to be no-ops.
1321            // TODO: remove this once we only support upgrades from version >= 0.164
1322            if txn.get_setting("migration_version".into()).is_none() && mode != Mode::Readonly {
1323                let old_version = txn.get_catalog_content_version();
1324                txn.set_setting("migration_version".into(), old_version.map(Into::into))?;
1325            }
1326
1327            txn.set_catalog_content_version(catalog_content_version)?;
1328            txn
1329        } else {
1330            soft_assert_eq_no_log!(
1331                catalog
1332                    .snapshot
1333                    .iter()
1334                    .filter(|(kind, _, _)| !matches!(kind, StateUpdateKind::FenceToken(_)))
1335                    .count(),
1336                0,
1337                "trace should not contain any updates for an uninitialized catalog: {:#?}",
1338                catalog.snapshot
1339            );
1340
1341            let mut txn = catalog.transaction().await?;
1342            initialize::initialize(
1343                &mut txn,
1344                bootstrap_args,
1345                initial_ts.into(),
1346                catalog_content_version,
1347            )
1348            .await?;
1349            txn
1350        };
1351
1352        if read_only {
1353            let (txn_batch, _) = txn.into_parts();
1354            // The upper here doesn't matter because we are only applying the updates in memory.
1355            let updates = StateUpdate::from_txn_batch_ts(txn_batch, catalog.upper);
1356            catalog.apply_updates_and_consolidate(updates)?;
1357        } else {
1358            txn.commit_internal(commit_ts).await?;
1359        }
1360
1361        if matches!(catalog.mode, Mode::Writable) {
1362            let write_handle = catalog
1363                .persist_client
1364                .open_writer::<SourceData, (), Timestamp, i64>(
1365                    catalog.write_handle.shard_id(),
1366                    Arc::new(persist_desc()),
1367                    Arc::new(UnitSchema::default()),
1368                    Diagnostics {
1369                        shard_name: CATALOG_SHARD_NAME.to_string(),
1370                        handle_purpose: "compact catalog".to_string(),
1371                    },
1372                )
1373                .await
1374                .expect("invalid usage");
1375            let fuel = CATALOG_FORCE_COMPACTION_FUEL.handle(catalog.persist_client.dyncfgs());
1376            let wait = CATALOG_FORCE_COMPACTION_WAIT.handle(catalog.persist_client.dyncfgs());
1377            // We're going to gradually turn this on via dyncfgs. Run it in a task so that it
1378            // doesn't block startup.
1379            let _task = mz_ore::task::spawn(|| "catalog::force_shard_compaction", async move {
1380                let () =
1381                    mz_persist_client::cli::admin::dangerous_force_compaction_and_break_pushdown(
1382                        &write_handle,
1383                        || fuel.get(),
1384                        || wait.get(),
1385                    )
1386                    .await;
1387            });
1388        }
1389
1390        Ok((Box::new(catalog), audit_log_handle))
1391    }
1392
1393    /// Reports if the catalog state has been initialized.
1394    ///
1395    /// NOTE: This is the answer as of the last call to [`PersistHandle::sync`] or [`PersistHandle::sync_to_current_upper`],
1396    /// not necessarily what is currently in persist.
1397    #[mz_ore::instrument]
1398    fn is_initialized_inner(&self) -> bool {
1399        !self.update_applier.configs.is_empty()
1400    }
1401
1402    /// Get the current value of config `key`.
1403    ///
1404    /// Some configs need to be read before the catalog is opened for bootstrapping.
1405    #[mz_ore::instrument]
1406    async fn get_current_config(&mut self, key: &str) -> Result<Option<u64>, DurableCatalogError> {
1407        self.sync_to_current_upper().await?;
1408        Ok(self.update_applier.configs.get(key).cloned())
1409    }
1410
1411    /// Get the user version of this instance.
1412    ///
1413    /// The user version is used to determine if a migration is needed.
1414    #[mz_ore::instrument]
1415    pub(crate) async fn get_user_version(&mut self) -> Result<Option<u64>, DurableCatalogError> {
1416        self.get_current_config(USER_VERSION_KEY).await
1417    }
1418
1419    /// Get the current value of setting `name`.
1420    ///
1421    /// Some settings need to be read before the catalog is opened for bootstrapping.
1422    #[mz_ore::instrument]
1423    async fn get_current_setting(
1424        &mut self,
1425        name: &str,
1426    ) -> Result<Option<String>, DurableCatalogError> {
1427        self.sync_to_current_upper().await?;
1428        Ok(self.update_applier.settings.get(name).cloned())
1429    }
1430
1431    /// Get the catalog content version.
1432    ///
1433    /// The catalog content version is the semantic version of the most recent binary that wrote to
1434    /// the catalog.
1435    #[mz_ore::instrument]
1436    async fn get_catalog_content_version(
1437        &mut self,
1438    ) -> Result<Option<semver::Version>, DurableCatalogError> {
1439        let version = self
1440            .get_current_setting(CATALOG_CONTENT_VERSION_KEY)
1441            .await?;
1442        let version = version.map(|version| version.parse().expect("invalid version persisted"));
1443        Ok(version)
1444    }
1445}
1446
1447#[async_trait]
1448impl OpenableDurableCatalogState for UnopenedPersistCatalogState {
1449    #[mz_ore::instrument]
1450    async fn open_savepoint(
1451        mut self: Box<Self>,
1452        initial_ts: Timestamp,
1453        bootstrap_args: &BootstrapArgs,
1454    ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError> {
1455        self.open_inner(Mode::Savepoint, initial_ts, bootstrap_args)
1456            .boxed()
1457            .await
1458    }
1459
1460    #[mz_ore::instrument]
1461    async fn open_read_only(
1462        mut self: Box<Self>,
1463        bootstrap_args: &BootstrapArgs,
1464    ) -> Result<Box<dyn DurableCatalogState>, CatalogError> {
1465        self.open_inner(Mode::Readonly, EpochMillis::MIN.into(), bootstrap_args)
1466            .boxed()
1467            .await
1468            .map(|(catalog, _)| catalog)
1469    }
1470
1471    #[mz_ore::instrument]
1472    async fn open(
1473        mut self: Box<Self>,
1474        initial_ts: Timestamp,
1475        bootstrap_args: &BootstrapArgs,
1476    ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError> {
1477        self.open_inner(Mode::Writable, initial_ts, bootstrap_args)
1478            .boxed()
1479            .await
1480    }
1481
1482    #[mz_ore::instrument(level = "debug")]
1483    async fn open_debug(mut self: Box<Self>) -> Result<DebugCatalogState, CatalogError> {
1484        Ok(DebugCatalogState(*self))
1485    }
1486
1487    #[mz_ore::instrument]
1488    async fn is_initialized(&mut self) -> Result<bool, CatalogError> {
1489        self.sync_to_current_upper().await?;
1490        Ok(self.is_initialized_inner())
1491    }
1492
1493    #[mz_ore::instrument]
1494    async fn epoch(&mut self) -> Result<Epoch, CatalogError> {
1495        self.sync_to_current_upper().await?;
1496        self.fenceable_token
1497            .validate()?
1498            .map(|token| token.epoch)
1499            .ok_or(CatalogError::Durable(DurableCatalogError::Uninitialized))
1500    }
1501
1502    #[mz_ore::instrument]
1503    async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError> {
1504        self.sync_to_current_upper().await?;
1505        self.fenceable_token
1506            .token()
1507            .map(|token| token.deploy_generation)
1508            .ok_or(CatalogError::Durable(DurableCatalogError::Uninitialized))
1509    }
1510
1511    #[mz_ore::instrument(level = "debug")]
1512    async fn get_0dt_deployment_max_wait(&mut self) -> Result<Option<Duration>, CatalogError> {
1513        let value = self
1514            .get_current_config(WITH_0DT_DEPLOYMENT_MAX_WAIT)
1515            .await?;
1516        match value {
1517            None => Ok(None),
1518            Some(millis) => Ok(Some(Duration::from_millis(millis))),
1519        }
1520    }
1521
1522    #[mz_ore::instrument(level = "debug")]
1523    async fn get_0dt_deployment_ddl_check_interval(
1524        &mut self,
1525    ) -> Result<Option<Duration>, CatalogError> {
1526        let value = self
1527            .get_current_config(WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL)
1528            .await?;
1529        match value {
1530            None => Ok(None),
1531            Some(millis) => Ok(Some(Duration::from_millis(millis))),
1532        }
1533    }
1534
1535    #[mz_ore::instrument(level = "debug")]
1536    async fn get_enable_0dt_deployment_panic_after_timeout(
1537        &mut self,
1538    ) -> Result<Option<bool>, CatalogError> {
1539        let value = self
1540            .get_current_config(ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT)
1541            .await?;
1542        match value {
1543            None => Ok(None),
1544            Some(0) => Ok(Some(false)),
1545            Some(1) => Ok(Some(true)),
1546            Some(v) => Err(
1547                DurableCatalogError::from(TryFromProtoError::UnknownEnumVariant(format!(
1548                    "{v} is not a valid boolean value"
1549                )))
1550                .into(),
1551            ),
1552        }
1553    }
1554
1555    #[mz_ore::instrument]
1556    async fn has_system_config_synced_once(&mut self) -> Result<bool, DurableCatalogError> {
1557        self.get_current_config(SYSTEM_CONFIG_SYNCED_KEY)
1558            .await
1559            .map(|value| value.map(|value| value > 0).unwrap_or(false))
1560    }
1561
1562    #[mz_ore::instrument]
1563    async fn trace_unconsolidated(&mut self) -> Result<Trace, CatalogError> {
1564        self.sync_to_current_upper().await?;
1565        if self.is_initialized_inner() {
1566            let snapshot = self.snapshot_unconsolidated().await;
1567            Ok(Trace::from_snapshot(snapshot))
1568        } else {
1569            Err(CatalogError::Durable(DurableCatalogError::Uninitialized))
1570        }
1571    }
1572
1573    #[mz_ore::instrument]
1574    async fn trace_consolidated(&mut self) -> Result<Trace, CatalogError> {
1575        self.sync_to_current_upper().await?;
1576        if self.is_initialized_inner() {
1577            let snapshot = self.current_snapshot().await?;
1578            Ok(Trace::from_snapshot(snapshot))
1579        } else {
1580            Err(CatalogError::Durable(DurableCatalogError::Uninitialized))
1581        }
1582    }
1583
1584    #[mz_ore::instrument(level = "debug")]
1585    async fn expire(self: Box<Self>) {
1586        self.expire().await
1587    }
1588}
1589
1590/// Applies updates for an opened catalog.
1591#[derive(Debug)]
1592struct CatalogStateInner {
1593    /// A trace of all catalog updates that can be consumed by some higher layer.
1594    updates: VecDeque<memory::objects::StateUpdate>,
1595}
1596
1597impl CatalogStateInner {
1598    fn new() -> CatalogStateInner {
1599        CatalogStateInner {
1600            updates: VecDeque::new(),
1601        }
1602    }
1603}
1604
1605impl ApplyUpdate<StateUpdateKind> for CatalogStateInner {
1606    fn apply_update(
1607        &mut self,
1608        update: StateUpdate<StateUpdateKind>,
1609        current_fence_token: &mut FenceableToken,
1610        metrics: &Arc<Metrics>,
1611    ) -> Result<Option<StateUpdate<StateUpdateKind>>, FenceError> {
1612        if let Some(collection_type) = update.kind.collection_type() {
1613            metrics
1614                .collection_entries
1615                .with_label_values(&[&collection_type.to_string()])
1616                .add(update.diff.into_inner());
1617        }
1618
1619        {
1620            let update: Option<memory::objects::StateUpdate> = (&update)
1621                .try_into()
1622                .expect("invalid persisted update: {update:#?}");
1623            if let Some(update) = update {
1624                self.updates.push_back(update);
1625            }
1626        }
1627
1628        match (update.kind, update.diff) {
1629            (StateUpdateKind::AuditLog(_, ()), _) => Ok(None),
1630            // Nothing to due for fence token retractions but wait for the next insertion.
1631            (StateUpdateKind::FenceToken(_), Diff::MINUS_ONE) => Ok(None),
1632            (StateUpdateKind::FenceToken(token), Diff::ONE) => {
1633                current_fence_token.maybe_fence(token)?;
1634                Ok(None)
1635            }
1636            (kind, diff) => Ok(Some(StateUpdate {
1637                kind,
1638                ts: update.ts,
1639                diff,
1640            })),
1641        }
1642    }
1643}
1644
/// A durable store of the catalog state using Persist as an implementation. The durable store can
/// serve any catalog data and transactionally modify catalog data.
///
/// Concretely, this is a `PersistHandle` over `StateUpdateKind` updates whose update applier is a
/// `CatalogStateInner`, which buffers in-memory updates for consumers.
///
/// Production users should call [`Self::expire`] before dropping a [`PersistCatalogState`]
/// so that it can expire its leases. If/when rust gets AsyncDrop, this will be done automatically.
type PersistCatalogState = PersistHandle<StateUpdateKind, CatalogStateInner>;
1651
1652#[async_trait]
1653impl ReadOnlyDurableCatalogState for PersistCatalogState {
1654    fn epoch(&self) -> Epoch {
1655        self.fenceable_token
1656            .token()
1657            .expect("opened catalog state must have an epoch")
1658            .epoch
1659    }
1660
1661    fn metrics(&self) -> &Metrics {
1662        &self.metrics
1663    }
1664
1665    #[mz_ore::instrument(level = "debug")]
1666    async fn expire(self: Box<Self>) {
1667        self.expire().await
1668    }
1669
1670    fn is_bootstrap_complete(&self) -> bool {
1671        self.bootstrap_complete
1672    }
1673
1674    async fn get_audit_logs(&mut self) -> Result<Vec<VersionedEvent>, CatalogError> {
1675        self.sync_to_current_upper().await?;
1676        let audit_logs: Vec<_> = self
1677            .persist_snapshot()
1678            .await
1679            .filter_map(
1680                |StateUpdate {
1681                     kind,
1682                     ts: _,
1683                     diff: _,
1684                 }| match kind {
1685                    StateUpdateKind::AuditLog(key, ()) => Some(key),
1686                    _ => None,
1687                },
1688            )
1689            .collect();
1690        let mut audit_logs: Vec<_> = audit_logs
1691            .into_iter()
1692            .map(RustType::from_proto)
1693            .map_ok(|key: AuditLogKey| key.event)
1694            .collect::<Result<_, _>>()?;
1695        audit_logs.sort_by(|a, b| a.sortable_id().cmp(&b.sortable_id()));
1696        Ok(audit_logs)
1697    }
1698
1699    #[mz_ore::instrument(level = "debug")]
1700    async fn get_next_id(&mut self, id_type: &str) -> Result<u64, CatalogError> {
1701        self.with_trace(|trace| {
1702            Ok(trace
1703                .into_iter()
1704                .rev()
1705                .filter_map(|(kind, _, _)| match kind {
1706                    StateUpdateKind::IdAllocator(key, value) if key.name == id_type => {
1707                        Some(value.next_id)
1708                    }
1709                    _ => None,
1710                })
1711                .next()
1712                .expect("must exist"))
1713        })
1714        .await
1715    }
1716
1717    #[mz_ore::instrument(level = "debug")]
1718    async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError> {
1719        self.sync_to_current_upper().await?;
1720        Ok(self
1721            .fenceable_token
1722            .token()
1723            .expect("opened catalogs must have a token")
1724            .deploy_generation)
1725    }
1726
1727    #[mz_ore::instrument(level = "debug")]
1728    async fn snapshot(&mut self) -> Result<Snapshot, CatalogError> {
1729        self.with_snapshot(Ok).await
1730    }
1731
1732    #[mz_ore::instrument(level = "debug")]
1733    async fn sync_to_current_updates(
1734        &mut self,
1735    ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError> {
1736        let upper = self.current_upper().await;
1737        self.sync_updates(upper).await
1738    }
1739
1740    #[mz_ore::instrument(level = "debug")]
1741    async fn sync_updates(
1742        &mut self,
1743        target_upper: mz_repr::Timestamp,
1744    ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError> {
1745        self.sync(target_upper).await?;
1746        let mut updates = Vec::new();
1747        while let Some(update) = self.update_applier.updates.front() {
1748            if update.ts >= target_upper {
1749                break;
1750            }
1751
1752            let update = self
1753                .update_applier
1754                .updates
1755                .pop_front()
1756                .expect("peeked above");
1757            updates.push(update);
1758        }
1759        Ok(updates)
1760    }
1761
1762    async fn current_upper(&mut self) -> Timestamp {
1763        self.current_upper().await
1764    }
1765}
1766
#[async_trait]
// NOTE(review): the justification for this `allow` is not visible here. Presumably it silences the
// lint for signatures below such as `-> Result<Transaction, CatalogError>`, which name
// `Transaction` without spelling out a lifetime borrowed from `self`; confirm before removing.
#[allow(mismatched_lifetime_syntaxes)]
impl DurableCatalogState for PersistCatalogState {
    /// Returns whether this catalog was opened in `Mode::Readonly`.
    fn is_read_only(&self) -> bool {
        matches!(self.mode, Mode::Readonly)
    }

    /// Returns whether this catalog was opened in `Mode::Savepoint`.
    fn is_savepoint(&self) -> bool {
        matches!(self.mode, Mode::Savepoint)
    }

    /// Records that bootstrapping has finished.
    ///
    /// In `Mode::Writable` this also calls `upgrade_version` on the since handle and panics
    /// ("invalid usage") if that fails. In other modes only the flag is set.
    async fn mark_bootstrap_complete(&mut self) {
        self.bootstrap_complete = true;
        if matches!(self.mode, Mode::Writable) {
            self.since_handle
                .upgrade_version()
                .await
                .expect("invalid usage")
        }
    }

    /// Starts a transaction over a fresh snapshot, to be committed at the current upper.
    ///
    /// Increments the `transactions_started` metric.
    #[mz_ore::instrument(level = "debug")]
    async fn transaction(&mut self) -> Result<Transaction, CatalogError> {
        self.metrics.transactions_started.inc();
        let snapshot = self.snapshot().await?;
        let commit_ts = self.upper.clone();
        Transaction::new(self, snapshot, commit_ts)
    }

    /// Starts a transaction over the caller-provided `snapshot`, to be committed at the current
    /// upper.
    ///
    /// Unlike `transaction`, this neither takes a new snapshot nor increments
    /// `transactions_started`.
    fn transaction_from_snapshot(
        &mut self,
        snapshot: Snapshot,
    ) -> Result<Transaction, CatalogError> {
        let commit_ts = self.upper.clone();
        Transaction::new(self, snapshot, commit_ts)
    }

    /// Commits `txn_batch` at `commit_ts` and returns the catalog's new upper.
    ///
    /// Behavior by mode:
    /// - `Mode::Readonly`: an empty batch is accepted and the current upper is returned
    ///   unchanged. A non-empty batch is rejected with `DurableCatalogError::NotWritable`.
    /// - `Mode::Writable`: the updates are written with `compare_and_append` at `commit_ts`.
    ///   Errors are passed through `unwrap_fence_error`.
    /// - `Mode::Savepoint`: the updates are applied to in-memory state only, and the upper is
    ///   set to `commit_ts.step_forward()`.
    ///
    /// Increments `transaction_commits` and records the commit's duration in
    /// `transaction_commit_latency_seconds` via `wall_time().inc_by(..)`.
    ///
    /// # Panics
    ///
    /// Outside read-only mode, panics in either of these cases:
    /// - The catalog's upper differs from the batch's upper, meaning the catalog changed after
    ///   the transaction started.
    /// - `commit_ts` is less than the current upper.
    #[mz_ore::instrument(level = "debug")]
    async fn commit_transaction(
        &mut self,
        txn_batch: TransactionBatch,
        commit_ts: Timestamp,
    ) -> Result<Timestamp, CatalogError> {
        // Holds the actual commit logic so that the whole commit can be timed as one future below.
        async fn commit_transaction_inner(
            catalog: &mut PersistCatalogState,
            txn_batch: TransactionBatch,
            commit_ts: Timestamp,
        ) -> Result<Timestamp, CatalogError> {
            // If the transaction is empty then we don't error, even in read-only mode.
            // This is mostly for legacy reasons (i.e. with enough elbow grease this
            // behavior can be changed without breaking any fundamental assumptions).
            if catalog.mode == Mode::Readonly {
                let updates: Vec<_> = StateUpdate::from_txn_batch(txn_batch).collect();
                if !updates.is_empty() {
                    // Report which collections the rejected updates touched, to aid debugging.
                    let collection_types: Vec<_> = updates
                        .iter()
                        .filter_map(|u| u.0.collection_type())
                        .collect();
                    return Err(DurableCatalogError::NotWritable(format!(
                        "cannot commit a transaction in a read-only catalog: \
                         {} updates across collections: {collection_types:?}",
                        updates.len(),
                    ))
                    .into());
                }
                return Ok(catalog.upper);
            }

            // If the current upper does not match the transaction's commit timestamp, then the
            // catalog must have changed since the transaction was started, making the transaction
            // invalid. When/if we want a multi-writer catalog, this will likely have to change
            // from an assert to a retry.
            assert_eq!(
                catalog.upper, txn_batch.upper,
                "only one transaction at a time is supported"
            );

            assert!(
                commit_ts >= catalog.upper,
                "expected commit ts, {}, to be greater than or equal to upper, {}",
                commit_ts,
                catalog.upper
            );

            let updates = StateUpdate::from_txn_batch(txn_batch).collect();
            debug!("committing updates: {updates:?}");

            let next_upper = match catalog.mode {
                // Durable write; the new upper comes from `compare_and_append`.
                Mode::Writable => catalog
                    .compare_and_append(updates, commit_ts)
                    .await
                    .map_err(|e| e.unwrap_fence_error())?,
                // Savepoint: apply to in-memory state without calling `compare_and_append`,
                // then advance the in-memory upper just past `commit_ts`.
                Mode::Savepoint => {
                    let updates = updates.into_iter().map(|(kind, diff)| StateUpdate {
                        kind,
                        ts: commit_ts,
                        diff,
                    });
                    catalog.apply_updates_and_consolidate(updates)?;
                    catalog.upper = commit_ts.step_forward();
                    catalog.upper
                }
                Mode::Readonly => unreachable!("handled above"),
            };

            Ok(next_upper)
        }
        self.metrics.transaction_commits.inc();
        let counter = self.metrics.transaction_commit_latency_seconds.clone();
        commit_transaction_inner(self, txn_batch, commit_ts)
            .wall_time()
            .inc_by(counter)
            .await
    }

    /// Advances the catalog's upper to `new_upper` without writing any data.
    ///
    /// If `new_upper` is not greater than the current upper, this is reported with
    /// `soft_panic_or_log!` and `Ok(())` is returned without changing anything.
    ///
    /// Otherwise, behavior by mode:
    /// - `Mode::Writable`: the upper is advanced in persist by appending no updates.
    /// - `Mode::Savepoint`: only the in-memory upper changes.
    /// - `Mode::Readonly`: `DurableCatalogError::NotWritable` is returned.
    #[mz_ore::instrument(level = "debug")]
    async fn advance_upper(&mut self, new_upper: Timestamp) -> Result<(), CatalogError> {
        if self.upper >= new_upper {
            // We don't expect a no-op advancement, but if we are wrong we'd crash the process.
            // Seems safer to only soft-assert and return gracefully in production. If we get here
            // that means we tried to make the catalog shard readable at a time it was already
            // readable, which likely means we are violating linearizability. That's not great, but
            // crashing (or even crash-looping) is worse.
            //
            // TODO: Consider removing this once we have built some confidence.
            soft_panic_or_log!(
                "new_upper ({new_upper}) not greater than current upper ({})",
                self.upper
            );
            return Ok(());
        }

        match self.mode {
            Mode::Writable => self
                .compare_and_append_inner([], new_upper)
                .await
                .map_err(|e| e.unwrap_fence_error())?,
            Mode::Savepoint => (),
            Mode::Readonly => {
                return Err(DurableCatalogError::NotWritable(
                    "cannot advance upper of a read-only catalog".into(),
                )
                .into());
            }
        }

        self.upper = new_upper;
        // No sync needed since no data was written.

        Ok(())
    }

    /// Returns the ID of the persist shard backing this catalog.
    fn shard_id(&self) -> ShardId {
        self.shard_id
    }
}
1923
1924/// Deterministically generate a shard ID for the given `organization_id` and `seed`.
1925pub fn shard_id(organization_id: Uuid, seed: usize) -> ShardId {
1926    let hash = sha2::Sha256::digest(format!("{organization_id}{seed}")).to_vec();
1927    soft_assert_eq_or_log!(hash.len(), 32, "SHA256 returns 32 bytes (256 bits)");
1928    let uuid = Uuid::from_slice(&hash[0..16]).expect("from_slice accepts exactly 16 bytes");
1929    ShardId::from_str(&format!("s{uuid}")).expect("known to be valid")
1930}
1931
1932/// Generates a timestamp for reading from `read_handle` that is as fresh as possible, given
1933/// `upper`.
1934fn as_of(
1935    read_handle: &ReadHandle<SourceData, (), Timestamp, StorageDiff>,
1936    upper: Timestamp,
1937) -> Timestamp {
1938    let since = read_handle.since().clone();
1939    let mut as_of = upper.checked_sub(1).unwrap_or_else(|| {
1940        panic!("catalog persist shard should be initialize, found upper: {upper:?}")
1941    });
1942    // We only downgrade the since after writing, and we always set the since to one less than the
1943    // upper.
1944    soft_assert_or_log!(
1945        since.less_equal(&as_of),
1946        "since={since:?}, as_of={as_of:?}; since must be less than or equal to as_of"
1947    );
1948    // This should be a no-op if the assert above passes, however if it doesn't then we'd like to
1949    // continue with a correct timestamp instead of entering a panic loop.
1950    as_of.advance_by(since.borrow());
1951    as_of
1952}
1953
1954/// Fetch the persist version of the catalog shard, if one exists. A version will not
1955/// exist if we are creating a brand-new environment.
1956async fn fetch_catalog_shard_version(
1957    persist_client: &PersistClient,
1958    catalog_shard_id: ShardId,
1959) -> Option<semver::Version> {
1960    let shard_state = persist_client
1961        .inspect_shard::<Timestamp>(&catalog_shard_id)
1962        .await
1963        .ok()?;
1964    let json_state = serde_json::to_value(shard_state).expect("state serialization error");
1965    let json_version = json_state
1966        .get("applier_version")
1967        .cloned()
1968        .expect("missing applier_version");
1969    let version = serde_json::from_value(json_version).expect("version deserialization error");
1970    Some(version)
1971}
1972
1973/// Generates an iterator of [`StateUpdate`] that contain all updates to the catalog
1974/// state up to, and including, `as_of`.
1975///
1976/// The output is consolidated and sorted by timestamp in ascending order.
1977#[mz_ore::instrument(level = "debug")]
1978async fn snapshot_binary(
1979    read_handle: &mut ReadHandle<SourceData, (), Timestamp, StorageDiff>,
1980    as_of: Timestamp,
1981    metrics: &Arc<Metrics>,
1982) -> impl Iterator<Item = StateUpdate<StateUpdateKindJson>> + DoubleEndedIterator + use<> {
1983    metrics.snapshots_taken.inc();
1984    let counter = metrics.snapshot_latency_seconds.clone();
1985    snapshot_binary_inner(read_handle, as_of)
1986        .wall_time()
1987        .inc_by(counter)
1988        .await
1989}
1990
1991/// Generates an iterator of [`StateUpdate`] that contain all updates to the catalog
1992/// state up to, and including, `as_of`.
1993///
1994/// The output is consolidated and sorted by timestamp in ascending order.
1995#[mz_ore::instrument(level = "debug")]
1996async fn snapshot_binary_inner(
1997    read_handle: &mut ReadHandle<SourceData, (), Timestamp, StorageDiff>,
1998    as_of: Timestamp,
1999) -> impl Iterator<Item = StateUpdate<StateUpdateKindJson>> + DoubleEndedIterator + use<> {
2000    let snapshot = read_handle
2001        .snapshot_and_fetch(Antichain::from_elem(as_of))
2002        .await
2003        .expect("we have advanced the restart_as_of by the since");
2004    soft_assert_no_log!(
2005        snapshot.iter().all(|(_, _, diff)| *diff == 1),
2006        "snapshot_and_fetch guarantees a consolidated result: {snapshot:#?}"
2007    );
2008    snapshot
2009        .into_iter()
2010        .map(Into::<StateUpdate<StateUpdateKindJson>>::into)
2011        .sorted_by(|a, b| Ord::cmp(&b.ts, &a.ts))
2012}
2013
2014/// Convert an [`Antichain<Timestamp>`] to a [`Timestamp`].
2015///
2016/// The correctness of this function relies on [`Timestamp`] being totally ordered and never
2017/// finalizing the catalog shard.
2018pub(crate) fn antichain_to_timestamp(antichain: Antichain<Timestamp>) -> Timestamp {
2019    antichain
2020        .into_option()
2021        .expect("we use a totally ordered time and never finalize the shard")
2022}
2023
2024// Debug methods used by the catalog-debug tool.
2025
2026impl Trace {
2027    /// Generates a [`Trace`] from snapshot.
2028    fn from_snapshot(snapshot: impl IntoIterator<Item = StateUpdate>) -> Trace {
2029        let mut trace = Trace::new();
2030        for StateUpdate { kind, ts, diff } in snapshot {
2031            match kind {
2032                StateUpdateKind::AuditLog(k, v) => trace.audit_log.values.push(((k, v), ts, diff)),
2033                StateUpdateKind::Cluster(k, v) => trace.clusters.values.push(((k, v), ts, diff)),
2034                StateUpdateKind::ClusterReplica(k, v) => {
2035                    trace.cluster_replicas.values.push(((k, v), ts, diff))
2036                }
2037                StateUpdateKind::Comment(k, v) => trace.comments.values.push(((k, v), ts, diff)),
2038                StateUpdateKind::Config(k, v) => trace.configs.values.push(((k, v), ts, diff)),
2039                StateUpdateKind::Database(k, v) => trace.databases.values.push(((k, v), ts, diff)),
2040                StateUpdateKind::DefaultPrivilege(k, v) => {
2041                    trace.default_privileges.values.push(((k, v), ts, diff))
2042                }
2043                StateUpdateKind::FenceToken(_) => {
2044                    // Fence token not included in trace.
2045                }
2046                StateUpdateKind::IdAllocator(k, v) => {
2047                    trace.id_allocator.values.push(((k, v), ts, diff))
2048                }
2049                StateUpdateKind::IntrospectionSourceIndex(k, v) => {
2050                    trace.introspection_sources.values.push(((k, v), ts, diff))
2051                }
2052                StateUpdateKind::Item(k, v) => trace.items.values.push(((k, v), ts, diff)),
2053                StateUpdateKind::NetworkPolicy(k, v) => {
2054                    trace.network_policies.values.push(((k, v), ts, diff))
2055                }
2056                StateUpdateKind::Role(k, v) => trace.roles.values.push(((k, v), ts, diff)),
2057                StateUpdateKind::Schema(k, v) => trace.schemas.values.push(((k, v), ts, diff)),
2058                StateUpdateKind::Setting(k, v) => trace.settings.values.push(((k, v), ts, diff)),
2059                StateUpdateKind::SourceReferences(k, v) => {
2060                    trace.source_references.values.push(((k, v), ts, diff))
2061                }
2062                StateUpdateKind::SystemConfiguration(k, v) => {
2063                    trace.system_configurations.values.push(((k, v), ts, diff))
2064                }
2065                StateUpdateKind::ClusterSystemConfiguration(k, v) => trace
2066                    .cluster_system_configurations
2067                    .values
2068                    .push(((k, v), ts, diff)),
2069                StateUpdateKind::ReplicaSystemConfiguration(k, v) => trace
2070                    .replica_system_configurations
2071                    .values
2072                    .push(((k, v), ts, diff)),
2073                StateUpdateKind::SystemObjectMapping(k, v) => {
2074                    trace.system_object_mappings.values.push(((k, v), ts, diff))
2075                }
2076                StateUpdateKind::SystemPrivilege(k, v) => {
2077                    trace.system_privileges.values.push(((k, v), ts, diff))
2078                }
2079                StateUpdateKind::StorageCollectionMetadata(k, v) => trace
2080                    .storage_collection_metadata
2081                    .values
2082                    .push(((k, v), ts, diff)),
2083                StateUpdateKind::UnfinalizedShard(k, ()) => {
2084                    trace.unfinalized_shards.values.push(((k, ()), ts, diff))
2085                }
2086                StateUpdateKind::TxnWalShard((), v) => {
2087                    trace.txn_wal_shard.values.push((((), v), ts, diff))
2088                }
2089                StateUpdateKind::RoleAuth(k, v) => trace.role_auth.values.push(((k, v), ts, diff)),
2090            }
2091        }
2092        trace
2093    }
2094}
2095
2096impl UnopenedPersistCatalogState {
2097    /// Manually update value of `key` in collection `T` to `value`.
2098    #[mz_ore::instrument]
2099    pub(crate) async fn debug_edit<T: Collection>(
2100        &mut self,
2101        key: T::Key,
2102        value: T::Value,
2103    ) -> Result<Option<T::Value>, CatalogError>
2104    where
2105        T::Key: PartialEq + Eq + Debug + Clone,
2106        T::Value: Debug + Clone,
2107    {
2108        let prev_value = loop {
2109            let key = key.clone();
2110            let value = value.clone();
2111            let snapshot = self.current_snapshot().await?;
2112            let trace = Trace::from_snapshot(snapshot);
2113            let collection_trace = T::collection_trace(trace);
2114            let prev_values: Vec<_> = collection_trace
2115                .values
2116                .into_iter()
2117                .filter(|((k, _), _, diff)| {
2118                    soft_assert_eq_or_log!(*diff, Diff::ONE, "trace is consolidated");
2119                    &key == k
2120                })
2121                .collect();
2122
2123            let prev_value = match &prev_values[..] {
2124                [] => None,
2125                [((_, v), _, _)] => Some(v.clone()),
2126                prev_values => panic!("multiple values found for key {key:?}: {prev_values:?}"),
2127            };
2128
2129            let mut updates: Vec<_> = prev_values
2130                .into_iter()
2131                .map(|((k, v), _, _)| (T::update(k, v), Diff::MINUS_ONE))
2132                .collect();
2133            updates.push((T::update(key, value), Diff::ONE));
2134            // We must fence out all other catalogs, if we haven't already, since we are writing.
2135            match self.fenceable_token.generate_unfenced_token(self.mode)? {
2136                Some((fence_updates, current_fenceable_token)) => {
2137                    updates.extend(fence_updates.clone());
2138                    match self.compare_and_append(updates, self.upper).await {
2139                        Ok(_) => {
2140                            self.fenceable_token = current_fenceable_token;
2141                            break prev_value;
2142                        }
2143                        Err(CompareAndAppendError::Fence(e)) => return Err(e.into()),
2144                        Err(e @ CompareAndAppendError::UpperMismatch { .. }) => {
2145                            warn!("catalog write failed due to upper mismatch, retrying: {e:?}");
2146                            continue;
2147                        }
2148                    }
2149                }
2150                None => {
2151                    self.compare_and_append(updates, self.upper)
2152                        .await
2153                        .map_err(|e| e.unwrap_fence_error())?;
2154                    break prev_value;
2155                }
2156            }
2157        };
2158        Ok(prev_value)
2159    }
2160
2161    /// Manually delete `key` from collection `T`.
2162    #[mz_ore::instrument]
2163    pub(crate) async fn debug_delete<T: Collection>(
2164        &mut self,
2165        key: T::Key,
2166    ) -> Result<(), CatalogError>
2167    where
2168        T::Key: PartialEq + Eq + Debug + Clone,
2169        T::Value: Debug,
2170    {
2171        loop {
2172            let key = key.clone();
2173            let snapshot = self.current_snapshot().await?;
2174            let trace = Trace::from_snapshot(snapshot);
2175            let collection_trace = T::collection_trace(trace);
2176            let mut retractions: Vec<_> = collection_trace
2177                .values
2178                .into_iter()
2179                .filter(|((k, _), _, diff)| {
2180                    soft_assert_eq_or_log!(*diff, Diff::ONE, "trace is consolidated");
2181                    &key == k
2182                })
2183                .map(|((k, v), _, _)| (T::update(k, v), Diff::MINUS_ONE))
2184                .collect();
2185
2186            // We must fence out all other catalogs, if we haven't already, since we are writing.
2187            match self.fenceable_token.generate_unfenced_token(self.mode)? {
2188                Some((fence_updates, current_fenceable_token)) => {
2189                    retractions.extend(fence_updates.clone());
2190                    match self.compare_and_append(retractions, self.upper).await {
2191                        Ok(_) => {
2192                            self.fenceable_token = current_fenceable_token;
2193                            break;
2194                        }
2195                        Err(CompareAndAppendError::Fence(e)) => return Err(e.into()),
2196                        Err(e @ CompareAndAppendError::UpperMismatch { .. }) => {
2197                            warn!("catalog write failed due to upper mismatch, retrying: {e:?}");
2198                            continue;
2199                        }
2200                    }
2201                }
2202                None => {
2203                    self.compare_and_append(retractions, self.upper)
2204                        .await
2205                        .map_err(|e| e.unwrap_fence_error())?;
2206                    break;
2207                }
2208            }
2209        }
2210        Ok(())
2211    }
2212
2213    /// Generates a [`Vec<StateUpdate>`] that contain all updates to the catalog
2214    /// state.
2215    ///
2216    /// The output is consolidated and sorted by timestamp in ascending order and the current upper.
2217    async fn current_snapshot(
2218        &mut self,
2219    ) -> Result<impl IntoIterator<Item = StateUpdate> + '_, CatalogError> {
2220        self.sync_to_current_upper().await?;
2221        self.consolidate();
2222        Ok(self.snapshot.iter().cloned().map(|(kind, ts, diff)| {
2223            let kind = TryIntoStateUpdateKind::try_into(kind).expect("kind decoding error");
2224            StateUpdate { kind, ts, diff }
2225        }))
2226    }
2227}