mz_catalog/durable/persist.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

#[cfg(test)]
mod tests;

use std::cmp::max;
use std::collections::{BTreeMap, VecDeque};
use std::fmt::Debug;
use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, Instant};

use async_trait::async_trait;
use differential_dataflow::lattice::Lattice;
use futures::{FutureExt, StreamExt};
use itertools::Itertools;
use mz_audit_log::VersionedEvent;
use mz_ore::metrics::MetricsFutureExt;
use mz_ore::now::EpochMillis;
use mz_ore::{
    soft_assert_eq_no_log, soft_assert_eq_or_log, soft_assert_ne_or_log, soft_assert_no_log,
    soft_assert_or_log, soft_panic_or_log,
};
use mz_persist_client::cfg::USE_CRITICAL_SINCE_CATALOG;
use mz_persist_client::cli::admin::{CATALOG_FORCE_COMPACTION_FUEL, CATALOG_FORCE_COMPACTION_WAIT};
use mz_persist_client::critical::SinceHandle;
use mz_persist_client::error::UpperMismatch;
use mz_persist_client::read::{Listen, ListenEvent, ReadHandle};
use mz_persist_client::write::WriteHandle;
use mz_persist_client::{Diagnostics, PersistClient, ShardId};
use mz_persist_types::codec_impls::UnitSchema;
use mz_proto::{RustType, TryFromProtoError};
use mz_repr::{Diff, RelationDesc, SqlScalarType};
use mz_storage_types::StorageDiff;
use mz_storage_types::sources::SourceData;
use sha2::Digest;
use timely::progress::{Antichain, Timestamp as TimelyTimestamp};
use tracing::{debug, info, warn};
use uuid::Uuid;

use crate::durable::debug::{Collection, CollectionType, DebugCatalogState, Trace};
use crate::durable::error::FenceError;
use crate::durable::initialize::{
    ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, SYSTEM_CONFIG_SYNCED_KEY, USER_VERSION_KEY,
    WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL, WITH_0DT_DEPLOYMENT_MAX_WAIT,
};
use crate::durable::metrics::Metrics;
use crate::durable::objects::state_update::{
    IntoStateUpdateKindJson, StateUpdate, StateUpdateKind, StateUpdateKindJson,
    TryIntoStateUpdateKind,
};
use crate::durable::objects::{AuditLogKey, FenceToken, Snapshot};
use crate::durable::transaction::TransactionBatch;
use crate::durable::upgrade::upgrade;
use crate::durable::{
    AuditLogIterator, BootstrapArgs, CATALOG_CONTENT_VERSION_KEY, CatalogError,
    DurableCatalogError, DurableCatalogState, Epoch, OpenableDurableCatalogState,
    ReadOnlyDurableCatalogState, Transaction, initialize,
};
use crate::memory;
/// Type alias used to represent timestamps in persist.
pub(crate) type Timestamp = mz_repr::Timestamp;

/// The minimum value of an epoch.
///
/// # Safety
/// `new_unchecked` is safe to call with a non-zero value.
const MIN_EPOCH: Epoch = unsafe { Epoch::new_unchecked(1) };

/// Human-readable catalog shard name.
const CATALOG_SHARD_NAME: &str = "catalog";
/// Human-readable catalog upgrade shard name.
const UPGRADE_SHARD_NAME: &str = "catalog_upgrade";

/// Seed used to generate the persist shard ID for the catalog.
const CATALOG_SEED: usize = 1;
/// Seed used to generate the catalog upgrade shard ID.
///
/// All state that gets written to persist is tagged with the version of the code that wrote that
/// state. Persist has limited forward compatibility in how many versions in the future a reader
/// can read. Reading from persist updates state and the version that the state is tagged with. As
/// a consequence, reading from persist may unintentionally fence out other readers and writers
/// with a lower version. We use the catalog upgrade shard to track what database version is
/// actively deployed so readers from the future, such as the upgrade checker tool, don't
/// accidentally fence out the database from persist. Only writable opened catalogs can increment
/// the version in the upgrade shard.
///
/// One specific example that we are trying to avoid with the catalog upgrade shard is the
/// following:
///
///   1. Database is running on version 0.X.0.
///   2. Upgrade checker is run on version 0.X+1.0.
///   3. Upgrade checker is run on version 0.X+2.0.
///
/// With the catalog upgrade shard, the upgrade checker in step (3) can see that the database is
/// currently running on v0.X.0 and reading the catalog would cause the database to get fenced out.
/// So instead of reading the catalog it errors out. Without the catalog upgrade shard, the upgrade
/// checker could read the version in the catalog shard, and see that it is v0.X+1.0, but it would
/// be impossible to differentiate between the following two scenarios:
///
///   - The database is running on v0.X+1.0 and it's safe to run the upgrade checker at v0.X+2.0.
///   - Some other upgrade checker incremented the version to v0.X+1.0, the database is running on
///     version v0.X.0, and it is not safe to run the upgrade checker.
///
/// Persist guarantees that the shard versions are non-decreasing, so we don't need to worry about
/// race conditions where the shard version decreases after reading it.
const UPGRADE_SEED: usize = 2;
/// Legacy seed used to generate the persist shard ID for builtin table migrations. DO NOT REUSE.
pub const _BUILTIN_MIGRATION_SEED: usize = 3;
/// Legacy seed used to generate the persist shard ID for the expression cache. DO NOT REUSE.
pub const _EXPRESSION_CACHE_SEED: usize = 4;
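
// A hedged sketch of how these seeds become shard IDs: each seed is combined
// with the organization ID and hashed into a deterministic `ShardId` (the
// `shard_id` helper used below does the real derivation; the exact format in
// this sketch is an assumption for illustration):
//
//     let hash = sha2::Sha256::digest(format!("{organization_id}{seed}"));
//     let uuid = Uuid::from_slice(&hash[..16]).expect("16 bytes");
//     let shard_id = ShardId::from_str(&format!("s{uuid}")).expect("valid shard id");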

/// Durable catalog mode that dictates the effect of mutable operations.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub(crate) enum Mode {
    /// Mutable operations are prohibited.
    Readonly,
    /// Mutable operations have an effect in-memory, but aren't persisted durably.
    Savepoint,
    /// Mutable operations have an effect in-memory and durably.
    Writable,
}

/// Enum representing the fenced state of the catalog.
#[derive(Debug)]
pub(crate) enum FenceableToken {
    /// The catalog is still initializing and learning about previously written fence tokens. This
    /// state can be fenced if it encounters a larger deploy generation.
    Initializing {
        /// The largest fence token durably written to the catalog, if any.
        durable_token: Option<FenceToken>,
        /// This process's deploy generation.
        current_deploy_generation: Option<u64>,
    },
    /// The current token has not been fenced.
    Unfenced { current_token: FenceToken },
    /// The current token has been fenced.
    Fenced {
        current_token: FenceToken,
        fence_token: FenceToken,
    },
}

impl FenceableToken {
    /// Returns a new token.
    fn new(current_deploy_generation: Option<u64>) -> Self {
        Self::Initializing {
            durable_token: None,
            current_deploy_generation,
        }
    }

    /// Returns the current token if it is not fenced, otherwise returns an error.
    fn validate(&self) -> Result<Option<FenceToken>, FenceError> {
        match self {
            FenceableToken::Initializing { durable_token, .. } => Ok(durable_token.clone()),
            FenceableToken::Unfenced { current_token, .. } => Ok(Some(current_token.clone())),
            FenceableToken::Fenced {
                current_token,
                fence_token,
            } => {
                assert!(
                    fence_token > current_token,
                    "must be fenced by higher token; current={current_token:?}, fence={fence_token:?}"
                );
                if fence_token.deploy_generation > current_token.deploy_generation {
                    Err(FenceError::DeployGeneration {
                        current_generation: current_token.deploy_generation,
                        fence_generation: fence_token.deploy_generation,
                    })
                } else {
                    assert!(
                        fence_token.epoch > current_token.epoch,
                        "must be fenced by higher token; current={current_token:?}, fence={fence_token:?}"
                    );
                    Err(FenceError::Epoch {
                        current_epoch: current_token.epoch,
                        fence_epoch: fence_token.epoch,
                    })
                }
            }
        }
    }

    /// Returns the current token.
    fn token(&self) -> Option<FenceToken> {
        match self {
            FenceableToken::Initializing { durable_token, .. } => durable_token.clone(),
            FenceableToken::Unfenced { current_token, .. } => Some(current_token.clone()),
            FenceableToken::Fenced { current_token, .. } => Some(current_token.clone()),
        }
    }

    /// Returns `Err` if `token` fences out `self`, `Ok` otherwise.
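    ///
    /// A hedged sketch of the fencing behavior (illustrative, not a doctest):
    ///
    /// ```ignore
    /// let mut fenceable = FenceableToken::Unfenced {
    ///     current_token: FenceToken {
    ///         deploy_generation: 1,
    ///         epoch: Epoch::new(1).expect("non-zero"),
    ///     },
    /// };
    /// let newer = FenceToken {
    ///     deploy_generation: 2,
    ///     epoch: Epoch::new(1).expect("non-zero"),
    /// };
    /// // A strictly larger token fences us out and transitions `self` to `Fenced`.
    /// assert!(fenceable.maybe_fence(newer).is_err());
    /// ```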
    fn maybe_fence(&mut self, token: FenceToken) -> Result<(), FenceError> {
        match self {
            FenceableToken::Initializing {
                durable_token,
                current_deploy_generation,
                ..
            } => {
                match durable_token {
                    Some(durable_token) => {
                        *durable_token = max(durable_token.clone(), token.clone());
                    }
                    None => {
                        *durable_token = Some(token.clone());
                    }
                }
                if let Some(current_deploy_generation) = current_deploy_generation {
                    if *current_deploy_generation < token.deploy_generation {
                        *self = FenceableToken::Fenced {
                            current_token: FenceToken {
                                deploy_generation: *current_deploy_generation,
                                epoch: token.epoch,
                            },
                            fence_token: token,
                        };
                        self.validate()?;
                    }
                }
            }
            FenceableToken::Unfenced { current_token } => {
                if *current_token < token {
                    *self = FenceableToken::Fenced {
                        current_token: current_token.clone(),
                        fence_token: token,
                    };
                    self.validate()?;
                }
            }
            FenceableToken::Fenced { .. } => {
                self.validate()?;
            }
        }

        Ok(())
    }

    /// Returns a [`FenceableToken::Unfenced`] token and the updates to the catalog required to
    /// transition to the `Unfenced` state if `self` is [`FenceableToken::Initializing`], otherwise
    /// returns `None`.
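    ///
    /// A hedged sketch: an `Initializing` token with a durable token of
    /// (generation 3, epoch 7), opened at deploy generation 3 in `Mode::Writable`,
    /// produces a retraction of the old token, an insertion of (generation 3,
    /// epoch 8), and an `Unfenced` token holding the new value. Read-only and
    /// savepoint catalogs reuse the durable epoch instead of incrementing it.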
    fn generate_unfenced_token(
        &self,
        mode: Mode,
    ) -> Result<Option<(Vec<(StateUpdateKind, Diff)>, FenceableToken)>, DurableCatalogError> {
        let (durable_token, current_deploy_generation) = match self {
            FenceableToken::Initializing {
                durable_token,
                current_deploy_generation,
            } => (durable_token.clone(), current_deploy_generation.clone()),
            FenceableToken::Unfenced { .. } | FenceableToken::Fenced { .. } => return Ok(None),
        };

        let mut fence_updates = Vec::with_capacity(2);

        if let Some(durable_token) = &durable_token {
            fence_updates.push((
                StateUpdateKind::FenceToken(durable_token.clone()),
                Diff::MINUS_ONE,
            ));
        }

        let current_deploy_generation = current_deploy_generation
            .or_else(|| durable_token.as_ref().map(|token| token.deploy_generation))
            // We cannot initialize a catalog without a deploy generation.
            .ok_or(DurableCatalogError::Uninitialized)?;
        let mut current_epoch = durable_token
            .map(|token| token.epoch)
            .unwrap_or(MIN_EPOCH)
            .get();
        // Only writable catalogs attempt to increment the epoch.
        if matches!(mode, Mode::Writable) {
            current_epoch += 1;
        }
        let current_epoch = Epoch::new(current_epoch).expect("known to be non-zero");
        let current_token = FenceToken {
            deploy_generation: current_deploy_generation,
            epoch: current_epoch,
        };

        fence_updates.push((
            StateUpdateKind::FenceToken(current_token.clone()),
            Diff::ONE,
        ));

        let current_fenceable_token = FenceableToken::Unfenced { current_token };

        Ok(Some((fence_updates, current_fenceable_token)))
    }
}

/// An error that can occur while executing [`PersistHandle::compare_and_append`].
#[derive(Debug, thiserror::Error)]
pub(crate) enum CompareAndAppendError {
    #[error(transparent)]
    Fence(#[from] FenceError),
    /// The catalog encountered an upper mismatch when trying to write. This should only
    /// happen while trying to fence out other catalogs.
    #[error(
        "expected catalog upper {expected_upper:?} did not match actual catalog upper {actual_upper:?}"
    )]
    UpperMismatch {
        expected_upper: Timestamp,
        actual_upper: Timestamp,
    },
}

impl CompareAndAppendError {
    pub(crate) fn unwrap_fence_error(self) -> FenceError {
        match self {
            CompareAndAppendError::Fence(e) => e,
            e @ CompareAndAppendError::UpperMismatch { .. } => {
                panic!("unexpected upper mismatch: {e:?}")
            }
        }
    }
}

impl From<UpperMismatch<Timestamp>> for CompareAndAppendError {
    fn from(upper_mismatch: UpperMismatch<Timestamp>) -> Self {
        Self::UpperMismatch {
            expected_upper: antichain_to_timestamp(upper_mismatch.expected),
            actual_upper: antichain_to_timestamp(upper_mismatch.current),
        }
    }
}

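/// A strategy for processing updates as they are applied to a [`PersistHandle`].
///
/// A minimal sketch of an implementor that caches every update and never fences
/// (illustrative only; the real implementors live elsewhere in this file):
///
/// ```ignore
/// struct CacheEverything;
///
/// impl ApplyUpdate<StateUpdateKindJson> for CacheEverything {
///     fn apply_update(
///         &mut self,
///         update: StateUpdate<StateUpdateKindJson>,
///         _current_fence_token: &mut FenceableToken,
///         _metrics: &Arc<Metrics>,
///     ) -> Result<Option<StateUpdate<StateUpdateKindJson>>, FenceError> {
///         // Returning `Some` asks the handle to cache the update in memory.
///         Ok(Some(update))
///     }
/// }
/// ```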
pub(crate) trait ApplyUpdate<T: IntoStateUpdateKindJson> {
    /// Process and apply `update`.
    ///
    /// Returns `Some` if `update` should be cached in memory and `None` otherwise.
    fn apply_update(
        &mut self,
        update: StateUpdate<T>,
        current_fence_token: &mut FenceableToken,
        metrics: &Arc<Metrics>,
    ) -> Result<Option<StateUpdate<T>>, FenceError>;
}

/// A handle for interacting with the persist catalog shard.
///
/// The catalog shard is used in multiple contexts, for example pre-open and post-open,
/// but for all contexts the majority of the durable catalog's behavior is identical. This struct
/// implements those behaviors that are identical while allowing the user to specify the different
/// behaviors via generic parameters.
///
/// The behavior of the durable catalog can differ along one of two axes. The first is the
/// format of each individual update, i.e. raw binary, the current protobuf version, previous
/// protobuf versions, etc. The second axis is what to do with each individual update, for example
/// before opening we cache all config updates but don't cache them after opening. These behaviors
/// are customizable via the `T: TryIntoStateUpdateKind` and `U: ApplyUpdate<T>` generic parameters
/// respectively.
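///
/// For example, the pre-open catalog below is a
/// `PersistHandle<StateUpdateKindJson, UnopenedCatalogStateInner>` (raw JSON
/// updates, config/setting caching), while the opened catalog is a
/// `PersistHandle<StateUpdateKind, CatalogStateInner>` (fully deserialized
/// updates).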
#[derive(Debug)]
pub(crate) struct PersistHandle<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> {
    /// The [`Mode`] that this catalog was opened in.
    pub(crate) mode: Mode,
    /// Since handle to control compaction.
    since_handle: SinceHandle<SourceData, (), Timestamp, StorageDiff, i64>,
    /// Write handle to persist.
    write_handle: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
    /// Listener to catalog changes.
    listen: Listen<SourceData, (), Timestamp, StorageDiff>,
    /// Handle for connecting to persist.
    persist_client: PersistClient,
    /// Catalog shard ID.
    shard_id: ShardId,
    /// Cache of the most recent catalog snapshot.
    ///
    /// We use a tuple instead of [`StateUpdate`] to make consolidation easier.
    pub(crate) snapshot: Vec<(T, Timestamp, Diff)>,
    /// Applies custom processing, filtering, and fencing for each individual update.
    update_applier: U,
    /// The current upper of the persist shard.
    pub(crate) upper: Timestamp,
    /// The fence token of the catalog, if one exists.
    fenceable_token: FenceableToken,
    /// The semantic version of the current binary.
    catalog_content_version: semver::Version,
    /// Flag to indicate if bootstrap is complete.
    bootstrap_complete: bool,
    /// Metrics for the persist catalog.
    metrics: Arc<Metrics>,
}

impl<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> PersistHandle<T, U> {
    /// Increment the version in the catalog upgrade shard to the code's current version.
    async fn increment_catalog_upgrade_shard_version(&self, organization_id: Uuid) {
        let upgrade_shard_id = shard_id(organization_id, UPGRADE_SEED);

        let () = self
            .persist_client
            .upgrade_version::<(), (), Timestamp, StorageDiff>(
                upgrade_shard_id,
                Diagnostics {
                    shard_name: UPGRADE_SHARD_NAME.to_string(),
                    handle_purpose: "durable catalog state upgrade".to_string(),
                },
            )
            .await
            .expect("invalid usage");
    }

    /// Fetch the current upper of the catalog state.
    #[mz_ore::instrument]
    async fn current_upper(&mut self) -> Timestamp {
        match self.mode {
            Mode::Writable | Mode::Readonly => {
                let upper = self.write_handle.fetch_recent_upper().await;
                antichain_to_timestamp(upper.clone())
            }
            Mode::Savepoint => self.upper,
        }
    }

    /// Appends `updates` iff the current global upper of the catalog is `self.upper`.
    ///
    /// Returns the next upper used to commit the transaction.
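    ///
    /// A hedged usage sketch (not a doctest):
    ///
    /// ```ignore
    /// // On success the shard's upper (and `self.upper`) advance to
    /// // `commit_ts.step_forward()`.
    /// let commit_ts = handle.upper;
    /// let next_upper = handle.compare_and_append(updates, commit_ts).await?;
    /// assert_eq!(next_upper, commit_ts.step_forward());
    /// ```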
    #[mz_ore::instrument]
    pub(crate) async fn compare_and_append<S: IntoStateUpdateKindJson>(
        &mut self,
        updates: Vec<(S, Diff)>,
        commit_ts: Timestamp,
    ) -> Result<Timestamp, CompareAndAppendError> {
        assert_eq!(self.mode, Mode::Writable);
        assert!(
            commit_ts >= self.upper,
            "expected commit ts, {}, to be greater than or equal to upper, {}",
            commit_ts,
            self.upper
        );

        // This awkward code allows us to perform an expensive soft assert that requires cloning
        // `updates` twice, after `updates` has been consumed.
        let contains_fence = if mz_ore::assert::soft_assertions_enabled() {
            let updates: Vec<_> = updates.clone();
            let parsed_updates: Vec<_> = updates
                .clone()
                .into_iter()
                .map(|(update, diff)| {
                    let update: StateUpdateKindJson = update.into();
                    (update, diff)
                })
                .filter_map(|(update, diff)| {
                    <StateUpdateKindJson as TryIntoStateUpdateKind>::try_into(update)
                        .ok()
                        .map(|update| (update, diff))
                })
                .collect();
            let contains_retraction = parsed_updates.iter().any(|(update, diff)| {
                matches!(update, StateUpdateKind::FenceToken(..)) && *diff == Diff::MINUS_ONE
            });
            let contains_addition = parsed_updates.iter().any(|(update, diff)| {
                matches!(update, StateUpdateKind::FenceToken(..)) && *diff == Diff::ONE
            });
            let contains_fence = contains_retraction && contains_addition;
            Some((contains_fence, updates))
        } else {
            None
        };

        let updates = updates.into_iter().map(|(kind, diff)| {
            let kind: StateUpdateKindJson = kind.into();
            (
                (Into::<SourceData>::into(kind), ()),
                commit_ts,
                diff.into_inner(),
            )
        });
        let next_upper = commit_ts.step_forward();
        let res = self
            .write_handle
            .compare_and_append(
                updates,
                Antichain::from_elem(self.upper),
                Antichain::from_elem(next_upper),
            )
            .await
            .expect("invalid usage");

        // There was an upper mismatch which means something else must have written to the catalog.
        // Syncing to the current upper should result in a fence error since writing to the catalog
        // without fencing other catalogs should be impossible. The one exception is if we are
        // trying to fence other catalogs with this write, in which case we won't see a fence error.
        if let Err(e @ UpperMismatch { .. }) = res {
            self.sync_to_current_upper().await?;
            if let Some((contains_fence, updates)) = contains_fence {
                assert!(
                    contains_fence,
                    "updates were neither fenced nor fencing and encountered an upper mismatch: {updates:#?}"
                )
            }
            return Err(e.into());
        }

        // Keep the shard's since lagging the upper by 1 so that the shard stays readable.
        let downgrade_to = Antichain::from_elem(next_upper.saturating_sub(1));

        // The since handle gives us the ability to fence out other downgraders using an opaque token.
        // (See the method documentation for details.)
        // That's not needed here, so we use the since handle's opaque token to avoid any comparison
        // failures.
        let opaque = *self.since_handle.opaque();
        let downgrade = self
            .since_handle
            .maybe_compare_and_downgrade_since(&opaque, (&opaque, &downgrade_to))
            .await;

        match downgrade {
            None => {}
            Some(Err(e)) => soft_panic_or_log!("found opaque value {e}, but expected {opaque}"),
            Some(Ok(updated)) => soft_assert_or_log!(
                updated == downgrade_to,
                "updated bound should match expected"
            ),
        }
        self.sync(next_upper).await?;
        Ok(next_upper)
    }

    /// Generates a vector of [`StateUpdate`]s containing all unconsolidated updates to the
    /// catalog state up to, and including, `as_of`.
    #[mz_ore::instrument]
    async fn snapshot_unconsolidated(&mut self) -> Vec<StateUpdate<StateUpdateKind>> {
        let current_upper = self.current_upper().await;

        let mut snapshot = Vec::new();
        let mut read_handle = self.read_handle().await;
        let as_of = as_of(&read_handle, current_upper);
        let mut stream = Box::pin(
            // We use `snapshot_and_stream` because it guarantees unconsolidated output.
            read_handle
                .snapshot_and_stream(Antichain::from_elem(as_of))
                .await
                .expect("we have advanced the restart_as_of by the since"),
        );
        while let Some(update) = stream.next().await {
            snapshot.push(update)
        }
        read_handle.expire().await;
        snapshot
            .into_iter()
            .map(Into::<StateUpdate<StateUpdateKindJson>>::into)
            .map(|state_update| state_update.try_into().expect("kind decoding error"))
            .collect()
    }

    /// Listen and apply all updates that are currently in persist.
    ///
    /// Returns an error if this instance has been fenced out.
    #[mz_ore::instrument]
    pub(crate) async fn sync_to_current_upper(&mut self) -> Result<(), FenceError> {
        let upper = self.current_upper().await;
        self.sync(upper).await
    }

    /// Listen and apply all updates up to `target_upper`.
    ///
    /// Returns an error if this instance has been fenced out.
    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn sync(&mut self, target_upper: Timestamp) -> Result<(), FenceError> {
        self.metrics.syncs.inc();
        let counter = self.metrics.sync_latency_seconds.clone();
        self.sync_inner(target_upper)
            .wall_time()
            .inc_by(counter)
            .await
    }

    #[mz_ore::instrument(level = "debug")]
    async fn sync_inner(&mut self, target_upper: Timestamp) -> Result<(), FenceError> {
        self.fenceable_token.validate()?;

        // Savepoint catalogs do not yet know how to update themselves in response to concurrent
        // writes from writer catalogs.
        if self.mode == Mode::Savepoint {
            self.upper = max(self.upper, target_upper);
            return Ok(());
        }

        let mut updates: BTreeMap<_, Vec<_>> = BTreeMap::new();

        while self.upper < target_upper {
            let listen_events = self.listen.fetch_next().await;
            for listen_event in listen_events {
                match listen_event {
                    ListenEvent::Progress(upper) => {
                        debug!("synced up to {upper:?}");
                        self.upper = antichain_to_timestamp(upper);
                        // Attempt to apply updates in batches of a single timestamp. If another
                        // catalog wrote a fence token at one timestamp and then updates in a new
                        // format at a later timestamp, then we want to apply the fence token
                        // before attempting to deserialize the new updates.
                        while let Some((ts, updates)) = updates.pop_first() {
                            assert!(ts < self.upper, "expected {} < {}", ts, self.upper);
                            let updates = updates.into_iter().map(
                                |update: StateUpdate<StateUpdateKindJson>| {
                                    let kind =
                                        T::try_from(update.kind).expect("kind decoding error");
                                    StateUpdate {
                                        kind,
                                        ts: update.ts,
                                        diff: update.diff,
                                    }
                                },
                            );
                            self.apply_updates(updates)?;
                        }
                    }
                    ListenEvent::Updates(batch_updates) => {
                        for update in batch_updates {
                            let update: StateUpdate<StateUpdateKindJson> = update.into();
                            updates.entry(update.ts).or_default().push(update);
                        }
                    }
                }
            }
        }
        assert_eq!(updates, BTreeMap::new(), "all updates should be applied");
        Ok(())
    }

    #[mz_ore::instrument(level = "debug")]
    pub(crate) fn apply_updates(
        &mut self,
        updates: impl IntoIterator<Item = StateUpdate<T>>,
    ) -> Result<(), FenceError> {
        let mut updates: Vec<_> = updates
            .into_iter()
            .map(|StateUpdate { kind, ts, diff }| (kind, ts, diff))
            .collect();

        // This helps guarantee that for a single key, there is at most a single retraction and a
        // single insertion per timestamp. Otherwise, we would need to match the retractions and
        // insertions up by value and manually figure out what the end value should be.
        differential_dataflow::consolidation::consolidate_updates(&mut updates);

        // Updates must be applied in timestamp order. Within a timestamp retractions must be
        // applied before insertions, or we might end up retracting the wrong value.
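        // Sorting by `Diff` achieves this because `Diff::MINUS_ONE` orders before
        // `Diff::ONE`.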
        updates.sort_by(|(_, ts1, diff1), (_, ts2, diff2)| ts1.cmp(ts2).then(diff1.cmp(diff2)));

        let mut errors = Vec::new();

        for (kind, ts, diff) in updates {
            if diff != Diff::ONE && diff != Diff::MINUS_ONE {
                panic!("invalid update in consolidated trace: ({kind:?}, {ts:?}, {diff:?})");
            }

            match self.update_applier.apply_update(
                StateUpdate { kind, ts, diff },
                &mut self.fenceable_token,
                &self.metrics,
            ) {
                Ok(Some(StateUpdate { kind, ts, diff })) => self.snapshot.push((kind, ts, diff)),
                Ok(None) => {}
                // Instead of returning immediately, we accumulate all the errors and return the one
                // with the most information.
                Err(err) => errors.push(err),
            }
        }

        errors.sort();
        if let Some(err) = errors.into_iter().next() {
            return Err(err);
        }

        self.consolidate();

        Ok(())
    }

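    /// Advances every update in the in-memory snapshot to the most recent timestamp
    /// and consolidates the result.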
    #[mz_ore::instrument]
    pub(crate) fn consolidate(&mut self) {
        soft_assert_no_log!(
            self.snapshot
                .windows(2)
                .all(|updates| updates[0].1 <= updates[1].1),
            "snapshot should be sorted by timestamp, {:#?}",
            self.snapshot
        );

        let new_ts = self
            .snapshot
            .last()
            .map(|(_, ts, _)| *ts)
            .unwrap_or_else(Timestamp::minimum);
        for (_, ts, _) in &mut self.snapshot {
            *ts = new_ts;
        }
        differential_dataflow::consolidation::consolidate_updates(&mut self.snapshot);
    }

    /// Execute and return the results of `f` on the current catalog trace.
    ///
    /// Will return an error if the catalog has been fenced out.
    async fn with_trace<R>(
        &mut self,
        f: impl FnOnce(&Vec<(T, Timestamp, Diff)>) -> Result<R, CatalogError>,
    ) -> Result<R, CatalogError> {
        self.sync_to_current_upper().await?;
        f(&self.snapshot)
    }

    /// Open a read handle to the catalog.
    async fn read_handle(&self) -> ReadHandle<SourceData, (), Timestamp, StorageDiff> {
        self.persist_client
            .open_leased_reader(
                self.shard_id,
                Arc::new(desc()),
                Arc::new(UnitSchema::default()),
                Diagnostics {
                    shard_name: CATALOG_SHARD_NAME.to_string(),
                    handle_purpose: "openable durable catalog state temporary reader".to_string(),
                },
                USE_CRITICAL_SINCE_CATALOG.get(self.persist_client.dyncfgs()),
            )
            .await
            .expect("invalid usage")
    }

    /// Politely releases all external resources that can only be released in an async context.
    async fn expire(self: Box<Self>) {
        self.write_handle.expire().await;
        self.listen.expire().await;
    }
}

impl<U: ApplyUpdate<StateUpdateKind>> PersistHandle<StateUpdateKind, U> {
    /// Execute and return the results of `f` on the current catalog snapshot.
    ///
    /// Will return an error if the catalog has been fenced out.
    async fn with_snapshot<T>(
        &mut self,
        f: impl FnOnce(Snapshot) -> Result<T, CatalogError>,
    ) -> Result<T, CatalogError> {
        fn apply<K, V>(map: &mut BTreeMap<K, V>, key: &K, value: &V, diff: Diff)
        where
            K: Ord + Clone,
            V: Ord + Clone + Debug,
        {
            let key = key.clone();
            let value = value.clone();
            if diff == Diff::ONE {
                let prev = map.insert(key, value);
                assert_eq!(
                    prev, None,
                    "values must be explicitly retracted before inserting a new value"
                );
            } else if diff == Diff::MINUS_ONE {
                let prev = map.remove(&key);
                assert_eq!(
                    prev,
                    Some(value),
                    "retraction does not match existing value"
                );
            }
        }

        self.with_trace(|trace| {
            let mut snapshot = Snapshot::empty();
            for (kind, ts, diff) in trace {
                let diff = *diff;
                if diff != Diff::ONE && diff != Diff::MINUS_ONE {
                    panic!("invalid update in consolidated trace: ({kind:?}, {ts:?}, {diff:?})");
                }

                match kind {
                    StateUpdateKind::AuditLog(_key, ()) => {
                        // Ignore for snapshots.
                    }
                    StateUpdateKind::Cluster(key, value) => {
                        apply(&mut snapshot.clusters, key, value, diff);
                    }
                    StateUpdateKind::ClusterReplica(key, value) => {
                        apply(&mut snapshot.cluster_replicas, key, value, diff);
                    }
                    StateUpdateKind::Comment(key, value) => {
                        apply(&mut snapshot.comments, key, value, diff);
                    }
                    StateUpdateKind::Config(key, value) => {
                        apply(&mut snapshot.configs, key, value, diff);
                    }
                    StateUpdateKind::Database(key, value) => {
                        apply(&mut snapshot.databases, key, value, diff);
                    }
                    StateUpdateKind::DefaultPrivilege(key, value) => {
                        apply(&mut snapshot.default_privileges, key, value, diff);
                    }
                    StateUpdateKind::FenceToken(_token) => {
                        // Ignore for snapshots.
                    }
                    StateUpdateKind::IdAllocator(key, value) => {
                        apply(&mut snapshot.id_allocator, key, value, diff);
                    }
                    StateUpdateKind::IntrospectionSourceIndex(key, value) => {
                        apply(&mut snapshot.introspection_sources, key, value, diff);
                    }
                    StateUpdateKind::Item(key, value) => {
                        apply(&mut snapshot.items, key, value, diff);
                    }
                    StateUpdateKind::NetworkPolicy(key, value) => {
                        apply(&mut snapshot.network_policies, key, value, diff);
                    }
                    StateUpdateKind::Role(key, value) => {
                        apply(&mut snapshot.roles, key, value, diff);
                    }
                    StateUpdateKind::Schema(key, value) => {
                        apply(&mut snapshot.schemas, key, value, diff);
                    }
                    StateUpdateKind::Setting(key, value) => {
                        apply(&mut snapshot.settings, key, value, diff);
                    }
                    StateUpdateKind::SourceReferences(key, value) => {
                        apply(&mut snapshot.source_references, key, value, diff);
                    }
                    StateUpdateKind::SystemConfiguration(key, value) => {
                        apply(&mut snapshot.system_configurations, key, value, diff);
                    }
                    StateUpdateKind::SystemObjectMapping(key, value) => {
                        apply(&mut snapshot.system_object_mappings, key, value, diff);
                    }
                    StateUpdateKind::SystemPrivilege(key, value) => {
                        apply(&mut snapshot.system_privileges, key, value, diff);
                    }
                    StateUpdateKind::StorageCollectionMetadata(key, value) => {
                        apply(&mut snapshot.storage_collection_metadata, key, value, diff);
                    }
                    StateUpdateKind::UnfinalizedShard(key, ()) => {
                        apply(&mut snapshot.unfinalized_shards, key, &(), diff);
                    }
                    StateUpdateKind::TxnWalShard((), value) => {
                        apply(&mut snapshot.txn_wal_shard, &(), value, diff);
                    }
                    StateUpdateKind::RoleAuth(key, value) => {
                        apply(&mut snapshot.role_auth, key, value, diff);
                    }
                }
            }
            f(snapshot)
        })
        .await
    }

    /// Generates an iterator of [`StateUpdate`]s containing all updates to the catalog
    /// state.
    ///
    /// The output is fetched directly from persist instead of the in-memory cache.
    ///
    /// The output is consolidated and sorted by timestamp in ascending order.
    #[mz_ore::instrument(level = "debug")]
    async fn persist_snapshot(&self) -> impl Iterator<Item = StateUpdate> + DoubleEndedIterator {
        let mut read_handle = self.read_handle().await;
        let as_of = as_of(&read_handle, self.upper);
        let snapshot = snapshot_binary(&mut read_handle, as_of, &self.metrics)
            .await
            .map(|update| update.try_into().expect("kind decoding error"));
        read_handle.expire().await;
        snapshot
    }
}

/// Applies updates for an unopened catalog.
#[derive(Debug)]
pub(crate) struct UnopenedCatalogStateInner {
    /// The organization ID of the environment.
    organization_id: Uuid,
    /// A cache of the config collection of the catalog.
    configs: BTreeMap<String, u64>,
    /// A cache of the settings collection of the catalog.
    settings: BTreeMap<String, String>,
}

impl UnopenedCatalogStateInner {
    fn new(organization_id: Uuid) -> UnopenedCatalogStateInner {
        UnopenedCatalogStateInner {
            organization_id,
            configs: BTreeMap::new(),
            settings: BTreeMap::new(),
        }
    }
}

impl ApplyUpdate<StateUpdateKindJson> for UnopenedCatalogStateInner {
    fn apply_update(
        &mut self,
        update: StateUpdate<StateUpdateKindJson>,
        current_fence_token: &mut FenceableToken,
        _metrics: &Arc<Metrics>,
    ) -> Result<Option<StateUpdate<StateUpdateKindJson>>, FenceError> {
        if !update.kind.is_audit_log() && update.kind.is_always_deserializable() {
            let kind = TryInto::try_into(&update.kind).expect("kind is known to be deserializable");
            match (kind, update.diff) {
                (StateUpdateKind::Config(key, value), Diff::ONE) => {
                    let prev = self.configs.insert(key.key, value.value);
                    assert_eq!(
                        prev, None,
                        "values must be explicitly retracted before inserting a new value"
                    );
                }
                (StateUpdateKind::Config(key, value), Diff::MINUS_ONE) => {
                    let prev = self.configs.remove(&key.key);
                    assert_eq!(
                        prev,
                        Some(value.value),
                        "retraction does not match existing value"
                    );
                }
                (StateUpdateKind::Setting(key, value), Diff::ONE) => {
                    let prev = self.settings.insert(key.name, value.value);
                    assert_eq!(
                        prev, None,
                        "values must be explicitly retracted before inserting a new value"
                    );
                }
                (StateUpdateKind::Setting(key, value), Diff::MINUS_ONE) => {
                    let prev = self.settings.remove(&key.name);
                    assert_eq!(
                        prev,
                        Some(value.value),
                        "retraction does not match existing value"
                    );
                }
                (StateUpdateKind::FenceToken(fence_token), Diff::ONE) => {
                    current_fence_token.maybe_fence(fence_token)?;
                }
                _ => {}
            }
        }

        Ok(Some(update))
    }
}

/// A handle to an unopened catalog stored in persist. The unopened catalog can serve `Config`
/// data, `Setting` data, or the current epoch. All other catalog data may be un-migrated and
/// should not be read until the catalog has been opened. The [`UnopenedPersistCatalogState`] is
/// responsible for opening the catalog; see [`OpenableDurableCatalogState::open`] for more
/// details.
///
/// Production users should call [`Self::expire`] before dropping an
/// [`UnopenedPersistCatalogState`] so that it can expire its leases. If/when Rust gets AsyncDrop,
/// this will be done automatically.
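///
/// A hedged usage sketch (the exact `open` signature lives on
/// [`OpenableDurableCatalogState`]):
///
/// ```ignore
/// let unopened = UnopenedPersistCatalogState::new(
///     persist_client,
///     organization_id,
///     version,
///     deploy_generation,
///     metrics,
/// )
/// .await?;
/// let (catalog, audit_logs) = unopened.open(initial_ts, &bootstrap_args).await?;
/// ```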
pub(crate) type UnopenedPersistCatalogState =
    PersistHandle<StateUpdateKindJson, UnopenedCatalogStateInner>;

impl UnopenedPersistCatalogState {
    /// Create a new [`UnopenedPersistCatalogState`] for the catalog state associated with
    /// `organization_id`.
    ///
    /// All usages of the persist catalog must go through this function. That includes the
    /// catalog-debug tool, the adapter's catalog, etc.
    #[mz_ore::instrument]
    pub(crate) async fn new(
        persist_client: PersistClient,
        organization_id: Uuid,
        version: semver::Version,
        deploy_generation: Option<u64>,
        metrics: Arc<Metrics>,
    ) -> Result<UnopenedPersistCatalogState, DurableCatalogError> {
        let catalog_shard_id = shard_id(organization_id, CATALOG_SEED);
        let upgrade_shard_id = shard_id(organization_id, UPGRADE_SEED);
        debug!(
            ?catalog_shard_id,
            ?upgrade_shard_id,
            "new persist backed catalog state"
        );

        // Check the catalog upgrade shard to ensure that we don't fence anyone out of persist.
        let version_in_upgrade_shard =
            fetch_catalog_upgrade_shard_version(&persist_client, upgrade_shard_id).await;
        // If this is `None`, no version was found in the upgrade shard. This is a brand-new
        // environment, and we don't need to worry about fencing existing users.
        if let Some(version_in_upgrade_shard) = version_in_upgrade_shard {
            // Check that the current version of the code can handle data from the shard.
            // (We used to reverse this check, to confirm that whatever code wrote the data
            // in the shard would be able to read data written by the current version... but
            // we now require the current code to be able to maintain compat with whatever
            // data format versions pass this check.)
            if !mz_persist_client::cfg::code_can_write_data(&version, &version_in_upgrade_shard) {
                return Err(DurableCatalogError::IncompatiblePersistVersion {
                    found_version: version_in_upgrade_shard,
                    catalog_version: version,
                });
            }
        }

        let open_handles_start = Instant::now();
        info!("startup: envd serve: catalog init: open handles beginning");
        let since_handle = persist_client
            .open_critical_since(
                catalog_shard_id,
                // TODO: We may need to use a different critical reader
                // id for this if we want to be able to introspect it via SQL.
                PersistClient::CONTROLLER_CRITICAL_SINCE,
                Diagnostics {
                    shard_name: CATALOG_SHARD_NAME.to_string(),
                    handle_purpose: "durable catalog state critical since".to_string(),
                },
            )
            .await
            .expect("invalid usage");
        let (mut write_handle, mut read_handle) = persist_client
            .open(
                catalog_shard_id,
                Arc::new(desc()),
                Arc::new(UnitSchema::default()),
                Diagnostics {
                    shard_name: CATALOG_SHARD_NAME.to_string(),
                    handle_purpose: "durable catalog state handles".to_string(),
                },
                USE_CRITICAL_SINCE_CATALOG.get(persist_client.dyncfgs()),
            )
            .await
            .expect("invalid usage");
        info!(
            "startup: envd serve: catalog init: open handles complete in {:?}",
            open_handles_start.elapsed()
        );

        // Commit an empty write at the minimum timestamp so the catalog is always readable.
        let upper = {
            const EMPTY_UPDATES: &[((SourceData, ()), Timestamp, StorageDiff)] = &[];
            let upper = Antichain::from_elem(Timestamp::minimum());
            let next_upper = Timestamp::minimum().step_forward();
            match write_handle
                .compare_and_append(EMPTY_UPDATES, upper, Antichain::from_elem(next_upper))
                .await
                .expect("invalid usage")
            {
                Ok(()) => next_upper,
                Err(mismatch) => antichain_to_timestamp(mismatch.current),
            }
        };

        let snapshot_start = Instant::now();
        info!("startup: envd serve: catalog init: snapshot beginning");
        let as_of = as_of(&read_handle, upper);
        let snapshot: Vec<_> = snapshot_binary(&mut read_handle, as_of, &metrics)
            .await
            .map(|StateUpdate { kind, ts, diff }| (kind, ts, diff))
            .collect();
        let listen = read_handle
            .listen(Antichain::from_elem(as_of))
            .await
            .expect("invalid usage");
        info!(
            "startup: envd serve: catalog init: snapshot complete in {:?}",
            snapshot_start.elapsed()
        );

        let mut handle = UnopenedPersistCatalogState {
            // Unopened catalogs are always writable until they're opened in an explicit mode.
            mode: Mode::Writable,
            since_handle,
            write_handle,
            listen,
            persist_client,
            shard_id: catalog_shard_id,
            // Initialize empty in-memory state.
            snapshot: Vec::new(),
            update_applier: UnopenedCatalogStateInner::new(organization_id),
            upper,
            fenceable_token: FenceableToken::new(deploy_generation),
            catalog_content_version: version,
            bootstrap_complete: false,
            metrics,
        };
        // If the snapshot is not consolidated, and we see multiple epoch values while applying the
        // updates, then we might accidentally fence ourselves out.
        soft_assert_no_log!(
            snapshot.iter().all(|(_, _, diff)| *diff == Diff::ONE),
            "snapshot should be consolidated: {snapshot:#?}"
        );

        let apply_start = Instant::now();
        info!("startup: envd serve: catalog init: apply updates beginning");
        let updates = snapshot
            .into_iter()
            .map(|(kind, ts, diff)| StateUpdate { kind, ts, diff });
        handle.apply_updates(updates)?;
        info!(
            "startup: envd serve: catalog init: apply updates complete in {:?}",
            apply_start.elapsed()
        );

        // Validate that the binary version of the current process is not less than any binary
        // version that has written to the catalog.
        // This condition is only checked once, right here. If a new process comes along with a
        // higher version, it must fence this process out with one of the existing fencing
        // mechanisms.
        if let Some(found_version) = handle.get_catalog_content_version().await? {
            // Use cmp_precedence() to ignore build metadata per SemVer 2.0.0 spec
            if handle
                .catalog_content_version
                .cmp_precedence(&found_version)
                == std::cmp::Ordering::Less
            {
                return Err(DurableCatalogError::IncompatiblePersistVersion {
                    found_version,
                    catalog_version: handle.catalog_content_version,
                });
            }
        }

        Ok(handle)
    }

    #[mz_ore::instrument]
    async fn open_inner(
        mut self,
        mode: Mode,
        initial_ts: Timestamp,
        bootstrap_args: &BootstrapArgs,
    ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError> {
        // It would be nice to use `initial_ts` here, but it comes from the system clock, not the
        // timestamp oracle.
        let mut commit_ts = self.upper;
        self.mode = mode;

        // Validate the current deploy generation.
        match (&self.mode, &self.fenceable_token) {
            (_, FenceableToken::Unfenced { .. } | FenceableToken::Fenced { .. }) => {
                return Err(DurableCatalogError::Internal(
                    "catalog should not have fenced before opening".to_string(),
                )
                .into());
            }
            (
                Mode::Writable | Mode::Savepoint,
                FenceableToken::Initializing {
                    current_deploy_generation: None,
                    ..
                },
            ) => {
                return Err(DurableCatalogError::Internal(format!(
                    "cannot open in mode '{:?}' without a deploy generation",
                    self.mode,
                ))
                .into());
            }
            _ => {}
        }

        let read_only = matches!(self.mode, Mode::Readonly);

        // Fence out previous catalogs.
        loop {
            self.sync_to_current_upper().await?;
            commit_ts = max(commit_ts, self.upper);
            let (fence_updates, current_fenceable_token) = self
                .fenceable_token
                .generate_unfenced_token(self.mode)?
                .ok_or_else(|| {
                    DurableCatalogError::Internal(
                        "catalog should not have fenced before opening".to_string(),
                    )
                })?;
            debug!(
                ?self.upper,
                ?self.fenceable_token,
                ?current_fenceable_token,
                "fencing previous catalogs"
            );
            if matches!(self.mode, Mode::Writable) {
                match self
                    .compare_and_append(fence_updates.clone(), commit_ts)
                    .await
                {
                    Ok(upper) => {
                        commit_ts = upper;
                    }
                    Err(CompareAndAppendError::Fence(e)) => return Err(e.into()),
                    Err(e @ CompareAndAppendError::UpperMismatch { .. }) => {
                        warn!("catalog write failed due to upper mismatch, retrying: {e:?}");
                        continue;
                    }
                }
            }
            self.fenceable_token = current_fenceable_token;
            break;
        }

        let is_initialized = self.is_initialized_inner();
        if !matches!(self.mode, Mode::Writable) && !is_initialized {
            return Err(CatalogError::Durable(DurableCatalogError::NotWritable(
                format!(
                    "catalog tables do not exist; will not create in {:?} mode",
                    self.mode
                ),
            )));
        }
        soft_assert_ne_or_log!(self.upper, Timestamp::minimum());

        // Remove all audit log entries.
        let (audit_logs, snapshot): (Vec<_>, Vec<_>) = self
            .snapshot
            .into_iter()
            .partition(|(update, _, _)| update.is_audit_log());
        self.snapshot = snapshot;
        let audit_log_count = audit_logs.iter().map(|(_, _, diff)| diff).sum::<Diff>();
        let audit_log_handle = AuditLogIterator::new(audit_logs);

        // Perform data migrations.
        if is_initialized && !read_only {
            commit_ts = upgrade(&mut self, commit_ts).await?;
        }

        debug!(
            ?is_initialized,
            ?self.upper,
            "initializing catalog state"
        );
        let mut catalog = PersistCatalogState {
            mode: self.mode,
            since_handle: self.since_handle,
            write_handle: self.write_handle,
            listen: self.listen,
            persist_client: self.persist_client,
            shard_id: self.shard_id,
            upper: self.upper,
            fenceable_token: self.fenceable_token,
            // Initialize empty in-memory state.
            snapshot: Vec::new(),
            update_applier: CatalogStateInner::new(),
            catalog_content_version: self.catalog_content_version,
            bootstrap_complete: false,
            metrics: self.metrics,
        };
        catalog.metrics.collection_entries.reset();
        // Normally, `collection_entries` is updated in `apply_updates`. The audit log updates skip
        // over that function so we manually update it here.
        catalog
            .metrics
            .collection_entries
            .with_label_values(&[&CollectionType::AuditLog.to_string()])
            .add(audit_log_count.into_inner());
        let updates = self.snapshot.into_iter().map(|(kind, ts, diff)| {
            let kind = TryIntoStateUpdateKind::try_into(kind).expect("kind decoding error");
            StateUpdate { kind, ts, diff }
        });
        catalog.apply_updates(updates)?;

        let catalog_content_version = catalog.catalog_content_version.to_string();
        let txn = if is_initialized {
            let mut txn = catalog.transaction().await?;

            // Ad-hoc migration: Initialize the `migration_version` expected by adapter to be
            // present in existing catalogs.
            //
            // Note: Need to exclude read-only catalog mode here, because in that mode all
            // transactions are expected to be no-ops.
            // TODO: remove this once we only support upgrades from version >= 0.164
            if txn.get_setting("migration_version".into()).is_none() && mode != Mode::Readonly {
                let old_version = txn.get_catalog_content_version();
                txn.set_setting("migration_version".into(), old_version.map(Into::into))?;
            }

            txn.set_catalog_content_version(catalog_content_version)?;
            txn
        } else {
            soft_assert_eq_no_log!(
                catalog
                    .snapshot
                    .iter()
                    .filter(|(kind, _, _)| !matches!(kind, StateUpdateKind::FenceToken(_)))
                    .count(),
                0,
                "trace should not contain any updates for an uninitialized catalog: {:#?}",
                catalog.snapshot
            );

            let mut txn = catalog.transaction().await?;
            initialize::initialize(
                &mut txn,
                bootstrap_args,
                initial_ts.into(),
                catalog_content_version,
            )
            .await?;
            txn
        };

        if read_only {
            let (txn_batch, _) = txn.into_parts();
            // The upper here doesn't matter because we are only applying the updates in memory.
            let updates = StateUpdate::from_txn_batch_ts(txn_batch, catalog.upper);
            catalog.apply_updates(updates)?;
        } else {
            txn.commit_internal(commit_ts).await?;
        }

        // Now that we've fully opened the catalog at the current version, we can increment the
        // version in the catalog upgrade shard to signal to readers that the allowable versions
        // have increased.
        if matches!(catalog.mode, Mode::Writable) {
            catalog
                .increment_catalog_upgrade_shard_version(self.update_applier.organization_id)
                .await;

            let write_handle = catalog
                .persist_client
                .open_writer::<SourceData, (), Timestamp, i64>(
                    catalog.write_handle.shard_id(),
                    Arc::new(desc()),
                    Arc::new(UnitSchema::default()),
                    Diagnostics {
                        shard_name: CATALOG_SHARD_NAME.to_string(),
                        handle_purpose: "compact catalog".to_string(),
                    },
                )
                .await
                .expect("invalid usage");
            let fuel = CATALOG_FORCE_COMPACTION_FUEL.handle(catalog.persist_client.dyncfgs());
            let wait = CATALOG_FORCE_COMPACTION_WAIT.handle(catalog.persist_client.dyncfgs());
            // We're going to gradually turn this on via dyncfgs. Run it in a task so that it
            // doesn't block startup.
            let _task = mz_ore::task::spawn(|| "catalog::force_shard_compaction", async move {
                let () =
                    mz_persist_client::cli::admin::dangerous_force_compaction_and_break_pushdown(
                        &write_handle,
                        || fuel.get(),
1328                        || wait.get(),
1329                    )
1330                    .await;
1331            });
1332        }
1333
1334        Ok((Box::new(catalog), audit_log_handle))
1335    }
1336
1337    /// Reports if the catalog state has been initialized.
1338    ///
1339    /// NOTE: This is the answer as of the last call to [`PersistHandle::sync`] or [`PersistHandle::sync_to_current_upper`],
1340    /// not necessarily what is currently in persist.
1341    #[mz_ore::instrument]
1342    fn is_initialized_inner(&self) -> bool {
1343        !self.update_applier.configs.is_empty()
1344    }
1345
1346    /// Get the current value of config `key`.
1347    ///
1348    /// Some configs need to be read before the catalog is opened for bootstrapping.
1349    #[mz_ore::instrument]
1350    async fn get_current_config(&mut self, key: &str) -> Result<Option<u64>, DurableCatalogError> {
1351        self.sync_to_current_upper().await?;
1352        Ok(self.update_applier.configs.get(key).cloned())
1353    }
1354
1355    /// Get the user version of this instance.
1356    ///
1357    /// The user version is used to determine if a migration is needed.
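    ///
    /// A sketch of how a caller might use this; `CATALOG_VERSION` stands in for
    /// the code's current user version constant and is not defined here:
    ///
    /// ```ignore
    /// if handle.get_user_version().await?.is_some_and(|v| v < CATALOG_VERSION) {
    ///     // Run migrations before fully opening the catalog.
    /// }
    /// ```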
1358    #[mz_ore::instrument]
1359    pub(crate) async fn get_user_version(&mut self) -> Result<Option<u64>, DurableCatalogError> {
1360        self.get_current_config(USER_VERSION_KEY).await
1361    }
1362
1363    /// Get the current value of setting `name`.
1364    ///
1365    /// Some settings need to be read before the catalog is opened for bootstrapping.
1366    #[mz_ore::instrument]
1367    async fn get_current_setting(
1368        &mut self,
1369        name: &str,
1370    ) -> Result<Option<String>, DurableCatalogError> {
1371        self.sync_to_current_upper().await?;
1372        Ok(self.update_applier.settings.get(name).cloned())
1373    }
1374
1375    /// Get the catalog content version.
1376    ///
1377    /// The catalog content version is the semantic version of the most recent binary that wrote to
1378    /// the catalog.
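    ///
    /// The setting is persisted as a plain string and parsed as semver; a sketch
    /// of the parsing step (the version string is illustrative):
    ///
    /// ```ignore
    /// let version: semver::Version = "0.164.0".parse().expect("invalid version persisted");
    /// assert_eq!((version.major, version.minor), (0, 164));
    /// ```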
1379    #[mz_ore::instrument]
1380    async fn get_catalog_content_version(
1381        &mut self,
1382    ) -> Result<Option<semver::Version>, DurableCatalogError> {
1383        let version = self
1384            .get_current_setting(CATALOG_CONTENT_VERSION_KEY)
1385            .await?;
1386        let version = version.map(|version| version.parse().expect("invalid version persisted"));
1387        Ok(version)
1388    }
1389}
1390
1391#[async_trait]
1392impl OpenableDurableCatalogState for UnopenedPersistCatalogState {
1393    #[mz_ore::instrument]
1394    async fn open_savepoint(
1395        mut self: Box<Self>,
1396        initial_ts: Timestamp,
1397        bootstrap_args: &BootstrapArgs,
1398    ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError> {
1399        self.open_inner(Mode::Savepoint, initial_ts, bootstrap_args)
1400            .boxed()
1401            .await
1402    }
1403
1404    #[mz_ore::instrument]
1405    async fn open_read_only(
1406        mut self: Box<Self>,
1407        bootstrap_args: &BootstrapArgs,
1408    ) -> Result<Box<dyn DurableCatalogState>, CatalogError> {
1409        self.open_inner(Mode::Readonly, EpochMillis::MIN.into(), bootstrap_args)
1410            .boxed()
1411            .await
1412            .map(|(catalog, _)| catalog)
1413    }
1414
1415    #[mz_ore::instrument]
1416    async fn open(
1417        mut self: Box<Self>,
1418        initial_ts: Timestamp,
1419        bootstrap_args: &BootstrapArgs,
1420    ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError> {
1421        self.open_inner(Mode::Writable, initial_ts, bootstrap_args)
1422            .boxed()
1423            .await
1424    }
1425
1426    #[mz_ore::instrument(level = "debug")]
1427    async fn open_debug(mut self: Box<Self>) -> Result<DebugCatalogState, CatalogError> {
1428        Ok(DebugCatalogState(*self))
1429    }
1430
1431    #[mz_ore::instrument]
1432    async fn is_initialized(&mut self) -> Result<bool, CatalogError> {
1433        self.sync_to_current_upper().await?;
1434        Ok(self.is_initialized_inner())
1435    }
1436
1437    #[mz_ore::instrument]
1438    async fn epoch(&mut self) -> Result<Epoch, CatalogError> {
1439        self.sync_to_current_upper().await?;
1440        self.fenceable_token
1441            .validate()?
1442            .map(|token| token.epoch)
1443            .ok_or(CatalogError::Durable(DurableCatalogError::Uninitialized))
1444    }
1445
1446    #[mz_ore::instrument]
1447    async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError> {
1448        self.sync_to_current_upper().await?;
1449        self.fenceable_token
1450            .token()
1451            .map(|token| token.deploy_generation)
1452            .ok_or(CatalogError::Durable(DurableCatalogError::Uninitialized))
1453    }
1454
1455    #[mz_ore::instrument(level = "debug")]
1456    async fn get_0dt_deployment_max_wait(&mut self) -> Result<Option<Duration>, CatalogError> {
1457        let value = self
1458            .get_current_config(WITH_0DT_DEPLOYMENT_MAX_WAIT)
1459            .await?;
1460        match value {
1461            None => Ok(None),
1462            Some(millis) => Ok(Some(Duration::from_millis(millis))),
1463        }
1464    }
1465
1466    #[mz_ore::instrument(level = "debug")]
1467    async fn get_0dt_deployment_ddl_check_interval(
1468        &mut self,
1469    ) -> Result<Option<Duration>, CatalogError> {
1470        let value = self
1471            .get_current_config(WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL)
1472            .await?;
1473        match value {
1474            None => Ok(None),
1475            Some(millis) => Ok(Some(Duration::from_millis(millis))),
1476        }
1477    }
1478
1479    #[mz_ore::instrument(level = "debug")]
1480    async fn get_enable_0dt_deployment_panic_after_timeout(
1481        &mut self,
1482    ) -> Result<Option<bool>, CatalogError> {
1483        let value = self
1484            .get_current_config(ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT)
1485            .await?;
1486        match value {
1487            None => Ok(None),
1488            Some(0) => Ok(Some(false)),
1489            Some(1) => Ok(Some(true)),
1490            Some(v) => Err(
1491                DurableCatalogError::from(TryFromProtoError::UnknownEnumVariant(format!(
1492                    "{v} is not a valid boolean value"
1493                )))
1494                .into(),
1495            ),
1496        }
1497    }
1498
1499    #[mz_ore::instrument]
1500    async fn has_system_config_synced_once(&mut self) -> Result<bool, DurableCatalogError> {
1501        self.get_current_config(SYSTEM_CONFIG_SYNCED_KEY)
1502            .await
1503            .map(|value| value.map(|value| value > 0).unwrap_or(false))
1504    }
1505
1506    #[mz_ore::instrument]
1507    async fn trace_unconsolidated(&mut self) -> Result<Trace, CatalogError> {
1508        self.sync_to_current_upper().await?;
1509        if self.is_initialized_inner() {
1510            let snapshot = self.snapshot_unconsolidated().await;
1511            Ok(Trace::from_snapshot(snapshot))
1512        } else {
1513            Err(CatalogError::Durable(DurableCatalogError::Uninitialized))
1514        }
1515    }
1516
1517    #[mz_ore::instrument]
1518    async fn trace_consolidated(&mut self) -> Result<Trace, CatalogError> {
1519        self.sync_to_current_upper().await?;
1520        if self.is_initialized_inner() {
1521            let snapshot = self.current_snapshot().await?;
1522            Ok(Trace::from_snapshot(snapshot))
1523        } else {
1524            Err(CatalogError::Durable(DurableCatalogError::Uninitialized))
1525        }
1526    }
1527
1528    #[mz_ore::instrument(level = "debug")]
1529    async fn expire(self: Box<Self>) {
1530        self.expire().await
1531    }
1532}
1533
1534/// Applies updates for an opened catalog.
1535#[derive(Debug)]
1536struct CatalogStateInner {
1537    /// A trace of all catalog updates that can be consumed by some higher layer.
1538    updates: VecDeque<memory::objects::StateUpdate>,
1539}
1540
1541impl CatalogStateInner {
1542    fn new() -> CatalogStateInner {
1543        CatalogStateInner {
1544            updates: VecDeque::new(),
1545        }
1546    }
1547}
1548
1549impl ApplyUpdate<StateUpdateKind> for CatalogStateInner {
1550    fn apply_update(
1551        &mut self,
1552        update: StateUpdate<StateUpdateKind>,
1553        current_fence_token: &mut FenceableToken,
1554        metrics: &Arc<Metrics>,
1555    ) -> Result<Option<StateUpdate<StateUpdateKind>>, FenceError> {
1556        if let Some(collection_type) = update.kind.collection_type() {
1557            metrics
1558                .collection_entries
1559                .with_label_values(&[&collection_type.to_string()])
1560                .add(update.diff.into_inner());
1561        }
1562
1563        {
1564            let update: Option<memory::objects::StateUpdate> = (&update)
1565                .try_into()
1566                .unwrap_or_else(|_| panic!("invalid persisted update: {update:#?}"));
1567            if let Some(update) = update {
1568                self.updates.push_back(update);
1569            }
1570        }
1571
1572        match (update.kind, update.diff) {
1573            (StateUpdateKind::AuditLog(_, ()), _) => Ok(None),
1574            // Nothing to do for fence token retractions except wait for the next insertion.
1575            (StateUpdateKind::FenceToken(_), Diff::MINUS_ONE) => Ok(None),
1576            (StateUpdateKind::FenceToken(token), Diff::ONE) => {
1577                current_fence_token.maybe_fence(token)?;
1578                Ok(None)
1579            }
1580            (kind, diff) => Ok(Some(StateUpdate {
1581                kind,
1582                ts: update.ts,
1583                diff,
1584            })),
1585        }
1586    }
1587}
1588
1589/// A durable store of the catalog state using Persist as an implementation. The durable store can
1590/// serve any catalog data and transactionally modify catalog data.
1591///
1592/// Production users should call [`Self::expire`] before dropping a [`PersistCatalogState`]
1593/// so that it can expire its leases. If/when Rust gets `AsyncDrop`, this will be done automatically.
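///
/// A sketch of the intended lifecycle (the open call shown is illustrative):
///
/// ```ignore
/// let (catalog, _audit_logs) = unopened.open(initial_ts, &bootstrap_args).await?;
/// // ... read and write catalog state ...
/// catalog.expire().await; // release persist leases instead of just dropping
/// ```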
1594type PersistCatalogState = PersistHandle<StateUpdateKind, CatalogStateInner>;
1595
1596#[async_trait]
1597impl ReadOnlyDurableCatalogState for PersistCatalogState {
1598    fn epoch(&self) -> Epoch {
1599        self.fenceable_token
1600            .token()
1601            .expect("opened catalog state must have an epoch")
1602            .epoch
1603    }
1604
1605    fn metrics(&self) -> &Metrics {
1606        &self.metrics
1607    }
1608
1609    #[mz_ore::instrument(level = "debug")]
1610    async fn expire(self: Box<Self>) {
1611        self.expire().await
1612    }
1613
1614    fn is_bootstrap_complete(&self) -> bool {
1615        self.bootstrap_complete
1616    }
1617
1618    async fn get_audit_logs(&mut self) -> Result<Vec<VersionedEvent>, CatalogError> {
1619        self.sync_to_current_upper().await?;
1620        let audit_logs: Vec<_> = self
1621            .persist_snapshot()
1622            .await
1623            .filter_map(
1624                |StateUpdate {
1625                     kind,
1626                     ts: _,
1627                     diff: _,
1628                 }| match kind {
1629                    StateUpdateKind::AuditLog(key, ()) => Some(key),
1630                    _ => None,
1631                },
1632            )
1633            .collect();
1634        let mut audit_logs: Vec<_> = audit_logs
1635            .into_iter()
1636            .map(RustType::from_proto)
1637            .map_ok(|key: AuditLogKey| key.event)
1638            .collect::<Result<_, _>>()?;
1639        audit_logs.sort_by(|a, b| a.sortable_id().cmp(&b.sortable_id()));
1640        Ok(audit_logs)
1641    }
1642
1643    #[mz_ore::instrument(level = "debug")]
1644    async fn get_next_id(&mut self, id_type: &str) -> Result<u64, CatalogError> {
1645        self.with_trace(|trace| {
1646            Ok(trace
1647                .into_iter()
1648                .rev()
1649                .filter_map(|(kind, _, _)| match kind {
1650                    StateUpdateKind::IdAllocator(key, value) if key.name == id_type => {
1651                        Some(value.next_id)
1652                    }
1653                    _ => None,
1654                })
1655                .next()
1656                .expect("must exist"))
1657        })
1658        .await
1659    }
1660
1661    #[mz_ore::instrument(level = "debug")]
1662    async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError> {
1663        self.sync_to_current_upper().await?;
1664        Ok(self
1665            .fenceable_token
1666            .token()
1667            .expect("opened catalogs must have a token")
1668            .deploy_generation)
1669    }
1670
1671    #[mz_ore::instrument(level = "debug")]
1672    async fn snapshot(&mut self) -> Result<Snapshot, CatalogError> {
1673        self.with_snapshot(Ok).await
1674    }
1675
1676    #[mz_ore::instrument(level = "debug")]
1677    async fn sync_to_current_updates(
1678        &mut self,
1679    ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError> {
1680        let upper = self.current_upper().await;
1681        self.sync_updates(upper).await
1682    }
1683
1684    #[mz_ore::instrument(level = "debug")]
1685    async fn sync_updates(
1686        &mut self,
1687        target_upper: mz_repr::Timestamp,
1688    ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError> {
1689        self.sync(target_upper).await?;
1690        let mut updates = Vec::new();
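        // Drain only the updates with timestamps strictly less than `target_upper`;
        // anything at or beyond it stays queued for a later call.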
1691        while let Some(update) = self.update_applier.updates.front() {
1692            if update.ts >= target_upper {
1693                break;
1694            }
1695
1696            let update = self
1697                .update_applier
1698                .updates
1699                .pop_front()
1700                .expect("peeked above");
1701            updates.push(update);
1702        }
1703        Ok(updates)
1704    }
1705
1706    async fn current_upper(&mut self) -> Timestamp {
1707        self.current_upper().await
1708    }
1709}
1710
1711#[async_trait]
1712#[allow(mismatched_lifetime_syntaxes)]
1713impl DurableCatalogState for PersistCatalogState {
1714    fn is_read_only(&self) -> bool {
1715        matches!(self.mode, Mode::Readonly)
1716    }
1717
1718    fn is_savepoint(&self) -> bool {
1719        matches!(self.mode, Mode::Savepoint)
1720    }
1721
1722    async fn mark_bootstrap_complete(&mut self) {
1723        self.bootstrap_complete = true;
1724        if matches!(self.mode, Mode::Writable) {
1725            self.since_handle
1726                .upgrade_version()
1727                .await
1728                .expect("invalid usage")
1729        }
1730    }
1731
1732    #[mz_ore::instrument(level = "debug")]
1733    async fn transaction(&mut self) -> Result<Transaction, CatalogError> {
1734        self.metrics.transactions_started.inc();
1735        let snapshot = self.snapshot().await?;
1736        let commit_ts = self.upper.clone();
1737        Transaction::new(self, snapshot, commit_ts)
1738    }
1739
1740    #[mz_ore::instrument(level = "debug")]
1741    async fn commit_transaction(
1742        &mut self,
1743        txn_batch: TransactionBatch,
1744        commit_ts: Timestamp,
1745    ) -> Result<Timestamp, CatalogError> {
1746        async fn commit_transaction_inner(
1747            catalog: &mut PersistCatalogState,
1748            txn_batch: TransactionBatch,
1749            commit_ts: Timestamp,
1750        ) -> Result<Timestamp, CatalogError> {
1751            // If the current upper does not match the transaction's commit timestamp, then the
1752            // catalog must have changed since the transaction was started, making the transaction
1753            // invalid. When/if we want a multi-writer catalog, this will likely have to change
1754            // from an assert to a retry.
1755            assert_eq!(
1756                catalog.upper, txn_batch.upper,
1757                "only one transaction at a time is supported"
1758            );
1759
1760            assert!(
1761                commit_ts >= catalog.upper,
1762                "expected commit ts, {}, to be greater than or equal to upper, {}",
1763                commit_ts,
1764                catalog.upper
1765            );
1766
1767            let updates = StateUpdate::from_txn_batch(txn_batch).collect();
1768            debug!("committing updates: {updates:?}");
1769
1770            let next_upper = match catalog.mode {
1771                Mode::Writable => catalog
1772                    .compare_and_append(updates, commit_ts)
1773                    .await
1774                    .map_err(|e| e.unwrap_fence_error())?,
1775                Mode::Savepoint => {
1776                    let updates = updates.into_iter().map(|(kind, diff)| StateUpdate {
1777                        kind,
1778                        ts: commit_ts,
1779                        diff,
1780                    });
1781                    catalog.apply_updates(updates)?;
1782                    catalog.upper = commit_ts.step_forward();
1783                    catalog.upper
1784                }
1785                Mode::Readonly => {
1786                    // If the transaction is empty then we don't error, even in read-only mode.
1787                    // This is mostly for legacy reasons (i.e. with enough elbow grease this
1788                    // behavior can be changed without breaking any fundamental assumptions).
1789                    if !updates.is_empty() {
1790                        return Err(DurableCatalogError::NotWritable(format!(
1791                            "cannot commit a transaction in a read-only catalog: {updates:#?}"
1792                        ))
1793                        .into());
1794                    }
1795                    catalog.upper
1796                }
1797            };
1798
1799            Ok(next_upper)
1800        }
1801        self.metrics.transaction_commits.inc();
1802        let counter = self.metrics.transaction_commit_latency_seconds.clone();
1803        commit_transaction_inner(self, txn_batch, commit_ts)
1804            .wall_time()
1805            .inc_by(counter)
1806            .await
1807    }
1808
1809    #[mz_ore::instrument(level = "debug")]
1810    async fn confirm_leadership(&mut self) -> Result<(), CatalogError> {
1811        // A read-only catalog does not care about leadership.
1812        if self.is_read_only() {
1813            return Ok(());
1814        }
1815        self.sync_to_current_upper().await?;
1816        Ok(())
1817    }
1818}
1819
1820/// Deterministically generate a shard ID for the given `organization_id` and `seed`.
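///
/// A minimal sketch of the determinism this provides; the UUID and seed values
/// below are illustrative:
///
/// ```ignore
/// let org = Uuid::parse_str("00000000-0000-0000-0000-000000000000").unwrap();
/// // The same inputs always map to the same shard ID...
/// assert_eq!(shard_id(org, 1), shard_id(org, 1));
/// // ...while different seeds yield distinct shards (e.g. catalog vs. upgrade).
/// assert_ne!(shard_id(org, 1), shard_id(org, 2));
/// ```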
1821pub fn shard_id(organization_id: Uuid, seed: usize) -> ShardId {
1822    let hash = sha2::Sha256::digest(format!("{organization_id}{seed}")).to_vec();
1823    soft_assert_eq_or_log!(hash.len(), 32, "SHA256 returns 32 bytes (256 bits)");
1824    let uuid = Uuid::from_slice(&hash[0..16]).expect("from_slice accepts exactly 16 bytes");
1825    ShardId::from_str(&format!("s{uuid}")).expect("known to be valid")
1826}
1827
1828/// Returns the schema of the `Row`s/`SourceData`s stored in the persist
1829/// shard backing the catalog.
1830fn desc() -> RelationDesc {
1831    RelationDesc::builder()
1832        .with_column("data", SqlScalarType::Jsonb.nullable(false))
1833        .finish()
1834}
1835
1836/// Generates a timestamp for reading from `read_handle` that is as fresh as possible, given
1837/// `upper`.
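///
/// A rough sketch of the expected behavior (values illustrative): given the
/// invariant that the since trails the upper by one, the freshest readable
/// timestamp is `upper - 1`.
///
/// ```ignore
/// // With upper = 10 and since <= 9, the result is 9.
/// let ts = as_of(&read_handle, Timestamp::from(10u64));
/// assert_eq!(ts, Timestamp::from(9u64));
/// ```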
1838fn as_of(
1839    read_handle: &ReadHandle<SourceData, (), Timestamp, StorageDiff>,
1840    upper: Timestamp,
1841) -> Timestamp {
1842    let since = read_handle.since().clone();
1843    let mut as_of = upper.checked_sub(1).unwrap_or_else(|| {
1844        panic!("catalog persist shard should be initialized, found upper: {upper:?}")
1845    });
1846    // We only downgrade the since after writing, and we always set the since to one less than the
1847    // upper.
1848    soft_assert_or_log!(
1849        since.less_equal(&as_of),
1850        "since={since:?}, as_of={as_of:?}; since must be less than or equal to as_of"
1851    );
1852    // This should be a no-op if the assert above passes; if it doesn't, we'd rather continue
1853    // with a correct timestamp than enter a panic loop.
1854    as_of.advance_by(since.borrow());
1855    as_of
1856}
1857
1858/// Fetch the persist version of the catalog upgrade shard, if one exists. A version will not
1859/// exist if we are creating a brand-new environment.
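///
/// The version is read out of the shard state's `applier_version` field after
/// round-tripping the state through JSON; an abridged, hypothetical shape:
///
/// ```ignore
/// // {"applier_version": "0.164.0", ...}
/// let upgrade_version: semver::Version =
///     serde_json::from_value(json_state["applier_version"].clone())?;
/// ```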
1860async fn fetch_catalog_upgrade_shard_version(
1861    persist_client: &PersistClient,
1862    upgrade_shard_id: ShardId,
1863) -> Option<semver::Version> {
1864    let shard_state = persist_client
1865        .inspect_shard::<Timestamp>(&upgrade_shard_id)
1866        .await
1867        .ok()?;
1868    let json_state = serde_json::to_value(shard_state).expect("state serialization error");
1869    let upgrade_version = json_state
1870        .get("applier_version")
1871        .cloned()
1872        .expect("missing applier_version");
1873    let upgrade_version =
1874        serde_json::from_value(upgrade_version).expect("version deserialization error");
1875    Some(upgrade_version)
1876}
1877
1878/// Generates an iterator of [`StateUpdate`] that contains all updates to the catalog
1879/// state up to, and including, `as_of`.
1880///
1881/// The output is consolidated and sorted by timestamp in ascending order.
1882#[mz_ore::instrument(level = "debug")]
1883async fn snapshot_binary(
1884    read_handle: &mut ReadHandle<SourceData, (), Timestamp, StorageDiff>,
1885    as_of: Timestamp,
1886    metrics: &Arc<Metrics>,
1887) -> impl Iterator<Item = StateUpdate<StateUpdateKindJson>> + DoubleEndedIterator + use<> {
1888    metrics.snapshots_taken.inc();
1889    let counter = metrics.snapshot_latency_seconds.clone();
1890    snapshot_binary_inner(read_handle, as_of)
1891        .wall_time()
1892        .inc_by(counter)
1893        .await
1894}
1895
1896/// Generates an iterator of [`StateUpdate`] that contains all updates to the catalog
1897/// state up to, and including, `as_of`.
1898///
1899/// The output is consolidated and sorted by timestamp in ascending order.
1900#[mz_ore::instrument(level = "debug")]
1901async fn snapshot_binary_inner(
1902    read_handle: &mut ReadHandle<SourceData, (), Timestamp, StorageDiff>,
1903    as_of: Timestamp,
1904) -> impl Iterator<Item = StateUpdate<StateUpdateKindJson>> + DoubleEndedIterator + use<> {
1905    let snapshot = read_handle
1906        .snapshot_and_fetch(Antichain::from_elem(as_of))
1907        .await
1908        .expect("we have advanced the restart_as_of by the since");
1909    soft_assert_no_log!(
1910        snapshot.iter().all(|(_, _, diff)| *diff == 1),
1911        "snapshot_and_fetch guarantees a consolidated result: {snapshot:#?}"
1912    );
1913    snapshot
1914        .into_iter()
1915        .map(Into::<StateUpdate<StateUpdateKindJson>>::into)
1916        .sorted_by(|a, b| Ord::cmp(&a.ts, &b.ts))
1917}
1918
1919/// Convert an [`Antichain<Timestamp>`] to a [`Timestamp`].
1920///
1921/// The correctness of this function relies on [`Timestamp`] being totally ordered and never
1922/// finalizing the catalog shard.
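///
/// For a totally ordered time, a non-empty antichain contains exactly one
/// element, so the conversion is just unwrapping it; a minimal sketch:
///
/// ```ignore
/// let ts = Timestamp::from(42u64);
/// assert_eq!(antichain_to_timestamp(Antichain::from_elem(ts)), ts);
/// // An empty antichain (i.e., a finalized shard) would panic instead.
/// ```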
1923pub(crate) fn antichain_to_timestamp(antichain: Antichain<Timestamp>) -> Timestamp {
1924    antichain
1925        .into_option()
1926        .expect("we use a totally ordered time and never finalize the shard")
1927}
1928
1929// Debug methods used by the catalog-debug tool.
1930
1931impl Trace {
1932    /// Generates a [`Trace`] from a snapshot.
1933    fn from_snapshot(snapshot: impl IntoIterator<Item = StateUpdate>) -> Trace {
1934        let mut trace = Trace::new();
1935        for StateUpdate { kind, ts, diff } in snapshot {
1936            match kind {
1937                StateUpdateKind::AuditLog(k, v) => trace.audit_log.values.push(((k, v), ts, diff)),
1938                StateUpdateKind::Cluster(k, v) => trace.clusters.values.push(((k, v), ts, diff)),
1939                StateUpdateKind::ClusterReplica(k, v) => {
1940                    trace.cluster_replicas.values.push(((k, v), ts, diff))
1941                }
1942                StateUpdateKind::Comment(k, v) => trace.comments.values.push(((k, v), ts, diff)),
1943                StateUpdateKind::Config(k, v) => trace.configs.values.push(((k, v), ts, diff)),
1944                StateUpdateKind::Database(k, v) => trace.databases.values.push(((k, v), ts, diff)),
1945                StateUpdateKind::DefaultPrivilege(k, v) => {
1946                    trace.default_privileges.values.push(((k, v), ts, diff))
1947                }
1948                StateUpdateKind::FenceToken(_) => {
1949                    // Fence token not included in trace.
1950                }
1951                StateUpdateKind::IdAllocator(k, v) => {
1952                    trace.id_allocator.values.push(((k, v), ts, diff))
1953                }
1954                StateUpdateKind::IntrospectionSourceIndex(k, v) => {
1955                    trace.introspection_sources.values.push(((k, v), ts, diff))
1956                }
1957                StateUpdateKind::Item(k, v) => trace.items.values.push(((k, v), ts, diff)),
1958                StateUpdateKind::NetworkPolicy(k, v) => {
1959                    trace.network_policies.values.push(((k, v), ts, diff))
1960                }
1961                StateUpdateKind::Role(k, v) => trace.roles.values.push(((k, v), ts, diff)),
1962                StateUpdateKind::Schema(k, v) => trace.schemas.values.push(((k, v), ts, diff)),
1963                StateUpdateKind::Setting(k, v) => trace.settings.values.push(((k, v), ts, diff)),
1964                StateUpdateKind::SourceReferences(k, v) => {
1965                    trace.source_references.values.push(((k, v), ts, diff))
1966                }
1967                StateUpdateKind::SystemConfiguration(k, v) => {
1968                    trace.system_configurations.values.push(((k, v), ts, diff))
1969                }
1970                StateUpdateKind::SystemObjectMapping(k, v) => {
1971                    trace.system_object_mappings.values.push(((k, v), ts, diff))
1972                }
1973                StateUpdateKind::SystemPrivilege(k, v) => {
1974                    trace.system_privileges.values.push(((k, v), ts, diff))
1975                }
1976                StateUpdateKind::StorageCollectionMetadata(k, v) => trace
1977                    .storage_collection_metadata
1978                    .values
1979                    .push(((k, v), ts, diff)),
1980                StateUpdateKind::UnfinalizedShard(k, ()) => {
1981                    trace.unfinalized_shards.values.push(((k, ()), ts, diff))
1982                }
1983                StateUpdateKind::TxnWalShard((), v) => {
1984                    trace.txn_wal_shard.values.push((((), v), ts, diff))
1985                }
1986                StateUpdateKind::RoleAuth(k, v) => trace.role_auth.values.push(((k, v), ts, diff)),
1987            }
1988        }
1989        trace
1990    }
1991}
1992
1993impl UnopenedPersistCatalogState {
1994    /// Manually update the value of `key` in collection `T` to `value`.
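    ///
    /// A hypothetical invocation from the catalog-debug tool; `SettingCollection`
    /// and the key/value pair are stand-ins for a real [`Collection`] impl:
    ///
    /// ```ignore
    /// let prev = state
    ///     .debug_edit::<SettingCollection>(key.clone(), new_value)
    ///     .await?;
    /// // `prev` holds the previously stored value, if the key already existed.
    /// ```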
1995    #[mz_ore::instrument]
1996    pub(crate) async fn debug_edit<T: Collection>(
1997        &mut self,
1998        key: T::Key,
1999        value: T::Value,
2000    ) -> Result<Option<T::Value>, CatalogError>
2001    where
2002        T::Key: PartialEq + Eq + Debug + Clone,
2003        T::Value: Debug + Clone,
2004    {
2005        let prev_value = loop {
2006            let key = key.clone();
2007            let value = value.clone();
2008            let snapshot = self.current_snapshot().await?;
2009            let trace = Trace::from_snapshot(snapshot);
2010            let collection_trace = T::collection_trace(trace);
2011            let prev_values: Vec<_> = collection_trace
2012                .values
2013                .into_iter()
2014                .filter(|((k, _), _, diff)| {
2015                    soft_assert_eq_or_log!(*diff, Diff::ONE, "trace is consolidated");
2016                    &key == k
2017                })
2018                .collect();
2019
2020            let prev_value = match &prev_values[..] {
2021                [] => None,
2022                [((_, v), _, _)] => Some(v.clone()),
2023                prev_values => panic!("multiple values found for key {key:?}: {prev_values:?}"),
2024            };
2025
2026            let mut updates: Vec<_> = prev_values
2027                .into_iter()
2028                .map(|((k, v), _, _)| (T::update(k, v), Diff::MINUS_ONE))
2029                .collect();
2030            updates.push((T::update(key, value), Diff::ONE));
2031            // We must fence out all other catalogs, if we haven't already, since we are writing.
2032            match self.fenceable_token.generate_unfenced_token(self.mode)? {
2033                Some((fence_updates, current_fenceable_token)) => {
2034                    updates.extend(fence_updates.clone());
2035                    match self.compare_and_append(updates, self.upper).await {
2036                        Ok(_) => {
2037                            self.fenceable_token = current_fenceable_token;
2038                            break prev_value;
2039                        }
2040                        Err(CompareAndAppendError::Fence(e)) => return Err(e.into()),
2041                        Err(e @ CompareAndAppendError::UpperMismatch { .. }) => {
2042                            warn!("catalog write failed due to upper mismatch, retrying: {e:?}");
2043                            continue;
2044                        }
2045                    }
2046                }
2047                None => {
2048                    self.compare_and_append(updates, self.upper)
2049                        .await
2050                        .map_err(|e| e.unwrap_fence_error())?;
2051                    break prev_value;
2052                }
2053            }
2054        };
2055        Ok(prev_value)
2056    }
2057
2058    /// Manually delete `key` from collection `T`.
2059    #[mz_ore::instrument]
2060    pub(crate) async fn debug_delete<T: Collection>(
2061        &mut self,
2062        key: T::Key,
2063    ) -> Result<(), CatalogError>
2064    where
2065        T::Key: PartialEq + Eq + Debug + Clone,
2066        T::Value: Debug,
2067    {
2068        loop {
2069            let key = key.clone();
2070            let snapshot = self.current_snapshot().await?;
2071            let trace = Trace::from_snapshot(snapshot);
2072            let collection_trace = T::collection_trace(trace);
2073            let mut retractions: Vec<_> = collection_trace
2074                .values
2075                .into_iter()
2076                .filter(|((k, _), _, diff)| {
2077                    soft_assert_eq_or_log!(*diff, Diff::ONE, "trace is consolidated");
2078                    &key == k
2079                })
2080                .map(|((k, v), _, _)| (T::update(k, v), Diff::MINUS_ONE))
2081                .collect();
2082
2083            // We must fence out all other catalogs, if we haven't already, since we are writing.
2084            match self.fenceable_token.generate_unfenced_token(self.mode)? {
2085                Some((fence_updates, current_fenceable_token)) => {
2086                    retractions.extend(fence_updates.clone());
2087                    match self.compare_and_append(retractions, self.upper).await {
2088                        Ok(_) => {
2089                            self.fenceable_token = current_fenceable_token;
2090                            break;
2091                        }
2092                        Err(CompareAndAppendError::Fence(e)) => return Err(e.into()),
2093                        Err(e @ CompareAndAppendError::UpperMismatch { .. }) => {
2094                            warn!("catalog write failed due to upper mismatch, retrying: {e:?}");
2095                            continue;
2096                        }
2097                    }
2098                }
2099                None => {
2100                    self.compare_and_append(retractions, self.upper)
2101                        .await
2102                        .map_err(|e| e.unwrap_fence_error())?;
2103                    break;
2104                }
2105            }
2106        }
2107        Ok(())
2108    }
2109
2110    /// Generates an iterator of [`StateUpdate`] that contains all updates to the catalog
2111    /// state.
2112    ///
2113    /// The output is consolidated and sorted by timestamp in ascending order, up to the current upper.
2114    async fn current_snapshot(
2115        &mut self,
2116    ) -> Result<impl IntoIterator<Item = StateUpdate> + '_, CatalogError> {
2117        self.sync_to_current_upper().await?;
2118        self.consolidate();
2119        Ok(self.snapshot.iter().cloned().map(|(kind, ts, diff)| {
2120            let kind = TryIntoStateUpdateKind::try_into(kind).expect("kind decoding error");
2121            StateUpdate { kind, ts, diff }
2122        }))
2123    }
2124}