1#[cfg(test)]
11mod tests;
12
13use std::cmp::max;
14use std::collections::{BTreeMap, VecDeque};
15use std::fmt::Debug;
16use std::str::FromStr;
17use std::sync::Arc;
18use std::time::{Duration, Instant};
19
20use async_trait::async_trait;
21use differential_dataflow::lattice::Lattice;
22use futures::{FutureExt, StreamExt};
23use itertools::Itertools;
24use mz_audit_log::VersionedEvent;
25use mz_ore::metrics::MetricsFutureExt;
26use mz_ore::now::EpochMillis;
27use mz_ore::{
28    soft_assert_eq_no_log, soft_assert_eq_or_log, soft_assert_ne_or_log, soft_assert_no_log,
29    soft_assert_or_log, soft_panic_or_log,
30};
31use mz_persist_client::cfg::USE_CRITICAL_SINCE_CATALOG;
32use mz_persist_client::cli::admin::{CATALOG_FORCE_COMPACTION_FUEL, CATALOG_FORCE_COMPACTION_WAIT};
33use mz_persist_client::critical::SinceHandle;
34use mz_persist_client::error::UpperMismatch;
35use mz_persist_client::read::{Listen, ListenEvent, ReadHandle};
36use mz_persist_client::write::WriteHandle;
37use mz_persist_client::{Diagnostics, PersistClient, ShardId};
38use mz_persist_types::codec_impls::UnitSchema;
39use mz_proto::{RustType, TryFromProtoError};
40use mz_repr::{Diff, RelationDesc, SqlScalarType};
41use mz_storage_types::StorageDiff;
42use mz_storage_types::sources::SourceData;
43use sha2::Digest;
44use timely::progress::{Antichain, Timestamp as TimelyTimestamp};
45use tracing::{debug, info, warn};
46use uuid::Uuid;
47
48use crate::durable::debug::{Collection, CollectionType, DebugCatalogState, Trace};
49use crate::durable::error::FenceError;
50use crate::durable::initialize::{
51    ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, SYSTEM_CONFIG_SYNCED_KEY, USER_VERSION_KEY,
52    WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL, WITH_0DT_DEPLOYMENT_MAX_WAIT,
53};
54use crate::durable::metrics::Metrics;
55use crate::durable::objects::state_update::{
56    IntoStateUpdateKindJson, StateUpdate, StateUpdateKind, StateUpdateKindJson,
57    TryIntoStateUpdateKind,
58};
59use crate::durable::objects::{AuditLogKey, FenceToken, Snapshot};
60use crate::durable::transaction::TransactionBatch;
61use crate::durable::upgrade::upgrade;
62use crate::durable::{
63    AuditLogIterator, BootstrapArgs, CATALOG_CONTENT_VERSION_KEY, CatalogError,
64    DurableCatalogError, DurableCatalogState, Epoch, OpenableDurableCatalogState,
65    ReadOnlyDurableCatalogState, Transaction, initialize,
66};
67use crate::memory;
68
/// Timestamp type used for reads from and writes to the catalog shard in this module.
pub(crate) type Timestamp = mz_repr::Timestamp;

/// The smallest epoch; used as the starting epoch when no fence token has been observed yet.
// SAFETY: the literal `1` is non-zero. NOTE(review): this assumes non-zero is the only
// precondition of `Epoch::new_unchecked` (`Epoch::new` elsewhere in this file is expected to
// succeed for values "known to be non-zero") -- confirm against the `Epoch` definition.
const MIN_EPOCH: Epoch = unsafe { Epoch::new_unchecked(1) };

/// Human readable shard name attached to persist diagnostics for the catalog shard.
const CATALOG_SHARD_NAME: &str = "catalog";
/// Human readable shard name attached to persist diagnostics for the catalog upgrade shard.
const UPGRADE_SHARD_NAME: &str = "catalog_upgrade";

/// Seed passed to `shard_id` to derive the catalog shard ID for an organization.
const CATALOG_SEED: usize = 1;
/// Seed passed to `shard_id` to derive the catalog upgrade shard ID for an organization.
const UPGRADE_SEED: usize = 2;
/// NOTE(review): not referenced in the visible part of this file; the leading underscore
/// suggests the value is only kept reserved so it is never reused -- confirm.
pub const _BUILTIN_MIGRATION_SEED: usize = 3;
/// NOTE(review): not referenced in the visible part of this file; the leading underscore
/// suggests the value is only kept reserved so it is never reused -- confirm.
pub const _EXPRESSION_CACHE_SEED: usize = 4;
120
/// The mode a [`PersistHandle`] operates in.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub(crate) enum Mode {
    /// The handle follows the shard's upper in persist but must not write:
    /// [`PersistHandle::compare_and_append`] asserts that the mode is [`Mode::Writable`].
    Readonly,
    /// The handle neither writes to nor reads new data from persist while syncing; its upper is
    /// tracked purely in memory.
    Savepoint,
    /// The handle may write to persist. Generating a fence token in this mode increments the
    /// epoch.
    Writable,
}
131
/// Tracks the fence token of the current process together with whether it has been fenced out
/// by a higher token.
#[derive(Debug)]
pub(crate) enum FenceableToken {
    /// No token has been generated for the current process yet.
    Initializing {
        /// The largest fence token observed so far through [`FenceableToken::maybe_fence`], if
        /// any.
        durable_token: Option<FenceToken>,
        /// The deploy generation supplied when this value was constructed, if any.
        current_deploy_generation: Option<u64>,
    },
    /// The current process holds `current_token` and no higher token has been observed.
    Unfenced { current_token: FenceToken },
    /// A token higher than `current_token` has been observed.
    Fenced {
        /// The token held by the current process.
        current_token: FenceToken,
        /// The higher token that fenced out `current_token`.
        fence_token: FenceToken,
    },
}
151
152impl FenceableToken {
153    fn new(current_deploy_generation: Option<u64>) -> Self {
155        Self::Initializing {
156            durable_token: None,
157            current_deploy_generation,
158        }
159    }
160
161    fn validate(&self) -> Result<Option<FenceToken>, FenceError> {
163        match self {
164            FenceableToken::Initializing { durable_token, .. } => Ok(durable_token.clone()),
165            FenceableToken::Unfenced { current_token, .. } => Ok(Some(current_token.clone())),
166            FenceableToken::Fenced {
167                current_token,
168                fence_token,
169            } => {
170                assert!(
171                    fence_token > current_token,
172                    "must be fenced by higher token; current={current_token:?}, fence={fence_token:?}"
173                );
174                if fence_token.deploy_generation > current_token.deploy_generation {
175                    Err(FenceError::DeployGeneration {
176                        current_generation: current_token.deploy_generation,
177                        fence_generation: fence_token.deploy_generation,
178                    })
179                } else {
180                    assert!(
181                        fence_token.epoch > current_token.epoch,
182                        "must be fenced by higher token; current={current_token:?}, fence={fence_token:?}"
183                    );
184                    Err(FenceError::Epoch {
185                        current_epoch: current_token.epoch,
186                        fence_epoch: fence_token.epoch,
187                    })
188                }
189            }
190        }
191    }
192
193    fn token(&self) -> Option<FenceToken> {
195        match self {
196            FenceableToken::Initializing { durable_token, .. } => durable_token.clone(),
197            FenceableToken::Unfenced { current_token, .. } => Some(current_token.clone()),
198            FenceableToken::Fenced { current_token, .. } => Some(current_token.clone()),
199        }
200    }
201
    /// Records an observed fence `token` and transitions to [`FenceableToken::Fenced`] if that
    /// token fences out the current process.
    ///
    /// Returns the corresponding [`FenceError`] if the current process is (or already was)
    /// fenced, and `Ok(())` otherwise.
    fn maybe_fence(&mut self, token: FenceToken) -> Result<(), FenceError> {
        match self {
            FenceableToken::Initializing {
                durable_token,
                current_deploy_generation,
                ..
            } => {
                // Remember the largest token observed so far.
                match durable_token {
                    Some(durable_token) => {
                        *durable_token = max(durable_token.clone(), token.clone());
                    }
                    None => {
                        *durable_token = Some(token.clone());
                    }
                }
                // No token has been generated for this process yet, so it can only be fenced
                // through a deploy generation that is higher than its own.
                if let Some(current_deploy_generation) = current_deploy_generation {
                    if *current_deploy_generation < token.deploy_generation {
                        // The current token is synthesized from this process's deploy generation
                        // and the epoch of the observed token.
                        *self = FenceableToken::Fenced {
                            current_token: FenceToken {
                                deploy_generation: *current_deploy_generation,
                                epoch: token.epoch,
                            },
                            fence_token: token,
                        };
                        // `validate` always fails in the `Fenced` state, producing the error.
                        self.validate()?;
                    }
                }
            }
            FenceableToken::Unfenced { current_token } => {
                // Any strictly greater token fences out the current process.
                if *current_token < token {
                    *self = FenceableToken::Fenced {
                        current_token: current_token.clone(),
                        fence_token: token,
                    };
                    // `validate` always fails in the `Fenced` state, producing the error.
                    self.validate()?;
                }
            }
            FenceableToken::Fenced { .. } => {
                // Already fenced: report the existing fence error again.
                self.validate()?;
            }
        }

        Ok(())
    }
247
248    fn generate_unfenced_token(
252        &self,
253        mode: Mode,
254    ) -> Result<Option<(Vec<(StateUpdateKind, Diff)>, FenceableToken)>, DurableCatalogError> {
255        let (durable_token, current_deploy_generation) = match self {
256            FenceableToken::Initializing {
257                durable_token,
258                current_deploy_generation,
259            } => (durable_token.clone(), current_deploy_generation.clone()),
260            FenceableToken::Unfenced { .. } | FenceableToken::Fenced { .. } => return Ok(None),
261        };
262
263        let mut fence_updates = Vec::with_capacity(2);
264
265        if let Some(durable_token) = &durable_token {
266            fence_updates.push((
267                StateUpdateKind::FenceToken(durable_token.clone()),
268                Diff::MINUS_ONE,
269            ));
270        }
271
272        let current_deploy_generation = current_deploy_generation
273            .or_else(|| durable_token.as_ref().map(|token| token.deploy_generation))
274            .ok_or(DurableCatalogError::Uninitialized)?;
276        let mut current_epoch = durable_token
277            .map(|token| token.epoch)
278            .unwrap_or(MIN_EPOCH)
279            .get();
280        if matches!(mode, Mode::Writable) {
282            current_epoch = current_epoch + 1;
283        }
284        let current_epoch = Epoch::new(current_epoch).expect("known to be non-zero");
285        let current_token = FenceToken {
286            deploy_generation: current_deploy_generation,
287            epoch: current_epoch,
288        };
289
290        fence_updates.push((
291            StateUpdateKind::FenceToken(current_token.clone()),
292            Diff::ONE,
293        ));
294
295        let current_fenceable_token = FenceableToken::Unfenced { current_token };
296
297        Ok(Some((fence_updates, current_fenceable_token)))
298    }
299}
300
/// Errors that [`PersistHandle::compare_and_append`] can return.
#[derive(Debug, thiserror::Error)]
pub(crate) enum CompareAndAppendError {
    /// The current process has been fenced out.
    #[error(transparent)]
    Fence(#[from] FenceError),
    /// The upper the write expected did not match the shard's actual upper.
    #[error(
        "expected catalog upper {expected_upper:?} did not match actual catalog upper {actual_upper:?}"
    )]
    UpperMismatch {
        /// The upper the write was issued against.
        expected_upper: Timestamp,
        /// The upper reported by persist.
        actual_upper: Timestamp,
    },
}
316
317impl CompareAndAppendError {
318    pub(crate) fn unwrap_fence_error(self) -> FenceError {
319        match self {
320            CompareAndAppendError::Fence(e) => e,
321            e @ CompareAndAppendError::UpperMismatch { .. } => {
322                panic!("unexpected upper mismatch: {e:?}")
323            }
324        }
325    }
326}
327
328impl From<UpperMismatch<Timestamp>> for CompareAndAppendError {
329    fn from(upper_mismatch: UpperMismatch<Timestamp>) -> Self {
330        Self::UpperMismatch {
331            expected_upper: antichain_to_timestamp(upper_mismatch.expected),
332            actual_upper: antichain_to_timestamp(upper_mismatch.current),
333        }
334    }
335}
336
/// Hook through which a [`PersistHandle`] hands every update it reads to its owner.
pub(crate) trait ApplyUpdate<T: IntoStateUpdateKindJson> {
    /// Processes a single `update`.
    ///
    /// `current_fence_token` may be updated by the implementation (e.g. when a fence token is
    /// observed), and `metrics` is available for instrumentation.
    ///
    /// Returns `Ok(Some(update))` if the handle should retain the update in its in-memory
    /// snapshot, `Ok(None)` if it should not, and an error if the current process has been
    /// fenced out.
    fn apply_update(
        &mut self,
        update: StateUpdate<T>,
        current_fence_token: &mut FenceableToken,
        metrics: &Arc<Metrics>,
    ) -> Result<Option<StateUpdate<T>>, FenceError>;
}
348
/// A handle to the catalog shard in persist, together with an in-memory view of the updates
/// read from it.
///
/// `T` is the representation of update kinds kept in memory and `U` is the [`ApplyUpdate`]
/// implementation that every read update is passed through.
#[derive(Debug)]
pub(crate) struct PersistHandle<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> {
    /// The [`Mode`] this handle operates in.
    pub(crate) mode: Mode,
    /// Since handle of the catalog shard; downgraded after every successful write.
    since_handle: SinceHandle<SourceData, (), Timestamp, StorageDiff, i64>,
    /// Write handle of the catalog shard; used for writes and to fetch the shard's upper.
    write_handle: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
    /// Listener used to read new updates from the catalog shard while syncing.
    listen: Listen<SourceData, (), Timestamp, StorageDiff>,
    /// Persist client; used to open additional readers and writers.
    persist_client: PersistClient,
    /// ID of the catalog shard.
    shard_id: ShardId,
    /// In-memory accumulation of the updates retained by `update_applier`.
    pub(crate) snapshot: Vec<(T, Timestamp, Diff)>,
    /// Receives every update read from the shard.
    update_applier: U,
    /// The upper of the catalog shard as known to this handle.
    pub(crate) upper: Timestamp,
    /// Fence state of the current process.
    fenceable_token: FenceableToken,
    /// NOTE(review): presumably the catalog content version of the running code; not used in
    /// the visible part of this file -- confirm.
    catalog_content_version: semver::Version,
    /// NOTE(review): presumably whether bootstrapping has finished; not used in the visible
    /// part of this file -- confirm.
    bootstrap_complete: bool,
    /// Metrics for the durable catalog.
    metrics: Arc<Metrics>,
}
393
394impl<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> PersistHandle<T, U> {
    /// Advances the upper of the catalog upgrade shard of `organization_id` by one step.
    ///
    /// Opens a writer for the upgrade shard and appends an empty batch from the shard's upper to
    /// the stepped-forward upper, retrying with the actual upper reported by persist whenever
    /// the compare-and-append fails with an upper mismatch.
    ///
    /// NOTE(review): judging by the name, writing through this handle is what bumps the version
    /// recorded in the upgrade shard -- confirm against the persist client.
    ///
    /// # Panics
    ///
    /// Panics on invalid usage of the persist API.
    async fn increment_catalog_upgrade_shard_version(&self, organization_id: Uuid) {
        let upgrade_shard_id = shard_id(organization_id, UPGRADE_SEED);
        let mut write_handle: WriteHandle<(), (), Timestamp, StorageDiff> = self
            .persist_client
            .open_writer(
                upgrade_shard_id,
                Arc::new(UnitSchema::default()),
                Arc::new(UnitSchema::default()),
                Diagnostics {
                    shard_name: UPGRADE_SHARD_NAME.to_string(),
                    handle_purpose: "increment durable catalog upgrade shard version".to_string(),
                },
            )
            .await
            .expect("invalid usage");
        // No data is written; only the upper is advanced.
        const EMPTY_UPDATES: &[(((), ()), Timestamp, StorageDiff)] = &[];
        let mut upper = write_handle.fetch_recent_upper().await.clone();
        loop {
            let next_upper = upper
                .iter()
                .map(|timestamp| timestamp.step_forward())
                .collect();
            match write_handle
                .compare_and_append(EMPTY_UPDATES, upper, next_upper)
                .await
                .expect("invalid usage")
            {
                Ok(()) => break,
                Err(upper_mismatch) => {
                    // Someone else advanced the upper concurrently; retry from the new upper.
                    upper = upper_mismatch.current;
                }
            }
        }
    }
430
431    #[mz_ore::instrument]
433    async fn current_upper(&mut self) -> Timestamp {
434        match self.mode {
435            Mode::Writable | Mode::Readonly => {
436                let upper = self.write_handle.fetch_recent_upper().await;
437                antichain_to_timestamp(upper.clone())
438            }
439            Mode::Savepoint => self.upper,
440        }
441    }
442
    /// Appends `updates` to the catalog shard at `commit_ts`, provided the shard's upper still
    /// equals the upper known to this handle.
    ///
    /// On success the since of the shard is downgraded, the handle is synced to the new upper,
    /// and the new upper (`commit_ts` stepped forward) is returned.
    ///
    /// # Errors
    ///
    /// If the shard's upper does not match, the handle is first synced to the current upper.
    /// Returns [`CompareAndAppendError::Fence`] if any sync discovers that the current process
    /// has been fenced out, and [`CompareAndAppendError::UpperMismatch`] otherwise.
    ///
    /// # Panics
    ///
    /// Panics if the handle is not in [`Mode::Writable`], if `commit_ts` is less than the known
    /// upper, or on invalid usage of the persist API. With soft assertions enabled, also panics
    /// on an upper mismatch for updates that do not contain both a retraction and an addition of
    /// a fence token.
    #[mz_ore::instrument]
    pub(crate) async fn compare_and_append<S: IntoStateUpdateKindJson>(
        &mut self,
        updates: Vec<(S, Diff)>,
        commit_ts: Timestamp,
    ) -> Result<Timestamp, CompareAndAppendError> {
        assert_eq!(self.mode, Mode::Writable);
        assert!(
            commit_ts >= self.upper,
            "expected commit ts, {}, to be greater than or equal to upper, {}",
            commit_ts,
            self.upper
        );

        // Only when soft assertions are enabled: keep a copy of the updates and determine
        // whether they replace the fence token (i.e. contain both a fence token retraction and a
        // fence token addition). This is used for the sanity check on upper mismatches below.
        let contains_fence = if mz_ore::assert::soft_assertions_enabled() {
            let updates: Vec<_> = updates.clone();
            let parsed_updates: Vec<_> = updates
                .clone()
                .into_iter()
                .map(|(update, diff)| {
                    let update: StateUpdateKindJson = update.into();
                    (update, diff)
                })
                // Updates that cannot be decoded are ignored for the purpose of this check.
                .filter_map(|(update, diff)| {
                    <StateUpdateKindJson as TryIntoStateUpdateKind>::try_into(update)
                        .ok()
                        .map(|update| (update, diff))
                })
                .collect();
            let contains_retraction = parsed_updates.iter().any(|(update, diff)| {
                matches!(update, StateUpdateKind::FenceToken(..)) && *diff == Diff::MINUS_ONE
            });
            let contains_addition = parsed_updates.iter().any(|(update, diff)| {
                matches!(update, StateUpdateKind::FenceToken(..)) && *diff == Diff::ONE
            });
            let contains_fence = contains_retraction && contains_addition;
            Some((contains_fence, updates))
        } else {
            None
        };

        // Encode every update for persist; all of them are written at `commit_ts`.
        let updates = updates.into_iter().map(|(kind, diff)| {
            let kind: StateUpdateKindJson = kind.into();
            (
                (Into::<SourceData>::into(kind), ()),
                commit_ts,
                diff.into_inner(),
            )
        });
        let next_upper = commit_ts.step_forward();
        let res = self
            .write_handle
            .compare_and_append(
                updates,
                Antichain::from_elem(self.upper),
                Antichain::from_elem(next_upper),
            )
            .await
            .expect("invalid usage");

        if let Err(e @ UpperMismatch { .. }) = res {
            // Someone else wrote to the shard. Syncing reads their writes and returns a fence
            // error (through `?`) if they fenced out the current process.
            self.sync_to_current_upper().await?;
            if let Some((contains_fence, updates)) = contains_fence {
                assert!(
                    contains_fence,
                    "updates were neither fenced nor fencing and encountered an upper mismatch: {updates:#?}"
                )
            }
            return Err(e.into());
        }

        // Allow compaction up to (but not including) the new upper.
        let downgrade_to = Antichain::from_elem(next_upper.saturating_sub(1));

        // The opaque value is passed as both the expected and the new value, i.e. it is left
        // unchanged.
        let opaque = *self.since_handle.opaque();
        let downgrade = self
            .since_handle
            .maybe_compare_and_downgrade_since(&opaque, (&opaque, &downgrade_to))
            .await;

        match downgrade {
            // No downgrade was attempted.
            None => {}
            Some(Err(e)) => soft_panic_or_log!("found opaque value {e}, but expected {opaque}"),
            Some(Ok(updated)) => soft_assert_or_log!(
                updated == downgrade_to,
                "updated bound should match expected"
            ),
        }
        // Read back what was just written so the in-memory state reflects it.
        self.sync(next_upper).await?;
        Ok(next_upper)
    }
547
548    #[mz_ore::instrument]
551    async fn snapshot_unconsolidated(&mut self) -> Vec<StateUpdate<StateUpdateKind>> {
552        let current_upper = self.current_upper().await;
553
554        let mut snapshot = Vec::new();
555        let mut read_handle = self.read_handle().await;
556        let as_of = as_of(&read_handle, current_upper);
557        let mut stream = Box::pin(
558            read_handle
560                .snapshot_and_stream(Antichain::from_elem(as_of))
561                .await
562                .expect("we have advanced the restart_as_of by the since"),
563        );
564        while let Some(update) = stream.next().await {
565            snapshot.push(update)
566        }
567        read_handle.expire().await;
568        snapshot
569            .into_iter()
570            .map(Into::<StateUpdate<StateUpdateKindJson>>::into)
571            .map(|state_update| state_update.try_into().expect("kind decoding error"))
572            .collect()
573    }
574
    /// Syncs this handle up to the current upper of the catalog shard (see
    /// [`PersistHandle::sync`]).
    ///
    /// # Errors
    ///
    /// Returns a [`FenceError`] if the current process has been fenced out.
    #[mz_ore::instrument]
    pub(crate) async fn sync_to_current_upper(&mut self) -> Result<(), FenceError> {
        let upper = self.current_upper().await;
        self.sync(upper).await
    }
583
    /// Reads and applies updates from the catalog shard until this handle's upper has reached
    /// `target_upper`, recording metrics about the sync.
    ///
    /// # Errors
    ///
    /// Returns a [`FenceError`] if the current process has been fenced out.
    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn sync(&mut self, target_upper: Timestamp) -> Result<(), FenceError> {
        // Count the sync and account the time spent in it.
        // NOTE(review): `wall_time().inc_by(..)` presumably adds the elapsed wall-clock time of
        // the future to the counter -- confirm against `MetricsFutureExt`.
        self.metrics.syncs.inc();
        let counter = self.metrics.sync_latency_seconds.clone();
        self.sync_inner(target_upper)
            .wall_time()
            .inc_by(counter)
            .await
    }
596
    /// Implementation of [`PersistHandle::sync`] without the metrics bookkeeping.
    ///
    /// In [`Mode::Savepoint`] only the in-memory upper is advanced. In the other modes, events
    /// are fetched from the listener until the upper has reached `target_upper`; updates are
    /// buffered by timestamp and applied whenever progress is reported.
    ///
    /// # Errors
    ///
    /// Returns a [`FenceError`] if the current process is already fenced or becomes fenced by an
    /// applied update.
    ///
    /// # Panics
    ///
    /// Panics if an update cannot be decoded into `T`, or if a buffered update is not less than
    /// the reported progress.
    #[mz_ore::instrument(level = "debug")]
    async fn sync_inner(&mut self, target_upper: Timestamp) -> Result<(), FenceError> {
        self.fenceable_token.validate()?;

        if self.mode == Mode::Savepoint {
            // Nothing is read from persist in this mode; only move the upper forward (never
            // backward).
            self.upper = max(self.upper, target_upper);
            return Ok(());
        }

        // Updates received so far that have not been applied yet, grouped by timestamp.
        let mut updates: BTreeMap<_, Vec<_>> = BTreeMap::new();

        while self.upper < target_upper {
            let listen_events = self.listen.fetch_next().await;
            for listen_event in listen_events {
                match listen_event {
                    ListenEvent::Progress(upper) => {
                        debug!("synced up to {upper:?}");
                        self.upper = antichain_to_timestamp(upper);
                        // Apply all buffered updates in timestamp order. Note that `updates` is
                        // shadowed inside the loop by the updates of a single timestamp.
                        while let Some((ts, updates)) = updates.pop_first() {
                            assert!(ts < self.upper, "expected {} < {}", ts, self.upper);
                            let updates = updates.into_iter().map(
                                |update: StateUpdate<StateUpdateKindJson>| {
                                    let kind =
                                        T::try_from(update.kind).expect("kind decoding error");
                                    StateUpdate {
                                        kind,
                                        ts: update.ts,
                                        diff: update.diff,
                                    }
                                },
                            );
                            self.apply_updates(updates)?;
                        }
                    }
                    ListenEvent::Updates(batch_updates) => {
                        // Buffer the updates until progress is reported.
                        for update in batch_updates {
                            let update: StateUpdate<StateUpdateKindJson> = update.into();
                            updates.entry(update.ts).or_default().push(update);
                        }
                    }
                }
            }
        }
        assert_eq!(updates, BTreeMap::new(), "all updates should be applied");
        Ok(())
    }
649
650    #[mz_ore::instrument(level = "debug")]
651    pub(crate) fn apply_updates(
652        &mut self,
653        updates: impl IntoIterator<Item = StateUpdate<T>>,
654    ) -> Result<(), FenceError> {
655        let mut updates: Vec<_> = updates
656            .into_iter()
657            .map(|StateUpdate { kind, ts, diff }| (kind, ts, diff))
658            .collect();
659
660        differential_dataflow::consolidation::consolidate_updates(&mut updates);
664
665        updates.sort_by(|(_, ts1, diff1), (_, ts2, diff2)| ts1.cmp(ts2).then(diff1.cmp(diff2)));
668
669        let mut errors = Vec::new();
670
671        for (kind, ts, diff) in updates {
672            if diff != Diff::ONE && diff != Diff::MINUS_ONE {
673                panic!("invalid update in consolidated trace: ({kind:?}, {ts:?}, {diff:?})");
674            }
675
676            match self.update_applier.apply_update(
677                StateUpdate { kind, ts, diff },
678                &mut self.fenceable_token,
679                &self.metrics,
680            ) {
681                Ok(Some(StateUpdate { kind, ts, diff })) => self.snapshot.push((kind, ts, diff)),
682                Ok(None) => {}
683                Err(err) => errors.push(err),
686            }
687        }
688
689        errors.sort();
690        if let Some(err) = errors.into_iter().next() {
691            return Err(err);
692        }
693
694        self.consolidate();
695
696        Ok(())
697    }
698
699    #[mz_ore::instrument]
700    pub(crate) fn consolidate(&mut self) {
701        soft_assert_no_log!(
702            self.snapshot
703                .windows(2)
704                .all(|updates| updates[0].1 <= updates[1].1),
705            "snapshot should be sorted by timestamp, {:#?}",
706            self.snapshot
707        );
708
709        let new_ts = self
710            .snapshot
711            .last()
712            .map(|(_, ts, _)| *ts)
713            .unwrap_or_else(Timestamp::minimum);
714        for (_, ts, _) in &mut self.snapshot {
715            *ts = new_ts;
716        }
717        differential_dataflow::consolidation::consolidate_updates(&mut self.snapshot);
718    }
719
    /// Syncs to the current upper of the catalog shard and then runs `f` on the in-memory
    /// snapshot, returning its result.
    ///
    /// # Errors
    ///
    /// Returns an error if the current process has been fenced out or if `f` fails.
    async fn with_trace<R>(
        &mut self,
        f: impl FnOnce(&Vec<(T, Timestamp, Diff)>) -> Result<R, CatalogError>,
    ) -> Result<R, CatalogError> {
        self.sync_to_current_upper().await?;
        f(&self.snapshot)
    }
730
    /// Opens a new leased read handle for the catalog shard.
    ///
    /// # Panics
    ///
    /// Panics on invalid usage of the persist API.
    async fn read_handle(&self) -> ReadHandle<SourceData, (), Timestamp, StorageDiff> {
        self.persist_client
            .open_leased_reader(
                self.shard_id,
                Arc::new(desc()),
                Arc::new(UnitSchema::default()),
                Diagnostics {
                    shard_name: CATALOG_SHARD_NAME.to_string(),
                    handle_purpose: "openable durable catalog state temporary reader".to_string(),
                },
                USE_CRITICAL_SINCE_CATALOG.get(self.persist_client.dyncfgs()),
            )
            .await
            .expect("invalid usage")
    }
747
    /// Consumes the handle, explicitly expiring its write handle and its listener.
    async fn expire(self: Box<Self>) {
        self.write_handle.expire().await;
        self.listen.expire().await;
    }
753}
754
755impl<U: ApplyUpdate<StateUpdateKind>> PersistHandle<StateUpdateKind, U> {
    /// Syncs to the current upper of the catalog shard, materializes the in-memory trace into a
    /// [`Snapshot`], and runs `f` on it, returning its result.
    ///
    /// # Errors
    ///
    /// Returns an error if the current process has been fenced out or if `f` fails.
    ///
    /// # Panics
    ///
    /// Panics if the trace contains an update with a diff other than `1` or `-1`, an addition
    /// for a key that is already present, or a retraction that does not match the present value.
    async fn with_snapshot<T>(
        &mut self,
        f: impl FnOnce(Snapshot) -> Result<T, CatalogError>,
    ) -> Result<T, CatalogError> {
        // Applies a single addition (`diff == 1`) or retraction (`diff == -1`) to `map`.
        fn apply<K, V>(map: &mut BTreeMap<K, V>, key: &K, value: &V, diff: Diff)
        where
            K: Ord + Clone,
            V: Ord + Clone + Debug,
        {
            let key = key.clone();
            let value = value.clone();
            if diff == Diff::ONE {
                let prev = map.insert(key, value);
                assert_eq!(
                    prev, None,
                    "values must be explicitly retracted before inserting a new value"
                );
            } else if diff == Diff::MINUS_ONE {
                let prev = map.remove(&key);
                assert_eq!(
                    prev,
                    Some(value),
                    "retraction does not match existing value"
                );
            }
        }

        self.with_trace(|trace| {
            let mut snapshot = Snapshot::empty();
            for (kind, ts, diff) in trace {
                let diff = *diff;
                if diff != Diff::ONE && diff != Diff::MINUS_ONE {
                    panic!("invalid update in consolidated trace: ({kind:?}, {ts:?}, {diff:?})");
                }

                match kind {
                    StateUpdateKind::AuditLog(_key, ()) => {
                        // Audit log entries are not part of the snapshot.
                        }
                    StateUpdateKind::Cluster(key, value) => {
                        apply(&mut snapshot.clusters, key, value, diff);
                    }
                    StateUpdateKind::ClusterReplica(key, value) => {
                        apply(&mut snapshot.cluster_replicas, key, value, diff);
                    }
                    StateUpdateKind::Comment(key, value) => {
                        apply(&mut snapshot.comments, key, value, diff);
                    }
                    StateUpdateKind::Config(key, value) => {
                        apply(&mut snapshot.configs, key, value, diff);
                    }
                    StateUpdateKind::Database(key, value) => {
                        apply(&mut snapshot.databases, key, value, diff);
                    }
                    StateUpdateKind::DefaultPrivilege(key, value) => {
                        apply(&mut snapshot.default_privileges, key, value, diff);
                    }
                    StateUpdateKind::FenceToken(_token) => {
                        // Fence tokens are not part of the snapshot.
                        }
                    StateUpdateKind::IdAllocator(key, value) => {
                        apply(&mut snapshot.id_allocator, key, value, diff);
                    }
                    StateUpdateKind::IntrospectionSourceIndex(key, value) => {
                        apply(&mut snapshot.introspection_sources, key, value, diff);
                    }
                    StateUpdateKind::Item(key, value) => {
                        apply(&mut snapshot.items, key, value, diff);
                    }
                    StateUpdateKind::NetworkPolicy(key, value) => {
                        apply(&mut snapshot.network_policies, key, value, diff);
                    }
                    StateUpdateKind::Role(key, value) => {
                        apply(&mut snapshot.roles, key, value, diff);
                    }
                    StateUpdateKind::Schema(key, value) => {
                        apply(&mut snapshot.schemas, key, value, diff);
                    }
                    StateUpdateKind::Setting(key, value) => {
                        apply(&mut snapshot.settings, key, value, diff);
                    }
                    StateUpdateKind::SourceReferences(key, value) => {
                        apply(&mut snapshot.source_references, key, value, diff);
                    }
                    StateUpdateKind::SystemConfiguration(key, value) => {
                        apply(&mut snapshot.system_configurations, key, value, diff);
                    }
                    StateUpdateKind::SystemObjectMapping(key, value) => {
                        apply(&mut snapshot.system_object_mappings, key, value, diff);
                    }
                    StateUpdateKind::SystemPrivilege(key, value) => {
                        apply(&mut snapshot.system_privileges, key, value, diff);
                    }
                    StateUpdateKind::StorageCollectionMetadata(key, value) => {
                        apply(&mut snapshot.storage_collection_metadata, key, value, diff);
                    }
                    StateUpdateKind::UnfinalizedShard(key, ()) => {
                        apply(&mut snapshot.unfinalized_shards, key, &(), diff);
                    }
                    StateUpdateKind::TxnWalShard((), value) => {
                        apply(&mut snapshot.txn_wal_shard, &(), value, diff);
                    }
                    StateUpdateKind::RoleAuth(key, value) => {
                        apply(&mut snapshot.role_auth, key, value, diff);
                    }
                }
            }
            f(snapshot)
        })
        .await
    }
870
    /// Reads the contents of the catalog shard directly from persist through a temporary read
    /// handle, bypassing the in-memory snapshot, and returns the decoded updates.
    ///
    /// The `as_of` of the read is derived from the read handle and this handle's current upper.
    ///
    /// # Panics
    ///
    /// Panics if an update cannot be decoded.
    #[mz_ore::instrument(level = "debug")]
    async fn persist_snapshot(
        &mut self,
    ) -> impl Iterator<Item = StateUpdate> + DoubleEndedIterator {
        let mut read_handle = self.read_handle().await;
        let as_of = as_of(&read_handle, self.upper);
        let snapshot = snapshot_binary(&mut read_handle, as_of, &self.metrics)
            .await
            .map(|update| update.try_into().expect("kind decoding error"));
        // The reader was only needed for this one snapshot.
        read_handle.expire().await;
        snapshot
    }
889}
890
/// Applier state for a catalog that has not been opened yet.
#[derive(Debug)]
pub(crate) struct UnopenedCatalogStateInner {
    /// ID of the organization the catalog belongs to.
    organization_id: Uuid,
    /// In-memory copy of the config collection, maintained from applied updates.
    configs: BTreeMap<String, u64>,
    /// In-memory copy of the settings collection, maintained from applied updates.
    settings: BTreeMap<String, String>,
}
901
902impl UnopenedCatalogStateInner {
903    fn new(organization_id: Uuid) -> UnopenedCatalogStateInner {
904        UnopenedCatalogStateInner {
905            organization_id,
906            configs: BTreeMap::new(),
907            settings: BTreeMap::new(),
908        }
909    }
910}
911
impl ApplyUpdate<StateUpdateKindJson> for UnopenedCatalogStateInner {
    /// Mirrors config and setting updates into the in-memory maps and forwards fence token
    /// additions to `current_fence_token`.
    ///
    /// Every update is returned as `Ok(Some(update))`, so the handle retains all of them.
    ///
    /// # Errors
    ///
    /// Returns a [`FenceError`] if an observed fence token fences out the current process.
    ///
    /// # Panics
    ///
    /// Panics if an addition targets a key that is already present or if a retraction does not
    /// match the present value.
    fn apply_update(
        &mut self,
        update: StateUpdate<StateUpdateKindJson>,
        current_fence_token: &mut FenceableToken,
        _metrics: &Arc<Metrics>,
    ) -> Result<Option<StateUpdate<StateUpdateKindJson>>, FenceError> {
        // Only kinds that are not audit log entries and that are reported as always
        // deserializable are decoded; everything else is passed through untouched.
        if !update.kind.is_audit_log() && update.kind.is_always_deserializable() {
            let kind = TryInto::try_into(&update.kind).expect("kind is known to be deserializable");
            match (kind, update.diff) {
                (StateUpdateKind::Config(key, value), Diff::ONE) => {
                    let prev = self.configs.insert(key.key, value.value);
                    assert_eq!(
                        prev, None,
                        "values must be explicitly retracted before inserting a new value"
                    );
                }
                (StateUpdateKind::Config(key, value), Diff::MINUS_ONE) => {
                    let prev = self.configs.remove(&key.key);
                    assert_eq!(
                        prev,
                        Some(value.value),
                        "retraction does not match existing value"
                    );
                }
                (StateUpdateKind::Setting(key, value), Diff::ONE) => {
                    let prev = self.settings.insert(key.name, value.value);
                    assert_eq!(
                        prev, None,
                        "values must be explicitly retracted before inserting a new value"
                    );
                }
                (StateUpdateKind::Setting(key, value), Diff::MINUS_ONE) => {
                    let prev = self.settings.remove(&key.name);
                    assert_eq!(
                        prev,
                        Some(value.value),
                        "retraction does not match existing value"
                    );
                }
                // Only additions of fence tokens can fence out the current process.
                (StateUpdateKind::FenceToken(fence_token), Diff::ONE) => {
                    current_fence_token.maybe_fence(fence_token)?;
                }
                // All other kinds (and fence token retractions) need no bookkeeping here.
                _ => {}
            }
        }

        Ok(Some(update))
    }
}
962
/// A [`PersistHandle`] whose updates are kept in their [`StateUpdateKindJson`] form and applied
/// through an [`UnopenedCatalogStateInner`].
///
/// This is the type returned by `UnopenedPersistCatalogState::new` and consumed by the
/// `open*` methods, which turn it into an opened catalog.
pub(crate) type UnopenedPersistCatalogState =
    PersistHandle<StateUpdateKindJson, UnopenedCatalogStateInner>;
972
973impl UnopenedPersistCatalogState {
    #[mz_ore::instrument]
    /// Creates a new unopened catalog handle for `organization_id` on top of `persist_client`.
    ///
    /// The steps are, in order:
    ///   1. Derive the catalog and upgrade shard IDs from the organization ID.
    ///   2. Reject the boot if the version recorded in the upgrade shard is incompatible
    ///      with `version`.
    ///   3. Open the critical since handle plus write and read handles on the catalog shard.
    ///   4. Make sure the shard's upper is past the minimum timestamp.
    ///   5. Take a snapshot, start a listen, and apply the snapshot to the new handle.
    ///   6. Reject the boot if the catalog content version found in the snapshot is newer
    ///      than `version`.
    ///
    /// # Errors
    ///
    /// Returns [`DurableCatalogError::IncompatiblePersistVersion`] for the version checks in
    /// steps 2 and 6, and propagates errors from applying the snapshot or reading the
    /// persisted catalog content version.
    ///
    /// # Panics
    ///
    /// Panics with "invalid usage" if any of the persist handle operations report a usage
    /// error.
    pub(crate) async fn new(
        persist_client: PersistClient,
        organization_id: Uuid,
        version: semver::Version,
        deploy_generation: Option<u64>,
        metrics: Arc<Metrics>,
    ) -> Result<UnopenedPersistCatalogState, DurableCatalogError> {
        let catalog_shard_id = shard_id(organization_id, CATALOG_SEED);
        let upgrade_shard_id = shard_id(organization_id, UPGRADE_SEED);
        debug!(
            ?catalog_shard_id,
            ?upgrade_shard_id,
            "new persist backed catalog state"
        );

        // Check the version recorded in the upgrade shard (if any) before touching the catalog
        // shard itself.
        let version_in_upgrade_shard =
            fetch_catalog_upgrade_shard_version(&persist_client, upgrade_shard_id).await;
        if let Some(version_in_upgrade_shard) = version_in_upgrade_shard {
            if mz_persist_client::cfg::check_data_version(&version_in_upgrade_shard, &version)
                .is_err()
            {
                return Err(DurableCatalogError::IncompatiblePersistVersion {
                    found_version: version_in_upgrade_shard,
                    catalog_version: version,
                });
            }
        }

        let open_handles_start = Instant::now();
        info!("startup: envd serve: catalog init: open handles beginning");
        let since_handle = persist_client
            .open_critical_since(
                catalog_shard_id,
                PersistClient::CONTROLLER_CRITICAL_SINCE,
                Diagnostics {
                    shard_name: CATALOG_SHARD_NAME.to_string(),
                    handle_purpose: "durable catalog state critical since".to_string(),
                },
            )
            .await
            .expect("invalid usage");
        let (mut write_handle, mut read_handle) = persist_client
            .open(
                catalog_shard_id,
                Arc::new(desc()),
                Arc::new(UnitSchema::default()),
                Diagnostics {
                    shard_name: CATALOG_SHARD_NAME.to_string(),
                    handle_purpose: "durable catalog state handles".to_string(),
                },
                USE_CRITICAL_SINCE_CATALOG.get(persist_client.dyncfgs()),
            )
            .await
            .expect("invalid usage");
        info!(
            "startup: envd serve: catalog init: open handles complete in {:?}",
            open_handles_start.elapsed()
        );

        // Try to advance the upper of a brand new shard from the minimum timestamp to the next
        // one by appending an empty batch. If the append fails because the upper has already
        // moved, adopt the shard's current upper instead.
        let upper = {
            const EMPTY_UPDATES: &[((SourceData, ()), Timestamp, StorageDiff)] = &[];
            let upper = Antichain::from_elem(Timestamp::minimum());
            let next_upper = Timestamp::minimum().step_forward();
            match write_handle
                .compare_and_append(EMPTY_UPDATES, upper, Antichain::from_elem(next_upper))
                .await
                .expect("invalid usage")
            {
                Ok(()) => next_upper,
                Err(mismatch) => antichain_to_timestamp(mismatch.current),
            }
        };

        let snapshot_start = Instant::now();
        info!("startup: envd serve: catalog init: snapshot beginning");
        // Snapshot at `as_of` and start listening from the same timestamp.
        let as_of = as_of(&read_handle, upper);
        let snapshot: Vec<_> = snapshot_binary(&mut read_handle, as_of, &metrics)
            .await
            .map(|StateUpdate { kind, ts, diff }| (kind, ts, diff))
            .collect();
        let listen = read_handle
            .listen(Antichain::from_elem(as_of))
            .await
            .expect("invalid usage");
        info!(
            "startup: envd serve: catalog init: snapshot complete in {:?}",
            snapshot_start.elapsed()
        );

        // The handle starts with an empty snapshot; the contents fetched above are fed through
        // `apply_updates` below.
        let mut handle = UnopenedPersistCatalogState {
            mode: Mode::Writable,
            since_handle,
            write_handle,
            listen,
            persist_client,
            shard_id: catalog_shard_id,
            snapshot: Vec::new(),
            update_applier: UnopenedCatalogStateInner::new(organization_id),
            upper,
            fenceable_token: FenceableToken::new(deploy_generation),
            catalog_content_version: version,
            bootstrap_complete: false,
            metrics,
        };
        soft_assert_no_log!(
            snapshot.iter().all(|(_, _, diff)| *diff == Diff::ONE),
            "snapshot should be consolidated: {snapshot:#?}"
        );

        let apply_start = Instant::now();
        info!("startup: envd serve: catalog init: apply updates beginning");
        let updates = snapshot
            .into_iter()
            .map(|(kind, ts, diff)| StateUpdate { kind, ts, diff });
        handle.apply_updates(updates)?;
        info!(
            "startup: envd serve: catalog init: apply updates complete in {:?}",
            apply_start.elapsed()
        );

        // Refuse to proceed if the catalog was last written by a newer version than ours.
        if let Some(found_version) = handle.get_catalog_content_version().await? {
            if handle.catalog_content_version < found_version {
                return Err(DurableCatalogError::IncompatiblePersistVersion {
                    found_version,
                    catalog_version: handle.catalog_content_version,
                });
            }
        }

        Ok(handle)
    }
1133
    #[mz_ore::instrument]
    /// Opens the catalog in `mode`, consuming this unopened handle.
    ///
    /// On success returns the opened catalog together with an iterator over the audit log
    /// updates that were present in this handle's snapshot.
    ///
    /// `initial_ts` and `bootstrap_args` are only passed on to `initialize::initialize`, which
    /// runs when the catalog has not been initialized yet.
    ///
    /// # Errors
    ///
    /// - [`DurableCatalogError::Internal`] if the fence token is already unfenced or fenced, or
    ///   if `mode` is writable or savepoint and no deploy generation was supplied.
    /// - A fence error if writing the fence token fails because this catalog was fenced.
    /// - [`DurableCatalogError::NotWritable`] if the catalog is uninitialized and `mode` is not
    ///   writable.
    /// - Any error from syncing, upgrading, applying updates, or committing the initial
    ///   transaction.
    async fn open_inner(
        mut self,
        mode: Mode,
        initial_ts: Timestamp,
        bootstrap_args: &BootstrapArgs,
    ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError> {
        let mut commit_ts = self.upper;
        self.mode = mode;

        // Validate the fence token state before doing any work.
        match (&self.mode, &self.fenceable_token) {
            (_, FenceableToken::Unfenced { .. } | FenceableToken::Fenced { .. }) => {
                return Err(DurableCatalogError::Internal(
                    "catalog should not have fenced before opening".to_string(),
                )
                .into());
            }
            (
                Mode::Writable | Mode::Savepoint,
                FenceableToken::Initializing {
                    current_deploy_generation: None,
                    ..
                },
            ) => {
                return Err(DurableCatalogError::Internal(format!(
                    "cannot open in mode '{:?}' without a deploy generation",
                    self.mode,
                ))
                .into());
            }
            _ => {}
        }

        let read_only = matches!(self.mode, Mode::Readonly);

        // Generate our fence token and, in writable mode, durably write it. An upper mismatch
        // means somebody else wrote concurrently, so sync again and retry.
        loop {
            self.sync_to_current_upper().await?;
            commit_ts = max(commit_ts, self.upper);
            let (fence_updates, current_fenceable_token) = self
                .fenceable_token
                .generate_unfenced_token(self.mode)?
                .ok_or_else(|| {
                    DurableCatalogError::Internal(
                        "catalog should not have fenced before opening".to_string(),
                    )
                })?;
            debug!(
                ?self.upper,
                ?self.fenceable_token,
                ?current_fenceable_token,
                "fencing previous catalogs"
            );
            if matches!(self.mode, Mode::Writable) {
                match self
                    .compare_and_append(fence_updates.clone(), commit_ts)
                    .await
                {
                    Ok(upper) => {
                        commit_ts = upper;
                    }
                    Err(CompareAndAppendError::Fence(e)) => return Err(e.into()),
                    Err(e @ CompareAndAppendError::UpperMismatch { .. }) => {
                        warn!("catalog write failed due to upper mismatch, retrying: {e:?}");
                        continue;
                    }
                }
            }
            self.fenceable_token = current_fenceable_token;
            break;
        }

        // Only a writable catalog is allowed to create the initial catalog contents.
        let is_initialized = self.is_initialized_inner();
        if !matches!(self.mode, Mode::Writable) && !is_initialized {
            return Err(CatalogError::Durable(DurableCatalogError::NotWritable(
                format!(
                    "catalog tables do not exist; will not create in {:?} mode",
                    self.mode
                ),
            )));
        }
        soft_assert_ne_or_log!(self.upper, Timestamp::minimum());

        // Split the audit log updates out of the snapshot; they are returned to the caller via
        // the `AuditLogIterator` and only counted in the metrics below.
        let (audit_logs, snapshot): (Vec<_>, Vec<_>) = self
            .snapshot
            .into_iter()
            .partition(|(update, _, _)| update.is_audit_log());
        self.snapshot = snapshot;
        let audit_log_count = audit_logs.iter().map(|(_, _, diff)| diff).sum::<Diff>();
        let audit_log_handle = AuditLogIterator::new(audit_logs);

        // Run `upgrade` on an existing catalog, unless we are read-only.
        if is_initialized && !read_only {
            commit_ts = upgrade(&mut self, commit_ts).await?;
        }

        debug!(
            ?is_initialized,
            ?self.upper,
            "initializing catalog state"
        );
        // Move the handles into the opened catalog type. Its snapshot starts out empty and is
        // populated by `apply_updates` below.
        let mut catalog = PersistCatalogState {
            mode: self.mode,
            since_handle: self.since_handle,
            write_handle: self.write_handle,
            listen: self.listen,
            persist_client: self.persist_client,
            shard_id: self.shard_id,
            upper: self.upper,
            fenceable_token: self.fenceable_token,
            snapshot: Vec::new(),
            update_applier: CatalogStateInner::new(),
            catalog_content_version: self.catalog_content_version,
            bootstrap_complete: false,
            metrics: self.metrics,
        };
        catalog.metrics.collection_entries.reset();
        // Audit logs never pass through `apply_updates` here, so account for them explicitly.
        catalog
            .metrics
            .collection_entries
            .with_label_values(&[&CollectionType::AuditLog.to_string()])
            .add(audit_log_count.into_inner());
        let updates = self.snapshot.into_iter().map(|(kind, ts, diff)| {
            let kind = TryIntoStateUpdateKind::try_into(kind).expect("kind decoding error");
            StateUpdate { kind, ts, diff }
        });
        catalog.apply_updates(updates)?;

        // Build the first transaction: either record our version in an existing catalog, or
        // initialize a fresh one.
        let catalog_content_version = catalog.catalog_content_version.to_string();
        let txn = if is_initialized {
            let mut txn = catalog.transaction().await?;
            txn.set_catalog_content_version(catalog_content_version)?;
            txn
        } else {
            soft_assert_eq_no_log!(
                catalog
                    .snapshot
                    .iter()
                    .filter(|(kind, _, _)| !matches!(kind, StateUpdateKind::FenceToken(_)))
                    .count(),
                0,
                "trace should not contain any updates for an uninitialized catalog: {:#?}",
                catalog.snapshot
            );

            let mut txn = catalog.transaction().await?;
            initialize::initialize(
                &mut txn,
                bootstrap_args,
                initial_ts.into(),
                catalog_content_version,
            )
            .await?;
            txn
        };

        if read_only {
            // A read-only catalog does not commit; the transaction's updates are applied to
            // the in-memory state only.
            let (txn_batch, _) = txn.into_parts();
            let updates = StateUpdate::from_txn_batch_ts(txn_batch, catalog.upper);
            catalog.apply_updates(updates)?;
        } else {
            txn.commit_internal(commit_ts).await?;
        }

        if matches!(catalog.mode, Mode::Writable) {
            catalog
                .increment_catalog_upgrade_shard_version(self.update_applier.organization_id)
                .await;
            // Open a dedicated writer for the background compaction task spawned below.
            let write_handle = catalog
                .persist_client
                .open_writer::<SourceData, (), Timestamp, i64>(
                    catalog.write_handle.shard_id(),
                    Arc::new(desc()),
                    Arc::new(UnitSchema::default()),
                    Diagnostics {
                        shard_name: CATALOG_SHARD_NAME.to_string(),
                        handle_purpose: "compact catalog".to_string(),
                    },
                )
                .await
                .expect("invalid usage");
            let fuel = CATALOG_FORCE_COMPACTION_FUEL.handle(catalog.persist_client.dyncfgs());
            let wait = CATALOG_FORCE_COMPACTION_WAIT.handle(catalog.persist_client.dyncfgs());
            // The task handle is not awaited; compaction runs in the background.
            let _task = mz_ore::task::spawn(|| "catalog::force_shard_compaction", async move {
                let () =
                    mz_persist_client::cli::admin::dangerous_force_compaction_and_break_pushdown(
                        &write_handle,
                        || fuel.get(),
                        || wait.get(),
                    )
                    .await;
            });
        }

        Ok((Box::new(catalog), audit_log_handle))
    }
1343
1344    #[mz_ore::instrument]
1349    fn is_initialized_inner(&self) -> bool {
1350        !self.update_applier.configs.is_empty()
1351    }
1352
1353    #[mz_ore::instrument]
1357    async fn get_current_config(&mut self, key: &str) -> Result<Option<u64>, DurableCatalogError> {
1358        self.sync_to_current_upper().await?;
1359        Ok(self.update_applier.configs.get(key).cloned())
1360    }
1361
    #[mz_ore::instrument]
    /// Returns the config value stored under [`USER_VERSION_KEY`], or `None` if it has not been
    /// written. Syncs to the current upper first (via `get_current_config`).
    pub(crate) async fn get_user_version(&mut self) -> Result<Option<u64>, DurableCatalogError> {
        self.get_current_config(USER_VERSION_KEY).await
    }
1369
1370    #[mz_ore::instrument]
1374    async fn get_current_setting(
1375        &mut self,
1376        name: &str,
1377    ) -> Result<Option<String>, DurableCatalogError> {
1378        self.sync_to_current_upper().await?;
1379        Ok(self.update_applier.settings.get(name).cloned())
1380    }
1381
1382    #[mz_ore::instrument]
1387    async fn get_catalog_content_version(
1388        &mut self,
1389    ) -> Result<Option<semver::Version>, DurableCatalogError> {
1390        let version = self
1391            .get_current_setting(CATALOG_CONTENT_VERSION_KEY)
1392            .await?;
1393        let version = version.map(|version| version.parse().expect("invalid version persisted"));
1394        Ok(version)
1395    }
1396}
1397
1398#[async_trait]
1399impl OpenableDurableCatalogState for UnopenedPersistCatalogState {
1400    #[mz_ore::instrument]
1401    async fn open_savepoint(
1402        mut self: Box<Self>,
1403        initial_ts: Timestamp,
1404        bootstrap_args: &BootstrapArgs,
1405    ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError> {
1406        self.open_inner(Mode::Savepoint, initial_ts, bootstrap_args)
1407            .boxed()
1408            .await
1409    }
1410
1411    #[mz_ore::instrument]
1412    async fn open_read_only(
1413        mut self: Box<Self>,
1414        bootstrap_args: &BootstrapArgs,
1415    ) -> Result<Box<dyn DurableCatalogState>, CatalogError> {
1416        self.open_inner(Mode::Readonly, EpochMillis::MIN.into(), bootstrap_args)
1417            .boxed()
1418            .await
1419            .map(|(catalog, _)| catalog)
1420    }
1421
1422    #[mz_ore::instrument]
1423    async fn open(
1424        mut self: Box<Self>,
1425        initial_ts: Timestamp,
1426        bootstrap_args: &BootstrapArgs,
1427    ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError> {
1428        self.open_inner(Mode::Writable, initial_ts, bootstrap_args)
1429            .boxed()
1430            .await
1431    }
1432
1433    #[mz_ore::instrument(level = "debug")]
1434    async fn open_debug(mut self: Box<Self>) -> Result<DebugCatalogState, CatalogError> {
1435        Ok(DebugCatalogState(*self))
1436    }
1437
1438    #[mz_ore::instrument]
1439    async fn is_initialized(&mut self) -> Result<bool, CatalogError> {
1440        self.sync_to_current_upper().await?;
1441        Ok(self.is_initialized_inner())
1442    }
1443
1444    #[mz_ore::instrument]
1445    async fn epoch(&mut self) -> Result<Epoch, CatalogError> {
1446        self.sync_to_current_upper().await?;
1447        self.fenceable_token
1448            .validate()?
1449            .map(|token| token.epoch)
1450            .ok_or(CatalogError::Durable(DurableCatalogError::Uninitialized))
1451    }
1452
1453    #[mz_ore::instrument]
1454    async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError> {
1455        self.sync_to_current_upper().await?;
1456        self.fenceable_token
1457            .token()
1458            .map(|token| token.deploy_generation)
1459            .ok_or(CatalogError::Durable(DurableCatalogError::Uninitialized))
1460    }
1461
1462    #[mz_ore::instrument(level = "debug")]
1463    async fn get_0dt_deployment_max_wait(&mut self) -> Result<Option<Duration>, CatalogError> {
1464        let value = self
1465            .get_current_config(WITH_0DT_DEPLOYMENT_MAX_WAIT)
1466            .await?;
1467        match value {
1468            None => Ok(None),
1469            Some(millis) => Ok(Some(Duration::from_millis(millis))),
1470        }
1471    }
1472
1473    #[mz_ore::instrument(level = "debug")]
1474    async fn get_0dt_deployment_ddl_check_interval(
1475        &mut self,
1476    ) -> Result<Option<Duration>, CatalogError> {
1477        let value = self
1478            .get_current_config(WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL)
1479            .await?;
1480        match value {
1481            None => Ok(None),
1482            Some(millis) => Ok(Some(Duration::from_millis(millis))),
1483        }
1484    }
1485
1486    #[mz_ore::instrument(level = "debug")]
1487    async fn get_enable_0dt_deployment_panic_after_timeout(
1488        &mut self,
1489    ) -> Result<Option<bool>, CatalogError> {
1490        let value = self
1491            .get_current_config(ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT)
1492            .await?;
1493        match value {
1494            None => Ok(None),
1495            Some(0) => Ok(Some(false)),
1496            Some(1) => Ok(Some(true)),
1497            Some(v) => Err(
1498                DurableCatalogError::from(TryFromProtoError::UnknownEnumVariant(format!(
1499                    "{v} is not a valid boolean value"
1500                )))
1501                .into(),
1502            ),
1503        }
1504    }
1505
1506    #[mz_ore::instrument]
1507    async fn has_system_config_synced_once(&mut self) -> Result<bool, DurableCatalogError> {
1508        self.get_current_config(SYSTEM_CONFIG_SYNCED_KEY)
1509            .await
1510            .map(|value| value.map(|value| value > 0).unwrap_or(false))
1511    }
1512
1513    #[mz_ore::instrument]
1514    async fn trace_unconsolidated(&mut self) -> Result<Trace, CatalogError> {
1515        self.sync_to_current_upper().await?;
1516        if self.is_initialized_inner() {
1517            let snapshot = self.snapshot_unconsolidated().await;
1518            Ok(Trace::from_snapshot(snapshot))
1519        } else {
1520            Err(CatalogError::Durable(DurableCatalogError::Uninitialized))
1521        }
1522    }
1523
1524    #[mz_ore::instrument]
1525    async fn trace_consolidated(&mut self) -> Result<Trace, CatalogError> {
1526        self.sync_to_current_upper().await?;
1527        if self.is_initialized_inner() {
1528            let snapshot = self.current_snapshot().await?;
1529            Ok(Trace::from_snapshot(snapshot))
1530        } else {
1531            Err(CatalogError::Durable(DurableCatalogError::Uninitialized))
1532        }
1533    }
1534
1535    #[mz_ore::instrument(level = "debug")]
1536    async fn expire(self: Box<Self>) {
1537        self.expire().await
1538    }
1539}
1540
/// Update applier state for an opened catalog.
#[derive(Debug)]
struct CatalogStateInner {
    /// In-memory updates in the order they were applied. `apply_update` pushes to the back and
    /// `sync_updates` pops from the front.
    updates: VecDeque<memory::objects::StateUpdate>,
}
1547
1548impl CatalogStateInner {
1549    fn new() -> CatalogStateInner {
1550        CatalogStateInner {
1551            updates: VecDeque::new(),
1552        }
1553    }
1554}
1555
1556impl ApplyUpdate<StateUpdateKind> for CatalogStateInner {
1557    fn apply_update(
1558        &mut self,
1559        update: StateUpdate<StateUpdateKind>,
1560        current_fence_token: &mut FenceableToken,
1561        metrics: &Arc<Metrics>,
1562    ) -> Result<Option<StateUpdate<StateUpdateKind>>, FenceError> {
1563        if let Some(collection_type) = update.kind.collection_type() {
1564            metrics
1565                .collection_entries
1566                .with_label_values(&[&collection_type.to_string()])
1567                .add(update.diff.into_inner());
1568        }
1569
1570        {
1571            let update: Option<memory::objects::StateUpdate> = (&update)
1572                .try_into()
1573                .expect("invalid persisted update: {update:#?}");
1574            if let Some(update) = update {
1575                self.updates.push_back(update);
1576            }
1577        }
1578
1579        match (update.kind, update.diff) {
1580            (StateUpdateKind::AuditLog(_, ()), _) => Ok(None),
1581            (StateUpdateKind::FenceToken(_), Diff::MINUS_ONE) => Ok(None),
1583            (StateUpdateKind::FenceToken(token), Diff::ONE) => {
1584                current_fence_token.maybe_fence(token)?;
1585                Ok(None)
1586            }
1587            (kind, diff) => Ok(Some(StateUpdate {
1588                kind,
1589                ts: update.ts,
1590                diff,
1591            })),
1592        }
1593    }
1594}
1595
/// A [`PersistHandle`] whose updates are fully decoded [`StateUpdateKind`]s applied through a
/// [`CatalogStateInner`]. This is the concrete type behind an opened catalog.
type PersistCatalogState = PersistHandle<StateUpdateKind, CatalogStateInner>;
1602
1603#[async_trait]
1604impl ReadOnlyDurableCatalogState for PersistCatalogState {
1605    fn epoch(&self) -> Epoch {
1606        self.fenceable_token
1607            .token()
1608            .expect("opened catalog state must have an epoch")
1609            .epoch
1610    }
1611
1612    #[mz_ore::instrument(level = "debug")]
1613    async fn expire(self: Box<Self>) {
1614        self.expire().await
1615    }
1616
1617    fn is_bootstrap_complete(&self) -> bool {
1618        self.bootstrap_complete
1619    }
1620
1621    async fn get_audit_logs(&mut self) -> Result<Vec<VersionedEvent>, CatalogError> {
1622        self.sync_to_current_upper().await?;
1623        let audit_logs: Vec<_> = self
1624            .persist_snapshot()
1625            .await
1626            .filter_map(
1627                |StateUpdate {
1628                     kind,
1629                     ts: _,
1630                     diff: _,
1631                 }| match kind {
1632                    StateUpdateKind::AuditLog(key, ()) => Some(key),
1633                    _ => None,
1634                },
1635            )
1636            .collect();
1637        let mut audit_logs: Vec<_> = audit_logs
1638            .into_iter()
1639            .map(RustType::from_proto)
1640            .map_ok(|key: AuditLogKey| key.event)
1641            .collect::<Result<_, _>>()?;
1642        audit_logs.sort_by(|a, b| a.sortable_id().cmp(&b.sortable_id()));
1643        Ok(audit_logs)
1644    }
1645
1646    #[mz_ore::instrument(level = "debug")]
1647    async fn get_next_id(&mut self, id_type: &str) -> Result<u64, CatalogError> {
1648        self.with_trace(|trace| {
1649            Ok(trace
1650                .into_iter()
1651                .rev()
1652                .filter_map(|(kind, _, _)| match kind {
1653                    StateUpdateKind::IdAllocator(key, value) if key.name == id_type => {
1654                        Some(value.next_id)
1655                    }
1656                    _ => None,
1657                })
1658                .next()
1659                .expect("must exist"))
1660        })
1661        .await
1662    }
1663
1664    #[mz_ore::instrument(level = "debug")]
1665    async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError> {
1666        self.sync_to_current_upper().await?;
1667        Ok(self
1668            .fenceable_token
1669            .token()
1670            .expect("opened catalogs must have a token")
1671            .deploy_generation)
1672    }
1673
1674    #[mz_ore::instrument(level = "debug")]
1675    async fn snapshot(&mut self) -> Result<Snapshot, CatalogError> {
1676        self.with_snapshot(Ok).await
1677    }
1678
1679    #[mz_ore::instrument(level = "debug")]
1680    async fn sync_to_current_updates(
1681        &mut self,
1682    ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError> {
1683        let upper = self.current_upper().await;
1684        self.sync_updates(upper).await
1685    }
1686
1687    #[mz_ore::instrument(level = "debug")]
1688    async fn sync_updates(
1689        &mut self,
1690        target_upper: mz_repr::Timestamp,
1691    ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError> {
1692        self.sync(target_upper).await?;
1693        let mut updates = Vec::new();
1694        while let Some(update) = self.update_applier.updates.front() {
1695            if update.ts >= target_upper {
1696                break;
1697            }
1698
1699            let update = self
1700                .update_applier
1701                .updates
1702                .pop_front()
1703                .expect("peeked above");
1704            updates.push(update);
1705        }
1706        Ok(updates)
1707    }
1708
1709    async fn current_upper(&mut self) -> Timestamp {
1710        self.current_upper().await
1711    }
1712}
1713
1714#[async_trait]
1715#[allow(mismatched_lifetime_syntaxes)]
1716impl DurableCatalogState for PersistCatalogState {
1717    fn is_read_only(&self) -> bool {
1718        matches!(self.mode, Mode::Readonly)
1719    }
1720
1721    fn is_savepoint(&self) -> bool {
1722        matches!(self.mode, Mode::Savepoint)
1723    }
1724
1725    fn mark_bootstrap_complete(&mut self) {
1726        self.bootstrap_complete = true;
1727    }
1728
1729    #[mz_ore::instrument(level = "debug")]
1730    async fn transaction(&mut self) -> Result<Transaction, CatalogError> {
1731        self.metrics.transactions_started.inc();
1732        let snapshot = self.snapshot().await?;
1733        let commit_ts = self.upper.clone();
1734        Transaction::new(self, snapshot, commit_ts)
1735    }
1736
1737    #[mz_ore::instrument(level = "debug")]
1738    async fn commit_transaction(
1739        &mut self,
1740        txn_batch: TransactionBatch,
1741        commit_ts: Timestamp,
1742    ) -> Result<Timestamp, CatalogError> {
1743        async fn commit_transaction_inner(
1744            catalog: &mut PersistCatalogState,
1745            txn_batch: TransactionBatch,
1746            commit_ts: Timestamp,
1747        ) -> Result<Timestamp, CatalogError> {
1748            assert_eq!(
1753                catalog.upper, txn_batch.upper,
1754                "only one transaction at a time is supported"
1755            );
1756
1757            assert!(
1758                commit_ts >= catalog.upper,
1759                "expected commit ts, {}, to be greater than or equal to upper, {}",
1760                commit_ts,
1761                catalog.upper
1762            );
1763
1764            let updates = StateUpdate::from_txn_batch(txn_batch).collect();
1765            debug!("committing updates: {updates:?}");
1766
1767            let next_upper = match catalog.mode {
1768                Mode::Writable => catalog
1769                    .compare_and_append(updates, commit_ts)
1770                    .await
1771                    .map_err(|e| e.unwrap_fence_error())?,
1772                Mode::Savepoint => {
1773                    let updates = updates.into_iter().map(|(kind, diff)| StateUpdate {
1774                        kind,
1775                        ts: commit_ts,
1776                        diff,
1777                    });
1778                    catalog.apply_updates(updates)?;
1779                    catalog.upper = commit_ts.step_forward();
1780                    catalog.upper
1781                }
1782                Mode::Readonly => {
1783                    if !updates.is_empty() {
1787                        return Err(DurableCatalogError::NotWritable(format!(
1788                            "cannot commit a transaction in a read-only catalog: {updates:#?}"
1789                        ))
1790                        .into());
1791                    }
1792                    catalog.upper
1793                }
1794            };
1795
1796            Ok(next_upper)
1797        }
1798        self.metrics.transaction_commits.inc();
1799        let counter = self.metrics.transaction_commit_latency_seconds.clone();
1800        commit_transaction_inner(self, txn_batch, commit_ts)
1801            .wall_time()
1802            .inc_by(counter)
1803            .await
1804    }
1805
1806    #[mz_ore::instrument(level = "debug")]
1807    async fn confirm_leadership(&mut self) -> Result<(), CatalogError> {
1808        if self.is_read_only() {
1810            return Ok(());
1811        }
1812        self.sync_to_current_upper().await?;
1813        Ok(())
1814    }
1815}
1816
1817pub fn shard_id(organization_id: Uuid, seed: usize) -> ShardId {
1819    let hash = sha2::Sha256::digest(format!("{organization_id}{seed}")).to_vec();
1820    soft_assert_eq_or_log!(hash.len(), 32, "SHA256 returns 32 bytes (256 bits)");
1821    let uuid = Uuid::from_slice(&hash[0..16]).expect("from_slice accepts exactly 16 bytes");
1822    ShardId::from_str(&format!("s{uuid}")).expect("known to be valid")
1823}
1824
1825fn desc() -> RelationDesc {
1828    RelationDesc::builder()
1829        .with_column("data", SqlScalarType::Jsonb.nullable(false))
1830        .finish()
1831}
1832
1833fn as_of(
1836    read_handle: &ReadHandle<SourceData, (), Timestamp, StorageDiff>,
1837    upper: Timestamp,
1838) -> Timestamp {
1839    let since = read_handle.since().clone();
1840    let mut as_of = upper.checked_sub(1).unwrap_or_else(|| {
1841        panic!("catalog persist shard should be initialize, found upper: {upper:?}")
1842    });
1843    soft_assert_or_log!(
1846        since.less_equal(&as_of),
1847        "since={since:?}, as_of={as_of:?}; since must be less than or equal to as_of"
1848    );
1849    as_of.advance_by(since.borrow());
1852    as_of
1853}
1854
1855async fn fetch_catalog_upgrade_shard_version(
1858    persist_client: &PersistClient,
1859    upgrade_shard_id: ShardId,
1860) -> Option<semver::Version> {
1861    let shard_state = persist_client
1862        .inspect_shard::<Timestamp>(&upgrade_shard_id)
1863        .await
1864        .ok()?;
1865    let json_state = serde_json::to_value(shard_state).expect("state serialization error");
1866    let upgrade_version = json_state
1867        .get("applier_version")
1868        .cloned()
1869        .expect("missing applier_version");
1870    let upgrade_version =
1871        serde_json::from_value(upgrade_version).expect("version deserialization error");
1872    Some(upgrade_version)
1873}
1874
1875#[mz_ore::instrument(level = "debug")]
1880async fn snapshot_binary(
1881    read_handle: &mut ReadHandle<SourceData, (), Timestamp, StorageDiff>,
1882    as_of: Timestamp,
1883    metrics: &Arc<Metrics>,
1884) -> impl Iterator<Item = StateUpdate<StateUpdateKindJson>> + DoubleEndedIterator + use<> {
1885    metrics.snapshots_taken.inc();
1886    let counter = metrics.snapshot_latency_seconds.clone();
1887    snapshot_binary_inner(read_handle, as_of)
1888        .wall_time()
1889        .inc_by(counter)
1890        .await
1891}
1892
1893#[mz_ore::instrument(level = "debug")]
1898async fn snapshot_binary_inner(
1899    read_handle: &mut ReadHandle<SourceData, (), Timestamp, StorageDiff>,
1900    as_of: Timestamp,
1901) -> impl Iterator<Item = StateUpdate<StateUpdateKindJson>> + DoubleEndedIterator + use<> {
1902    let snapshot = read_handle
1903        .snapshot_and_fetch(Antichain::from_elem(as_of))
1904        .await
1905        .expect("we have advanced the restart_as_of by the since");
1906    soft_assert_no_log!(
1907        snapshot.iter().all(|(_, _, diff)| *diff == 1),
1908        "snapshot_and_fetch guarantees a consolidated result: {snapshot:#?}"
1909    );
1910    snapshot
1911        .into_iter()
1912        .map(Into::<StateUpdate<StateUpdateKindJson>>::into)
1913        .sorted_by(|a, b| Ord::cmp(&b.ts, &a.ts))
1914}
1915
1916pub(crate) fn antichain_to_timestamp(antichain: Antichain<Timestamp>) -> Timestamp {
1921    antichain
1922        .into_option()
1923        .expect("we use a totally ordered time and never finalize the shard")
1924}
1925
1926impl Trace {
1929    fn from_snapshot(snapshot: impl IntoIterator<Item = StateUpdate>) -> Trace {
1931        let mut trace = Trace::new();
1932        for StateUpdate { kind, ts, diff } in snapshot {
1933            match kind {
1934                StateUpdateKind::AuditLog(k, v) => trace.audit_log.values.push(((k, v), ts, diff)),
1935                StateUpdateKind::Cluster(k, v) => trace.clusters.values.push(((k, v), ts, diff)),
1936                StateUpdateKind::ClusterReplica(k, v) => {
1937                    trace.cluster_replicas.values.push(((k, v), ts, diff))
1938                }
1939                StateUpdateKind::Comment(k, v) => trace.comments.values.push(((k, v), ts, diff)),
1940                StateUpdateKind::Config(k, v) => trace.configs.values.push(((k, v), ts, diff)),
1941                StateUpdateKind::Database(k, v) => trace.databases.values.push(((k, v), ts, diff)),
1942                StateUpdateKind::DefaultPrivilege(k, v) => {
1943                    trace.default_privileges.values.push(((k, v), ts, diff))
1944                }
1945                StateUpdateKind::FenceToken(_) => {
1946                    }
1948                StateUpdateKind::IdAllocator(k, v) => {
1949                    trace.id_allocator.values.push(((k, v), ts, diff))
1950                }
1951                StateUpdateKind::IntrospectionSourceIndex(k, v) => {
1952                    trace.introspection_sources.values.push(((k, v), ts, diff))
1953                }
1954                StateUpdateKind::Item(k, v) => trace.items.values.push(((k, v), ts, diff)),
1955                StateUpdateKind::NetworkPolicy(k, v) => {
1956                    trace.network_policies.values.push(((k, v), ts, diff))
1957                }
1958                StateUpdateKind::Role(k, v) => trace.roles.values.push(((k, v), ts, diff)),
1959                StateUpdateKind::Schema(k, v) => trace.schemas.values.push(((k, v), ts, diff)),
1960                StateUpdateKind::Setting(k, v) => trace.settings.values.push(((k, v), ts, diff)),
1961                StateUpdateKind::SourceReferences(k, v) => {
1962                    trace.source_references.values.push(((k, v), ts, diff))
1963                }
1964                StateUpdateKind::SystemConfiguration(k, v) => {
1965                    trace.system_configurations.values.push(((k, v), ts, diff))
1966                }
1967                StateUpdateKind::SystemObjectMapping(k, v) => {
1968                    trace.system_object_mappings.values.push(((k, v), ts, diff))
1969                }
1970                StateUpdateKind::SystemPrivilege(k, v) => {
1971                    trace.system_privileges.values.push(((k, v), ts, diff))
1972                }
1973                StateUpdateKind::StorageCollectionMetadata(k, v) => trace
1974                    .storage_collection_metadata
1975                    .values
1976                    .push(((k, v), ts, diff)),
1977                StateUpdateKind::UnfinalizedShard(k, ()) => {
1978                    trace.unfinalized_shards.values.push(((k, ()), ts, diff))
1979                }
1980                StateUpdateKind::TxnWalShard((), v) => {
1981                    trace.txn_wal_shard.values.push((((), v), ts, diff))
1982                }
1983                StateUpdateKind::RoleAuth(k, v) => trace.role_auth.values.push(((k, v), ts, diff)),
1984            }
1985        }
1986        trace
1987    }
1988}
1989
1990impl UnopenedPersistCatalogState {
1991    #[mz_ore::instrument]
1993    pub(crate) async fn debug_edit<T: Collection>(
1994        &mut self,
1995        key: T::Key,
1996        value: T::Value,
1997    ) -> Result<Option<T::Value>, CatalogError>
1998    where
1999        T::Key: PartialEq + Eq + Debug + Clone,
2000        T::Value: Debug + Clone,
2001    {
2002        let prev_value = loop {
2003            let key = key.clone();
2004            let value = value.clone();
2005            let snapshot = self.current_snapshot().await?;
2006            let trace = Trace::from_snapshot(snapshot);
2007            let collection_trace = T::collection_trace(trace);
2008            let prev_values: Vec<_> = collection_trace
2009                .values
2010                .into_iter()
2011                .filter(|((k, _), _, diff)| {
2012                    soft_assert_eq_or_log!(*diff, Diff::ONE, "trace is consolidated");
2013                    &key == k
2014                })
2015                .collect();
2016
2017            let prev_value = match &prev_values[..] {
2018                [] => None,
2019                [((_, v), _, _)] => Some(v.clone()),
2020                prev_values => panic!("multiple values found for key {key:?}: {prev_values:?}"),
2021            };
2022
2023            let mut updates: Vec<_> = prev_values
2024                .into_iter()
2025                .map(|((k, v), _, _)| (T::update(k, v), Diff::MINUS_ONE))
2026                .collect();
2027            updates.push((T::update(key, value), Diff::ONE));
2028            match self.fenceable_token.generate_unfenced_token(self.mode)? {
2030                Some((fence_updates, current_fenceable_token)) => {
2031                    updates.extend(fence_updates.clone());
2032                    match self.compare_and_append(updates, self.upper).await {
2033                        Ok(_) => {
2034                            self.fenceable_token = current_fenceable_token;
2035                            break prev_value;
2036                        }
2037                        Err(CompareAndAppendError::Fence(e)) => return Err(e.into()),
2038                        Err(e @ CompareAndAppendError::UpperMismatch { .. }) => {
2039                            warn!("catalog write failed due to upper mismatch, retrying: {e:?}");
2040                            continue;
2041                        }
2042                    }
2043                }
2044                None => {
2045                    self.compare_and_append(updates, self.upper)
2046                        .await
2047                        .map_err(|e| e.unwrap_fence_error())?;
2048                    break prev_value;
2049                }
2050            }
2051        };
2052        Ok(prev_value)
2053    }
2054
2055    #[mz_ore::instrument]
2057    pub(crate) async fn debug_delete<T: Collection>(
2058        &mut self,
2059        key: T::Key,
2060    ) -> Result<(), CatalogError>
2061    where
2062        T::Key: PartialEq + Eq + Debug + Clone,
2063        T::Value: Debug,
2064    {
2065        loop {
2066            let key = key.clone();
2067            let snapshot = self.current_snapshot().await?;
2068            let trace = Trace::from_snapshot(snapshot);
2069            let collection_trace = T::collection_trace(trace);
2070            let mut retractions: Vec<_> = collection_trace
2071                .values
2072                .into_iter()
2073                .filter(|((k, _), _, diff)| {
2074                    soft_assert_eq_or_log!(*diff, Diff::ONE, "trace is consolidated");
2075                    &key == k
2076                })
2077                .map(|((k, v), _, _)| (T::update(k, v), Diff::MINUS_ONE))
2078                .collect();
2079
2080            match self.fenceable_token.generate_unfenced_token(self.mode)? {
2082                Some((fence_updates, current_fenceable_token)) => {
2083                    retractions.extend(fence_updates.clone());
2084                    match self.compare_and_append(retractions, self.upper).await {
2085                        Ok(_) => {
2086                            self.fenceable_token = current_fenceable_token;
2087                            break;
2088                        }
2089                        Err(CompareAndAppendError::Fence(e)) => return Err(e.into()),
2090                        Err(e @ CompareAndAppendError::UpperMismatch { .. }) => {
2091                            warn!("catalog write failed due to upper mismatch, retrying: {e:?}");
2092                            continue;
2093                        }
2094                    }
2095                }
2096                None => {
2097                    self.compare_and_append(retractions, self.upper)
2098                        .await
2099                        .map_err(|e| e.unwrap_fence_error())?;
2100                    break;
2101                }
2102            }
2103        }
2104        Ok(())
2105    }
2106
2107    async fn current_snapshot(
2112        &mut self,
2113    ) -> Result<impl IntoIterator<Item = StateUpdate> + '_, CatalogError> {
2114        self.sync_to_current_upper().await?;
2115        self.consolidate();
2116        Ok(self.snapshot.iter().cloned().map(|(kind, ts, diff)| {
2117            let kind = TryIntoStateUpdateKind::try_into(kind).expect("kind decoding error");
2118            StateUpdate { kind, ts, diff }
2119        }))
2120    }
2121}