// mz_catalog/durable/persist.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
9
// Unit tests live in a sibling `tests` module, compiled only for test builds.
#[cfg(test)]
mod tests;
12
use std::cmp::max;
use std::collections::{BTreeMap, VecDeque};
use std::fmt::Debug;
use std::str::FromStr;
use std::sync::{Arc, LazyLock};
use std::time::{Duration, Instant};

use async_trait::async_trait;
use differential_dataflow::lattice::Lattice;
use futures::{FutureExt, StreamExt};
use itertools::Itertools;
use mz_audit_log::VersionedEvent;
use mz_ore::metrics::MetricsFutureExt;
use mz_ore::now::EpochMillis;
use mz_ore::{
    soft_assert_eq_no_log, soft_assert_eq_or_log, soft_assert_ne_or_log, soft_assert_no_log,
    soft_assert_or_log, soft_panic_or_log,
};
use mz_persist_client::cfg::USE_CRITICAL_SINCE_CATALOG;
use mz_persist_client::cli::admin::{CATALOG_FORCE_COMPACTION_FUEL, CATALOG_FORCE_COMPACTION_WAIT};
use mz_persist_client::critical::{CriticalReaderId, Opaque, SinceHandle};
use mz_persist_client::error::UpperMismatch;
use mz_persist_client::read::{Listen, ListenEvent, ReadHandle};
use mz_persist_client::write::WriteHandle;
use mz_persist_client::{Diagnostics, PersistClient, ShardId};
use mz_persist_types::codec_impls::UnitSchema;
use mz_proto::{RustType, TryFromProtoError};
use mz_repr::Diff;
use mz_storage_client::controller::PersistEpoch;
use mz_storage_types::StorageDiff;
use mz_storage_types::sources::SourceData;
use sha2::Digest;
use timely::progress::{Antichain, Timestamp as TimelyTimestamp};
use tracing::{debug, info, warn};
use uuid::Uuid;

use crate::durable::debug::{Collection, CollectionType, DebugCatalogState, Trace};
use crate::durable::error::FenceError;
use crate::durable::initialize::{
    ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, SYSTEM_CONFIG_SYNCED_KEY, USER_VERSION_KEY,
    WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL, WITH_0DT_DEPLOYMENT_MAX_WAIT,
};
use crate::durable::metrics::Metrics;
use crate::durable::objects::state_update::{
    IntoStateUpdateKindJson, StateUpdate, StateUpdateKind, StateUpdateKindJson,
    TryIntoStateUpdateKind,
};
use crate::durable::objects::{AuditLogKey, FenceToken, Snapshot};
use crate::durable::transaction::TransactionBatch;
use crate::durable::upgrade::upgrade;
use crate::durable::{
    AuditLogIterator, BootstrapArgs, CATALOG_CONTENT_VERSION_KEY, CatalogError,
    DurableCatalogError, DurableCatalogState, Epoch, OpenableDurableCatalogState,
    ReadOnlyDurableCatalogState, Transaction, initialize, persist_desc,
};
use crate::memory;
69
/// New-type used to represent timestamps in persist.
pub(crate) type Timestamp = mz_repr::Timestamp;

/// The minimum value of an epoch.
///
/// # Safety
/// `new_unchecked` is safe to call with a non-zero value.
const MIN_EPOCH: Epoch = unsafe { Epoch::new_unchecked(1) };

/// Human readable catalog shard name, used for persist diagnostics.
const CATALOG_SHARD_NAME: &str = "catalog";

/// [`CriticalReaderId`] for the catalog shard's own since hold, separate from
/// [`PersistClient::CONTROLLER_CRITICAL_SINCE`].
static CATALOG_CRITICAL_SINCE: LazyLock<CriticalReaderId> = LazyLock::new(|| {
    "c55555555-6666-7777-8888-999999999999"
        .parse()
        .expect("valid CriticalReaderId")
});

/// Seed used to generate the persist shard ID for the catalog.
///
/// NOTE(review): the seed-to-shard-ID derivation is not visible in this chunk;
/// presumably each seed is hashed into a distinct [`ShardId`] — confirm at the use site.
const CATALOG_SEED: usize = 1;
/// Legacy seed used to generate the persist shard ID for the upgrade shard. DO NOT REUSE.
const _UPGRADE_SEED: usize = 2;
/// Legacy seed used to generate the persist shard ID for builtin table migrations. DO NOT REUSE.
pub const _BUILTIN_MIGRATION_SEED: usize = 3;
/// Legacy seed used to generate the persist shard ID for the expression cache. DO NOT REUSE.
pub const _EXPRESSION_CACHE_SEED: usize = 4;
98
/// Durable catalog mode that dictates the effect of mutable operations.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub(crate) enum Mode {
    /// Mutable operations are prohibited.
    Readonly,
    /// Mutable operations have an effect in-memory, but aren't persisted durably.
    Savepoint,
    /// Mutable operations have an effect in-memory and durably.
    Writable,
}
109
/// Enum representing the fenced state of the catalog.
///
/// The catalog moves from `Initializing` (learning previously written tokens) to
/// `Unfenced` (holding its own token), and to `Fenced` once a strictly larger
/// token is observed. See [`FenceableToken::maybe_fence`] for the transitions.
#[derive(Debug)]
pub(crate) enum FenceableToken {
    /// The catalog is still initializing and learning about previously written fence tokens. This
    /// state can be fenced if it encounters a larger deploy generation.
    Initializing {
        /// The largest fence token durably written to the catalog, if any.
        durable_token: Option<FenceToken>,
        /// This process's deploy generation.
        current_deploy_generation: Option<u64>,
    },
    /// The current token has not been fenced.
    Unfenced {
        /// The token this catalog instance holds.
        current_token: FenceToken,
    },
    /// The current token has been fenced.
    Fenced {
        /// The token this catalog instance held before being fenced.
        current_token: FenceToken,
        /// The strictly larger token that fenced this instance out.
        fence_token: FenceToken,
    },
}
129
impl FenceableToken {
    /// Returns a new token in the `Initializing` state with no durable token observed yet.
    fn new(current_deploy_generation: Option<u64>) -> Self {
        Self::Initializing {
            durable_token: None,
            current_deploy_generation,
        }
    }

    /// Returns the current token if it is not fenced, otherwise returns an error.
    ///
    /// When fenced, a [`FenceError::DeployGeneration`] is preferred over a
    /// [`FenceError::Epoch`]: a generation bump carries more information about
    /// why this instance was fenced out.
    fn validate(&self) -> Result<Option<FenceToken>, FenceError> {
        match self {
            FenceableToken::Initializing { durable_token, .. } => Ok(durable_token.clone()),
            FenceableToken::Unfenced { current_token, .. } => Ok(Some(current_token.clone())),
            FenceableToken::Fenced {
                current_token,
                fence_token,
            } => {
                // Invariant: `maybe_fence` only transitions to `Fenced` with a
                // strictly larger token.
                assert!(
                    fence_token > current_token,
                    "must be fenced by higher token; current={current_token:?}, fence={fence_token:?}"
                );
                if fence_token.deploy_generation > current_token.deploy_generation {
                    Err(FenceError::DeployGeneration {
                        current_generation: current_token.deploy_generation,
                        fence_generation: fence_token.deploy_generation,
                    })
                } else {
                    // Same generation but a larger token implies the epoch must
                    // be what grew (FenceToken ordering).
                    assert!(
                        fence_token.epoch > current_token.epoch,
                        "must be fenced by higher token; current={current_token:?}, fence={fence_token:?}"
                    );
                    Err(FenceError::Epoch {
                        current_epoch: current_token.epoch,
                        fence_epoch: fence_token.epoch,
                    })
                }
            }
        }
    }

    /// Returns the current token, regardless of fence state.
    fn token(&self) -> Option<FenceToken> {
        match self {
            FenceableToken::Initializing { durable_token, .. } => durable_token.clone(),
            FenceableToken::Unfenced { current_token, .. } => Some(current_token.clone()),
            FenceableToken::Fenced { current_token, .. } => Some(current_token.clone()),
        }
    }

    /// Returns `Err` if `token` fences out `self`, `Ok` otherwise.
    ///
    /// May transition `self` from `Initializing` or `Unfenced` to `Fenced` as a
    /// side effect before returning the error.
    fn maybe_fence(&mut self, token: FenceToken) -> Result<(), FenceError> {
        match self {
            FenceableToken::Initializing {
                durable_token,
                current_deploy_generation,
                ..
            } => {
                // Remember the largest token seen so far while initializing.
                match durable_token {
                    Some(durable_token) => {
                        *durable_token = max(durable_token.clone(), token.clone());
                    }
                    None => {
                        *durable_token = Some(token.clone());
                    }
                }
                // While initializing, only a *newer deploy generation* can fence
                // us; epochs are not yet compared.
                if let Some(current_deploy_generation) = current_deploy_generation {
                    if *current_deploy_generation < token.deploy_generation {
                        *self = FenceableToken::Fenced {
                            // We have no token of our own yet; synthesize one
                            // from our generation and the observed epoch.
                            current_token: FenceToken {
                                deploy_generation: *current_deploy_generation,
                                epoch: token.epoch,
                            },
                            fence_token: token,
                        };
                        // `validate` on a `Fenced` token always returns `Err`.
                        self.validate()?;
                    }
                }
            }
            FenceableToken::Unfenced { current_token } => {
                // Any strictly larger token fences an unfenced catalog.
                if *current_token < token {
                    *self = FenceableToken::Fenced {
                        current_token: current_token.clone(),
                        fence_token: token,
                    };
                    self.validate()?;
                }
            }
            FenceableToken::Fenced { .. } => {
                // Already fenced; re-raise the fence error.
                self.validate()?;
            }
        }

        Ok(())
    }

    /// Returns a [`FenceableToken::Unfenced`] token and the updates to the catalog required to
    /// transition to the `Unfenced` state if `self` is [`FenceableToken::Initializing`], otherwise
    /// returns `None`.
    fn generate_unfenced_token(
        &self,
        mode: Mode,
    ) -> Result<Option<(Vec<(StateUpdateKind, Diff)>, FenceableToken)>, DurableCatalogError> {
        let (durable_token, current_deploy_generation) = match self {
            FenceableToken::Initializing {
                durable_token,
                current_deploy_generation,
            } => (durable_token.clone(), current_deploy_generation.clone()),
            FenceableToken::Unfenced { .. } | FenceableToken::Fenced { .. } => return Ok(None),
        };

        let mut fence_updates = Vec::with_capacity(2);

        // Retract the previous durable token (if any) so at most one token is
        // ever live in the catalog.
        if let Some(durable_token) = &durable_token {
            fence_updates.push((
                StateUpdateKind::FenceToken(durable_token.clone()),
                Diff::MINUS_ONE,
            ));
        }

        let current_deploy_generation = current_deploy_generation
            .or_else(|| durable_token.as_ref().map(|token| token.deploy_generation))
            // We cannot initialize a catalog without a deploy generation.
            .ok_or(DurableCatalogError::Uninitialized)?;
        let mut current_epoch = durable_token
            .map(|token| token.epoch)
            .unwrap_or(MIN_EPOCH)
            .get();
        // Only writable catalogs attempt to increment the epoch.
        if matches!(mode, Mode::Writable) {
            current_epoch = current_epoch + 1;
        }
        let current_epoch = Epoch::new(current_epoch).expect("known to be non-zero");
        let current_token = FenceToken {
            deploy_generation: current_deploy_generation,
            epoch: current_epoch,
        };

        fence_updates.push((
            StateUpdateKind::FenceToken(current_token.clone()),
            Diff::ONE,
        ));

        let current_fenceable_token = FenceableToken::Unfenced { current_token };

        Ok(Some((fence_updates, current_fenceable_token)))
    }
}
278
/// An error that can occur while executing [`PersistHandle::compare_and_append`].
#[derive(Debug, thiserror::Error)]
pub(crate) enum CompareAndAppendError {
    /// This instance was fenced out by another catalog.
    #[error(transparent)]
    Fence(#[from] FenceError),
    /// Catalog encountered an upper mismatch when trying to write to the catalog. This should only
    /// happen while trying to fence out other catalogs.
    #[error(
        "expected catalog upper {expected_upper:?} did not match actual catalog upper {actual_upper:?}"
    )]
    UpperMismatch {
        /// The upper this instance expected the shard to be at.
        expected_upper: Timestamp,
        /// The upper the shard was actually at.
        actual_upper: Timestamp,
    },
}
294
295impl CompareAndAppendError {
296    pub(crate) fn unwrap_fence_error(self) -> FenceError {
297        match self {
298            CompareAndAppendError::Fence(e) => e,
299            e @ CompareAndAppendError::UpperMismatch { .. } => {
300                panic!("unexpected upper mismatch: {e:?}")
301            }
302        }
303    }
304}
305
306impl From<UpperMismatch<Timestamp>> for CompareAndAppendError {
307    fn from(upper_mismatch: UpperMismatch<Timestamp>) -> Self {
308        Self::UpperMismatch {
309            expected_upper: antichain_to_timestamp(upper_mismatch.expected),
310            actual_upper: antichain_to_timestamp(upper_mismatch.current),
311        }
312    }
313}
314
/// Customizes how each individual catalog update is processed, filtered, and
/// checked for fencing, depending on the context (e.g. pre-open vs post-open).
pub(crate) trait ApplyUpdate<T: IntoStateUpdateKindJson> {
    /// Process and apply `update`.
    ///
    /// Returns `Some` if `update` should be cached in memory and `None` otherwise.
    /// Returns an error if applying `update` reveals that this instance has been
    /// fenced out (implementations may also mutate `current_fence_token`).
    fn apply_update(
        &mut self,
        update: StateUpdate<T>,
        current_fence_token: &mut FenceableToken,
        metrics: &Arc<Metrics>,
    ) -> Result<Option<StateUpdate<T>>, FenceError>;
}
326
/// A handle for interacting with the persist catalog shard.
///
/// The catalog shard is used in multiple different contexts, for example pre-open and post-open,
/// but for all contexts the majority of the durable catalog's behavior is identical. This struct
/// implements those behaviors that are identical while allowing the user to specify the different
/// behaviors via generic parameters.
///
/// The behavior of the durable catalog can be different along one of two axes. The first is the
/// format of each individual update, i.e. raw binary, the current protobuf version, previous
/// protobuf versions, etc. The second axis is what to do with each individual update, for example
/// before opening we cache all config updates but don't cache them after opening. These behaviors
/// are customizable via the `T: TryIntoStateUpdateKind` and `U: ApplyUpdate<T>` generic parameters
/// respectively.
#[derive(Debug)]
pub(crate) struct PersistHandle<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> {
    /// The [`Mode`] that this catalog was opened in.
    pub(crate) mode: Mode,
    /// Since handle to control compaction.
    since_handle: SinceHandle<SourceData, (), Timestamp, StorageDiff>,
    /// Write handle to persist.
    write_handle: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
    /// Listener to catalog changes.
    listen: Listen<SourceData, (), Timestamp, StorageDiff>,
    /// Handle for connecting to persist.
    persist_client: PersistClient,
    /// Catalog shard ID.
    shard_id: ShardId,
    /// Cache of the most recent catalog snapshot.
    ///
    /// We use a tuple instead of [`StateUpdate`] to make consolidation easier.
    pub(crate) snapshot: Vec<(T, Timestamp, Diff)>,
    /// Applies custom processing, filtering, and fencing for each individual update.
    update_applier: U,
    /// The current upper of the persist shard.
    pub(crate) upper: Timestamp,
    /// The fence token of the catalog, if one exists.
    fenceable_token: FenceableToken,
    /// The semantic version of the current binary.
    catalog_content_version: semver::Version,
    /// Flag to indicate if bootstrap is complete.
    bootstrap_complete: bool,
    /// Metrics for the persist catalog.
    metrics: Arc<Metrics>,
    /// Snapshot size at the last amortized consolidation, used by
    /// [`Self::maybe_consolidate`] to decide when to consolidate. Initialized
    /// lazily on the first call.
    size_at_last_consolidation: Option<usize>,
}
375
376impl<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> PersistHandle<T, U> {
377    /// Fetch the current upper of the catalog state.
378    #[mz_ore::instrument]
379    async fn current_upper(&mut self) -> Timestamp {
380        match self.mode {
381            Mode::Writable | Mode::Readonly => {
382                let upper = self.write_handle.fetch_recent_upper().await;
383                antichain_to_timestamp(upper.clone())
384            }
385            Mode::Savepoint => self.upper,
386        }
387    }
388
389    /// Appends `updates` iff the current global upper of the catalog is `self.upper`.
390    ///
391    /// Returns the next upper used to commit the transaction.
392    #[mz_ore::instrument]
393    pub(crate) async fn compare_and_append<S: IntoStateUpdateKindJson>(
394        &mut self,
395        updates: Vec<(S, Diff)>,
396        commit_ts: Timestamp,
397    ) -> Result<Timestamp, CompareAndAppendError> {
398        // The fencing check is expensive, so run it only with soft assertions enabled.
399        let contains_fence = if mz_ore::assert::soft_assertions_enabled() {
400            let parsed_updates: Vec<_> = updates
401                .clone()
402                .into_iter()
403                .filter_map(|(update, diff)| {
404                    let update: StateUpdateKindJson = update.into();
405                    let update = TryIntoStateUpdateKind::try_into(update).ok()?;
406                    Some((update, diff))
407                })
408                .collect();
409            let contains_retraction = parsed_updates.iter().any(|(update, diff)| {
410                matches!(update, StateUpdateKind::FenceToken(..)) && *diff == Diff::MINUS_ONE
411            });
412            let contains_addition = parsed_updates.iter().any(|(update, diff)| {
413                matches!(update, StateUpdateKind::FenceToken(..)) && *diff == Diff::ONE
414            });
415            Some(contains_retraction && contains_addition)
416        } else {
417            None
418        };
419
420        let updates = updates.into_iter().map(|(kind, diff)| {
421            let kind: StateUpdateKindJson = kind.into();
422            (
423                (Into::<SourceData>::into(kind), ()),
424                commit_ts,
425                diff.into_inner(),
426            )
427        });
428        let next_upper = commit_ts.step_forward();
429        self.compare_and_append_inner(updates, next_upper)
430            .await
431            .inspect_err(|e| {
432                // A compare-and-append failure means someone else must have written to the
433                // catalog. We expect to have been fenced out, since writing to the catalog without
434                // fencing other catalogs should be impossible. The one exception is if we are
435                // trying to fence other catalogs with this write.
436                if let Some(contains_fence) = contains_fence {
437                    soft_assert_or_log!(
438                        matches!(e, CompareAndAppendError::Fence(_)) || contains_fence,
439                        "encountered an upper mismatch on a non-fencing write"
440                    );
441                }
442            })?;
443
444        self.sync(next_upper).await?;
445        Ok(next_upper)
446    }
447
448    /// Compare-and-append `updates` to the catalog shard, advancing the upper to `next_upper`.
449    ///
450    /// On success, updating `self.upper` is left to the caller. The caller can thus decide whether
451    /// or not it needs to sync the catalog.
452    ///
453    /// # Panics
454    ///
455    /// Panics if not in `Writable` mode.
456    /// Panics if `next_upper` is not greater than `self.upper`.
457    async fn compare_and_append_inner(
458        &mut self,
459        updates: impl IntoIterator<Item = ((SourceData, ()), Timestamp, StorageDiff)>,
460        next_upper: Timestamp,
461    ) -> Result<(), CompareAndAppendError> {
462        assert_eq!(self.mode, Mode::Writable);
463        assert!(
464            next_upper > self.upper,
465            "next_upper ({next_upper}) not greater than current upper ({})",
466            self.upper,
467        );
468
469        let res = self
470            .write_handle
471            .compare_and_append(
472                updates,
473                Antichain::from_elem(self.upper),
474                Antichain::from_elem(next_upper),
475            )
476            .await
477            .expect("invalid usage");
478
479        if let Err(e @ UpperMismatch { .. }) = res {
480            // Most likely we were fenced out.
481            // Sync to the current upper to detect that.
482            self.sync_to_current_upper().await?;
483            return Err(e.into());
484        }
485
486        // Lag the shard's upper by 1 to keep it readable.
487        let downgrade_to = Antichain::from_elem(next_upper.saturating_sub(1));
488
489        // The since handle gives us the ability to fence out other downgraders using an opaque token.
490        // (See the method documentation for details.)
491        // That's not needed here, so we use the since handle's opaque token to avoid any comparison
492        // failures.
493        let opaque = self.since_handle.opaque().clone();
494        let downgrade = self
495            .since_handle
496            .maybe_compare_and_downgrade_since(&opaque, (&opaque, &downgrade_to))
497            .await;
498        if let Some(Err(e)) = downgrade {
499            soft_panic_or_log!("found opaque value {e:?}, but expected {opaque:?}");
500        }
501
502        Ok(())
503    }
504
505    /// Generates an iterator of [`StateUpdate`] that contain all unconsolidated updates to the
506    /// catalog state up to, and including, `as_of`.
507    #[mz_ore::instrument]
508    async fn snapshot_unconsolidated(&mut self) -> Vec<StateUpdate<StateUpdateKind>> {
509        let current_upper = self.current_upper().await;
510
511        let mut snapshot = Vec::new();
512        let mut read_handle = self.read_handle().await;
513        let as_of = as_of(&read_handle, current_upper);
514        let mut stream = Box::pin(
515            // We use `snapshot_and_stream` because it guarantees unconsolidated output.
516            read_handle
517                .snapshot_and_stream(Antichain::from_elem(as_of))
518                .await
519                .expect("we have advanced the restart_as_of by the since"),
520        );
521        while let Some(update) = stream.next().await {
522            snapshot.push(update)
523        }
524        read_handle.expire().await;
525        snapshot
526            .into_iter()
527            .map(Into::<StateUpdate<StateUpdateKindJson>>::into)
528            .map(|state_update| state_update.try_into().expect("kind decoding error"))
529            .collect()
530    }
531
532    /// Listen and apply all updates that are currently in persist.
533    ///
534    /// Returns an error if this instance has been fenced out.
535    #[mz_ore::instrument]
536    pub(crate) async fn sync_to_current_upper(&mut self) -> Result<(), FenceError> {
537        let upper = self.current_upper().await;
538        self.sync(upper).await
539    }
540
541    /// Listen and apply all updates up to `target_upper`.
542    ///
543    /// Returns an error if this instance has been fenced out.
544    #[mz_ore::instrument(level = "debug")]
545    pub(crate) async fn sync(&mut self, target_upper: Timestamp) -> Result<(), FenceError> {
546        self.metrics.syncs.inc();
547        let counter = self.metrics.sync_latency_seconds.clone();
548        self.sync_inner(target_upper)
549            .wall_time()
550            .inc_by(counter)
551            .await
552    }
553
554    #[mz_ore::instrument(level = "debug")]
555    async fn sync_inner(&mut self, target_upper: Timestamp) -> Result<(), FenceError> {
556        self.fenceable_token.validate()?;
557
558        // Savepoint catalogs do not yet know how to update themselves in response to concurrent
559        // writes from writer catalogs.
560        if self.mode == Mode::Savepoint {
561            self.upper = max(self.upper, target_upper);
562            return Ok(());
563        }
564
565        let mut updates: BTreeMap<_, Vec<_>> = BTreeMap::new();
566
567        // Reset the amortized consolidation tracker so it picks up the
568        // current snapshot size as its baseline.
569        self.size_at_last_consolidation = None;
570
571        while self.upper < target_upper {
572            let listen_events = self.listen.fetch_next().await;
573            for listen_event in listen_events {
574                match listen_event {
575                    ListenEvent::Progress(upper) => {
576                        debug!("synced up to {upper:?}");
577                        self.upper = antichain_to_timestamp(upper);
578                        // Attempt to apply updates in batches of a single timestamp. If another
579                        // catalog wrote a fence token at one timestamp and then updates in a new
580                        // format at a later timestamp, then we want to apply the fence token
581                        // before attempting to deserialize the new updates.
582                        while let Some((ts, updates)) = updates.pop_first() {
583                            assert!(ts < self.upper, "expected {} < {}", ts, self.upper);
584                            let updates = updates.into_iter().map(
585                                |update: StateUpdate<StateUpdateKindJson>| {
586                                    let kind =
587                                        T::try_from(update.kind).expect("kind decoding error");
588                                    StateUpdate {
589                                        kind,
590                                        ts: update.ts,
591                                        diff: update.diff,
592                                    }
593                                },
594                            );
595                            self.apply_updates(updates)?;
596                            self.maybe_consolidate();
597                        }
598                    }
599                    ListenEvent::Updates(batch_updates) => {
600                        for update in batch_updates {
601                            let update: StateUpdate<StateUpdateKindJson> = update.into();
602                            updates.entry(update.ts).or_default().push(update);
603                        }
604                    }
605                }
606            }
607        }
608        assert_eq!(updates, BTreeMap::new(), "all updates should be applied");
609        // Always consolidate at the end to ensure the snapshot is clean.
610        self.consolidate();
611        Ok(())
612    }
613
614    /// Apply a batch of updates and then consolidate the snapshot. This is the
615    /// typical entry point for callers that apply updates in a single batch.
616    ///
617    /// For hot loops that apply updates across many timestamps (e.g., `sync_inner`),
618    /// use `apply_updates` directly and call `consolidate()` periodically (e.g.,
619    /// on snapshot doubling) to bound memory while staying amortized O(N log N).
620    pub(crate) fn apply_updates_and_consolidate(
621        &mut self,
622        updates: impl IntoIterator<Item = StateUpdate<T>>,
623    ) -> Result<(), FenceError> {
624        self.apply_updates(updates)?;
625        self.consolidate();
626        Ok(())
627    }
628
629    /// Apply a batch of updates to the catalog state without consolidating.
630    ///
631    /// Does NOT consolidate the snapshot afterward. If you are calling this once,
632    /// prefer `apply_updates_and_consolidate`. This method exists for loops that
633    /// call it many times — consolidating per call would be O(K * N log N) instead
634    /// of O(N log N). Callers should consolidate periodically (e.g., on snapshot
635    /// doubling) to bound memory.
636    #[mz_ore::instrument(level = "debug")]
637    fn apply_updates(
638        &mut self,
639        updates: impl IntoIterator<Item = StateUpdate<T>>,
640    ) -> Result<(), FenceError> {
641        let mut updates: Vec<_> = updates
642            .into_iter()
643            .map(|StateUpdate { kind, ts, diff }| (kind, ts, diff))
644            .collect();
645
646        // This helps guarantee that for a single key, there is at most a single retraction and a
647        // single insertion per timestamp. Otherwise, we would need to match the retractions and
648        // insertions up by value and manually figure out what the end value should be.
649        differential_dataflow::consolidation::consolidate_updates(&mut updates);
650
651        // Updates must be applied in timestamp order. Within a timestamp retractions must be
652        // applied before insertions, or we might end up retracting the wrong value.
653        updates.sort_by(|(_, ts1, diff1), (_, ts2, diff2)| ts1.cmp(ts2).then(diff1.cmp(diff2)));
654
655        let mut errors = Vec::new();
656
657        for (kind, ts, diff) in updates {
658            if diff != Diff::ONE && diff != Diff::MINUS_ONE {
659                panic!("invalid update in consolidated trace: ({kind:?}, {ts:?}, {diff:?})");
660            }
661
662            match self.update_applier.apply_update(
663                StateUpdate { kind, ts, diff },
664                &mut self.fenceable_token,
665                &self.metrics,
666            ) {
667                Ok(Some(StateUpdate { kind, ts, diff })) => self.snapshot.push((kind, ts, diff)),
668                Ok(None) => {}
669                // Instead of returning immediately, we accumulate all the errors and return the one
670                // with the most information.
671                Err(err) => errors.push(err),
672            }
673        }
674
675        // Track the high-water mark of the unconsolidated snapshot size.
676        let len = i64::try_from(self.snapshot.len()).unwrap_or(i64::MAX);
677        if len > self.metrics.snapshot_max_entries.get() {
678            self.metrics.snapshot_max_entries.set(len);
679        }
680
681        errors.sort();
682        if let Some(err) = errors.into_iter().next() {
683            return Err(err);
684        }
685
686        Ok(())
687    }
688
689    /// Consolidate the snapshot if it has at least doubled in size since the
690    /// last consolidation. This amortizes the O(N log N) consolidation cost
691    /// over many small updates, keeping the total work O(N log N) rather than
692    /// O(K * N log N) for K timestamps.
693    fn maybe_consolidate(&mut self) {
694        let threshold = *self
695            .size_at_last_consolidation
696            // Use a minimum of 8 to avoid consolidating on every update when
697            // the snapshot is small or empty (since 0 * 2 = 0).
698            .get_or_insert_with(|| max(self.snapshot.len(), 8));
699        if self.snapshot.len() >= threshold * 2 {
700            self.consolidate();
701            self.size_at_last_consolidation = Some(self.snapshot.len());
702        }
703    }
704
705    #[mz_ore::instrument]
706    pub(crate) fn consolidate(&mut self) {
707        self.metrics.snapshot_consolidations.inc();
708        soft_assert_no_log!(
709            self.snapshot
710                .windows(2)
711                .all(|updates| updates[0].1 <= updates[1].1),
712            "snapshot should be sorted by timestamp, {:#?}",
713            self.snapshot
714        );
715
716        let new_ts = self
717            .snapshot
718            .last()
719            .map(|(_, ts, _)| *ts)
720            .unwrap_or_else(Timestamp::minimum);
721        for (_, ts, _) in &mut self.snapshot {
722            *ts = new_ts;
723        }
724        differential_dataflow::consolidation::consolidate_updates(&mut self.snapshot);
725    }
726
    /// Execute and return the results of `f` on the current catalog trace.
    ///
    /// The handle first syncs to the current upper, so `f` observes the most
    /// recent state this handle can see.
    ///
    /// Will return an error if the catalog has been fenced out.
    async fn with_trace<R>(
        &mut self,
        f: impl FnOnce(&Vec<(T, Timestamp, Diff)>) -> Result<R, CatalogError>,
    ) -> Result<R, CatalogError> {
        self.sync_to_current_upper().await?;
        f(&self.snapshot)
    }
737
738    /// Open a read handle to the catalog.
739    async fn read_handle(&self) -> ReadHandle<SourceData, (), Timestamp, StorageDiff> {
740        self.persist_client
741            .open_leased_reader(
742                self.shard_id,
743                Arc::new(persist_desc()),
744                Arc::new(UnitSchema::default()),
745                Diagnostics {
746                    shard_name: CATALOG_SHARD_NAME.to_string(),
747                    handle_purpose: "openable durable catalog state temporary reader".to_string(),
748                },
749                USE_CRITICAL_SINCE_CATALOG.get(self.persist_client.dyncfgs()),
750            )
751            .await
752            .expect("invalid usage")
753    }
754
    /// Politely releases all external resources that can only be released in an async context.
    ///
    /// Consumes the handle; callers should invoke this before dropping so the
    /// write and listen handles can give back their persist leases.
    async fn expire(self: Box<Self>) {
        self.write_handle.expire().await;
        self.listen.expire().await;
    }
760}
761
impl<U: ApplyUpdate<StateUpdateKind>> PersistHandle<StateUpdateKind, U> {
    /// Execute and return the results of `f` on the current catalog snapshot.
    ///
    /// The snapshot is materialized collection-by-collection from the in-memory
    /// trace after syncing to the current upper, so `f` observes the latest
    /// state this handle knows about.
    ///
    /// Will return an error if the catalog has been fenced out.
    async fn with_snapshot<T>(
        &mut self,
        f: impl FnOnce(Snapshot) -> Result<T, CatalogError>,
    ) -> Result<T, CatalogError> {
        // Applies a single insert (`+1`) or retraction (`-1`) to `map`. Inserts
        // must not overwrite an existing entry and retractions must match the
        // stored value; both invariants are enforced by assertion. Diffs other
        // than +/-1 are rejected by the caller before this helper is reached.
        fn apply<K, V>(map: &mut BTreeMap<K, V>, key: &K, value: &V, diff: Diff)
        where
            K: Ord + Clone,
            V: Ord + Clone + Debug,
        {
            let key = key.clone();
            let value = value.clone();
            if diff == Diff::ONE {
                let prev = map.insert(key, value);
                assert_eq!(
                    prev, None,
                    "values must be explicitly retracted before inserting a new value"
                );
            } else if diff == Diff::MINUS_ONE {
                let prev = map.remove(&key);
                assert_eq!(
                    prev,
                    Some(value),
                    "retraction does not match existing value"
                );
            }
        }

        self.with_trace(|trace| {
            let mut snapshot = Snapshot::empty();
            for (kind, ts, diff) in trace {
                let diff = *diff;
                // A consolidated trace must only contain unit diffs.
                if diff != Diff::ONE && diff != Diff::MINUS_ONE {
                    panic!("invalid update in consolidated trace: ({kind:?}, {ts:?}, {diff:?})");
                }

                // Route each update kind to its collection in the snapshot.
                // Audit log entries and fence tokens are deliberately excluded
                // from snapshots.
                match kind {
                    StateUpdateKind::AuditLog(_key, ()) => {
                        // Ignore for snapshots.
                    }
                    StateUpdateKind::Cluster(key, value) => {
                        apply(&mut snapshot.clusters, key, value, diff);
                    }
                    StateUpdateKind::ClusterReplica(key, value) => {
                        apply(&mut snapshot.cluster_replicas, key, value, diff);
                    }
                    StateUpdateKind::Comment(key, value) => {
                        apply(&mut snapshot.comments, key, value, diff);
                    }
                    StateUpdateKind::Config(key, value) => {
                        apply(&mut snapshot.configs, key, value, diff);
                    }
                    StateUpdateKind::Database(key, value) => {
                        apply(&mut snapshot.databases, key, value, diff);
                    }
                    StateUpdateKind::DefaultPrivilege(key, value) => {
                        apply(&mut snapshot.default_privileges, key, value, diff);
                    }
                    StateUpdateKind::FenceToken(_token) => {
                        // Ignore for snapshots.
                    }
                    StateUpdateKind::IdAllocator(key, value) => {
                        apply(&mut snapshot.id_allocator, key, value, diff);
                    }
                    StateUpdateKind::IntrospectionSourceIndex(key, value) => {
                        apply(&mut snapshot.introspection_sources, key, value, diff);
                    }
                    StateUpdateKind::Item(key, value) => {
                        apply(&mut snapshot.items, key, value, diff);
                    }
                    StateUpdateKind::NetworkPolicy(key, value) => {
                        apply(&mut snapshot.network_policies, key, value, diff);
                    }
                    StateUpdateKind::Role(key, value) => {
                        apply(&mut snapshot.roles, key, value, diff);
                    }
                    StateUpdateKind::Schema(key, value) => {
                        apply(&mut snapshot.schemas, key, value, diff);
                    }
                    StateUpdateKind::Setting(key, value) => {
                        apply(&mut snapshot.settings, key, value, diff);
                    }
                    StateUpdateKind::SourceReferences(key, value) => {
                        apply(&mut snapshot.source_references, key, value, diff);
                    }
                    StateUpdateKind::SystemConfiguration(key, value) => {
                        apply(&mut snapshot.system_configurations, key, value, diff);
                    }
                    StateUpdateKind::SystemObjectMapping(key, value) => {
                        apply(&mut snapshot.system_object_mappings, key, value, diff);
                    }
                    StateUpdateKind::SystemPrivilege(key, value) => {
                        apply(&mut snapshot.system_privileges, key, value, diff);
                    }
                    StateUpdateKind::StorageCollectionMetadata(key, value) => {
                        apply(&mut snapshot.storage_collection_metadata, key, value, diff);
                    }
                    StateUpdateKind::UnfinalizedShard(key, ()) => {
                        apply(&mut snapshot.unfinalized_shards, key, &(), diff);
                    }
                    StateUpdateKind::TxnWalShard((), value) => {
                        apply(&mut snapshot.txn_wal_shard, &(), value, diff);
                    }
                    StateUpdateKind::RoleAuth(key, value) => {
                        apply(&mut snapshot.role_auth, key, value, diff);
                    }
                }
            }
            f(snapshot)
        })
        .await
    }

    /// Generates an iterator of [`StateUpdate`] that contain all updates to the catalog
    /// state.
    ///
    /// The output is fetched directly from persist instead of the in-memory cache.
    ///
    /// The output is consolidated and sorted by timestamp in ascending order.
    #[mz_ore::instrument(level = "debug")]
    async fn persist_snapshot(&self) -> impl Iterator<Item = StateUpdate> + DoubleEndedIterator {
        // Open a temporary leased reader, read at an as_of derived from this
        // handle's upper, and expire the reader before returning.
        let mut read_handle = self.read_handle().await;
        let as_of = as_of(&read_handle, self.upper);
        let snapshot = snapshot_binary(&mut read_handle, as_of, &self.metrics)
            .await
            .map(|update| update.try_into().expect("kind decoding error"));
        read_handle.expire().await;
        snapshot
    }
}
895
/// Applies updates for an unopened catalog.
///
/// Only the config and setting collections are cached; all other update kinds
/// are passed through without being interpreted (see the `ApplyUpdate` impl).
#[derive(Debug)]
pub(crate) struct UnopenedCatalogStateInner {
    /// A cache of the config collection of the catalog.
    configs: BTreeMap<String, u64>,
    /// A cache of the settings collection of the catalog.
    settings: BTreeMap<String, String>,
}
904
905impl UnopenedCatalogStateInner {
906    fn new() -> UnopenedCatalogStateInner {
907        UnopenedCatalogStateInner {
908            configs: BTreeMap::new(),
909            settings: BTreeMap::new(),
910        }
911    }
912}
913
914impl ApplyUpdate<StateUpdateKindJson> for UnopenedCatalogStateInner {
915    fn apply_update(
916        &mut self,
917        update: StateUpdate<StateUpdateKindJson>,
918        current_fence_token: &mut FenceableToken,
919        _metrics: &Arc<Metrics>,
920    ) -> Result<Option<StateUpdate<StateUpdateKindJson>>, FenceError> {
921        if !update.kind.is_audit_log() && update.kind.is_always_deserializable() {
922            let kind = TryInto::try_into(&update.kind).expect("kind is known to be deserializable");
923            match (kind, update.diff) {
924                (StateUpdateKind::Config(key, value), Diff::ONE) => {
925                    let prev = self.configs.insert(key.key, value.value);
926                    assert_eq!(
927                        prev, None,
928                        "values must be explicitly retracted before inserting a new value"
929                    );
930                }
931                (StateUpdateKind::Config(key, value), Diff::MINUS_ONE) => {
932                    let prev = self.configs.remove(&key.key);
933                    assert_eq!(
934                        prev,
935                        Some(value.value),
936                        "retraction does not match existing value"
937                    );
938                }
939                (StateUpdateKind::Setting(key, value), Diff::ONE) => {
940                    let prev = self.settings.insert(key.name, value.value);
941                    assert_eq!(
942                        prev, None,
943                        "values must be explicitly retracted before inserting a new value"
944                    );
945                }
946                (StateUpdateKind::Setting(key, value), Diff::MINUS_ONE) => {
947                    let prev = self.settings.remove(&key.name);
948                    assert_eq!(
949                        prev,
950                        Some(value.value),
951                        "retraction does not match existing value"
952                    );
953                }
954                (StateUpdateKind::FenceToken(fence_token), Diff::ONE) => {
955                    current_fence_token.maybe_fence(fence_token)?;
956                }
957                _ => {}
958            }
959        }
960
961        Ok(Some(update))
962    }
963}
964
/// A Handle to an unopened catalog stored in persist. The unopened catalog can serve `Config` data,
/// `Setting` data, or the current epoch. All other catalog data may be un-migrated and should not
/// be read until the catalog has been opened. The [`UnopenedPersistCatalogState`] is responsible
/// for opening the catalog, see [`OpenableDurableCatalogState::open`] for more details.
///
/// Updates are kept in their raw JSON representation ([`StateUpdateKindJson`]) because they may
/// not be deserializable until migrations have run.
///
/// Production users should call [`Self::expire`] before dropping an [`UnopenedPersistCatalogState`]
/// so that it can expire its leases. If/when rust gets AsyncDrop, this will be done automatically.
pub(crate) type UnopenedPersistCatalogState =
    PersistHandle<StateUpdateKindJson, UnopenedCatalogStateInner>;
974
975impl UnopenedPersistCatalogState {
    /// Create a new [`UnopenedPersistCatalogState`] to the catalog state associated with
    /// `organization_id`.
    ///
    /// All usages of the persist catalog must go through this function. That includes the
    /// catalog-debug tool, the adapter's catalog, etc.
    ///
    /// Returns an error if the binary versions involved are incompatible with the data
    /// already in the catalog shard; panics on invalid persist usage.
    #[mz_ore::instrument]
    pub(crate) async fn new(
        persist_client: PersistClient,
        organization_id: Uuid,
        version: semver::Version,
        deploy_generation: Option<u64>,
        metrics: Arc<Metrics>,
    ) -> Result<UnopenedPersistCatalogState, DurableCatalogError> {
        // The shard id is derived deterministically from the organization id.
        let catalog_shard_id = shard_id(organization_id, CATALOG_SEED);
        debug!(?catalog_shard_id, "new persist backed catalog state");

        // Check the catalog shard version to ensure that we are compatible with the persist
        // data format. This lets us return an error gracefully, rather than panicking later in
        // persist.
        let version_in_catalog_shard =
            fetch_catalog_shard_version(&persist_client, catalog_shard_id).await;
        if let Some(version_in_catalog_shard) = version_in_catalog_shard {
            if !mz_persist_client::cfg::code_can_write_data(&version, &version_in_catalog_shard) {
                return Err(DurableCatalogError::IncompatiblePersistVersion {
                    found_version: version_in_catalog_shard,
                    catalog_version: version,
                });
            }
        }

        let open_handles_start = Instant::now();
        info!("startup: envd serve: catalog init: open handles beginning");
        // Open the catalog's critical since capability.
        let since_handle = persist_client
            .open_critical_since(
                catalog_shard_id,
                CATALOG_CRITICAL_SINCE.clone(),
                Opaque::encode(&i64::MIN),
                Diagnostics {
                    shard_name: CATALOG_SHARD_NAME.to_string(),
                    handle_purpose: "durable catalog state critical since".to_string(),
                },
            )
            .await
            .expect("invalid usage");

        let (mut write_handle, mut read_handle) = persist_client
            .open(
                catalog_shard_id,
                Arc::new(persist_desc()),
                Arc::new(UnitSchema::default()),
                Diagnostics {
                    shard_name: CATALOG_SHARD_NAME.to_string(),
                    handle_purpose: "durable catalog state handles".to_string(),
                },
                USE_CRITICAL_SINCE_CATALOG.get(persist_client.dyncfgs()),
            )
            .await
            .expect("invalid usage");
        info!(
            "startup: envd serve: catalog init: open handles complete in {:?}",
            open_handles_start.elapsed()
        );

        // Commit an empty write at the minimum timestamp so the catalog is always readable.
        let upper = {
            const EMPTY_UPDATES: &[((SourceData, ()), Timestamp, StorageDiff)] = &[];
            let upper = Antichain::from_elem(Timestamp::minimum());
            let next_upper = Timestamp::minimum().step_forward();
            match write_handle
                .compare_and_append(EMPTY_UPDATES, upper, Antichain::from_elem(next_upper))
                .await
                .expect("invalid usage")
            {
                // We made the first-ever write to the shard.
                Ok(()) => next_upper,
                // Someone has written before; use the existing upper.
                Err(mismatch) => antichain_to_timestamp(mismatch.current),
            }
        };

        let snapshot_start = Instant::now();
        info!("startup: envd serve: catalog init: snapshot beginning");
        // Read the full contents as of the derived as_of, then start a listen
        // from that same frontier so no updates are missed.
        let as_of = as_of(&read_handle, upper);
        let snapshot: Vec<_> = snapshot_binary(&mut read_handle, as_of, &metrics)
            .await
            .map(|StateUpdate { kind, ts, diff }| (kind, ts, diff))
            .collect();
        let listen = read_handle
            .listen(Antichain::from_elem(as_of))
            .await
            .expect("invalid usage");
        info!(
            "startup: envd serve: catalog init: snapshot complete in {:?}",
            snapshot_start.elapsed()
        );

        let mut handle = UnopenedPersistCatalogState {
            // Unopened catalogs are always writeable until they're opened in an explicit mode.
            mode: Mode::Writable,
            since_handle,
            write_handle,
            listen,
            persist_client,
            shard_id: catalog_shard_id,
            // Initialize empty in-memory state.
            snapshot: Vec::new(),
            update_applier: UnopenedCatalogStateInner::new(),
            upper,
            fenceable_token: FenceableToken::new(deploy_generation),
            catalog_content_version: version,
            bootstrap_complete: false,
            metrics,
            size_at_last_consolidation: None,
        };
        // If the snapshot is not consolidated, and we see multiple epoch values while applying the
        // updates, then we might accidentally fence ourselves out.
        soft_assert_no_log!(
            snapshot.iter().all(|(_, _, diff)| *diff == Diff::ONE),
            "snapshot should be consolidated: {snapshot:#?}"
        );

        let apply_start = Instant::now();
        info!("startup: envd serve: catalog init: apply updates beginning");
        // Applying the snapshot populates the in-memory state (and may fence).
        let updates = snapshot
            .into_iter()
            .map(|(kind, ts, diff)| StateUpdate { kind, ts, diff });
        handle.apply_updates_and_consolidate(updates)?;
        info!(
            "startup: envd serve: catalog init: apply updates complete in {:?}",
            apply_start.elapsed()
        );

        // Validate that the binary version of the current process is not less than any binary
        // version that has written to the catalog.
        // This condition is only checked once, right here. If a new process comes along with a
        // higher version, it must fence this process out with one of the existing fencing
        // mechanisms.
        if let Some(found_version) = handle.get_catalog_content_version().await? {
            // Use cmp_precedence() to ignore build metadata per SemVer 2.0.0 spec
            if handle
                .catalog_content_version
                .cmp_precedence(&found_version)
                == std::cmp::Ordering::Less
            {
                return Err(DurableCatalogError::IncompatiblePersistVersion {
                    found_version,
                    catalog_version: handle.catalog_content_version,
                });
            }
        }

        Ok(handle)
    }
1127
1128    #[mz_ore::instrument]
1129    async fn open_inner(
1130        mut self,
1131        mode: Mode,
1132        initial_ts: Timestamp,
1133        bootstrap_args: &BootstrapArgs,
1134    ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError> {
1135        // It would be nice to use `initial_ts` here, but it comes from the system clock, not the
1136        // timestamp oracle.
1137        let mut commit_ts = self.upper;
1138        self.mode = mode;
1139
1140        // Validate the current deploy generation.
1141        match (&self.mode, &self.fenceable_token) {
1142            (_, FenceableToken::Unfenced { .. } | FenceableToken::Fenced { .. }) => {
1143                return Err(DurableCatalogError::Internal(
1144                    "catalog should not have fenced before opening".to_string(),
1145                )
1146                .into());
1147            }
1148            (
1149                Mode::Writable | Mode::Savepoint,
1150                FenceableToken::Initializing {
1151                    current_deploy_generation: None,
1152                    ..
1153                },
1154            ) => {
1155                return Err(DurableCatalogError::Internal(format!(
1156                    "cannot open in mode '{:?}' without a deploy generation",
1157                    self.mode,
1158                ))
1159                .into());
1160            }
1161            _ => {}
1162        }
1163
1164        let read_only = matches!(self.mode, Mode::Readonly);
1165
1166        // Fence out previous catalogs.
1167        loop {
1168            self.sync_to_current_upper().await?;
1169            commit_ts = max(commit_ts, self.upper);
1170            let (fence_updates, current_fenceable_token) = self
1171                .fenceable_token
1172                .generate_unfenced_token(self.mode)?
1173                .ok_or_else(|| {
1174                    DurableCatalogError::Internal(
1175                        "catalog should not have fenced before opening".to_string(),
1176                    )
1177                })?;
1178            debug!(
1179                ?self.upper,
1180                ?self.fenceable_token,
1181                ?current_fenceable_token,
1182                "fencing previous catalogs"
1183            );
1184            if matches!(self.mode, Mode::Writable) {
1185                match self
1186                    .compare_and_append(fence_updates.clone(), commit_ts)
1187                    .await
1188                {
1189                    Ok(upper) => {
1190                        commit_ts = upper;
1191                    }
1192                    Err(CompareAndAppendError::Fence(e)) => return Err(e.into()),
1193                    Err(e @ CompareAndAppendError::UpperMismatch { .. }) => {
1194                        warn!("catalog write failed due to upper mismatch, retrying: {e:?}");
1195                        continue;
1196                    }
1197                }
1198            }
1199            self.fenceable_token = current_fenceable_token;
1200            break;
1201        }
1202
1203        if matches!(self.mode, Mode::Writable) {
1204            // One-time migration: The catalog previously used `CONTROLLER_CRITICAL_SINCE` for its
1205            // since handle. Now it uses its own `CATALOG_CRITICAL_SINCE`, to free
1206            // `CONTROLLER_CRITICAL_SINCE` up for the storage controller. The catalog and
1207            // controller handles differ in the `Opaque` codec, so we need a migration.
1208            //
1209            // TODO: Remove this once we don't support upgrading from v26 anymore.
1210            let mut controller_handle = self
1211                .persist_client
1212                .open_critical_since::<SourceData, (), Timestamp, StorageDiff>(
1213                    self.shard_id,
1214                    PersistClient::CONTROLLER_CRITICAL_SINCE,
1215                    Opaque::encode(&i64::MIN),
1216                    Diagnostics {
1217                        shard_name: CATALOG_SHARD_NAME.to_string(),
1218                        handle_purpose: "durable catalog state critical since (migration)"
1219                            .to_string(),
1220                    },
1221                )
1222                .await
1223                .expect("invalid usage");
1224
1225            let since = controller_handle.since().clone();
1226            let res = controller_handle
1227                .compare_and_downgrade_since(
1228                    &Opaque::encode(&i64::MIN),
1229                    (&Opaque::encode(&PersistEpoch::default()), &since),
1230                )
1231                .await;
1232            match res {
1233                Ok(_) => info!("migrated Opaque of catalog since handle"),
1234                Err(_) => { /* critical since was already migrated */ }
1235            }
1236        }
1237
1238        let is_initialized = self.is_initialized_inner();
1239        if !matches!(self.mode, Mode::Writable) && !is_initialized {
1240            return Err(CatalogError::Durable(DurableCatalogError::NotWritable(
1241                format!(
1242                    "catalog tables do not exist; will not create in {:?} mode",
1243                    self.mode
1244                ),
1245            )));
1246        }
1247        soft_assert_ne_or_log!(self.upper, Timestamp::minimum());
1248
1249        // Remove all audit log entries.
1250        let (audit_logs, snapshot): (Vec<_>, Vec<_>) = self
1251            .snapshot
1252            .into_iter()
1253            .partition(|(update, _, _)| update.is_audit_log());
1254        self.snapshot = snapshot;
1255        let audit_log_count = audit_logs.iter().map(|(_, _, diff)| diff).sum::<Diff>();
1256        let audit_log_handle = AuditLogIterator::new(audit_logs);
1257
1258        // Perform data migrations.
1259        if is_initialized && !read_only {
1260            commit_ts = upgrade(&mut self, commit_ts).await?;
1261        }
1262
1263        debug!(
1264            ?is_initialized,
1265            ?self.upper,
1266            "initializing catalog state"
1267        );
1268        let mut catalog = PersistCatalogState {
1269            mode: self.mode,
1270            since_handle: self.since_handle,
1271            write_handle: self.write_handle,
1272            listen: self.listen,
1273            persist_client: self.persist_client,
1274            shard_id: self.shard_id,
1275            upper: self.upper,
1276            fenceable_token: self.fenceable_token,
1277            // Initialize empty in-memory state.
1278            snapshot: Vec::new(),
1279            update_applier: CatalogStateInner::new(),
1280            catalog_content_version: self.catalog_content_version,
1281            bootstrap_complete: false,
1282            metrics: self.metrics,
1283            size_at_last_consolidation: None,
1284        };
1285        catalog.metrics.collection_entries.reset();
1286        // Normally, `collection_entries` is updated in `apply_updates`. The audit log updates skip
1287        // over that function so we manually update it here.
1288        catalog
1289            .metrics
1290            .collection_entries
1291            .with_label_values(&[&CollectionType::AuditLog.to_string()])
1292            .add(audit_log_count.into_inner());
1293        let updates = self.snapshot.into_iter().map(|(kind, ts, diff)| {
1294            let kind = TryIntoStateUpdateKind::try_into(kind).expect("kind decoding error");
1295            StateUpdate { kind, ts, diff }
1296        });
1297        catalog.apply_updates_and_consolidate(updates)?;
1298
1299        let catalog_content_version = catalog.catalog_content_version.to_string();
1300        let txn = if is_initialized {
1301            let mut txn = catalog.transaction().await?;
1302
1303            // Ad-hoc migration: Initialize the `migration_version` expected by adapter to be
1304            // present in existing catalogs.
1305            //
1306            // Note: Need to exclude read-only catalog mode here, because in that mode all
1307            // transactions are expected to be no-ops.
1308            // TODO: remove this once we only support upgrades from version >= 0.164
1309            if txn.get_setting("migration_version".into()).is_none() && mode != Mode::Readonly {
1310                let old_version = txn.get_catalog_content_version();
1311                txn.set_setting("migration_version".into(), old_version.map(Into::into))?;
1312            }
1313
1314            txn.set_catalog_content_version(catalog_content_version)?;
1315            txn
1316        } else {
1317            soft_assert_eq_no_log!(
1318                catalog
1319                    .snapshot
1320                    .iter()
1321                    .filter(|(kind, _, _)| !matches!(kind, StateUpdateKind::FenceToken(_)))
1322                    .count(),
1323                0,
1324                "trace should not contain any updates for an uninitialized catalog: {:#?}",
1325                catalog.snapshot
1326            );
1327
1328            let mut txn = catalog.transaction().await?;
1329            initialize::initialize(
1330                &mut txn,
1331                bootstrap_args,
1332                initial_ts.into(),
1333                catalog_content_version,
1334            )
1335            .await?;
1336            txn
1337        };
1338
1339        if read_only {
1340            let (txn_batch, _) = txn.into_parts();
1341            // The upper here doesn't matter because we are only applying the updates in memory.
1342            let updates = StateUpdate::from_txn_batch_ts(txn_batch, catalog.upper);
1343            catalog.apply_updates_and_consolidate(updates)?;
1344        } else {
1345            txn.commit_internal(commit_ts).await?;
1346        }
1347
1348        if matches!(catalog.mode, Mode::Writable) {
1349            let write_handle = catalog
1350                .persist_client
1351                .open_writer::<SourceData, (), Timestamp, i64>(
1352                    catalog.write_handle.shard_id(),
1353                    Arc::new(persist_desc()),
1354                    Arc::new(UnitSchema::default()),
1355                    Diagnostics {
1356                        shard_name: CATALOG_SHARD_NAME.to_string(),
1357                        handle_purpose: "compact catalog".to_string(),
1358                    },
1359                )
1360                .await
1361                .expect("invalid usage");
1362            let fuel = CATALOG_FORCE_COMPACTION_FUEL.handle(catalog.persist_client.dyncfgs());
1363            let wait = CATALOG_FORCE_COMPACTION_WAIT.handle(catalog.persist_client.dyncfgs());
1364            // We're going to gradually turn this on via dyncfgs. Run it in a task so that it
1365            // doesn't block startup.
1366            let _task = mz_ore::task::spawn(|| "catalog::force_shard_compaction", async move {
1367                let () =
1368                    mz_persist_client::cli::admin::dangerous_force_compaction_and_break_pushdown(
1369                        &write_handle,
1370                        || fuel.get(),
1371                        || wait.get(),
1372                    )
1373                    .await;
1374            });
1375        }
1376
1377        Ok((Box::new(catalog), audit_log_handle))
1378    }
1379
1380    /// Reports if the catalog state has been initialized.
1381    ///
1382    /// NOTE: This is the answer as of the last call to [`PersistHandle::sync`] or [`PersistHandle::sync_to_current_upper`],
1383    /// not necessarily what is currently in persist.
1384    #[mz_ore::instrument]
1385    fn is_initialized_inner(&self) -> bool {
1386        !self.update_applier.configs.is_empty()
1387    }
1388
1389    /// Get the current value of config `key`.
1390    ///
1391    /// Some configs need to be read before the catalog is opened for bootstrapping.
1392    #[mz_ore::instrument]
1393    async fn get_current_config(&mut self, key: &str) -> Result<Option<u64>, DurableCatalogError> {
1394        self.sync_to_current_upper().await?;
1395        Ok(self.update_applier.configs.get(key).cloned())
1396    }
1397
    /// Get the user version of this instance.
    ///
    /// The user version is used to determine if a migration is needed.
    ///
    /// Returns an error if the catalog has been fenced out.
    #[mz_ore::instrument]
    pub(crate) async fn get_user_version(&mut self) -> Result<Option<u64>, DurableCatalogError> {
        // The user version is stored as a regular config entry.
        self.get_current_config(USER_VERSION_KEY).await
    }
1405
1406    /// Get the current value of setting `name`.
1407    ///
1408    /// Some settings need to be read before the catalog is opened for bootstrapping.
1409    #[mz_ore::instrument]
1410    async fn get_current_setting(
1411        &mut self,
1412        name: &str,
1413    ) -> Result<Option<String>, DurableCatalogError> {
1414        self.sync_to_current_upper().await?;
1415        Ok(self.update_applier.settings.get(name).cloned())
1416    }
1417
1418    /// Get the catalog content version.
1419    ///
1420    /// The catalog content version is the semantic version of the most recent binary that wrote to
1421    /// the catalog.
1422    #[mz_ore::instrument]
1423    async fn get_catalog_content_version(
1424        &mut self,
1425    ) -> Result<Option<semver::Version>, DurableCatalogError> {
1426        let version = self
1427            .get_current_setting(CATALOG_CONTENT_VERSION_KEY)
1428            .await?;
1429        let version = version.map(|version| version.parse().expect("invalid version persisted"));
1430        Ok(version)
1431    }
1432}
1433
#[async_trait]
impl OpenableDurableCatalogState for UnopenedPersistCatalogState {
    // Opens the catalog in savepoint mode: see `Mode::Savepoint` for semantics.
    #[mz_ore::instrument]
    async fn open_savepoint(
        mut self: Box<Self>,
        initial_ts: Timestamp,
        bootstrap_args: &BootstrapArgs,
    ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError> {
        // `.boxed()` heap-allocates the large `open_inner` future.
        self.open_inner(Mode::Savepoint, initial_ts, bootstrap_args)
            .boxed()
            .await
    }

    // Opens the catalog in read-only mode. The audit log iterator is discarded.
    #[mz_ore::instrument]
    async fn open_read_only(
        mut self: Box<Self>,
        bootstrap_args: &BootstrapArgs,
    ) -> Result<Box<dyn DurableCatalogState>, CatalogError> {
        // A read-only catalog passes the minimum timestamp as its initial
        // timestamp; presumably the value is unused since nothing is written —
        // TODO(review): confirm against `open_inner`.
        self.open_inner(Mode::Readonly, EpochMillis::MIN.into(), bootstrap_args)
            .boxed()
            .await
            .map(|(catalog, _)| catalog)
    }

    // Opens the catalog in writable mode.
    #[mz_ore::instrument]
    async fn open(
        mut self: Box<Self>,
        initial_ts: Timestamp,
        bootstrap_args: &BootstrapArgs,
    ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError> {
        self.open_inner(Mode::Writable, initial_ts, bootstrap_args)
            .boxed()
            .await
    }

    // Wraps the unopened state for use by the catalog-debug tooling.
    #[mz_ore::instrument(level = "debug")]
    async fn open_debug(mut self: Box<Self>) -> Result<DebugCatalogState, CatalogError> {
        Ok(DebugCatalogState(*self))
    }

    #[mz_ore::instrument]
    async fn is_initialized(&mut self) -> Result<bool, CatalogError> {
        // Sync first so the cached answer reflects the current persist contents.
        self.sync_to_current_upper().await?;
        Ok(self.is_initialized_inner())
    }

    #[mz_ore::instrument]
    async fn epoch(&mut self) -> Result<Epoch, CatalogError> {
        self.sync_to_current_upper().await?;
        // `validate` surfaces a fence error if another catalog has fenced us out;
        // a missing token means the catalog was never initialized.
        self.fenceable_token
            .validate()?
            .map(|token| token.epoch)
            .ok_or(CatalogError::Durable(DurableCatalogError::Uninitialized))
    }

    #[mz_ore::instrument]
    async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError> {
        self.sync_to_current_upper().await?;
        // Unlike `epoch`, this reads the token without validating the fence.
        self.fenceable_token
            .token()
            .map(|token| token.deploy_generation)
            .ok_or(CatalogError::Durable(DurableCatalogError::Uninitialized))
    }

    // Reads the 0dt max-wait config; the stored value is in milliseconds.
    #[mz_ore::instrument(level = "debug")]
    async fn get_0dt_deployment_max_wait(&mut self) -> Result<Option<Duration>, CatalogError> {
        let value = self
            .get_current_config(WITH_0DT_DEPLOYMENT_MAX_WAIT)
            .await?;
        match value {
            None => Ok(None),
            Some(millis) => Ok(Some(Duration::from_millis(millis))),
        }
    }

    // Reads the 0dt DDL check interval config; the stored value is in milliseconds.
    #[mz_ore::instrument(level = "debug")]
    async fn get_0dt_deployment_ddl_check_interval(
        &mut self,
    ) -> Result<Option<Duration>, CatalogError> {
        let value = self
            .get_current_config(WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL)
            .await?;
        match value {
            None => Ok(None),
            Some(millis) => Ok(Some(Duration::from_millis(millis))),
        }
    }

    // Reads a boolean config stored as a u64; only 0 and 1 are valid encodings.
    #[mz_ore::instrument(level = "debug")]
    async fn get_enable_0dt_deployment_panic_after_timeout(
        &mut self,
    ) -> Result<Option<bool>, CatalogError> {
        let value = self
            .get_current_config(ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT)
            .await?;
        match value {
            None => Ok(None),
            Some(0) => Ok(Some(false)),
            Some(1) => Ok(Some(true)),
            // Any other value is a corrupt/unknown encoding.
            Some(v) => Err(
                DurableCatalogError::from(TryFromProtoError::UnknownEnumVariant(format!(
                    "{v} is not a valid boolean value"
                )))
                .into(),
            ),
        }
    }

    #[mz_ore::instrument]
    async fn has_system_config_synced_once(&mut self) -> Result<bool, DurableCatalogError> {
        // Missing key is treated as "never synced".
        self.get_current_config(SYSTEM_CONFIG_SYNCED_KEY)
            .await
            .map(|value| value.map(|value| value > 0).unwrap_or(false))
    }

    // Builds a trace of all raw (unconsolidated) updates; errors if uninitialized.
    #[mz_ore::instrument]
    async fn trace_unconsolidated(&mut self) -> Result<Trace, CatalogError> {
        self.sync_to_current_upper().await?;
        if self.is_initialized_inner() {
            let snapshot = self.snapshot_unconsolidated().await;
            Ok(Trace::from_snapshot(snapshot))
        } else {
            Err(CatalogError::Durable(DurableCatalogError::Uninitialized))
        }
    }

    // Builds a trace of the consolidated catalog state; errors if uninitialized.
    #[mz_ore::instrument]
    async fn trace_consolidated(&mut self) -> Result<Trace, CatalogError> {
        self.sync_to_current_upper().await?;
        if self.is_initialized_inner() {
            let snapshot = self.current_snapshot().await?;
            Ok(Trace::from_snapshot(snapshot))
        } else {
            Err(CatalogError::Durable(DurableCatalogError::Uninitialized))
        }
    }

    // Releases this handle's persist leases.
    #[mz_ore::instrument(level = "debug")]
    async fn expire(self: Box<Self>) {
        self.expire().await
    }
}
1576
/// Applies updates for an opened catalog.
#[derive(Debug)]
struct CatalogStateInner {
    /// A trace of all catalog updates that can be consumed by some higher layer.
    /// Pushed by [`ApplyUpdate::apply_update`] and drained in timestamp order by
    /// `sync_updates`.
    updates: VecDeque<memory::objects::StateUpdate>,
}
1583
1584impl CatalogStateInner {
1585    fn new() -> CatalogStateInner {
1586        CatalogStateInner {
1587            updates: VecDeque::new(),
1588        }
1589    }
1590}
1591
1592impl ApplyUpdate<StateUpdateKind> for CatalogStateInner {
1593    fn apply_update(
1594        &mut self,
1595        update: StateUpdate<StateUpdateKind>,
1596        current_fence_token: &mut FenceableToken,
1597        metrics: &Arc<Metrics>,
1598    ) -> Result<Option<StateUpdate<StateUpdateKind>>, FenceError> {
1599        if let Some(collection_type) = update.kind.collection_type() {
1600            metrics
1601                .collection_entries
1602                .with_label_values(&[&collection_type.to_string()])
1603                .add(update.diff.into_inner());
1604        }
1605
1606        {
1607            let update: Option<memory::objects::StateUpdate> = (&update)
1608                .try_into()
1609                .expect("invalid persisted update: {update:#?}");
1610            if let Some(update) = update {
1611                self.updates.push_back(update);
1612            }
1613        }
1614
1615        match (update.kind, update.diff) {
1616            (StateUpdateKind::AuditLog(_, ()), _) => Ok(None),
1617            // Nothing to due for fence token retractions but wait for the next insertion.
1618            (StateUpdateKind::FenceToken(_), Diff::MINUS_ONE) => Ok(None),
1619            (StateUpdateKind::FenceToken(token), Diff::ONE) => {
1620                current_fence_token.maybe_fence(token)?;
1621                Ok(None)
1622            }
1623            (kind, diff) => Ok(Some(StateUpdate {
1624                kind,
1625                ts: update.ts,
1626                diff,
1627            })),
1628        }
1629    }
1630}
1631
/// A durable store of the catalog state using Persist as an implementation. The durable store can
/// serve any catalog data and transactionally modify catalog data.
///
/// This is a [`PersistHandle`] whose update applier ([`CatalogStateInner`]) buffers in-memory
/// updates for higher layers.
///
/// Production users should call [`Self::expire`] before dropping a [`PersistCatalogState`]
/// so that it can expire its leases. If/when rust gets AsyncDrop, this will be done automatically.
type PersistCatalogState = PersistHandle<StateUpdateKind, CatalogStateInner>;
1638
#[async_trait]
impl ReadOnlyDurableCatalogState for PersistCatalogState {
    // An opened catalog is guaranteed to have a fence token, so the expect is an
    // invariant check rather than recoverable error handling.
    fn epoch(&self) -> Epoch {
        self.fenceable_token
            .token()
            .expect("opened catalog state must have an epoch")
            .epoch
    }

    fn metrics(&self) -> &Metrics {
        &self.metrics
    }

    // Releases this handle's persist leases.
    #[mz_ore::instrument(level = "debug")]
    async fn expire(self: Box<Self>) {
        self.expire().await
    }

    fn is_bootstrap_complete(&self) -> bool {
        self.bootstrap_complete
    }

    // Returns every audit log event, decoded and sorted by sortable ID.
    async fn get_audit_logs(&mut self) -> Result<Vec<VersionedEvent>, CatalogError> {
        self.sync_to_current_upper().await?;
        // First pass: pull the raw audit log keys out of a full persist snapshot.
        let audit_logs: Vec<_> = self
            .persist_snapshot()
            .await
            .filter_map(
                |StateUpdate {
                     kind,
                     ts: _,
                     diff: _,
                 }| match kind {
                    StateUpdateKind::AuditLog(key, ()) => Some(key),
                    _ => None,
                },
            )
            .collect();
        // Second pass: decode the protos, failing on the first invalid entry.
        let mut audit_logs: Vec<_> = audit_logs
            .into_iter()
            .map(RustType::from_proto)
            .map_ok(|key: AuditLogKey| key.event)
            .collect::<Result<_, _>>()?;
        audit_logs.sort_by(|a, b| a.sortable_id().cmp(&b.sortable_id()));
        Ok(audit_logs)
    }

    // Returns the next ID for allocator `id_type`. Panics if the allocator does
    // not exist; scans the trace in reverse to find the most recent value.
    #[mz_ore::instrument(level = "debug")]
    async fn get_next_id(&mut self, id_type: &str) -> Result<u64, CatalogError> {
        self.with_trace(|trace| {
            Ok(trace
                .into_iter()
                .rev()
                .filter_map(|(kind, _, _)| match kind {
                    StateUpdateKind::IdAllocator(key, value) if key.name == id_type => {
                        Some(value.next_id)
                    }
                    _ => None,
                })
                .next()
                .expect("must exist"))
        })
        .await
    }

    #[mz_ore::instrument(level = "debug")]
    async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError> {
        self.sync_to_current_upper().await?;
        Ok(self
            .fenceable_token
            .token()
            .expect("opened catalogs must have a token")
            .deploy_generation)
    }

    #[mz_ore::instrument(level = "debug")]
    async fn snapshot(&mut self) -> Result<Snapshot, CatalogError> {
        self.with_snapshot(Ok).await
    }

    // Syncs to whatever the current upper is and returns the buffered updates.
    #[mz_ore::instrument(level = "debug")]
    async fn sync_to_current_updates(
        &mut self,
    ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError> {
        let upper = self.current_upper().await;
        self.sync_updates(upper).await
    }

    // Syncs to `target_upper` and drains all buffered updates with ts < target_upper.
    #[mz_ore::instrument(level = "debug")]
    async fn sync_updates(
        &mut self,
        target_upper: mz_repr::Timestamp,
    ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError> {
        self.sync(target_upper).await?;
        let mut updates = Vec::new();
        // Updates at or beyond the target upper stay buffered for a later call.
        while let Some(update) = self.update_applier.updates.front() {
            if update.ts >= target_upper {
                break;
            }

            let update = self
                .update_applier
                .updates
                .pop_front()
                .expect("peeked above");
            updates.push(update);
        }
        Ok(updates)
    }

    async fn current_upper(&mut self) -> Timestamp {
        self.current_upper().await
    }
}
1753
#[async_trait]
#[allow(mismatched_lifetime_syntaxes)]
impl DurableCatalogState for PersistCatalogState {
    fn is_read_only(&self) -> bool {
        matches!(self.mode, Mode::Readonly)
    }

    fn is_savepoint(&self) -> bool {
        matches!(self.mode, Mode::Savepoint)
    }

    async fn mark_bootstrap_complete(&mut self) {
        self.bootstrap_complete = true;
        // Only the writable catalog upgrades the version of its since handle.
        if matches!(self.mode, Mode::Writable) {
            self.since_handle
                .upgrade_version()
                .await
                .expect("invalid usage")
        }
    }

    // Starts a transaction whose commit timestamp is the current upper.
    #[mz_ore::instrument(level = "debug")]
    async fn transaction(&mut self) -> Result<Transaction, CatalogError> {
        self.metrics.transactions_started.inc();
        let snapshot = self.snapshot().await?;
        let commit_ts = self.upper.clone();
        Transaction::new(self, snapshot, commit_ts)
    }

    // Like `transaction`, but reuses a caller-provided snapshot (no metrics bump).
    fn transaction_from_snapshot(
        &mut self,
        snapshot: Snapshot,
    ) -> Result<Transaction, CatalogError> {
        let commit_ts = self.upper.clone();
        Transaction::new(self, snapshot, commit_ts)
    }

    // Commits `txn_batch` at `commit_ts`, returning the new upper. The inner
    // function exists so the whole body can be wrapped in latency metrics.
    #[mz_ore::instrument(level = "debug")]
    async fn commit_transaction(
        &mut self,
        txn_batch: TransactionBatch,
        commit_ts: Timestamp,
    ) -> Result<Timestamp, CatalogError> {
        async fn commit_transaction_inner(
            catalog: &mut PersistCatalogState,
            txn_batch: TransactionBatch,
            commit_ts: Timestamp,
        ) -> Result<Timestamp, CatalogError> {
            // If the transaction is empty then we don't error, even in read-only mode.
            // This is mostly for legacy reasons (i.e. with enough elbow grease this
            // behavior can be changed without breaking any fundamental assumptions).
            if catalog.mode == Mode::Readonly {
                let updates: Vec<_> = StateUpdate::from_txn_batch(txn_batch).collect();
                if !updates.is_empty() {
                    return Err(DurableCatalogError::NotWritable(format!(
                        "cannot commit a transaction in a read-only catalog: {updates:#?}"
                    ))
                    .into());
                }
                return Ok(catalog.upper);
            }

            // If the current upper does not match the transaction's commit timestamp, then the
            // catalog must have changed since the transaction was started, making the transaction
            // invalid. When/if we want a multi-writer catalog, this will likely have to change
            // from an assert to a retry.
            assert_eq!(
                catalog.upper, txn_batch.upper,
                "only one transaction at a time is supported"
            );

            assert!(
                commit_ts >= catalog.upper,
                "expected commit ts, {}, to be greater than or equal to upper, {}",
                commit_ts,
                catalog.upper
            );

            let updates = StateUpdate::from_txn_batch(txn_batch).collect();
            debug!("committing updates: {updates:?}");

            let next_upper = match catalog.mode {
                // Writable catalogs durably append, which may surface a fence error.
                Mode::Writable => catalog
                    .compare_and_append(updates, commit_ts)
                    .await
                    .map_err(|e| e.unwrap_fence_error())?,
                // Savepoint catalogs apply the updates only in memory and advance
                // the in-memory upper; nothing is written to persist.
                Mode::Savepoint => {
                    let updates = updates.into_iter().map(|(kind, diff)| StateUpdate {
                        kind,
                        ts: commit_ts,
                        diff,
                    });
                    catalog.apply_updates_and_consolidate(updates)?;
                    catalog.upper = commit_ts.step_forward();
                    catalog.upper
                }
                Mode::Readonly => unreachable!("handled above"),
            };

            Ok(next_upper)
        }
        self.metrics.transaction_commits.inc();
        let counter = self.metrics.transaction_commit_latency_seconds.clone();
        commit_transaction_inner(self, txn_batch, commit_ts)
            .wall_time()
            .inc_by(counter)
            .await
    }

    // Advances the upper without writing any data.
    #[mz_ore::instrument(level = "debug")]
    async fn advance_upper(&mut self, new_upper: Timestamp) -> Result<(), CatalogError> {
        if self.upper >= new_upper {
            // We don't expect a no-op advancement, but if we are wrong we'd crash the process.
            // Seems safer to only soft-assert and return gracefully in production. If we get here
            // that means we tried to make the catalog shard readable at a time it was already
            // readable, which likely means we are violating linearizability. That's not great, but
            // crashing (or even crash-looping) is worse.
            //
            // TODO: Consider removing this once we have built some confidence.
            soft_panic_or_log!(
                "new_upper ({new_upper}) not greater than current upper ({})",
                self.upper
            );
            return Ok(());
        }

        match self.mode {
            // Append an empty batch to durably advance the upper.
            Mode::Writable => self
                .compare_and_append_inner([], new_upper)
                .await
                .map_err(|e| e.unwrap_fence_error())?,
            // Savepoint catalogs only track the upper in memory (updated below).
            Mode::Savepoint => (),
            Mode::Readonly => {
                return Err(DurableCatalogError::NotWritable(
                    "cannot advance upper of a read-only catalog".into(),
                )
                .into());
            }
        }

        self.upper = new_upper;
        // No sync needed since no data was written.

        Ok(())
    }

    fn shard_id(&self) -> ShardId {
        self.shard_id
    }
}
1904
1905/// Deterministically generate a shard ID for the given `organization_id` and `seed`.
1906pub fn shard_id(organization_id: Uuid, seed: usize) -> ShardId {
1907    let hash = sha2::Sha256::digest(format!("{organization_id}{seed}")).to_vec();
1908    soft_assert_eq_or_log!(hash.len(), 32, "SHA256 returns 32 bytes (256 bits)");
1909    let uuid = Uuid::from_slice(&hash[0..16]).expect("from_slice accepts exactly 16 bytes");
1910    ShardId::from_str(&format!("s{uuid}")).expect("known to be valid")
1911}
1912
1913/// Generates a timestamp for reading from `read_handle` that is as fresh as possible, given
1914/// `upper`.
1915fn as_of(
1916    read_handle: &ReadHandle<SourceData, (), Timestamp, StorageDiff>,
1917    upper: Timestamp,
1918) -> Timestamp {
1919    let since = read_handle.since().clone();
1920    let mut as_of = upper.checked_sub(1).unwrap_or_else(|| {
1921        panic!("catalog persist shard should be initialize, found upper: {upper:?}")
1922    });
1923    // We only downgrade the since after writing, and we always set the since to one less than the
1924    // upper.
1925    soft_assert_or_log!(
1926        since.less_equal(&as_of),
1927        "since={since:?}, as_of={as_of:?}; since must be less than or equal to as_of"
1928    );
1929    // This should be a no-op if the assert above passes, however if it doesn't then we'd like to
1930    // continue with a correct timestamp instead of entering a panic loop.
1931    as_of.advance_by(since.borrow());
1932    as_of
1933}
1934
1935/// Fetch the persist version of the catalog shard, if one exists. A version will not
1936/// exist if we are creating a brand-new environment.
1937async fn fetch_catalog_shard_version(
1938    persist_client: &PersistClient,
1939    catalog_shard_id: ShardId,
1940) -> Option<semver::Version> {
1941    let shard_state = persist_client
1942        .inspect_shard::<Timestamp>(&catalog_shard_id)
1943        .await
1944        .ok()?;
1945    let json_state = serde_json::to_value(shard_state).expect("state serialization error");
1946    let json_version = json_state
1947        .get("applier_version")
1948        .cloned()
1949        .expect("missing applier_version");
1950    let version = serde_json::from_value(json_version).expect("version deserialization error");
1951    Some(version)
1952}
1953
1954/// Generates an iterator of [`StateUpdate`] that contain all updates to the catalog
1955/// state up to, and including, `as_of`.
1956///
1957/// The output is consolidated and sorted by timestamp in ascending order.
1958#[mz_ore::instrument(level = "debug")]
1959async fn snapshot_binary(
1960    read_handle: &mut ReadHandle<SourceData, (), Timestamp, StorageDiff>,
1961    as_of: Timestamp,
1962    metrics: &Arc<Metrics>,
1963) -> impl Iterator<Item = StateUpdate<StateUpdateKindJson>> + DoubleEndedIterator + use<> {
1964    metrics.snapshots_taken.inc();
1965    let counter = metrics.snapshot_latency_seconds.clone();
1966    snapshot_binary_inner(read_handle, as_of)
1967        .wall_time()
1968        .inc_by(counter)
1969        .await
1970}
1971
1972/// Generates an iterator of [`StateUpdate`] that contain all updates to the catalog
1973/// state up to, and including, `as_of`.
1974///
1975/// The output is consolidated and sorted by timestamp in ascending order.
1976#[mz_ore::instrument(level = "debug")]
1977async fn snapshot_binary_inner(
1978    read_handle: &mut ReadHandle<SourceData, (), Timestamp, StorageDiff>,
1979    as_of: Timestamp,
1980) -> impl Iterator<Item = StateUpdate<StateUpdateKindJson>> + DoubleEndedIterator + use<> {
1981    let snapshot = read_handle
1982        .snapshot_and_fetch(Antichain::from_elem(as_of))
1983        .await
1984        .expect("we have advanced the restart_as_of by the since");
1985    soft_assert_no_log!(
1986        snapshot.iter().all(|(_, _, diff)| *diff == 1),
1987        "snapshot_and_fetch guarantees a consolidated result: {snapshot:#?}"
1988    );
1989    snapshot
1990        .into_iter()
1991        .map(Into::<StateUpdate<StateUpdateKindJson>>::into)
1992        .sorted_by(|a, b| Ord::cmp(&b.ts, &a.ts))
1993}
1994
1995/// Convert an [`Antichain<Timestamp>`] to a [`Timestamp`].
1996///
1997/// The correctness of this function relies on [`Timestamp`] being totally ordered and never
1998/// finalizing the catalog shard.
1999pub(crate) fn antichain_to_timestamp(antichain: Antichain<Timestamp>) -> Timestamp {
2000    antichain
2001        .into_option()
2002        .expect("we use a totally ordered time and never finalize the shard")
2003}
2004
2005// Debug methods used by the catalog-debug tool.
2006
impl Trace {
    /// Generates a [`Trace`] from snapshot.
    ///
    /// Each update is routed to the sub-trace of its collection; fence tokens are
    /// dropped because they do not belong to any collection.
    fn from_snapshot(snapshot: impl IntoIterator<Item = StateUpdate>) -> Trace {
        let mut trace = Trace::new();
        for StateUpdate { kind, ts, diff } in snapshot {
            // Exhaustive match: adding a new `StateUpdateKind` variant forces a
            // decision about where it belongs in the trace.
            match kind {
                StateUpdateKind::AuditLog(k, v) => trace.audit_log.values.push(((k, v), ts, diff)),
                StateUpdateKind::Cluster(k, v) => trace.clusters.values.push(((k, v), ts, diff)),
                StateUpdateKind::ClusterReplica(k, v) => {
                    trace.cluster_replicas.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::Comment(k, v) => trace.comments.values.push(((k, v), ts, diff)),
                StateUpdateKind::Config(k, v) => trace.configs.values.push(((k, v), ts, diff)),
                StateUpdateKind::Database(k, v) => trace.databases.values.push(((k, v), ts, diff)),
                StateUpdateKind::DefaultPrivilege(k, v) => {
                    trace.default_privileges.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::FenceToken(_) => {
                    // Fence token not included in trace.
                }
                StateUpdateKind::IdAllocator(k, v) => {
                    trace.id_allocator.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::IntrospectionSourceIndex(k, v) => {
                    trace.introspection_sources.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::Item(k, v) => trace.items.values.push(((k, v), ts, diff)),
                StateUpdateKind::NetworkPolicy(k, v) => {
                    trace.network_policies.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::Role(k, v) => trace.roles.values.push(((k, v), ts, diff)),
                StateUpdateKind::Schema(k, v) => trace.schemas.values.push(((k, v), ts, diff)),
                StateUpdateKind::Setting(k, v) => trace.settings.values.push(((k, v), ts, diff)),
                StateUpdateKind::SourceReferences(k, v) => {
                    trace.source_references.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::SystemConfiguration(k, v) => {
                    trace.system_configurations.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::SystemObjectMapping(k, v) => {
                    trace.system_object_mappings.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::SystemPrivilege(k, v) => {
                    trace.system_privileges.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::StorageCollectionMetadata(k, v) => trace
                    .storage_collection_metadata
                    .values
                    .push(((k, v), ts, diff)),
                StateUpdateKind::UnfinalizedShard(k, ()) => {
                    trace.unfinalized_shards.values.push(((k, ()), ts, diff))
                }
                StateUpdateKind::TxnWalShard((), v) => {
                    trace.txn_wal_shard.values.push((((), v), ts, diff))
                }
                StateUpdateKind::RoleAuth(k, v) => trace.role_auth.values.push(((k, v), ts, diff)),
            }
        }
        trace
    }
}
2068
impl UnopenedPersistCatalogState {
    /// Manually update value of `key` in collection `T` to `value`.
    ///
    /// Returns the previous value of `key`, if any. Panics if the (consolidated)
    /// trace contains more than one value for `key`. Retries on upper mismatches;
    /// returns an error if another catalog fences this one out.
    #[mz_ore::instrument]
    pub(crate) async fn debug_edit<T: Collection>(
        &mut self,
        key: T::Key,
        value: T::Value,
    ) -> Result<Option<T::Value>, CatalogError>
    where
        T::Key: PartialEq + Eq + Debug + Clone,
        T::Value: Debug + Clone,
    {
        // Loop until a compare-and-append lands at the current upper.
        let prev_value = loop {
            let key = key.clone();
            let value = value.clone();
            let snapshot = self.current_snapshot().await?;
            let trace = Trace::from_snapshot(snapshot);
            let collection_trace = T::collection_trace(trace);
            // Find the existing entries for `key` (at most one in a consolidated trace).
            let prev_values: Vec<_> = collection_trace
                .values
                .into_iter()
                .filter(|((k, _), _, diff)| {
                    soft_assert_eq_or_log!(*diff, Diff::ONE, "trace is consolidated");
                    &key == k
                })
                .collect();

            let prev_value = match &prev_values[..] {
                [] => None,
                [((_, v), _, _)] => Some(v.clone()),
                prev_values => panic!("multiple values found for key {key:?}: {prev_values:?}"),
            };

            // Retract any existing value and insert the new one.
            let mut updates: Vec<_> = prev_values
                .into_iter()
                .map(|((k, v), _, _)| (T::update(k, v), Diff::MINUS_ONE))
                .collect();
            updates.push((T::update(key, value), Diff::ONE));
            // We must fence out all other catalogs, if we haven't already, since we are writing.
            match self.fenceable_token.generate_unfenced_token(self.mode)? {
                Some((fence_updates, current_fenceable_token)) => {
                    updates.extend(fence_updates.clone());
                    match self.compare_and_append(updates, self.upper).await {
                        Ok(_) => {
                            // Our fence token is now durable; adopt it.
                            self.fenceable_token = current_fenceable_token;
                            break prev_value;
                        }
                        // Fenced out by someone else: give up.
                        Err(CompareAndAppendError::Fence(e)) => return Err(e.into()),
                        // Someone else wrote first: re-read and retry.
                        Err(e @ CompareAndAppendError::UpperMismatch { .. }) => {
                            warn!("catalog write failed due to upper mismatch, retrying: {e:?}");
                            continue;
                        }
                    }
                }
                // Already fenced; append without additional fence updates.
                None => {
                    self.compare_and_append(updates, self.upper)
                        .await
                        .map_err(|e| e.unwrap_fence_error())?;
                    break prev_value;
                }
            }
        };
        Ok(prev_value)
    }

    /// Manually delete `key` from collection `T`.
    ///
    /// Retracts every entry for `key`. Retries on upper mismatches; returns an
    /// error if another catalog fences this one out.
    #[mz_ore::instrument]
    pub(crate) async fn debug_delete<T: Collection>(
        &mut self,
        key: T::Key,
    ) -> Result<(), CatalogError>
    where
        T::Key: PartialEq + Eq + Debug + Clone,
        T::Value: Debug,
    {
        // Loop until a compare-and-append lands at the current upper.
        loop {
            let key = key.clone();
            let snapshot = self.current_snapshot().await?;
            let trace = Trace::from_snapshot(snapshot);
            let collection_trace = T::collection_trace(trace);
            // Build retractions for every current entry matching `key`.
            let mut retractions: Vec<_> = collection_trace
                .values
                .into_iter()
                .filter(|((k, _), _, diff)| {
                    soft_assert_eq_or_log!(*diff, Diff::ONE, "trace is consolidated");
                    &key == k
                })
                .map(|((k, v), _, _)| (T::update(k, v), Diff::MINUS_ONE))
                .collect();

            // We must fence out all other catalogs, if we haven't already, since we are writing.
            match self.fenceable_token.generate_unfenced_token(self.mode)? {
                Some((fence_updates, current_fenceable_token)) => {
                    retractions.extend(fence_updates.clone());
                    match self.compare_and_append(retractions, self.upper).await {
                        Ok(_) => {
                            // Our fence token is now durable; adopt it.
                            self.fenceable_token = current_fenceable_token;
                            break;
                        }
                        // Fenced out by someone else: give up.
                        Err(CompareAndAppendError::Fence(e)) => return Err(e.into()),
                        // Someone else wrote first: re-read and retry.
                        Err(e @ CompareAndAppendError::UpperMismatch { .. }) => {
                            warn!("catalog write failed due to upper mismatch, retrying: {e:?}");
                            continue;
                        }
                    }
                }
                // Already fenced; append without additional fence updates.
                None => {
                    self.compare_and_append(retractions, self.upper)
                        .await
                        .map_err(|e| e.unwrap_fence_error())?;
                    break;
                }
            }
        }
        Ok(())
    }

    /// Generates a [`Vec<StateUpdate>`] that contain all updates to the catalog
    /// state.
    ///
    /// The output is consolidated and sorted by timestamp in ascending order and the current upper.
    async fn current_snapshot(
        &mut self,
    ) -> Result<impl IntoIterator<Item = StateUpdate> + '_, CatalogError> {
        self.sync_to_current_upper().await?;
        // Consolidate the in-memory snapshot before decoding its entries.
        self.consolidate();
        Ok(self.snapshot.iter().cloned().map(|(kind, ts, diff)| {
            let kind = TryIntoStateUpdateKind::try_into(kind).expect("kind decoding error");
            StateUpdate { kind, ts, diff }
        }))
    }
}