Skip to main content

mz_catalog/durable/
persist.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
// Unit tests for this module, compiled only for test builds and declared in a separate file.
#[cfg(test)]
mod tests;
12
13use std::cmp::max;
14use std::collections::{BTreeMap, VecDeque};
15use std::fmt::Debug;
16use std::str::FromStr;
17use std::sync::{Arc, LazyLock};
18use std::time::{Duration, Instant};
19
20use async_trait::async_trait;
21use differential_dataflow::lattice::Lattice;
22use futures::{FutureExt, StreamExt};
23use itertools::Itertools;
24use mz_audit_log::VersionedEvent;
25use mz_ore::metrics::MetricsFutureExt;
26use mz_ore::now::EpochMillis;
27use mz_ore::{
28    soft_assert_eq_no_log, soft_assert_eq_or_log, soft_assert_ne_or_log, soft_assert_no_log,
29    soft_assert_or_log, soft_panic_or_log,
30};
31use mz_persist_client::cfg::USE_CRITICAL_SINCE_CATALOG;
32use mz_persist_client::cli::admin::{CATALOG_FORCE_COMPACTION_FUEL, CATALOG_FORCE_COMPACTION_WAIT};
33use mz_persist_client::critical::{CriticalReaderId, Opaque, SinceHandle};
34use mz_persist_client::error::UpperMismatch;
35use mz_persist_client::read::{Listen, ListenEvent, ReadHandle};
36use mz_persist_client::write::WriteHandle;
37use mz_persist_client::{Diagnostics, PersistClient, ShardId};
38use mz_persist_types::codec_impls::UnitSchema;
39use mz_proto::{RustType, TryFromProtoError};
40use mz_repr::Diff;
41use mz_storage_client::controller::PersistEpoch;
42use mz_storage_types::StorageDiff;
43use mz_storage_types::sources::SourceData;
44use sha2::Digest;
45use timely::progress::{Antichain, Timestamp as TimelyTimestamp};
46use tracing::{debug, info, warn};
47use uuid::Uuid;
48
49use crate::durable::debug::{Collection, CollectionType, DebugCatalogState, Trace};
50use crate::durable::error::FenceError;
51use crate::durable::initialize::{
52    ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, SYSTEM_CONFIG_SYNCED_KEY, USER_VERSION_KEY,
53    WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL, WITH_0DT_DEPLOYMENT_MAX_WAIT,
54};
55use crate::durable::metrics::Metrics;
56use crate::durable::objects::state_update::{
57    IntoStateUpdateKindJson, StateUpdate, StateUpdateKind, StateUpdateKindJson,
58    TryIntoStateUpdateKind,
59};
60use crate::durable::objects::{AuditLogKey, FenceToken, Snapshot};
61use crate::durable::transaction::TransactionBatch;
62use crate::durable::upgrade::upgrade;
63use crate::durable::{
64    AuditLogIterator, BootstrapArgs, CATALOG_CONTENT_VERSION_KEY, CatalogError,
65    DurableCatalogError, DurableCatalogState, Epoch, OpenableDurableCatalogState,
66    ReadOnlyDurableCatalogState, Transaction, initialize, persist_desc,
67};
68use crate::memory;
69
70/// New-type used to represent timestamps in persist.
71pub(crate) type Timestamp = mz_repr::Timestamp;
72
73/// The minimum value of an epoch.
74///
75/// # Safety
76/// `new_unchecked` is safe to call with a non-zero value.
77const MIN_EPOCH: Epoch = unsafe { Epoch::new_unchecked(1) };
78
79/// Human readable catalog shard name.
80const CATALOG_SHARD_NAME: &str = "catalog";
81
82/// [`CriticalReaderId`] for the catalog shard's own since hold, separate from
83/// [`PersistClient::CONTROLLER_CRITICAL_SINCE`].
84static CATALOG_CRITICAL_SINCE: LazyLock<CriticalReaderId> = LazyLock::new(|| {
85    "c55555555-6666-7777-8888-999999999999"
86        .parse()
87        .expect("valid CriticalReaderId")
88});
89
90/// Seed used to generate the persist shard ID for the catalog.
91const CATALOG_SEED: usize = 1;
92/// Legacy seed used to generate the persist shard ID for the upgrade shard. DO NOT REUSE.
93const _UPGRADE_SEED: usize = 2;
94/// Legacy seed used to generate the persist shard ID for builtin table migrations. DO NOT REUSE.
95pub const _BUILTIN_MIGRATION_SEED: usize = 3;
96/// Legacy seed used to generate the persist shard ID for the expression cache. DO NOT REUSE.
97pub const _EXPRESSION_CACHE_SEED: usize = 4;
98
/// Durable catalog mode that dictates the effect of mutable operations.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub(crate) enum Mode {
    /// Mutable operations are prohibited.
    Readonly,
    /// Mutable operations have an effect in-memory, but aren't persisted durably.
    ///
    /// A savepoint catalog does not listen for concurrent writes from other catalogs when
    /// syncing; it only advances its locally tracked upper.
    Savepoint,
    /// Mutable operations have an effect in-memory and durably.
    Writable,
}
109
/// Enum representing the fenced state of the catalog.
///
/// A token starts out `Initializing`. It becomes `Unfenced` once an unfenced token has been
/// generated for this catalog, and becomes `Fenced` when a greater fence token is observed
/// (while initializing, only a greater deploy generation fences).
#[derive(Debug)]
pub(crate) enum FenceableToken {
    /// The catalog is still initializing and learning about previously written fence tokens. This
    /// state can be fenced if it encounters a larger deploy generation.
    Initializing {
        /// The largest fence token durably written to the catalog, if any.
        durable_token: Option<FenceToken>,
        /// This process's deploy generation.
        current_deploy_generation: Option<u64>,
    },
    /// The current token has not been fenced.
    Unfenced {
        /// This catalog's own fence token.
        current_token: FenceToken,
    },
    /// The current token has been fenced.
    Fenced {
        /// This catalog's token at the time it was fenced.
        current_token: FenceToken,
        /// The strictly greater token that fenced out `current_token`.
        fence_token: FenceToken,
    },
}
129
130impl FenceableToken {
131    /// Returns a new token.
132    fn new(current_deploy_generation: Option<u64>) -> Self {
133        Self::Initializing {
134            durable_token: None,
135            current_deploy_generation,
136        }
137    }
138
139    /// Returns the current token if it is not fenced, otherwise returns an error.
140    fn validate(&self) -> Result<Option<FenceToken>, FenceError> {
141        match self {
142            FenceableToken::Initializing { durable_token, .. } => Ok(durable_token.clone()),
143            FenceableToken::Unfenced { current_token, .. } => Ok(Some(current_token.clone())),
144            FenceableToken::Fenced {
145                current_token,
146                fence_token,
147            } => {
148                assert!(
149                    fence_token > current_token,
150                    "must be fenced by higher token; current={current_token:?}, fence={fence_token:?}"
151                );
152                if fence_token.deploy_generation > current_token.deploy_generation {
153                    Err(FenceError::DeployGeneration {
154                        current_generation: current_token.deploy_generation,
155                        fence_generation: fence_token.deploy_generation,
156                    })
157                } else {
158                    assert!(
159                        fence_token.epoch > current_token.epoch,
160                        "must be fenced by higher token; current={current_token:?}, fence={fence_token:?}"
161                    );
162                    Err(FenceError::Epoch {
163                        current_epoch: current_token.epoch,
164                        fence_epoch: fence_token.epoch,
165                    })
166                }
167            }
168        }
169    }
170
171    /// Returns the current token.
172    fn token(&self) -> Option<FenceToken> {
173        match self {
174            FenceableToken::Initializing { durable_token, .. } => durable_token.clone(),
175            FenceableToken::Unfenced { current_token, .. } => Some(current_token.clone()),
176            FenceableToken::Fenced { current_token, .. } => Some(current_token.clone()),
177        }
178    }
179
180    /// Returns `Err` if `token` fences out `self`, `Ok` otherwise.
181    fn maybe_fence(&mut self, token: FenceToken) -> Result<(), FenceError> {
182        match self {
183            FenceableToken::Initializing {
184                durable_token,
185                current_deploy_generation,
186                ..
187            } => {
188                match durable_token {
189                    Some(durable_token) => {
190                        *durable_token = max(durable_token.clone(), token.clone());
191                    }
192                    None => {
193                        *durable_token = Some(token.clone());
194                    }
195                }
196                if let Some(current_deploy_generation) = current_deploy_generation {
197                    if *current_deploy_generation < token.deploy_generation {
198                        *self = FenceableToken::Fenced {
199                            current_token: FenceToken {
200                                deploy_generation: *current_deploy_generation,
201                                epoch: token.epoch,
202                            },
203                            fence_token: token,
204                        };
205                        self.validate()?;
206                    }
207                }
208            }
209            FenceableToken::Unfenced { current_token } => {
210                if *current_token < token {
211                    *self = FenceableToken::Fenced {
212                        current_token: current_token.clone(),
213                        fence_token: token,
214                    };
215                    self.validate()?;
216                }
217            }
218            FenceableToken::Fenced { .. } => {
219                self.validate()?;
220            }
221        }
222
223        Ok(())
224    }
225
226    /// Returns a [`FenceableToken::Unfenced`] token and the updates to the catalog required to
227    /// transition to the `Unfenced` state if `self` is [`FenceableToken::Initializing`], otherwise
228    /// returns `None`.
229    fn generate_unfenced_token(
230        &self,
231        mode: Mode,
232    ) -> Result<Option<(Vec<(StateUpdateKind, Diff)>, FenceableToken)>, DurableCatalogError> {
233        let (durable_token, current_deploy_generation) = match self {
234            FenceableToken::Initializing {
235                durable_token,
236                current_deploy_generation,
237            } => (durable_token.clone(), current_deploy_generation.clone()),
238            FenceableToken::Unfenced { .. } | FenceableToken::Fenced { .. } => return Ok(None),
239        };
240
241        let mut fence_updates = Vec::with_capacity(2);
242
243        if let Some(durable_token) = &durable_token {
244            fence_updates.push((
245                StateUpdateKind::FenceToken(durable_token.clone()),
246                Diff::MINUS_ONE,
247            ));
248        }
249
250        let current_deploy_generation = current_deploy_generation
251            .or_else(|| durable_token.as_ref().map(|token| token.deploy_generation))
252            // We cannot initialize a catalog without a deploy generation.
253            .ok_or(DurableCatalogError::Uninitialized)?;
254        let mut current_epoch = durable_token
255            .map(|token| token.epoch)
256            .unwrap_or(MIN_EPOCH)
257            .get();
258        // Only writable catalogs attempt to increment the epoch.
259        if matches!(mode, Mode::Writable) {
260            current_epoch = current_epoch + 1;
261        }
262        let current_epoch = Epoch::new(current_epoch).expect("known to be non-zero");
263        let current_token = FenceToken {
264            deploy_generation: current_deploy_generation,
265            epoch: current_epoch,
266        };
267
268        fence_updates.push((
269            StateUpdateKind::FenceToken(current_token.clone()),
270            Diff::ONE,
271        ));
272
273        let current_fenceable_token = FenceableToken::Unfenced { current_token };
274
275        Ok(Some((fence_updates, current_fenceable_token)))
276    }
277}
278
/// An error that can occur while executing [`PersistHandle::compare_and_append`].
#[derive(Debug, thiserror::Error)]
pub(crate) enum CompareAndAppendError {
    /// This catalog has been fenced out; the wrapped [`FenceError`] describes by whom.
    #[error(transparent)]
    Fence(#[from] FenceError),
    /// Catalog encountered an upper mismatch when trying to write to the catalog. This should only
    /// happen while trying to fence out other catalogs.
    #[error(
        "expected catalog upper {expected_upper:?} did not match actual catalog upper {actual_upper:?}"
    )]
    UpperMismatch {
        /// The upper this catalog expected the shard to be at.
        expected_upper: Timestamp,
        /// The upper the shard was actually at.
        actual_upper: Timestamp,
    },
}
294
295impl CompareAndAppendError {
296    pub(crate) fn unwrap_fence_error(self) -> FenceError {
297        match self {
298            CompareAndAppendError::Fence(e) => e,
299            e @ CompareAndAppendError::UpperMismatch { .. } => {
300                panic!("unexpected upper mismatch: {e:?}")
301            }
302        }
303    }
304}
305
306impl From<UpperMismatch<Timestamp>> for CompareAndAppendError {
307    fn from(upper_mismatch: UpperMismatch<Timestamp>) -> Self {
308        Self::UpperMismatch {
309            expected_upper: antichain_to_timestamp(upper_mismatch.expected),
310            actual_upper: antichain_to_timestamp(upper_mismatch.current),
311        }
312    }
313}
314
/// Customizes how a [`PersistHandle`] processes each individual catalog update.
///
/// Implementations receive mutable access to the handle's fence state, so they can advance or
/// fence it, and decide whether each update is kept in the handle's in-memory snapshot.
pub(crate) trait ApplyUpdate<T: IntoStateUpdateKindJson> {
    /// Process and apply `update`.
    ///
    /// `current_fence_token` is the owning handle's fence state; `metrics` are the handle's
    /// catalog metrics.
    ///
    /// Returns `Some` if `update` should be cached in memory and `None` otherwise.
    ///
    /// # Errors
    ///
    /// Returns a [`FenceError`] if applying `update` shows that this catalog has been fenced out.
    /// `PersistHandle::apply_updates` keeps applying the rest of the batch and then returns the
    /// smallest collected error (per `Ord`).
    fn apply_update(
        &mut self,
        update: StateUpdate<T>,
        current_fence_token: &mut FenceableToken,
        metrics: &Arc<Metrics>,
    ) -> Result<Option<StateUpdate<T>>, FenceError>;
}
326
/// A handle for interacting with the persist catalog shard.
///
/// The catalog shard is used in multiple different contexts, for example pre-open and post-open,
/// but for all contexts the majority of the durable catalog's behavior is identical. This struct
/// implements those behaviors that are identical while allowing the user to specify the different
/// behaviors via generic parameters.
///
/// The behavior of the durable catalog can be different along one of two axes. The first is the
/// format of each individual update, i.e. raw binary, the current protobuf version, previous
/// protobuf versions, etc. The second axis is what to do with each individual update, for example
/// before opening we cache all config updates but don't cache them after opening. These behaviors
/// are customizable via the `T: TryIntoStateUpdateKind` and `U: ApplyUpdate<T>` generic parameters
/// respectively.
#[derive(Debug)]
pub(crate) struct PersistHandle<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> {
    /// The [`Mode`] that this catalog was opened in.
    pub(crate) mode: Mode,
    /// Since handle to control compaction.
    since_handle: SinceHandle<SourceData, (), Timestamp, StorageDiff>,
    /// Write handle to persist.
    write_handle: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
    /// Listener to catalog changes.
    listen: Listen<SourceData, (), Timestamp, StorageDiff>,
    /// Handle for connecting to persist.
    persist_client: PersistClient,
    /// Catalog shard ID.
    shard_id: ShardId,
    /// Cache of the most recent catalog snapshot.
    ///
    /// We use a tuple instead of [`StateUpdate`] to make consolidation easier. Entries are
    /// appended in timestamp order and only periodically consolidated, so this may temporarily
    /// hold unconsolidated updates.
    pub(crate) snapshot: Vec<(T, Timestamp, Diff)>,
    /// Applies custom processing, filtering, and fencing for each individual update.
    update_applier: U,
    /// The current upper of the persist shard, as last observed by this handle.
    pub(crate) upper: Timestamp,
    /// The fence token of the catalog, if one exists.
    fenceable_token: FenceableToken,
    /// The semantic version of the current binary.
    catalog_content_version: semver::Version,
    /// Flag to indicate if bootstrap is complete.
    bootstrap_complete: bool,
    /// Metrics for the persist catalog.
    metrics: Arc<Metrics>,
    /// Snapshot size at the last amortized consolidation, used by
    /// [`Self::maybe_consolidate`] to decide when to consolidate. Initialized
    /// lazily on the first call.
    size_at_last_consolidation: Option<usize>,
}
375
376impl<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> PersistHandle<T, U> {
377    /// Fetch the current upper of the catalog state.
378    #[mz_ore::instrument]
379    async fn current_upper(&mut self) -> Timestamp {
380        match self.mode {
381            Mode::Writable | Mode::Readonly => {
382                let upper = self.write_handle.fetch_recent_upper().await;
383                antichain_to_timestamp(upper.clone())
384            }
385            Mode::Savepoint => self.upper,
386        }
387    }
388
389    /// Appends `updates` iff the current global upper of the catalog is `self.upper`.
390    ///
391    /// Returns the next upper used to commit the transaction.
392    #[mz_ore::instrument]
393    pub(crate) async fn compare_and_append<S: IntoStateUpdateKindJson>(
394        &mut self,
395        updates: Vec<(S, Diff)>,
396        commit_ts: Timestamp,
397    ) -> Result<Timestamp, CompareAndAppendError> {
398        // The fencing check is expensive, so run it only with soft assertions enabled.
399        let contains_fence = if mz_ore::assert::soft_assertions_enabled() {
400            let parsed_updates: Vec<_> = updates
401                .clone()
402                .into_iter()
403                .filter_map(|(update, diff)| {
404                    let update: StateUpdateKindJson = update.into();
405                    let update = TryIntoStateUpdateKind::try_into(update).ok()?;
406                    Some((update, diff))
407                })
408                .collect();
409            let contains_retraction = parsed_updates.iter().any(|(update, diff)| {
410                matches!(update, StateUpdateKind::FenceToken(..)) && *diff == Diff::MINUS_ONE
411            });
412            let contains_addition = parsed_updates.iter().any(|(update, diff)| {
413                matches!(update, StateUpdateKind::FenceToken(..)) && *diff == Diff::ONE
414            });
415            Some(contains_retraction && contains_addition)
416        } else {
417            None
418        };
419
420        let updates = updates.into_iter().map(|(kind, diff)| {
421            let kind: StateUpdateKindJson = kind.into();
422            (
423                (Into::<SourceData>::into(kind), ()),
424                commit_ts,
425                diff.into_inner(),
426            )
427        });
428        let next_upper = commit_ts.step_forward();
429        self.compare_and_append_inner(updates, next_upper)
430            .await
431            .inspect_err(|e| {
432                // A compare-and-append failure means someone else must have written to the
433                // catalog. We expect to have been fenced out, since writing to the catalog without
434                // fencing other catalogs should be impossible. The one exception is if we are
435                // trying to fence other catalogs with this write.
436                if let Some(contains_fence) = contains_fence {
437                    soft_assert_or_log!(
438                        matches!(e, CompareAndAppendError::Fence(_)) || contains_fence,
439                        "encountered an upper mismatch on a non-fencing write"
440                    );
441                }
442            })?;
443
444        self.sync(next_upper).await?;
445        Ok(next_upper)
446    }
447
448    /// Compare-and-append `updates` to the catalog shard, advancing the upper to `next_upper`.
449    ///
450    /// On success, updating `self.upper` is left to the caller. The caller can thus decide whether
451    /// or not it needs to sync the catalog.
452    ///
453    /// # Panics
454    ///
455    /// Panics if not in `Writable` mode.
456    /// Panics if `next_upper` is not greater than `self.upper`.
457    async fn compare_and_append_inner(
458        &mut self,
459        updates: impl IntoIterator<Item = ((SourceData, ()), Timestamp, StorageDiff)>,
460        next_upper: Timestamp,
461    ) -> Result<(), CompareAndAppendError> {
462        assert_eq!(self.mode, Mode::Writable);
463        assert!(
464            next_upper > self.upper,
465            "next_upper ({next_upper}) not greater than current upper ({})",
466            self.upper,
467        );
468
469        let res = self
470            .write_handle
471            .compare_and_append(
472                updates,
473                Antichain::from_elem(self.upper),
474                Antichain::from_elem(next_upper),
475            )
476            .await
477            .expect("invalid usage");
478
479        if let Err(e @ UpperMismatch { .. }) = res {
480            // Most likely we were fenced out.
481            // Sync to the current upper to detect that.
482            self.sync_to_current_upper().await?;
483            return Err(e.into());
484        }
485
486        // Lag the shard's upper by 1 to keep it readable.
487        let downgrade_to = Antichain::from_elem(next_upper.saturating_sub(1));
488
489        // The since handle gives us the ability to fence out other downgraders using an opaque token.
490        // (See the method documentation for details.)
491        // That's not needed here, so we use the since handle's opaque token to avoid any comparison
492        // failures.
493        let opaque = self.since_handle.opaque().clone();
494        let downgrade = self
495            .since_handle
496            .maybe_compare_and_downgrade_since(&opaque, (&opaque, &downgrade_to))
497            .await;
498        if let Some(Err(e)) = downgrade {
499            soft_panic_or_log!("found opaque value {e:?}, but expected {opaque:?}");
500        }
501
502        Ok(())
503    }
504
505    /// Generates an iterator of [`StateUpdate`] that contain all unconsolidated updates to the
506    /// catalog state up to, and including, `as_of`.
507    #[mz_ore::instrument]
508    async fn snapshot_unconsolidated(&mut self) -> Vec<StateUpdate<StateUpdateKind>> {
509        let current_upper = self.current_upper().await;
510
511        let mut snapshot = Vec::new();
512        let mut read_handle = self.read_handle().await;
513        let as_of = as_of(&read_handle, current_upper);
514        let mut stream = Box::pin(
515            // We use `snapshot_and_stream` because it guarantees unconsolidated output.
516            read_handle
517                .snapshot_and_stream(Antichain::from_elem(as_of))
518                .await
519                .expect("we have advanced the restart_as_of by the since"),
520        );
521        while let Some(update) = stream.next().await {
522            snapshot.push(update)
523        }
524        read_handle.expire().await;
525        snapshot
526            .into_iter()
527            .map(Into::<StateUpdate<StateUpdateKindJson>>::into)
528            .map(|state_update| state_update.try_into().expect("kind decoding error"))
529            .collect()
530    }
531
532    /// Listen and apply all updates that are currently in persist.
533    ///
534    /// Returns an error if this instance has been fenced out.
535    #[mz_ore::instrument]
536    pub(crate) async fn sync_to_current_upper(&mut self) -> Result<(), FenceError> {
537        let upper = self.current_upper().await;
538        self.sync(upper).await
539    }
540
541    /// Listen and apply all updates up to `target_upper`.
542    ///
543    /// Returns an error if this instance has been fenced out.
544    #[mz_ore::instrument(level = "debug")]
545    pub(crate) async fn sync(&mut self, target_upper: Timestamp) -> Result<(), FenceError> {
546        self.metrics.syncs.inc();
547        let counter = self.metrics.sync_latency_seconds.clone();
548        self.sync_inner(target_upper)
549            .wall_time()
550            .inc_by(counter)
551            .await
552    }
553
554    #[mz_ore::instrument(level = "debug")]
555    async fn sync_inner(&mut self, target_upper: Timestamp) -> Result<(), FenceError> {
556        self.fenceable_token.validate()?;
557
558        // Savepoint catalogs do not yet know how to update themselves in response to concurrent
559        // writes from writer catalogs.
560        if self.mode == Mode::Savepoint {
561            self.upper = max(self.upper, target_upper);
562            return Ok(());
563        }
564
565        let mut updates: BTreeMap<_, Vec<_>> = BTreeMap::new();
566
567        // Reset the amortized consolidation tracker so it picks up the
568        // current snapshot size as its baseline.
569        self.size_at_last_consolidation = None;
570
571        while self.upper < target_upper {
572            let listen_events = self.listen.fetch_next().await;
573            for listen_event in listen_events {
574                match listen_event {
575                    ListenEvent::Progress(upper) => {
576                        debug!("synced up to {upper:?}");
577                        self.upper = antichain_to_timestamp(upper);
578                        // Attempt to apply updates in batches of a single timestamp. If another
579                        // catalog wrote a fence token at one timestamp and then updates in a new
580                        // format at a later timestamp, then we want to apply the fence token
581                        // before attempting to deserialize the new updates.
582                        while let Some((ts, updates)) = updates.pop_first() {
583                            assert!(ts < self.upper, "expected {} < {}", ts, self.upper);
584                            let updates = updates.into_iter().map(
585                                |update: StateUpdate<StateUpdateKindJson>| {
586                                    let kind =
587                                        T::try_from(update.kind).expect("kind decoding error");
588                                    StateUpdate {
589                                        kind,
590                                        ts: update.ts,
591                                        diff: update.diff,
592                                    }
593                                },
594                            );
595                            self.apply_updates(updates)?;
596                            self.maybe_consolidate();
597                        }
598                    }
599                    ListenEvent::Updates(batch_updates) => {
600                        for update in batch_updates {
601                            let update: StateUpdate<StateUpdateKindJson> = update.into();
602                            updates.entry(update.ts).or_default().push(update);
603                        }
604                    }
605                }
606            }
607        }
608        assert_eq!(updates, BTreeMap::new(), "all updates should be applied");
609        // Always consolidate at the end to ensure the snapshot is clean.
610        self.consolidate();
611        Ok(())
612    }
613
614    /// Apply a batch of updates and then consolidate the snapshot. This is the
615    /// typical entry point for callers that apply updates in a single batch.
616    ///
617    /// For hot loops that apply updates across many timestamps (e.g., `sync_inner`),
618    /// use `apply_updates` directly and call `consolidate()` periodically (e.g.,
619    /// on snapshot doubling) to bound memory while staying amortized O(N log N).
620    pub(crate) fn apply_updates_and_consolidate(
621        &mut self,
622        updates: impl IntoIterator<Item = StateUpdate<T>>,
623    ) -> Result<(), FenceError> {
624        self.apply_updates(updates)?;
625        self.consolidate();
626        Ok(())
627    }
628
629    /// Apply a batch of updates to the catalog state without consolidating.
630    ///
631    /// Does NOT consolidate the snapshot afterward. If you are calling this once,
632    /// prefer `apply_updates_and_consolidate`. This method exists for loops that
633    /// call it many times — consolidating per call would be O(K * N log N) instead
634    /// of O(N log N). Callers should consolidate periodically (e.g., on snapshot
635    /// doubling) to bound memory.
636    #[mz_ore::instrument(level = "debug")]
637    fn apply_updates(
638        &mut self,
639        updates: impl IntoIterator<Item = StateUpdate<T>>,
640    ) -> Result<(), FenceError> {
641        let mut updates: Vec<_> = updates
642            .into_iter()
643            .map(|StateUpdate { kind, ts, diff }| (kind, ts, diff))
644            .collect();
645
646        // This helps guarantee that for a single key, there is at most a single retraction and a
647        // single insertion per timestamp. Otherwise, we would need to match the retractions and
648        // insertions up by value and manually figure out what the end value should be.
649        differential_dataflow::consolidation::consolidate_updates(&mut updates);
650
651        // Updates must be applied in timestamp order. Within a timestamp retractions must be
652        // applied before insertions, or we might end up retracting the wrong value.
653        updates.sort_by(|(_, ts1, diff1), (_, ts2, diff2)| ts1.cmp(ts2).then(diff1.cmp(diff2)));
654
655        let mut errors = Vec::new();
656
657        for (kind, ts, diff) in updates {
658            if diff != Diff::ONE && diff != Diff::MINUS_ONE {
659                panic!("invalid update in consolidated trace: ({kind:?}, {ts:?}, {diff:?})");
660            }
661
662            match self.update_applier.apply_update(
663                StateUpdate { kind, ts, diff },
664                &mut self.fenceable_token,
665                &self.metrics,
666            ) {
667                Ok(Some(StateUpdate { kind, ts, diff })) => self.snapshot.push((kind, ts, diff)),
668                Ok(None) => {}
669                // Instead of returning immediately, we accumulate all the errors and return the one
670                // with the most information.
671                Err(err) => errors.push(err),
672            }
673        }
674
675        // Track the high-water mark of the unconsolidated snapshot size.
676        let len = i64::try_from(self.snapshot.len()).unwrap_or(i64::MAX);
677        if len > self.metrics.snapshot_max_entries.get() {
678            self.metrics.snapshot_max_entries.set(len);
679        }
680
681        errors.sort();
682        if let Some(err) = errors.into_iter().next() {
683            return Err(err);
684        }
685
686        Ok(())
687    }
688
689    /// Consolidate the snapshot if it has at least doubled in size since the
690    /// last consolidation. This amortizes the O(N log N) consolidation cost
691    /// over many small updates, keeping the total work O(N log N) rather than
692    /// O(K * N log N) for K timestamps.
693    fn maybe_consolidate(&mut self) {
694        let threshold = *self
695            .size_at_last_consolidation
696            // Use a minimum of 8 to avoid consolidating on every update when
697            // the snapshot is small or empty (since 0 * 2 = 0).
698            .get_or_insert_with(|| max(self.snapshot.len(), 8));
699        if self.snapshot.len() >= threshold * 2 {
700            self.consolidate();
701            self.size_at_last_consolidation = Some(self.snapshot.len());
702        }
703    }
704
705    #[mz_ore::instrument]
706    pub(crate) fn consolidate(&mut self) {
707        self.metrics.snapshot_consolidations.inc();
708        soft_assert_no_log!(
709            self.snapshot
710                .windows(2)
711                .all(|updates| updates[0].1 <= updates[1].1),
712            "snapshot should be sorted by timestamp, {:#?}",
713            self.snapshot
714        );
715
716        let new_ts = self
717            .snapshot
718            .last()
719            .map(|(_, ts, _)| *ts)
720            .unwrap_or_else(Timestamp::minimum);
721        for (_, ts, _) in &mut self.snapshot {
722            *ts = new_ts;
723        }
724        differential_dataflow::consolidation::consolidate_updates(&mut self.snapshot);
725    }
726
727    /// Execute and return the results of `f` on the current catalog trace.
728    ///
729    /// Will return an error if the catalog has been fenced out.
730    async fn with_trace<R>(
731        &mut self,
732        f: impl FnOnce(&Vec<(T, Timestamp, Diff)>) -> Result<R, CatalogError>,
733    ) -> Result<R, CatalogError> {
734        self.sync_to_current_upper().await?;
735        f(&self.snapshot)
736    }
737
738    /// Open a read handle to the catalog.
739    async fn read_handle(&self) -> ReadHandle<SourceData, (), Timestamp, StorageDiff> {
740        self.persist_client
741            .open_leased_reader(
742                self.shard_id,
743                Arc::new(persist_desc()),
744                Arc::new(UnitSchema::default()),
745                Diagnostics {
746                    shard_name: CATALOG_SHARD_NAME.to_string(),
747                    handle_purpose: "openable durable catalog state temporary reader".to_string(),
748                },
749                USE_CRITICAL_SINCE_CATALOG.get(self.persist_client.dyncfgs()),
750            )
751            .await
752            .expect("invalid usage")
753    }
754
755    /// Politely releases all external resources that can only be released in an async context.
756    async fn expire(self: Box<Self>) {
757        self.write_handle.expire().await;
758        self.listen.expire().await;
759    }
760}
761
762impl<U: ApplyUpdate<StateUpdateKind>> PersistHandle<StateUpdateKind, U> {
763    /// Execute and return the results of `f` on the current catalog snapshot.
764    ///
765    /// Will return an error if the catalog has been fenced out.
766    async fn with_snapshot<T>(
767        &mut self,
768        f: impl FnOnce(Snapshot) -> Result<T, CatalogError>,
769    ) -> Result<T, CatalogError> {
770        fn apply<K, V>(map: &mut BTreeMap<K, V>, key: &K, value: &V, diff: Diff)
771        where
772            K: Ord + Clone,
773            V: Ord + Clone + Debug,
774        {
775            let key = key.clone();
776            let value = value.clone();
777            if diff == Diff::ONE {
778                let prev = map.insert(key, value);
779                assert_eq!(
780                    prev, None,
781                    "values must be explicitly retracted before inserting a new value"
782                );
783            } else if diff == Diff::MINUS_ONE {
784                let prev = map.remove(&key);
785                assert_eq!(
786                    prev,
787                    Some(value),
788                    "retraction does not match existing value"
789                );
790            }
791        }
792
793        self.with_trace(|trace| {
794            let mut snapshot = Snapshot::empty();
795            for (kind, ts, diff) in trace {
796                let diff = *diff;
797                if diff != Diff::ONE && diff != Diff::MINUS_ONE {
798                    panic!("invalid update in consolidated trace: ({kind:?}, {ts:?}, {diff:?})");
799                }
800
801                match kind {
802                    StateUpdateKind::AuditLog(_key, ()) => {
803                        // Ignore for snapshots.
804                    }
805                    StateUpdateKind::Cluster(key, value) => {
806                        apply(&mut snapshot.clusters, key, value, diff);
807                    }
808                    StateUpdateKind::ClusterReplica(key, value) => {
809                        apply(&mut snapshot.cluster_replicas, key, value, diff);
810                    }
811                    StateUpdateKind::Comment(key, value) => {
812                        apply(&mut snapshot.comments, key, value, diff);
813                    }
814                    StateUpdateKind::Config(key, value) => {
815                        apply(&mut snapshot.configs, key, value, diff);
816                    }
817                    StateUpdateKind::Database(key, value) => {
818                        apply(&mut snapshot.databases, key, value, diff);
819                    }
820                    StateUpdateKind::DefaultPrivilege(key, value) => {
821                        apply(&mut snapshot.default_privileges, key, value, diff);
822                    }
823                    StateUpdateKind::FenceToken(_token) => {
824                        // Ignore for snapshots.
825                    }
826                    StateUpdateKind::IdAllocator(key, value) => {
827                        apply(&mut snapshot.id_allocator, key, value, diff);
828                    }
829                    StateUpdateKind::IntrospectionSourceIndex(key, value) => {
830                        apply(&mut snapshot.introspection_sources, key, value, diff);
831                    }
832                    StateUpdateKind::Item(key, value) => {
833                        apply(&mut snapshot.items, key, value, diff);
834                    }
835                    StateUpdateKind::NetworkPolicy(key, value) => {
836                        apply(&mut snapshot.network_policies, key, value, diff);
837                    }
838                    StateUpdateKind::Role(key, value) => {
839                        apply(&mut snapshot.roles, key, value, diff);
840                    }
841                    StateUpdateKind::Schema(key, value) => {
842                        apply(&mut snapshot.schemas, key, value, diff);
843                    }
844                    StateUpdateKind::Setting(key, value) => {
845                        apply(&mut snapshot.settings, key, value, diff);
846                    }
847                    StateUpdateKind::SourceReferences(key, value) => {
848                        apply(&mut snapshot.source_references, key, value, diff);
849                    }
850                    StateUpdateKind::SystemConfiguration(key, value) => {
851                        apply(&mut snapshot.system_configurations, key, value, diff);
852                    }
853                    StateUpdateKind::ClusterSystemConfiguration(key, value) => {
854                        apply(
855                            &mut snapshot.cluster_system_configurations,
856                            key,
857                            value,
858                            diff,
859                        );
860                    }
861                    StateUpdateKind::ReplicaSystemConfiguration(key, value) => {
862                        apply(
863                            &mut snapshot.replica_system_configurations,
864                            key,
865                            value,
866                            diff,
867                        );
868                    }
869                    StateUpdateKind::SystemObjectMapping(key, value) => {
870                        apply(&mut snapshot.system_object_mappings, key, value, diff);
871                    }
872                    StateUpdateKind::SystemPrivilege(key, value) => {
873                        apply(&mut snapshot.system_privileges, key, value, diff);
874                    }
875                    StateUpdateKind::StorageCollectionMetadata(key, value) => {
876                        apply(&mut snapshot.storage_collection_metadata, key, value, diff);
877                    }
878                    StateUpdateKind::UnfinalizedShard(key, ()) => {
879                        apply(&mut snapshot.unfinalized_shards, key, &(), diff);
880                    }
881                    StateUpdateKind::TxnWalShard((), value) => {
882                        apply(&mut snapshot.txn_wal_shard, &(), value, diff);
883                    }
884                    StateUpdateKind::RoleAuth(key, value) => {
885                        apply(&mut snapshot.role_auth, key, value, diff);
886                    }
887                }
888            }
889            f(snapshot)
890        })
891        .await
892    }
893
894    /// Generates an iterator of [`StateUpdate`] that contain all updates to the catalog
895    /// state.
896    ///
897    /// The output is fetched directly from persist instead of the in-memory cache.
898    ///
899    /// The output is consolidated and sorted by timestamp in ascending order.
900    #[mz_ore::instrument(level = "debug")]
901    async fn persist_snapshot(&self) -> impl Iterator<Item = StateUpdate> + DoubleEndedIterator {
902        let mut read_handle = self.read_handle().await;
903        let as_of = as_of(&read_handle, self.upper);
904        let snapshot = snapshot_binary(&mut read_handle, as_of, &self.metrics)
905            .await
906            .map(|update| update.try_into().expect("kind decoding error"));
907        read_handle.expire().await;
908        snapshot
909    }
910}
911
/// Applies updates for an unopened catalog.
///
/// Only the config and setting collections are cached, because those are the only
/// collections an unopened catalog serves.
#[derive(Debug)]
pub(crate) struct UnopenedCatalogStateInner {
    /// Cached contents of the catalog's config collection, keyed by config key.
    configs: BTreeMap<String, u64>,
    /// Cached contents of the catalog's settings collection, keyed by setting name.
    settings: BTreeMap<String, String>,
}

impl UnopenedCatalogStateInner {
    /// Returns a state whose config and setting caches are both empty.
    fn new() -> UnopenedCatalogStateInner {
        let configs = BTreeMap::default();
        let settings = BTreeMap::default();
        Self { configs, settings }
    }
}
929
930impl ApplyUpdate<StateUpdateKindJson> for UnopenedCatalogStateInner {
931    fn apply_update(
932        &mut self,
933        update: StateUpdate<StateUpdateKindJson>,
934        current_fence_token: &mut FenceableToken,
935        _metrics: &Arc<Metrics>,
936    ) -> Result<Option<StateUpdate<StateUpdateKindJson>>, FenceError> {
937        if !update.kind.is_audit_log() && update.kind.is_always_deserializable() {
938            let kind = TryInto::try_into(&update.kind).expect("kind is known to be deserializable");
939            match (kind, update.diff) {
940                (StateUpdateKind::Config(key, value), Diff::ONE) => {
941                    let prev = self.configs.insert(key.key, value.value);
942                    assert_eq!(
943                        prev, None,
944                        "values must be explicitly retracted before inserting a new value"
945                    );
946                }
947                (StateUpdateKind::Config(key, value), Diff::MINUS_ONE) => {
948                    let prev = self.configs.remove(&key.key);
949                    assert_eq!(
950                        prev,
951                        Some(value.value),
952                        "retraction does not match existing value"
953                    );
954                }
955                (StateUpdateKind::Setting(key, value), Diff::ONE) => {
956                    let prev = self.settings.insert(key.name, value.value);
957                    assert_eq!(
958                        prev, None,
959                        "values must be explicitly retracted before inserting a new value"
960                    );
961                }
962                (StateUpdateKind::Setting(key, value), Diff::MINUS_ONE) => {
963                    let prev = self.settings.remove(&key.name);
964                    assert_eq!(
965                        prev,
966                        Some(value.value),
967                        "retraction does not match existing value"
968                    );
969                }
970                (StateUpdateKind::FenceToken(fence_token), Diff::ONE) => {
971                    current_fence_token.maybe_fence(fence_token)?;
972                }
973                _ => {}
974            }
975        }
976
977        Ok(Some(update))
978    }
979}
980
/// A handle to an unopened catalog stored in persist. The unopened catalog can serve `Config` data,
/// `Setting` data, or the current epoch. All other catalog data may be un-migrated and should not
/// be read until the catalog has been opened. The [`UnopenedPersistCatalogState`] is responsible
/// for opening the catalog, see [`OpenableDurableCatalogState::open`] for more details.
///
/// Concretely, this is a [`PersistHandle`] over raw [`StateUpdateKindJson`] updates whose update
/// applier is [`UnopenedCatalogStateInner`], which caches only the config and setting
/// collections.
///
/// Production users should call [`Self::expire`] before dropping an [`UnopenedPersistCatalogState`]
/// so that it can expire its leases. If/when rust gets AsyncDrop, this will be done automatically.
pub(crate) type UnopenedPersistCatalogState =
    PersistHandle<StateUpdateKindJson, UnopenedCatalogStateInner>;
990
991impl UnopenedPersistCatalogState {
992    /// Create a new [`UnopenedPersistCatalogState`] to the catalog state associated with
993    /// `organization_id`.
994    ///
995    /// All usages of the persist catalog must go through this function. That includes the
996    /// catalog-debug tool, the adapter's catalog, etc.
997    #[mz_ore::instrument]
998    pub(crate) async fn new(
999        persist_client: PersistClient,
1000        organization_id: Uuid,
1001        version: semver::Version,
1002        deploy_generation: Option<u64>,
1003        metrics: Arc<Metrics>,
1004    ) -> Result<UnopenedPersistCatalogState, DurableCatalogError> {
1005        let catalog_shard_id = shard_id(organization_id, CATALOG_SEED);
1006        debug!(?catalog_shard_id, "new persist backed catalog state");
1007
1008        // Check the catalog shard version to ensure that we are compatible with the persist
1009        // data format. This lets us return an error gracefully, rather than panicking later in
1010        // persist.
1011        let version_in_catalog_shard =
1012            fetch_catalog_shard_version(&persist_client, catalog_shard_id).await;
1013        if let Some(version_in_catalog_shard) = version_in_catalog_shard {
1014            if !mz_persist_client::cfg::code_can_write_data(&version, &version_in_catalog_shard) {
1015                return Err(DurableCatalogError::IncompatiblePersistVersion {
1016                    found_version: version_in_catalog_shard,
1017                    catalog_version: version,
1018                });
1019            }
1020        }
1021
1022        let open_handles_start = Instant::now();
1023        info!("startup: envd serve: catalog init: open handles beginning");
1024        let since_handle = persist_client
1025            .open_critical_since(
1026                catalog_shard_id,
1027                CATALOG_CRITICAL_SINCE.clone(),
1028                Opaque::encode(&i64::MIN),
1029                Diagnostics {
1030                    shard_name: CATALOG_SHARD_NAME.to_string(),
1031                    handle_purpose: "durable catalog state critical since".to_string(),
1032                },
1033            )
1034            .await
1035            .expect("invalid usage");
1036
1037        let (mut write_handle, mut read_handle) = persist_client
1038            .open(
1039                catalog_shard_id,
1040                Arc::new(persist_desc()),
1041                Arc::new(UnitSchema::default()),
1042                Diagnostics {
1043                    shard_name: CATALOG_SHARD_NAME.to_string(),
1044                    handle_purpose: "durable catalog state handles".to_string(),
1045                },
1046                USE_CRITICAL_SINCE_CATALOG.get(persist_client.dyncfgs()),
1047            )
1048            .await
1049            .expect("invalid usage");
1050        info!(
1051            "startup: envd serve: catalog init: open handles complete in {:?}",
1052            open_handles_start.elapsed()
1053        );
1054
1055        // Commit an empty write at the minimum timestamp so the catalog is always readable.
1056        let upper = {
1057            const EMPTY_UPDATES: &[((SourceData, ()), Timestamp, StorageDiff)] = &[];
1058            let upper = Antichain::from_elem(Timestamp::minimum());
1059            let next_upper = Timestamp::minimum().step_forward();
1060            match write_handle
1061                .compare_and_append(EMPTY_UPDATES, upper, Antichain::from_elem(next_upper))
1062                .await
1063                .expect("invalid usage")
1064            {
1065                Ok(()) => next_upper,
1066                Err(mismatch) => antichain_to_timestamp(mismatch.current),
1067            }
1068        };
1069
1070        let snapshot_start = Instant::now();
1071        info!("startup: envd serve: catalog init: snapshot beginning");
1072        let as_of = as_of(&read_handle, upper);
1073        let snapshot: Vec<_> = snapshot_binary(&mut read_handle, as_of, &metrics)
1074            .await
1075            .map(|StateUpdate { kind, ts, diff }| (kind, ts, diff))
1076            .collect();
1077        let listen = read_handle
1078            .listen(Antichain::from_elem(as_of))
1079            .await
1080            .expect("invalid usage");
1081        info!(
1082            "startup: envd serve: catalog init: snapshot complete in {:?}",
1083            snapshot_start.elapsed()
1084        );
1085
1086        let mut handle = UnopenedPersistCatalogState {
1087            // Unopened catalogs are always writeable until they're opened in an explicit mode.
1088            mode: Mode::Writable,
1089            since_handle,
1090            write_handle,
1091            listen,
1092            persist_client,
1093            shard_id: catalog_shard_id,
1094            // Initialize empty in-memory state.
1095            snapshot: Vec::new(),
1096            update_applier: UnopenedCatalogStateInner::new(),
1097            upper,
1098            fenceable_token: FenceableToken::new(deploy_generation),
1099            catalog_content_version: version,
1100            bootstrap_complete: false,
1101            metrics,
1102            size_at_last_consolidation: None,
1103        };
1104        // If the snapshot is not consolidated, and we see multiple epoch values while applying the
1105        // updates, then we might accidentally fence ourselves out.
1106        soft_assert_no_log!(
1107            snapshot.iter().all(|(_, _, diff)| *diff == Diff::ONE),
1108            "snapshot should be consolidated: {snapshot:#?}"
1109        );
1110
1111        let apply_start = Instant::now();
1112        info!("startup: envd serve: catalog init: apply updates beginning");
1113        let updates = snapshot
1114            .into_iter()
1115            .map(|(kind, ts, diff)| StateUpdate { kind, ts, diff });
1116        handle.apply_updates_and_consolidate(updates)?;
1117        info!(
1118            "startup: envd serve: catalog init: apply updates complete in {:?}",
1119            apply_start.elapsed()
1120        );
1121
1122        // Validate that the binary version of the current process is not less than any binary
1123        // version that has written to the catalog.
1124        // This condition is only checked once, right here. If a new process comes along with a
1125        // higher version, it must fence this process out with one of the existing fencing
1126        // mechanisms.
1127        if let Some(found_version) = handle.get_catalog_content_version().await? {
1128            // Use cmp_precedence() to ignore build metadata per SemVer 2.0.0 spec
1129            if handle
1130                .catalog_content_version
1131                .cmp_precedence(&found_version)
1132                == std::cmp::Ordering::Less
1133            {
1134                return Err(DurableCatalogError::IncompatiblePersistVersion {
1135                    found_version,
1136                    catalog_version: handle.catalog_content_version,
1137                });
1138            }
1139        }
1140
1141        Ok(handle)
1142    }
1143
1144    #[mz_ore::instrument]
1145    async fn open_inner(
1146        mut self,
1147        mode: Mode,
1148        initial_ts: Timestamp,
1149        bootstrap_args: &BootstrapArgs,
1150    ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError> {
1151        // It would be nice to use `initial_ts` here, but it comes from the system clock, not the
1152        // timestamp oracle.
1153        let mut commit_ts = self.upper;
1154        self.mode = mode;
1155
1156        // Validate the current deploy generation.
1157        match (&self.mode, &self.fenceable_token) {
1158            (_, FenceableToken::Unfenced { .. } | FenceableToken::Fenced { .. }) => {
1159                return Err(DurableCatalogError::Internal(
1160                    "catalog should not have fenced before opening".to_string(),
1161                )
1162                .into());
1163            }
1164            (
1165                Mode::Writable | Mode::Savepoint,
1166                FenceableToken::Initializing {
1167                    current_deploy_generation: None,
1168                    ..
1169                },
1170            ) => {
1171                return Err(DurableCatalogError::Internal(format!(
1172                    "cannot open in mode '{:?}' without a deploy generation",
1173                    self.mode,
1174                ))
1175                .into());
1176            }
1177            _ => {}
1178        }
1179
1180        let read_only = matches!(self.mode, Mode::Readonly);
1181
1182        // Fence out previous catalogs.
1183        loop {
1184            self.sync_to_current_upper().await?;
1185            commit_ts = max(commit_ts, self.upper);
1186            let (fence_updates, current_fenceable_token) = self
1187                .fenceable_token
1188                .generate_unfenced_token(self.mode)?
1189                .ok_or_else(|| {
1190                    DurableCatalogError::Internal(
1191                        "catalog should not have fenced before opening".to_string(),
1192                    )
1193                })?;
1194            debug!(
1195                ?self.upper,
1196                ?self.fenceable_token,
1197                ?current_fenceable_token,
1198                "fencing previous catalogs"
1199            );
1200            if matches!(self.mode, Mode::Writable) {
1201                match self
1202                    .compare_and_append(fence_updates.clone(), commit_ts)
1203                    .await
1204                {
1205                    Ok(upper) => {
1206                        commit_ts = upper;
1207                    }
1208                    Err(CompareAndAppendError::Fence(e)) => return Err(e.into()),
1209                    Err(e @ CompareAndAppendError::UpperMismatch { .. }) => {
1210                        warn!("catalog write failed due to upper mismatch, retrying: {e:?}");
1211                        continue;
1212                    }
1213                }
1214            }
1215            self.fenceable_token = current_fenceable_token;
1216            break;
1217        }
1218
1219        if matches!(self.mode, Mode::Writable) {
1220            // One-time migration: The catalog previously used `CONTROLLER_CRITICAL_SINCE` for its
1221            // since handle. Now it uses its own `CATALOG_CRITICAL_SINCE`, to free
1222            // `CONTROLLER_CRITICAL_SINCE` up for the storage controller. The catalog and
1223            // controller handles differ in the `Opaque` codec, so we need a migration.
1224            //
1225            // TODO: Remove this once we don't support upgrading from v26 anymore.
1226            let mut controller_handle = self
1227                .persist_client
1228                .open_critical_since::<SourceData, (), Timestamp, StorageDiff>(
1229                    self.shard_id,
1230                    PersistClient::CONTROLLER_CRITICAL_SINCE,
1231                    Opaque::encode(&i64::MIN),
1232                    Diagnostics {
1233                        shard_name: CATALOG_SHARD_NAME.to_string(),
1234                        handle_purpose: "durable catalog state critical since (migration)"
1235                            .to_string(),
1236                    },
1237                )
1238                .await
1239                .expect("invalid usage");
1240
1241            let since = controller_handle.since().clone();
1242            let res = controller_handle
1243                .compare_and_downgrade_since(
1244                    &Opaque::encode(&i64::MIN),
1245                    (&Opaque::encode(&PersistEpoch::default()), &since),
1246                )
1247                .await;
1248            match res {
1249                Ok(_) => info!("migrated Opaque of catalog since handle"),
1250                Err(_) => { /* critical since was already migrated */ }
1251            }
1252        }
1253
1254        let is_initialized = self.is_initialized_inner();
1255        if !matches!(self.mode, Mode::Writable) && !is_initialized {
1256            return Err(CatalogError::Durable(DurableCatalogError::NotWritable(
1257                format!(
1258                    "catalog tables do not exist; will not create in {:?} mode",
1259                    self.mode
1260                ),
1261            )));
1262        }
1263        soft_assert_ne_or_log!(self.upper, Timestamp::minimum());
1264
1265        // Remove all audit log entries.
1266        let (audit_logs, snapshot): (Vec<_>, Vec<_>) = self
1267            .snapshot
1268            .into_iter()
1269            .partition(|(update, _, _)| update.is_audit_log());
1270        self.snapshot = snapshot;
1271        let audit_log_count = audit_logs.iter().map(|(_, _, diff)| diff).sum::<Diff>();
1272        let audit_log_handle = AuditLogIterator::new(audit_logs);
1273
1274        // Perform data migrations.
1275        if is_initialized && !read_only {
1276            commit_ts = upgrade(&mut self, commit_ts).await?;
1277        }
1278
1279        debug!(
1280            ?is_initialized,
1281            ?self.upper,
1282            "initializing catalog state"
1283        );
1284        let mut catalog = PersistCatalogState {
1285            mode: self.mode,
1286            since_handle: self.since_handle,
1287            write_handle: self.write_handle,
1288            listen: self.listen,
1289            persist_client: self.persist_client,
1290            shard_id: self.shard_id,
1291            upper: self.upper,
1292            fenceable_token: self.fenceable_token,
1293            // Initialize empty in-memory state.
1294            snapshot: Vec::new(),
1295            update_applier: CatalogStateInner::new(),
1296            catalog_content_version: self.catalog_content_version,
1297            bootstrap_complete: false,
1298            metrics: self.metrics,
1299            size_at_last_consolidation: None,
1300        };
1301        catalog.metrics.collection_entries.reset();
1302        // Normally, `collection_entries` is updated in `apply_updates`. The audit log updates skip
1303        // over that function so we manually update it here.
1304        catalog
1305            .metrics
1306            .collection_entries
1307            .with_label_values(&[&CollectionType::AuditLog.to_string()])
1308            .add(audit_log_count.into_inner());
1309        let updates = self.snapshot.into_iter().map(|(kind, ts, diff)| {
1310            let kind = TryIntoStateUpdateKind::try_into(kind).expect("kind decoding error");
1311            StateUpdate { kind, ts, diff }
1312        });
1313        catalog.apply_updates_and_consolidate(updates)?;
1314
1315        let catalog_content_version = catalog.catalog_content_version.to_string();
1316        let txn = if is_initialized {
1317            let mut txn = catalog.transaction().await?;
1318
1319            // Ad-hoc migration: Initialize the `migration_version` expected by adapter to be
1320            // present in existing catalogs.
1321            //
1322            // Note: Need to exclude read-only catalog mode here, because in that mode all
1323            // transactions are expected to be no-ops.
1324            // TODO: remove this once we only support upgrades from version >= 0.164
1325            if txn.get_setting("migration_version".into()).is_none() && mode != Mode::Readonly {
1326                let old_version = txn.get_catalog_content_version();
1327                txn.set_setting("migration_version".into(), old_version.map(Into::into))?;
1328            }
1329
1330            txn.set_catalog_content_version(catalog_content_version)?;
1331            txn
1332        } else {
1333            soft_assert_eq_no_log!(
1334                catalog
1335                    .snapshot
1336                    .iter()
1337                    .filter(|(kind, _, _)| !matches!(kind, StateUpdateKind::FenceToken(_)))
1338                    .count(),
1339                0,
1340                "trace should not contain any updates for an uninitialized catalog: {:#?}",
1341                catalog.snapshot
1342            );
1343
1344            let mut txn = catalog.transaction().await?;
1345            initialize::initialize(
1346                &mut txn,
1347                bootstrap_args,
1348                initial_ts.into(),
1349                catalog_content_version,
1350            )
1351            .await?;
1352            txn
1353        };
1354
1355        if read_only {
1356            let (txn_batch, _) = txn.into_parts();
1357            // The upper here doesn't matter because we are only applying the updates in memory.
1358            let updates = StateUpdate::from_txn_batch_ts(txn_batch, catalog.upper);
1359            catalog.apply_updates_and_consolidate(updates)?;
1360        } else {
1361            txn.commit_internal(commit_ts).await?;
1362        }
1363
1364        if matches!(catalog.mode, Mode::Writable) {
1365            let write_handle = catalog
1366                .persist_client
1367                .open_writer::<SourceData, (), Timestamp, i64>(
1368                    catalog.write_handle.shard_id(),
1369                    Arc::new(persist_desc()),
1370                    Arc::new(UnitSchema::default()),
1371                    Diagnostics {
1372                        shard_name: CATALOG_SHARD_NAME.to_string(),
1373                        handle_purpose: "compact catalog".to_string(),
1374                    },
1375                )
1376                .await
1377                .expect("invalid usage");
1378            let fuel = CATALOG_FORCE_COMPACTION_FUEL.handle(catalog.persist_client.dyncfgs());
1379            let wait = CATALOG_FORCE_COMPACTION_WAIT.handle(catalog.persist_client.dyncfgs());
1380            // We're going to gradually turn this on via dyncfgs. Run it in a task so that it
1381            // doesn't block startup.
1382            let _task = mz_ore::task::spawn(|| "catalog::force_shard_compaction", async move {
1383                let () =
1384                    mz_persist_client::cli::admin::dangerous_force_compaction_and_break_pushdown(
1385                        &write_handle,
1386                        || fuel.get(),
1387                        || wait.get(),
1388                    )
1389                    .await;
1390            });
1391        }
1392
1393        Ok((Box::new(catalog), audit_log_handle))
1394    }
1395
1396    /// Reports if the catalog state has been initialized.
1397    ///
1398    /// NOTE: This is the answer as of the last call to [`PersistHandle::sync`] or [`PersistHandle::sync_to_current_upper`],
1399    /// not necessarily what is currently in persist.
1400    #[mz_ore::instrument]
1401    fn is_initialized_inner(&self) -> bool {
1402        !self.update_applier.configs.is_empty()
1403    }
1404
1405    /// Get the current value of config `key`.
1406    ///
1407    /// Some configs need to be read before the catalog is opened for bootstrapping.
1408    #[mz_ore::instrument]
1409    async fn get_current_config(&mut self, key: &str) -> Result<Option<u64>, DurableCatalogError> {
1410        self.sync_to_current_upper().await?;
1411        Ok(self.update_applier.configs.get(key).cloned())
1412    }
1413
1414    /// Get the user version of this instance.
1415    ///
1416    /// The user version is used to determine if a migration is needed.
1417    #[mz_ore::instrument]
1418    pub(crate) async fn get_user_version(&mut self) -> Result<Option<u64>, DurableCatalogError> {
1419        self.get_current_config(USER_VERSION_KEY).await
1420    }
1421
1422    /// Get the current value of setting `name`.
1423    ///
1424    /// Some settings need to be read before the catalog is opened for bootstrapping.
1425    #[mz_ore::instrument]
1426    async fn get_current_setting(
1427        &mut self,
1428        name: &str,
1429    ) -> Result<Option<String>, DurableCatalogError> {
1430        self.sync_to_current_upper().await?;
1431        Ok(self.update_applier.settings.get(name).cloned())
1432    }
1433
1434    /// Get the catalog content version.
1435    ///
1436    /// The catalog content version is the semantic version of the most recent binary that wrote to
1437    /// the catalog.
1438    #[mz_ore::instrument]
1439    async fn get_catalog_content_version(
1440        &mut self,
1441    ) -> Result<Option<semver::Version>, DurableCatalogError> {
1442        let version = self
1443            .get_current_setting(CATALOG_CONTENT_VERSION_KEY)
1444            .await?;
1445        let version = version.map(|version| version.parse().expect("invalid version persisted"));
1446        Ok(version)
1447    }
1448}
1449
1450#[async_trait]
1451impl OpenableDurableCatalogState for UnopenedPersistCatalogState {
1452    #[mz_ore::instrument]
1453    async fn open_savepoint(
1454        mut self: Box<Self>,
1455        initial_ts: Timestamp,
1456        bootstrap_args: &BootstrapArgs,
1457    ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError> {
1458        self.open_inner(Mode::Savepoint, initial_ts, bootstrap_args)
1459            .boxed()
1460            .await
1461    }
1462
1463    #[mz_ore::instrument]
1464    async fn open_read_only(
1465        mut self: Box<Self>,
1466        bootstrap_args: &BootstrapArgs,
1467    ) -> Result<Box<dyn DurableCatalogState>, CatalogError> {
1468        self.open_inner(Mode::Readonly, EpochMillis::MIN.into(), bootstrap_args)
1469            .boxed()
1470            .await
1471            .map(|(catalog, _)| catalog)
1472    }
1473
1474    #[mz_ore::instrument]
1475    async fn open(
1476        mut self: Box<Self>,
1477        initial_ts: Timestamp,
1478        bootstrap_args: &BootstrapArgs,
1479    ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError> {
1480        self.open_inner(Mode::Writable, initial_ts, bootstrap_args)
1481            .boxed()
1482            .await
1483    }
1484
1485    #[mz_ore::instrument(level = "debug")]
1486    async fn open_debug(mut self: Box<Self>) -> Result<DebugCatalogState, CatalogError> {
1487        Ok(DebugCatalogState(*self))
1488    }
1489
1490    #[mz_ore::instrument]
1491    async fn is_initialized(&mut self) -> Result<bool, CatalogError> {
1492        self.sync_to_current_upper().await?;
1493        Ok(self.is_initialized_inner())
1494    }
1495
1496    #[mz_ore::instrument]
1497    async fn epoch(&mut self) -> Result<Epoch, CatalogError> {
1498        self.sync_to_current_upper().await?;
1499        self.fenceable_token
1500            .validate()?
1501            .map(|token| token.epoch)
1502            .ok_or(CatalogError::Durable(DurableCatalogError::Uninitialized))
1503    }
1504
1505    #[mz_ore::instrument]
1506    async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError> {
1507        self.sync_to_current_upper().await?;
1508        self.fenceable_token
1509            .token()
1510            .map(|token| token.deploy_generation)
1511            .ok_or(CatalogError::Durable(DurableCatalogError::Uninitialized))
1512    }
1513
1514    #[mz_ore::instrument(level = "debug")]
1515    async fn get_0dt_deployment_max_wait(&mut self) -> Result<Option<Duration>, CatalogError> {
1516        let value = self
1517            .get_current_config(WITH_0DT_DEPLOYMENT_MAX_WAIT)
1518            .await?;
1519        match value {
1520            None => Ok(None),
1521            Some(millis) => Ok(Some(Duration::from_millis(millis))),
1522        }
1523    }
1524
1525    #[mz_ore::instrument(level = "debug")]
1526    async fn get_0dt_deployment_ddl_check_interval(
1527        &mut self,
1528    ) -> Result<Option<Duration>, CatalogError> {
1529        let value = self
1530            .get_current_config(WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL)
1531            .await?;
1532        match value {
1533            None => Ok(None),
1534            Some(millis) => Ok(Some(Duration::from_millis(millis))),
1535        }
1536    }
1537
1538    #[mz_ore::instrument(level = "debug")]
1539    async fn get_enable_0dt_deployment_panic_after_timeout(
1540        &mut self,
1541    ) -> Result<Option<bool>, CatalogError> {
1542        let value = self
1543            .get_current_config(ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT)
1544            .await?;
1545        match value {
1546            None => Ok(None),
1547            Some(0) => Ok(Some(false)),
1548            Some(1) => Ok(Some(true)),
1549            Some(v) => Err(
1550                DurableCatalogError::from(TryFromProtoError::UnknownEnumVariant(format!(
1551                    "{v} is not a valid boolean value"
1552                )))
1553                .into(),
1554            ),
1555        }
1556    }
1557
1558    #[mz_ore::instrument]
1559    async fn has_system_config_synced_once(&mut self) -> Result<bool, DurableCatalogError> {
1560        self.get_current_config(SYSTEM_CONFIG_SYNCED_KEY)
1561            .await
1562            .map(|value| value.map(|value| value > 0).unwrap_or(false))
1563    }
1564
1565    #[mz_ore::instrument]
1566    async fn trace_unconsolidated(&mut self) -> Result<Trace, CatalogError> {
1567        self.sync_to_current_upper().await?;
1568        if self.is_initialized_inner() {
1569            let snapshot = self.snapshot_unconsolidated().await;
1570            Ok(Trace::from_snapshot(snapshot))
1571        } else {
1572            Err(CatalogError::Durable(DurableCatalogError::Uninitialized))
1573        }
1574    }
1575
1576    #[mz_ore::instrument]
1577    async fn trace_consolidated(&mut self) -> Result<Trace, CatalogError> {
1578        self.sync_to_current_upper().await?;
1579        if self.is_initialized_inner() {
1580            let snapshot = self.current_snapshot().await?;
1581            Ok(Trace::from_snapshot(snapshot))
1582        } else {
1583            Err(CatalogError::Durable(DurableCatalogError::Uninitialized))
1584        }
1585    }
1586
1587    #[mz_ore::instrument(level = "debug")]
1588    async fn expire(self: Box<Self>) {
1589        self.expire().await
1590    }
1591}
1592
1593/// Applies updates for an opened catalog.
1594#[derive(Debug)]
1595struct CatalogStateInner {
1596    /// A trace of all catalog updates that can be consumed by some higher layer.
1597    updates: VecDeque<memory::objects::StateUpdate>,
1598}
1599
1600impl CatalogStateInner {
1601    fn new() -> CatalogStateInner {
1602        CatalogStateInner {
1603            updates: VecDeque::new(),
1604        }
1605    }
1606}
1607
1608impl ApplyUpdate<StateUpdateKind> for CatalogStateInner {
1609    fn apply_update(
1610        &mut self,
1611        update: StateUpdate<StateUpdateKind>,
1612        current_fence_token: &mut FenceableToken,
1613        metrics: &Arc<Metrics>,
1614    ) -> Result<Option<StateUpdate<StateUpdateKind>>, FenceError> {
1615        if let Some(collection_type) = update.kind.collection_type() {
1616            metrics
1617                .collection_entries
1618                .with_label_values(&[&collection_type.to_string()])
1619                .add(update.diff.into_inner());
1620        }
1621
1622        {
1623            let update: Option<memory::objects::StateUpdate> = (&update)
1624                .try_into()
1625                .expect("invalid persisted update: {update:#?}");
1626            if let Some(update) = update {
1627                self.updates.push_back(update);
1628            }
1629        }
1630
1631        match (update.kind, update.diff) {
1632            (StateUpdateKind::AuditLog(_, ()), _) => Ok(None),
1633            // Nothing to due for fence token retractions but wait for the next insertion.
1634            (StateUpdateKind::FenceToken(_), Diff::MINUS_ONE) => Ok(None),
1635            (StateUpdateKind::FenceToken(token), Diff::ONE) => {
1636                current_fence_token.maybe_fence(token)?;
1637                Ok(None)
1638            }
1639            (kind, diff) => Ok(Some(StateUpdate {
1640                kind,
1641                ts: update.ts,
1642                diff,
1643            })),
1644        }
1645    }
1646}
1647
1648/// A durable store of the catalog state using Persist as an implementation. The durable store can
1649/// serve any catalog data and transactionally modify catalog data.
1650///
1651/// Production users should call [`Self::expire`] before dropping a [`PersistCatalogState`]
1652/// so that it can expire its leases. If/when rust gets AsyncDrop, this will be done automatically.
1653type PersistCatalogState = PersistHandle<StateUpdateKind, CatalogStateInner>;
1654
1655#[async_trait]
1656impl ReadOnlyDurableCatalogState for PersistCatalogState {
1657    fn epoch(&self) -> Epoch {
1658        self.fenceable_token
1659            .token()
1660            .expect("opened catalog state must have an epoch")
1661            .epoch
1662    }
1663
1664    fn metrics(&self) -> &Metrics {
1665        &self.metrics
1666    }
1667
1668    #[mz_ore::instrument(level = "debug")]
1669    async fn expire(self: Box<Self>) {
1670        self.expire().await
1671    }
1672
1673    fn is_bootstrap_complete(&self) -> bool {
1674        self.bootstrap_complete
1675    }
1676
1677    async fn get_audit_logs(&mut self) -> Result<Vec<VersionedEvent>, CatalogError> {
1678        self.sync_to_current_upper().await?;
1679        let audit_logs: Vec<_> = self
1680            .persist_snapshot()
1681            .await
1682            .filter_map(
1683                |StateUpdate {
1684                     kind,
1685                     ts: _,
1686                     diff: _,
1687                 }| match kind {
1688                    StateUpdateKind::AuditLog(key, ()) => Some(key),
1689                    _ => None,
1690                },
1691            )
1692            .collect();
1693        let mut audit_logs: Vec<_> = audit_logs
1694            .into_iter()
1695            .map(RustType::from_proto)
1696            .map_ok(|key: AuditLogKey| key.event)
1697            .collect::<Result<_, _>>()?;
1698        audit_logs.sort_by(|a, b| a.sortable_id().cmp(&b.sortable_id()));
1699        Ok(audit_logs)
1700    }
1701
1702    #[mz_ore::instrument(level = "debug")]
1703    async fn get_next_id(&mut self, id_type: &str) -> Result<u64, CatalogError> {
1704        self.with_trace(|trace| {
1705            Ok(trace
1706                .into_iter()
1707                .rev()
1708                .filter_map(|(kind, _, _)| match kind {
1709                    StateUpdateKind::IdAllocator(key, value) if key.name == id_type => {
1710                        Some(value.next_id)
1711                    }
1712                    _ => None,
1713                })
1714                .next()
1715                .expect("must exist"))
1716        })
1717        .await
1718    }
1719
1720    #[mz_ore::instrument(level = "debug")]
1721    async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError> {
1722        self.sync_to_current_upper().await?;
1723        Ok(self
1724            .fenceable_token
1725            .token()
1726            .expect("opened catalogs must have a token")
1727            .deploy_generation)
1728    }
1729
1730    #[mz_ore::instrument(level = "debug")]
1731    async fn snapshot(&mut self) -> Result<Snapshot, CatalogError> {
1732        self.with_snapshot(Ok).await
1733    }
1734
1735    #[mz_ore::instrument(level = "debug")]
1736    async fn sync_to_current_updates(
1737        &mut self,
1738    ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError> {
1739        let upper = self.current_upper().await;
1740        self.sync_updates(upper).await
1741    }
1742
1743    #[mz_ore::instrument(level = "debug")]
1744    async fn sync_updates(
1745        &mut self,
1746        target_upper: mz_repr::Timestamp,
1747    ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError> {
1748        self.sync(target_upper).await?;
1749        let mut updates = Vec::new();
1750        while let Some(update) = self.update_applier.updates.front() {
1751            if update.ts >= target_upper {
1752                break;
1753            }
1754
1755            let update = self
1756                .update_applier
1757                .updates
1758                .pop_front()
1759                .expect("peeked above");
1760            updates.push(update);
1761        }
1762        Ok(updates)
1763    }
1764
1765    async fn current_upper(&mut self) -> Timestamp {
1766        self.current_upper().await
1767    }
1768}
1769
1770#[async_trait]
1771#[allow(mismatched_lifetime_syntaxes)]
1772impl DurableCatalogState for PersistCatalogState {
1773    fn is_read_only(&self) -> bool {
1774        matches!(self.mode, Mode::Readonly)
1775    }
1776
1777    fn is_savepoint(&self) -> bool {
1778        matches!(self.mode, Mode::Savepoint)
1779    }
1780
1781    async fn mark_bootstrap_complete(&mut self) {
1782        self.bootstrap_complete = true;
1783        if matches!(self.mode, Mode::Writable) {
1784            self.since_handle
1785                .upgrade_version()
1786                .await
1787                .expect("invalid usage")
1788        }
1789    }
1790
1791    #[mz_ore::instrument(level = "debug")]
1792    async fn transaction(&mut self) -> Result<Transaction, CatalogError> {
1793        self.metrics.transactions_started.inc();
1794        let snapshot = self.snapshot().await?;
1795        let commit_ts = self.upper.clone();
1796        Transaction::new(self, snapshot, commit_ts)
1797    }
1798
1799    fn transaction_from_snapshot(
1800        &mut self,
1801        snapshot: Snapshot,
1802    ) -> Result<Transaction, CatalogError> {
1803        let commit_ts = self.upper.clone();
1804        Transaction::new(self, snapshot, commit_ts)
1805    }
1806
1807    #[mz_ore::instrument(level = "debug")]
1808    async fn commit_transaction(
1809        &mut self,
1810        txn_batch: TransactionBatch,
1811        commit_ts: Timestamp,
1812    ) -> Result<Timestamp, CatalogError> {
1813        async fn commit_transaction_inner(
1814            catalog: &mut PersistCatalogState,
1815            txn_batch: TransactionBatch,
1816            commit_ts: Timestamp,
1817        ) -> Result<Timestamp, CatalogError> {
1818            // If the transaction is empty then we don't error, even in read-only mode.
1819            // This is mostly for legacy reasons (i.e. with enough elbow grease this
1820            // behavior can be changed without breaking any fundamental assumptions).
1821            if catalog.mode == Mode::Readonly {
1822                let updates: Vec<_> = StateUpdate::from_txn_batch(txn_batch).collect();
1823                if !updates.is_empty() {
1824                    let collection_types: Vec<_> = updates
1825                        .iter()
1826                        .filter_map(|u| u.0.collection_type())
1827                        .collect();
1828                    return Err(DurableCatalogError::NotWritable(format!(
1829                        "cannot commit a transaction in a read-only catalog: \
1830                         {} updates across collections: {collection_types:?}",
1831                        updates.len(),
1832                    ))
1833                    .into());
1834                }
1835                return Ok(catalog.upper);
1836            }
1837
1838            // If the current upper does not match the transaction's commit timestamp, then the
1839            // catalog must have changed since the transaction was started, making the transaction
1840            // invalid. When/if we want a multi-writer catalog, this will likely have to change
1841            // from an assert to a retry.
1842            assert_eq!(
1843                catalog.upper, txn_batch.upper,
1844                "only one transaction at a time is supported"
1845            );
1846
1847            assert!(
1848                commit_ts >= catalog.upper,
1849                "expected commit ts, {}, to be greater than or equal to upper, {}",
1850                commit_ts,
1851                catalog.upper
1852            );
1853
1854            let updates = StateUpdate::from_txn_batch(txn_batch).collect();
1855            debug!("committing updates: {updates:?}");
1856
1857            let next_upper = match catalog.mode {
1858                Mode::Writable => catalog
1859                    .compare_and_append(updates, commit_ts)
1860                    .await
1861                    .map_err(|e| e.unwrap_fence_error())?,
1862                Mode::Savepoint => {
1863                    let updates = updates.into_iter().map(|(kind, diff)| StateUpdate {
1864                        kind,
1865                        ts: commit_ts,
1866                        diff,
1867                    });
1868                    catalog.apply_updates_and_consolidate(updates)?;
1869                    catalog.upper = commit_ts.step_forward();
1870                    catalog.upper
1871                }
1872                Mode::Readonly => unreachable!("handled above"),
1873            };
1874
1875            Ok(next_upper)
1876        }
1877        self.metrics.transaction_commits.inc();
1878        let counter = self.metrics.transaction_commit_latency_seconds.clone();
1879        commit_transaction_inner(self, txn_batch, commit_ts)
1880            .wall_time()
1881            .inc_by(counter)
1882            .await
1883    }
1884
1885    #[mz_ore::instrument(level = "debug")]
1886    async fn advance_upper(&mut self, new_upper: Timestamp) -> Result<(), CatalogError> {
1887        if self.upper >= new_upper {
1888            // We don't expect a no-op advancement, but if we are wrong we'd crash the process.
1889            // Seems safer to only soft-assert and return gracefully in production. If we get here
1890            // that means we tried to make the catalog shard readable at a time it was already
1891            // readable, which likely means we are violating linearizability. That's not great, but
1892            // crashing (or even crash-looping) is worse.
1893            //
1894            // TODO: Consider removing this once we have built some confidence.
1895            soft_panic_or_log!(
1896                "new_upper ({new_upper}) not greater than current upper ({})",
1897                self.upper
1898            );
1899            return Ok(());
1900        }
1901
1902        match self.mode {
1903            Mode::Writable => self
1904                .compare_and_append_inner([], new_upper)
1905                .await
1906                .map_err(|e| e.unwrap_fence_error())?,
1907            Mode::Savepoint => (),
1908            Mode::Readonly => {
1909                return Err(DurableCatalogError::NotWritable(
1910                    "cannot advance upper of a read-only catalog".into(),
1911                )
1912                .into());
1913            }
1914        }
1915
1916        self.upper = new_upper;
1917        // No sync needed since no data was written.
1918
1919        Ok(())
1920    }
1921
1922    fn shard_id(&self) -> ShardId {
1923        self.shard_id
1924    }
1925}
1926
1927/// Deterministically generate a shard ID for the given `organization_id` and `seed`.
1928pub fn shard_id(organization_id: Uuid, seed: usize) -> ShardId {
1929    let hash = sha2::Sha256::digest(format!("{organization_id}{seed}")).to_vec();
1930    soft_assert_eq_or_log!(hash.len(), 32, "SHA256 returns 32 bytes (256 bits)");
1931    let uuid = Uuid::from_slice(&hash[0..16]).expect("from_slice accepts exactly 16 bytes");
1932    ShardId::from_str(&format!("s{uuid}")).expect("known to be valid")
1933}
1934
1935/// Generates a timestamp for reading from `read_handle` that is as fresh as possible, given
1936/// `upper`.
1937fn as_of(
1938    read_handle: &ReadHandle<SourceData, (), Timestamp, StorageDiff>,
1939    upper: Timestamp,
1940) -> Timestamp {
1941    let since = read_handle.since().clone();
1942    let mut as_of = upper.checked_sub(1).unwrap_or_else(|| {
1943        panic!("catalog persist shard should be initialize, found upper: {upper:?}")
1944    });
1945    // We only downgrade the since after writing, and we always set the since to one less than the
1946    // upper.
1947    soft_assert_or_log!(
1948        since.less_equal(&as_of),
1949        "since={since:?}, as_of={as_of:?}; since must be less than or equal to as_of"
1950    );
1951    // This should be a no-op if the assert above passes, however if it doesn't then we'd like to
1952    // continue with a correct timestamp instead of entering a panic loop.
1953    as_of.advance_by(since.borrow());
1954    as_of
1955}
1956
1957/// Fetch the persist version of the catalog shard, if one exists. A version will not
1958/// exist if we are creating a brand-new environment.
1959async fn fetch_catalog_shard_version(
1960    persist_client: &PersistClient,
1961    catalog_shard_id: ShardId,
1962) -> Option<semver::Version> {
1963    let shard_state = persist_client
1964        .inspect_shard::<Timestamp>(&catalog_shard_id)
1965        .await
1966        .ok()?;
1967    let json_state = serde_json::to_value(shard_state).expect("state serialization error");
1968    let json_version = json_state
1969        .get("applier_version")
1970        .cloned()
1971        .expect("missing applier_version");
1972    let version = serde_json::from_value(json_version).expect("version deserialization error");
1973    Some(version)
1974}
1975
1976/// Generates an iterator of [`StateUpdate`] that contain all updates to the catalog
1977/// state up to, and including, `as_of`.
1978///
1979/// The output is consolidated and sorted by timestamp in ascending order.
1980#[mz_ore::instrument(level = "debug")]
1981async fn snapshot_binary(
1982    read_handle: &mut ReadHandle<SourceData, (), Timestamp, StorageDiff>,
1983    as_of: Timestamp,
1984    metrics: &Arc<Metrics>,
1985) -> impl Iterator<Item = StateUpdate<StateUpdateKindJson>> + DoubleEndedIterator + use<> {
1986    metrics.snapshots_taken.inc();
1987    let counter = metrics.snapshot_latency_seconds.clone();
1988    snapshot_binary_inner(read_handle, as_of)
1989        .wall_time()
1990        .inc_by(counter)
1991        .await
1992}
1993
1994/// Generates an iterator of [`StateUpdate`] that contain all updates to the catalog
1995/// state up to, and including, `as_of`.
1996///
1997/// The output is consolidated and sorted by timestamp in ascending order.
1998#[mz_ore::instrument(level = "debug")]
1999async fn snapshot_binary_inner(
2000    read_handle: &mut ReadHandle<SourceData, (), Timestamp, StorageDiff>,
2001    as_of: Timestamp,
2002) -> impl Iterator<Item = StateUpdate<StateUpdateKindJson>> + DoubleEndedIterator + use<> {
2003    let snapshot = read_handle
2004        .snapshot_and_fetch(Antichain::from_elem(as_of))
2005        .await
2006        .expect("we have advanced the restart_as_of by the since");
2007    soft_assert_no_log!(
2008        snapshot.iter().all(|(_, _, diff)| *diff == 1),
2009        "snapshot_and_fetch guarantees a consolidated result: {snapshot:#?}"
2010    );
2011    snapshot
2012        .into_iter()
2013        .map(Into::<StateUpdate<StateUpdateKindJson>>::into)
2014        .sorted_by(|a, b| Ord::cmp(&b.ts, &a.ts))
2015}
2016
2017/// Convert an [`Antichain<Timestamp>`] to a [`Timestamp`].
2018///
2019/// The correctness of this function relies on [`Timestamp`] being totally ordered and never
2020/// finalizing the catalog shard.
2021pub(crate) fn antichain_to_timestamp(antichain: Antichain<Timestamp>) -> Timestamp {
2022    antichain
2023        .into_option()
2024        .expect("we use a totally ordered time and never finalize the shard")
2025}
2026
2027// Debug methods used by the catalog-debug tool.
2028
2029impl Trace {
2030    /// Generates a [`Trace`] from snapshot.
2031    fn from_snapshot(snapshot: impl IntoIterator<Item = StateUpdate>) -> Trace {
2032        let mut trace = Trace::new();
2033        for StateUpdate { kind, ts, diff } in snapshot {
2034            match kind {
2035                StateUpdateKind::AuditLog(k, v) => trace.audit_log.values.push(((k, v), ts, diff)),
2036                StateUpdateKind::Cluster(k, v) => trace.clusters.values.push(((k, v), ts, diff)),
2037                StateUpdateKind::ClusterReplica(k, v) => {
2038                    trace.cluster_replicas.values.push(((k, v), ts, diff))
2039                }
2040                StateUpdateKind::Comment(k, v) => trace.comments.values.push(((k, v), ts, diff)),
2041                StateUpdateKind::Config(k, v) => trace.configs.values.push(((k, v), ts, diff)),
2042                StateUpdateKind::Database(k, v) => trace.databases.values.push(((k, v), ts, diff)),
2043                StateUpdateKind::DefaultPrivilege(k, v) => {
2044                    trace.default_privileges.values.push(((k, v), ts, diff))
2045                }
2046                StateUpdateKind::FenceToken(_) => {
2047                    // Fence token not included in trace.
2048                }
2049                StateUpdateKind::IdAllocator(k, v) => {
2050                    trace.id_allocator.values.push(((k, v), ts, diff))
2051                }
2052                StateUpdateKind::IntrospectionSourceIndex(k, v) => {
2053                    trace.introspection_sources.values.push(((k, v), ts, diff))
2054                }
2055                StateUpdateKind::Item(k, v) => trace.items.values.push(((k, v), ts, diff)),
2056                StateUpdateKind::NetworkPolicy(k, v) => {
2057                    trace.network_policies.values.push(((k, v), ts, diff))
2058                }
2059                StateUpdateKind::Role(k, v) => trace.roles.values.push(((k, v), ts, diff)),
2060                StateUpdateKind::Schema(k, v) => trace.schemas.values.push(((k, v), ts, diff)),
2061                StateUpdateKind::Setting(k, v) => trace.settings.values.push(((k, v), ts, diff)),
2062                StateUpdateKind::SourceReferences(k, v) => {
2063                    trace.source_references.values.push(((k, v), ts, diff))
2064                }
2065                StateUpdateKind::SystemConfiguration(k, v) => {
2066                    trace.system_configurations.values.push(((k, v), ts, diff))
2067                }
2068                StateUpdateKind::ClusterSystemConfiguration(k, v) => trace
2069                    .cluster_system_configurations
2070                    .values
2071                    .push(((k, v), ts, diff)),
2072                StateUpdateKind::ReplicaSystemConfiguration(k, v) => trace
2073                    .replica_system_configurations
2074                    .values
2075                    .push(((k, v), ts, diff)),
2076                StateUpdateKind::SystemObjectMapping(k, v) => {
2077                    trace.system_object_mappings.values.push(((k, v), ts, diff))
2078                }
2079                StateUpdateKind::SystemPrivilege(k, v) => {
2080                    trace.system_privileges.values.push(((k, v), ts, diff))
2081                }
2082                StateUpdateKind::StorageCollectionMetadata(k, v) => trace
2083                    .storage_collection_metadata
2084                    .values
2085                    .push(((k, v), ts, diff)),
2086                StateUpdateKind::UnfinalizedShard(k, ()) => {
2087                    trace.unfinalized_shards.values.push(((k, ()), ts, diff))
2088                }
2089                StateUpdateKind::TxnWalShard((), v) => {
2090                    trace.txn_wal_shard.values.push((((), v), ts, diff))
2091                }
2092                StateUpdateKind::RoleAuth(k, v) => trace.role_auth.values.push(((k, v), ts, diff)),
2093            }
2094        }
2095        trace
2096    }
2097}
2098
2099impl UnopenedPersistCatalogState {
2100    /// Manually update value of `key` in collection `T` to `value`.
2101    #[mz_ore::instrument]
2102    pub(crate) async fn debug_edit<T: Collection>(
2103        &mut self,
2104        key: T::Key,
2105        value: T::Value,
2106    ) -> Result<Option<T::Value>, CatalogError>
2107    where
2108        T::Key: PartialEq + Eq + Debug + Clone,
2109        T::Value: Debug + Clone,
2110    {
2111        let prev_value = loop {
2112            let key = key.clone();
2113            let value = value.clone();
2114            let snapshot = self.current_snapshot().await?;
2115            let trace = Trace::from_snapshot(snapshot);
2116            let collection_trace = T::collection_trace(trace);
2117            let prev_values: Vec<_> = collection_trace
2118                .values
2119                .into_iter()
2120                .filter(|((k, _), _, diff)| {
2121                    soft_assert_eq_or_log!(*diff, Diff::ONE, "trace is consolidated");
2122                    &key == k
2123                })
2124                .collect();
2125
2126            let prev_value = match &prev_values[..] {
2127                [] => None,
2128                [((_, v), _, _)] => Some(v.clone()),
2129                prev_values => panic!("multiple values found for key {key:?}: {prev_values:?}"),
2130            };
2131
2132            let mut updates: Vec<_> = prev_values
2133                .into_iter()
2134                .map(|((k, v), _, _)| (T::update(k, v), Diff::MINUS_ONE))
2135                .collect();
2136            updates.push((T::update(key, value), Diff::ONE));
2137            // We must fence out all other catalogs, if we haven't already, since we are writing.
2138            match self.fenceable_token.generate_unfenced_token(self.mode)? {
2139                Some((fence_updates, current_fenceable_token)) => {
2140                    updates.extend(fence_updates.clone());
2141                    match self.compare_and_append(updates, self.upper).await {
2142                        Ok(_) => {
2143                            self.fenceable_token = current_fenceable_token;
2144                            break prev_value;
2145                        }
2146                        Err(CompareAndAppendError::Fence(e)) => return Err(e.into()),
2147                        Err(e @ CompareAndAppendError::UpperMismatch { .. }) => {
2148                            warn!("catalog write failed due to upper mismatch, retrying: {e:?}");
2149                            continue;
2150                        }
2151                    }
2152                }
2153                None => {
2154                    self.compare_and_append(updates, self.upper)
2155                        .await
2156                        .map_err(|e| e.unwrap_fence_error())?;
2157                    break prev_value;
2158                }
2159            }
2160        };
2161        Ok(prev_value)
2162    }
2163
2164    /// Manually delete `key` from collection `T`.
2165    #[mz_ore::instrument]
2166    pub(crate) async fn debug_delete<T: Collection>(
2167        &mut self,
2168        key: T::Key,
2169    ) -> Result<(), CatalogError>
2170    where
2171        T::Key: PartialEq + Eq + Debug + Clone,
2172        T::Value: Debug,
2173    {
2174        loop {
2175            let key = key.clone();
2176            let snapshot = self.current_snapshot().await?;
2177            let trace = Trace::from_snapshot(snapshot);
2178            let collection_trace = T::collection_trace(trace);
2179            let mut retractions: Vec<_> = collection_trace
2180                .values
2181                .into_iter()
2182                .filter(|((k, _), _, diff)| {
2183                    soft_assert_eq_or_log!(*diff, Diff::ONE, "trace is consolidated");
2184                    &key == k
2185                })
2186                .map(|((k, v), _, _)| (T::update(k, v), Diff::MINUS_ONE))
2187                .collect();
2188
2189            // We must fence out all other catalogs, if we haven't already, since we are writing.
2190            match self.fenceable_token.generate_unfenced_token(self.mode)? {
2191                Some((fence_updates, current_fenceable_token)) => {
2192                    retractions.extend(fence_updates.clone());
2193                    match self.compare_and_append(retractions, self.upper).await {
2194                        Ok(_) => {
2195                            self.fenceable_token = current_fenceable_token;
2196                            break;
2197                        }
2198                        Err(CompareAndAppendError::Fence(e)) => return Err(e.into()),
2199                        Err(e @ CompareAndAppendError::UpperMismatch { .. }) => {
2200                            warn!("catalog write failed due to upper mismatch, retrying: {e:?}");
2201                            continue;
2202                        }
2203                    }
2204                }
2205                None => {
2206                    self.compare_and_append(retractions, self.upper)
2207                        .await
2208                        .map_err(|e| e.unwrap_fence_error())?;
2209                    break;
2210                }
2211            }
2212        }
2213        Ok(())
2214    }
2215
2216    /// Generates a [`Vec<StateUpdate>`] that contain all updates to the catalog
2217    /// state.
2218    ///
2219    /// The output is consolidated and sorted by timestamp in ascending order and the current upper.
2220    async fn current_snapshot(
2221        &mut self,
2222    ) -> Result<impl IntoIterator<Item = StateUpdate> + '_, CatalogError> {
2223        self.sync_to_current_upper().await?;
2224        self.consolidate();
2225        Ok(self.snapshot.iter().cloned().map(|(kind, ts, diff)| {
2226            let kind = TryIntoStateUpdateKind::try_into(kind).expect("kind decoding error");
2227            StateUpdate { kind, ts, diff }
2228        }))
2229    }
2230}