mz_catalog/durable/persist.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

#[cfg(test)]
mod tests;

use std::cmp::max;
use std::collections::{BTreeMap, VecDeque};
use std::fmt::Debug;
use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, Instant};

use async_trait::async_trait;
use differential_dataflow::lattice::Lattice;
use futures::{FutureExt, StreamExt};
use itertools::Itertools;
use mz_audit_log::VersionedEvent;
use mz_ore::metrics::MetricsFutureExt;
use mz_ore::now::EpochMillis;
use mz_ore::{
    soft_assert_eq_no_log, soft_assert_eq_or_log, soft_assert_ne_or_log, soft_assert_no_log,
    soft_assert_or_log, soft_panic_or_log,
};
use mz_persist_client::cfg::USE_CRITICAL_SINCE_CATALOG;
use mz_persist_client::cli::admin::{CATALOG_FORCE_COMPACTION_FUEL, CATALOG_FORCE_COMPACTION_WAIT};
use mz_persist_client::critical::SinceHandle;
use mz_persist_client::error::UpperMismatch;
use mz_persist_client::read::{Listen, ListenEvent, ReadHandle};
use mz_persist_client::write::WriteHandle;
use mz_persist_client::{Diagnostics, PersistClient, ShardId};
use mz_persist_types::codec_impls::UnitSchema;
use mz_proto::{RustType, TryFromProtoError};
use mz_repr::{Diff, RelationDesc, ScalarType};
use mz_storage_types::StorageDiff;
use mz_storage_types::sources::SourceData;
use sha2::Digest;
use timely::Container;
use timely::progress::{Antichain, Timestamp as TimelyTimestamp};
use tracing::{debug, info, warn};
use uuid::Uuid;

use crate::durable::debug::{Collection, CollectionType, DebugCatalogState, Trace};
use crate::durable::error::FenceError;
use crate::durable::initialize::{
    ENABLE_0DT_DEPLOYMENT, ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, SYSTEM_CONFIG_SYNCED_KEY,
    USER_VERSION_KEY, WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL, WITH_0DT_DEPLOYMENT_MAX_WAIT,
};
use crate::durable::metrics::Metrics;
use crate::durable::objects::state_update::{
    IntoStateUpdateKindJson, StateUpdate, StateUpdateKind, StateUpdateKindJson,
    TryIntoStateUpdateKind,
};
use crate::durable::objects::{AuditLogKey, FenceToken, Snapshot};
use crate::durable::transaction::TransactionBatch;
use crate::durable::upgrade::upgrade;
use crate::durable::{
    AuditLogIterator, BootstrapArgs, CATALOG_CONTENT_VERSION_KEY, CatalogError,
    DurableCatalogError, DurableCatalogState, Epoch, OpenableDurableCatalogState,
    ReadOnlyDurableCatalogState, Transaction, initialize,
};
use crate::memory;

/// Type alias used to represent timestamps in persist.
pub(crate) type Timestamp = mz_repr::Timestamp;

/// The minimum value of an epoch.
///
/// # Safety
/// `new_unchecked` is safe to call with a non-zero value.
const MIN_EPOCH: Epoch = unsafe { Epoch::new_unchecked(1) };

/// Human-readable catalog shard name.
const CATALOG_SHARD_NAME: &str = "catalog";
/// Human-readable catalog upgrade shard name.
const UPGRADE_SHARD_NAME: &str = "catalog_upgrade";

/// Seed used to generate the persist shard ID for the catalog.
const CATALOG_SEED: usize = 1;
/// Seed used to generate the catalog upgrade shard ID.
///
/// All state that gets written to persist is tagged with the version of the code that wrote that
/// state. Persist has limited forward compatibility: a reader can only read state that was written
/// by versions a bounded number of releases ahead of it. Reading from persist updates the state and
/// the version that the state is tagged with. As a consequence, reading from persist may
/// unintentionally fence out other readers and writers with a lower version. We use the catalog
/// upgrade shard to track which database version is actively deployed, so that readers from the
/// future, such as the upgrade checker tool, don't accidentally fence the database out of persist.
/// Only writable opened catalogs can increment the version in the upgrade shard.
///
/// One specific example that we are trying to avoid with the catalog upgrade shard is the
/// following:
///
///   1. Database is running on version 0.X.0.
///   2. Upgrade checker is run on version 0.X+1.0.
///   3. Upgrade checker is run on version 0.X+2.0.
///
/// With the catalog upgrade shard, the upgrade checker in step (3) can see that the database is
/// currently running on v0.X.0 and that reading the catalog would cause the database to get fenced
/// out. So instead of reading the catalog it errors out. Without the catalog upgrade shard, the
/// upgrade checker could read the version in the catalog shard, and see that it is v0.X+1.0, but it
/// would be impossible to differentiate between the following two scenarios:
///
///   - The database is running on v0.X+1.0 and it's safe to run the upgrade checker at v0.X+2.0.
///   - Some other upgrade checker incremented the version to v0.X+1.0, the database is running on
///     version v0.X.0, and it is not safe to run the upgrade checker.
///
/// Persist guarantees that the shard versions are non-decreasing, so we don't need to worry about
/// race conditions where the shard version decreases after reading it.
const UPGRADE_SEED: usize = 2;
/// Legacy seed used to generate the persist shard ID for builtin table migrations. DO NOT REUSE.
pub const _BUILTIN_MIGRATION_SEED: usize = 3;
/// Legacy seed used to generate the persist shard ID for the expression cache. DO NOT REUSE.
pub const _EXPRESSION_CACHE_SEED: usize = 4;
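
// Note: the `shard_id` helper referenced throughout this file derives a
// deterministic `ShardId` from the organization ID combined with one of the
// seeds above (the `sha2::Digest` import suggests a SHA-2 based derivation),
// so every process in the same environment resolves the same shards. See the
// helper's definition for the authoritative scheme.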

/// Durable catalog mode that dictates the effect of mutable operations.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub(crate) enum Mode {
    /// Mutable operations are prohibited.
    Readonly,
    /// Mutable operations have an effect in-memory, but aren't persisted durably.
    Savepoint,
    /// Mutable operations have an effect in-memory and durably.
    Writable,
}

/// Enum representing the fenced state of the catalog.
#[derive(Debug)]
pub(crate) enum FenceableToken {
    /// The catalog is still initializing and learning about previously written fence tokens. This
    /// state can be fenced if it encounters a larger deploy generation.
    Initializing {
        /// The largest fence token durably written to the catalog, if any.
        durable_token: Option<FenceToken>,
        /// This process's deploy generation.
        current_deploy_generation: Option<u64>,
    },
    /// The current token has not been fenced.
    Unfenced { current_token: FenceToken },
    /// The current token has been fenced.
    Fenced {
        current_token: FenceToken,
        fence_token: FenceToken,
    },
}
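
// Fencing decisions below hinge on `FenceToken`'s ordering, which (as
// `validate` relies on) compares the deploy generation before the epoch.
// For example (sketch):
//
//   FenceToken { deploy_generation: 2, epoch: 1 }
//       > FenceToken { deploy_generation: 1, epoch: 9 }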

impl FenceableToken {
    /// Returns a new token.
    fn new(current_deploy_generation: Option<u64>) -> Self {
        Self::Initializing {
            durable_token: None,
            current_deploy_generation,
        }
    }

    /// Returns the currently known token if `self` is not fenced, otherwise returns an error
    /// describing the fence.
    fn validate(&self) -> Result<Option<FenceToken>, FenceError> {
        match self {
            FenceableToken::Initializing { durable_token, .. } => Ok(durable_token.clone()),
            FenceableToken::Unfenced { current_token, .. } => Ok(Some(current_token.clone())),
            FenceableToken::Fenced {
                current_token,
                fence_token,
            } => {
                assert!(
                    fence_token > current_token,
                    "must be fenced by higher token; current={current_token:?}, fence={fence_token:?}"
                );
                if fence_token.deploy_generation > current_token.deploy_generation {
                    Err(FenceError::DeployGeneration {
                        current_generation: current_token.deploy_generation,
                        fence_generation: fence_token.deploy_generation,
                    })
                } else {
                    assert!(
                        fence_token.epoch > current_token.epoch,
                        "must be fenced by higher token; current={current_token:?}, fence={fence_token:?}"
                    );
                    Err(FenceError::Epoch {
                        current_epoch: current_token.epoch,
                        fence_epoch: fence_token.epoch,
                    })
                }
            }
        }
    }

    /// Returns the current token.
    fn token(&self) -> Option<FenceToken> {
        match self {
            FenceableToken::Initializing { durable_token, .. } => durable_token.clone(),
            FenceableToken::Unfenced { current_token, .. } => Some(current_token.clone()),
            FenceableToken::Fenced { current_token, .. } => Some(current_token.clone()),
        }
    }

    /// Returns `Err` if `token` fences out `self`, `Ok` otherwise.
    fn maybe_fence(&mut self, token: FenceToken) -> Result<(), FenceError> {
        match self {
            FenceableToken::Initializing {
                durable_token,
                current_deploy_generation,
                ..
            } => {
                match durable_token {
                    Some(durable_token) => {
                        *durable_token = max(durable_token.clone(), token.clone());
                    }
                    None => {
                        *durable_token = Some(token.clone());
                    }
                }
                if let Some(current_deploy_generation) = current_deploy_generation {
                    if *current_deploy_generation < token.deploy_generation {
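                        // A newer deploy generation fences us out. Synthesize a
                        // current token from our own generation, borrowing the
                        // incoming epoch, so that `validate` reports a
                        // deploy-generation fence rather than an epoch fence.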
                        *self = FenceableToken::Fenced {
                            current_token: FenceToken {
                                deploy_generation: *current_deploy_generation,
                                epoch: token.epoch,
                            },
                            fence_token: token,
                        };
                        self.validate()?;
                    }
                }
            }
            FenceableToken::Unfenced { current_token } => {
                if *current_token < token {
                    *self = FenceableToken::Fenced {
                        current_token: current_token.clone(),
                        fence_token: token,
                    };
                    self.validate()?;
                }
            }
            FenceableToken::Fenced { .. } => {
                self.validate()?;
            }
        }

        Ok(())
    }

    /// Returns a [`FenceableToken::Unfenced`] token and the updates to the catalog required to
    /// transition to the `Unfenced` state if `self` is [`FenceableToken::Initializing`], otherwise
    /// returns `None`.
    fn generate_unfenced_token(
        &self,
        mode: Mode,
    ) -> Result<Option<(Vec<(StateUpdateKind, Diff)>, FenceableToken)>, DurableCatalogError> {
        let (durable_token, current_deploy_generation) = match self {
            FenceableToken::Initializing {
                durable_token,
                current_deploy_generation,
            } => (durable_token.clone(), current_deploy_generation.clone()),
            FenceableToken::Unfenced { .. } | FenceableToken::Fenced { .. } => return Ok(None),
        };

        let mut fence_updates = Vec::with_capacity(2);

        if let Some(durable_token) = &durable_token {
            fence_updates.push((
                StateUpdateKind::FenceToken(durable_token.clone()),
                Diff::MINUS_ONE,
            ));
        }

        let current_deploy_generation = current_deploy_generation
            .or_else(|| durable_token.as_ref().map(|token| token.deploy_generation))
            // We cannot initialize a catalog without a deploy generation.
            .ok_or(DurableCatalogError::Uninitialized)?;
        let mut current_epoch = durable_token
            .map(|token| token.epoch)
            .unwrap_or(MIN_EPOCH)
            .get();
        // Only writable catalogs attempt to increment the epoch.
        if matches!(mode, Mode::Writable) {
            current_epoch += 1;
        }
        let current_epoch = Epoch::new(current_epoch).expect("known to be non-zero");
        let current_token = FenceToken {
            deploy_generation: current_deploy_generation,
            epoch: current_epoch,
        };

        fence_updates.push((
            StateUpdateKind::FenceToken(current_token.clone()),
            Diff::ONE,
        ));

        let current_fenceable_token = FenceableToken::Unfenced { current_token };

        Ok(Some((fence_updates, current_fenceable_token)))
    }
}
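
// For example (a sketch of the writable path): a catalog that starts with
// deploy generation 3 and finds a durable token { generation: 3, epoch: 7 }
// emits two fence updates, a retraction of the old token and an insertion of
// { generation: 3, epoch: 8 }, and transitions to `Unfenced` with the new
// token.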

/// An error that can occur while executing [`PersistHandle::compare_and_append`].
#[derive(Debug, thiserror::Error)]
pub(crate) enum CompareAndAppendError {
    #[error(transparent)]
    Fence(#[from] FenceError),
    /// The catalog encountered an upper mismatch when trying to write. This should only happen
    /// while trying to fence out other catalogs.
    #[error(
        "expected catalog upper {expected_upper:?} did not match actual catalog upper {actual_upper:?}"
    )]
    UpperMismatch {
        expected_upper: Timestamp,
        actual_upper: Timestamp,
    },
}

impl CompareAndAppendError {
    pub(crate) fn unwrap_fence_error(self) -> FenceError {
        match self {
            CompareAndAppendError::Fence(e) => e,
            e @ CompareAndAppendError::UpperMismatch { .. } => {
                panic!("unexpected upper mismatch: {e:?}")
            }
        }
    }
}

impl From<UpperMismatch<Timestamp>> for CompareAndAppendError {
    fn from(upper_mismatch: UpperMismatch<Timestamp>) -> Self {
        Self::UpperMismatch {
            expected_upper: antichain_to_timestamp(upper_mismatch.expected),
            actual_upper: antichain_to_timestamp(upper_mismatch.current),
        }
    }
}

pub(crate) trait ApplyUpdate<T: IntoStateUpdateKindJson> {
    /// Process and apply `update`.
    ///
    /// Returns `Some` if `update` should be cached in memory and `None` otherwise.
    fn apply_update(
        &mut self,
        update: StateUpdate<T>,
        current_fence_token: &mut FenceableToken,
        metrics: &Arc<Metrics>,
    ) -> Result<Option<StateUpdate<T>>, FenceError>;
}
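
// A minimal applier that only maintains fencing might look like the following
// sketch (hypothetical; the real appliers in this file also maintain caches
// and metrics):
//
//     struct PassThrough;
//
//     impl ApplyUpdate<StateUpdateKind> for PassThrough {
//         fn apply_update(
//             &mut self,
//             update: StateUpdate<StateUpdateKind>,
//             current_fence_token: &mut FenceableToken,
//             _metrics: &Arc<Metrics>,
//         ) -> Result<Option<StateUpdate<StateUpdateKind>>, FenceError> {
//             if let StateUpdateKind::FenceToken(token) = &update.kind {
//                 if update.diff == Diff::ONE {
//                     current_fence_token.maybe_fence(token.clone())?;
//                 }
//             }
//             Ok(Some(update))
//         }
//     }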

/// A handle for interacting with the persist catalog shard.
///
/// The catalog shard is used in multiple different contexts, for example pre-open and post-open,
/// but for all contexts the majority of the durable catalog's behavior is identical. This struct
/// implements those behaviors that are identical while allowing the user to specify the different
/// behaviors via generic parameters.
///
/// The behavior of the durable catalog can vary along two axes. The first is the format of each
/// individual update, i.e. raw binary, the current protobuf version, previous protobuf versions,
/// etc. The second axis is what to do with each individual update, for example before opening we
/// cache all config updates but don't cache them after opening. These behaviors are customizable
/// via the `T: TryIntoStateUpdateKind` and `U: ApplyUpdate<T>` generic parameters respectively.
#[derive(Debug)]
pub(crate) struct PersistHandle<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> {
    /// The [`Mode`] that this catalog was opened in.
    pub(crate) mode: Mode,
    /// Since handle to control compaction.
    since_handle: SinceHandle<SourceData, (), Timestamp, StorageDiff, i64>,
    /// Write handle to persist.
    write_handle: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
    /// Listener to catalog changes.
    listen: Listen<SourceData, (), Timestamp, StorageDiff>,
    /// Handle for connecting to persist.
    persist_client: PersistClient,
    /// Catalog shard ID.
    shard_id: ShardId,
    /// Cache of the most recent catalog snapshot.
    ///
    /// We use a tuple instead of [`StateUpdate`] to make consolidation easier.
    pub(crate) snapshot: Vec<(T, Timestamp, Diff)>,
    /// Applies custom processing, filtering, and fencing for each individual update.
    update_applier: U,
    /// The current upper of the persist shard.
    pub(crate) upper: Timestamp,
    /// The fence token of the catalog, if one exists.
    fenceable_token: FenceableToken,
    /// The semantic version of the current binary.
    catalog_content_version: semver::Version,
    /// Flag to indicate if bootstrap is complete.
    bootstrap_complete: bool,
    /// Metrics for the persist catalog.
    metrics: Arc<Metrics>,
}

impl<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> PersistHandle<T, U> {
    /// Increment the version in the catalog upgrade shard to the code's current version.
    async fn increment_catalog_upgrade_shard_version(&self, organization_id: Uuid) {
        let upgrade_shard_id = shard_id(organization_id, UPGRADE_SEED);
        let mut write_handle: WriteHandle<(), (), Timestamp, StorageDiff> = self
            .persist_client
            .open_writer(
                upgrade_shard_id,
                Arc::new(UnitSchema::default()),
                Arc::new(UnitSchema::default()),
                Diagnostics {
                    shard_name: UPGRADE_SHARD_NAME.to_string(),
                    handle_purpose: "increment durable catalog upgrade shard version".to_string(),
                },
            )
            .await
            .expect("invalid usage");
        const EMPTY_UPDATES: &[(((), ()), Timestamp, StorageDiff)] = &[];
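        // An empty compare-and-append is all that's needed here: persist tags
        // shard state with the version of the code that wrote it (see the
        // comment on `UPGRADE_SEED`), so any successful write, even one
        // containing no data, advances the version recorded in the upgrade
        // shard.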
        let mut upper = write_handle.fetch_recent_upper().await.clone();
        loop {
            let next_upper = upper
                .iter()
                .map(|timestamp| timestamp.step_forward())
                .collect();
            match write_handle
                .compare_and_append(EMPTY_UPDATES, upper, next_upper)
                .await
                .expect("invalid usage")
            {
                Ok(()) => break,
                Err(upper_mismatch) => {
                    upper = upper_mismatch.current;
                }
            }
        }
    }

    /// Fetch the current upper of the catalog state.
    #[mz_ore::instrument]
    async fn current_upper(&mut self) -> Timestamp {
        match self.mode {
            Mode::Writable | Mode::Readonly => {
                let upper = self.write_handle.fetch_recent_upper().await;
                antichain_to_timestamp(upper.clone())
            }
            Mode::Savepoint => self.upper,
        }
    }

    /// Appends `updates` iff the current global upper of the catalog is `self.upper`.
    ///
    /// Returns the next upper used to commit the transaction.
    #[mz_ore::instrument]
    pub(crate) async fn compare_and_append<S: IntoStateUpdateKindJson>(
        &mut self,
        updates: Vec<(S, Diff)>,
        commit_ts: Timestamp,
    ) -> Result<Timestamp, CompareAndAppendError> {
        assert_eq!(self.mode, Mode::Writable);
        assert!(
            commit_ts >= self.upper,
            "expected commit ts, {}, to be greater than or equal to upper, {}",
            commit_ts,
            self.upper
        );

        // This awkward code allows us to perform an expensive soft assert that requires cloning
        // `updates` twice, after `updates` has been consumed.
        let contains_fence = if mz_ore::assert::soft_assertions_enabled() {
            let updates: Vec<_> = updates.clone();
            let parsed_updates: Vec<_> = updates
                .clone()
                .into_iter()
                .map(|(update, diff)| {
                    let update: StateUpdateKindJson = update.into();
                    (update, diff)
                })
                .filter_map(|(update, diff)| {
                    <StateUpdateKindJson as TryIntoStateUpdateKind>::try_into(update)
                        .ok()
                        .map(|update| (update, diff))
                })
                .collect();
            let contains_retraction = parsed_updates.iter().any(|(update, diff)| {
                matches!(update, StateUpdateKind::FenceToken(..)) && *diff == Diff::MINUS_ONE
            });
            let contains_addition = parsed_updates.iter().any(|(update, diff)| {
                matches!(update, StateUpdateKind::FenceToken(..)) && *diff == Diff::ONE
            });
            let contains_fence = contains_retraction && contains_addition;
            Some((contains_fence, updates))
        } else {
            None
        };

        let updates = updates.into_iter().map(|(kind, diff)| {
            let kind: StateUpdateKindJson = kind.into();
            (
                (Into::<SourceData>::into(kind), ()),
                commit_ts,
                diff.into_inner(),
            )
        });
        let next_upper = commit_ts.step_forward();
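        // The batch is written at exactly `commit_ts`; advancing the upper one
        // tick past it makes the write immediately readable at `commit_ts`.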
        let res = self
            .write_handle
            .compare_and_append(
                updates,
                Antichain::from_elem(self.upper),
                Antichain::from_elem(next_upper),
            )
            .await
            .expect("invalid usage");

        // There was an upper mismatch which means something else must have written to the catalog.
        // Syncing to the current upper should result in a fence error since writing to the catalog
        // without fencing other catalogs should be impossible. The one exception is if we are
        // trying to fence other catalogs with this write, in which case we won't see a fence error.
        if let Err(e @ UpperMismatch { .. }) = res {
            self.sync_to_current_upper().await?;
            if let Some((contains_fence, updates)) = contains_fence {
                assert!(
                    contains_fence,
                    "updates were neither fenced nor fencing and encountered an upper mismatch: {updates:#?}"
                )
            }
            return Err(e.into());
        }

        // Lag the shard's upper by 1 to keep it readable.
        let downgrade_to = Antichain::from_elem(next_upper.saturating_sub(1));

        // The since handle gives us the ability to fence out other downgraders using an opaque token.
        // (See the method documentation for details.)
        // That's not needed here, so we use the since handle's opaque token to avoid any comparison
        // failures.
        let opaque = *self.since_handle.opaque();
        let downgrade = self
            .since_handle
            .maybe_compare_and_downgrade_since(&opaque, (&opaque, &downgrade_to))
            .await;

        match downgrade {
            None => {}
            Some(Err(e)) => soft_panic_or_log!("found opaque value {e}, but expected {opaque}"),
            Some(Ok(updated)) => soft_assert_or_log!(
                updated == downgrade_to,
                "updated bound should match expected"
            ),
        }
        self.sync(next_upper).await?;
        Ok(next_upper)
    }

    /// Returns all unconsolidated updates to the catalog state up to, and including, an `as_of`
    /// derived from the catalog's current upper.
    #[mz_ore::instrument]
    async fn snapshot_unconsolidated(&mut self) -> Vec<StateUpdate<StateUpdateKind>> {
        let current_upper = self.current_upper().await;

        let mut snapshot = Vec::new();
        let mut read_handle = self.read_handle().await;
        let as_of = as_of(&read_handle, current_upper);
        let mut stream = Box::pin(
            // We use `snapshot_and_stream` because it guarantees unconsolidated output.
            read_handle
                .snapshot_and_stream(Antichain::from_elem(as_of))
                .await
                .expect("we have advanced the restart_as_of by the since"),
        );
        while let Some(update) = stream.next().await {
            snapshot.push(update)
        }
        read_handle.expire().await;
        snapshot
            .into_iter()
            .map(Into::<StateUpdate<StateUpdateKindJson>>::into)
            .map(|state_update| state_update.try_into().expect("kind decoding error"))
            .collect()
    }

    /// Listen and apply all updates that are currently in persist.
    ///
    /// Returns an error if this instance has been fenced out.
    #[mz_ore::instrument]
    pub(crate) async fn sync_to_current_upper(&mut self) -> Result<(), FenceError> {
        let upper = self.current_upper().await;
        self.sync(upper).await
    }

    /// Listen and apply all updates up to `target_upper`.
    ///
    /// Returns an error if this instance has been fenced out.
    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn sync(&mut self, target_upper: Timestamp) -> Result<(), FenceError> {
        self.metrics.syncs.inc();
        let counter = self.metrics.sync_latency_seconds.clone();
        self.sync_inner(target_upper)
            .wall_time()
            .inc_by(counter)
            .await
    }

    #[mz_ore::instrument(level = "debug")]
    async fn sync_inner(&mut self, target_upper: Timestamp) -> Result<(), FenceError> {
        self.fenceable_token.validate()?;

        // Savepoint catalogs do not yet know how to update themselves in response to concurrent
        // writes from writer catalogs.
        if self.mode == Mode::Savepoint {
            self.upper = max(self.upper, target_upper);
            return Ok(());
        }

        let mut updates: BTreeMap<_, Vec<_>> = BTreeMap::new();
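        // Updates are buffered here keyed by timestamp and only applied once a
        // `Progress` event confirms their timestamp is complete.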

        while self.upper < target_upper {
            let listen_events = self.listen.fetch_next().await;
            for listen_event in listen_events {
                match listen_event {
                    ListenEvent::Progress(upper) => {
                        debug!("synced up to {upper:?}");
                        self.upper = antichain_to_timestamp(upper);
                        // Attempt to apply updates in batches of a single timestamp. If another
                        // catalog wrote a fence token at one timestamp and then updates in a new
                        // format at a later timestamp, then we want to apply the fence token
                        // before attempting to deserialize the new updates.
                        while let Some((ts, updates)) = updates.pop_first() {
                            assert!(ts < self.upper, "expected {} < {}", ts, self.upper);
                            let updates = updates.into_iter().map(
                                |update: StateUpdate<StateUpdateKindJson>| {
                                    let kind =
                                        T::try_from(update.kind).expect("kind decoding error");
                                    StateUpdate {
                                        kind,
                                        ts: update.ts,
                                        diff: update.diff,
                                    }
                                },
                            );
                            self.apply_updates(updates)?;
                        }
                    }
                    ListenEvent::Updates(batch_updates) => {
                        for update in batch_updates {
                            let update: StateUpdate<StateUpdateKindJson> = update.into();
                            updates.entry(update.ts).or_default().push(update);
                        }
                    }
                }
            }
        }
        assert_eq!(updates, BTreeMap::new(), "all updates should be applied");
        Ok(())
    }

    #[mz_ore::instrument(level = "debug")]
    pub(crate) fn apply_updates(
        &mut self,
        updates: impl IntoIterator<Item = StateUpdate<T>>,
    ) -> Result<(), FenceError> {
        let mut updates: Vec<_> = updates
            .into_iter()
            .map(|StateUpdate { kind, ts, diff }| (kind, ts, diff))
            .collect();

        // This helps guarantee that for a single key, there is at most a single retraction and a
        // single insertion per timestamp. Otherwise, we would need to match the retractions and
        // insertions up by value and manually figure out what the end value should be.
        differential_dataflow::consolidation::consolidate_updates(&mut updates);

        // Updates must be applied in timestamp order. Within a timestamp, retractions must be
        // applied before insertions, or we might end up retracting the wrong value. Sorting by
        // diff achieves this, since `Diff::MINUS_ONE` sorts before `Diff::ONE`.
        updates.sort_by(|(_, ts1, diff1), (_, ts2, diff2)| ts1.cmp(ts2).then(diff1.cmp(diff2)));

        let mut errors = Vec::new();

        for (kind, ts, diff) in updates {
            if diff != Diff::ONE && diff != Diff::MINUS_ONE {
                panic!("invalid update in consolidated trace: ({kind:?}, {ts:?}, {diff:?})");
            }

            match self.update_applier.apply_update(
                StateUpdate { kind, ts, diff },
                &mut self.fenceable_token,
                &self.metrics,
            ) {
                Ok(Some(StateUpdate { kind, ts, diff })) => self.snapshot.push((kind, ts, diff)),
                Ok(None) => {}
                // Instead of returning immediately, we accumulate all the errors and return the one
                // with the most information.
                Err(err) => errors.push(err),
            }
        }

        errors.sort();
        if let Some(err) = errors.into_iter().next() {
            return Err(err);
        }

        self.consolidate();

        Ok(())
    }

    #[mz_ore::instrument]
    pub(crate) fn consolidate(&mut self) {
        soft_assert_no_log!(
            self.snapshot
                .windows(2)
                .all(|updates| updates[0].1 <= updates[1].1),
            "snapshot should be sorted by timestamp, {:#?}",
            self.snapshot
        );

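        // Advance every update to the most recent timestamp in the snapshot
        // (the snapshot is sorted, so that is the timestamp of the last
        // element). Once all updates share a timestamp, consolidation can
        // cancel matching insertions and retractions that originally happened
        // at different times.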
        let new_ts = self
            .snapshot
            .last()
            .map(|(_, ts, _)| *ts)
            .unwrap_or_else(Timestamp::minimum);
        for (_, ts, _) in &mut self.snapshot {
            *ts = new_ts;
        }
        differential_dataflow::consolidation::consolidate_updates(&mut self.snapshot);
    }

    /// Execute and return the results of `f` on the current catalog trace.
    ///
    /// Will return an error if the catalog has been fenced out.
    async fn with_trace<R>(
        &mut self,
        f: impl FnOnce(&Vec<(T, Timestamp, Diff)>) -> Result<R, CatalogError>,
    ) -> Result<R, CatalogError> {
        self.sync_to_current_upper().await?;
        f(&self.snapshot)
    }

    /// Open a read handle to the catalog.
    async fn read_handle(&self) -> ReadHandle<SourceData, (), Timestamp, StorageDiff> {
        self.persist_client
            .open_leased_reader(
                self.shard_id,
                Arc::new(desc()),
                Arc::new(UnitSchema::default()),
                Diagnostics {
                    shard_name: CATALOG_SHARD_NAME.to_string(),
                    handle_purpose: "openable durable catalog state temporary reader".to_string(),
                },
                USE_CRITICAL_SINCE_CATALOG.get(self.persist_client.dyncfgs()),
            )
            .await
            .expect("invalid usage")
    }

    /// Politely releases all external resources that can only be released in an async context.
    async fn expire(self: Box<Self>) {
        self.write_handle.expire().await;
        self.listen.expire().await;
    }
}

impl<U: ApplyUpdate<StateUpdateKind>> PersistHandle<StateUpdateKind, U> {
    /// Execute and return the results of `f` on the current catalog snapshot.
    ///
    /// Will return an error if the catalog has been fenced out.
    async fn with_snapshot<T>(
        &mut self,
        f: impl FnOnce(Snapshot) -> Result<T, CatalogError>,
    ) -> Result<T, CatalogError> {
        fn apply<K, V>(map: &mut BTreeMap<K, V>, key: &K, value: &V, diff: Diff)
        where
            K: Ord + Clone,
            V: Ord + Clone + Debug,
        {
            let key = key.clone();
            let value = value.clone();
            if diff == Diff::ONE {
                let prev = map.insert(key, value);
                assert_eq!(
                    prev, None,
                    "values must be explicitly retracted before inserting a new value"
                );
            } else if diff == Diff::MINUS_ONE {
                let prev = map.remove(&key);
                assert_eq!(
                    prev,
                    Some(value),
                    "retraction does not match existing value"
                );
            }
        }

        self.with_trace(|trace| {
            let mut snapshot = Snapshot::empty();
            for (kind, ts, diff) in trace {
                let diff = *diff;
                if diff != Diff::ONE && diff != Diff::MINUS_ONE {
                    panic!("invalid update in consolidated trace: ({kind:?}, {ts:?}, {diff:?})");
                }

                match kind {
                    StateUpdateKind::AuditLog(_key, ()) => {
                        // Ignore for snapshots.
                    }
                    StateUpdateKind::Cluster(key, value) => {
                        apply(&mut snapshot.clusters, key, value, diff);
                    }
                    StateUpdateKind::ClusterReplica(key, value) => {
                        apply(&mut snapshot.cluster_replicas, key, value, diff);
                    }
                    StateUpdateKind::Comment(key, value) => {
                        apply(&mut snapshot.comments, key, value, diff);
                    }
                    StateUpdateKind::Config(key, value) => {
                        apply(&mut snapshot.configs, key, value, diff);
                    }
                    StateUpdateKind::Database(key, value) => {
                        apply(&mut snapshot.databases, key, value, diff);
                    }
                    StateUpdateKind::DefaultPrivilege(key, value) => {
                        apply(&mut snapshot.default_privileges, key, value, diff);
                    }
                    StateUpdateKind::FenceToken(_token) => {
                        // Ignore for snapshots.
                    }
                    StateUpdateKind::IdAllocator(key, value) => {
                        apply(&mut snapshot.id_allocator, key, value, diff);
                    }
                    StateUpdateKind::IntrospectionSourceIndex(key, value) => {
                        apply(&mut snapshot.introspection_sources, key, value, diff);
                    }
                    StateUpdateKind::Item(key, value) => {
                        apply(&mut snapshot.items, key, value, diff);
                    }
                    StateUpdateKind::NetworkPolicy(key, value) => {
                        apply(&mut snapshot.network_policies, key, value, diff);
                    }
                    StateUpdateKind::Role(key, value) => {
                        apply(&mut snapshot.roles, key, value, diff);
                    }
                    StateUpdateKind::Schema(key, value) => {
                        apply(&mut snapshot.schemas, key, value, diff);
                    }
                    StateUpdateKind::Setting(key, value) => {
                        apply(&mut snapshot.settings, key, value, diff);
                    }
                    StateUpdateKind::SourceReferences(key, value) => {
                        apply(&mut snapshot.source_references, key, value, diff);
                    }
                    StateUpdateKind::SystemConfiguration(key, value) => {
                        apply(&mut snapshot.system_configurations, key, value, diff);
                    }
                    StateUpdateKind::SystemObjectMapping(key, value) => {
                        apply(&mut snapshot.system_object_mappings, key, value, diff);
                    }
                    StateUpdateKind::SystemPrivilege(key, value) => {
                        apply(&mut snapshot.system_privileges, key, value, diff);
                    }
                    StateUpdateKind::StorageCollectionMetadata(key, value) => {
                        apply(&mut snapshot.storage_collection_metadata, key, value, diff);
                    }
                    StateUpdateKind::UnfinalizedShard(key, ()) => {
                        apply(&mut snapshot.unfinalized_shards, key, &(), diff);
                    }
                    StateUpdateKind::TxnWalShard((), value) => {
                        apply(&mut snapshot.txn_wal_shard, &(), value, diff);
                    }
                    StateUpdateKind::RoleAuth(key, value) => {
                        apply(&mut snapshot.role_auth, key, value, diff);
                    }
                }
            }
            f(snapshot)
        })
        .await
    }

    /// Generates an iterator of [`StateUpdate`]s that contains all updates to the catalog
    /// state.
    ///
    /// The output is fetched directly from persist instead of the in-memory cache.
    ///
    /// The output is consolidated and sorted by timestamp in ascending order.
    #[mz_ore::instrument(level = "debug")]
    async fn persist_snapshot(
        &mut self,
    ) -> impl Iterator<Item = StateUpdate> + DoubleEndedIterator {
        let mut read_handle = self.read_handle().await;
        let as_of = as_of(&read_handle, self.upper);
        let snapshot = snapshot_binary(&mut read_handle, as_of, &self.metrics)
            .await
            .map(|update| update.try_into().expect("kind decoding error"));
        read_handle.expire().await;
        snapshot
    }
}

/// Applies updates for an unopened catalog.
#[derive(Debug)]
pub(crate) struct UnopenedCatalogStateInner {
    /// The organization ID of the environment.
    organization_id: Uuid,
    /// A cache of the config collection of the catalog.
    configs: BTreeMap<String, u64>,
    /// A cache of the settings collection of the catalog.
    settings: BTreeMap<String, String>,
}

impl UnopenedCatalogStateInner {
    fn new(organization_id: Uuid) -> UnopenedCatalogStateInner {
        UnopenedCatalogStateInner {
            organization_id,
            configs: BTreeMap::new(),
            settings: BTreeMap::new(),
        }
    }
}

impl ApplyUpdate<StateUpdateKindJson> for UnopenedCatalogStateInner {
    fn apply_update(
        &mut self,
        update: StateUpdate<StateUpdateKindJson>,
        current_fence_token: &mut FenceableToken,
        _metrics: &Arc<Metrics>,
    ) -> Result<Option<StateUpdate<StateUpdateKindJson>>, FenceError> {
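        // Only config, setting, and fence updates are tracked in the pre-open
        // caches, and only when they are in a format that every version knows
        // how to deserialize. All updates, inspected or not, are retained in
        // the raw snapshot via the `Ok(Some(update))` below.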
        if !update.kind.is_audit_log() && update.kind.is_always_deserializable() {
            let kind = TryInto::try_into(&update.kind).expect("kind is known to be deserializable");
            match (kind, update.diff) {
                (StateUpdateKind::Config(key, value), Diff::ONE) => {
                    let prev = self.configs.insert(key.key, value.value);
                    assert_eq!(
                        prev, None,
                        "values must be explicitly retracted before inserting a new value"
                    );
                }
                (StateUpdateKind::Config(key, value), Diff::MINUS_ONE) => {
                    let prev = self.configs.remove(&key.key);
                    assert_eq!(
                        prev,
                        Some(value.value),
                        "retraction does not match existing value"
                    );
                }
                (StateUpdateKind::Setting(key, value), Diff::ONE) => {
                    let prev = self.settings.insert(key.name, value.value);
                    assert_eq!(
                        prev, None,
                        "values must be explicitly retracted before inserting a new value"
                    );
                }
                (StateUpdateKind::Setting(key, value), Diff::MINUS_ONE) => {
                    let prev = self.settings.remove(&key.name);
                    assert_eq!(
                        prev,
                        Some(value.value),
                        "retraction does not match existing value"
                    );
                }
                (StateUpdateKind::FenceToken(fence_token), Diff::ONE) => {
                    current_fence_token.maybe_fence(fence_token)?;
                }
                _ => {}
            }
        }

        Ok(Some(update))
    }
}

/// A handle to an unopened catalog stored in persist. The unopened catalog can serve `Config` data,
/// `Setting` data, or the current epoch. All other catalog data may be un-migrated and should not
/// be read until the catalog has been opened. The [`UnopenedPersistCatalogState`] is responsible
/// for opening the catalog; see [`OpenableDurableCatalogState::open`] for more details.
///
/// Production users should call [`Self::expire`] before dropping an [`UnopenedPersistCatalogState`]
/// so that it can expire its leases. If/when Rust gets `AsyncDrop`, this will be done automatically.
pub(crate) type UnopenedPersistCatalogState =
    PersistHandle<StateUpdateKindJson, UnopenedCatalogStateInner>;
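
// Typical lifecycle (sketch; `open` is provided by the
// `OpenableDurableCatalogState` trait, and its exact signature lives there):
//
//     let unopened = UnopenedPersistCatalogState::new(
//         persist_client,
//         organization_id,
//         version,
//         Some(deploy_generation),
//         metrics,
//     )
//     .await?;
//     let (durable_state, audit_logs) = unopened.open(/* ... */).await?;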

impl UnopenedPersistCatalogState {
    /// Create a new [`UnopenedPersistCatalogState`] for the catalog state associated with
    /// `organization_id`.
    ///
    /// All usages of the persist catalog must go through this function. That includes the
    /// catalog-debug tool, the adapter's catalog, etc.
    #[mz_ore::instrument]
    pub(crate) async fn new(
        persist_client: PersistClient,
        organization_id: Uuid,
        version: semver::Version,
        deploy_generation: Option<u64>,
        metrics: Arc<Metrics>,
    ) -> Result<UnopenedPersistCatalogState, DurableCatalogError> {
        let catalog_shard_id = shard_id(organization_id, CATALOG_SEED);
        let upgrade_shard_id = shard_id(organization_id, UPGRADE_SEED);
        debug!(
            ?catalog_shard_id,
            ?upgrade_shard_id,
            "new persist backed catalog state"
        );

        // Check the catalog upgrade shard to ensure that we don't fence anyone out of persist.
        let version_in_upgrade_shard =
            fetch_catalog_upgrade_shard_version(&persist_client, upgrade_shard_id).await;
        // If this is `None`, no version was found in the upgrade shard. This is a brand-new
        // environment, and we don't need to worry about fencing existing users.
        if let Some(version_in_upgrade_shard) = version_in_upgrade_shard {
            // IMPORTANT: We swap the order of arguments here! Normally it's
            // `code_version, data_version`, and we check whether a given code
            // version, which is usually _older_, is allowed to touch a shard
            // that has been touched by a _future_ version.
            //
            // By inverting argument order, we check if our version is too far
            // ahead of the version in the shard.
            if mz_persist_client::cfg::check_data_version(&version_in_upgrade_shard, &version)
                .is_err()
            {
                return Err(DurableCatalogError::IncompatiblePersistVersion {
                    found_version: version_in_upgrade_shard,
                    catalog_version: version,
                });
            }
        }

        let open_handles_start = Instant::now();
        info!("startup: envd serve: catalog init: open handles beginning");
        let since_handle = persist_client
            .open_critical_since(
                catalog_shard_id,
                // TODO: We may need to use a different critical reader
                // id for this if we want to be able to introspect it via SQL.
                PersistClient::CONTROLLER_CRITICAL_SINCE,
                Diagnostics {
                    shard_name: CATALOG_SHARD_NAME.to_string(),
                    handle_purpose: "durable catalog state critical since".to_string(),
                },
            )
            .await
            .expect("invalid usage");
        let (mut write_handle, mut read_handle) = persist_client
            .open(
                catalog_shard_id,
                Arc::new(desc()),
                Arc::new(UnitSchema::default()),
                Diagnostics {
                    shard_name: CATALOG_SHARD_NAME.to_string(),
                    handle_purpose: "durable catalog state handles".to_string(),
                },
                USE_CRITICAL_SINCE_CATALOG.get(persist_client.dyncfgs()),
            )
            .await
            .expect("invalid usage");
        info!(
            "startup: envd serve: catalog init: open handles complete in {:?}",
            open_handles_start.elapsed()
        );

        // Commit an empty write at the minimum timestamp so the catalog is always readable.
        let upper = {
            const EMPTY_UPDATES: &[((SourceData, ()), Timestamp, StorageDiff)] = &[];
            let upper = Antichain::from_elem(Timestamp::minimum());
            let next_upper = Timestamp::minimum().step_forward();
            match write_handle
                .compare_and_append(EMPTY_UPDATES, upper, Antichain::from_elem(next_upper))
                .await
                .expect("invalid usage")
            {
                Ok(()) => next_upper,
                Err(mismatch) => antichain_to_timestamp(mismatch.current),
            }
        };

        let snapshot_start = Instant::now();
        info!("startup: envd serve: catalog init: snapshot beginning");
        let as_of = as_of(&read_handle, upper);
        let snapshot: Vec<_> = snapshot_binary(&mut read_handle, as_of, &metrics)
            .await
            .map(|StateUpdate { kind, ts, diff }| (kind, ts, diff))
            .collect();
        let listen = read_handle
            .listen(Antichain::from_elem(as_of))
            .await
            .expect("invalid usage");
        info!(
            "startup: envd serve: catalog init: snapshot complete in {:?}",
            snapshot_start.elapsed()
        );

        let mut handle = UnopenedPersistCatalogState {
            // Unopened catalogs are always writable until they're opened in an explicit mode.
            mode: Mode::Writable,
            since_handle,
            write_handle,
            listen,
            persist_client,
            shard_id: catalog_shard_id,
            // Initialize empty in-memory state.
            snapshot: Vec::new(),
            update_applier: UnopenedCatalogStateInner::new(organization_id),
            upper,
            fenceable_token: FenceableToken::new(deploy_generation),
            catalog_content_version: version,
            bootstrap_complete: false,
            metrics,
        };
        // If the snapshot is not consolidated, and we see multiple epoch values while applying the
        // updates, then we might accidentally fence ourselves out.
        soft_assert_no_log!(
            snapshot.iter().all(|(_, _, diff)| *diff == Diff::ONE),
            "snapshot should be consolidated: {snapshot:#?}"
        );

        let apply_start = Instant::now();
        info!("startup: envd serve: catalog init: apply updates beginning");
        let updates = snapshot
            .into_iter()
            .map(|(kind, ts, diff)| StateUpdate { kind, ts, diff });
        handle.apply_updates(updates)?;
        info!(
            "startup: envd serve: catalog init: apply updates complete in {:?}",
            apply_start.elapsed()
        );

        // Validate that the binary version of the current process is not less than any binary
        // version that has written to the catalog.
        // This condition is only checked once, right here. If a new process comes along with a
        // higher version, it must fence this process out with one of the existing fencing
        // mechanisms.
        if let Some(found_version) = handle.get_catalog_content_version().await? {
            if handle.catalog_content_version < found_version {
                return Err(DurableCatalogError::IncompatiblePersistVersion {
                    found_version,
                    catalog_version: handle.catalog_content_version,
                });
            }
        }

        Ok(handle)
    }

    #[mz_ore::instrument]
    async fn open_inner(
        mut self,
        mode: Mode,
        initial_ts: Timestamp,
        bootstrap_args: &BootstrapArgs,
    ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError> {
        // It would be nice to use `initial_ts` here, but it comes from the system clock, not the
        // timestamp oracle.
        let mut commit_ts = self.upper;
        self.mode = mode;

        // Validate the current deploy generation.
        match (&self.mode, &self.fenceable_token) {
            (_, FenceableToken::Unfenced { .. } | FenceableToken::Fenced { .. }) => {
                return Err(DurableCatalogError::Internal(
                    "catalog should not have fenced before opening".to_string(),
                )
                .into());
            }
            (
                Mode::Writable | Mode::Savepoint,
                FenceableToken::Initializing {
                    current_deploy_generation: None,
                    ..
                },
            ) => {
                return Err(DurableCatalogError::Internal(format!(
                    "cannot open in mode '{:?}' without a deploy generation",
                    self.mode,
                ))
                .into());
            }
            _ => {}
        }

        let read_only = matches!(self.mode, Mode::Readonly);

        // Fence out previous catalogs.
        loop {
            self.sync_to_current_upper().await?;
            commit_ts = max(commit_ts, self.upper);
            let (fence_updates, current_fenceable_token) = self
                .fenceable_token
                .generate_unfenced_token(self.mode)?
                .ok_or_else(|| {
                    DurableCatalogError::Internal(
                        "catalog should not have fenced before opening".to_string(),
                    )
                })?;
            debug!(
                ?self.upper,
                ?self.fenceable_token,
                ?current_fenceable_token,
                "fencing previous catalogs"
            );
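            // Only writable catalogs durably commit the fence updates; savepoint
            // and read-only catalogs adopt the new token in memory only, so they
            // never fence out a real writer.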
            if matches!(self.mode, Mode::Writable) {
                match self
                    .compare_and_append(fence_updates.clone(), commit_ts)
                    .await
                {
                    Ok(upper) => {
                        commit_ts = upper;
                    }
                    Err(CompareAndAppendError::Fence(e)) => return Err(e.into()),
                    Err(e @ CompareAndAppendError::UpperMismatch { .. }) => {
                        warn!("catalog write failed due to upper mismatch, retrying: {e:?}");
                        continue;
                    }
                }
            }
            self.fenceable_token = current_fenceable_token;
            break;
        }

        let is_initialized = self.is_initialized_inner();
        if !matches!(self.mode, Mode::Writable) && !is_initialized {
            return Err(CatalogError::Durable(DurableCatalogError::NotWritable(
                format!(
                    "catalog tables do not exist; will not create in {:?} mode",
                    self.mode
                ),
            )));
        }
        soft_assert_ne_or_log!(self.upper, Timestamp::minimum());

        // Split out all audit log entries; they are returned to the caller lazily via an iterator
        // instead of being kept in the in-memory snapshot.
        let (audit_logs, snapshot): (Vec<_>, Vec<_>) = self
            .snapshot
            .into_iter()
            .partition(|(update, _, _)| update.is_audit_log());
        self.snapshot = snapshot;
        let audit_log_count = audit_logs.iter().map(|(_, _, diff)| diff).sum::<Diff>();
        let audit_log_handle = AuditLogIterator::new(audit_logs);

        // Perform data migrations.
        if is_initialized && !read_only {
            commit_ts = upgrade(&mut self, commit_ts).await?;
        }

        debug!(
            ?is_initialized,
            ?self.upper,
            "initializing catalog state"
        );
        let mut catalog = PersistCatalogState {
            mode: self.mode,
            since_handle: self.since_handle,
            write_handle: self.write_handle,
            listen: self.listen,
            persist_client: self.persist_client,
            shard_id: self.shard_id,
            upper: self.upper,
            fenceable_token: self.fenceable_token,
            // Initialize empty in-memory state.
            snapshot: Vec::new(),
            update_applier: CatalogStateInner::new(),
            catalog_content_version: self.catalog_content_version,
            bootstrap_complete: false,
            metrics: self.metrics,
        };
        catalog.metrics.collection_entries.reset();
        // Normally, `collection_entries` is updated in `apply_updates`. The audit log updates skip
        // over that function so we manually update it here.
        catalog
            .metrics
            .collection_entries
            .with_label_values(&[&CollectionType::AuditLog.to_string()])
            .add(audit_log_count.into_inner());
        let updates = self.snapshot.into_iter().map(|(kind, ts, diff)| {
            let kind = TryIntoStateUpdateKind::try_into(kind).expect("kind decoding error");
            StateUpdate { kind, ts, diff }
        });
        catalog.apply_updates(updates)?;

        let catalog_content_version = catalog.catalog_content_version.to_string();
        let txn = if is_initialized {
            let mut txn = catalog.transaction().await?;
            txn.set_catalog_content_version(catalog_content_version)?;
            txn
        } else {
            soft_assert_eq_no_log!(
                catalog
                    .snapshot
                    .iter()
                    .filter(|(kind, _, _)| !matches!(kind, StateUpdateKind::FenceToken(_)))
                    .count(),
                0,
                "trace should not contain any updates for an uninitialized catalog: {:#?}",
                catalog.snapshot
            );

            let mut txn = catalog.transaction().await?;
            initialize::initialize(
                &mut txn,
                bootstrap_args,
                initial_ts.into(),
                catalog_content_version,
            )
            .await?;
            txn
        };

        if read_only {
            let (txn_batch, _) = txn.into_parts();
            // The upper here doesn't matter because we are only applying the updates in memory.
            let updates = StateUpdate::from_txn_batch_ts(txn_batch, catalog.upper);
            catalog.apply_updates(updates)?;
        } else {
            txn.commit_internal(commit_ts).await?;
        }

        // Now that we've fully opened the catalog at the current version, we can increment the
        // version in the catalog upgrade shard to signal to readers that the allowable versions
        // have increased.
        if matches!(catalog.mode, Mode::Writable) {
            catalog
                .increment_catalog_upgrade_shard_version(self.update_applier.organization_id)
                .await;
            let write_handle = catalog
                .persist_client
                .open_writer::<SourceData, (), Timestamp, i64>(
                    catalog.write_handle.shard_id(),
                    Arc::new(desc()),
                    Arc::new(UnitSchema::default()),
                    Diagnostics {
                        shard_name: CATALOG_SHARD_NAME.to_string(),
                        handle_purpose: "compact catalog".to_string(),
                    },
                )
                .await
                .expect("invalid usage");
            let fuel = CATALOG_FORCE_COMPACTION_FUEL.handle(catalog.persist_client.dyncfgs());
            let wait = CATALOG_FORCE_COMPACTION_WAIT.handle(catalog.persist_client.dyncfgs());
            // We're going to gradually turn this on via dyncfgs. Run it in a task so that it
            // doesn't block startup.
            let _task = mz_ore::task::spawn(|| "catalog::force_shard_compaction", async move {
                let () =
                    mz_persist_client::cli::admin::dangerous_force_compaction_and_break_pushdown(
                        &write_handle,
1335                        || fuel.get(),
1336                        || wait.get(),
1337                    )
1338                    .await;
1339            });
1340        }
1341
1342        Ok((Box::new(catalog), audit_log_handle))
1343    }
1344
1345    /// Reports if the catalog state has been initialized.
1346    ///
1347    /// NOTE: This is the answer as of the last call to [`PersistHandle::sync`] or [`PersistHandle::sync_to_current_upper`],
1348    /// not necessarily what is currently in persist.
1349    #[mz_ore::instrument]
1350    fn is_initialized_inner(&self) -> bool {
1351        !self.update_applier.configs.is_empty()
1352    }
1353
1354    /// Get the current value of config `key`.
1355    ///
1356    /// Some configs need to be read before the catalog is opened for bootstrapping.
1357    #[mz_ore::instrument]
1358    async fn get_current_config(&mut self, key: &str) -> Result<Option<u64>, DurableCatalogError> {
1359        self.sync_to_current_upper().await?;
1360        Ok(self.update_applier.configs.get(key).cloned())
1361    }
1362
1363    /// Get the user version of this instance.
1364    ///
1365    /// The user version is used to determine if a migration is needed.
1366    #[mz_ore::instrument]
1367    pub(crate) async fn get_user_version(&mut self) -> Result<Option<u64>, DurableCatalogError> {
1368        self.get_current_config(USER_VERSION_KEY).await
1369    }
1370
1371    /// Get the current value of setting `name`.
1372    ///
1373    /// Some settings need to be read before the catalog is opened for bootstrapping.
1374    #[mz_ore::instrument]
1375    async fn get_current_setting(
1376        &mut self,
1377        name: &str,
1378    ) -> Result<Option<String>, DurableCatalogError> {
1379        self.sync_to_current_upper().await?;
1380        Ok(self.update_applier.settings.get(name).cloned())
1381    }
1382
1383    /// Get the catalog content version.
1384    ///
1385    /// The catalog content version is the semantic version of the most recent binary that wrote to
1386    /// the catalog.
1387    #[mz_ore::instrument]
1388    async fn get_catalog_content_version(
1389        &mut self,
1390    ) -> Result<Option<semver::Version>, DurableCatalogError> {
1391        let version = self
1392            .get_current_setting(CATALOG_CONTENT_VERSION_KEY)
1393            .await?;
1394        let version = version.map(|version| version.parse().expect("invalid version persisted"));
1395        Ok(version)
1396    }
1397}
1398
1399#[async_trait]
1400impl OpenableDurableCatalogState for UnopenedPersistCatalogState {
1401    #[mz_ore::instrument]
1402    async fn open_savepoint(
1403        mut self: Box<Self>,
1404        initial_ts: Timestamp,
1405        bootstrap_args: &BootstrapArgs,
1406    ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError> {
1407        self.open_inner(Mode::Savepoint, initial_ts, bootstrap_args)
1408            .boxed()
1409            .await
1410    }
1411
1412    #[mz_ore::instrument]
1413    async fn open_read_only(
1414        mut self: Box<Self>,
1415        bootstrap_args: &BootstrapArgs,
1416    ) -> Result<Box<dyn DurableCatalogState>, CatalogError> {
1417        self.open_inner(Mode::Readonly, EpochMillis::MIN.into(), bootstrap_args)
1418            .boxed()
1419            .await
1420            .map(|(catalog, _)| catalog)
1421    }
1422
1423    #[mz_ore::instrument]
1424    async fn open(
1425        mut self: Box<Self>,
1426        initial_ts: Timestamp,
1427        bootstrap_args: &BootstrapArgs,
1428    ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError> {
1429        self.open_inner(Mode::Writable, initial_ts, bootstrap_args)
1430            .boxed()
1431            .await
1432    }
1433
1434    #[mz_ore::instrument(level = "debug")]
1435    async fn open_debug(mut self: Box<Self>) -> Result<DebugCatalogState, CatalogError> {
1436        Ok(DebugCatalogState(*self))
1437    }
1438
1439    #[mz_ore::instrument]
1440    async fn is_initialized(&mut self) -> Result<bool, CatalogError> {
1441        self.sync_to_current_upper().await?;
1442        Ok(self.is_initialized_inner())
1443    }
1444
1445    #[mz_ore::instrument]
1446    async fn epoch(&mut self) -> Result<Epoch, CatalogError> {
1447        self.sync_to_current_upper().await?;
1448        self.fenceable_token
1449            .validate()?
1450            .map(|token| token.epoch)
1451            .ok_or(CatalogError::Durable(DurableCatalogError::Uninitialized))
1452    }
1453
1454    #[mz_ore::instrument]
1455    async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError> {
1456        self.sync_to_current_upper().await?;
1457        self.fenceable_token
1458            .token()
1459            .map(|token| token.deploy_generation)
1460            .ok_or(CatalogError::Durable(DurableCatalogError::Uninitialized))
1461    }
1462
1463    #[mz_ore::instrument(level = "debug")]
1464    async fn get_enable_0dt_deployment(&mut self) -> Result<Option<bool>, CatalogError> {
1465        let value = self.get_current_config(ENABLE_0DT_DEPLOYMENT).await?;
1466        match value {
1467            None => Ok(None),
1468            Some(0) => Ok(Some(false)),
1469            Some(1) => Ok(Some(true)),
1470            Some(v) => Err(
1471                DurableCatalogError::from(TryFromProtoError::UnknownEnumVariant(format!(
1472                    "{v} is not a valid boolean value"
1473                )))
1474                .into(),
1475            ),
1476        }
1477    }
1478
1479    #[mz_ore::instrument(level = "debug")]
1480    async fn get_0dt_deployment_max_wait(&mut self) -> Result<Option<Duration>, CatalogError> {
1481        let value = self
1482            .get_current_config(WITH_0DT_DEPLOYMENT_MAX_WAIT)
1483            .await?;
1484        match value {
1485            None => Ok(None),
1486            Some(millis) => Ok(Some(Duration::from_millis(millis))),
1487        }
1488    }
1489
1490    #[mz_ore::instrument(level = "debug")]
1491    async fn get_0dt_deployment_ddl_check_interval(
1492        &mut self,
1493    ) -> Result<Option<Duration>, CatalogError> {
1494        let value = self
1495            .get_current_config(WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL)
1496            .await?;
1497        match value {
1498            None => Ok(None),
1499            Some(millis) => Ok(Some(Duration::from_millis(millis))),
1500        }
1501    }
1502
1503    #[mz_ore::instrument(level = "debug")]
1504    async fn get_enable_0dt_deployment_panic_after_timeout(
1505        &mut self,
1506    ) -> Result<Option<bool>, CatalogError> {
1507        let value = self
1508            .get_current_config(ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT)
1509            .await?;
1510        match value {
1511            None => Ok(None),
1512            Some(0) => Ok(Some(false)),
1513            Some(1) => Ok(Some(true)),
1514            Some(v) => Err(
1515                DurableCatalogError::from(TryFromProtoError::UnknownEnumVariant(format!(
1516                    "{v} is not a valid boolean value"
1517                )))
1518                .into(),
1519            ),
1520        }
1521    }
1522
1523    #[mz_ore::instrument]
1524    async fn has_system_config_synced_once(&mut self) -> Result<bool, DurableCatalogError> {
1525        self.get_current_config(SYSTEM_CONFIG_SYNCED_KEY)
1526            .await
1527            .map(|value| value.map(|value| value > 0).unwrap_or(false))
1528    }
1529
1530    #[mz_ore::instrument]
1531    async fn trace_unconsolidated(&mut self) -> Result<Trace, CatalogError> {
1532        self.sync_to_current_upper().await?;
1533        if self.is_initialized_inner() {
1534            let snapshot = self.snapshot_unconsolidated().await;
1535            Ok(Trace::from_snapshot(snapshot))
1536        } else {
1537            Err(CatalogError::Durable(DurableCatalogError::Uninitialized))
1538        }
1539    }
1540
1541    #[mz_ore::instrument]
1542    async fn trace_consolidated(&mut self) -> Result<Trace, CatalogError> {
1543        self.sync_to_current_upper().await?;
1544        if self.is_initialized_inner() {
1545            let snapshot = self.current_snapshot().await?;
1546            Ok(Trace::from_snapshot(snapshot))
1547        } else {
1548            Err(CatalogError::Durable(DurableCatalogError::Uninitialized))
1549        }
1550    }
1551
1552    #[mz_ore::instrument(level = "debug")]
1553    async fn expire(self: Box<Self>) {
1554        self.expire().await
1555    }
1556}
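
// Illustrative, not part of the catalog API: the 0dt deployment flags above
// are persisted as `u64` configs and decoded with the convention 0 => false,
// 1 => true, anything else is an error. This is a minimal sketch of that
// convention as a hypothetical standalone helper; the real methods return a
// `TryFromProtoError`-based `CatalogError` rather than a `String`.
#[cfg(test)]
mod bool_config_example {
    // Hypothetical helper mirroring the decoding convention used above.
    fn decode_bool_config(value: Option<u64>) -> Result<Option<bool>, String> {
        match value {
            None => Ok(None),
            Some(0) => Ok(Some(false)),
            Some(1) => Ok(Some(true)),
            Some(v) => Err(format!("{v} is not a valid boolean value")),
        }
    }

    #[mz_ore::test]
    fn bool_configs_decode_zero_and_one_only() {
        assert_eq!(decode_bool_config(None), Ok(None));
        assert_eq!(decode_bool_config(Some(0)), Ok(Some(false)));
        assert_eq!(decode_bool_config(Some(1)), Ok(Some(true)));
        assert!(decode_bool_config(Some(2)).is_err());
    }
}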

/// Applies updates for an opened catalog.
#[derive(Debug)]
struct CatalogStateInner {
    /// A trace of all catalog updates that can be consumed by some higher layer.
    updates: VecDeque<memory::objects::StateUpdate>,
}

impl CatalogStateInner {
    fn new() -> CatalogStateInner {
        CatalogStateInner {
            updates: VecDeque::new(),
        }
    }
}

impl ApplyUpdate<StateUpdateKind> for CatalogStateInner {
    fn apply_update(
        &mut self,
        update: StateUpdate<StateUpdateKind>,
        current_fence_token: &mut FenceableToken,
        metrics: &Arc<Metrics>,
    ) -> Result<Option<StateUpdate<StateUpdateKind>>, FenceError> {
        if let Some(collection_type) = update.kind.collection_type() {
            metrics
                .collection_entries
                .with_label_values(&[&collection_type.to_string()])
                .add(update.diff.into_inner());
        }

        {
            let update: Option<memory::objects::StateUpdate> = (&update)
                .try_into()
                // `expect` does not interpolate its message, so use
                // `unwrap_or_else` + `panic!` to include the offending update.
                .unwrap_or_else(|_| panic!("invalid persisted update: {update:#?}"));
            if let Some(update) = update {
                self.updates.push_back(update);
            }
        }

        match (update.kind, update.diff) {
            (StateUpdateKind::AuditLog(_, ()), _) => Ok(None),
            // Nothing to do for fence token retractions but wait for the next insertion.
            (StateUpdateKind::FenceToken(_), Diff::MINUS_ONE) => Ok(None),
            (StateUpdateKind::FenceToken(token), Diff::ONE) => {
                current_fence_token.maybe_fence(token)?;
                Ok(None)
            }
            (kind, diff) => Ok(Some(StateUpdate {
                kind,
                ts: update.ts,
                diff,
            })),
        }
    }
}

/// A durable store of the catalog state that uses persist as its implementation. The durable
/// store can serve any catalog data and transactionally modify catalog data.
///
/// Production users should call [`Self::expire`] before dropping a [`PersistCatalogState`]
/// so that it can expire its leases. If/when Rust gets AsyncDrop, this will be done automatically.
type PersistCatalogState = PersistHandle<StateUpdateKind, CatalogStateInner>;

#[async_trait]
impl ReadOnlyDurableCatalogState for PersistCatalogState {
    fn epoch(&self) -> Epoch {
        self.fenceable_token
            .token()
            .expect("opened catalog state must have an epoch")
            .epoch
    }

    #[mz_ore::instrument(level = "debug")]
    async fn expire(self: Box<Self>) {
        self.expire().await
    }

    fn is_bootstrap_complete(&self) -> bool {
        self.bootstrap_complete
    }

    async fn get_audit_logs(&mut self) -> Result<Vec<VersionedEvent>, CatalogError> {
        self.sync_to_current_upper().await?;
        let audit_logs: Vec<_> = self
            .persist_snapshot()
            .await
            .filter_map(
                |StateUpdate {
                     kind,
                     ts: _,
                     diff: _,
                 }| match kind {
                    StateUpdateKind::AuditLog(key, ()) => Some(key),
                    _ => None,
                },
            )
            .collect();
        let mut audit_logs: Vec<_> = audit_logs
            .into_iter()
            .map(RustType::from_proto)
            .map_ok(|key: AuditLogKey| key.event)
            .collect::<Result<_, _>>()?;
        audit_logs.sort_by(|a, b| a.sortable_id().cmp(&b.sortable_id()));
        Ok(audit_logs)
    }

    #[mz_ore::instrument(level = "debug")]
    async fn get_next_id(&mut self, id_type: &str) -> Result<u64, CatalogError> {
        self.with_trace(|trace| {
            Ok(trace
                .into_iter()
                .rev()
                .filter_map(|(kind, _, _)| match kind {
                    StateUpdateKind::IdAllocator(key, value) if key.name == id_type => {
                        Some(value.next_id)
                    }
                    _ => None,
                })
                .next()
                .expect("must exist"))
        })
        .await
    }

    #[mz_ore::instrument(level = "debug")]
    async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError> {
        self.sync_to_current_upper().await?;
        Ok(self
            .fenceable_token
            .token()
            .expect("opened catalogs must have a token")
            .deploy_generation)
    }

    #[mz_ore::instrument(level = "debug")]
    async fn snapshot(&mut self) -> Result<Snapshot, CatalogError> {
        self.with_snapshot(Ok).await
    }

    #[mz_ore::instrument(level = "debug")]
    async fn sync_to_current_updates(
        &mut self,
    ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError> {
        let upper = self.current_upper().await;
        self.sync_updates(upper).await
    }

    #[mz_ore::instrument(level = "debug")]
    async fn sync_updates(
        &mut self,
        target_upper: mz_repr::Timestamp,
    ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError> {
        self.sync(target_upper).await?;
        let mut updates = Vec::new();
        while let Some(update) = self.update_applier.updates.front() {
            if update.ts >= target_upper {
                break;
            }

            let update = self
                .update_applier
                .updates
                .pop_front()
                .expect("peeked above");
            updates.push(update);
        }
        Ok(updates)
    }

    async fn current_upper(&mut self) -> Timestamp {
        self.current_upper().await
    }
}
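
// A minimal sketch (over plain timestamps rather than real catalog updates)
// of the draining rule `sync_updates` implements above: updates strictly
// below the target upper are returned, while everything at or beyond it
// stays queued for a later sync. The module and test names are illustrative.
#[cfg(test)]
mod sync_updates_example {
    use super::*;

    #[mz_ore::test]
    fn drains_strictly_below_the_target_upper() {
        let mut updates: VecDeque<Timestamp> =
            [1u64, 2, 3, 4].into_iter().map(Timestamp::from).collect();
        let target_upper = Timestamp::from(3u64);
        let mut drained = Vec::new();
        while let Some(ts) = updates.front() {
            if *ts >= target_upper {
                break;
            }
            drained.push(updates.pop_front().expect("peeked above"));
        }
        assert_eq!(drained, vec![Timestamp::from(1u64), Timestamp::from(2u64)]);
        // The update at the target upper is left for the next call.
        assert_eq!(updates.front(), Some(&Timestamp::from(3u64)));
    }
}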

#[async_trait]
#[allow(elided_named_lifetimes)]
impl DurableCatalogState for PersistCatalogState {
    fn is_read_only(&self) -> bool {
        matches!(self.mode, Mode::Readonly)
    }

    fn is_savepoint(&self) -> bool {
        matches!(self.mode, Mode::Savepoint)
    }

    fn mark_bootstrap_complete(&mut self) {
        self.bootstrap_complete = true;
    }

    #[mz_ore::instrument(level = "debug")]
    async fn transaction(&mut self) -> Result<Transaction, CatalogError> {
        self.metrics.transactions_started.inc();
        let snapshot = self.snapshot().await?;
        let commit_ts = self.upper.clone();
        Transaction::new(self, snapshot, commit_ts)
    }

    #[mz_ore::instrument(level = "debug")]
    async fn commit_transaction(
        &mut self,
        txn_batch: TransactionBatch,
        commit_ts: Timestamp,
    ) -> Result<Timestamp, CatalogError> {
        async fn commit_transaction_inner(
            catalog: &mut PersistCatalogState,
            txn_batch: TransactionBatch,
            commit_ts: Timestamp,
        ) -> Result<Timestamp, CatalogError> {
            // If the current upper does not match the transaction's commit timestamp, then the
            // catalog must have changed since the transaction was started, making the transaction
            // invalid. When/if we want a multi-writer catalog, this will likely have to change
            // from an assert to a retry.
            assert_eq!(
                catalog.upper, txn_batch.upper,
                "only one transaction at a time is supported"
            );

            assert!(
                commit_ts >= catalog.upper,
                "expected commit ts, {}, to be greater than or equal to upper, {}",
                commit_ts,
                catalog.upper
            );

            let updates = StateUpdate::from_txn_batch(txn_batch).collect();
            debug!("committing updates: {updates:?}");

            let next_upper = match catalog.mode {
                Mode::Writable => catalog
                    .compare_and_append(updates, commit_ts)
                    .await
                    .map_err(|e| e.unwrap_fence_error())?,
                Mode::Savepoint => {
                    let updates = updates.into_iter().map(|(kind, diff)| StateUpdate {
                        kind,
                        ts: commit_ts,
                        diff,
                    });
                    catalog.apply_updates(updates)?;
                    catalog.upper = commit_ts.step_forward();
                    catalog.upper
                }
                Mode::Readonly => {
                    // If the transaction is empty then we don't error, even in read-only mode.
                    // This is mostly for legacy reasons (i.e. with enough elbow grease this
                    // behavior can be changed without breaking any fundamental assumptions).
                    if !updates.is_empty() {
                        return Err(DurableCatalogError::NotWritable(format!(
                            "cannot commit a transaction in a read-only catalog: {updates:#?}"
                        ))
                        .into());
                    }
                    catalog.upper
                }
            };

            Ok(next_upper)
        }
        self.metrics.transaction_commits.inc();
        let counter = self.metrics.transaction_commit_latency_seconds.clone();
        commit_transaction_inner(self, txn_batch, commit_ts)
            .wall_time()
            .inc_by(counter)
            .await
    }

    #[mz_ore::instrument(level = "debug")]
    async fn confirm_leadership(&mut self) -> Result<(), CatalogError> {
        // A read-only catalog does not care about leadership.
        if self.is_read_only() {
            return Ok(());
        }
        self.sync_to_current_upper().await?;
        Ok(())
    }
}
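
// A minimal sketch of the timestamp discipline enforced by
// `commit_transaction` above: committing at `commit_ts` yields a new upper of
// `commit_ts.step_forward()`, so the next transaction must commit at or
// beyond that upper. The module and test names are illustrative.
#[cfg(test)]
mod commit_ts_example {
    use super::*;

    #[mz_ore::test]
    fn commit_advances_the_upper_past_the_commit_ts() {
        let commit_ts = Timestamp::from(7u64);
        let next_upper = commit_ts.step_forward();
        assert!(next_upper > commit_ts);
        assert_eq!(next_upper, Timestamp::from(8u64));
    }
}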

/// Deterministically generate a shard ID for the given `organization_id` and `seed`.
pub fn shard_id(organization_id: Uuid, seed: usize) -> ShardId {
    let hash = sha2::Sha256::digest(format!("{organization_id}{seed}")).to_vec();
    soft_assert_eq_or_log!(hash.len(), 32, "SHA256 returns 32 bytes (256 bits)");
    let uuid = Uuid::from_slice(&hash[0..16]).expect("from_slice accepts exactly 16 bytes");
    ShardId::from_str(&format!("s{uuid}")).expect("known to be valid")
}
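
// A minimal sketch of the property `shard_id` provides: the mapping from
// `(organization_id, seed)` to `ShardId` is deterministic, and distinct seeds
// yield distinct shards for the same organization. Assumes the `v4` feature
// of the `uuid` crate; the module and test names are illustrative.
#[cfg(test)]
mod shard_id_example {
    use super::*;

    #[mz_ore::test]
    fn shard_id_is_deterministic_per_seed() {
        let organization_id = Uuid::new_v4();
        let a = shard_id(organization_id, CATALOG_SEED);
        let b = shard_id(organization_id, CATALOG_SEED);
        assert_eq!(a, b, "same inputs must map to the same shard");
        // Any other seed (e.g. the upgrade shard's seed) maps elsewhere.
        let c = shard_id(organization_id, CATALOG_SEED + 1);
        assert_ne!(a, c, "a different seed must map to a different shard");
    }
}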

/// Returns the schema of the `Row`s/`SourceData`s stored in the persist
/// shard backing the catalog.
fn desc() -> RelationDesc {
    RelationDesc::builder()
        .with_column("data", ScalarType::Jsonb.nullable(false))
        .finish()
}
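
// A minimal sketch pinning the shape `desc` describes: every catalog state
// update is stored as a single non-nullable JSONB column named "data".
// Assumes `RelationDesc::{arity, iter}` from `mz_repr`; the module and test
// names are illustrative.
#[cfg(test)]
mod desc_example {
    use super::*;

    #[mz_ore::test]
    fn catalog_shard_schema_is_a_single_jsonb_column() {
        let schema = desc();
        assert_eq!(schema.arity(), 1);
        let (name, ty) = schema.iter().next().expect("exactly one column");
        assert_eq!(name.as_str(), "data");
        assert_eq!(ty.scalar_type, ScalarType::Jsonb);
        assert!(!ty.nullable);
    }
}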

/// Generates a timestamp for reading from `read_handle` that is as fresh as possible, given
/// `upper`.
fn as_of(
    read_handle: &ReadHandle<SourceData, (), Timestamp, StorageDiff>,
    upper: Timestamp,
) -> Timestamp {
    let since = read_handle.since().clone();
    let mut as_of = upper.checked_sub(1).unwrap_or_else(|| {
        panic!("catalog persist shard should be initialized, found upper: {upper:?}")
    });
    // We only downgrade the since after writing, and we always set the since to one less than the
    // upper.
    soft_assert_or_log!(
        since.less_equal(&as_of),
        "since={since:?}, as_of={as_of:?}; since must be less than or equal to as_of"
    );
    // This should be a no-op if the assert above passes, however if it doesn't then we'd like to
    // continue with a correct timestamp instead of entering a panic loop.
    as_of.advance_by(since.borrow());
    as_of
}
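
// A minimal sketch (using synthetic timestamps rather than a real
// `ReadHandle`) of the rule `as_of` implements above: read at `upper - 1`,
// advanced to the since if the since has somehow moved past it. The module
// and test names are illustrative.
#[cfg(test)]
mod as_of_example {
    use super::*;

    #[mz_ore::test]
    fn as_of_is_upper_minus_one_clamped_to_since() {
        let since = Antichain::from_elem(Timestamp::from(5u64));
        // Normal case: since <= upper - 1, so we read at upper - 1.
        let mut as_of = Timestamp::from(10u64).checked_sub(1).expect("upper is non-zero");
        as_of.advance_by(since.borrow());
        assert_eq!(as_of, Timestamp::from(9u64));
        // Degenerate case: since > upper - 1, so advancing clamps to the since.
        let mut as_of = Timestamp::from(3u64).checked_sub(1).expect("upper is non-zero");
        as_of.advance_by(since.borrow());
        assert_eq!(as_of, Timestamp::from(5u64));
    }
}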

/// Fetch the persist version of the catalog upgrade shard, if one exists. A version will not
/// exist if we are creating a brand-new environment.
async fn fetch_catalog_upgrade_shard_version(
    persist_client: &PersistClient,
    upgrade_shard_id: ShardId,
) -> Option<semver::Version> {
    let shard_state = persist_client
        .inspect_shard::<Timestamp>(&upgrade_shard_id)
        .await
        .ok()?;
    let json_state = serde_json::to_value(shard_state).expect("state serialization error");
    let upgrade_version = json_state
        .get("applier_version")
        .cloned()
        .expect("missing applier_version");
    let upgrade_version =
        serde_json::from_value(upgrade_version).expect("version deserialization error");
    Some(upgrade_version)
}

/// Generates an iterator of [`StateUpdate`]s that contains all updates to the catalog
/// state up to, and including, `as_of`.
///
/// The output is consolidated and sorted by timestamp in ascending order.
#[mz_ore::instrument(level = "debug")]
async fn snapshot_binary(
    read_handle: &mut ReadHandle<SourceData, (), Timestamp, StorageDiff>,
    as_of: Timestamp,
    metrics: &Arc<Metrics>,
) -> impl Iterator<Item = StateUpdate<StateUpdateKindJson>> + DoubleEndedIterator + use<> {
    metrics.snapshots_taken.inc();
    let counter = metrics.snapshot_latency_seconds.clone();
    snapshot_binary_inner(read_handle, as_of)
        .wall_time()
        .inc_by(counter)
        .await
}

/// Generates an iterator of [`StateUpdate`]s that contains all updates to the catalog
/// state up to, and including, `as_of`.
///
/// The output is consolidated and sorted by timestamp in ascending order.
#[mz_ore::instrument(level = "debug")]
async fn snapshot_binary_inner(
    read_handle: &mut ReadHandle<SourceData, (), Timestamp, StorageDiff>,
    as_of: Timestamp,
) -> impl Iterator<Item = StateUpdate<StateUpdateKindJson>> + DoubleEndedIterator + use<> {
    let snapshot = read_handle
        .snapshot_and_fetch(Antichain::from_elem(as_of))
        .await
        .expect("we have advanced the restart_as_of by the since");
    soft_assert_no_log!(
        snapshot.iter().all(|(_, _, diff)| *diff == 1),
        "snapshot_and_fetch guarantees a consolidated result: {snapshot:#?}"
    );
    snapshot
        .into_iter()
        .map(Into::<StateUpdate<StateUpdateKindJson>>::into)
        // Sort ascending by timestamp, as documented above.
        .sorted_by(|a, b| Ord::cmp(&a.ts, &b.ts))
}

/// Convert an [`Antichain<Timestamp>`] to a [`Timestamp`].
///
/// The correctness of this function relies on [`Timestamp`] being totally ordered and never
/// finalizing the catalog shard.
pub(crate) fn antichain_to_timestamp(antichain: Antichain<Timestamp>) -> Timestamp {
    antichain
        .into_option()
        .expect("we use a totally ordered time and never finalize the shard")
}
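
// A minimal sketch of the `Antichain` -> `Timestamp` conversion above: with a
// totally ordered time, a non-empty antichain is exactly one element. The
// module and test names are illustrative.
#[cfg(test)]
mod antichain_example {
    use super::*;

    #[mz_ore::test]
    fn antichain_of_one_element_converts_to_that_element() {
        let ts = Timestamp::from(42u64);
        assert_eq!(antichain_to_timestamp(Antichain::from_elem(ts)), ts);
    }
}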

// Debug methods used by the catalog-debug tool.

impl Trace {
    /// Generates a [`Trace`] from a snapshot.
    fn from_snapshot(snapshot: impl IntoIterator<Item = StateUpdate>) -> Trace {
        let mut trace = Trace::new();
        for StateUpdate { kind, ts, diff } in snapshot {
            match kind {
                StateUpdateKind::AuditLog(k, v) => trace.audit_log.values.push(((k, v), ts, diff)),
                StateUpdateKind::Cluster(k, v) => trace.clusters.values.push(((k, v), ts, diff)),
                StateUpdateKind::ClusterReplica(k, v) => {
                    trace.cluster_replicas.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::Comment(k, v) => trace.comments.values.push(((k, v), ts, diff)),
                StateUpdateKind::Config(k, v) => trace.configs.values.push(((k, v), ts, diff)),
                StateUpdateKind::Database(k, v) => trace.databases.values.push(((k, v), ts, diff)),
                StateUpdateKind::DefaultPrivilege(k, v) => {
                    trace.default_privileges.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::FenceToken(_) => {
                    // Fence token not included in trace.
                }
                StateUpdateKind::IdAllocator(k, v) => {
                    trace.id_allocator.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::IntrospectionSourceIndex(k, v) => {
                    trace.introspection_sources.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::Item(k, v) => trace.items.values.push(((k, v), ts, diff)),
                StateUpdateKind::NetworkPolicy(k, v) => {
                    trace.network_policies.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::Role(k, v) => trace.roles.values.push(((k, v), ts, diff)),
                StateUpdateKind::Schema(k, v) => trace.schemas.values.push(((k, v), ts, diff)),
                StateUpdateKind::Setting(k, v) => trace.settings.values.push(((k, v), ts, diff)),
                StateUpdateKind::SourceReferences(k, v) => {
                    trace.source_references.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::SystemConfiguration(k, v) => {
                    trace.system_configurations.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::SystemObjectMapping(k, v) => {
                    trace.system_object_mappings.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::SystemPrivilege(k, v) => {
                    trace.system_privileges.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::StorageCollectionMetadata(k, v) => trace
                    .storage_collection_metadata
                    .values
                    .push(((k, v), ts, diff)),
                StateUpdateKind::UnfinalizedShard(k, ()) => {
                    trace.unfinalized_shards.values.push(((k, ()), ts, diff))
                }
                StateUpdateKind::TxnWalShard((), v) => {
                    trace.txn_wal_shard.values.push((((), v), ts, diff))
                }
                StateUpdateKind::RoleAuth(k, v) => trace.role_auth.values.push(((k, v), ts, diff)),
            }
        }
        trace
    }
}

impl UnopenedPersistCatalogState {
    /// Manually update the value of `key` in collection `T` to `value`.
    #[mz_ore::instrument]
    pub(crate) async fn debug_edit<T: Collection>(
        &mut self,
        key: T::Key,
        value: T::Value,
    ) -> Result<Option<T::Value>, CatalogError>
    where
        T::Key: PartialEq + Eq + Debug + Clone,
        T::Value: Debug + Clone,
    {
        let prev_value = loop {
            let key = key.clone();
            let value = value.clone();
            let snapshot = self.current_snapshot().await?;
            let trace = Trace::from_snapshot(snapshot);
            let collection_trace = T::collection_trace(trace);
            let prev_values: Vec<_> = collection_trace
                .values
                .into_iter()
                .filter(|((k, _), _, diff)| {
                    soft_assert_eq_or_log!(*diff, Diff::ONE, "trace is consolidated");
                    &key == k
                })
                .collect();

            let prev_value = match &prev_values[..] {
                [] => None,
                [((_, v), _, _)] => Some(v.clone()),
                prev_values => panic!("multiple values found for key {key:?}: {prev_values:?}"),
            };

            let mut updates: Vec<_> = prev_values
                .into_iter()
                .map(|((k, v), _, _)| (T::update(k, v), Diff::MINUS_ONE))
                .collect();
            updates.push((T::update(key, value), Diff::ONE));
            // We must fence out all other catalogs, if we haven't already, since we are writing.
            match self.fenceable_token.generate_unfenced_token(self.mode)? {
                Some((fence_updates, current_fenceable_token)) => {
                    updates.extend(fence_updates.clone());
                    match self.compare_and_append(updates, self.upper).await {
                        Ok(_) => {
                            self.fenceable_token = current_fenceable_token;
                            break prev_value;
                        }
                        Err(CompareAndAppendError::Fence(e)) => return Err(e.into()),
                        Err(e @ CompareAndAppendError::UpperMismatch { .. }) => {
                            warn!("catalog write failed due to upper mismatch, retrying: {e:?}");
                            continue;
                        }
                    }
                }
                None => {
                    self.compare_and_append(updates, self.upper)
                        .await
                        .map_err(|e| e.unwrap_fence_error())?;
                    break prev_value;
                }
            }
        };
        Ok(prev_value)
    }

    /// Manually delete `key` from collection `T`.
    #[mz_ore::instrument]
    pub(crate) async fn debug_delete<T: Collection>(
        &mut self,
        key: T::Key,
    ) -> Result<(), CatalogError>
    where
        T::Key: PartialEq + Eq + Debug + Clone,
        T::Value: Debug,
    {
        loop {
            let key = key.clone();
            let snapshot = self.current_snapshot().await?;
            let trace = Trace::from_snapshot(snapshot);
            let collection_trace = T::collection_trace(trace);
            let mut retractions: Vec<_> = collection_trace
                .values
                .into_iter()
                .filter(|((k, _), _, diff)| {
                    soft_assert_eq_or_log!(*diff, Diff::ONE, "trace is consolidated");
                    &key == k
                })
                .map(|((k, v), _, _)| (T::update(k, v), Diff::MINUS_ONE))
                .collect();

            // We must fence out all other catalogs, if we haven't already, since we are writing.
            match self.fenceable_token.generate_unfenced_token(self.mode)? {
                Some((fence_updates, current_fenceable_token)) => {
                    retractions.extend(fence_updates.clone());
                    match self.compare_and_append(retractions, self.upper).await {
                        Ok(_) => {
                            self.fenceable_token = current_fenceable_token;
                            break;
                        }
                        Err(CompareAndAppendError::Fence(e)) => return Err(e.into()),
                        Err(e @ CompareAndAppendError::UpperMismatch { .. }) => {
                            warn!("catalog write failed due to upper mismatch, retrying: {e:?}");
                            continue;
                        }
                    }
                }
                None => {
                    self.compare_and_append(retractions, self.upper)
                        .await
                        .map_err(|e| e.unwrap_fence_error())?;
                    break;
                }
            }
        }
        Ok(())
    }

    /// Generates a [`Vec<StateUpdate>`] that contains all updates to the catalog
    /// state.
    ///
    /// The output is consolidated, sorted by timestamp in ascending order, and reflects the
    /// current upper.
    async fn current_snapshot(
        &mut self,
    ) -> Result<impl IntoIterator<Item = StateUpdate> + '_, CatalogError> {
        self.sync_to_current_upper().await?;
        self.consolidate();
        Ok(self.snapshot.iter().cloned().map(|(kind, ts, diff)| {
            let kind = TryIntoStateUpdateKind::try_into(kind).expect("kind decoding error");
            StateUpdate { kind, ts, diff }
        }))
    }
}