1#[cfg(test)]
11mod tests;
12
13use std::cmp::max;
14use std::collections::{BTreeMap, VecDeque};
15use std::fmt::Debug;
16use std::str::FromStr;
17use std::sync::{Arc, LazyLock};
18use std::time::{Duration, Instant};
19
20use async_trait::async_trait;
21use differential_dataflow::lattice::Lattice;
22use futures::{FutureExt, StreamExt};
23use itertools::Itertools;
24use mz_audit_log::VersionedEvent;
25use mz_ore::metrics::MetricsFutureExt;
26use mz_ore::now::EpochMillis;
27use mz_ore::{
28 soft_assert_eq_no_log, soft_assert_eq_or_log, soft_assert_ne_or_log, soft_assert_no_log,
29 soft_assert_or_log, soft_panic_or_log,
30};
31use mz_persist_client::cfg::USE_CRITICAL_SINCE_CATALOG;
32use mz_persist_client::cli::admin::{CATALOG_FORCE_COMPACTION_FUEL, CATALOG_FORCE_COMPACTION_WAIT};
33use mz_persist_client::critical::{CriticalReaderId, Opaque, SinceHandle};
34use mz_persist_client::error::UpperMismatch;
35use mz_persist_client::read::{Listen, ListenEvent, ReadHandle};
36use mz_persist_client::write::WriteHandle;
37use mz_persist_client::{Diagnostics, PersistClient, ShardId};
38use mz_persist_types::codec_impls::UnitSchema;
39use mz_proto::{RustType, TryFromProtoError};
40use mz_repr::Diff;
41use mz_storage_client::controller::PersistEpoch;
42use mz_storage_types::StorageDiff;
43use mz_storage_types::sources::SourceData;
44use sha2::Digest;
45use timely::progress::{Antichain, Timestamp as TimelyTimestamp};
46use tracing::{debug, info, warn};
47use uuid::Uuid;
48
49use crate::durable::debug::{Collection, CollectionType, DebugCatalogState, Trace};
50use crate::durable::error::FenceError;
51use crate::durable::initialize::{
52 ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, SYSTEM_CONFIG_SYNCED_KEY, USER_VERSION_KEY,
53 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL, WITH_0DT_DEPLOYMENT_MAX_WAIT,
54};
55use crate::durable::metrics::Metrics;
56use crate::durable::objects::state_update::{
57 IntoStateUpdateKindJson, StateUpdate, StateUpdateKind, StateUpdateKindJson,
58 TryIntoStateUpdateKind,
59};
60use crate::durable::objects::{AuditLogKey, FenceToken, Snapshot};
61use crate::durable::transaction::TransactionBatch;
62use crate::durable::upgrade::upgrade;
63use crate::durable::{
64 AuditLogIterator, BootstrapArgs, CATALOG_CONTENT_VERSION_KEY, CatalogError,
65 DurableCatalogError, DurableCatalogState, Epoch, OpenableDurableCatalogState,
66 ReadOnlyDurableCatalogState, Transaction, initialize, persist_desc,
67};
68use crate::memory;
69
/// Timestamp type used for every update written to and read from the catalog shard.
pub(crate) type Timestamp = mz_repr::Timestamp;

/// Base epoch used by `FenceableToken::generate_unfenced_token` when no fence token has been
/// durably written yet.
const MIN_EPOCH: Epoch = Epoch::new(1).expect("1 is non-zero");

/// Shard name reported to persist (via `Diagnostics`) for every handle opened on the catalog
/// shard.
const CATALOG_SHARD_NAME: &str = "catalog";

/// Reader ID of the critical since handle the catalog opens on its shard.
///
/// NOTE(review): this is a fixed, hard-coded ID; presumably it must stay stable across
/// restarts/versions so the same critical handle is reopened — confirm before changing.
static CATALOG_CRITICAL_SINCE: LazyLock<CriticalReaderId> = LazyLock::new(|| {
    "c55555555-6666-7777-8888-999999999999"
        .parse()
        .expect("valid CriticalReaderId")
});

/// Seed combined with the organization ID by `shard_id` to derive the catalog shard ID.
const CATALOG_SEED: usize = 1;
/// NOTE(review): presumably the seed for the upgrade shard; not used in this section — confirm.
const _UPGRADE_SEED: usize = 2;
/// NOTE(review): presumably the seed for the builtin-migration shard; not used in this
/// section — confirm.
pub const _BUILTIN_MIGRATION_SEED: usize = 3;
/// NOTE(review): presumably the seed for the expression-cache shard; not used in this
/// section — confirm.
pub const _EXPRESSION_CACHE_SEED: usize = 4;
95
/// How a [`PersistHandle`] interacts with the catalog shard.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub(crate) enum Mode {
    /// Follows the shard's upper and reads new updates, but `compare_and_append` is not
    /// permitted (it asserts `Writable`), and opening does not bump the fence-token epoch.
    Readonly,
    /// After opening, the handle no longer fetches the shard's upper or reads new updates:
    /// `current_upper` returns the cached upper and `sync` only advances it locally.
    /// NOTE(review): presumably writes in this mode are kept in memory only — confirm.
    Savepoint,
    /// Reads and appends to the shard; opening in this mode bumps the fence-token epoch and
    /// writes the new token.
    Writable,
}
106
/// This process's fence token, together with whether another process has fenced it out.
#[derive(Debug)]
pub(crate) enum FenceableToken {
    /// The catalog has not been opened yet.
    Initializing {
        /// The largest fence token observed in the durable catalog so far, if any.
        durable_token: Option<FenceToken>,
        /// The deploy generation of this process, if one was provided.
        current_deploy_generation: Option<u64>,
    },
    /// The catalog is open with `current_token`, and no greater token has been observed.
    Unfenced { current_token: FenceToken },
    /// A token (`fence_token`) greater than this process's own (`current_token`) has been
    /// observed.
    Fenced {
        current_token: FenceToken,
        fence_token: FenceToken,
    },
}
126
127impl FenceableToken {
128 fn new(current_deploy_generation: Option<u64>) -> Self {
130 Self::Initializing {
131 durable_token: None,
132 current_deploy_generation,
133 }
134 }
135
136 fn validate(&self) -> Result<Option<FenceToken>, FenceError> {
138 match self {
139 FenceableToken::Initializing { durable_token, .. } => Ok(durable_token.clone()),
140 FenceableToken::Unfenced { current_token, .. } => Ok(Some(current_token.clone())),
141 FenceableToken::Fenced {
142 current_token,
143 fence_token,
144 } => {
145 assert!(
146 fence_token > current_token,
147 "must be fenced by higher token; current={current_token:?}, fence={fence_token:?}"
148 );
149 if fence_token.deploy_generation > current_token.deploy_generation {
150 Err(FenceError::DeployGeneration {
151 current_generation: current_token.deploy_generation,
152 fence_generation: fence_token.deploy_generation,
153 })
154 } else {
155 assert!(
156 fence_token.epoch > current_token.epoch,
157 "must be fenced by higher token; current={current_token:?}, fence={fence_token:?}"
158 );
159 Err(FenceError::Epoch {
160 current_epoch: current_token.epoch,
161 fence_epoch: fence_token.epoch,
162 })
163 }
164 }
165 }
166 }
167
168 fn token(&self) -> Option<FenceToken> {
170 match self {
171 FenceableToken::Initializing { durable_token, .. } => durable_token.clone(),
172 FenceableToken::Unfenced { current_token, .. } => Some(current_token.clone()),
173 FenceableToken::Fenced { current_token, .. } => Some(current_token.clone()),
174 }
175 }
176
177 fn maybe_fence(&mut self, token: FenceToken) -> Result<(), FenceError> {
179 match self {
180 FenceableToken::Initializing {
181 durable_token,
182 current_deploy_generation,
183 ..
184 } => {
185 match durable_token {
186 Some(durable_token) => {
187 *durable_token = max(durable_token.clone(), token.clone());
188 }
189 None => {
190 *durable_token = Some(token.clone());
191 }
192 }
193 if let Some(current_deploy_generation) = current_deploy_generation {
194 if *current_deploy_generation < token.deploy_generation {
195 *self = FenceableToken::Fenced {
196 current_token: FenceToken {
197 deploy_generation: *current_deploy_generation,
198 epoch: token.epoch,
199 },
200 fence_token: token,
201 };
202 self.validate()?;
203 }
204 }
205 }
206 FenceableToken::Unfenced { current_token } => {
207 if *current_token < token {
208 *self = FenceableToken::Fenced {
209 current_token: current_token.clone(),
210 fence_token: token,
211 };
212 self.validate()?;
213 }
214 }
215 FenceableToken::Fenced { .. } => {
216 self.validate()?;
217 }
218 }
219
220 Ok(())
221 }
222
223 fn generate_unfenced_token(
227 &self,
228 mode: Mode,
229 ) -> Result<Option<(Vec<(StateUpdateKind, Diff)>, FenceableToken)>, DurableCatalogError> {
230 let (durable_token, current_deploy_generation) = match self {
231 FenceableToken::Initializing {
232 durable_token,
233 current_deploy_generation,
234 } => (durable_token.clone(), current_deploy_generation.clone()),
235 FenceableToken::Unfenced { .. } | FenceableToken::Fenced { .. } => return Ok(None),
236 };
237
238 let mut fence_updates = Vec::with_capacity(2);
239
240 if let Some(durable_token) = &durable_token {
241 fence_updates.push((
242 StateUpdateKind::FenceToken(durable_token.clone()),
243 Diff::MINUS_ONE,
244 ));
245 }
246
247 let current_deploy_generation = current_deploy_generation
248 .or_else(|| durable_token.as_ref().map(|token| token.deploy_generation))
249 .ok_or(DurableCatalogError::Uninitialized)?;
251 let mut current_epoch = durable_token
252 .map(|token| token.epoch)
253 .unwrap_or(MIN_EPOCH)
254 .get();
255 if matches!(mode, Mode::Writable) {
257 current_epoch = current_epoch + 1;
258 }
259 let current_epoch = Epoch::new(current_epoch).expect("known to be non-zero");
260 let current_token = FenceToken {
261 deploy_generation: current_deploy_generation,
262 epoch: current_epoch,
263 };
264
265 fence_updates.push((
266 StateUpdateKind::FenceToken(current_token.clone()),
267 Diff::ONE,
268 ));
269
270 let current_fenceable_token = FenceableToken::Unfenced { current_token };
271
272 Ok(Some((fence_updates, current_fenceable_token)))
273 }
274}
275
/// Errors returned by [`PersistHandle::compare_and_append`].
#[derive(Debug, thiserror::Error)]
pub(crate) enum CompareAndAppendError {
    /// This process has been fenced out.
    #[error(transparent)]
    Fence(#[from] FenceError),
    /// The shard's upper did not match the upper this handle expected when appending.
    #[error(
        "expected catalog upper {expected_upper:?} did not match actual catalog upper {actual_upper:?}"
    )]
    UpperMismatch {
        /// The upper this handle expected the shard to have.
        expected_upper: Timestamp,
        /// The upper the shard actually had.
        actual_upper: Timestamp,
    },
}
291
292impl CompareAndAppendError {
293 pub(crate) fn unwrap_fence_error(self) -> FenceError {
294 match self {
295 CompareAndAppendError::Fence(e) => e,
296 e @ CompareAndAppendError::UpperMismatch { .. } => {
297 panic!("unexpected upper mismatch: {e:?}")
298 }
299 }
300 }
301}
302
303impl From<UpperMismatch<Timestamp>> for CompareAndAppendError {
304 fn from(upper_mismatch: UpperMismatch<Timestamp>) -> Self {
305 Self::UpperMismatch {
306 expected_upper: antichain_to_timestamp(upper_mismatch.expected),
307 actual_upper: antichain_to_timestamp(upper_mismatch.current),
308 }
309 }
310}
311
/// Applies individual durable catalog updates to some in-memory state.
pub(crate) trait ApplyUpdate<T: IntoStateUpdateKindJson> {
    /// Applies `update`. Implementations may update `current_fence_token` when they see a
    /// fence-token update.
    ///
    /// Returns `Some(update)` if the update should be kept in the handle's in-memory trace
    /// (`PersistHandle::snapshot`), or `None` if it should be dropped.
    ///
    /// # Errors
    ///
    /// Returns a [`FenceError`] if applying the update reveals that this process is fenced.
    fn apply_update(
        &mut self,
        update: StateUpdate<T>,
        current_fence_token: &mut FenceableToken,
        metrics: &Arc<Metrics>,
    ) -> Result<Option<StateUpdate<T>>, FenceError>;
}
323
/// A handle to the catalog shard in persist.
///
/// `T` is the decoded update type stored in the in-memory trace. `U` applies each update to
/// implementation-specific in-memory state and decides which updates stay in the trace.
#[derive(Debug)]
pub(crate) struct PersistHandle<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> {
    /// How this handle reads and writes the shard.
    pub(crate) mode: Mode,
    /// Critical since handle; its since is downgraded after each successful append.
    since_handle: SinceHandle<SourceData, (), Timestamp, StorageDiff>,
    /// Write handle used for `compare_and_append` and to fetch the shard's upper.
    write_handle: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
    /// Listener that `sync` reads new updates and progress from.
    listen: Listen<SourceData, (), Timestamp, StorageDiff>,
    /// Client used to open temporary read handles.
    persist_client: PersistClient,
    /// ID of the catalog shard.
    shard_id: ShardId,
    /// In-memory trace of updates retained by `update_applier`, periodically consolidated.
    pub(crate) snapshot: Vec<(T, Timestamp, Diff)>,
    /// Applies each update to implementation-specific in-memory state.
    update_applier: U,
    /// The upper this handle has synced to.
    pub(crate) upper: Timestamp,
    /// This process's fence token and fencing state.
    fenceable_token: FenceableToken,
    /// Catalog content version of the running code; compared against the version stored in
    /// the catalog when the handle is created.
    catalog_content_version: semver::Version,
    /// NOTE(review): initialized to `false`; where it becomes `true` is outside this section.
    bootstrap_complete: bool,
    /// Catalog metrics.
    metrics: Arc<Metrics>,
    /// Trace length after the last consolidation triggered by `maybe_consolidate`. It is reset
    /// to `None` at the start of each sync.
    size_at_last_consolidation: Option<usize>,
}
372
impl<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> PersistHandle<T, U> {
    /// Returns the current upper of the catalog shard.
    ///
    /// In `Writable` and `Readonly` mode this fetches the most recent upper from persist. In
    /// `Savepoint` mode it returns the cached `self.upper` without contacting persist.
    #[mz_ore::instrument]
    async fn current_upper(&mut self) -> Timestamp {
        match self.mode {
            Mode::Writable | Mode::Readonly => {
                let upper = self.write_handle.fetch_recent_upper().await;
                antichain_to_timestamp(upper.clone())
            }
            Mode::Savepoint => self.upper,
        }
    }

    /// Appends `updates` to the shard at `commit_ts` and then syncs the handle to the new
    /// upper.
    ///
    /// Returns the new upper, `commit_ts.step_forward()`.
    ///
    /// # Errors
    ///
    /// * [`CompareAndAppendError::UpperMismatch`] if the shard's upper was not `self.upper`.
    ///   The handle has already been synced to the shard's current upper by then.
    /// * [`CompareAndAppendError::Fence`] if this process is found to be fenced.
    #[mz_ore::instrument]
    pub(crate) async fn compare_and_append<S: IntoStateUpdateKindJson>(
        &mut self,
        updates: Vec<(S, Diff)>,
        commit_ts: Timestamp,
    ) -> Result<Timestamp, CompareAndAppendError> {
        // Only computed when soft assertions are enabled, because it clones and decodes every
        // update. `Some(true)` means the batch both retracts and inserts a fence token.
        let contains_fence = if mz_ore::assert::soft_assertions_enabled() {
            let parsed_updates: Vec<_> = updates
                .clone()
                .into_iter()
                .filter_map(|(update, diff)| {
                    let update: StateUpdateKindJson = update.into();
                    let update = TryIntoStateUpdateKind::try_into(update).ok()?;
                    Some((update, diff))
                })
                .collect();
            let contains_retraction = parsed_updates.iter().any(|(update, diff)| {
                matches!(update, StateUpdateKind::FenceToken(..)) && *diff == Diff::MINUS_ONE
            });
            let contains_addition = parsed_updates.iter().any(|(update, diff)| {
                matches!(update, StateUpdateKind::FenceToken(..)) && *diff == Diff::ONE
            });
            Some(contains_retraction && contains_addition)
        } else {
            None
        };

        let updates = updates.into_iter().map(|(kind, diff)| {
            let kind: StateUpdateKindJson = kind.into();
            (
                (Into::<SourceData>::into(kind), ()),
                commit_ts,
                diff.into_inner(),
            )
        });
        let next_upper = commit_ts.step_forward();
        self.compare_and_append_inner(updates, next_upper)
            .await
            .inspect_err(|e| {
                if let Some(contains_fence) = contains_fence {
                    // Fence errors are always acceptable. An upper mismatch is only expected
                    // on a write that swaps the fence token; otherwise log a soft-assertion
                    // failure.
                    soft_assert_or_log!(
                        matches!(e, CompareAndAppendError::Fence(_)) || contains_fence,
                        "encountered an upper mismatch on a non-fencing write"
                    );
                }
            })?;

        self.sync(next_upper).await?;
        Ok(next_upper)
    }

    /// Performs the raw persist `compare_and_append`. It expects the shard's upper to be
    /// `self.upper` and advances it to `next_upper`.
    ///
    /// On success the critical since is downgraded toward `next_upper - 1`. This does not sync
    /// the handle, so `self.upper` is unchanged on the success path.
    ///
    /// # Errors
    ///
    /// On an upper mismatch, the handle is synced to the shard's current upper and
    /// [`CompareAndAppendError::UpperMismatch`] is returned. If that sync hits a fence, the
    /// fence error is returned instead.
    ///
    /// # Panics
    ///
    /// Panics if not in `Writable` mode, or if `next_upper <= self.upper`.
    async fn compare_and_append_inner(
        &mut self,
        updates: impl IntoIterator<Item = ((SourceData, ()), Timestamp, StorageDiff)>,
        next_upper: Timestamp,
    ) -> Result<(), CompareAndAppendError> {
        assert_eq!(self.mode, Mode::Writable);
        assert!(
            next_upper > self.upper,
            "next_upper ({next_upper}) not greater than current upper ({})",
            self.upper,
        );

        let res = self
            .write_handle
            .compare_and_append(
                updates,
                Antichain::from_elem(self.upper),
                Antichain::from_elem(next_upper),
            )
            .await
            .expect("invalid usage");

        if let Err(e @ UpperMismatch { .. }) = res {
            self.sync_to_current_upper().await?;
            return Err(e.into());
        }

        // Keep the since one tick behind the new upper, presumably so that the timestamp just
        // written (`next_upper - 1`) stays readable.
        let downgrade_to = Antichain::from_elem(next_upper.saturating_sub(1));

        // The opaque value is passed through unchanged. An `Err` means the durable opaque
        // differs from the one this handle last saw, which is logged (or soft-panics) rather
        // than returned.
        let opaque = self.since_handle.opaque().clone();
        let downgrade = self
            .since_handle
            .maybe_compare_and_downgrade_since(&opaque, (&opaque, &downgrade_to))
            .await;
        if let Some(Err(e)) = downgrade {
            soft_panic_or_log!("found opaque value {e:?}, but expected {opaque:?}");
        }

        Ok(())
    }

    /// Reads and decodes every update in the shard, without consolidating them.
    ///
    /// The as-of is derived from the shard's current upper via `as_of`. A temporary leased
    /// reader is used and expired before returning.
    ///
    /// # Panics
    ///
    /// Panics if an update cannot be decoded into a [`StateUpdateKind`].
    #[mz_ore::instrument]
    async fn snapshot_unconsolidated(&mut self) -> Vec<StateUpdate<StateUpdateKind>> {
        let current_upper = self.current_upper().await;

        let mut snapshot = Vec::new();
        let mut read_handle = self.read_handle().await;
        let as_of = as_of(&read_handle, current_upper);
        let mut stream = Box::pin(
            read_handle
                .snapshot_and_stream(Antichain::from_elem(as_of))
                .await
                .expect("we have advanced the restart_as_of by the since"),
        );
        while let Some(update) = stream.next().await {
            snapshot.push(update)
        }
        read_handle.expire().await;
        snapshot
            .into_iter()
            .map(Into::<StateUpdate<StateUpdateKindJson>>::into)
            .map(|state_update| state_update.try_into().expect("kind decoding error"))
            .collect()
    }

    /// Syncs the handle to the shard's current upper (see [`Self::current_upper`]).
    #[mz_ore::instrument]
    pub(crate) async fn sync_to_current_upper(&mut self) -> Result<(), FenceError> {
        let upper = self.current_upper().await;
        self.sync(upper).await
    }

    /// Syncs the handle up to `target_upper` (see [`Self::sync_inner`]). Also increments the
    /// sync counter and records wall-clock sync latency.
    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn sync(&mut self, target_upper: Timestamp) -> Result<(), FenceError> {
        self.metrics.syncs.inc();
        let counter = self.metrics.sync_latency_seconds.clone();
        self.sync_inner(target_upper)
            .wall_time()
            .inc_by(counter)
            .await
    }

    /// Reads from the listener until `self.upper >= target_upper`, applies the updates
    /// timestamp by timestamp, and finally consolidates the trace.
    ///
    /// In `Savepoint` mode nothing is read; `self.upper` is only advanced to `target_upper`.
    ///
    /// # Errors
    ///
    /// Returns a [`FenceError`] if this process is already fenced, or becomes fenced while
    /// applying updates.
    #[mz_ore::instrument(level = "debug")]
    async fn sync_inner(&mut self, target_upper: Timestamp) -> Result<(), FenceError> {
        self.fenceable_token.validate()?;

        // Savepoint mode never reads new updates from persist.
        if self.mode == Mode::Savepoint {
            self.upper = max(self.upper, target_upper);
            return Ok(());
        }

        // Updates received from the listener but not yet applied, keyed by timestamp. They are
        // applied once a progress event moves the upper past their timestamp.
        let mut updates: BTreeMap<_, Vec<_>> = BTreeMap::new();

        // Forget the previous baseline so `maybe_consolidate` recomputes it from this sync.
        self.size_at_last_consolidation = None;

        while self.upper < target_upper {
            let listen_events = self.listen.fetch_next().await;
            for listen_event in listen_events {
                match listen_event {
                    ListenEvent::Progress(upper) => {
                        debug!("synced up to {upper:?}");
                        self.upper = antichain_to_timestamp(upper);
                        // Every buffered timestamp is expected (and asserted) to be below the
                        // new upper. Apply them in timestamp order.
                        while let Some((ts, updates)) = updates.pop_first() {
                            assert!(ts < self.upper, "expected {} < {}", ts, self.upper);
                            let updates = updates.into_iter().map(
                                |update: StateUpdate<StateUpdateKindJson>| {
                                    let kind =
                                        T::try_from(update.kind).expect("kind decoding error");
                                    StateUpdate {
                                        kind,
                                        ts: update.ts,
                                        diff: update.diff,
                                    }
                                },
                            );
                            self.apply_updates(updates)?;
                            self.maybe_consolidate();
                        }
                    }
                    ListenEvent::Updates(batch_updates) => {
                        for update in batch_updates {
                            let update: StateUpdate<StateUpdateKindJson> = update.into();
                            updates.entry(update.ts).or_default().push(update);
                        }
                    }
                }
            }
        }
        assert_eq!(updates, BTreeMap::new(), "all updates should be applied");
        self.consolidate();

        Ok(())
    }

    /// Applies `updates` (see [`Self::apply_updates`]) and then consolidates the whole trace.
    pub(crate) fn apply_updates_and_consolidate(
        &mut self,
        updates: impl IntoIterator<Item = StateUpdate<T>>,
    ) -> Result<(), FenceError> {
        self.apply_updates(updates)?;
        self.consolidate();
        Ok(())
    }

    /// Consolidates `updates` and applies each one through `update_applier`. Updates the
    /// applier returns are appended to the trace.
    ///
    /// Every update is applied even if some fail. If any fail, the smallest error (by `Ord`)
    /// is returned.
    ///
    /// # Panics
    ///
    /// Panics if an update's consolidated diff is not `1` or `-1`.
    #[mz_ore::instrument(level = "debug")]
    fn apply_updates(
        &mut self,
        updates: impl IntoIterator<Item = StateUpdate<T>>,
    ) -> Result<(), FenceError> {
        let mut updates: Vec<_> = updates
            .into_iter()
            .map(|StateUpdate { kind, ts, diff }| (kind, ts, diff))
            .collect();

        // Cancel out matching insertions and retractions before applying anything.
        differential_dataflow::consolidation::consolidate_updates(&mut updates);

        // Order by timestamp and, within a timestamp, retractions (-1) before insertions (+1).
        // Appliers require a value to be retracted before its replacement is inserted.
        updates.sort_by(|(_, ts1, diff1), (_, ts2, diff2)| ts1.cmp(ts2).then(diff1.cmp(diff2)));

        let mut errors = Vec::new();

        for (kind, ts, diff) in updates {
            if diff != Diff::ONE && diff != Diff::MINUS_ONE {
                panic!("invalid update in consolidated trace: ({kind:?}, {ts:?}, {diff:?})");
            }

            match self.update_applier.apply_update(
                StateUpdate { kind, ts, diff },
                &mut self.fenceable_token,
                &self.metrics,
            ) {
                Ok(Some(StateUpdate { kind, ts, diff })) => self.snapshot.push((kind, ts, diff)),
                Ok(None) => {}
                // Keep applying the remaining updates; errors are reported afterwards.
                Err(err) => errors.push(err),
            }
        }

        // Record the largest trace size observed.
        let len = i64::try_from(self.snapshot.len()).unwrap_or(i64::MAX);
        if len > self.metrics.snapshot_max_entries.get() {
            self.metrics.snapshot_max_entries.set(len);
        }

        errors.sort();
        if let Some(err) = errors.into_iter().next() {
            return Err(err);
        }

        Ok(())
    }

    /// Consolidates the trace once it has reached at least twice its recorded baseline size.
    ///
    /// If no baseline is recorded, the current size (but at least 8) becomes the baseline.
    /// After consolidating, the baseline is set to the consolidated size.
    fn maybe_consolidate(&mut self) {
        let threshold = *self
            .size_at_last_consolidation
            .get_or_insert_with(|| max(self.snapshot.len(), 8));
        if self.snapshot.len() >= threshold * 2 {
            self.consolidate();
            self.size_at_last_consolidation = Some(self.snapshot.len());
        }
    }

    /// Consolidates the trace in place.
    ///
    /// All entries are first advanced to the timestamp of the last entry (or the minimum
    /// timestamp if the trace is empty), so updates to the same value at different times can
    /// cancel out.
    #[mz_ore::instrument]
    pub(crate) fn consolidate(&mut self) {
        self.metrics.snapshot_consolidations.inc();
        // `last()` is only the maximum timestamp if the trace is sorted by timestamp.
        soft_assert_no_log!(
            self.snapshot
                .windows(2)
                .all(|updates| updates[0].1 <= updates[1].1),
            "snapshot should be sorted by timestamp, {:#?}",
            self.snapshot
        );

        let new_ts = self
            .snapshot
            .last()
            .map(|(_, ts, _)| *ts)
            .unwrap_or_else(Timestamp::minimum);
        for (_, ts, _) in &mut self.snapshot {
            *ts = new_ts;
        }
        differential_dataflow::consolidation::consolidate_updates(&mut self.snapshot);
    }

    /// Syncs to the shard's current upper, then calls `f` with the in-memory trace.
    async fn with_trace<R>(
        &mut self,
        f: impl FnOnce(&Vec<(T, Timestamp, Diff)>) -> Result<R, CatalogError>,
    ) -> Result<R, CatalogError> {
        self.sync_to_current_upper().await?;
        f(&self.snapshot)
    }

    /// Opens a new leased read handle on the catalog shard.
    ///
    /// Callers in this file expire the returned handle when they are done with it.
    async fn read_handle(&self) -> ReadHandle<SourceData, (), Timestamp, StorageDiff> {
        self.persist_client
            .open_leased_reader(
                self.shard_id,
                Arc::new(persist_desc()),
                Arc::new(UnitSchema::default()),
                Diagnostics {
                    shard_name: CATALOG_SHARD_NAME.to_string(),
                    handle_purpose: "openable durable catalog state temporary reader".to_string(),
                },
                USE_CRITICAL_SINCE_CATALOG.get(self.persist_client.dyncfgs()),
            )
            .await
            .expect("invalid usage")
    }

    /// Expires the write handle and the listener. The critical since handle is not expired
    /// here.
    async fn expire(self: Box<Self>) {
        self.write_handle.expire().await;
        self.listen.expire().await;
    }
}
758
759impl<U: ApplyUpdate<StateUpdateKind>> PersistHandle<StateUpdateKind, U> {
760 async fn with_snapshot<T>(
764 &mut self,
765 f: impl FnOnce(Snapshot) -> Result<T, CatalogError>,
766 ) -> Result<T, CatalogError> {
767 fn apply<K, V>(map: &mut BTreeMap<K, V>, key: &K, value: &V, diff: Diff)
768 where
769 K: Ord + Clone,
770 V: Ord + Clone + Debug,
771 {
772 let key = key.clone();
773 let value = value.clone();
774 if diff == Diff::ONE {
775 let prev = map.insert(key, value);
776 assert_eq!(
777 prev, None,
778 "values must be explicitly retracted before inserting a new value"
779 );
780 } else if diff == Diff::MINUS_ONE {
781 let prev = map.remove(&key);
782 assert_eq!(
783 prev,
784 Some(value),
785 "retraction does not match existing value"
786 );
787 }
788 }
789
790 self.with_trace(|trace| {
791 let mut snapshot = Snapshot::empty();
792 for (kind, ts, diff) in trace {
793 let diff = *diff;
794 if diff != Diff::ONE && diff != Diff::MINUS_ONE {
795 panic!("invalid update in consolidated trace: ({kind:?}, {ts:?}, {diff:?})");
796 }
797
798 match kind {
799 StateUpdateKind::AuditLog(_key, ()) => {
800 }
802 StateUpdateKind::Cluster(key, value) => {
803 apply(&mut snapshot.clusters, key, value, diff);
804 }
805 StateUpdateKind::ClusterReplica(key, value) => {
806 apply(&mut snapshot.cluster_replicas, key, value, diff);
807 }
808 StateUpdateKind::Comment(key, value) => {
809 apply(&mut snapshot.comments, key, value, diff);
810 }
811 StateUpdateKind::Config(key, value) => {
812 apply(&mut snapshot.configs, key, value, diff);
813 }
814 StateUpdateKind::Database(key, value) => {
815 apply(&mut snapshot.databases, key, value, diff);
816 }
817 StateUpdateKind::DefaultPrivilege(key, value) => {
818 apply(&mut snapshot.default_privileges, key, value, diff);
819 }
820 StateUpdateKind::FenceToken(_token) => {
821 }
823 StateUpdateKind::IdAllocator(key, value) => {
824 apply(&mut snapshot.id_allocator, key, value, diff);
825 }
826 StateUpdateKind::IntrospectionSourceIndex(key, value) => {
827 apply(&mut snapshot.introspection_sources, key, value, diff);
828 }
829 StateUpdateKind::Item(key, value) => {
830 apply(&mut snapshot.items, key, value, diff);
831 }
832 StateUpdateKind::NetworkPolicy(key, value) => {
833 apply(&mut snapshot.network_policies, key, value, diff);
834 }
835 StateUpdateKind::Role(key, value) => {
836 apply(&mut snapshot.roles, key, value, diff);
837 }
838 StateUpdateKind::Schema(key, value) => {
839 apply(&mut snapshot.schemas, key, value, diff);
840 }
841 StateUpdateKind::Setting(key, value) => {
842 apply(&mut snapshot.settings, key, value, diff);
843 }
844 StateUpdateKind::SourceReferences(key, value) => {
845 apply(&mut snapshot.source_references, key, value, diff);
846 }
847 StateUpdateKind::SystemConfiguration(key, value) => {
848 apply(&mut snapshot.system_configurations, key, value, diff);
849 }
850 StateUpdateKind::ClusterSystemConfiguration(key, value) => {
851 apply(
852 &mut snapshot.cluster_system_configurations,
853 key,
854 value,
855 diff,
856 );
857 }
858 StateUpdateKind::ReplicaSystemConfiguration(key, value) => {
859 apply(
860 &mut snapshot.replica_system_configurations,
861 key,
862 value,
863 diff,
864 );
865 }
866 StateUpdateKind::SystemObjectMapping(key, value) => {
867 apply(&mut snapshot.system_object_mappings, key, value, diff);
868 }
869 StateUpdateKind::SystemPrivilege(key, value) => {
870 apply(&mut snapshot.system_privileges, key, value, diff);
871 }
872 StateUpdateKind::StorageCollectionMetadata(key, value) => {
873 apply(&mut snapshot.storage_collection_metadata, key, value, diff);
874 }
875 StateUpdateKind::UnfinalizedShard(key, ()) => {
876 apply(&mut snapshot.unfinalized_shards, key, &(), diff);
877 }
878 StateUpdateKind::TxnWalShard((), value) => {
879 apply(&mut snapshot.txn_wal_shard, &(), value, diff);
880 }
881 StateUpdateKind::RoleAuth(key, value) => {
882 apply(&mut snapshot.role_auth, key, value, diff);
883 }
884 }
885 }
886 f(snapshot)
887 })
888 .await
889 }
890
891 #[mz_ore::instrument(level = "debug")]
898 async fn persist_snapshot(&self) -> impl Iterator<Item = StateUpdate> + DoubleEndedIterator {
899 let mut read_handle = self.read_handle().await;
900 let as_of = as_of(&read_handle, self.upper);
901 let snapshot = snapshot_binary(&mut read_handle, as_of, &self.metrics)
902 .await
903 .map(|update| update.try_into().expect("kind decoding error"));
904 read_handle.expire().await;
905 snapshot
906 }
907}
908
/// In-memory state tracked while the catalog has not yet been opened: only the `Config` and
/// `Setting` collections.
#[derive(Debug)]
pub(crate) struct UnopenedCatalogStateInner {
    /// `Config` entries: `key.key` mapped to `value.value`.
    configs: BTreeMap<String, u64>,
    /// `Setting` entries: `key.name` mapped to `value.value`.
    settings: BTreeMap<String, String>,
}

impl UnopenedCatalogStateInner {
    /// Returns state with no configs and no settings.
    fn new() -> UnopenedCatalogStateInner {
        let configs = BTreeMap::new();
        let settings = BTreeMap::new();
        UnopenedCatalogStateInner { configs, settings }
    }
}
926
927impl ApplyUpdate<StateUpdateKindJson> for UnopenedCatalogStateInner {
928 fn apply_update(
929 &mut self,
930 update: StateUpdate<StateUpdateKindJson>,
931 current_fence_token: &mut FenceableToken,
932 _metrics: &Arc<Metrics>,
933 ) -> Result<Option<StateUpdate<StateUpdateKindJson>>, FenceError> {
934 if !update.kind.is_audit_log() && update.kind.is_always_deserializable() {
935 let kind = TryInto::try_into(&update.kind).expect("kind is known to be deserializable");
936 match (kind, update.diff) {
937 (StateUpdateKind::Config(key, value), Diff::ONE) => {
938 let prev = self.configs.insert(key.key, value.value);
939 assert_eq!(
940 prev, None,
941 "values must be explicitly retracted before inserting a new value"
942 );
943 }
944 (StateUpdateKind::Config(key, value), Diff::MINUS_ONE) => {
945 let prev = self.configs.remove(&key.key);
946 assert_eq!(
947 prev,
948 Some(value.value),
949 "retraction does not match existing value"
950 );
951 }
952 (StateUpdateKind::Setting(key, value), Diff::ONE) => {
953 let prev = self.settings.insert(key.name, value.value);
954 assert_eq!(
955 prev, None,
956 "values must be explicitly retracted before inserting a new value"
957 );
958 }
959 (StateUpdateKind::Setting(key, value), Diff::MINUS_ONE) => {
960 let prev = self.settings.remove(&key.name);
961 assert_eq!(
962 prev,
963 Some(value.value),
964 "retraction does not match existing value"
965 );
966 }
967 (StateUpdateKind::FenceToken(fence_token), Diff::ONE) => {
968 current_fence_token.maybe_fence(fence_token)?;
969 }
970 _ => {}
971 }
972 }
973
974 Ok(Some(update))
975 }
976}
977
/// A [`PersistHandle`] for a catalog that has not been opened yet.
///
/// Updates are kept as undecoded [`StateUpdateKindJson`]. Only configs, settings and
/// inserted fence tokens are interpreted, by [`UnopenedCatalogStateInner`].
pub(crate) type UnopenedPersistCatalogState =
    PersistHandle<StateUpdateKindJson, UnopenedCatalogStateInner>;
987
988impl UnopenedPersistCatalogState {
    /// Opens handles on the catalog shard for `organization_id` and loads its current
    /// contents, without opening the catalog.
    ///
    /// The returned handle is in `Writable` mode with an `Initializing` fence token for
    /// `deploy_generation`. Fence tokens in the shard are applied to that token while
    /// loading.
    ///
    /// # Errors
    ///
    /// * [`DurableCatalogError::IncompatiblePersistVersion`] if `code_can_write_data` rejects
    ///   the version recorded for the shard.
    /// * The same error if the catalog content version stored in the shard has higher
    ///   precedence than `version`.
    /// * Fence errors from applying the loaded updates.
    #[mz_ore::instrument]
    pub(crate) async fn new(
        persist_client: PersistClient,
        organization_id: Uuid,
        version: semver::Version,
        deploy_generation: Option<u64>,
        metrics: Arc<Metrics>,
    ) -> Result<UnopenedPersistCatalogState, DurableCatalogError> {
        let catalog_shard_id = shard_id(organization_id, CATALOG_SEED);
        debug!(?catalog_shard_id, "new persist backed catalog state");

        // Check version compatibility before opening any handle on the shard.
        // NOTE(review): presumably this is the persist/code version that last wrote the
        // shard — confirm against `fetch_catalog_shard_version`.
        let version_in_catalog_shard =
            fetch_catalog_shard_version(&persist_client, catalog_shard_id).await;
        if let Some(version_in_catalog_shard) = version_in_catalog_shard {
            if !mz_persist_client::cfg::code_can_write_data(&version, &version_in_catalog_shard) {
                return Err(DurableCatalogError::IncompatiblePersistVersion {
                    found_version: version_in_catalog_shard,
                    catalog_version: version,
                });
            }
        }

        let open_handles_start = Instant::now();
        info!("startup: envd serve: catalog init: open handles beginning");
        let since_handle = persist_client
            .open_critical_since(
                catalog_shard_id,
                CATALOG_CRITICAL_SINCE.clone(),
                Opaque::encode(&i64::MIN),
                Diagnostics {
                    shard_name: CATALOG_SHARD_NAME.to_string(),
                    handle_purpose: "durable catalog state critical since".to_string(),
                },
            )
            .await
            .expect("invalid usage");

        let (mut write_handle, mut read_handle) = persist_client
            .open(
                catalog_shard_id,
                Arc::new(persist_desc()),
                Arc::new(UnitSchema::default()),
                Diagnostics {
                    shard_name: CATALOG_SHARD_NAME.to_string(),
                    handle_purpose: "durable catalog state handles".to_string(),
                },
                USE_CRITICAL_SINCE_CATALOG.get(persist_client.dyncfgs()),
            )
            .await
            .expect("invalid usage");
        info!(
            "startup: envd serve: catalog init: open handles complete in {:?}",
            open_handles_start.elapsed()
        );

        // Try to advance a brand-new shard's upper from `minimum` to `minimum + 1` with no
        // data. If the shard's upper is already elsewhere, use that upper instead.
        let upper = {
            const EMPTY_UPDATES: &[((SourceData, ()), Timestamp, StorageDiff)] = &[];
            let upper = Antichain::from_elem(Timestamp::minimum());
            let next_upper = Timestamp::minimum().step_forward();
            match write_handle
                .compare_and_append(EMPTY_UPDATES, upper, Antichain::from_elem(next_upper))
                .await
                .expect("invalid usage")
            {
                Ok(()) => next_upper,
                Err(mismatch) => antichain_to_timestamp(mismatch.current),
            }
        };

        let snapshot_start = Instant::now();
        info!("startup: envd serve: catalog init: snapshot beginning");
        // The listener starts at the same as-of as the snapshot, presumably so later `sync`
        // calls continue exactly where the snapshot ends.
        let as_of = as_of(&read_handle, upper);
        let snapshot: Vec<_> = snapshot_binary(&mut read_handle, as_of, &metrics)
            .await
            .map(|StateUpdate { kind, ts, diff }| (kind, ts, diff))
            .collect();
        let listen = read_handle
            .listen(Antichain::from_elem(as_of))
            .await
            .expect("invalid usage");
        info!(
            "startup: envd serve: catalog init: snapshot complete in {:?}",
            snapshot_start.elapsed()
        );

        let mut handle = UnopenedPersistCatalogState {
            // Starts as `Writable`; the requested mode is set when the catalog is opened.
            mode: Mode::Writable,
            since_handle,
            write_handle,
            listen,
            persist_client,
            shard_id: catalog_shard_id,
            // Empty until `snapshot` is applied below.
            snapshot: Vec::new(),
            update_applier: UnopenedCatalogStateInner::new(),
            upper,
            fenceable_token: FenceableToken::new(deploy_generation),
            catalog_content_version: version,
            bootstrap_complete: false,
            metrics,
            size_at_last_consolidation: None,
        };
        // The snapshot is expected to be consolidated, i.e. to contain only insertions.
        soft_assert_no_log!(
            snapshot.iter().all(|(_, _, diff)| *diff == Diff::ONE),
            "snapshot should be consolidated: {snapshot:#?}"
        );

        let apply_start = Instant::now();
        info!("startup: envd serve: catalog init: apply updates beginning");
        let updates = snapshot
            .into_iter()
            .map(|(kind, ts, diff)| StateUpdate { kind, ts, diff });
        handle.apply_updates_and_consolidate(updates)?;
        info!(
            "startup: envd serve: catalog init: apply updates complete in {:?}",
            apply_start.elapsed()
        );

        // Reject a catalog whose stored content version has higher precedence than this code's.
        if let Some(found_version) = handle.get_catalog_content_version().await? {
            if handle
                .catalog_content_version
                .cmp_precedence(&found_version)
                == std::cmp::Ordering::Less
            {
                return Err(DurableCatalogError::IncompatiblePersistVersion {
                    found_version,
                    catalog_version: handle.catalog_content_version,
                });
            }
        }

        Ok(handle)
    }
1140
1141 #[mz_ore::instrument]
1142 async fn open_inner(
1143 mut self,
1144 mode: Mode,
1145 initial_ts: Timestamp,
1146 bootstrap_args: &BootstrapArgs,
1147 ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError> {
1148 let mut commit_ts = self.upper;
1151 self.mode = mode;
1152
1153 match (&self.mode, &self.fenceable_token) {
1155 (_, FenceableToken::Unfenced { .. } | FenceableToken::Fenced { .. }) => {
1156 return Err(DurableCatalogError::Internal(
1157 "catalog should not have fenced before opening".to_string(),
1158 )
1159 .into());
1160 }
1161 (
1162 Mode::Writable | Mode::Savepoint,
1163 FenceableToken::Initializing {
1164 current_deploy_generation: None,
1165 ..
1166 },
1167 ) => {
1168 return Err(DurableCatalogError::Internal(format!(
1169 "cannot open in mode '{:?}' without a deploy generation",
1170 self.mode,
1171 ))
1172 .into());
1173 }
1174 _ => {}
1175 }
1176
1177 let read_only = matches!(self.mode, Mode::Readonly);
1178
1179 loop {
1181 self.sync_to_current_upper().await?;
1182 commit_ts = max(commit_ts, self.upper);
1183 let (fence_updates, current_fenceable_token) = self
1184 .fenceable_token
1185 .generate_unfenced_token(self.mode)?
1186 .ok_or_else(|| {
1187 DurableCatalogError::Internal(
1188 "catalog should not have fenced before opening".to_string(),
1189 )
1190 })?;
1191 debug!(
1192 ?self.upper,
1193 ?self.fenceable_token,
1194 ?current_fenceable_token,
1195 "fencing previous catalogs"
1196 );
1197 if matches!(self.mode, Mode::Writable) {
1198 match self
1199 .compare_and_append(fence_updates.clone(), commit_ts)
1200 .await
1201 {
1202 Ok(upper) => {
1203 commit_ts = upper;
1204 }
1205 Err(CompareAndAppendError::Fence(e)) => return Err(e.into()),
1206 Err(e @ CompareAndAppendError::UpperMismatch { .. }) => {
1207 warn!("catalog write failed due to upper mismatch, retrying: {e:?}");
1208 continue;
1209 }
1210 }
1211 }
1212 self.fenceable_token = current_fenceable_token;
1213 break;
1214 }
1215
1216 if matches!(self.mode, Mode::Writable) {
1217 let mut controller_handle = self
1224 .persist_client
1225 .open_critical_since::<SourceData, (), Timestamp, StorageDiff>(
1226 self.shard_id,
1227 PersistClient::CONTROLLER_CRITICAL_SINCE,
1228 Opaque::encode(&i64::MIN),
1229 Diagnostics {
1230 shard_name: CATALOG_SHARD_NAME.to_string(),
1231 handle_purpose: "durable catalog state critical since (migration)"
1232 .to_string(),
1233 },
1234 )
1235 .await
1236 .expect("invalid usage");
1237
1238 let since = controller_handle.since().clone();
1239 let res = controller_handle
1240 .compare_and_downgrade_since(
1241 &Opaque::encode(&i64::MIN),
1242 (&Opaque::encode(&PersistEpoch::default()), &since),
1243 )
1244 .await;
1245 match res {
1246 Ok(_) => info!("migrated Opaque of catalog since handle"),
1247 Err(_) => { }
1248 }
1249 }
1250
1251 let is_initialized = self.is_initialized_inner();
1252 if !matches!(self.mode, Mode::Writable) && !is_initialized {
1253 return Err(CatalogError::Durable(DurableCatalogError::NotWritable(
1254 format!(
1255 "catalog tables do not exist; will not create in {:?} mode",
1256 self.mode
1257 ),
1258 )));
1259 }
1260 soft_assert_ne_or_log!(self.upper, Timestamp::minimum());
1261
1262 let (audit_logs, snapshot): (Vec<_>, Vec<_>) = self
1264 .snapshot
1265 .into_iter()
1266 .partition(|(update, _, _)| update.is_audit_log());
1267 self.snapshot = snapshot;
1268 let audit_log_count = audit_logs.iter().map(|(_, _, diff)| diff).sum::<Diff>();
1269 let audit_log_handle = AuditLogIterator::new(audit_logs);
1270
1271 if is_initialized && !read_only {
1273 commit_ts = upgrade(&mut self, commit_ts).await?;
1274 }
1275
1276 debug!(
1277 ?is_initialized,
1278 ?self.upper,
1279 "initializing catalog state"
1280 );
1281 let mut catalog = PersistCatalogState {
1282 mode: self.mode,
1283 since_handle: self.since_handle,
1284 write_handle: self.write_handle,
1285 listen: self.listen,
1286 persist_client: self.persist_client,
1287 shard_id: self.shard_id,
1288 upper: self.upper,
1289 fenceable_token: self.fenceable_token,
1290 snapshot: Vec::new(),
1292 update_applier: CatalogStateInner::new(),
1293 catalog_content_version: self.catalog_content_version,
1294 bootstrap_complete: false,
1295 metrics: self.metrics,
1296 size_at_last_consolidation: None,
1297 };
1298 catalog.metrics.collection_entries.reset();
1299 catalog
1302 .metrics
1303 .collection_entries
1304 .with_label_values(&[&CollectionType::AuditLog.to_string()])
1305 .add(audit_log_count.into_inner());
1306 let updates = self.snapshot.into_iter().map(|(kind, ts, diff)| {
1307 let kind = TryIntoStateUpdateKind::try_into(kind).expect("kind decoding error");
1308 StateUpdate { kind, ts, diff }
1309 });
1310 catalog.apply_updates_and_consolidate(updates)?;
1311
1312 let catalog_content_version = catalog.catalog_content_version.to_string();
1313 let txn = if is_initialized {
1314 let mut txn = catalog.transaction().await?;
1315
1316 if txn.get_setting("migration_version".into()).is_none() && mode != Mode::Readonly {
1323 let old_version = txn.get_catalog_content_version();
1324 txn.set_setting("migration_version".into(), old_version.map(Into::into))?;
1325 }
1326
1327 txn.set_catalog_content_version(catalog_content_version)?;
1328 txn
1329 } else {
1330 soft_assert_eq_no_log!(
1331 catalog
1332 .snapshot
1333 .iter()
1334 .filter(|(kind, _, _)| !matches!(kind, StateUpdateKind::FenceToken(_)))
1335 .count(),
1336 0,
1337 "trace should not contain any updates for an uninitialized catalog: {:#?}",
1338 catalog.snapshot
1339 );
1340
1341 let mut txn = catalog.transaction().await?;
1342 initialize::initialize(
1343 &mut txn,
1344 bootstrap_args,
1345 initial_ts.into(),
1346 catalog_content_version,
1347 )
1348 .await?;
1349 txn
1350 };
1351
1352 if read_only {
1353 let (txn_batch, _) = txn.into_parts();
1354 let updates = StateUpdate::from_txn_batch_ts(txn_batch, catalog.upper);
1356 catalog.apply_updates_and_consolidate(updates)?;
1357 } else {
1358 txn.commit_internal(commit_ts).await?;
1359 }
1360
1361 if matches!(catalog.mode, Mode::Writable) {
1362 let write_handle = catalog
1363 .persist_client
1364 .open_writer::<SourceData, (), Timestamp, i64>(
1365 catalog.write_handle.shard_id(),
1366 Arc::new(persist_desc()),
1367 Arc::new(UnitSchema::default()),
1368 Diagnostics {
1369 shard_name: CATALOG_SHARD_NAME.to_string(),
1370 handle_purpose: "compact catalog".to_string(),
1371 },
1372 )
1373 .await
1374 .expect("invalid usage");
1375 let fuel = CATALOG_FORCE_COMPACTION_FUEL.handle(catalog.persist_client.dyncfgs());
1376 let wait = CATALOG_FORCE_COMPACTION_WAIT.handle(catalog.persist_client.dyncfgs());
1377 let _task = mz_ore::task::spawn(|| "catalog::force_shard_compaction", async move {
1380 let () =
1381 mz_persist_client::cli::admin::dangerous_force_compaction_and_break_pushdown(
1382 &write_handle,
1383 || fuel.get(),
1384 || wait.get(),
1385 )
1386 .await;
1387 });
1388 }
1389
1390 Ok((Box::new(catalog), audit_log_handle))
1391 }
1392
1393 #[mz_ore::instrument]
1398 fn is_initialized_inner(&self) -> bool {
1399 !self.update_applier.configs.is_empty()
1400 }
1401
1402 #[mz_ore::instrument]
1406 async fn get_current_config(&mut self, key: &str) -> Result<Option<u64>, DurableCatalogError> {
1407 self.sync_to_current_upper().await?;
1408 Ok(self.update_applier.configs.get(key).cloned())
1409 }
1410
1411 #[mz_ore::instrument]
1415 pub(crate) async fn get_user_version(&mut self) -> Result<Option<u64>, DurableCatalogError> {
1416 self.get_current_config(USER_VERSION_KEY).await
1417 }
1418
1419 #[mz_ore::instrument]
1423 async fn get_current_setting(
1424 &mut self,
1425 name: &str,
1426 ) -> Result<Option<String>, DurableCatalogError> {
1427 self.sync_to_current_upper().await?;
1428 Ok(self.update_applier.settings.get(name).cloned())
1429 }
1430
1431 #[mz_ore::instrument]
1436 async fn get_catalog_content_version(
1437 &mut self,
1438 ) -> Result<Option<semver::Version>, DurableCatalogError> {
1439 let version = self
1440 .get_current_setting(CATALOG_CONTENT_VERSION_KEY)
1441 .await?;
1442 let version = version.map(|version| version.parse().expect("invalid version persisted"));
1443 Ok(version)
1444 }
1445}
1446
1447#[async_trait]
1448impl OpenableDurableCatalogState for UnopenedPersistCatalogState {
1449 #[mz_ore::instrument]
1450 async fn open_savepoint(
1451 mut self: Box<Self>,
1452 initial_ts: Timestamp,
1453 bootstrap_args: &BootstrapArgs,
1454 ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError> {
1455 self.open_inner(Mode::Savepoint, initial_ts, bootstrap_args)
1456 .boxed()
1457 .await
1458 }
1459
1460 #[mz_ore::instrument]
1461 async fn open_read_only(
1462 mut self: Box<Self>,
1463 bootstrap_args: &BootstrapArgs,
1464 ) -> Result<Box<dyn DurableCatalogState>, CatalogError> {
1465 self.open_inner(Mode::Readonly, EpochMillis::MIN.into(), bootstrap_args)
1466 .boxed()
1467 .await
1468 .map(|(catalog, _)| catalog)
1469 }
1470
1471 #[mz_ore::instrument]
1472 async fn open(
1473 mut self: Box<Self>,
1474 initial_ts: Timestamp,
1475 bootstrap_args: &BootstrapArgs,
1476 ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError> {
1477 self.open_inner(Mode::Writable, initial_ts, bootstrap_args)
1478 .boxed()
1479 .await
1480 }
1481
1482 #[mz_ore::instrument(level = "debug")]
1483 async fn open_debug(mut self: Box<Self>) -> Result<DebugCatalogState, CatalogError> {
1484 Ok(DebugCatalogState(*self))
1485 }
1486
1487 #[mz_ore::instrument]
1488 async fn is_initialized(&mut self) -> Result<bool, CatalogError> {
1489 self.sync_to_current_upper().await?;
1490 Ok(self.is_initialized_inner())
1491 }
1492
1493 #[mz_ore::instrument]
1494 async fn epoch(&mut self) -> Result<Epoch, CatalogError> {
1495 self.sync_to_current_upper().await?;
1496 self.fenceable_token
1497 .validate()?
1498 .map(|token| token.epoch)
1499 .ok_or(CatalogError::Durable(DurableCatalogError::Uninitialized))
1500 }
1501
1502 #[mz_ore::instrument]
1503 async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError> {
1504 self.sync_to_current_upper().await?;
1505 self.fenceable_token
1506 .token()
1507 .map(|token| token.deploy_generation)
1508 .ok_or(CatalogError::Durable(DurableCatalogError::Uninitialized))
1509 }
1510
1511 #[mz_ore::instrument(level = "debug")]
1512 async fn get_0dt_deployment_max_wait(&mut self) -> Result<Option<Duration>, CatalogError> {
1513 let value = self
1514 .get_current_config(WITH_0DT_DEPLOYMENT_MAX_WAIT)
1515 .await?;
1516 match value {
1517 None => Ok(None),
1518 Some(millis) => Ok(Some(Duration::from_millis(millis))),
1519 }
1520 }
1521
1522 #[mz_ore::instrument(level = "debug")]
1523 async fn get_0dt_deployment_ddl_check_interval(
1524 &mut self,
1525 ) -> Result<Option<Duration>, CatalogError> {
1526 let value = self
1527 .get_current_config(WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL)
1528 .await?;
1529 match value {
1530 None => Ok(None),
1531 Some(millis) => Ok(Some(Duration::from_millis(millis))),
1532 }
1533 }
1534
1535 #[mz_ore::instrument(level = "debug")]
1536 async fn get_enable_0dt_deployment_panic_after_timeout(
1537 &mut self,
1538 ) -> Result<Option<bool>, CatalogError> {
1539 let value = self
1540 .get_current_config(ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT)
1541 .await?;
1542 match value {
1543 None => Ok(None),
1544 Some(0) => Ok(Some(false)),
1545 Some(1) => Ok(Some(true)),
1546 Some(v) => Err(
1547 DurableCatalogError::from(TryFromProtoError::UnknownEnumVariant(format!(
1548 "{v} is not a valid boolean value"
1549 )))
1550 .into(),
1551 ),
1552 }
1553 }
1554
1555 #[mz_ore::instrument]
1556 async fn has_system_config_synced_once(&mut self) -> Result<bool, DurableCatalogError> {
1557 self.get_current_config(SYSTEM_CONFIG_SYNCED_KEY)
1558 .await
1559 .map(|value| value.map(|value| value > 0).unwrap_or(false))
1560 }
1561
1562 #[mz_ore::instrument]
1563 async fn trace_unconsolidated(&mut self) -> Result<Trace, CatalogError> {
1564 self.sync_to_current_upper().await?;
1565 if self.is_initialized_inner() {
1566 let snapshot = self.snapshot_unconsolidated().await;
1567 Ok(Trace::from_snapshot(snapshot))
1568 } else {
1569 Err(CatalogError::Durable(DurableCatalogError::Uninitialized))
1570 }
1571 }
1572
1573 #[mz_ore::instrument]
1574 async fn trace_consolidated(&mut self) -> Result<Trace, CatalogError> {
1575 self.sync_to_current_upper().await?;
1576 if self.is_initialized_inner() {
1577 let snapshot = self.current_snapshot().await?;
1578 Ok(Trace::from_snapshot(snapshot))
1579 } else {
1580 Err(CatalogError::Durable(DurableCatalogError::Uninitialized))
1581 }
1582 }
1583
1584 #[mz_ore::instrument(level = "debug")]
1585 async fn expire(self: Box<Self>) {
1586 self.expire().await
1587 }
1588}
1589
1590#[derive(Debug)]
1592struct CatalogStateInner {
1593 updates: VecDeque<memory::objects::StateUpdate>,
1595}
1596
1597impl CatalogStateInner {
1598 fn new() -> CatalogStateInner {
1599 CatalogStateInner {
1600 updates: VecDeque::new(),
1601 }
1602 }
1603}
1604
1605impl ApplyUpdate<StateUpdateKind> for CatalogStateInner {
1606 fn apply_update(
1607 &mut self,
1608 update: StateUpdate<StateUpdateKind>,
1609 current_fence_token: &mut FenceableToken,
1610 metrics: &Arc<Metrics>,
1611 ) -> Result<Option<StateUpdate<StateUpdateKind>>, FenceError> {
1612 if let Some(collection_type) = update.kind.collection_type() {
1613 metrics
1614 .collection_entries
1615 .with_label_values(&[&collection_type.to_string()])
1616 .add(update.diff.into_inner());
1617 }
1618
1619 {
1620 let update: Option<memory::objects::StateUpdate> = (&update)
1621 .try_into()
1622 .expect("invalid persisted update: {update:#?}");
1623 if let Some(update) = update {
1624 self.updates.push_back(update);
1625 }
1626 }
1627
1628 match (update.kind, update.diff) {
1629 (StateUpdateKind::AuditLog(_, ()), _) => Ok(None),
1630 (StateUpdateKind::FenceToken(_), Diff::MINUS_ONE) => Ok(None),
1632 (StateUpdateKind::FenceToken(token), Diff::ONE) => {
1633 current_fence_token.maybe_fence(token)?;
1634 Ok(None)
1635 }
1636 (kind, diff) => Ok(Some(StateUpdate {
1637 kind,
1638 ts: update.ts,
1639 diff,
1640 })),
1641 }
1642 }
1643}
1644
/// Handle to an opened catalog.
///
/// Durable updates are decoded as [`StateUpdateKind`] and applied through
/// [`CatalogStateInner`].
type PersistCatalogState = PersistHandle<StateUpdateKind, CatalogStateInner>;
1651
1652#[async_trait]
1653impl ReadOnlyDurableCatalogState for PersistCatalogState {
1654 fn epoch(&self) -> Epoch {
1655 self.fenceable_token
1656 .token()
1657 .expect("opened catalog state must have an epoch")
1658 .epoch
1659 }
1660
1661 fn metrics(&self) -> &Metrics {
1662 &self.metrics
1663 }
1664
1665 #[mz_ore::instrument(level = "debug")]
1666 async fn expire(self: Box<Self>) {
1667 self.expire().await
1668 }
1669
1670 fn is_bootstrap_complete(&self) -> bool {
1671 self.bootstrap_complete
1672 }
1673
1674 async fn get_audit_logs(&mut self) -> Result<Vec<VersionedEvent>, CatalogError> {
1675 self.sync_to_current_upper().await?;
1676 let audit_logs: Vec<_> = self
1677 .persist_snapshot()
1678 .await
1679 .filter_map(
1680 |StateUpdate {
1681 kind,
1682 ts: _,
1683 diff: _,
1684 }| match kind {
1685 StateUpdateKind::AuditLog(key, ()) => Some(key),
1686 _ => None,
1687 },
1688 )
1689 .collect();
1690 let mut audit_logs: Vec<_> = audit_logs
1691 .into_iter()
1692 .map(RustType::from_proto)
1693 .map_ok(|key: AuditLogKey| key.event)
1694 .collect::<Result<_, _>>()?;
1695 audit_logs.sort_by(|a, b| a.sortable_id().cmp(&b.sortable_id()));
1696 Ok(audit_logs)
1697 }
1698
1699 #[mz_ore::instrument(level = "debug")]
1700 async fn get_next_id(&mut self, id_type: &str) -> Result<u64, CatalogError> {
1701 self.with_trace(|trace| {
1702 Ok(trace
1703 .into_iter()
1704 .rev()
1705 .filter_map(|(kind, _, _)| match kind {
1706 StateUpdateKind::IdAllocator(key, value) if key.name == id_type => {
1707 Some(value.next_id)
1708 }
1709 _ => None,
1710 })
1711 .next()
1712 .expect("must exist"))
1713 })
1714 .await
1715 }
1716
1717 #[mz_ore::instrument(level = "debug")]
1718 async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError> {
1719 self.sync_to_current_upper().await?;
1720 Ok(self
1721 .fenceable_token
1722 .token()
1723 .expect("opened catalogs must have a token")
1724 .deploy_generation)
1725 }
1726
1727 #[mz_ore::instrument(level = "debug")]
1728 async fn snapshot(&mut self) -> Result<Snapshot, CatalogError> {
1729 self.with_snapshot(Ok).await
1730 }
1731
1732 #[mz_ore::instrument(level = "debug")]
1733 async fn sync_to_current_updates(
1734 &mut self,
1735 ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError> {
1736 let upper = self.current_upper().await;
1737 self.sync_updates(upper).await
1738 }
1739
1740 #[mz_ore::instrument(level = "debug")]
1741 async fn sync_updates(
1742 &mut self,
1743 target_upper: mz_repr::Timestamp,
1744 ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError> {
1745 self.sync(target_upper).await?;
1746 let mut updates = Vec::new();
1747 while let Some(update) = self.update_applier.updates.front() {
1748 if update.ts >= target_upper {
1749 break;
1750 }
1751
1752 let update = self
1753 .update_applier
1754 .updates
1755 .pop_front()
1756 .expect("peeked above");
1757 updates.push(update);
1758 }
1759 Ok(updates)
1760 }
1761
1762 async fn current_upper(&mut self) -> Timestamp {
1763 self.current_upper().await
1764 }
1765}
1766
1767#[async_trait]
1768#[allow(mismatched_lifetime_syntaxes)]
1769impl DurableCatalogState for PersistCatalogState {
1770 fn is_read_only(&self) -> bool {
1771 matches!(self.mode, Mode::Readonly)
1772 }
1773
1774 fn is_savepoint(&self) -> bool {
1775 matches!(self.mode, Mode::Savepoint)
1776 }
1777
1778 async fn mark_bootstrap_complete(&mut self) {
1779 self.bootstrap_complete = true;
1780 if matches!(self.mode, Mode::Writable) {
1781 self.since_handle
1782 .upgrade_version()
1783 .await
1784 .expect("invalid usage")
1785 }
1786 }
1787
1788 #[mz_ore::instrument(level = "debug")]
1789 async fn transaction(&mut self) -> Result<Transaction, CatalogError> {
1790 self.metrics.transactions_started.inc();
1791 let snapshot = self.snapshot().await?;
1792 let commit_ts = self.upper.clone();
1793 Transaction::new(self, snapshot, commit_ts)
1794 }
1795
1796 fn transaction_from_snapshot(
1797 &mut self,
1798 snapshot: Snapshot,
1799 ) -> Result<Transaction, CatalogError> {
1800 let commit_ts = self.upper.clone();
1801 Transaction::new(self, snapshot, commit_ts)
1802 }
1803
1804 #[mz_ore::instrument(level = "debug")]
1805 async fn commit_transaction(
1806 &mut self,
1807 txn_batch: TransactionBatch,
1808 commit_ts: Timestamp,
1809 ) -> Result<Timestamp, CatalogError> {
1810 async fn commit_transaction_inner(
1811 catalog: &mut PersistCatalogState,
1812 txn_batch: TransactionBatch,
1813 commit_ts: Timestamp,
1814 ) -> Result<Timestamp, CatalogError> {
1815 if catalog.mode == Mode::Readonly {
1819 let updates: Vec<_> = StateUpdate::from_txn_batch(txn_batch).collect();
1820 if !updates.is_empty() {
1821 let collection_types: Vec<_> = updates
1822 .iter()
1823 .filter_map(|u| u.0.collection_type())
1824 .collect();
1825 return Err(DurableCatalogError::NotWritable(format!(
1826 "cannot commit a transaction in a read-only catalog: \
1827 {} updates across collections: {collection_types:?}",
1828 updates.len(),
1829 ))
1830 .into());
1831 }
1832 return Ok(catalog.upper);
1833 }
1834
1835 assert_eq!(
1840 catalog.upper, txn_batch.upper,
1841 "only one transaction at a time is supported"
1842 );
1843
1844 assert!(
1845 commit_ts >= catalog.upper,
1846 "expected commit ts, {}, to be greater than or equal to upper, {}",
1847 commit_ts,
1848 catalog.upper
1849 );
1850
1851 let updates = StateUpdate::from_txn_batch(txn_batch).collect();
1852 debug!("committing updates: {updates:?}");
1853
1854 let next_upper = match catalog.mode {
1855 Mode::Writable => catalog
1856 .compare_and_append(updates, commit_ts)
1857 .await
1858 .map_err(|e| e.unwrap_fence_error())?,
1859 Mode::Savepoint => {
1860 let updates = updates.into_iter().map(|(kind, diff)| StateUpdate {
1861 kind,
1862 ts: commit_ts,
1863 diff,
1864 });
1865 catalog.apply_updates_and_consolidate(updates)?;
1866 catalog.upper = commit_ts.step_forward();
1867 catalog.upper
1868 }
1869 Mode::Readonly => unreachable!("handled above"),
1870 };
1871
1872 Ok(next_upper)
1873 }
1874 self.metrics.transaction_commits.inc();
1875 let counter = self.metrics.transaction_commit_latency_seconds.clone();
1876 commit_transaction_inner(self, txn_batch, commit_ts)
1877 .wall_time()
1878 .inc_by(counter)
1879 .await
1880 }
1881
1882 #[mz_ore::instrument(level = "debug")]
1883 async fn advance_upper(&mut self, new_upper: Timestamp) -> Result<(), CatalogError> {
1884 if self.upper >= new_upper {
1885 soft_panic_or_log!(
1893 "new_upper ({new_upper}) not greater than current upper ({})",
1894 self.upper
1895 );
1896 return Ok(());
1897 }
1898
1899 match self.mode {
1900 Mode::Writable => self
1901 .compare_and_append_inner([], new_upper)
1902 .await
1903 .map_err(|e| e.unwrap_fence_error())?,
1904 Mode::Savepoint => (),
1905 Mode::Readonly => {
1906 return Err(DurableCatalogError::NotWritable(
1907 "cannot advance upper of a read-only catalog".into(),
1908 )
1909 .into());
1910 }
1911 }
1912
1913 self.upper = new_upper;
1914 Ok(())
1917 }
1918
1919 fn shard_id(&self) -> ShardId {
1920 self.shard_id
1921 }
1922}
1923
1924pub fn shard_id(organization_id: Uuid, seed: usize) -> ShardId {
1926 let hash = sha2::Sha256::digest(format!("{organization_id}{seed}")).to_vec();
1927 soft_assert_eq_or_log!(hash.len(), 32, "SHA256 returns 32 bytes (256 bits)");
1928 let uuid = Uuid::from_slice(&hash[0..16]).expect("from_slice accepts exactly 16 bytes");
1929 ShardId::from_str(&format!("s{uuid}")).expect("known to be valid")
1930}
1931
1932fn as_of(
1935 read_handle: &ReadHandle<SourceData, (), Timestamp, StorageDiff>,
1936 upper: Timestamp,
1937) -> Timestamp {
1938 let since = read_handle.since().clone();
1939 let mut as_of = upper.checked_sub(1).unwrap_or_else(|| {
1940 panic!("catalog persist shard should be initialize, found upper: {upper:?}")
1941 });
1942 soft_assert_or_log!(
1945 since.less_equal(&as_of),
1946 "since={since:?}, as_of={as_of:?}; since must be less than or equal to as_of"
1947 );
1948 as_of.advance_by(since.borrow());
1951 as_of
1952}
1953
1954async fn fetch_catalog_shard_version(
1957 persist_client: &PersistClient,
1958 catalog_shard_id: ShardId,
1959) -> Option<semver::Version> {
1960 let shard_state = persist_client
1961 .inspect_shard::<Timestamp>(&catalog_shard_id)
1962 .await
1963 .ok()?;
1964 let json_state = serde_json::to_value(shard_state).expect("state serialization error");
1965 let json_version = json_state
1966 .get("applier_version")
1967 .cloned()
1968 .expect("missing applier_version");
1969 let version = serde_json::from_value(json_version).expect("version deserialization error");
1970 Some(version)
1971}
1972
1973#[mz_ore::instrument(level = "debug")]
1978async fn snapshot_binary(
1979 read_handle: &mut ReadHandle<SourceData, (), Timestamp, StorageDiff>,
1980 as_of: Timestamp,
1981 metrics: &Arc<Metrics>,
1982) -> impl Iterator<Item = StateUpdate<StateUpdateKindJson>> + DoubleEndedIterator + use<> {
1983 metrics.snapshots_taken.inc();
1984 let counter = metrics.snapshot_latency_seconds.clone();
1985 snapshot_binary_inner(read_handle, as_of)
1986 .wall_time()
1987 .inc_by(counter)
1988 .await
1989}
1990
1991#[mz_ore::instrument(level = "debug")]
1996async fn snapshot_binary_inner(
1997 read_handle: &mut ReadHandle<SourceData, (), Timestamp, StorageDiff>,
1998 as_of: Timestamp,
1999) -> impl Iterator<Item = StateUpdate<StateUpdateKindJson>> + DoubleEndedIterator + use<> {
2000 let snapshot = read_handle
2001 .snapshot_and_fetch(Antichain::from_elem(as_of))
2002 .await
2003 .expect("we have advanced the restart_as_of by the since");
2004 soft_assert_no_log!(
2005 snapshot.iter().all(|(_, _, diff)| *diff == 1),
2006 "snapshot_and_fetch guarantees a consolidated result: {snapshot:#?}"
2007 );
2008 snapshot
2009 .into_iter()
2010 .map(Into::<StateUpdate<StateUpdateKindJson>>::into)
2011 .sorted_by(|a, b| Ord::cmp(&b.ts, &a.ts))
2012}
2013
2014pub(crate) fn antichain_to_timestamp(antichain: Antichain<Timestamp>) -> Timestamp {
2019 antichain
2020 .into_option()
2021 .expect("we use a totally ordered time and never finalize the shard")
2022}
2023
2024impl Trace {
2027 fn from_snapshot(snapshot: impl IntoIterator<Item = StateUpdate>) -> Trace {
2029 let mut trace = Trace::new();
2030 for StateUpdate { kind, ts, diff } in snapshot {
2031 match kind {
2032 StateUpdateKind::AuditLog(k, v) => trace.audit_log.values.push(((k, v), ts, diff)),
2033 StateUpdateKind::Cluster(k, v) => trace.clusters.values.push(((k, v), ts, diff)),
2034 StateUpdateKind::ClusterReplica(k, v) => {
2035 trace.cluster_replicas.values.push(((k, v), ts, diff))
2036 }
2037 StateUpdateKind::Comment(k, v) => trace.comments.values.push(((k, v), ts, diff)),
2038 StateUpdateKind::Config(k, v) => trace.configs.values.push(((k, v), ts, diff)),
2039 StateUpdateKind::Database(k, v) => trace.databases.values.push(((k, v), ts, diff)),
2040 StateUpdateKind::DefaultPrivilege(k, v) => {
2041 trace.default_privileges.values.push(((k, v), ts, diff))
2042 }
2043 StateUpdateKind::FenceToken(_) => {
2044 }
2046 StateUpdateKind::IdAllocator(k, v) => {
2047 trace.id_allocator.values.push(((k, v), ts, diff))
2048 }
2049 StateUpdateKind::IntrospectionSourceIndex(k, v) => {
2050 trace.introspection_sources.values.push(((k, v), ts, diff))
2051 }
2052 StateUpdateKind::Item(k, v) => trace.items.values.push(((k, v), ts, diff)),
2053 StateUpdateKind::NetworkPolicy(k, v) => {
2054 trace.network_policies.values.push(((k, v), ts, diff))
2055 }
2056 StateUpdateKind::Role(k, v) => trace.roles.values.push(((k, v), ts, diff)),
2057 StateUpdateKind::Schema(k, v) => trace.schemas.values.push(((k, v), ts, diff)),
2058 StateUpdateKind::Setting(k, v) => trace.settings.values.push(((k, v), ts, diff)),
2059 StateUpdateKind::SourceReferences(k, v) => {
2060 trace.source_references.values.push(((k, v), ts, diff))
2061 }
2062 StateUpdateKind::SystemConfiguration(k, v) => {
2063 trace.system_configurations.values.push(((k, v), ts, diff))
2064 }
2065 StateUpdateKind::ClusterSystemConfiguration(k, v) => trace
2066 .cluster_system_configurations
2067 .values
2068 .push(((k, v), ts, diff)),
2069 StateUpdateKind::ReplicaSystemConfiguration(k, v) => trace
2070 .replica_system_configurations
2071 .values
2072 .push(((k, v), ts, diff)),
2073 StateUpdateKind::SystemObjectMapping(k, v) => {
2074 trace.system_object_mappings.values.push(((k, v), ts, diff))
2075 }
2076 StateUpdateKind::SystemPrivilege(k, v) => {
2077 trace.system_privileges.values.push(((k, v), ts, diff))
2078 }
2079 StateUpdateKind::StorageCollectionMetadata(k, v) => trace
2080 .storage_collection_metadata
2081 .values
2082 .push(((k, v), ts, diff)),
2083 StateUpdateKind::UnfinalizedShard(k, ()) => {
2084 trace.unfinalized_shards.values.push(((k, ()), ts, diff))
2085 }
2086 StateUpdateKind::TxnWalShard((), v) => {
2087 trace.txn_wal_shard.values.push((((), v), ts, diff))
2088 }
2089 StateUpdateKind::RoleAuth(k, v) => trace.role_auth.values.push(((k, v), ts, diff)),
2090 }
2091 }
2092 trace
2093 }
2094}
2095
2096impl UnopenedPersistCatalogState {
2097 #[mz_ore::instrument]
2099 pub(crate) async fn debug_edit<T: Collection>(
2100 &mut self,
2101 key: T::Key,
2102 value: T::Value,
2103 ) -> Result<Option<T::Value>, CatalogError>
2104 where
2105 T::Key: PartialEq + Eq + Debug + Clone,
2106 T::Value: Debug + Clone,
2107 {
2108 let prev_value = loop {
2109 let key = key.clone();
2110 let value = value.clone();
2111 let snapshot = self.current_snapshot().await?;
2112 let trace = Trace::from_snapshot(snapshot);
2113 let collection_trace = T::collection_trace(trace);
2114 let prev_values: Vec<_> = collection_trace
2115 .values
2116 .into_iter()
2117 .filter(|((k, _), _, diff)| {
2118 soft_assert_eq_or_log!(*diff, Diff::ONE, "trace is consolidated");
2119 &key == k
2120 })
2121 .collect();
2122
2123 let prev_value = match &prev_values[..] {
2124 [] => None,
2125 [((_, v), _, _)] => Some(v.clone()),
2126 prev_values => panic!("multiple values found for key {key:?}: {prev_values:?}"),
2127 };
2128
2129 let mut updates: Vec<_> = prev_values
2130 .into_iter()
2131 .map(|((k, v), _, _)| (T::update(k, v), Diff::MINUS_ONE))
2132 .collect();
2133 updates.push((T::update(key, value), Diff::ONE));
2134 match self.fenceable_token.generate_unfenced_token(self.mode)? {
2136 Some((fence_updates, current_fenceable_token)) => {
2137 updates.extend(fence_updates.clone());
2138 match self.compare_and_append(updates, self.upper).await {
2139 Ok(_) => {
2140 self.fenceable_token = current_fenceable_token;
2141 break prev_value;
2142 }
2143 Err(CompareAndAppendError::Fence(e)) => return Err(e.into()),
2144 Err(e @ CompareAndAppendError::UpperMismatch { .. }) => {
2145 warn!("catalog write failed due to upper mismatch, retrying: {e:?}");
2146 continue;
2147 }
2148 }
2149 }
2150 None => {
2151 self.compare_and_append(updates, self.upper)
2152 .await
2153 .map_err(|e| e.unwrap_fence_error())?;
2154 break prev_value;
2155 }
2156 }
2157 };
2158 Ok(prev_value)
2159 }
2160
2161 #[mz_ore::instrument]
2163 pub(crate) async fn debug_delete<T: Collection>(
2164 &mut self,
2165 key: T::Key,
2166 ) -> Result<(), CatalogError>
2167 where
2168 T::Key: PartialEq + Eq + Debug + Clone,
2169 T::Value: Debug,
2170 {
2171 loop {
2172 let key = key.clone();
2173 let snapshot = self.current_snapshot().await?;
2174 let trace = Trace::from_snapshot(snapshot);
2175 let collection_trace = T::collection_trace(trace);
2176 let mut retractions: Vec<_> = collection_trace
2177 .values
2178 .into_iter()
2179 .filter(|((k, _), _, diff)| {
2180 soft_assert_eq_or_log!(*diff, Diff::ONE, "trace is consolidated");
2181 &key == k
2182 })
2183 .map(|((k, v), _, _)| (T::update(k, v), Diff::MINUS_ONE))
2184 .collect();
2185
2186 match self.fenceable_token.generate_unfenced_token(self.mode)? {
2188 Some((fence_updates, current_fenceable_token)) => {
2189 retractions.extend(fence_updates.clone());
2190 match self.compare_and_append(retractions, self.upper).await {
2191 Ok(_) => {
2192 self.fenceable_token = current_fenceable_token;
2193 break;
2194 }
2195 Err(CompareAndAppendError::Fence(e)) => return Err(e.into()),
2196 Err(e @ CompareAndAppendError::UpperMismatch { .. }) => {
2197 warn!("catalog write failed due to upper mismatch, retrying: {e:?}");
2198 continue;
2199 }
2200 }
2201 }
2202 None => {
2203 self.compare_and_append(retractions, self.upper)
2204 .await
2205 .map_err(|e| e.unwrap_fence_error())?;
2206 break;
2207 }
2208 }
2209 }
2210 Ok(())
2211 }
2212
2213 async fn current_snapshot(
2218 &mut self,
2219 ) -> Result<impl IntoIterator<Item = StateUpdate> + '_, CatalogError> {
2220 self.sync_to_current_upper().await?;
2221 self.consolidate();
2222 Ok(self.snapshot.iter().cloned().map(|(kind, ts, diff)| {
2223 let kind = TryIntoStateUpdateKind::try_into(kind).expect("kind decoding error");
2224 StateUpdate { kind, ts, diff }
2225 }))
2226 }
2227}