1#[cfg(test)]
11mod tests;
12
13use std::cmp::max;
14use std::collections::{BTreeMap, VecDeque};
15use std::fmt::Debug;
16use std::str::FromStr;
17use std::sync::{Arc, LazyLock};
18use std::time::{Duration, Instant};
19
20use async_trait::async_trait;
21use differential_dataflow::lattice::Lattice;
22use futures::{FutureExt, StreamExt};
23use itertools::Itertools;
24use mz_audit_log::VersionedEvent;
25use mz_ore::metrics::MetricsFutureExt;
26use mz_ore::now::EpochMillis;
27use mz_ore::{
28 soft_assert_eq_no_log, soft_assert_eq_or_log, soft_assert_ne_or_log, soft_assert_no_log,
29 soft_assert_or_log, soft_panic_or_log,
30};
31use mz_persist_client::cfg::USE_CRITICAL_SINCE_CATALOG;
32use mz_persist_client::cli::admin::{CATALOG_FORCE_COMPACTION_FUEL, CATALOG_FORCE_COMPACTION_WAIT};
33use mz_persist_client::critical::{CriticalReaderId, Opaque, SinceHandle};
34use mz_persist_client::error::UpperMismatch;
35use mz_persist_client::read::{Listen, ListenEvent, ReadHandle};
36use mz_persist_client::write::WriteHandle;
37use mz_persist_client::{Diagnostics, PersistClient, ShardId};
38use mz_persist_types::codec_impls::UnitSchema;
39use mz_proto::{RustType, TryFromProtoError};
40use mz_repr::Diff;
41use mz_storage_client::controller::PersistEpoch;
42use mz_storage_types::StorageDiff;
43use mz_storage_types::sources::SourceData;
44use sha2::Digest;
45use timely::progress::{Antichain, Timestamp as TimelyTimestamp};
46use tracing::{debug, info, warn};
47use uuid::Uuid;
48
49use crate::durable::debug::{Collection, CollectionType, DebugCatalogState, Trace};
50use crate::durable::error::FenceError;
51use crate::durable::initialize::{
52 ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, SYSTEM_CONFIG_SYNCED_KEY, USER_VERSION_KEY,
53 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL, WITH_0DT_DEPLOYMENT_MAX_WAIT,
54};
55use crate::durable::metrics::Metrics;
56use crate::durable::objects::state_update::{
57 IntoStateUpdateKindJson, StateUpdate, StateUpdateKind, StateUpdateKindJson,
58 TryIntoStateUpdateKind,
59};
60use crate::durable::objects::{AuditLogKey, FenceToken, Snapshot};
61use crate::durable::transaction::TransactionBatch;
62use crate::durable::upgrade::upgrade;
63use crate::durable::{
64 AuditLogIterator, BootstrapArgs, CATALOG_CONTENT_VERSION_KEY, CatalogError,
65 DurableCatalogError, DurableCatalogState, Epoch, OpenableDurableCatalogState,
66 ReadOnlyDurableCatalogState, Transaction, initialize, persist_desc,
67};
68use crate::memory;
69
/// The timestamp type used by the durable catalog shard.
pub(crate) type Timestamp = mz_repr::Timestamp;

/// The smallest epoch a catalog can ever have.
// SAFETY: 1 is non-zero.
const MIN_EPOCH: Epoch = unsafe { Epoch::new_unchecked(1) };

/// Human-readable name of the catalog shard, used in persist `Diagnostics`.
const CATALOG_SHARD_NAME: &str = "catalog";

/// The fixed [`CriticalReaderId`] under which the catalog registers its
/// critical since handle (see `UnopenedPersistCatalogState::new`).
static CATALOG_CRITICAL_SINCE: LazyLock<CriticalReaderId> = LazyLock::new(|| {
    "c55555555-6666-7777-8888-999999999999"
        .parse()
        .expect("valid CriticalReaderId")
});

// Seeds used to derive distinct shard ids from the organization id (see the
// `shard_id(organization_id, seed)` call in `UnopenedPersistCatalogState::new`).
// The underscore-prefixed seeds are reserved but unused in this file.
const CATALOG_SEED: usize = 1;
const _UPGRADE_SEED: usize = 2;
pub const _BUILTIN_MIGRATION_SEED: usize = 3;
pub const _EXPRESSION_CACHE_SEED: usize = 4;
98
/// The mode in which the catalog is opened, controlling whether changes are
/// durably persisted.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub(crate) enum Mode {
    /// Readable catalog state; durable writes are not performed
    /// (`compare_and_append_inner` asserts `Writable`).
    Readonly,
    /// Writes are tracked in memory only: `sync_inner` advances the local
    /// upper without listening to the shard in this mode.
    Savepoint,
    /// Fully durable reads and writes against the persist shard.
    Writable,
}
109
/// Tracks this catalog's fence token and whether some other catalog instance
/// has fenced us out by writing a larger token.
#[derive(Debug)]
pub(crate) enum FenceableToken {
    /// We have not yet written our own token durably; we are still
    /// accumulating the largest token observed in the shard.
    Initializing {
        /// The largest fence token found in the durable catalog so far, if any.
        durable_token: Option<FenceToken>,
        /// The deploy generation this process was started with, if known.
        current_deploy_generation: Option<u64>,
    },
    /// We have a token of our own and have not observed a larger one.
    Unfenced { current_token: FenceToken },
    /// A strictly larger token (`fence_token`) was observed: we are fenced.
    Fenced {
        current_token: FenceToken,
        fence_token: FenceToken,
    },
}
129
impl FenceableToken {
    /// Returns a new token in the `Initializing` state with no durable token
    /// observed yet.
    fn new(current_deploy_generation: Option<u64>) -> Self {
        Self::Initializing {
            durable_token: None,
            current_deploy_generation,
        }
    }

    /// Returns the known durable token if we have not been fenced, or the
    /// corresponding [`FenceError`] if we have.
    ///
    /// A deploy-generation fence takes precedence over an epoch fence when
    /// reporting the error.
    fn validate(&self) -> Result<Option<FenceToken>, FenceError> {
        match self {
            FenceableToken::Initializing { durable_token, .. } => Ok(durable_token.clone()),
            FenceableToken::Unfenced { current_token, .. } => Ok(Some(current_token.clone())),
            FenceableToken::Fenced {
                current_token,
                fence_token,
            } => {
                // Invariant: we only transition to `Fenced` for a strictly
                // larger token (see `maybe_fence`).
                assert!(
                    fence_token > current_token,
                    "must be fenced by higher token; current={current_token:?}, fence={fence_token:?}"
                );
                if fence_token.deploy_generation > current_token.deploy_generation {
                    Err(FenceError::DeployGeneration {
                        current_generation: current_token.deploy_generation,
                        fence_generation: fence_token.deploy_generation,
                    })
                } else {
                    // Same deploy generation, so the ordering must come from
                    // the epoch.
                    assert!(
                        fence_token.epoch > current_token.epoch,
                        "must be fenced by higher token; current={current_token:?}, fence={fence_token:?}"
                    );
                    Err(FenceError::Epoch {
                        current_epoch: current_token.epoch,
                        fence_epoch: fence_token.epoch,
                    })
                }
            }
        }
    }

    /// Returns the current token, if one is known (either our own or the
    /// largest durable token observed while initializing).
    fn token(&self) -> Option<FenceToken> {
        match self {
            FenceableToken::Initializing { durable_token, .. } => durable_token.clone(),
            FenceableToken::Unfenced { current_token, .. } => Some(current_token.clone()),
            FenceableToken::Fenced { current_token, .. } => Some(current_token.clone()),
        }
    }

    /// Observes `token` from the durable catalog, transitioning to `Fenced`
    /// (and returning the resulting error) if it fences us out.
    fn maybe_fence(&mut self, token: FenceToken) -> Result<(), FenceError> {
        match self {
            FenceableToken::Initializing {
                durable_token,
                current_deploy_generation,
                ..
            } => {
                // Track the largest token seen so far while initializing.
                match durable_token {
                    Some(durable_token) => {
                        *durable_token = max(durable_token.clone(), token.clone());
                    }
                    None => {
                        *durable_token = Some(token.clone());
                    }
                }
                // Even before generating our own token, a newer deploy
                // generation fences us out immediately.
                if let Some(current_deploy_generation) = current_deploy_generation {
                    if *current_deploy_generation < token.deploy_generation {
                        *self = FenceableToken::Fenced {
                            current_token: FenceToken {
                                deploy_generation: *current_deploy_generation,
                                epoch: token.epoch,
                            },
                            fence_token: token,
                        };
                        self.validate()?;
                    }
                }
            }
            FenceableToken::Unfenced { current_token } => {
                // Any strictly larger token fences us.
                if *current_token < token {
                    *self = FenceableToken::Fenced {
                        current_token: current_token.clone(),
                        fence_token: token,
                    };
                    self.validate()?;
                }
            }
            FenceableToken::Fenced { .. } => {
                // Already fenced; re-report the fence error.
                self.validate()?;
            }
        }

        Ok(())
    }

    /// Generates a token for this catalog from the `Initializing` state,
    /// along with the state updates (retraction of the old durable token and
    /// addition of the new one) needed to install it.
    ///
    /// Returns `Ok(None)` if a token has already been generated (i.e. we are
    /// `Unfenced` or `Fenced`). The epoch is only incremented in
    /// [`Mode::Writable`].
    fn generate_unfenced_token(
        &self,
        mode: Mode,
    ) -> Result<Option<(Vec<(StateUpdateKind, Diff)>, FenceableToken)>, DurableCatalogError> {
        let (durable_token, current_deploy_generation) = match self {
            FenceableToken::Initializing {
                durable_token,
                current_deploy_generation,
            } => (durable_token.clone(), current_deploy_generation.clone()),
            FenceableToken::Unfenced { .. } | FenceableToken::Fenced { .. } => return Ok(None),
        };

        let mut fence_updates = Vec::with_capacity(2);

        // Retract the previous durable token, if there was one.
        if let Some(durable_token) = &durable_token {
            fence_updates.push((
                StateUpdateKind::FenceToken(durable_token.clone()),
                Diff::MINUS_ONE,
            ));
        }

        // Without an explicit deploy generation or an existing durable token
        // the catalog must be uninitialized.
        let current_deploy_generation = current_deploy_generation
            .or_else(|| durable_token.as_ref().map(|token| token.deploy_generation))
            .ok_or(DurableCatalogError::Uninitialized)?;
        let mut current_epoch = durable_token
            .map(|token| token.epoch)
            .unwrap_or(MIN_EPOCH)
            .get();
        // Only writable catalogs bump the epoch and thereby fence out prior
        // instances.
        if matches!(mode, Mode::Writable) {
            current_epoch = current_epoch + 1;
        }
        let current_epoch = Epoch::new(current_epoch).expect("known to be non-zero");
        let current_token = FenceToken {
            deploy_generation: current_deploy_generation,
            epoch: current_epoch,
        };

        fence_updates.push((
            StateUpdateKind::FenceToken(current_token.clone()),
            Diff::ONE,
        ));

        let current_fenceable_token = FenceableToken::Unfenced { current_token };

        Ok(Some((fence_updates, current_fenceable_token)))
    }
}
278
/// Errors that can occur while appending to the catalog shard.
#[derive(Debug, thiserror::Error)]
pub(crate) enum CompareAndAppendError {
    /// Another catalog instance fenced us out.
    #[error(transparent)]
    Fence(#[from] FenceError),
    /// The expected upper did not match the shard's actual upper; some other
    /// writer advanced the shard concurrently.
    #[error(
        "expected catalog upper {expected_upper:?} did not match actual catalog upper {actual_upper:?}"
    )]
    UpperMismatch {
        expected_upper: Timestamp,
        actual_upper: Timestamp,
    },
}
294
295impl CompareAndAppendError {
296 pub(crate) fn unwrap_fence_error(self) -> FenceError {
297 match self {
298 CompareAndAppendError::Fence(e) => e,
299 e @ CompareAndAppendError::UpperMismatch { .. } => {
300 panic!("unexpected upper mismatch: {e:?}")
301 }
302 }
303 }
304}
305
306impl From<UpperMismatch<Timestamp>> for CompareAndAppendError {
307 fn from(upper_mismatch: UpperMismatch<Timestamp>) -> Self {
308 Self::UpperMismatch {
309 expected_upper: antichain_to_timestamp(upper_mismatch.expected),
310 actual_upper: antichain_to_timestamp(upper_mismatch.current),
311 }
312 }
313}
314
/// Applies a durable catalog update to some implementation-specific in-memory
/// state, while watching for fence tokens.
pub(crate) trait ApplyUpdate<T: IntoStateUpdateKindJson> {
    /// Processes `update`, mutating internal state and `current_fence_token`
    /// as needed.
    ///
    /// Returns `Ok(Some(update))` if the update should be retained in the
    /// handle's in-memory snapshot, `Ok(None)` to drop it, or a
    /// [`FenceError`] if the update fences out this catalog.
    fn apply_update(
        &mut self,
        update: StateUpdate<T>,
        current_fence_token: &mut FenceableToken,
        metrics: &Arc<Metrics>,
    ) -> Result<Option<StateUpdate<T>>, FenceError>;
}
326
/// A handle to the catalog's persist shard plus the in-memory state derived
/// from it. `T` is the representation updates are stored in and `U` decides
/// how each update is applied.
#[derive(Debug)]
pub(crate) struct PersistHandle<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> {
    /// The mode this catalog was opened in.
    pub(crate) mode: Mode,
    /// Critical since handle; downgraded after successful writes.
    since_handle: SinceHandle<SourceData, (), Timestamp, StorageDiff>,
    /// Write handle, also used to fetch the shard's recent upper.
    write_handle: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
    /// Listen subscription that `sync_inner` drains to stay up to date.
    listen: Listen<SourceData, (), Timestamp, StorageDiff>,
    /// Client used to open temporary leased readers (see `read_handle`).
    persist_client: PersistClient,
    /// Id of the catalog shard.
    shard_id: ShardId,
    /// In-memory trace of catalog updates; periodically consolidated.
    pub(crate) snapshot: Vec<(T, Timestamp, Diff)>,
    /// Implementation-specific logic for applying each update.
    update_applier: U,
    /// The upper this handle has synced up to.
    pub(crate) upper: Timestamp,
    /// Fencing state for this catalog instance.
    fenceable_token: FenceableToken,
    /// The semver version of the running catalog code.
    catalog_content_version: semver::Version,
    // Whether bootstrap has finished; not read in this part of the file —
    // presumably consulted by code elsewhere. TODO(review): confirm usage.
    bootstrap_complete: bool,
    /// Catalog metrics (syncs, consolidations, snapshot sizes, ...).
    metrics: Arc<Metrics>,
    /// Size of `snapshot` right after the last consolidation; drives the
    /// doubling heuristic in `maybe_consolidate`. `None` forces
    /// re-initialization on the next check.
    size_at_last_consolidation: Option<usize>,
}
375
impl<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> PersistHandle<T, U> {
    /// Returns the current upper of the catalog shard.
    ///
    /// In savepoint mode writes are in-memory only, so the locally tracked
    /// upper is authoritative and no fetch is performed.
    #[mz_ore::instrument]
    async fn current_upper(&mut self) -> Timestamp {
        match self.mode {
            Mode::Writable | Mode::Readonly => {
                let upper = self.write_handle.fetch_recent_upper().await;
                antichain_to_timestamp(upper.clone())
            }
            Mode::Savepoint => self.upper,
        }
    }

    /// Appends `updates` to the catalog shard at `commit_ts`, then syncs this
    /// handle up to the resulting upper, which is returned.
    #[mz_ore::instrument]
    pub(crate) async fn compare_and_append<S: IntoStateUpdateKindJson>(
        &mut self,
        updates: Vec<(S, Diff)>,
        commit_ts: Timestamp,
    ) -> Result<Timestamp, CompareAndAppendError> {
        // Diagnostic only (computed solely under soft assertions): record
        // whether this batch both retracts and adds a fence token, so an
        // upper mismatch on a non-fencing write can be flagged below.
        let contains_fence = if mz_ore::assert::soft_assertions_enabled() {
            let parsed_updates: Vec<_> = updates
                .clone()
                .into_iter()
                .filter_map(|(update, diff)| {
                    let update: StateUpdateKindJson = update.into();
                    let update = TryIntoStateUpdateKind::try_into(update).ok()?;
                    Some((update, diff))
                })
                .collect();
            let contains_retraction = parsed_updates.iter().any(|(update, diff)| {
                matches!(update, StateUpdateKind::FenceToken(..)) && *diff == Diff::MINUS_ONE
            });
            let contains_addition = parsed_updates.iter().any(|(update, diff)| {
                matches!(update, StateUpdateKind::FenceToken(..)) && *diff == Diff::ONE
            });
            Some(contains_retraction && contains_addition)
        } else {
            None
        };

        // Encode each update into the shard's (SourceData, (), ts, diff)
        // representation, all at `commit_ts`.
        let updates = updates.into_iter().map(|(kind, diff)| {
            let kind: StateUpdateKindJson = kind.into();
            (
                (Into::<SourceData>::into(kind), ()),
                commit_ts,
                diff.into_inner(),
            )
        });
        let next_upper = commit_ts.step_forward();
        self.compare_and_append_inner(updates, next_upper)
            .await
            .inspect_err(|e| {
                if let Some(contains_fence) = contains_fence {
                    soft_assert_or_log!(
                        matches!(e, CompareAndAppendError::Fence(_)) || contains_fence,
                        "encountered an upper mismatch on a non-fencing write"
                    );
                }
            })?;

        self.sync(next_upper).await?;
        Ok(next_upper)
    }

    /// Low-level compare-and-append against the shard, advancing the upper
    /// from `self.upper` to `next_upper`. Only valid in writable mode.
    ///
    /// On an upper mismatch we first sync to the current upper (which may
    /// surface a fence error from updates written by another catalog) before
    /// returning the mismatch. On success, the critical since is (maybe)
    /// downgraded to just below the new upper.
    async fn compare_and_append_inner(
        &mut self,
        updates: impl IntoIterator<Item = ((SourceData, ()), Timestamp, StorageDiff)>,
        next_upper: Timestamp,
    ) -> Result<(), CompareAndAppendError> {
        assert_eq!(self.mode, Mode::Writable);
        assert!(
            next_upper > self.upper,
            "next_upper ({next_upper}) not greater than current upper ({})",
            self.upper,
        );

        let res = self
            .write_handle
            .compare_and_append(
                updates,
                Antichain::from_elem(self.upper),
                Antichain::from_elem(next_upper),
            )
            .await
            .expect("invalid usage");

        if let Err(e @ UpperMismatch { .. }) = res {
            self.sync_to_current_upper().await?;
            return Err(e.into());
        }

        let downgrade_to = Antichain::from_elem(next_upper.saturating_sub(1));

        // The opaque should never change out from under us, since we are the
        // only live writer after a successful append; a mismatch is logged
        // (or panics in debug) rather than handled.
        let opaque = self.since_handle.opaque().clone();
        let downgrade = self
            .since_handle
            .maybe_compare_and_downgrade_since(&opaque, (&opaque, &downgrade_to))
            .await;
        if let Some(Err(e)) = downgrade {
            soft_panic_or_log!("found opaque value {e:?}, but expected {opaque:?}");
        }

        Ok(())
    }

    /// Reads every update in the shard up to the current upper directly from
    /// persist, without consolidating.
    #[mz_ore::instrument]
    async fn snapshot_unconsolidated(&mut self) -> Vec<StateUpdate<StateUpdateKind>> {
        let current_upper = self.current_upper().await;

        let mut snapshot = Vec::new();
        let mut read_handle = self.read_handle().await;
        let as_of = as_of(&read_handle, current_upper);
        let mut stream = Box::pin(
            read_handle
                .snapshot_and_stream(Antichain::from_elem(as_of))
                .await
                .expect("we have advanced the restart_as_of by the since"),
        );
        while let Some(update) = stream.next().await {
            snapshot.push(update)
        }
        read_handle.expire().await;
        snapshot
            .into_iter()
            .map(Into::<StateUpdate<StateUpdateKindJson>>::into)
            .map(|state_update| state_update.try_into().expect("kind decoding error"))
            .collect()
    }

    /// Syncs this handle's in-memory state up to the shard's current upper.
    #[mz_ore::instrument]
    pub(crate) async fn sync_to_current_upper(&mut self) -> Result<(), FenceError> {
        let upper = self.current_upper().await;
        self.sync(upper).await
    }

    /// Syncs this handle's in-memory state up to `target_upper`, recording
    /// sync count and latency metrics around `sync_inner`.
    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn sync(&mut self, target_upper: Timestamp) -> Result<(), FenceError> {
        self.metrics.syncs.inc();
        let counter = self.metrics.sync_latency_seconds.clone();
        self.sync_inner(target_upper)
            .wall_time()
            .inc_by(counter)
            .await
    }

    /// Drains the listen until the upper reaches `target_upper`, applying
    /// updates in timestamp order as progress is observed.
    #[mz_ore::instrument(level = "debug")]
    async fn sync_inner(&mut self, target_upper: Timestamp) -> Result<(), FenceError> {
        // Fail fast if we have already been fenced.
        self.fenceable_token.validate()?;

        // Savepoint catalogs only track the upper in memory; nothing durable
        // to read.
        if self.mode == Mode::Savepoint {
            self.upper = max(self.upper, target_upper);
            return Ok(());
        }

        // Updates buffered per timestamp until a Progress event tells us that
        // timestamp is complete.
        let mut updates: BTreeMap<_, Vec<_>> = BTreeMap::new();

        // Force `maybe_consolidate` to re-derive its threshold after this
        // batch of syncs.
        self.size_at_last_consolidation = None;

        while self.upper < target_upper {
            let listen_events = self.listen.fetch_next().await;
            for listen_event in listen_events {
                match listen_event {
                    ListenEvent::Progress(upper) => {
                        debug!("synced up to {upper:?}");
                        self.upper = antichain_to_timestamp(upper);
                        // Apply every buffered timestamp now known complete,
                        // in ascending order.
                        while let Some((ts, updates)) = updates.pop_first() {
                            assert!(ts < self.upper, "expected {} < {}", ts, self.upper);
                            let updates = updates.into_iter().map(
                                |update: StateUpdate<StateUpdateKindJson>| {
                                    let kind =
                                        T::try_from(update.kind).expect("kind decoding error");
                                    StateUpdate {
                                        kind,
                                        ts: update.ts,
                                        diff: update.diff,
                                    }
                                },
                            );
                            self.apply_updates(updates)?;
                            self.maybe_consolidate();
                        }
                    }
                    ListenEvent::Updates(batch_updates) => {
                        for update in batch_updates {
                            let update: StateUpdate<StateUpdateKindJson> = update.into();
                            updates.entry(update.ts).or_default().push(update);
                        }
                    }
                }
            }
        }
        assert_eq!(updates, BTreeMap::new(), "all updates should be applied");
        self.consolidate();
        Ok(())
    }

    /// Applies `updates` to the in-memory state and then consolidates the
    /// snapshot.
    pub(crate) fn apply_updates_and_consolidate(
        &mut self,
        updates: impl IntoIterator<Item = StateUpdate<T>>,
    ) -> Result<(), FenceError> {
        self.apply_updates(updates)?;
        self.consolidate();
        Ok(())
    }

    /// Applies `updates` through the update applier, collecting retained
    /// updates into the snapshot.
    ///
    /// Updates are consolidated and then sorted by (ts, diff), so retractions
    /// (-1) are applied before additions (+1) at the same timestamp. If
    /// multiple updates produce errors, the smallest error (after sorting) is
    /// returned — but only after every update has been applied.
    #[mz_ore::instrument(level = "debug")]
    fn apply_updates(
        &mut self,
        updates: impl IntoIterator<Item = StateUpdate<T>>,
    ) -> Result<(), FenceError> {
        let mut updates: Vec<_> = updates
            .into_iter()
            .map(|StateUpdate { kind, ts, diff }| (kind, ts, diff))
            .collect();

        differential_dataflow::consolidation::consolidate_updates(&mut updates);

        updates.sort_by(|(_, ts1, diff1), (_, ts2, diff2)| ts1.cmp(ts2).then(diff1.cmp(diff2)));

        let mut errors = Vec::new();

        for (kind, ts, diff) in updates {
            // After consolidation every diff must be exactly +1 or -1.
            if diff != Diff::ONE && diff != Diff::MINUS_ONE {
                panic!("invalid update in consolidated trace: ({kind:?}, {ts:?}, {diff:?})");
            }

            match self.update_applier.apply_update(
                StateUpdate { kind, ts, diff },
                &mut self.fenceable_token,
                &self.metrics,
            ) {
                Ok(Some(StateUpdate { kind, ts, diff })) => self.snapshot.push((kind, ts, diff)),
                Ok(None) => {}
                // Defer errors so every update in the batch is applied first.
                Err(err) => errors.push(err),
            }
        }

        // Track the high-water mark of the in-memory snapshot size.
        let len = i64::try_from(self.snapshot.len()).unwrap_or(i64::MAX);
        if len > self.metrics.snapshot_max_entries.get() {
            self.metrics.snapshot_max_entries.set(len);
        }

        errors.sort();
        if let Some(err) = errors.into_iter().next() {
            return Err(err);
        }

        Ok(())
    }

    /// Consolidates the snapshot only when it has at least doubled (with a
    /// floor of 8 entries) since the last consolidation, amortizing the cost.
    fn maybe_consolidate(&mut self) {
        let threshold = *self
            .size_at_last_consolidation
            .get_or_insert_with(|| max(self.snapshot.len(), 8));
        if self.snapshot.len() >= threshold * 2 {
            self.consolidate();
            self.size_at_last_consolidation = Some(self.snapshot.len());
        }
    }

    /// Consolidates the in-memory snapshot: advances every update to the
    /// latest timestamp present and cancels matching additions/retractions.
    #[mz_ore::instrument]
    pub(crate) fn consolidate(&mut self) {
        self.metrics.snapshot_consolidations.inc();
        soft_assert_no_log!(
            self.snapshot
                .windows(2)
                .all(|updates| updates[0].1 <= updates[1].1),
            "snapshot should be sorted by timestamp, {:#?}",
            self.snapshot
        );

        let new_ts = self
            .snapshot
            .last()
            .map(|(_, ts, _)| *ts)
            .unwrap_or_else(Timestamp::minimum);
        for (_, ts, _) in &mut self.snapshot {
            *ts = new_ts;
        }
        differential_dataflow::consolidation::consolidate_updates(&mut self.snapshot);
    }

    /// Syncs to the current upper and then runs `f` over the up-to-date
    /// in-memory trace.
    async fn with_trace<R>(
        &mut self,
        f: impl FnOnce(&Vec<(T, Timestamp, Diff)>) -> Result<R, CatalogError>,
    ) -> Result<R, CatalogError> {
        self.sync_to_current_upper().await?;
        f(&self.snapshot)
    }

    /// Opens a temporary leased read handle on the catalog shard; callers are
    /// expected to expire it when done.
    async fn read_handle(&self) -> ReadHandle<SourceData, (), Timestamp, StorageDiff> {
        self.persist_client
            .open_leased_reader(
                self.shard_id,
                Arc::new(persist_desc()),
                Arc::new(UnitSchema::default()),
                Diagnostics {
                    shard_name: CATALOG_SHARD_NAME.to_string(),
                    handle_purpose: "openable durable catalog state temporary reader".to_string(),
                },
                USE_CRITICAL_SINCE_CATALOG.get(self.persist_client.dyncfgs()),
            )
            .await
            .expect("invalid usage")
    }

    /// Politely expires the write and listen handles.
    ///
    /// NOTE(review): the critical since handle is intentionally not expired
    /// here — confirm that is the desired lifecycle.
    async fn expire(self: Box<Self>) {
        self.write_handle.expire().await;
        self.listen.expire().await;
    }
}
761
impl<U: ApplyUpdate<StateUpdateKind>> PersistHandle<StateUpdateKind, U> {
    /// Syncs to the current upper, materializes the trace into a [`Snapshot`]
    /// of typed collections, and runs `f` over it.
    ///
    /// Audit log entries and fence tokens are deliberately excluded from the
    /// snapshot.
    async fn with_snapshot<T>(
        &mut self,
        f: impl FnOnce(Snapshot) -> Result<T, CatalogError>,
    ) -> Result<T, CatalogError> {
        /// Applies a single (+1 insert / -1 delete) update to `map`,
        /// asserting the usual retract-before-insert discipline.
        fn apply<K, V>(map: &mut BTreeMap<K, V>, key: &K, value: &V, diff: Diff)
        where
            K: Ord + Clone,
            V: Ord + Clone + Debug,
        {
            let key = key.clone();
            let value = value.clone();
            if diff == Diff::ONE {
                let prev = map.insert(key, value);
                assert_eq!(
                    prev, None,
                    "values must be explicitly retracted before inserting a new value"
                );
            } else if diff == Diff::MINUS_ONE {
                let prev = map.remove(&key);
                assert_eq!(
                    prev,
                    Some(value),
                    "retraction does not match existing value"
                );
            }
        }

        self.with_trace(|trace| {
            let mut snapshot = Snapshot::empty();
            for (kind, ts, diff) in trace {
                let diff = *diff;
                if diff != Diff::ONE && diff != Diff::MINUS_ONE {
                    panic!("invalid update in consolidated trace: ({kind:?}, {ts:?}, {diff:?})");
                }

                match kind {
                    // Audit log entries are not part of the typed snapshot.
                    StateUpdateKind::AuditLog(_key, ()) => {
                    }
                    StateUpdateKind::Cluster(key, value) => {
                        apply(&mut snapshot.clusters, key, value, diff);
                    }
                    StateUpdateKind::ClusterReplica(key, value) => {
                        apply(&mut snapshot.cluster_replicas, key, value, diff);
                    }
                    StateUpdateKind::Comment(key, value) => {
                        apply(&mut snapshot.comments, key, value, diff);
                    }
                    StateUpdateKind::Config(key, value) => {
                        apply(&mut snapshot.configs, key, value, diff);
                    }
                    StateUpdateKind::Database(key, value) => {
                        apply(&mut snapshot.databases, key, value, diff);
                    }
                    StateUpdateKind::DefaultPrivilege(key, value) => {
                        apply(&mut snapshot.default_privileges, key, value, diff);
                    }
                    // Fence tokens are tracked separately, not snapshotted.
                    StateUpdateKind::FenceToken(_token) => {
                    }
                    StateUpdateKind::IdAllocator(key, value) => {
                        apply(&mut snapshot.id_allocator, key, value, diff);
                    }
                    StateUpdateKind::IntrospectionSourceIndex(key, value) => {
                        apply(&mut snapshot.introspection_sources, key, value, diff);
                    }
                    StateUpdateKind::Item(key, value) => {
                        apply(&mut snapshot.items, key, value, diff);
                    }
                    StateUpdateKind::NetworkPolicy(key, value) => {
                        apply(&mut snapshot.network_policies, key, value, diff);
                    }
                    StateUpdateKind::Role(key, value) => {
                        apply(&mut snapshot.roles, key, value, diff);
                    }
                    StateUpdateKind::Schema(key, value) => {
                        apply(&mut snapshot.schemas, key, value, diff);
                    }
                    StateUpdateKind::Setting(key, value) => {
                        apply(&mut snapshot.settings, key, value, diff);
                    }
                    StateUpdateKind::SourceReferences(key, value) => {
                        apply(&mut snapshot.source_references, key, value, diff);
                    }
                    StateUpdateKind::SystemConfiguration(key, value) => {
                        apply(&mut snapshot.system_configurations, key, value, diff);
                    }
                    StateUpdateKind::SystemObjectMapping(key, value) => {
                        apply(&mut snapshot.system_object_mappings, key, value, diff);
                    }
                    StateUpdateKind::SystemPrivilege(key, value) => {
                        apply(&mut snapshot.system_privileges, key, value, diff);
                    }
                    StateUpdateKind::StorageCollectionMetadata(key, value) => {
                        apply(&mut snapshot.storage_collection_metadata, key, value, diff);
                    }
                    StateUpdateKind::UnfinalizedShard(key, ()) => {
                        apply(&mut snapshot.unfinalized_shards, key, &(), diff);
                    }
                    StateUpdateKind::TxnWalShard((), value) => {
                        apply(&mut snapshot.txn_wal_shard, &(), value, diff);
                    }
                    StateUpdateKind::RoleAuth(key, value) => {
                        apply(&mut snapshot.role_auth, key, value, diff);
                    }
                }
            }
            f(snapshot)
        })
        .await
    }

    /// Reads a snapshot directly from persist at this handle's current upper,
    /// via a temporary leased reader, decoding each update.
    #[mz_ore::instrument(level = "debug")]
    async fn persist_snapshot(&self) -> impl Iterator<Item = StateUpdate> + DoubleEndedIterator {
        let mut read_handle = self.read_handle().await;
        let as_of = as_of(&read_handle, self.upper);
        let snapshot = snapshot_binary(&mut read_handle, as_of, &self.metrics)
            .await
            .map(|update| update.try_into().expect("kind decoding error"));
        read_handle.expire().await;
        snapshot
    }
}
895
/// The in-memory state maintained by an unopened catalog: just the configs
/// and settings needed before the catalog is fully opened.
#[derive(Debug)]
pub(crate) struct UnopenedCatalogStateInner {
    // Config key/value pairs observed so far.
    configs: BTreeMap<String, u64>,
    // Setting name/value pairs observed so far.
    settings: BTreeMap<String, String>,
}
904
905impl UnopenedCatalogStateInner {
906 fn new() -> UnopenedCatalogStateInner {
907 UnopenedCatalogStateInner {
908 configs: BTreeMap::new(),
909 settings: BTreeMap::new(),
910 }
911 }
912}
913
impl ApplyUpdate<StateUpdateKindJson> for UnopenedCatalogStateInner {
    /// Applies a raw JSON update to the unopened catalog state.
    ///
    /// Only config, setting, and fence-token updates are interpreted; audit
    /// log entries and kinds not flagged as always-deserializable are passed
    /// through untouched. Every update (interpreted or not) is retained in
    /// the snapshot.
    fn apply_update(
        &mut self,
        update: StateUpdate<StateUpdateKindJson>,
        current_fence_token: &mut FenceableToken,
        _metrics: &Arc<Metrics>,
    ) -> Result<Option<StateUpdate<StateUpdateKindJson>>, FenceError> {
        if !update.kind.is_audit_log() && update.kind.is_always_deserializable() {
            let kind = TryInto::try_into(&update.kind).expect("kind is known to be deserializable");
            match (kind, update.diff) {
                (StateUpdateKind::Config(key, value), Diff::ONE) => {
                    let prev = self.configs.insert(key.key, value.value);
                    assert_eq!(
                        prev, None,
                        "values must be explicitly retracted before inserting a new value"
                    );
                }
                (StateUpdateKind::Config(key, value), Diff::MINUS_ONE) => {
                    let prev = self.configs.remove(&key.key);
                    assert_eq!(
                        prev,
                        Some(value.value),
                        "retraction does not match existing value"
                    );
                }
                (StateUpdateKind::Setting(key, value), Diff::ONE) => {
                    let prev = self.settings.insert(key.name, value.value);
                    assert_eq!(
                        prev, None,
                        "values must be explicitly retracted before inserting a new value"
                    );
                }
                (StateUpdateKind::Setting(key, value), Diff::MINUS_ONE) => {
                    let prev = self.settings.remove(&key.name);
                    assert_eq!(
                        prev,
                        Some(value.value),
                        "retraction does not match existing value"
                    );
                }
                // A newly added fence token may fence this catalog out.
                (StateUpdateKind::FenceToken(fence_token), Diff::ONE) => {
                    current_fence_token.maybe_fence(fence_token)?;
                }
                _ => {}
            }
        }

        Ok(Some(update))
    }
}
964
/// A [`PersistHandle`] for a catalog that has not yet been opened: updates
/// stay in their raw JSON form and only configs, settings, and fence tokens
/// are interpreted (see [`UnopenedCatalogStateInner`]).
pub(crate) type UnopenedPersistCatalogState =
    PersistHandle<StateUpdateKindJson, UnopenedCatalogStateInner>;
974
975impl UnopenedPersistCatalogState {
    /// Creates a new unopened catalog handle backed by persist.
    ///
    /// Derives the catalog shard id from `organization_id`, checks that this
    /// build's `version` may write to the shard, opens the since/write/read
    /// handles, initializes the shard's upper if needed, and loads and
    /// applies the initial snapshot.
    ///
    /// Returns [`DurableCatalogError::IncompatiblePersistVersion`] if the
    /// shard was written by an incompatible (newer) version.
    #[mz_ore::instrument]
    pub(crate) async fn new(
        persist_client: PersistClient,
        organization_id: Uuid,
        version: semver::Version,
        deploy_generation: Option<u64>,
        metrics: Arc<Metrics>,
    ) -> Result<UnopenedPersistCatalogState, DurableCatalogError> {
        let catalog_shard_id = shard_id(organization_id, CATALOG_SEED);
        debug!(?catalog_shard_id, "new persist backed catalog state");

        // Refuse to proceed if the persist code version recorded in the shard
        // is one this binary cannot write.
        let version_in_catalog_shard =
            fetch_catalog_shard_version(&persist_client, catalog_shard_id).await;
        if let Some(version_in_catalog_shard) = version_in_catalog_shard {
            if !mz_persist_client::cfg::code_can_write_data(&version, &version_in_catalog_shard) {
                return Err(DurableCatalogError::IncompatiblePersistVersion {
                    found_version: version_in_catalog_shard,
                    catalog_version: version,
                });
            }
        }

        let open_handles_start = Instant::now();
        info!("startup: envd serve: catalog init: open handles beginning");
        // Register/open the catalog's critical since handle under its fixed
        // reader id.
        let since_handle = persist_client
            .open_critical_since(
                catalog_shard_id,
                CATALOG_CRITICAL_SINCE.clone(),
                Opaque::encode(&i64::MIN),
                Diagnostics {
                    shard_name: CATALOG_SHARD_NAME.to_string(),
                    handle_purpose: "durable catalog state critical since".to_string(),
                },
            )
            .await
            .expect("invalid usage");

        let (mut write_handle, mut read_handle) = persist_client
            .open(
                catalog_shard_id,
                Arc::new(persist_desc()),
                Arc::new(UnitSchema::default()),
                Diagnostics {
                    shard_name: CATALOG_SHARD_NAME.to_string(),
                    handle_purpose: "durable catalog state handles".to_string(),
                },
                USE_CRITICAL_SINCE_CATALOG.get(persist_client.dyncfgs()),
            )
            .await
            .expect("invalid usage");
        info!(
            "startup: envd serve: catalog init: open handles complete in {:?}",
            open_handles_start.elapsed()
        );

        // Advance a brand-new shard's upper past the minimum with an empty
        // write; on a mismatch the shard already has data and we take its
        // current upper instead.
        let upper = {
            const EMPTY_UPDATES: &[((SourceData, ()), Timestamp, StorageDiff)] = &[];
            let upper = Antichain::from_elem(Timestamp::minimum());
            let next_upper = Timestamp::minimum().step_forward();
            match write_handle
                .compare_and_append(EMPTY_UPDATES, upper, Antichain::from_elem(next_upper))
                .await
                .expect("invalid usage")
            {
                Ok(()) => next_upper,
                Err(mismatch) => antichain_to_timestamp(mismatch.current),
            }
        };

        let snapshot_start = Instant::now();
        info!("startup: envd serve: catalog init: snapshot beginning");
        let as_of = as_of(&read_handle, upper);
        let snapshot: Vec<_> = snapshot_binary(&mut read_handle, as_of, &metrics)
            .await
            .map(|StateUpdate { kind, ts, diff }| (kind, ts, diff))
            .collect();
        let listen = read_handle
            .listen(Antichain::from_elem(as_of))
            .await
            .expect("invalid usage");
        info!(
            "startup: envd serve: catalog init: snapshot complete in {:?}",
            snapshot_start.elapsed()
        );

        let mut handle = UnopenedPersistCatalogState {
            // Unopened catalogs start writable; `open_inner` may later
            // restrict the mode.
            mode: Mode::Writable,
            since_handle,
            write_handle,
            listen,
            persist_client,
            shard_id: catalog_shard_id,
            // Populated below via `apply_updates_and_consolidate`.
            snapshot: Vec::new(),
            update_applier: UnopenedCatalogStateInner::new(),
            upper,
            fenceable_token: FenceableToken::new(deploy_generation),
            catalog_content_version: version,
            bootstrap_complete: false,
            metrics,
            size_at_last_consolidation: None,
        };
        // A freshly read persist snapshot should already be consolidated.
        soft_assert_no_log!(
            snapshot.iter().all(|(_, _, diff)| *diff == Diff::ONE),
            "snapshot should be consolidated: {snapshot:#?}"
        );

        let apply_start = Instant::now();
        info!("startup: envd serve: catalog init: apply updates beginning");
        let updates = snapshot
            .into_iter()
            .map(|(kind, ts, diff)| StateUpdate { kind, ts, diff });
        handle.apply_updates_and_consolidate(updates)?;
        info!(
            "startup: envd serve: catalog init: apply updates complete in {:?}",
            apply_start.elapsed()
        );

        // Refuse to open a catalog whose recorded content version is newer
        // than this build's version.
        if let Some(found_version) = handle.get_catalog_content_version().await? {
            if handle
                .catalog_content_version
                .cmp_precedence(&found_version)
                == std::cmp::Ordering::Less
            {
                return Err(DurableCatalogError::IncompatiblePersistVersion {
                    found_version,
                    catalog_version: handle.catalog_content_version,
                });
            }
        }

        Ok(handle)
    }
1127
1128 #[mz_ore::instrument]
1129 async fn open_inner(
1130 mut self,
1131 mode: Mode,
1132 initial_ts: Timestamp,
1133 bootstrap_args: &BootstrapArgs,
1134 ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError> {
1135 let mut commit_ts = self.upper;
1138 self.mode = mode;
1139
1140 match (&self.mode, &self.fenceable_token) {
1142 (_, FenceableToken::Unfenced { .. } | FenceableToken::Fenced { .. }) => {
1143 return Err(DurableCatalogError::Internal(
1144 "catalog should not have fenced before opening".to_string(),
1145 )
1146 .into());
1147 }
1148 (
1149 Mode::Writable | Mode::Savepoint,
1150 FenceableToken::Initializing {
1151 current_deploy_generation: None,
1152 ..
1153 },
1154 ) => {
1155 return Err(DurableCatalogError::Internal(format!(
1156 "cannot open in mode '{:?}' without a deploy generation",
1157 self.mode,
1158 ))
1159 .into());
1160 }
1161 _ => {}
1162 }
1163
1164 let read_only = matches!(self.mode, Mode::Readonly);
1165
1166 loop {
1168 self.sync_to_current_upper().await?;
1169 commit_ts = max(commit_ts, self.upper);
1170 let (fence_updates, current_fenceable_token) = self
1171 .fenceable_token
1172 .generate_unfenced_token(self.mode)?
1173 .ok_or_else(|| {
1174 DurableCatalogError::Internal(
1175 "catalog should not have fenced before opening".to_string(),
1176 )
1177 })?;
1178 debug!(
1179 ?self.upper,
1180 ?self.fenceable_token,
1181 ?current_fenceable_token,
1182 "fencing previous catalogs"
1183 );
1184 if matches!(self.mode, Mode::Writable) {
1185 match self
1186 .compare_and_append(fence_updates.clone(), commit_ts)
1187 .await
1188 {
1189 Ok(upper) => {
1190 commit_ts = upper;
1191 }
1192 Err(CompareAndAppendError::Fence(e)) => return Err(e.into()),
1193 Err(e @ CompareAndAppendError::UpperMismatch { .. }) => {
1194 warn!("catalog write failed due to upper mismatch, retrying: {e:?}");
1195 continue;
1196 }
1197 }
1198 }
1199 self.fenceable_token = current_fenceable_token;
1200 break;
1201 }
1202
1203 if matches!(self.mode, Mode::Writable) {
1204 let mut controller_handle = self
1211 .persist_client
1212 .open_critical_since::<SourceData, (), Timestamp, StorageDiff>(
1213 self.shard_id,
1214 PersistClient::CONTROLLER_CRITICAL_SINCE,
1215 Opaque::encode(&i64::MIN),
1216 Diagnostics {
1217 shard_name: CATALOG_SHARD_NAME.to_string(),
1218 handle_purpose: "durable catalog state critical since (migration)"
1219 .to_string(),
1220 },
1221 )
1222 .await
1223 .expect("invalid usage");
1224
1225 let since = controller_handle.since().clone();
1226 let res = controller_handle
1227 .compare_and_downgrade_since(
1228 &Opaque::encode(&i64::MIN),
1229 (&Opaque::encode(&PersistEpoch::default()), &since),
1230 )
1231 .await;
1232 match res {
1233 Ok(_) => info!("migrated Opaque of catalog since handle"),
1234 Err(_) => { }
1235 }
1236 }
1237
1238 let is_initialized = self.is_initialized_inner();
1239 if !matches!(self.mode, Mode::Writable) && !is_initialized {
1240 return Err(CatalogError::Durable(DurableCatalogError::NotWritable(
1241 format!(
1242 "catalog tables do not exist; will not create in {:?} mode",
1243 self.mode
1244 ),
1245 )));
1246 }
1247 soft_assert_ne_or_log!(self.upper, Timestamp::minimum());
1248
1249 let (audit_logs, snapshot): (Vec<_>, Vec<_>) = self
1251 .snapshot
1252 .into_iter()
1253 .partition(|(update, _, _)| update.is_audit_log());
1254 self.snapshot = snapshot;
1255 let audit_log_count = audit_logs.iter().map(|(_, _, diff)| diff).sum::<Diff>();
1256 let audit_log_handle = AuditLogIterator::new(audit_logs);
1257
1258 if is_initialized && !read_only {
1260 commit_ts = upgrade(&mut self, commit_ts).await?;
1261 }
1262
1263 debug!(
1264 ?is_initialized,
1265 ?self.upper,
1266 "initializing catalog state"
1267 );
1268 let mut catalog = PersistCatalogState {
1269 mode: self.mode,
1270 since_handle: self.since_handle,
1271 write_handle: self.write_handle,
1272 listen: self.listen,
1273 persist_client: self.persist_client,
1274 shard_id: self.shard_id,
1275 upper: self.upper,
1276 fenceable_token: self.fenceable_token,
1277 snapshot: Vec::new(),
1279 update_applier: CatalogStateInner::new(),
1280 catalog_content_version: self.catalog_content_version,
1281 bootstrap_complete: false,
1282 metrics: self.metrics,
1283 size_at_last_consolidation: None,
1284 };
1285 catalog.metrics.collection_entries.reset();
1286 catalog
1289 .metrics
1290 .collection_entries
1291 .with_label_values(&[&CollectionType::AuditLog.to_string()])
1292 .add(audit_log_count.into_inner());
1293 let updates = self.snapshot.into_iter().map(|(kind, ts, diff)| {
1294 let kind = TryIntoStateUpdateKind::try_into(kind).expect("kind decoding error");
1295 StateUpdate { kind, ts, diff }
1296 });
1297 catalog.apply_updates_and_consolidate(updates)?;
1298
1299 let catalog_content_version = catalog.catalog_content_version.to_string();
1300 let txn = if is_initialized {
1301 let mut txn = catalog.transaction().await?;
1302
1303 if txn.get_setting("migration_version".into()).is_none() && mode != Mode::Readonly {
1310 let old_version = txn.get_catalog_content_version();
1311 txn.set_setting("migration_version".into(), old_version.map(Into::into))?;
1312 }
1313
1314 txn.set_catalog_content_version(catalog_content_version)?;
1315 txn
1316 } else {
1317 soft_assert_eq_no_log!(
1318 catalog
1319 .snapshot
1320 .iter()
1321 .filter(|(kind, _, _)| !matches!(kind, StateUpdateKind::FenceToken(_)))
1322 .count(),
1323 0,
1324 "trace should not contain any updates for an uninitialized catalog: {:#?}",
1325 catalog.snapshot
1326 );
1327
1328 let mut txn = catalog.transaction().await?;
1329 initialize::initialize(
1330 &mut txn,
1331 bootstrap_args,
1332 initial_ts.into(),
1333 catalog_content_version,
1334 )
1335 .await?;
1336 txn
1337 };
1338
1339 if read_only {
1340 let (txn_batch, _) = txn.into_parts();
1341 let updates = StateUpdate::from_txn_batch_ts(txn_batch, catalog.upper);
1343 catalog.apply_updates_and_consolidate(updates)?;
1344 } else {
1345 txn.commit_internal(commit_ts).await?;
1346 }
1347
1348 if matches!(catalog.mode, Mode::Writable) {
1349 let write_handle = catalog
1350 .persist_client
1351 .open_writer::<SourceData, (), Timestamp, i64>(
1352 catalog.write_handle.shard_id(),
1353 Arc::new(persist_desc()),
1354 Arc::new(UnitSchema::default()),
1355 Diagnostics {
1356 shard_name: CATALOG_SHARD_NAME.to_string(),
1357 handle_purpose: "compact catalog".to_string(),
1358 },
1359 )
1360 .await
1361 .expect("invalid usage");
1362 let fuel = CATALOG_FORCE_COMPACTION_FUEL.handle(catalog.persist_client.dyncfgs());
1363 let wait = CATALOG_FORCE_COMPACTION_WAIT.handle(catalog.persist_client.dyncfgs());
1364 let _task = mz_ore::task::spawn(|| "catalog::force_shard_compaction", async move {
1367 let () =
1368 mz_persist_client::cli::admin::dangerous_force_compaction_and_break_pushdown(
1369 &write_handle,
1370 || fuel.get(),
1371 || wait.get(),
1372 )
1373 .await;
1374 });
1375 }
1376
1377 Ok((Box::new(catalog), audit_log_handle))
1378 }
1379
    /// Reports whether the catalog contains any data, based solely on the
    /// locally applied state (no sync is performed first).
    ///
    /// An empty `configs` map is treated as "never initialized", since
    /// initialization always writes config entries.
    #[mz_ore::instrument]
    fn is_initialized_inner(&self) -> bool {
        !self.update_applier.configs.is_empty()
    }
1388
1389 #[mz_ore::instrument]
1393 async fn get_current_config(&mut self, key: &str) -> Result<Option<u64>, DurableCatalogError> {
1394 self.sync_to_current_upper().await?;
1395 Ok(self.update_applier.configs.get(key).cloned())
1396 }
1397
    /// Returns the catalog's user version config, if it has been set.
    ///
    /// Syncs to the current upper via [`Self::get_current_config`] before
    /// reading.
    #[mz_ore::instrument]
    pub(crate) async fn get_user_version(&mut self) -> Result<Option<u64>, DurableCatalogError> {
        self.get_current_config(USER_VERSION_KEY).await
    }
1405
    /// Returns the current value of the setting named `name`, first syncing to
    /// the current upper so the result reflects the latest durable state.
    #[mz_ore::instrument]
    async fn get_current_setting(
        &mut self,
        name: &str,
    ) -> Result<Option<String>, DurableCatalogError> {
        self.sync_to_current_upper().await?;
        Ok(self.update_applier.settings.get(name).cloned())
    }
1417
1418 #[mz_ore::instrument]
1423 async fn get_catalog_content_version(
1424 &mut self,
1425 ) -> Result<Option<semver::Version>, DurableCatalogError> {
1426 let version = self
1427 .get_current_setting(CATALOG_CONTENT_VERSION_KEY)
1428 .await?;
1429 let version = version.map(|version| version.parse().expect("invalid version persisted"));
1430 Ok(version)
1431 }
1432}
1433
1434#[async_trait]
1435impl OpenableDurableCatalogState for UnopenedPersistCatalogState {
1436 #[mz_ore::instrument]
1437 async fn open_savepoint(
1438 mut self: Box<Self>,
1439 initial_ts: Timestamp,
1440 bootstrap_args: &BootstrapArgs,
1441 ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError> {
1442 self.open_inner(Mode::Savepoint, initial_ts, bootstrap_args)
1443 .boxed()
1444 .await
1445 }
1446
1447 #[mz_ore::instrument]
1448 async fn open_read_only(
1449 mut self: Box<Self>,
1450 bootstrap_args: &BootstrapArgs,
1451 ) -> Result<Box<dyn DurableCatalogState>, CatalogError> {
1452 self.open_inner(Mode::Readonly, EpochMillis::MIN.into(), bootstrap_args)
1453 .boxed()
1454 .await
1455 .map(|(catalog, _)| catalog)
1456 }
1457
1458 #[mz_ore::instrument]
1459 async fn open(
1460 mut self: Box<Self>,
1461 initial_ts: Timestamp,
1462 bootstrap_args: &BootstrapArgs,
1463 ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError> {
1464 self.open_inner(Mode::Writable, initial_ts, bootstrap_args)
1465 .boxed()
1466 .await
1467 }
1468
1469 #[mz_ore::instrument(level = "debug")]
1470 async fn open_debug(mut self: Box<Self>) -> Result<DebugCatalogState, CatalogError> {
1471 Ok(DebugCatalogState(*self))
1472 }
1473
1474 #[mz_ore::instrument]
1475 async fn is_initialized(&mut self) -> Result<bool, CatalogError> {
1476 self.sync_to_current_upper().await?;
1477 Ok(self.is_initialized_inner())
1478 }
1479
1480 #[mz_ore::instrument]
1481 async fn epoch(&mut self) -> Result<Epoch, CatalogError> {
1482 self.sync_to_current_upper().await?;
1483 self.fenceable_token
1484 .validate()?
1485 .map(|token| token.epoch)
1486 .ok_or(CatalogError::Durable(DurableCatalogError::Uninitialized))
1487 }
1488
1489 #[mz_ore::instrument]
1490 async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError> {
1491 self.sync_to_current_upper().await?;
1492 self.fenceable_token
1493 .token()
1494 .map(|token| token.deploy_generation)
1495 .ok_or(CatalogError::Durable(DurableCatalogError::Uninitialized))
1496 }
1497
1498 #[mz_ore::instrument(level = "debug")]
1499 async fn get_0dt_deployment_max_wait(&mut self) -> Result<Option<Duration>, CatalogError> {
1500 let value = self
1501 .get_current_config(WITH_0DT_DEPLOYMENT_MAX_WAIT)
1502 .await?;
1503 match value {
1504 None => Ok(None),
1505 Some(millis) => Ok(Some(Duration::from_millis(millis))),
1506 }
1507 }
1508
1509 #[mz_ore::instrument(level = "debug")]
1510 async fn get_0dt_deployment_ddl_check_interval(
1511 &mut self,
1512 ) -> Result<Option<Duration>, CatalogError> {
1513 let value = self
1514 .get_current_config(WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL)
1515 .await?;
1516 match value {
1517 None => Ok(None),
1518 Some(millis) => Ok(Some(Duration::from_millis(millis))),
1519 }
1520 }
1521
1522 #[mz_ore::instrument(level = "debug")]
1523 async fn get_enable_0dt_deployment_panic_after_timeout(
1524 &mut self,
1525 ) -> Result<Option<bool>, CatalogError> {
1526 let value = self
1527 .get_current_config(ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT)
1528 .await?;
1529 match value {
1530 None => Ok(None),
1531 Some(0) => Ok(Some(false)),
1532 Some(1) => Ok(Some(true)),
1533 Some(v) => Err(
1534 DurableCatalogError::from(TryFromProtoError::UnknownEnumVariant(format!(
1535 "{v} is not a valid boolean value"
1536 )))
1537 .into(),
1538 ),
1539 }
1540 }
1541
1542 #[mz_ore::instrument]
1543 async fn has_system_config_synced_once(&mut self) -> Result<bool, DurableCatalogError> {
1544 self.get_current_config(SYSTEM_CONFIG_SYNCED_KEY)
1545 .await
1546 .map(|value| value.map(|value| value > 0).unwrap_or(false))
1547 }
1548
1549 #[mz_ore::instrument]
1550 async fn trace_unconsolidated(&mut self) -> Result<Trace, CatalogError> {
1551 self.sync_to_current_upper().await?;
1552 if self.is_initialized_inner() {
1553 let snapshot = self.snapshot_unconsolidated().await;
1554 Ok(Trace::from_snapshot(snapshot))
1555 } else {
1556 Err(CatalogError::Durable(DurableCatalogError::Uninitialized))
1557 }
1558 }
1559
1560 #[mz_ore::instrument]
1561 async fn trace_consolidated(&mut self) -> Result<Trace, CatalogError> {
1562 self.sync_to_current_upper().await?;
1563 if self.is_initialized_inner() {
1564 let snapshot = self.current_snapshot().await?;
1565 Ok(Trace::from_snapshot(snapshot))
1566 } else {
1567 Err(CatalogError::Durable(DurableCatalogError::Uninitialized))
1568 }
1569 }
1570
1571 #[mz_ore::instrument(level = "debug")]
1572 async fn expire(self: Box<Self>) {
1573 self.expire().await
1574 }
1575}
1576
/// State maintained by an opened catalog on top of the raw persist updates.
#[derive(Debug)]
struct CatalogStateInner {
    // In-memory updates buffered in the order they were applied, waiting to be
    // drained by `sync_updates`.
    updates: VecDeque<memory::objects::StateUpdate>,
}
1583
1584impl CatalogStateInner {
1585 fn new() -> CatalogStateInner {
1586 CatalogStateInner {
1587 updates: VecDeque::new(),
1588 }
1589 }
1590}
1591
1592impl ApplyUpdate<StateUpdateKind> for CatalogStateInner {
1593 fn apply_update(
1594 &mut self,
1595 update: StateUpdate<StateUpdateKind>,
1596 current_fence_token: &mut FenceableToken,
1597 metrics: &Arc<Metrics>,
1598 ) -> Result<Option<StateUpdate<StateUpdateKind>>, FenceError> {
1599 if let Some(collection_type) = update.kind.collection_type() {
1600 metrics
1601 .collection_entries
1602 .with_label_values(&[&collection_type.to_string()])
1603 .add(update.diff.into_inner());
1604 }
1605
1606 {
1607 let update: Option<memory::objects::StateUpdate> = (&update)
1608 .try_into()
1609 .expect("invalid persisted update: {update:#?}");
1610 if let Some(update) = update {
1611 self.updates.push_back(update);
1612 }
1613 }
1614
1615 match (update.kind, update.diff) {
1616 (StateUpdateKind::AuditLog(_, ()), _) => Ok(None),
1617 (StateUpdateKind::FenceToken(_), Diff::MINUS_ONE) => Ok(None),
1619 (StateUpdateKind::FenceToken(token), Diff::ONE) => {
1620 current_fence_token.maybe_fence(token)?;
1621 Ok(None)
1622 }
1623 (kind, diff) => Ok(Some(StateUpdate {
1624 kind,
1625 ts: update.ts,
1626 diff,
1627 })),
1628 }
1629 }
1630}
1631
1632type PersistCatalogState = PersistHandle<StateUpdateKind, CatalogStateInner>;
1638
#[async_trait]
impl ReadOnlyDurableCatalogState for PersistCatalogState {
    /// Returns the current epoch.
    ///
    /// Panics if no fence token is present; an opened catalog always has one.
    fn epoch(&self) -> Epoch {
        self.fenceable_token
            .token()
            .expect("opened catalog state must have an epoch")
            .epoch
    }

    /// Returns a handle to the catalog metrics.
    fn metrics(&self) -> &Metrics {
        &self.metrics
    }

    /// Politely releases all persist resources held by this catalog.
    #[mz_ore::instrument(level = "debug")]
    async fn expire(self: Box<Self>) {
        // Resolves to the inherent `expire` implementation (inherent methods
        // take precedence over trait methods), not to this trait method.
        self.expire().await
    }

    /// Reports whether bootstrap has been marked complete.
    fn is_bootstrap_complete(&self) -> bool {
        self.bootstrap_complete
    }

    /// Returns all audit log events, sorted by sortable id.
    async fn get_audit_logs(&mut self) -> Result<Vec<VersionedEvent>, CatalogError> {
        self.sync_to_current_upper().await?;
        // Audit log updates are not retained in the in-memory snapshot, so
        // they must be read directly from persist.
        let audit_logs: Vec<_> = self
            .persist_snapshot()
            .await
            .filter_map(
                |StateUpdate {
                     kind,
                     ts: _,
                     diff: _,
                 }| match kind {
                    StateUpdateKind::AuditLog(key, ()) => Some(key),
                    _ => None,
                },
            )
            .collect();
        // Decode the proto representation into audit log events.
        let mut audit_logs: Vec<_> = audit_logs
            .into_iter()
            .map(RustType::from_proto)
            .map_ok(|key: AuditLogKey| key.event)
            .collect::<Result<_, _>>()?;
        audit_logs.sort_by(|a, b| a.sortable_id().cmp(&b.sortable_id()));
        Ok(audit_logs)
    }

    /// Returns the next unallocated id for `id_type`.
    ///
    /// Panics if no id allocator entry exists for `id_type`.
    #[mz_ore::instrument(level = "debug")]
    async fn get_next_id(&mut self, id_type: &str) -> Result<u64, CatalogError> {
        self.with_trace(|trace| {
            // Scan from the newest update backwards; the first matching id
            // allocator entry holds the current next id.
            Ok(trace
                .into_iter()
                .rev()
                .filter_map(|(kind, _, _)| match kind {
                    StateUpdateKind::IdAllocator(key, value) if key.name == id_type => {
                        Some(value.next_id)
                    }
                    _ => None,
                })
                .next()
                .expect("must exist"))
        })
        .await
    }

    /// Returns the deploy generation from this catalog's fence token.
    ///
    /// Panics if no fence token is present; opened catalogs always have one.
    #[mz_ore::instrument(level = "debug")]
    async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError> {
        self.sync_to_current_upper().await?;
        Ok(self
            .fenceable_token
            .token()
            .expect("opened catalogs must have a token")
            .deploy_generation)
    }

    /// Returns a [`Snapshot`] of the current catalog contents.
    #[mz_ore::instrument(level = "debug")]
    async fn snapshot(&mut self) -> Result<Snapshot, CatalogError> {
        self.with_snapshot(Ok).await
    }

    /// Syncs to the current upper and returns all newly buffered in-memory
    /// updates.
    #[mz_ore::instrument(level = "debug")]
    async fn sync_to_current_updates(
        &mut self,
    ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError> {
        let upper = self.current_upper().await;
        self.sync_updates(upper).await
    }

    /// Syncs the catalog to `target_upper` and returns all buffered in-memory
    /// updates with timestamps strictly less than `target_upper`.
    #[mz_ore::instrument(level = "debug")]
    async fn sync_updates(
        &mut self,
        target_upper: mz_repr::Timestamp,
    ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError> {
        self.sync(target_upper).await?;
        let mut updates = Vec::new();
        // Drain buffered updates from the front, stopping at the first update
        // that is not yet closed out by `target_upper`.
        while let Some(update) = self.update_applier.updates.front() {
            if update.ts >= target_upper {
                break;
            }

            let update = self
                .update_applier
                .updates
                .pop_front()
                .expect("peeked above");
            updates.push(update);
        }
        Ok(updates)
    }

    /// Returns the current upper of the catalog shard.
    async fn current_upper(&mut self) -> Timestamp {
        // Resolves to the inherent `current_upper` implementation (inherent
        // methods take precedence over trait methods).
        self.current_upper().await
    }
}
1753
#[async_trait]
#[allow(mismatched_lifetime_syntaxes)]
impl DurableCatalogState for PersistCatalogState {
    /// Reports whether this catalog was opened in read-only mode.
    fn is_read_only(&self) -> bool {
        matches!(self.mode, Mode::Readonly)
    }

    /// Reports whether this catalog was opened in savepoint mode.
    fn is_savepoint(&self) -> bool {
        matches!(self.mode, Mode::Savepoint)
    }

    /// Marks bootstrap as complete; writable catalogs also upgrade the since
    /// handle's version.
    async fn mark_bootstrap_complete(&mut self) {
        self.bootstrap_complete = true;
        if matches!(self.mode, Mode::Writable) {
            self.since_handle
                .upgrade_version()
                .await
                .expect("invalid usage")
        }
    }

    /// Starts a new transaction based on the current snapshot. The commit
    /// timestamp is pinned to the current upper.
    #[mz_ore::instrument(level = "debug")]
    async fn transaction(&mut self) -> Result<Transaction, CatalogError> {
        self.metrics.transactions_started.inc();
        let snapshot = self.snapshot().await?;
        let commit_ts = self.upper.clone();
        Transaction::new(self, snapshot, commit_ts)
    }

    /// Starts a new transaction from a caller-provided snapshot, pinned to the
    /// current upper.
    fn transaction_from_snapshot(
        &mut self,
        snapshot: Snapshot,
    ) -> Result<Transaction, CatalogError> {
        let commit_ts = self.upper.clone();
        Transaction::new(self, snapshot, commit_ts)
    }

    /// Commits `txn_batch` at `commit_ts`, returning the new upper.
    ///
    /// Read-only: an empty batch is a no-op, a non-empty batch is an error.
    /// Savepoint: updates are applied in memory only and the upper is advanced
    /// locally. Writable: updates are durably appended to the persist shard.
    #[mz_ore::instrument(level = "debug")]
    async fn commit_transaction(
        &mut self,
        txn_batch: TransactionBatch,
        commit_ts: Timestamp,
    ) -> Result<Timestamp, CatalogError> {
        // Inner fn so the whole body can be wrapped in latency metrics below.
        async fn commit_transaction_inner(
            catalog: &mut PersistCatalogState,
            txn_batch: TransactionBatch,
            commit_ts: Timestamp,
        ) -> Result<Timestamp, CatalogError> {
            if catalog.mode == Mode::Readonly {
                let updates: Vec<_> = StateUpdate::from_txn_batch(txn_batch).collect();
                if !updates.is_empty() {
                    return Err(DurableCatalogError::NotWritable(format!(
                        "cannot commit a transaction in a read-only catalog: {updates:#?}"
                    ))
                    .into());
                }
                return Ok(catalog.upper);
            }

            // The transaction must have been created against the current
            // upper; otherwise another transaction has intervened.
            assert_eq!(
                catalog.upper, txn_batch.upper,
                "only one transaction at a time is supported"
            );

            assert!(
                commit_ts >= catalog.upper,
                "expected commit ts, {}, to be greater than or equal to upper, {}",
                commit_ts,
                catalog.upper
            );

            let updates = StateUpdate::from_txn_batch(txn_batch).collect();
            debug!("committing updates: {updates:?}");

            let next_upper = match catalog.mode {
                Mode::Writable => catalog
                    .compare_and_append(updates, commit_ts)
                    .await
                    .map_err(|e| e.unwrap_fence_error())?,
                Mode::Savepoint => {
                    // Apply in memory and advance the upper locally, without
                    // touching persist.
                    let updates = updates.into_iter().map(|(kind, diff)| StateUpdate {
                        kind,
                        ts: commit_ts,
                        diff,
                    });
                    catalog.apply_updates_and_consolidate(updates)?;
                    catalog.upper = commit_ts.step_forward();
                    catalog.upper
                }
                Mode::Readonly => unreachable!("handled above"),
            };

            Ok(next_upper)
        }
        self.metrics.transaction_commits.inc();
        let counter = self.metrics.transaction_commit_latency_seconds.clone();
        commit_transaction_inner(self, txn_batch, commit_ts)
            .wall_time()
            .inc_by(counter)
            .await
    }

    /// Advances the upper of the catalog to `new_upper` without writing data.
    ///
    /// A non-advancing `new_upper` is logged as a soft panic but treated as a
    /// no-op success for callers.
    #[mz_ore::instrument(level = "debug")]
    async fn advance_upper(&mut self, new_upper: Timestamp) -> Result<(), CatalogError> {
        if self.upper >= new_upper {
            soft_panic_or_log!(
                "new_upper ({new_upper}) not greater than current upper ({})",
                self.upper
            );
            return Ok(());
        }

        match self.mode {
            // An empty compare-and-append just bumps the shard's upper.
            Mode::Writable => self
                .compare_and_append_inner([], new_upper)
                .await
                .map_err(|e| e.unwrap_fence_error())?,
            // Savepoint catalogs only track the upper in memory.
            Mode::Savepoint => (),
            Mode::Readonly => {
                return Err(DurableCatalogError::NotWritable(
                    "cannot advance upper of a read-only catalog".into(),
                )
                .into());
            }
        }

        self.upper = new_upper;
        Ok(())
    }

    /// Returns the id of the persist shard backing this catalog.
    fn shard_id(&self) -> ShardId {
        self.shard_id
    }
}
1904
1905pub fn shard_id(organization_id: Uuid, seed: usize) -> ShardId {
1907 let hash = sha2::Sha256::digest(format!("{organization_id}{seed}")).to_vec();
1908 soft_assert_eq_or_log!(hash.len(), 32, "SHA256 returns 32 bytes (256 bits)");
1909 let uuid = Uuid::from_slice(&hash[0..16]).expect("from_slice accepts exactly 16 bytes");
1910 ShardId::from_str(&format!("s{uuid}")).expect("known to be valid")
1911}
1912
1913fn as_of(
1916 read_handle: &ReadHandle<SourceData, (), Timestamp, StorageDiff>,
1917 upper: Timestamp,
1918) -> Timestamp {
1919 let since = read_handle.since().clone();
1920 let mut as_of = upper.checked_sub(1).unwrap_or_else(|| {
1921 panic!("catalog persist shard should be initialize, found upper: {upper:?}")
1922 });
1923 soft_assert_or_log!(
1926 since.less_equal(&as_of),
1927 "since={since:?}, as_of={as_of:?}; since must be less than or equal to as_of"
1928 );
1929 as_of.advance_by(since.borrow());
1932 as_of
1933}
1934
1935async fn fetch_catalog_shard_version(
1938 persist_client: &PersistClient,
1939 catalog_shard_id: ShardId,
1940) -> Option<semver::Version> {
1941 let shard_state = persist_client
1942 .inspect_shard::<Timestamp>(&catalog_shard_id)
1943 .await
1944 .ok()?;
1945 let json_state = serde_json::to_value(shard_state).expect("state serialization error");
1946 let json_version = json_state
1947 .get("applier_version")
1948 .cloned()
1949 .expect("missing applier_version");
1950 let version = serde_json::from_value(json_version).expect("version deserialization error");
1951 Some(version)
1952}
1953
1954#[mz_ore::instrument(level = "debug")]
1959async fn snapshot_binary(
1960 read_handle: &mut ReadHandle<SourceData, (), Timestamp, StorageDiff>,
1961 as_of: Timestamp,
1962 metrics: &Arc<Metrics>,
1963) -> impl Iterator<Item = StateUpdate<StateUpdateKindJson>> + DoubleEndedIterator + use<> {
1964 metrics.snapshots_taken.inc();
1965 let counter = metrics.snapshot_latency_seconds.clone();
1966 snapshot_binary_inner(read_handle, as_of)
1967 .wall_time()
1968 .inc_by(counter)
1969 .await
1970}
1971
1972#[mz_ore::instrument(level = "debug")]
1977async fn snapshot_binary_inner(
1978 read_handle: &mut ReadHandle<SourceData, (), Timestamp, StorageDiff>,
1979 as_of: Timestamp,
1980) -> impl Iterator<Item = StateUpdate<StateUpdateKindJson>> + DoubleEndedIterator + use<> {
1981 let snapshot = read_handle
1982 .snapshot_and_fetch(Antichain::from_elem(as_of))
1983 .await
1984 .expect("we have advanced the restart_as_of by the since");
1985 soft_assert_no_log!(
1986 snapshot.iter().all(|(_, _, diff)| *diff == 1),
1987 "snapshot_and_fetch guarantees a consolidated result: {snapshot:#?}"
1988 );
1989 snapshot
1990 .into_iter()
1991 .map(Into::<StateUpdate<StateUpdateKindJson>>::into)
1992 .sorted_by(|a, b| Ord::cmp(&b.ts, &a.ts))
1993}
1994
1995pub(crate) fn antichain_to_timestamp(antichain: Antichain<Timestamp>) -> Timestamp {
2000 antichain
2001 .into_option()
2002 .expect("we use a totally ordered time and never finalize the shard")
2003}
2004
impl Trace {
    /// Builds a [`Trace`] by routing each update in `snapshot` into the
    /// per-collection trace it belongs to. Fence tokens belong to no
    /// collection and are dropped.
    fn from_snapshot(snapshot: impl IntoIterator<Item = StateUpdate>) -> Trace {
        let mut trace = Trace::new();
        for StateUpdate { kind, ts, diff } in snapshot {
            match kind {
                StateUpdateKind::AuditLog(k, v) => trace.audit_log.values.push(((k, v), ts, diff)),
                StateUpdateKind::Cluster(k, v) => trace.clusters.values.push(((k, v), ts, diff)),
                StateUpdateKind::ClusterReplica(k, v) => {
                    trace.cluster_replicas.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::Comment(k, v) => trace.comments.values.push(((k, v), ts, diff)),
                StateUpdateKind::Config(k, v) => trace.configs.values.push(((k, v), ts, diff)),
                StateUpdateKind::Database(k, v) => trace.databases.values.push(((k, v), ts, diff)),
                StateUpdateKind::DefaultPrivilege(k, v) => {
                    trace.default_privileges.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::FenceToken(_) => {
                    // Fence tokens are not stored in the trace.
                }
                StateUpdateKind::IdAllocator(k, v) => {
                    trace.id_allocator.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::IntrospectionSourceIndex(k, v) => {
                    trace.introspection_sources.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::Item(k, v) => trace.items.values.push(((k, v), ts, diff)),
                StateUpdateKind::NetworkPolicy(k, v) => {
                    trace.network_policies.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::Role(k, v) => trace.roles.values.push(((k, v), ts, diff)),
                StateUpdateKind::Schema(k, v) => trace.schemas.values.push(((k, v), ts, diff)),
                StateUpdateKind::Setting(k, v) => trace.settings.values.push(((k, v), ts, diff)),
                StateUpdateKind::SourceReferences(k, v) => {
                    trace.source_references.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::SystemConfiguration(k, v) => {
                    trace.system_configurations.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::SystemObjectMapping(k, v) => {
                    trace.system_object_mappings.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::SystemPrivilege(k, v) => {
                    trace.system_privileges.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::StorageCollectionMetadata(k, v) => trace
                    .storage_collection_metadata
                    .values
                    .push(((k, v), ts, diff)),
                StateUpdateKind::UnfinalizedShard(k, ()) => {
                    trace.unfinalized_shards.values.push(((k, ()), ts, diff))
                }
                StateUpdateKind::TxnWalShard((), v) => {
                    trace.txn_wal_shard.values.push((((), v), ts, diff))
                }
                StateUpdateKind::RoleAuth(k, v) => trace.role_auth.values.push(((k, v), ts, diff)),
            }
        }
        trace
    }
}
2068
impl UnopenedPersistCatalogState {
    /// Debug utility: sets `key` to `value` in collection `T`, returning the
    /// previous value if one existed.
    ///
    /// Retries on upper mismatches; returns an error if this catalog has been
    /// fenced.
    ///
    /// Panics if `key` currently maps to more than one value, since the trace
    /// is expected to be consolidated.
    #[mz_ore::instrument]
    pub(crate) async fn debug_edit<T: Collection>(
        &mut self,
        key: T::Key,
        value: T::Value,
    ) -> Result<Option<T::Value>, CatalogError>
    where
        T::Key: PartialEq + Eq + Debug + Clone,
        T::Value: Debug + Clone,
    {
        let prev_value = loop {
            let key = key.clone();
            let value = value.clone();
            let snapshot = self.current_snapshot().await?;
            let trace = Trace::from_snapshot(snapshot);
            let collection_trace = T::collection_trace(trace);
            // All updates currently associated with `key`.
            let prev_values: Vec<_> = collection_trace
                .values
                .into_iter()
                .filter(|((k, _), _, diff)| {
                    soft_assert_eq_or_log!(*diff, Diff::ONE, "trace is consolidated");
                    &key == k
                })
                .collect();

            let prev_value = match &prev_values[..] {
                [] => None,
                [((_, v), _, _)] => Some(v.clone()),
                prev_values => panic!("multiple values found for key {key:?}: {prev_values:?}"),
            };

            // Retract any existing value, then insert the new one.
            let mut updates: Vec<_> = prev_values
                .into_iter()
                .map(|((k, v), _, _)| (T::update(k, v), Diff::MINUS_ONE))
                .collect();
            updates.push((T::update(key, value), Diff::ONE));
            match self.fenceable_token.generate_unfenced_token(self.mode)? {
                Some((fence_updates, current_fenceable_token)) => {
                    updates.extend(fence_updates.clone());
                    match self.compare_and_append(updates, self.upper).await {
                        Ok(_) => {
                            self.fenceable_token = current_fenceable_token;
                            break prev_value;
                        }
                        Err(CompareAndAppendError::Fence(e)) => return Err(e.into()),
                        Err(e @ CompareAndAppendError::UpperMismatch { .. }) => {
                            warn!("catalog write failed due to upper mismatch, retrying: {e:?}");
                            continue;
                        }
                    }
                }
                None => {
                    self.compare_and_append(updates, self.upper)
                        .await
                        .map_err(|e| e.unwrap_fence_error())?;
                    break prev_value;
                }
            }
        };
        Ok(prev_value)
    }

    /// Debug utility: deletes all values for `key` in collection `T`.
    ///
    /// Retries on upper mismatches; returns an error if this catalog has been
    /// fenced.
    #[mz_ore::instrument]
    pub(crate) async fn debug_delete<T: Collection>(
        &mut self,
        key: T::Key,
    ) -> Result<(), CatalogError>
    where
        T::Key: PartialEq + Eq + Debug + Clone,
        T::Value: Debug,
    {
        loop {
            let key = key.clone();
            let snapshot = self.current_snapshot().await?;
            let trace = Trace::from_snapshot(snapshot);
            let collection_trace = T::collection_trace(trace);
            // Retractions for every update currently associated with `key`.
            let mut retractions: Vec<_> = collection_trace
                .values
                .into_iter()
                .filter(|((k, _), _, diff)| {
                    soft_assert_eq_or_log!(*diff, Diff::ONE, "trace is consolidated");
                    &key == k
                })
                .map(|((k, v), _, _)| (T::update(k, v), Diff::MINUS_ONE))
                .collect();

            match self.fenceable_token.generate_unfenced_token(self.mode)? {
                Some((fence_updates, current_fenceable_token)) => {
                    retractions.extend(fence_updates.clone());
                    match self.compare_and_append(retractions, self.upper).await {
                        Ok(_) => {
                            self.fenceable_token = current_fenceable_token;
                            break;
                        }
                        Err(CompareAndAppendError::Fence(e)) => return Err(e.into()),
                        Err(e @ CompareAndAppendError::UpperMismatch { .. }) => {
                            warn!("catalog write failed due to upper mismatch, retrying: {e:?}");
                            continue;
                        }
                    }
                }
                None => {
                    self.compare_and_append(retractions, self.upper)
                        .await
                        .map_err(|e| e.unwrap_fence_error())?;
                    break;
                }
            }
        }
        Ok(())
    }

    /// Returns a consolidated snapshot of the current catalog contents,
    /// decoded into [`StateUpdate`]s, after syncing to the current upper.
    ///
    /// Panics if any raw update fails to decode.
    async fn current_snapshot(
        &mut self,
    ) -> Result<impl IntoIterator<Item = StateUpdate> + '_, CatalogError> {
        self.sync_to_current_upper().await?;
        self.consolidate();
        Ok(self.snapshot.iter().cloned().map(|(kind, ts, diff)| {
            let kind = TryIntoStateUpdateKind::try_into(kind).expect("kind decoding error");
            StateUpdate { kind, ts, diff }
        }))
    }
}