// mz_catalog/durable/persist.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10#[cfg(test)]
11mod tests;
12
13use std::cmp::max;
14use std::collections::{BTreeMap, VecDeque};
15use std::fmt::Debug;
16use std::str::FromStr;
17use std::sync::{Arc, LazyLock};
18use std::time::{Duration, Instant};
19
20use async_trait::async_trait;
21use differential_dataflow::lattice::Lattice;
22use futures::{FutureExt, StreamExt};
23use itertools::Itertools;
24use mz_audit_log::VersionedEvent;
25use mz_ore::metrics::MetricsFutureExt;
26use mz_ore::now::EpochMillis;
27use mz_ore::{
28    soft_assert_eq_no_log, soft_assert_eq_or_log, soft_assert_ne_or_log, soft_assert_no_log,
29    soft_assert_or_log, soft_panic_or_log,
30};
31use mz_persist_client::cfg::USE_CRITICAL_SINCE_CATALOG;
32use mz_persist_client::cli::admin::{CATALOG_FORCE_COMPACTION_FUEL, CATALOG_FORCE_COMPACTION_WAIT};
33use mz_persist_client::critical::{CriticalReaderId, Opaque, SinceHandle};
34use mz_persist_client::error::UpperMismatch;
35use mz_persist_client::read::{Listen, ListenEvent, ReadHandle};
36use mz_persist_client::write::WriteHandle;
37use mz_persist_client::{Diagnostics, PersistClient, ShardId};
38use mz_persist_types::codec_impls::UnitSchema;
39use mz_proto::{RustType, TryFromProtoError};
40use mz_repr::Diff;
41use mz_storage_client::controller::PersistEpoch;
42use mz_storage_types::StorageDiff;
43use mz_storage_types::sources::SourceData;
44use sha2::Digest;
45use timely::progress::{Antichain, Timestamp as TimelyTimestamp};
46use tracing::{debug, info, warn};
47use uuid::Uuid;
48
49use crate::durable::debug::{Collection, CollectionType, DebugCatalogState, Trace};
50use crate::durable::error::FenceError;
51use crate::durable::initialize::{
52    ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, SYSTEM_CONFIG_SYNCED_KEY, USER_VERSION_KEY,
53    WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL, WITH_0DT_DEPLOYMENT_MAX_WAIT,
54};
55use crate::durable::metrics::Metrics;
56use crate::durable::objects::state_update::{
57    IntoStateUpdateKindJson, StateUpdate, StateUpdateKind, StateUpdateKindJson,
58    TryIntoStateUpdateKind,
59};
60use crate::durable::objects::{AuditLogKey, FenceToken, Snapshot};
61use crate::durable::transaction::TransactionBatch;
62use crate::durable::upgrade::upgrade;
63use crate::durable::{
64    AuditLogIterator, BootstrapArgs, CATALOG_CONTENT_VERSION_KEY, CatalogError,
65    DurableCatalogError, DurableCatalogState, Epoch, OpenableDurableCatalogState,
66    ReadOnlyDurableCatalogState, Transaction, initialize, persist_desc,
67};
68use crate::memory;
69
/// New-type used to represent timestamps in persist.
pub(crate) type Timestamp = mz_repr::Timestamp;

/// The minimum value of an epoch.
///
/// Used as the starting epoch when a catalog is opened for the first time; see
/// `FenceableToken::generate_unfenced_token`.
///
/// # Safety
/// `new_unchecked` is safe to call with a non-zero value.
const MIN_EPOCH: Epoch = unsafe { Epoch::new_unchecked(1) };

/// Human readable catalog shard name, used in persist [`Diagnostics`].
const CATALOG_SHARD_NAME: &str = "catalog";

/// [`CriticalReaderId`] for the catalog shard's own since hold, separate from
/// [`PersistClient::CONTROLLER_CRITICAL_SINCE`].
static CATALOG_CRITICAL_SINCE: LazyLock<CriticalReaderId> = LazyLock::new(|| {
    "c55555555-6666-7777-8888-999999999999"
        .parse()
        .expect("valid CriticalReaderId")
});

/// Seed used to generate the persist shard ID for the catalog.
const CATALOG_SEED: usize = 1;
/// Legacy seed used to generate the persist shard ID for the upgrade shard. DO NOT REUSE.
const _UPGRADE_SEED: usize = 2;
/// Legacy seed used to generate the persist shard ID for builtin table migrations. DO NOT REUSE.
pub const _BUILTIN_MIGRATION_SEED: usize = 3;
/// Legacy seed used to generate the persist shard ID for the expression cache. DO NOT REUSE.
pub const _EXPRESSION_CACHE_SEED: usize = 4;
98
/// Durable catalog mode that dictates the effect of mutable operations.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub(crate) enum Mode {
    /// Mutable operations are prohibited.
    Readonly,
    /// Mutable operations have an effect in-memory, but aren't persisted durably.
    ///
    /// NOTE(review): savepoint catalogs also do not sync concurrent writes from writer
    /// catalogs (see `PersistHandle::sync_inner`).
    Savepoint,
    /// Mutable operations have an effect in-memory and durably.
    Writable,
}
109
/// Enum representing the fenced state of the catalog.
///
/// State transitions (see the `impl` below): `Initializing` becomes `Unfenced` via
/// `generate_unfenced_token`, and any state becomes `Fenced` via `maybe_fence` when a
/// strictly larger [`FenceToken`] is observed.
#[derive(Debug)]
pub(crate) enum FenceableToken {
    /// The catalog is still initializing and learning about previously written fence tokens. This
    /// state can be fenced if it encounters a larger deploy generation.
    Initializing {
        /// The largest fence token durably written to the catalog, if any.
        durable_token: Option<FenceToken>,
        /// This process's deploy generation.
        current_deploy_generation: Option<u64>,
    },
    /// The current token has not been fenced.
    Unfenced { current_token: FenceToken },
    /// The current token has been fenced.
    Fenced {
        /// The token this process was operating with when it was fenced.
        current_token: FenceToken,
        /// The larger token, written by another process, that fenced us out.
        fence_token: FenceToken,
    },
}
129
130impl FenceableToken {
131    /// Returns a new token.
132    fn new(current_deploy_generation: Option<u64>) -> Self {
133        Self::Initializing {
134            durable_token: None,
135            current_deploy_generation,
136        }
137    }
138
139    /// Returns the current token if it is not fenced, otherwise returns an error.
140    fn validate(&self) -> Result<Option<FenceToken>, FenceError> {
141        match self {
142            FenceableToken::Initializing { durable_token, .. } => Ok(durable_token.clone()),
143            FenceableToken::Unfenced { current_token, .. } => Ok(Some(current_token.clone())),
144            FenceableToken::Fenced {
145                current_token,
146                fence_token,
147            } => {
148                assert!(
149                    fence_token > current_token,
150                    "must be fenced by higher token; current={current_token:?}, fence={fence_token:?}"
151                );
152                if fence_token.deploy_generation > current_token.deploy_generation {
153                    Err(FenceError::DeployGeneration {
154                        current_generation: current_token.deploy_generation,
155                        fence_generation: fence_token.deploy_generation,
156                    })
157                } else {
158                    assert!(
159                        fence_token.epoch > current_token.epoch,
160                        "must be fenced by higher token; current={current_token:?}, fence={fence_token:?}"
161                    );
162                    Err(FenceError::Epoch {
163                        current_epoch: current_token.epoch,
164                        fence_epoch: fence_token.epoch,
165                    })
166                }
167            }
168        }
169    }
170
171    /// Returns the current token.
172    fn token(&self) -> Option<FenceToken> {
173        match self {
174            FenceableToken::Initializing { durable_token, .. } => durable_token.clone(),
175            FenceableToken::Unfenced { current_token, .. } => Some(current_token.clone()),
176            FenceableToken::Fenced { current_token, .. } => Some(current_token.clone()),
177        }
178    }
179
180    /// Returns `Err` if `token` fences out `self`, `Ok` otherwise.
181    fn maybe_fence(&mut self, token: FenceToken) -> Result<(), FenceError> {
182        match self {
183            FenceableToken::Initializing {
184                durable_token,
185                current_deploy_generation,
186                ..
187            } => {
188                match durable_token {
189                    Some(durable_token) => {
190                        *durable_token = max(durable_token.clone(), token.clone());
191                    }
192                    None => {
193                        *durable_token = Some(token.clone());
194                    }
195                }
196                if let Some(current_deploy_generation) = current_deploy_generation {
197                    if *current_deploy_generation < token.deploy_generation {
198                        *self = FenceableToken::Fenced {
199                            current_token: FenceToken {
200                                deploy_generation: *current_deploy_generation,
201                                epoch: token.epoch,
202                            },
203                            fence_token: token,
204                        };
205                        self.validate()?;
206                    }
207                }
208            }
209            FenceableToken::Unfenced { current_token } => {
210                if *current_token < token {
211                    *self = FenceableToken::Fenced {
212                        current_token: current_token.clone(),
213                        fence_token: token,
214                    };
215                    self.validate()?;
216                }
217            }
218            FenceableToken::Fenced { .. } => {
219                self.validate()?;
220            }
221        }
222
223        Ok(())
224    }
225
226    /// Returns a [`FenceableToken::Unfenced`] token and the updates to the catalog required to
227    /// transition to the `Unfenced` state if `self` is [`FenceableToken::Initializing`], otherwise
228    /// returns `None`.
229    fn generate_unfenced_token(
230        &self,
231        mode: Mode,
232    ) -> Result<Option<(Vec<(StateUpdateKind, Diff)>, FenceableToken)>, DurableCatalogError> {
233        let (durable_token, current_deploy_generation) = match self {
234            FenceableToken::Initializing {
235                durable_token,
236                current_deploy_generation,
237            } => (durable_token.clone(), current_deploy_generation.clone()),
238            FenceableToken::Unfenced { .. } | FenceableToken::Fenced { .. } => return Ok(None),
239        };
240
241        let mut fence_updates = Vec::with_capacity(2);
242
243        if let Some(durable_token) = &durable_token {
244            fence_updates.push((
245                StateUpdateKind::FenceToken(durable_token.clone()),
246                Diff::MINUS_ONE,
247            ));
248        }
249
250        let current_deploy_generation = current_deploy_generation
251            .or_else(|| durable_token.as_ref().map(|token| token.deploy_generation))
252            // We cannot initialize a catalog without a deploy generation.
253            .ok_or(DurableCatalogError::Uninitialized)?;
254        let mut current_epoch = durable_token
255            .map(|token| token.epoch)
256            .unwrap_or(MIN_EPOCH)
257            .get();
258        // Only writable catalogs attempt to increment the epoch.
259        if matches!(mode, Mode::Writable) {
260            current_epoch = current_epoch + 1;
261        }
262        let current_epoch = Epoch::new(current_epoch).expect("known to be non-zero");
263        let current_token = FenceToken {
264            deploy_generation: current_deploy_generation,
265            epoch: current_epoch,
266        };
267
268        fence_updates.push((
269            StateUpdateKind::FenceToken(current_token.clone()),
270            Diff::ONE,
271        ));
272
273        let current_fenceable_token = FenceableToken::Unfenced { current_token };
274
275        Ok(Some((fence_updates, current_fenceable_token)))
276    }
277}
278
/// An error that can occur while executing [`PersistHandle::compare_and_append`].
#[derive(Debug, thiserror::Error)]
pub(crate) enum CompareAndAppendError {
    /// This catalog has been fenced out by another catalog process.
    #[error(transparent)]
    Fence(#[from] FenceError),
    /// Catalog encountered an upper mismatch when trying to write to the catalog. This should only
    /// happen while trying to fence out other catalogs.
    #[error(
        "expected catalog upper {expected_upper:?} did not match actual catalog upper {actual_upper:?}"
    )]
    UpperMismatch {
        /// The upper this process expected the shard to be at.
        expected_upper: Timestamp,
        /// The upper the shard was actually at.
        actual_upper: Timestamp,
    },
}
294
295impl CompareAndAppendError {
296    pub(crate) fn unwrap_fence_error(self) -> FenceError {
297        match self {
298            CompareAndAppendError::Fence(e) => e,
299            e @ CompareAndAppendError::UpperMismatch { .. } => {
300                panic!("unexpected upper mismatch: {e:?}")
301            }
302        }
303    }
304}
305
306impl From<UpperMismatch<Timestamp>> for CompareAndAppendError {
307    fn from(upper_mismatch: UpperMismatch<Timestamp>) -> Self {
308        Self::UpperMismatch {
309            expected_upper: antichain_to_timestamp(upper_mismatch.expected),
310            actual_upper: antichain_to_timestamp(upper_mismatch.current),
311        }
312    }
313}
314
/// A strategy for processing each update read from the catalog shard.
///
/// Implementors decide, per update, whether it should be cached in the in-memory snapshot and
/// how it interacts with fencing.
pub(crate) trait ApplyUpdate<T: IntoStateUpdateKindJson> {
    /// Process and apply `update`.
    ///
    /// Returns `Some` if `update` should be cached in memory and `None` otherwise.
    ///
    /// # Errors
    ///
    /// Returns a [`FenceError`] if applying `update` reveals that this catalog has been fenced
    /// out. `current_fence_token` may be mutated to record the newly observed fence state.
    fn apply_update(
        &mut self,
        update: StateUpdate<T>,
        current_fence_token: &mut FenceableToken,
        metrics: &Arc<Metrics>,
    ) -> Result<Option<StateUpdate<T>>, FenceError>;
}
326
/// A handle for interacting with the persist catalog shard.
///
/// The catalog shard is used in multiple different contexts, for example pre-open and post-open,
/// but for all contexts the majority of the durable catalog's behavior is identical. This struct
/// implements those behaviors that are identical while allowing the user to specify the different
/// behaviors via generic parameters.
///
/// The behavior of the durable catalog can be different along one of two axes. The first is the
/// format of each individual update, i.e. raw binary, the current protobuf version, previous
/// protobuf versions, etc. The second axis is what to do with each individual update, for example
/// before opening we cache all config updates but don't cache them after opening. These behaviors
/// are customizable via the `T: TryIntoStateUpdateKind` and `U: ApplyUpdate<T>` generic parameters
/// respectively.
#[derive(Debug)]
pub(crate) struct PersistHandle<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> {
    /// The [`Mode`] that this catalog was opened in.
    pub(crate) mode: Mode,
    /// Since handle to control compaction.
    since_handle: SinceHandle<SourceData, (), Timestamp, StorageDiff>,
    /// Write handle to persist.
    write_handle: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
    /// Listener to catalog changes.
    listen: Listen<SourceData, (), Timestamp, StorageDiff>,
    /// Handle for connecting to persist.
    persist_client: PersistClient,
    /// Catalog shard ID.
    shard_id: ShardId,
    /// Cache of the most recent catalog snapshot.
    ///
    /// We use a tuple instead of [`StateUpdate`] to make consolidation easier.
    /// Kept sorted by timestamp; see `consolidate`.
    pub(crate) snapshot: Vec<(T, Timestamp, Diff)>,
    /// Applies custom processing, filtering, and fencing for each individual update.
    update_applier: U,
    /// The current upper of the persist shard.
    pub(crate) upper: Timestamp,
    /// The fence token of the catalog, if one exists.
    fenceable_token: FenceableToken,
    /// The semantic version of the current binary.
    catalog_content_version: semver::Version,
    /// Flag to indicate if bootstrap is complete.
    bootstrap_complete: bool,
    /// Metrics for the persist catalog.
    metrics: Arc<Metrics>,
}
371
impl<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> PersistHandle<T, U> {
    /// Fetch the current upper of the catalog state.
    ///
    /// Writable and read-only catalogs ask persist for the most recent upper. Savepoint
    /// catalogs are not synced with concurrent writers, so their in-memory upper is returned
    /// instead.
    #[mz_ore::instrument]
    async fn current_upper(&mut self) -> Timestamp {
        match self.mode {
            Mode::Writable | Mode::Readonly => {
                let upper = self.write_handle.fetch_recent_upper().await;
                antichain_to_timestamp(upper.clone())
            }
            Mode::Savepoint => self.upper,
        }
    }

    /// Appends `updates` iff the current global upper of the catalog is `self.upper`.
    ///
    /// Returns the next upper used to commit the transaction.
    ///
    /// # Errors
    ///
    /// Returns [`CompareAndAppendError::UpperMismatch`] if the shard's upper moved past
    /// `self.upper`, or a fence error if syncing afterwards reveals we've been fenced out.
    ///
    /// # Panics
    ///
    /// Panics if the catalog was not opened in [`Mode::Writable`], or if `commit_ts` is behind
    /// the current upper.
    #[mz_ore::instrument]
    pub(crate) async fn compare_and_append<S: IntoStateUpdateKindJson>(
        &mut self,
        updates: Vec<(S, Diff)>,
        commit_ts: Timestamp,
    ) -> Result<Timestamp, CompareAndAppendError> {
        assert_eq!(self.mode, Mode::Writable);
        assert!(
            commit_ts >= self.upper,
            "expected commit ts, {}, to be greater than or equal to upper, {}",
            commit_ts,
            self.upper
        );

        // This awkward code allows us to perform an expensive soft assert that requires cloning
        // `updates` twice, after `updates` has been consumed.
        let contains_fence = if mz_ore::assert::soft_assertions_enabled() {
            let updates: Vec<_> = updates.clone();
            // Decode the raw updates; updates that fail to decode are simply skipped for the
            // purposes of this soft assert.
            let parsed_updates: Vec<_> = updates
                .clone()
                .into_iter()
                .map(|(update, diff)| {
                    let update: StateUpdateKindJson = update.into();
                    (update, diff)
                })
                .filter_map(|(update, diff)| {
                    <StateUpdateKindJson as TryIntoStateUpdateKind>::try_into(update)
                        .ok()
                        .map(|update| (update, diff))
                })
                .collect();
            // A "fencing" write retracts the old fence token and adds a new one.
            let contains_retraction = parsed_updates.iter().any(|(update, diff)| {
                matches!(update, StateUpdateKind::FenceToken(..)) && *diff == Diff::MINUS_ONE
            });
            let contains_addition = parsed_updates.iter().any(|(update, diff)| {
                matches!(update, StateUpdateKind::FenceToken(..)) && *diff == Diff::ONE
            });
            let contains_fence = contains_retraction && contains_addition;
            Some((contains_fence, updates))
        } else {
            None
        };

        // Encode every update as persist key/value data at `commit_ts`.
        let updates = updates.into_iter().map(|(kind, diff)| {
            let kind: StateUpdateKindJson = kind.into();
            (
                (Into::<SourceData>::into(kind), ()),
                commit_ts,
                diff.into_inner(),
            )
        });
        let next_upper = commit_ts.step_forward();
        let res = self
            .write_handle
            .compare_and_append(
                updates,
                Antichain::from_elem(self.upper),
                Antichain::from_elem(next_upper),
            )
            .await
            .expect("invalid usage");

        // There was an upper mismatch which means something else must have written to the catalog.
        // Syncing to the current upper should result in a fence error since writing to the catalog
        // without fencing other catalogs should be impossible. The one exception is if we are
        // trying to fence other catalogs with this write, in which case we won't see a fence error.
        if let Err(e @ UpperMismatch { .. }) = res {
            self.sync_to_current_upper().await?;
            if let Some((contains_fence, updates)) = contains_fence {
                assert!(
                    contains_fence,
                    "updates were neither fenced nor fencing and encountered an upper mismatch: {updates:#?}"
                )
            }
            return Err(e.into());
        }

        // Lag the shard's upper by 1 to keep it readable.
        let downgrade_to = Antichain::from_elem(next_upper.saturating_sub(1));

        // The since handle gives us the ability to fence out other downgraders using an opaque token.
        // (See the method documentation for details.)
        // That's not needed here, so we use the since handle's opaque token to avoid any comparison
        // failures.
        let opaque = self.since_handle.opaque().clone();
        let downgrade = self
            .since_handle
            .maybe_compare_and_downgrade_since(&opaque, (&opaque, &downgrade_to))
            .await;

        // `None` means the handle rate-limited this downgrade attempt; that's fine, a later
        // call will advance the since.
        match downgrade {
            None => {}
            Some(Err(e)) => soft_panic_or_log!("found opaque value {e:?}, but expected {opaque:?}"),
            Some(Ok(updated)) => soft_assert_or_log!(
                updated == downgrade_to,
                "updated bound should match expected"
            ),
        }
        self.sync(next_upper).await?;
        Ok(next_upper)
    }

    /// Generates an iterator of [`StateUpdate`] that contain all unconsolidated updates to the
    /// catalog state up to, and including, `as_of`.
    ///
    /// Opens (and expires) a temporary leased read handle to perform the snapshot.
    #[mz_ore::instrument]
    async fn snapshot_unconsolidated(&mut self) -> Vec<StateUpdate<StateUpdateKind>> {
        let current_upper = self.current_upper().await;

        let mut snapshot = Vec::new();
        let mut read_handle = self.read_handle().await;
        let as_of = as_of(&read_handle, current_upper);
        let mut stream = Box::pin(
            // We use `snapshot_and_stream` because it guarantees unconsolidated output.
            read_handle
                .snapshot_and_stream(Antichain::from_elem(as_of))
                .await
                .expect("we have advanced the restart_as_of by the since"),
        );
        while let Some(update) = stream.next().await {
            snapshot.push(update)
        }
        read_handle.expire().await;
        snapshot
            .into_iter()
            .map(Into::<StateUpdate<StateUpdateKindJson>>::into)
            .map(|state_update| state_update.try_into().expect("kind decoding error"))
            .collect()
    }

    /// Listen and apply all updates that are currently in persist.
    ///
    /// Returns an error if this instance has been fenced out.
    #[mz_ore::instrument]
    pub(crate) async fn sync_to_current_upper(&mut self) -> Result<(), FenceError> {
        let upper = self.current_upper().await;
        self.sync(upper).await
    }

    /// Listen and apply all updates up to `target_upper`.
    ///
    /// Returns an error if this instance has been fenced out.
    ///
    /// Thin metrics wrapper around [`Self::sync_inner`]: counts syncs and records wall-time
    /// latency.
    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn sync(&mut self, target_upper: Timestamp) -> Result<(), FenceError> {
        self.metrics.syncs.inc();
        let counter = self.metrics.sync_latency_seconds.clone();
        self.sync_inner(target_upper)
            .wall_time()
            .inc_by(counter)
            .await
    }

    /// Reads updates from the listen stream and applies them until `self.upper` reaches
    /// `target_upper`.
    ///
    /// Returns an error if this instance has been fenced out.
    #[mz_ore::instrument(level = "debug")]
    async fn sync_inner(&mut self, target_upper: Timestamp) -> Result<(), FenceError> {
        // Bail early if we already know we've been fenced.
        self.fenceable_token.validate()?;

        // Savepoint catalogs do not yet know how to update themselves in response to concurrent
        // writes from writer catalogs.
        if self.mode == Mode::Savepoint {
            self.upper = max(self.upper, target_upper);
            return Ok(());
        }

        // Buffer of updates received but not yet applied, grouped by timestamp.
        let mut updates: BTreeMap<_, Vec<_>> = BTreeMap::new();

        while self.upper < target_upper {
            let listen_events = self.listen.fetch_next().await;
            for listen_event in listen_events {
                match listen_event {
                    ListenEvent::Progress(upper) => {
                        debug!("synced up to {upper:?}");
                        self.upper = antichain_to_timestamp(upper);
                        // Attempt to apply updates in batches of a single timestamp. If another
                        // catalog wrote a fence token at one timestamp and then updates in a new
                        // format at a later timestamp, then we want to apply the fence token
                        // before attempting to deserialize the new updates.
                        while let Some((ts, updates)) = updates.pop_first() {
                            assert!(ts < self.upper, "expected {} < {}", ts, self.upper);
                            let updates = updates.into_iter().map(
                                |update: StateUpdate<StateUpdateKindJson>| {
                                    let kind =
                                        T::try_from(update.kind).expect("kind decoding error");
                                    StateUpdate {
                                        kind,
                                        ts: update.ts,
                                        diff: update.diff,
                                    }
                                },
                            );
                            self.apply_updates(updates)?;
                        }
                    }
                    ListenEvent::Updates(batch_updates) => {
                        // Buffer until the next progress event tells us these timestamps are
                        // complete.
                        for update in batch_updates {
                            let update: StateUpdate<StateUpdateKindJson> = update.into();
                            updates.entry(update.ts).or_default().push(update);
                        }
                    }
                }
            }
        }
        assert_eq!(updates, BTreeMap::new(), "all updates should be applied");
        Ok(())
    }

    /// Consolidates, orders, and applies `updates` via the [`ApplyUpdate`] implementation,
    /// caching surviving updates in `self.snapshot`.
    ///
    /// Returns an error if any update reveals that this instance has been fenced out.
    #[mz_ore::instrument(level = "debug")]
    pub(crate) fn apply_updates(
        &mut self,
        updates: impl IntoIterator<Item = StateUpdate<T>>,
    ) -> Result<(), FenceError> {
        let mut updates: Vec<_> = updates
            .into_iter()
            .map(|StateUpdate { kind, ts, diff }| (kind, ts, diff))
            .collect();

        // This helps guarantee that for a single key, there is at most a single retraction and a
        // single insertion per timestamp. Otherwise, we would need to match the retractions and
        // insertions up by value and manually figure out what the end value should be.
        differential_dataflow::consolidation::consolidate_updates(&mut updates);

        // Updates must be applied in timestamp order. Within a timestamp retractions must be
        // applied before insertions, or we might end up retracting the wrong value.
        updates.sort_by(|(_, ts1, diff1), (_, ts2, diff2)| ts1.cmp(ts2).then(diff1.cmp(diff2)));

        let mut errors = Vec::new();

        for (kind, ts, diff) in updates {
            if diff != Diff::ONE && diff != Diff::MINUS_ONE {
                panic!("invalid update in consolidated trace: ({kind:?}, {ts:?}, {diff:?})");
            }

            match self.update_applier.apply_update(
                StateUpdate { kind, ts, diff },
                &mut self.fenceable_token,
                &self.metrics,
            ) {
                Ok(Some(StateUpdate { kind, ts, diff })) => self.snapshot.push((kind, ts, diff)),
                Ok(None) => {}
                // Instead of returning immediately, we accumulate all the errors and return the one
                // with the most information.
                Err(err) => errors.push(err),
            }
        }

        // The sort order of `FenceError` determines which accumulated error is the most
        // informative; return the smallest.
        errors.sort();
        if let Some(err) = errors.into_iter().next() {
            return Err(err);
        }

        self.consolidate();

        Ok(())
    }

    /// Consolidates the cached snapshot in place.
    ///
    /// Advances every cached update to the most recent timestamp in the snapshot, then
    /// consolidates so that each remaining (kind, diff) pair is unique at that timestamp.
    #[mz_ore::instrument]
    pub(crate) fn consolidate(&mut self) {
        soft_assert_no_log!(
            self.snapshot
                .windows(2)
                .all(|updates| updates[0].1 <= updates[1].1),
            "snapshot should be sorted by timestamp, {:#?}",
            self.snapshot
        );

        let new_ts = self
            .snapshot
            .last()
            .map(|(_, ts, _)| *ts)
            .unwrap_or_else(Timestamp::minimum);
        for (_, ts, _) in &mut self.snapshot {
            *ts = new_ts;
        }
        differential_dataflow::consolidation::consolidate_updates(&mut self.snapshot);
    }

    /// Execute and return the results of `f` on the current catalog trace.
    ///
    /// Syncs to the current upper first so `f` sees the latest state.
    ///
    /// Will return an error if the catalog has been fenced out.
    async fn with_trace<R>(
        &mut self,
        f: impl FnOnce(&Vec<(T, Timestamp, Diff)>) -> Result<R, CatalogError>,
    ) -> Result<R, CatalogError> {
        self.sync_to_current_upper().await?;
        f(&self.snapshot)
    }

    /// Open a read handle to the catalog.
    ///
    /// The returned leased handle is temporary; callers are expected to `expire` it when done.
    async fn read_handle(&self) -> ReadHandle<SourceData, (), Timestamp, StorageDiff> {
        self.persist_client
            .open_leased_reader(
                self.shard_id,
                Arc::new(persist_desc()),
                Arc::new(UnitSchema::default()),
                Diagnostics {
                    shard_name: CATALOG_SHARD_NAME.to_string(),
                    handle_purpose: "openable durable catalog state temporary reader".to_string(),
                },
                USE_CRITICAL_SINCE_CATALOG.get(self.persist_client.dyncfgs()),
            )
            .await
            .expect("invalid usage")
    }

    /// Politely releases all external resources that can only be released in an async context.
    async fn expire(self: Box<Self>) {
        self.write_handle.expire().await;
        self.listen.expire().await;
    }
}
696
impl<U: ApplyUpdate<StateUpdateKind>> PersistHandle<StateUpdateKind, U> {
    /// Execute and return the results of `f` on the current catalog snapshot.
    ///
    /// Will return an error if the catalog has been fenced out.
    async fn with_snapshot<T>(
        &mut self,
        f: impl FnOnce(Snapshot) -> Result<T, CatalogError>,
    ) -> Result<T, CatalogError> {
        // Folds a single `(key, value, diff)` update into `map`, asserting the invariants
        // of a consolidated trace: an insert (`+1`) must not clobber an existing entry and
        // a retraction (`-1`) must exactly match the value currently in the map.
        fn apply<K, V>(map: &mut BTreeMap<K, V>, key: &K, value: &V, diff: Diff)
        where
            K: Ord + Clone,
            V: Ord + Clone + Debug,
        {
            let key = key.clone();
            let value = value.clone();
            if diff == Diff::ONE {
                let prev = map.insert(key, value);
                assert_eq!(
                    prev, None,
                    "values must be explicitly retracted before inserting a new value"
                );
            } else if diff == Diff::MINUS_ONE {
                let prev = map.remove(&key);
                assert_eq!(
                    prev,
                    Some(value),
                    "retraction does not match existing value"
                );
            }
        }

        self.with_trace(|trace| {
            let mut snapshot = Snapshot::empty();
            for (kind, ts, diff) in trace {
                let diff = *diff;
                // The trace is expected to be consolidated, so every diff must be +1 or -1.
                if diff != Diff::ONE && diff != Diff::MINUS_ONE {
                    panic!("invalid update in consolidated trace: ({kind:?}, {ts:?}, {diff:?})");
                }

                // Route each update to the corresponding collection of the snapshot.
                match kind {
                    StateUpdateKind::AuditLog(_key, ()) => {
                        // Ignore for snapshots.
                    }
                    StateUpdateKind::Cluster(key, value) => {
                        apply(&mut snapshot.clusters, key, value, diff);
                    }
                    StateUpdateKind::ClusterReplica(key, value) => {
                        apply(&mut snapshot.cluster_replicas, key, value, diff);
                    }
                    StateUpdateKind::Comment(key, value) => {
                        apply(&mut snapshot.comments, key, value, diff);
                    }
                    StateUpdateKind::Config(key, value) => {
                        apply(&mut snapshot.configs, key, value, diff);
                    }
                    StateUpdateKind::Database(key, value) => {
                        apply(&mut snapshot.databases, key, value, diff);
                    }
                    StateUpdateKind::DefaultPrivilege(key, value) => {
                        apply(&mut snapshot.default_privileges, key, value, diff);
                    }
                    StateUpdateKind::FenceToken(_token) => {
                        // Ignore for snapshots.
                    }
                    StateUpdateKind::IdAllocator(key, value) => {
                        apply(&mut snapshot.id_allocator, key, value, diff);
                    }
                    StateUpdateKind::IntrospectionSourceIndex(key, value) => {
                        apply(&mut snapshot.introspection_sources, key, value, diff);
                    }
                    StateUpdateKind::Item(key, value) => {
                        apply(&mut snapshot.items, key, value, diff);
                    }
                    StateUpdateKind::NetworkPolicy(key, value) => {
                        apply(&mut snapshot.network_policies, key, value, diff);
                    }
                    StateUpdateKind::Role(key, value) => {
                        apply(&mut snapshot.roles, key, value, diff);
                    }
                    StateUpdateKind::Schema(key, value) => {
                        apply(&mut snapshot.schemas, key, value, diff);
                    }
                    StateUpdateKind::Setting(key, value) => {
                        apply(&mut snapshot.settings, key, value, diff);
                    }
                    StateUpdateKind::SourceReferences(key, value) => {
                        apply(&mut snapshot.source_references, key, value, diff);
                    }
                    StateUpdateKind::SystemConfiguration(key, value) => {
                        apply(&mut snapshot.system_configurations, key, value, diff);
                    }
                    StateUpdateKind::SystemObjectMapping(key, value) => {
                        apply(&mut snapshot.system_object_mappings, key, value, diff);
                    }
                    StateUpdateKind::SystemPrivilege(key, value) => {
                        apply(&mut snapshot.system_privileges, key, value, diff);
                    }
                    StateUpdateKind::StorageCollectionMetadata(key, value) => {
                        apply(&mut snapshot.storage_collection_metadata, key, value, diff);
                    }
                    StateUpdateKind::UnfinalizedShard(key, ()) => {
                        apply(&mut snapshot.unfinalized_shards, key, &(), diff);
                    }
                    StateUpdateKind::TxnWalShard((), value) => {
                        apply(&mut snapshot.txn_wal_shard, &(), value, diff);
                    }
                    StateUpdateKind::RoleAuth(key, value) => {
                        apply(&mut snapshot.role_auth, key, value, diff);
                    }
                }
            }
            f(snapshot)
        })
        .await
    }

    /// Generates an iterator of [`StateUpdate`] that contain all updates to the catalog
    /// state.
    ///
    /// The output is fetched directly from persist instead of the in-memory cache.
    ///
    /// The output is consolidated and sorted by timestamp in ascending order.
    #[mz_ore::instrument(level = "debug")]
    async fn persist_snapshot(&self) -> impl Iterator<Item = StateUpdate> + DoubleEndedIterator {
        let mut read_handle = self.read_handle().await;
        let as_of = as_of(&read_handle, self.upper);
        let snapshot = snapshot_binary(&mut read_handle, as_of, &self.metrics)
            .await
            .map(|update| update.try_into().expect("kind decoding error"));
        // Politely release the one-off read handle's lease now that we have the snapshot.
        read_handle.expire().await;
        snapshot
    }
}
830
/// Applies updates for an unopened catalog.
///
/// Only the `Config` and `Setting` collections are cached, since those are the only
/// collections that may be served before the catalog has been opened.
#[derive(Debug)]
pub(crate) struct UnopenedCatalogStateInner {
    /// A cache of the config collection of the catalog.
    configs: BTreeMap<String, u64>,
    /// A cache of the settings collection of the catalog.
    settings: BTreeMap<String, String>,
}
839
840impl UnopenedCatalogStateInner {
841    fn new() -> UnopenedCatalogStateInner {
842        UnopenedCatalogStateInner {
843            configs: BTreeMap::new(),
844            settings: BTreeMap::new(),
845        }
846    }
847}
848
impl ApplyUpdate<StateUpdateKindJson> for UnopenedCatalogStateInner {
    /// Applies `update` to the in-memory `Config`/`Setting` caches and checks fence tokens.
    ///
    /// Only updates that are always deserializable (and are not audit log entries) are
    /// inspected; all other updates are passed through untouched. The update is always
    /// retained (returned as `Some`) so it remains in the in-memory trace.
    fn apply_update(
        &mut self,
        update: StateUpdate<StateUpdateKindJson>,
        current_fence_token: &mut FenceableToken,
        _metrics: &Arc<Metrics>,
    ) -> Result<Option<StateUpdate<StateUpdateKindJson>>, FenceError> {
        if !update.kind.is_audit_log() && update.kind.is_always_deserializable() {
            let kind = TryInto::try_into(&update.kind).expect("kind is known to be deserializable");
            match (kind, update.diff) {
                (StateUpdateKind::Config(key, value), Diff::ONE) => {
                    let prev = self.configs.insert(key.key, value.value);
                    assert_eq!(
                        prev, None,
                        "values must be explicitly retracted before inserting a new value"
                    );
                }
                (StateUpdateKind::Config(key, value), Diff::MINUS_ONE) => {
                    let prev = self.configs.remove(&key.key);
                    assert_eq!(
                        prev,
                        Some(value.value),
                        "retraction does not match existing value"
                    );
                }
                (StateUpdateKind::Setting(key, value), Diff::ONE) => {
                    let prev = self.settings.insert(key.name, value.value);
                    assert_eq!(
                        prev, None,
                        "values must be explicitly retracted before inserting a new value"
                    );
                }
                (StateUpdateKind::Setting(key, value), Diff::MINUS_ONE) => {
                    let prev = self.settings.remove(&key.name);
                    assert_eq!(
                        prev,
                        Some(value.value),
                        "retraction does not match existing value"
                    );
                }
                (StateUpdateKind::FenceToken(fence_token), Diff::ONE) => {
                    // A newly inserted fence token may indicate that we've been fenced out.
                    current_fence_token.maybe_fence(fence_token)?;
                }
                // All other kinds (and fence token retractions) are ignored here.
                _ => {}
            }
        }

        Ok(Some(update))
    }
}
899
/// A Handle to an unopened catalog stored in persist. The unopened catalog can serve `Config` data,
/// `Setting` data, or the current epoch. All other catalog data may be un-migrated and should not
/// be read until the catalog has been opened. The [`UnopenedPersistCatalogState`] is responsible
/// for opening the catalog, see [`OpenableDurableCatalogState::open`] for more details.
///
/// Updates are kept in the raw [`StateUpdateKindJson`] representation, and the in-memory
/// applier is [`UnopenedCatalogStateInner`], which caches only the `Config` and `Setting`
/// collections.
///
/// Production users should call [`Self::expire`] before dropping an [`UnopenedPersistCatalogState`]
/// so that it can expire its leases. If/when rust gets AsyncDrop, this will be done automatically.
pub(crate) type UnopenedPersistCatalogState =
    PersistHandle<StateUpdateKindJson, UnopenedCatalogStateInner>;
909
impl UnopenedPersistCatalogState {
    /// Create a new [`UnopenedPersistCatalogState`] to the catalog state associated with
    /// `organization_id`.
    ///
    /// All usages of the persist catalog must go through this function. That includes the
    /// catalog-debug tool, the adapter's catalog, etc.
    #[mz_ore::instrument]
    pub(crate) async fn new(
        persist_client: PersistClient,
        organization_id: Uuid,
        version: semver::Version,
        deploy_generation: Option<u64>,
        metrics: Arc<Metrics>,
    ) -> Result<UnopenedPersistCatalogState, DurableCatalogError> {
        let catalog_shard_id = shard_id(organization_id, CATALOG_SEED);
        debug!(?catalog_shard_id, "new persist backed catalog state");

        // Check the catalog shard version to ensure that we are compatible with the persist
        // data format. This lets us return an error gracefully, rather than panicking later in
        // persist.
        let version_in_catalog_shard =
            fetch_catalog_shard_version(&persist_client, catalog_shard_id).await;
        if let Some(version_in_catalog_shard) = version_in_catalog_shard {
            if !mz_persist_client::cfg::code_can_write_data(&version, &version_in_catalog_shard) {
                return Err(DurableCatalogError::IncompatiblePersistVersion {
                    found_version: version_in_catalog_shard,
                    catalog_version: version,
                });
            }
        }

        let open_handles_start = Instant::now();
        info!("startup: envd serve: catalog init: open handles beginning");
        let since_handle = persist_client
            .open_critical_since(
                catalog_shard_id,
                CATALOG_CRITICAL_SINCE.clone(),
                Opaque::encode(&i64::MIN),
                Diagnostics {
                    shard_name: CATALOG_SHARD_NAME.to_string(),
                    handle_purpose: "durable catalog state critical since".to_string(),
                },
            )
            .await
            .expect("invalid usage");

        let (mut write_handle, mut read_handle) = persist_client
            .open(
                catalog_shard_id,
                Arc::new(persist_desc()),
                Arc::new(UnitSchema::default()),
                Diagnostics {
                    shard_name: CATALOG_SHARD_NAME.to_string(),
                    handle_purpose: "durable catalog state handles".to_string(),
                },
                USE_CRITICAL_SINCE_CATALOG.get(persist_client.dyncfgs()),
            )
            .await
            .expect("invalid usage");
        info!(
            "startup: envd serve: catalog init: open handles complete in {:?}",
            open_handles_start.elapsed()
        );

        // Commit an empty write at the minimum timestamp so the catalog is always readable.
        let upper = {
            const EMPTY_UPDATES: &[((SourceData, ()), Timestamp, StorageDiff)] = &[];
            let upper = Antichain::from_elem(Timestamp::minimum());
            let next_upper = Timestamp::minimum().step_forward();
            match write_handle
                .compare_and_append(EMPTY_UPDATES, upper, Antichain::from_elem(next_upper))
                .await
                .expect("invalid usage")
            {
                Ok(()) => next_upper,
                // Someone has already written to the shard; adopt the current upper instead.
                Err(mismatch) => antichain_to_timestamp(mismatch.current),
            }
        };

        let snapshot_start = Instant::now();
        info!("startup: envd serve: catalog init: snapshot beginning");
        let as_of = as_of(&read_handle, upper);
        let snapshot: Vec<_> = snapshot_binary(&mut read_handle, as_of, &metrics)
            .await
            .map(|StateUpdate { kind, ts, diff }| (kind, ts, diff))
            .collect();
        // Start listening from `as_of` so that subsequent syncs observe all later updates.
        let listen = read_handle
            .listen(Antichain::from_elem(as_of))
            .await
            .expect("invalid usage");
        info!(
            "startup: envd serve: catalog init: snapshot complete in {:?}",
            snapshot_start.elapsed()
        );

        let mut handle = UnopenedPersistCatalogState {
            // Unopened catalogs are always writeable until they're opened in an explicit mode.
            mode: Mode::Writable,
            since_handle,
            write_handle,
            listen,
            persist_client,
            shard_id: catalog_shard_id,
            // Initialize empty in-memory state.
            snapshot: Vec::new(),
            update_applier: UnopenedCatalogStateInner::new(),
            upper,
            fenceable_token: FenceableToken::new(deploy_generation),
            catalog_content_version: version,
            bootstrap_complete: false,
            metrics,
        };
        // If the snapshot is not consolidated, and we see multiple epoch values while applying the
        // updates, then we might accidentally fence ourselves out.
        soft_assert_no_log!(
            snapshot.iter().all(|(_, _, diff)| *diff == Diff::ONE),
            "snapshot should be consolidated: {snapshot:#?}"
        );

        let apply_start = Instant::now();
        info!("startup: envd serve: catalog init: apply updates beginning");
        let updates = snapshot
            .into_iter()
            .map(|(kind, ts, diff)| StateUpdate { kind, ts, diff });
        handle.apply_updates(updates)?;
        info!(
            "startup: envd serve: catalog init: apply updates complete in {:?}",
            apply_start.elapsed()
        );

        // Validate that the binary version of the current process is not less than any binary
        // version that has written to the catalog.
        // This condition is only checked once, right here. If a new process comes along with a
        // higher version, it must fence this process out with one of the existing fencing
        // mechanisms.
        if let Some(found_version) = handle.get_catalog_content_version().await? {
            // Use cmp_precedence() to ignore build metadata per SemVer 2.0.0 spec
            if handle
                .catalog_content_version
                .cmp_precedence(&found_version)
                == std::cmp::Ordering::Less
            {
                return Err(DurableCatalogError::IncompatiblePersistVersion {
                    found_version,
                    catalog_version: handle.catalog_content_version,
                });
            }
        }

        Ok(handle)
    }

    /// Opens the catalog in the given `mode`, fencing out previous catalogs, running any
    /// necessary migrations/initialization, and returning the opened catalog state along
    /// with an iterator over the audit log entries (which are removed from the snapshot).
    #[mz_ore::instrument]
    async fn open_inner(
        mut self,
        mode: Mode,
        initial_ts: Timestamp,
        bootstrap_args: &BootstrapArgs,
    ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError> {
        // It would be nice to use `initial_ts` here, but it comes from the system clock, not the
        // timestamp oracle.
        let mut commit_ts = self.upper;
        self.mode = mode;

        // Validate the current deploy generation.
        match (&self.mode, &self.fenceable_token) {
            (_, FenceableToken::Unfenced { .. } | FenceableToken::Fenced { .. }) => {
                return Err(DurableCatalogError::Internal(
                    "catalog should not have fenced before opening".to_string(),
                )
                .into());
            }
            (
                Mode::Writable | Mode::Savepoint,
                FenceableToken::Initializing {
                    current_deploy_generation: None,
                    ..
                },
            ) => {
                return Err(DurableCatalogError::Internal(format!(
                    "cannot open in mode '{:?}' without a deploy generation",
                    self.mode,
                ))
                .into());
            }
            _ => {}
        }

        let read_only = matches!(self.mode, Mode::Readonly);

        // Fence out previous catalogs.
        loop {
            self.sync_to_current_upper().await?;
            // Never commit at a timestamp earlier than the current upper.
            commit_ts = max(commit_ts, self.upper);
            let (fence_updates, current_fenceable_token) = self
                .fenceable_token
                .generate_unfenced_token(self.mode)?
                .ok_or_else(|| {
                    DurableCatalogError::Internal(
                        "catalog should not have fenced before opening".to_string(),
                    )
                })?;
            debug!(
                ?self.upper,
                ?self.fenceable_token,
                ?current_fenceable_token,
                "fencing previous catalogs"
            );
            if matches!(self.mode, Mode::Writable) {
                match self
                    .compare_and_append(fence_updates.clone(), commit_ts)
                    .await
                {
                    Ok(upper) => {
                        commit_ts = upper;
                    }
                    Err(CompareAndAppendError::Fence(e)) => return Err(e.into()),
                    // A concurrent write advanced the upper; re-sync and try again.
                    Err(e @ CompareAndAppendError::UpperMismatch { .. }) => {
                        warn!("catalog write failed due to upper mismatch, retrying: {e:?}");
                        continue;
                    }
                }
            }
            self.fenceable_token = current_fenceable_token;
            break;
        }

        if matches!(self.mode, Mode::Writable) {
            // One-time migration: The catalog previously used `CONTROLLER_CRITICAL_SINCE` for its
            // since handle. Now it uses its own `CATALOG_CRITICAL_SINCE`, to free
            // `CONTROLLER_CRITICAL_SINCE` up for the storage controller. The catalog and
            // controller handles differ in the `Opaque` codec, so we need a migration.
            //
            // TODO: Remove this once we don't support upgrading from v26 anymore.
            let mut controller_handle = self
                .persist_client
                .open_critical_since::<SourceData, (), Timestamp, StorageDiff>(
                    self.shard_id,
                    PersistClient::CONTROLLER_CRITICAL_SINCE,
                    Opaque::encode(&i64::MIN),
                    Diagnostics {
                        shard_name: CATALOG_SHARD_NAME.to_string(),
                        handle_purpose: "durable catalog state critical since (migration)"
                            .to_string(),
                    },
                )
                .await
                .expect("invalid usage");

            let since = controller_handle.since().clone();
            let res = controller_handle
                .compare_and_downgrade_since(
                    &Opaque::encode(&i64::MIN),
                    (&Opaque::encode(&PersistEpoch::default()), &since),
                )
                .await;
            match res {
                Ok(_) => info!("migrated Opaque of catalog since handle"),
                Err(_) => { /* critical since was already migrated */ }
            }
        }

        let is_initialized = self.is_initialized_inner();
        if !matches!(self.mode, Mode::Writable) && !is_initialized {
            return Err(CatalogError::Durable(DurableCatalogError::NotWritable(
                format!(
                    "catalog tables do not exist; will not create in {:?} mode",
                    self.mode
                ),
            )));
        }
        soft_assert_ne_or_log!(self.upper, Timestamp::minimum());

        // Remove all audit log entries.
        let (audit_logs, snapshot): (Vec<_>, Vec<_>) = self
            .snapshot
            .into_iter()
            .partition(|(update, _, _)| update.is_audit_log());
        self.snapshot = snapshot;
        let audit_log_count = audit_logs.iter().map(|(_, _, diff)| diff).sum::<Diff>();
        let audit_log_handle = AuditLogIterator::new(audit_logs);

        // Perform data migrations.
        if is_initialized && !read_only {
            commit_ts = upgrade(&mut self, commit_ts).await?;
        }

        debug!(
            ?is_initialized,
            ?self.upper,
            "initializing catalog state"
        );
        let mut catalog = PersistCatalogState {
            mode: self.mode,
            since_handle: self.since_handle,
            write_handle: self.write_handle,
            listen: self.listen,
            persist_client: self.persist_client,
            shard_id: self.shard_id,
            upper: self.upper,
            fenceable_token: self.fenceable_token,
            // Initialize empty in-memory state.
            snapshot: Vec::new(),
            update_applier: CatalogStateInner::new(),
            catalog_content_version: self.catalog_content_version,
            bootstrap_complete: false,
            metrics: self.metrics,
        };
        catalog.metrics.collection_entries.reset();
        // Normally, `collection_entries` is updated in `apply_updates`. The audit log updates skip
        // over that function so we manually update it here.
        catalog
            .metrics
            .collection_entries
            .with_label_values(&[&CollectionType::AuditLog.to_string()])
            .add(audit_log_count.into_inner());
        let updates = self.snapshot.into_iter().map(|(kind, ts, diff)| {
            let kind = TryIntoStateUpdateKind::try_into(kind).expect("kind decoding error");
            StateUpdate { kind, ts, diff }
        });
        catalog.apply_updates(updates)?;

        let catalog_content_version = catalog.catalog_content_version.to_string();
        let txn = if is_initialized {
            let mut txn = catalog.transaction().await?;

            // Ad-hoc migration: Initialize the `migration_version` expected by adapter to be
            // present in existing catalogs.
            //
            // Note: Need to exclude read-only catalog mode here, because in that mode all
            // transactions are expected to be no-ops.
            // TODO: remove this once we only support upgrades from version >= 0.164
            if txn.get_setting("migration_version".into()).is_none() && mode != Mode::Readonly {
                let old_version = txn.get_catalog_content_version();
                txn.set_setting("migration_version".into(), old_version.map(Into::into))?;
            }

            txn.set_catalog_content_version(catalog_content_version)?;
            txn
        } else {
            soft_assert_eq_no_log!(
                catalog
                    .snapshot
                    .iter()
                    .filter(|(kind, _, _)| !matches!(kind, StateUpdateKind::FenceToken(_)))
                    .count(),
                0,
                "trace should not contain any updates for an uninitialized catalog: {:#?}",
                catalog.snapshot
            );

            // First-time initialization: populate the catalog with its bootstrap contents.
            let mut txn = catalog.transaction().await?;
            initialize::initialize(
                &mut txn,
                bootstrap_args,
                initial_ts.into(),
                catalog_content_version,
            )
            .await?;
            txn
        };

        if read_only {
            let (txn_batch, _) = txn.into_parts();
            // The upper here doesn't matter because we are only applying the updates in memory.
            let updates = StateUpdate::from_txn_batch_ts(txn_batch, catalog.upper);
            catalog.apply_updates(updates)?;
        } else {
            txn.commit_internal(commit_ts).await?;
        }

        if matches!(catalog.mode, Mode::Writable) {
            let write_handle = catalog
                .persist_client
                .open_writer::<SourceData, (), Timestamp, i64>(
                    catalog.write_handle.shard_id(),
                    Arc::new(persist_desc()),
                    Arc::new(UnitSchema::default()),
                    Diagnostics {
                        shard_name: CATALOG_SHARD_NAME.to_string(),
                        handle_purpose: "compact catalog".to_string(),
                    },
                )
                .await
                .expect("invalid usage");
            let fuel = CATALOG_FORCE_COMPACTION_FUEL.handle(catalog.persist_client.dyncfgs());
            let wait = CATALOG_FORCE_COMPACTION_WAIT.handle(catalog.persist_client.dyncfgs());
            // We're going to gradually turn this on via dyncfgs. Run it in a task so that it
            // doesn't block startup.
            let _task = mz_ore::task::spawn(|| "catalog::force_shard_compaction", async move {
                let () =
                    mz_persist_client::cli::admin::dangerous_force_compaction_and_break_pushdown(
                        &write_handle,
                        || fuel.get(),
                        || wait.get(),
                    )
                    .await;
            });
        }

        Ok((Box::new(catalog), audit_log_handle))
    }

    /// Reports if the catalog state has been initialized.
    ///
    /// NOTE: This is the answer as of the last call to [`PersistHandle::sync`] or [`PersistHandle::sync_to_current_upper`],
    /// not necessarily what is currently in persist.
    #[mz_ore::instrument]
    fn is_initialized_inner(&self) -> bool {
        // An initialized catalog always contains config entries.
        !self.update_applier.configs.is_empty()
    }

    /// Get the current value of config `key`.
    ///
    /// Some configs need to be read before the catalog is opened for bootstrapping.
    #[mz_ore::instrument]
    async fn get_current_config(&mut self, key: &str) -> Result<Option<u64>, DurableCatalogError> {
        self.sync_to_current_upper().await?;
        Ok(self.update_applier.configs.get(key).cloned())
    }

    /// Get the user version of this instance.
    ///
    /// The user version is used to determine if a migration is needed.
    #[mz_ore::instrument]
    pub(crate) async fn get_user_version(&mut self) -> Result<Option<u64>, DurableCatalogError> {
        self.get_current_config(USER_VERSION_KEY).await
    }

    /// Get the current value of setting `name`.
    ///
    /// Some settings need to be read before the catalog is opened for bootstrapping.
    #[mz_ore::instrument]
    async fn get_current_setting(
        &mut self,
        name: &str,
    ) -> Result<Option<String>, DurableCatalogError> {
        self.sync_to_current_upper().await?;
        Ok(self.update_applier.settings.get(name).cloned())
    }

    /// Get the catalog content version.
    ///
    /// The catalog content version is the semantic version of the most recent binary that wrote to
    /// the catalog.
    #[mz_ore::instrument]
    async fn get_catalog_content_version(
        &mut self,
    ) -> Result<Option<semver::Version>, DurableCatalogError> {
        let version = self
            .get_current_setting(CATALOG_CONTENT_VERSION_KEY)
            .await?;
        let version = version.map(|version| version.parse().expect("invalid version persisted"));
        Ok(version)
    }
}
1366
1367#[async_trait]
1368impl OpenableDurableCatalogState for UnopenedPersistCatalogState {
1369    #[mz_ore::instrument]
1370    async fn open_savepoint(
1371        mut self: Box<Self>,
1372        initial_ts: Timestamp,
1373        bootstrap_args: &BootstrapArgs,
1374    ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError> {
1375        self.open_inner(Mode::Savepoint, initial_ts, bootstrap_args)
1376            .boxed()
1377            .await
1378    }
1379
1380    #[mz_ore::instrument]
1381    async fn open_read_only(
1382        mut self: Box<Self>,
1383        bootstrap_args: &BootstrapArgs,
1384    ) -> Result<Box<dyn DurableCatalogState>, CatalogError> {
1385        self.open_inner(Mode::Readonly, EpochMillis::MIN.into(), bootstrap_args)
1386            .boxed()
1387            .await
1388            .map(|(catalog, _)| catalog)
1389    }
1390
1391    #[mz_ore::instrument]
1392    async fn open(
1393        mut self: Box<Self>,
1394        initial_ts: Timestamp,
1395        bootstrap_args: &BootstrapArgs,
1396    ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError> {
1397        self.open_inner(Mode::Writable, initial_ts, bootstrap_args)
1398            .boxed()
1399            .await
1400    }
1401
1402    #[mz_ore::instrument(level = "debug")]
1403    async fn open_debug(mut self: Box<Self>) -> Result<DebugCatalogState, CatalogError> {
1404        Ok(DebugCatalogState(*self))
1405    }
1406
1407    #[mz_ore::instrument]
1408    async fn is_initialized(&mut self) -> Result<bool, CatalogError> {
1409        self.sync_to_current_upper().await?;
1410        Ok(self.is_initialized_inner())
1411    }
1412
1413    #[mz_ore::instrument]
1414    async fn epoch(&mut self) -> Result<Epoch, CatalogError> {
1415        self.sync_to_current_upper().await?;
1416        self.fenceable_token
1417            .validate()?
1418            .map(|token| token.epoch)
1419            .ok_or(CatalogError::Durable(DurableCatalogError::Uninitialized))
1420    }
1421
1422    #[mz_ore::instrument]
1423    async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError> {
1424        self.sync_to_current_upper().await?;
1425        self.fenceable_token
1426            .token()
1427            .map(|token| token.deploy_generation)
1428            .ok_or(CatalogError::Durable(DurableCatalogError::Uninitialized))
1429    }
1430
1431    #[mz_ore::instrument(level = "debug")]
1432    async fn get_0dt_deployment_max_wait(&mut self) -> Result<Option<Duration>, CatalogError> {
1433        let value = self
1434            .get_current_config(WITH_0DT_DEPLOYMENT_MAX_WAIT)
1435            .await?;
1436        match value {
1437            None => Ok(None),
1438            Some(millis) => Ok(Some(Duration::from_millis(millis))),
1439        }
1440    }
1441
1442    #[mz_ore::instrument(level = "debug")]
1443    async fn get_0dt_deployment_ddl_check_interval(
1444        &mut self,
1445    ) -> Result<Option<Duration>, CatalogError> {
1446        let value = self
1447            .get_current_config(WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL)
1448            .await?;
1449        match value {
1450            None => Ok(None),
1451            Some(millis) => Ok(Some(Duration::from_millis(millis))),
1452        }
1453    }
1454
1455    #[mz_ore::instrument(level = "debug")]
1456    async fn get_enable_0dt_deployment_panic_after_timeout(
1457        &mut self,
1458    ) -> Result<Option<bool>, CatalogError> {
1459        let value = self
1460            .get_current_config(ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT)
1461            .await?;
1462        match value {
1463            None => Ok(None),
1464            Some(0) => Ok(Some(false)),
1465            Some(1) => Ok(Some(true)),
1466            Some(v) => Err(
1467                DurableCatalogError::from(TryFromProtoError::UnknownEnumVariant(format!(
1468                    "{v} is not a valid boolean value"
1469                )))
1470                .into(),
1471            ),
1472        }
1473    }
1474
1475    #[mz_ore::instrument]
1476    async fn has_system_config_synced_once(&mut self) -> Result<bool, DurableCatalogError> {
1477        self.get_current_config(SYSTEM_CONFIG_SYNCED_KEY)
1478            .await
1479            .map(|value| value.map(|value| value > 0).unwrap_or(false))
1480    }
1481
1482    #[mz_ore::instrument]
1483    async fn trace_unconsolidated(&mut self) -> Result<Trace, CatalogError> {
1484        self.sync_to_current_upper().await?;
1485        if self.is_initialized_inner() {
1486            let snapshot = self.snapshot_unconsolidated().await;
1487            Ok(Trace::from_snapshot(snapshot))
1488        } else {
1489            Err(CatalogError::Durable(DurableCatalogError::Uninitialized))
1490        }
1491    }
1492
1493    #[mz_ore::instrument]
1494    async fn trace_consolidated(&mut self) -> Result<Trace, CatalogError> {
1495        self.sync_to_current_upper().await?;
1496        if self.is_initialized_inner() {
1497            let snapshot = self.current_snapshot().await?;
1498            Ok(Trace::from_snapshot(snapshot))
1499        } else {
1500            Err(CatalogError::Durable(DurableCatalogError::Uninitialized))
1501        }
1502    }
1503
    #[mz_ore::instrument(level = "debug")]
    async fn expire(self: Box<Self>) {
        // Resolves to the inherent `expire` method (inherent methods take
        // precedence over trait methods during resolution), not a recursive call.
        // Presumably this releases the handle's persist leases — see the inherent
        // impl for details.
        self.expire().await
    }
1508}
1509
/// Applies updates for an opened catalog.
#[derive(Debug)]
struct CatalogStateInner {
    /// A trace of all catalog updates that can be consumed by some higher layer.
    /// Updates are pushed in apply order and drained (in timestamp order) by
    /// `sync_updates`.
    updates: VecDeque<memory::objects::StateUpdate>,
}
1516
1517impl CatalogStateInner {
1518    fn new() -> CatalogStateInner {
1519        CatalogStateInner {
1520            updates: VecDeque::new(),
1521        }
1522    }
1523}
1524
1525impl ApplyUpdate<StateUpdateKind> for CatalogStateInner {
1526    fn apply_update(
1527        &mut self,
1528        update: StateUpdate<StateUpdateKind>,
1529        current_fence_token: &mut FenceableToken,
1530        metrics: &Arc<Metrics>,
1531    ) -> Result<Option<StateUpdate<StateUpdateKind>>, FenceError> {
1532        if let Some(collection_type) = update.kind.collection_type() {
1533            metrics
1534                .collection_entries
1535                .with_label_values(&[&collection_type.to_string()])
1536                .add(update.diff.into_inner());
1537        }
1538
1539        {
1540            let update: Option<memory::objects::StateUpdate> = (&update)
1541                .try_into()
1542                .expect("invalid persisted update: {update:#?}");
1543            if let Some(update) = update {
1544                self.updates.push_back(update);
1545            }
1546        }
1547
1548        match (update.kind, update.diff) {
1549            (StateUpdateKind::AuditLog(_, ()), _) => Ok(None),
1550            // Nothing to due for fence token retractions but wait for the next insertion.
1551            (StateUpdateKind::FenceToken(_), Diff::MINUS_ONE) => Ok(None),
1552            (StateUpdateKind::FenceToken(token), Diff::ONE) => {
1553                current_fence_token.maybe_fence(token)?;
1554                Ok(None)
1555            }
1556            (kind, diff) => Ok(Some(StateUpdate {
1557                kind,
1558                ts: update.ts,
1559                diff,
1560            })),
1561        }
1562    }
1563}
1564
/// A durable store of the catalog state using Persist as an implementation. The durable store can
/// serve any catalog data and transactionally modify catalog data.
///
/// This is a [`PersistHandle`] whose updates are applied by [`CatalogStateInner`].
///
/// Production users should call [`Self::expire`] before dropping a [`PersistCatalogState`]
/// so that it can expire its leases. If/when rust gets AsyncDrop, this will be done automatically.
type PersistCatalogState = PersistHandle<StateUpdateKind, CatalogStateInner>;
1571
#[async_trait]
impl ReadOnlyDurableCatalogState for PersistCatalogState {
    /// Returns the current epoch.
    ///
    /// # Panics
    ///
    /// Panics if the catalog has not been opened; an opened catalog always has a
    /// fence token.
    fn epoch(&self) -> Epoch {
        self.fenceable_token
            .token()
            .expect("opened catalog state must have an epoch")
            .epoch
    }

    fn metrics(&self) -> &Metrics {
        &self.metrics
    }

    #[mz_ore::instrument(level = "debug")]
    async fn expire(self: Box<Self>) {
        // Resolves to the inherent `expire` on `PersistHandle` (inherent methods
        // take precedence over trait methods), not a recursive call.
        self.expire().await
    }

    fn is_bootstrap_complete(&self) -> bool {
        self.bootstrap_complete
    }

    /// Returns all audit log events, sorted by their sortable ID.
    async fn get_audit_logs(&mut self) -> Result<Vec<VersionedEvent>, CatalogError> {
        self.sync_to_current_upper().await?;
        // First gather the raw audit log protos from a full persist snapshot...
        let audit_logs: Vec<_> = self
            .persist_snapshot()
            .await
            .filter_map(
                |StateUpdate {
                     kind,
                     ts: _,
                     diff: _,
                 }| match kind {
                    StateUpdateKind::AuditLog(key, ()) => Some(key),
                    _ => None,
                },
            )
            .collect();
        // ...then decode them into events, failing on the first decode error.
        let mut audit_logs: Vec<_> = audit_logs
            .into_iter()
            .map(RustType::from_proto)
            .map_ok(|key: AuditLogKey| key.event)
            .collect::<Result<_, _>>()?;
        audit_logs.sort_by(|a, b| a.sortable_id().cmp(&b.sortable_id()));
        Ok(audit_logs)
    }

    /// Returns the next ID for the allocator named `id_type`.
    ///
    /// # Panics
    ///
    /// Panics if no ID allocator with that name exists in the catalog trace.
    #[mz_ore::instrument(level = "debug")]
    async fn get_next_id(&mut self, id_type: &str) -> Result<u64, CatalogError> {
        self.with_trace(|trace| {
            Ok(trace
                .into_iter()
                // Scan from the end so the most recent allocator value wins.
                .rev()
                .filter_map(|(kind, _, _)| match kind {
                    StateUpdateKind::IdAllocator(key, value) if key.name == id_type => {
                        Some(value.next_id)
                    }
                    _ => None,
                })
                .next()
                .expect("must exist"))
        })
        .await
    }

    #[mz_ore::instrument(level = "debug")]
    async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError> {
        self.sync_to_current_upper().await?;
        Ok(self
            .fenceable_token
            .token()
            .expect("opened catalogs must have a token")
            .deploy_generation)
    }

    #[mz_ore::instrument(level = "debug")]
    async fn snapshot(&mut self) -> Result<Snapshot, CatalogError> {
        self.with_snapshot(Ok).await
    }

    /// Syncs to the current upper, returning every update that became visible.
    #[mz_ore::instrument(level = "debug")]
    async fn sync_to_current_updates(
        &mut self,
    ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError> {
        let upper = self.current_upper().await;
        self.sync_updates(upper).await
    }

    /// Syncs to `target_upper` and drains all queued updates with
    /// `ts < target_upper`; later updates stay queued for a future call.
    #[mz_ore::instrument(level = "debug")]
    async fn sync_updates(
        &mut self,
        target_upper: mz_repr::Timestamp,
    ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError> {
        self.sync(target_upper).await?;
        let mut updates = Vec::new();
        while let Some(update) = self.update_applier.updates.front() {
            // Only updates strictly before the target upper are complete.
            if update.ts >= target_upper {
                break;
            }

            let update = self
                .update_applier
                .updates
                .pop_front()
                .expect("peeked above");
            updates.push(update);
        }
        Ok(updates)
    }

    async fn current_upper(&mut self) -> Timestamp {
        // Resolves to the inherent `current_upper` (inherent methods take
        // precedence over trait methods), not a recursive call.
        self.current_upper().await
    }
}
1686
#[async_trait]
#[allow(mismatched_lifetime_syntaxes)]
impl DurableCatalogState for PersistCatalogState {
    fn is_read_only(&self) -> bool {
        matches!(self.mode, Mode::Readonly)
    }

    fn is_savepoint(&self) -> bool {
        matches!(self.mode, Mode::Savepoint)
    }

    /// Marks bootstrap as complete; writable catalogs additionally upgrade the
    /// version on their since handle.
    async fn mark_bootstrap_complete(&mut self) {
        self.bootstrap_complete = true;
        if matches!(self.mode, Mode::Writable) {
            // NOTE(review): presumably records the current build version in the
            // critical since handle's state — confirm against `upgrade_version`.
            self.since_handle
                .upgrade_version()
                .await
                .expect("invalid usage")
        }
    }

    /// Starts a new transaction pinned at the current upper. Only one transaction
    /// at a time is supported; `commit_transaction` asserts the upper has not
    /// moved since the transaction started.
    #[mz_ore::instrument(level = "debug")]
    async fn transaction(&mut self) -> Result<Transaction, CatalogError> {
        self.metrics.transactions_started.inc();
        let snapshot = self.snapshot().await?;
        let commit_ts = self.upper.clone();
        Transaction::new(self, snapshot, commit_ts)
    }

    /// Commits `txn_batch` at `commit_ts`, returning the new upper.
    ///
    /// Behavior depends on the catalog's mode: writable catalogs durably write
    /// via compare-and-append, savepoint catalogs apply the updates in memory
    /// only, and read-only catalogs error on any non-empty transaction.
    #[mz_ore::instrument(level = "debug")]
    async fn commit_transaction(
        &mut self,
        txn_batch: TransactionBatch,
        commit_ts: Timestamp,
    ) -> Result<Timestamp, CatalogError> {
        // Inner function so the whole commit can be timed below.
        async fn commit_transaction_inner(
            catalog: &mut PersistCatalogState,
            txn_batch: TransactionBatch,
            commit_ts: Timestamp,
        ) -> Result<Timestamp, CatalogError> {
            // If the current upper does not match the transaction's commit timestamp, then the
            // catalog must have changed since the transaction was started, making the transaction
            // invalid. When/if we want a multi-writer catalog, this will likely have to change
            // from an assert to a retry.
            assert_eq!(
                catalog.upper, txn_batch.upper,
                "only one transaction at a time is supported"
            );

            assert!(
                commit_ts >= catalog.upper,
                "expected commit ts, {}, to be greater than or equal to upper, {}",
                commit_ts,
                catalog.upper
            );

            let updates = StateUpdate::from_txn_batch(txn_batch).collect();
            debug!("committing updates: {updates:?}");

            let next_upper = match catalog.mode {
                // Writable: durably append the updates, failing if fenced.
                Mode::Writable => catalog
                    .compare_and_append(updates, commit_ts)
                    .await
                    .map_err(|e| e.unwrap_fence_error())?,
                // Savepoint: apply in memory only and advance the in-memory upper.
                Mode::Savepoint => {
                    let updates = updates.into_iter().map(|(kind, diff)| StateUpdate {
                        kind,
                        ts: commit_ts,
                        diff,
                    });
                    catalog.apply_updates(updates)?;
                    catalog.upper = commit_ts.step_forward();
                    catalog.upper
                }
                Mode::Readonly => {
                    // If the transaction is empty then we don't error, even in read-only mode.
                    // This is mostly for legacy reasons (i.e. with enough elbow grease this
                    // behavior can be changed without breaking any fundamental assumptions).
                    if !updates.is_empty() {
                        return Err(DurableCatalogError::NotWritable(format!(
                            "cannot commit a transaction in a read-only catalog: {updates:#?}"
                        ))
                        .into());
                    }
                    catalog.upper
                }
            };

            Ok(next_upper)
        }
        self.metrics.transaction_commits.inc();
        let counter = self.metrics.transaction_commit_latency_seconds.clone();
        commit_transaction_inner(self, txn_batch, commit_ts)
            .wall_time()
            .inc_by(counter)
            .await
    }

    /// Confirms this catalog is still the leader by syncing to the current
    /// upper, which surfaces a fence error if another catalog has taken over.
    #[mz_ore::instrument(level = "debug")]
    async fn confirm_leadership(&mut self) -> Result<(), CatalogError> {
        // Read only catalog does not care about leadership.
        if self.is_read_only() {
            return Ok(());
        }
        self.sync_to_current_upper().await?;
        Ok(())
    }

    fn shard_id(&self) -> ShardId {
        self.shard_id
    }
}
1799
1800/// Deterministically generate a shard ID for the given `organization_id` and `seed`.
1801pub fn shard_id(organization_id: Uuid, seed: usize) -> ShardId {
1802    let hash = sha2::Sha256::digest(format!("{organization_id}{seed}")).to_vec();
1803    soft_assert_eq_or_log!(hash.len(), 32, "SHA256 returns 32 bytes (256 bits)");
1804    let uuid = Uuid::from_slice(&hash[0..16]).expect("from_slice accepts exactly 16 bytes");
1805    ShardId::from_str(&format!("s{uuid}")).expect("known to be valid")
1806}
1807
1808/// Generates a timestamp for reading from `read_handle` that is as fresh as possible, given
1809/// `upper`.
1810fn as_of(
1811    read_handle: &ReadHandle<SourceData, (), Timestamp, StorageDiff>,
1812    upper: Timestamp,
1813) -> Timestamp {
1814    let since = read_handle.since().clone();
1815    let mut as_of = upper.checked_sub(1).unwrap_or_else(|| {
1816        panic!("catalog persist shard should be initialize, found upper: {upper:?}")
1817    });
1818    // We only downgrade the since after writing, and we always set the since to one less than the
1819    // upper.
1820    soft_assert_or_log!(
1821        since.less_equal(&as_of),
1822        "since={since:?}, as_of={as_of:?}; since must be less than or equal to as_of"
1823    );
1824    // This should be a no-op if the assert above passes, however if it doesn't then we'd like to
1825    // continue with a correct timestamp instead of entering a panic loop.
1826    as_of.advance_by(since.borrow());
1827    as_of
1828}
1829
1830/// Fetch the persist version of the catalog shard, if one exists. A version will not
1831/// exist if we are creating a brand-new environment.
1832async fn fetch_catalog_shard_version(
1833    persist_client: &PersistClient,
1834    catalog_shard_id: ShardId,
1835) -> Option<semver::Version> {
1836    let shard_state = persist_client
1837        .inspect_shard::<Timestamp>(&catalog_shard_id)
1838        .await
1839        .ok()?;
1840    let json_state = serde_json::to_value(shard_state).expect("state serialization error");
1841    let json_version = json_state
1842        .get("applier_version")
1843        .cloned()
1844        .expect("missing applier_version");
1845    let version = serde_json::from_value(json_version).expect("version deserialization error");
1846    Some(version)
1847}
1848
1849/// Generates an iterator of [`StateUpdate`] that contain all updates to the catalog
1850/// state up to, and including, `as_of`.
1851///
1852/// The output is consolidated and sorted by timestamp in ascending order.
1853#[mz_ore::instrument(level = "debug")]
1854async fn snapshot_binary(
1855    read_handle: &mut ReadHandle<SourceData, (), Timestamp, StorageDiff>,
1856    as_of: Timestamp,
1857    metrics: &Arc<Metrics>,
1858) -> impl Iterator<Item = StateUpdate<StateUpdateKindJson>> + DoubleEndedIterator + use<> {
1859    metrics.snapshots_taken.inc();
1860    let counter = metrics.snapshot_latency_seconds.clone();
1861    snapshot_binary_inner(read_handle, as_of)
1862        .wall_time()
1863        .inc_by(counter)
1864        .await
1865}
1866
1867/// Generates an iterator of [`StateUpdate`] that contain all updates to the catalog
1868/// state up to, and including, `as_of`.
1869///
1870/// The output is consolidated and sorted by timestamp in ascending order.
1871#[mz_ore::instrument(level = "debug")]
1872async fn snapshot_binary_inner(
1873    read_handle: &mut ReadHandle<SourceData, (), Timestamp, StorageDiff>,
1874    as_of: Timestamp,
1875) -> impl Iterator<Item = StateUpdate<StateUpdateKindJson>> + DoubleEndedIterator + use<> {
1876    let snapshot = read_handle
1877        .snapshot_and_fetch(Antichain::from_elem(as_of))
1878        .await
1879        .expect("we have advanced the restart_as_of by the since");
1880    soft_assert_no_log!(
1881        snapshot.iter().all(|(_, _, diff)| *diff == 1),
1882        "snapshot_and_fetch guarantees a consolidated result: {snapshot:#?}"
1883    );
1884    snapshot
1885        .into_iter()
1886        .map(Into::<StateUpdate<StateUpdateKindJson>>::into)
1887        .sorted_by(|a, b| Ord::cmp(&b.ts, &a.ts))
1888}
1889
1890/// Convert an [`Antichain<Timestamp>`] to a [`Timestamp`].
1891///
1892/// The correctness of this function relies on [`Timestamp`] being totally ordered and never
1893/// finalizing the catalog shard.
1894pub(crate) fn antichain_to_timestamp(antichain: Antichain<Timestamp>) -> Timestamp {
1895    antichain
1896        .into_option()
1897        .expect("we use a totally ordered time and never finalize the shard")
1898}
1899
1900// Debug methods used by the catalog-debug tool.
1901
impl Trace {
    /// Generates a [`Trace`] from snapshot.
    ///
    /// Each update is routed to the sub-trace for its collection, preserving the
    /// update's timestamp and diff. Fence tokens are intentionally dropped since
    /// they are not part of any collection.
    fn from_snapshot(snapshot: impl IntoIterator<Item = StateUpdate>) -> Trace {
        let mut trace = Trace::new();
        for StateUpdate { kind, ts, diff } in snapshot {
            match kind {
                StateUpdateKind::AuditLog(k, v) => trace.audit_log.values.push(((k, v), ts, diff)),
                StateUpdateKind::Cluster(k, v) => trace.clusters.values.push(((k, v), ts, diff)),
                StateUpdateKind::ClusterReplica(k, v) => {
                    trace.cluster_replicas.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::Comment(k, v) => trace.comments.values.push(((k, v), ts, diff)),
                StateUpdateKind::Config(k, v) => trace.configs.values.push(((k, v), ts, diff)),
                StateUpdateKind::Database(k, v) => trace.databases.values.push(((k, v), ts, diff)),
                StateUpdateKind::DefaultPrivilege(k, v) => {
                    trace.default_privileges.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::FenceToken(_) => {
                    // Fence token not included in trace.
                }
                StateUpdateKind::IdAllocator(k, v) => {
                    trace.id_allocator.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::IntrospectionSourceIndex(k, v) => {
                    trace.introspection_sources.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::Item(k, v) => trace.items.values.push(((k, v), ts, diff)),
                StateUpdateKind::NetworkPolicy(k, v) => {
                    trace.network_policies.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::Role(k, v) => trace.roles.values.push(((k, v), ts, diff)),
                StateUpdateKind::Schema(k, v) => trace.schemas.values.push(((k, v), ts, diff)),
                StateUpdateKind::Setting(k, v) => trace.settings.values.push(((k, v), ts, diff)),
                StateUpdateKind::SourceReferences(k, v) => {
                    trace.source_references.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::SystemConfiguration(k, v) => {
                    trace.system_configurations.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::SystemObjectMapping(k, v) => {
                    trace.system_object_mappings.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::SystemPrivilege(k, v) => {
                    trace.system_privileges.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::StorageCollectionMetadata(k, v) => trace
                    .storage_collection_metadata
                    .values
                    .push(((k, v), ts, diff)),
                StateUpdateKind::UnfinalizedShard(k, ()) => {
                    trace.unfinalized_shards.values.push(((k, ()), ts, diff))
                }
                StateUpdateKind::TxnWalShard((), v) => {
                    trace.txn_wal_shard.values.push((((), v), ts, diff))
                }
                StateUpdateKind::RoleAuth(k, v) => trace.role_auth.values.push(((k, v), ts, diff)),
            }
        }
        trace
    }
}
1963
impl UnopenedPersistCatalogState {
    /// Manually update value of `key` in collection `T` to `value`.
    ///
    /// Returns the previous value for `key`, if one existed. Retries on upper
    /// mismatches (i.e. concurrent writes); returns an error if this catalog is
    /// fenced out.
    ///
    /// # Panics
    ///
    /// Panics if the consolidated trace contains more than one value for `key`.
    #[mz_ore::instrument]
    pub(crate) async fn debug_edit<T: Collection>(
        &mut self,
        key: T::Key,
        value: T::Value,
    ) -> Result<Option<T::Value>, CatalogError>
    where
        T::Key: PartialEq + Eq + Debug + Clone,
        T::Value: Debug + Clone,
    {
        let prev_value = loop {
            // Re-clone the inputs each attempt since a retry restarts the loop.
            let key = key.clone();
            let value = value.clone();
            let snapshot = self.current_snapshot().await?;
            let trace = Trace::from_snapshot(snapshot);
            let collection_trace = T::collection_trace(trace);
            let prev_values: Vec<_> = collection_trace
                .values
                .into_iter()
                .filter(|((k, _), _, diff)| {
                    soft_assert_eq_or_log!(*diff, Diff::ONE, "trace is consolidated");
                    &key == k
                })
                .collect();

            let prev_value = match &prev_values[..] {
                [] => None,
                [((_, v), _, _)] => Some(v.clone()),
                prev_values => panic!("multiple values found for key {key:?}: {prev_values:?}"),
            };

            // Retract any previous value and insert the new one.
            let mut updates: Vec<_> = prev_values
                .into_iter()
                .map(|((k, v), _, _)| (T::update(k, v), Diff::MINUS_ONE))
                .collect();
            updates.push((T::update(key, value), Diff::ONE));
            // We must fence out all other catalogs, if we haven't already, since we are writing.
            match self.fenceable_token.generate_unfenced_token(self.mode)? {
                Some((fence_updates, current_fenceable_token)) => {
                    updates.extend(fence_updates.clone());
                    match self.compare_and_append(updates, self.upper).await {
                        Ok(_) => {
                            // Only adopt the new fence token once the write lands.
                            self.fenceable_token = current_fenceable_token;
                            break prev_value;
                        }
                        Err(CompareAndAppendError::Fence(e)) => return Err(e.into()),
                        Err(e @ CompareAndAppendError::UpperMismatch { .. }) => {
                            warn!("catalog write failed due to upper mismatch, retrying: {e:?}");
                            continue;
                        }
                    }
                }
                None => {
                    self.compare_and_append(updates, self.upper)
                        .await
                        .map_err(|e| e.unwrap_fence_error())?;
                    break prev_value;
                }
            }
        };
        Ok(prev_value)
    }

    /// Manually delete `key` from collection `T`.
    ///
    /// Retracts every value recorded for `key`. Retries on upper mismatches
    /// (i.e. concurrent writes); returns an error if this catalog is fenced out.
    #[mz_ore::instrument]
    pub(crate) async fn debug_delete<T: Collection>(
        &mut self,
        key: T::Key,
    ) -> Result<(), CatalogError>
    where
        T::Key: PartialEq + Eq + Debug + Clone,
        T::Value: Debug,
    {
        loop {
            // Re-clone the key each attempt since a retry restarts the loop.
            let key = key.clone();
            let snapshot = self.current_snapshot().await?;
            let trace = Trace::from_snapshot(snapshot);
            let collection_trace = T::collection_trace(trace);
            let mut retractions: Vec<_> = collection_trace
                .values
                .into_iter()
                .filter(|((k, _), _, diff)| {
                    soft_assert_eq_or_log!(*diff, Diff::ONE, "trace is consolidated");
                    &key == k
                })
                .map(|((k, v), _, _)| (T::update(k, v), Diff::MINUS_ONE))
                .collect();

            // We must fence out all other catalogs, if we haven't already, since we are writing.
            match self.fenceable_token.generate_unfenced_token(self.mode)? {
                Some((fence_updates, current_fenceable_token)) => {
                    retractions.extend(fence_updates.clone());
                    match self.compare_and_append(retractions, self.upper).await {
                        Ok(_) => {
                            // Only adopt the new fence token once the write lands.
                            self.fenceable_token = current_fenceable_token;
                            break;
                        }
                        Err(CompareAndAppendError::Fence(e)) => return Err(e.into()),
                        Err(e @ CompareAndAppendError::UpperMismatch { .. }) => {
                            warn!("catalog write failed due to upper mismatch, retrying: {e:?}");
                            continue;
                        }
                    }
                }
                None => {
                    self.compare_and_append(retractions, self.upper)
                        .await
                        .map_err(|e| e.unwrap_fence_error())?;
                    break;
                }
            }
        }
        Ok(())
    }

    /// Generates a [`Vec<StateUpdate>`] that contain all updates to the catalog
    /// state.
    ///
    /// The output is consolidated and sorted by timestamp in ascending order and the current upper.
    async fn current_snapshot(
        &mut self,
    ) -> Result<impl IntoIterator<Item = StateUpdate> + '_, CatalogError> {
        self.sync_to_current_upper().await?;
        self.consolidate();
        Ok(self.snapshot.iter().cloned().map(|(kind, ts, diff)| {
            let kind = TryIntoStateUpdateKind::try_into(kind).expect("kind decoding error");
            StateUpdate { kind, ts, diff }
        }))
    }
}