1#[cfg(test)]
11mod tests;
12
13use std::cmp::max;
14use std::collections::{BTreeMap, VecDeque};
15use std::fmt::Debug;
16use std::str::FromStr;
17use std::sync::{Arc, LazyLock};
18use std::time::{Duration, Instant};
19
20use async_trait::async_trait;
21use differential_dataflow::lattice::Lattice;
22use futures::{FutureExt, StreamExt};
23use itertools::Itertools;
24use mz_audit_log::VersionedEvent;
25use mz_ore::metrics::MetricsFutureExt;
26use mz_ore::now::EpochMillis;
27use mz_ore::{
28 soft_assert_eq_no_log, soft_assert_eq_or_log, soft_assert_ne_or_log, soft_assert_no_log,
29 soft_assert_or_log, soft_panic_or_log,
30};
31use mz_persist_client::cfg::USE_CRITICAL_SINCE_CATALOG;
32use mz_persist_client::cli::admin::{CATALOG_FORCE_COMPACTION_FUEL, CATALOG_FORCE_COMPACTION_WAIT};
33use mz_persist_client::critical::{CriticalReaderId, Opaque, SinceHandle};
34use mz_persist_client::error::UpperMismatch;
35use mz_persist_client::read::{Listen, ListenEvent, ReadHandle};
36use mz_persist_client::write::WriteHandle;
37use mz_persist_client::{Diagnostics, PersistClient, ShardId};
38use mz_persist_types::codec_impls::UnitSchema;
39use mz_proto::{RustType, TryFromProtoError};
40use mz_repr::Diff;
41use mz_storage_client::controller::PersistEpoch;
42use mz_storage_types::StorageDiff;
43use mz_storage_types::sources::SourceData;
44use sha2::Digest;
45use timely::progress::{Antichain, Timestamp as TimelyTimestamp};
46use tracing::{debug, info, warn};
47use uuid::Uuid;
48
49use crate::durable::debug::{Collection, CollectionType, DebugCatalogState, Trace};
50use crate::durable::error::FenceError;
51use crate::durable::initialize::{
52 ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, SYSTEM_CONFIG_SYNCED_KEY, USER_VERSION_KEY,
53 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL, WITH_0DT_DEPLOYMENT_MAX_WAIT,
54};
55use crate::durable::metrics::Metrics;
56use crate::durable::objects::state_update::{
57 IntoStateUpdateKindJson, StateUpdate, StateUpdateKind, StateUpdateKindJson,
58 TryIntoStateUpdateKind,
59};
60use crate::durable::objects::{AuditLogKey, FenceToken, Snapshot};
61use crate::durable::transaction::TransactionBatch;
62use crate::durable::upgrade::upgrade;
63use crate::durable::{
64 AuditLogIterator, BootstrapArgs, CATALOG_CONTENT_VERSION_KEY, CatalogError,
65 DurableCatalogError, DurableCatalogState, Epoch, OpenableDurableCatalogState,
66 ReadOnlyDurableCatalogState, Transaction, initialize, persist_desc,
67};
68use crate::memory;
69
/// Timestamp type of every update written to the catalog shard.
pub(crate) type Timestamp = mz_repr::Timestamp;

/// The smallest epoch. Used as the starting epoch when no fence token has been durably
/// written yet.
// SAFETY: `1` is non-zero, and `Epoch` values must be non-zero.
const MIN_EPOCH: Epoch = unsafe { Epoch::new_unchecked(1) };

/// Name reported to persist (via `Diagnostics::shard_name`) for the catalog shard.
const CATALOG_SHARD_NAME: &str = "catalog";

/// Fixed id of the catalog's critical since handle. Every process opens the same id, so the
/// same critical since handle is reused each time the catalog is opened.
static CATALOG_CRITICAL_SINCE: LazyLock<CriticalReaderId> = LazyLock::new(|| {
    "c55555555-6666-7777-8888-999999999999"
        .parse()
        .expect("valid CriticalReaderId")
});

/// Seed combined with the organization id (via `shard_id`) to derive the catalog shard id.
const CATALOG_SEED: usize = 1;
/// Seed reserved for another shard derived from the organization id; not referenced in this
/// section (presumably kept so the value is never reused — TODO confirm).
const _UPGRADE_SEED: usize = 2;
/// Seed reserved for another shard derived from the organization id; not referenced in this
/// section.
pub const _BUILTIN_MIGRATION_SEED: usize = 3;
/// Seed reserved for another shard derived from the organization id; not referenced in this
/// section.
pub const _EXPRESSION_CACHE_SEED: usize = 4;
98
/// The mode a [`PersistHandle`] operates in.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub(crate) enum Mode {
    /// Reads from the catalog shard but never appends to it; only `Writable` handles may
    /// call `compare_and_append`.
    Readonly,
    /// Syncing does not consume new updates from persist; it only advances the in-memory
    /// upper, and the "current upper" is the in-memory one.
    Savepoint,
    /// Reads from and appends to the catalog shard.
    Writable,
}
109
/// Tracks this process's fence token and whether another process has fenced it out.
#[derive(Debug)]
pub(crate) enum FenceableToken {
    /// The catalog is being opened and this process has not chosen its own token yet.
    Initializing {
        /// The largest fence token observed in the durable catalog so far, if any.
        durable_token: Option<FenceToken>,
        /// The deploy generation of this process, if known.
        current_deploy_generation: Option<u64>,
    },
    /// This process holds `current_token` and has not observed a higher token.
    Unfenced { current_token: FenceToken },
    /// A higher token (`fence_token`) has been observed; this process is fenced out.
    Fenced {
        current_token: FenceToken,
        fence_token: FenceToken,
    },
}
129
130impl FenceableToken {
131 fn new(current_deploy_generation: Option<u64>) -> Self {
133 Self::Initializing {
134 durable_token: None,
135 current_deploy_generation,
136 }
137 }
138
139 fn validate(&self) -> Result<Option<FenceToken>, FenceError> {
141 match self {
142 FenceableToken::Initializing { durable_token, .. } => Ok(durable_token.clone()),
143 FenceableToken::Unfenced { current_token, .. } => Ok(Some(current_token.clone())),
144 FenceableToken::Fenced {
145 current_token,
146 fence_token,
147 } => {
148 assert!(
149 fence_token > current_token,
150 "must be fenced by higher token; current={current_token:?}, fence={fence_token:?}"
151 );
152 if fence_token.deploy_generation > current_token.deploy_generation {
153 Err(FenceError::DeployGeneration {
154 current_generation: current_token.deploy_generation,
155 fence_generation: fence_token.deploy_generation,
156 })
157 } else {
158 assert!(
159 fence_token.epoch > current_token.epoch,
160 "must be fenced by higher token; current={current_token:?}, fence={fence_token:?}"
161 );
162 Err(FenceError::Epoch {
163 current_epoch: current_token.epoch,
164 fence_epoch: fence_token.epoch,
165 })
166 }
167 }
168 }
169 }
170
171 fn token(&self) -> Option<FenceToken> {
173 match self {
174 FenceableToken::Initializing { durable_token, .. } => durable_token.clone(),
175 FenceableToken::Unfenced { current_token, .. } => Some(current_token.clone()),
176 FenceableToken::Fenced { current_token, .. } => Some(current_token.clone()),
177 }
178 }
179
180 fn maybe_fence(&mut self, token: FenceToken) -> Result<(), FenceError> {
182 match self {
183 FenceableToken::Initializing {
184 durable_token,
185 current_deploy_generation,
186 ..
187 } => {
188 match durable_token {
189 Some(durable_token) => {
190 *durable_token = max(durable_token.clone(), token.clone());
191 }
192 None => {
193 *durable_token = Some(token.clone());
194 }
195 }
196 if let Some(current_deploy_generation) = current_deploy_generation {
197 if *current_deploy_generation < token.deploy_generation {
198 *self = FenceableToken::Fenced {
199 current_token: FenceToken {
200 deploy_generation: *current_deploy_generation,
201 epoch: token.epoch,
202 },
203 fence_token: token,
204 };
205 self.validate()?;
206 }
207 }
208 }
209 FenceableToken::Unfenced { current_token } => {
210 if *current_token < token {
211 *self = FenceableToken::Fenced {
212 current_token: current_token.clone(),
213 fence_token: token,
214 };
215 self.validate()?;
216 }
217 }
218 FenceableToken::Fenced { .. } => {
219 self.validate()?;
220 }
221 }
222
223 Ok(())
224 }
225
226 fn generate_unfenced_token(
230 &self,
231 mode: Mode,
232 ) -> Result<Option<(Vec<(StateUpdateKind, Diff)>, FenceableToken)>, DurableCatalogError> {
233 let (durable_token, current_deploy_generation) = match self {
234 FenceableToken::Initializing {
235 durable_token,
236 current_deploy_generation,
237 } => (durable_token.clone(), current_deploy_generation.clone()),
238 FenceableToken::Unfenced { .. } | FenceableToken::Fenced { .. } => return Ok(None),
239 };
240
241 let mut fence_updates = Vec::with_capacity(2);
242
243 if let Some(durable_token) = &durable_token {
244 fence_updates.push((
245 StateUpdateKind::FenceToken(durable_token.clone()),
246 Diff::MINUS_ONE,
247 ));
248 }
249
250 let current_deploy_generation = current_deploy_generation
251 .or_else(|| durable_token.as_ref().map(|token| token.deploy_generation))
252 .ok_or(DurableCatalogError::Uninitialized)?;
254 let mut current_epoch = durable_token
255 .map(|token| token.epoch)
256 .unwrap_or(MIN_EPOCH)
257 .get();
258 if matches!(mode, Mode::Writable) {
260 current_epoch = current_epoch + 1;
261 }
262 let current_epoch = Epoch::new(current_epoch).expect("known to be non-zero");
263 let current_token = FenceToken {
264 deploy_generation: current_deploy_generation,
265 epoch: current_epoch,
266 };
267
268 fence_updates.push((
269 StateUpdateKind::FenceToken(current_token.clone()),
270 Diff::ONE,
271 ));
272
273 let current_fenceable_token = FenceableToken::Unfenced { current_token };
274
275 Ok(Some((fence_updates, current_fenceable_token)))
276 }
277}
278
/// Error returned by compare-and-append operations on the catalog shard.
#[derive(Debug, thiserror::Error)]
pub(crate) enum CompareAndAppendError {
    /// This process has been fenced out.
    #[error(transparent)]
    Fence(#[from] FenceError),
    /// The catalog shard's upper did not match the upper the write expected.
    ///
    /// Both fields are collapsed from persist antichains with `antichain_to_timestamp`.
    #[error(
        "expected catalog upper {expected_upper:?} did not match actual catalog upper {actual_upper:?}"
    )]
    UpperMismatch {
        expected_upper: Timestamp,
        actual_upper: Timestamp,
    },
}
294
295impl CompareAndAppendError {
296 pub(crate) fn unwrap_fence_error(self) -> FenceError {
297 match self {
298 CompareAndAppendError::Fence(e) => e,
299 e @ CompareAndAppendError::UpperMismatch { .. } => {
300 panic!("unexpected upper mismatch: {e:?}")
301 }
302 }
303 }
304}
305
306impl From<UpperMismatch<Timestamp>> for CompareAndAppendError {
307 fn from(upper_mismatch: UpperMismatch<Timestamp>) -> Self {
308 Self::UpperMismatch {
309 expected_upper: antichain_to_timestamp(upper_mismatch.expected),
310 actual_upper: antichain_to_timestamp(upper_mismatch.current),
311 }
312 }
313}
314
/// Applies decoded catalog updates to some in-memory state.
pub(crate) trait ApplyUpdate<T: IntoStateUpdateKindJson> {
    /// Applies `update` to `self`. An implementation may also inspect fence-token updates
    /// and fence `current_fence_token`.
    ///
    /// Returns `Ok(Some(update))` if the caller should keep the update in its snapshot, or
    /// `Ok(None)` if the update was fully consumed.
    ///
    /// # Errors
    ///
    /// Returns a [`FenceError`] if applying the update shows that this process is fenced.
    fn apply_update(
        &mut self,
        update: StateUpdate<T>,
        current_fence_token: &mut FenceableToken,
        metrics: &Arc<Metrics>,
    ) -> Result<Option<StateUpdate<T>>, FenceError>;
}
326
/// A handle to the durable catalog, backed by a persist shard.
///
/// `T` is the representation of updates kept in `snapshot`. `U` applies each incoming
/// update to in-memory state.
#[derive(Debug)]
pub(crate) struct PersistHandle<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> {
    /// The mode this handle operates in; see [`Mode`].
    pub(crate) mode: Mode,
    /// Critical since handle of the catalog shard; its since is downgraded after each
    /// successful write.
    since_handle: SinceHandle<SourceData, (), Timestamp, StorageDiff>,
    /// Write handle, used for compare-and-append and for fetching the shard's upper.
    write_handle: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
    /// Listener that `sync` uses to consume new updates from the shard.
    listen: Listen<SourceData, (), Timestamp, StorageDiff>,
    /// Client used to open additional, temporary read handles.
    persist_client: PersistClient,
    /// Id of the catalog shard.
    shard_id: ShardId,
    /// Updates that `update_applier` asked to retain. They are expected to be in timestamp
    /// order and are consolidated periodically.
    pub(crate) snapshot: Vec<(T, Timestamp, Diff)>,
    /// Applies each incoming update to in-memory state.
    update_applier: U,
    /// Upper up to which this handle has synced.
    pub(crate) upper: Timestamp,
    /// This process's fence token state.
    fenceable_token: FenceableToken,
    /// Catalog content version supplied when the handle was created. It is compared
    /// against the version found in the catalog.
    catalog_content_version: semver::Version,
    /// Whether bootstrap has completed. NOTE(review): not read in the visible code;
    /// meaning inferred from the name.
    bootstrap_complete: bool,
    /// Durable catalog metrics.
    metrics: Arc<Metrics>,
    /// Snapshot length at the last consolidation performed by `maybe_consolidate`. `None`
    /// makes the next call recompute its threshold from the current snapshot length.
    size_at_last_consolidation: Option<usize>,
}
375
impl<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> PersistHandle<T, U> {
    /// Returns the current upper of the catalog shard.
    ///
    /// In [`Mode::Savepoint`] this returns the in-memory upper without contacting persist.
    /// Otherwise it fetches a recent upper from persist.
    #[mz_ore::instrument]
    async fn current_upper(&mut self) -> Timestamp {
        match self.mode {
            Mode::Writable | Mode::Readonly => {
                let upper = self.write_handle.fetch_recent_upper().await;
                antichain_to_timestamp(upper.clone())
            }
            Mode::Savepoint => self.upper,
        }
    }

    /// Appends `updates` at `commit_ts`, then syncs this handle to the new upper.
    ///
    /// Returns the new upper, `commit_ts.step_forward()`.
    ///
    /// # Errors
    ///
    /// * [`CompareAndAppendError::UpperMismatch`] if the shard's upper is not `self.upper`.
    ///   In that case the handle has already been synced to the shard's current upper.
    /// * [`CompareAndAppendError::Fence`] if this process has been fenced.
    #[mz_ore::instrument]
    pub(crate) async fn compare_and_append<S: IntoStateUpdateKindJson>(
        &mut self,
        updates: Vec<(S, Diff)>,
        commit_ts: Timestamp,
    ) -> Result<Timestamp, CompareAndAppendError> {
        // Decode the updates only when soft assertions are enabled. We record whether this
        // write both retracts and adds a fence token, i.e. whether it is a fencing write.
        let contains_fence = if mz_ore::assert::soft_assertions_enabled() {
            let parsed_updates: Vec<_> = updates
                .clone()
                .into_iter()
                .filter_map(|(update, diff)| {
                    let update: StateUpdateKindJson = update.into();
                    let update = TryIntoStateUpdateKind::try_into(update).ok()?;
                    Some((update, diff))
                })
                .collect();
            let contains_retraction = parsed_updates.iter().any(|(update, diff)| {
                matches!(update, StateUpdateKind::FenceToken(..)) && *diff == Diff::MINUS_ONE
            });
            let contains_addition = parsed_updates.iter().any(|(update, diff)| {
                matches!(update, StateUpdateKind::FenceToken(..)) && *diff == Diff::ONE
            });
            Some(contains_retraction && contains_addition)
        } else {
            None
        };

        let updates = updates.into_iter().map(|(kind, diff)| {
            let kind: StateUpdateKindJson = kind.into();
            (
                (Into::<SourceData>::into(kind), ()),
                commit_ts,
                diff.into_inner(),
            )
        });
        let next_upper = commit_ts.step_forward();
        self.compare_and_append_inner(updates, next_upper)
            .await
            .inspect_err(|e| {
                if let Some(contains_fence) = contains_fence {
                    // An upper mismatch is only expected on fencing writes. Flag any other
                    // mismatch through a soft assertion.
                    soft_assert_or_log!(
                        matches!(e, CompareAndAppendError::Fence(_)) || contains_fence,
                        "encountered an upper mismatch on a non-fencing write"
                    );
                }
            })?;

        self.sync(next_upper).await?;
        Ok(next_upper)
    }

    /// Writes `updates` to the shard, moving its upper from `self.upper` to `next_upper`.
    /// It then downgrades the critical since to just before `next_upper`.
    ///
    /// On success `self.upper` is NOT advanced here; the caller must sync. On an upper
    /// mismatch, this syncs to the shard's current upper before returning the error.
    ///
    /// # Panics
    ///
    /// Panics if the handle is not [`Mode::Writable`], or if `next_upper <= self.upper`.
    async fn compare_and_append_inner(
        &mut self,
        updates: impl IntoIterator<Item = ((SourceData, ()), Timestamp, StorageDiff)>,
        next_upper: Timestamp,
    ) -> Result<(), CompareAndAppendError> {
        assert_eq!(self.mode, Mode::Writable);
        assert!(
            next_upper > self.upper,
            "next_upper ({next_upper}) not greater than current upper ({})",
            self.upper,
        );

        let res = self
            .write_handle
            .compare_and_append(
                updates,
                Antichain::from_elem(self.upper),
                Antichain::from_elem(next_upper),
            )
            .await
            .expect("invalid usage");

        if let Err(e @ UpperMismatch { .. }) = res {
            // Catch up with whatever was written concurrently. Fence tokens in those updates
            // may turn this into a fence error, which `?` propagates instead of the mismatch.
            self.sync_to_current_upper().await?;
            return Err(e.into());
        }

        // Keep the since one tick behind the new upper.
        let downgrade_to = Antichain::from_elem(next_upper.saturating_sub(1));

        // Pass the handle's current opaque as both the expected and the new value, so that
        // only the since moves. `maybe_compare_and_downgrade_since` may skip the downgrade
        // and return `None`.
        let opaque = self.since_handle.opaque().clone();
        let downgrade = self
            .since_handle
            .maybe_compare_and_downgrade_since(&opaque, (&opaque, &downgrade_to))
            .await;
        if let Some(Err(e)) = downgrade {
            // An opaque mismatch is unexpected. Report it, but don't fail the already
            // successful write.
            soft_panic_or_log!("found opaque value {e:?}, but expected {opaque:?}");
        }

        Ok(())
    }

    /// Reads the shard's contents with a temporary reader, as of a timestamp that `as_of`
    /// derives from the current upper. The result is returned unconsolidated and decoded
    /// into [`StateUpdateKind`]s.
    ///
    /// # Panics
    ///
    /// Panics if the snapshot cannot be read at the chosen as-of, or if an update cannot be
    /// decoded.
    #[mz_ore::instrument]
    async fn snapshot_unconsolidated(&mut self) -> Vec<StateUpdate<StateUpdateKind>> {
        let current_upper = self.current_upper().await;

        let mut snapshot = Vec::new();
        let mut read_handle = self.read_handle().await;
        let as_of = as_of(&read_handle, current_upper);
        let mut stream = Box::pin(
            read_handle
                .snapshot_and_stream(Antichain::from_elem(as_of))
                .await
                .expect("we have advanced the restart_as_of by the since"),
        );
        while let Some(update) = stream.next().await {
            snapshot.push(update)
        }
        // Release the temporary reader before decoding.
        read_handle.expire().await;
        snapshot
            .into_iter()
            .map(Into::<StateUpdate<StateUpdateKindJson>>::into)
            .map(|state_update| state_update.try_into().expect("kind decoding error"))
            .collect()
    }

    /// Syncs this handle up to the shard's current upper. In [`Mode::Savepoint`] that is the
    /// in-memory upper.
    ///
    /// # Errors
    ///
    /// Returns a [`FenceError`] if this process is fenced.
    #[mz_ore::instrument]
    pub(crate) async fn sync_to_current_upper(&mut self) -> Result<(), FenceError> {
        let upper = self.current_upper().await;
        self.sync(upper).await
    }

    /// Applies all updates up to `target_upper`, counting the sync and recording its
    /// wall-clock latency in metrics.
    ///
    /// # Errors
    ///
    /// Returns a [`FenceError`] if this process is fenced.
    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn sync(&mut self, target_upper: Timestamp) -> Result<(), FenceError> {
        self.metrics.syncs.inc();
        let counter = self.metrics.sync_latency_seconds.clone();
        self.sync_inner(target_upper)
            .wall_time()
            .inc_by(counter)
            .await
    }

    /// Consumes listen events until `self.upper >= target_upper`, then consolidates the
    /// snapshot.
    ///
    /// Updates are buffered by timestamp. They are applied only after a progress event
    /// advances the upper past their timestamp, so each timestamp is applied as a whole.
    #[mz_ore::instrument(level = "debug")]
    async fn sync_inner(&mut self, target_upper: Timestamp) -> Result<(), FenceError> {
        self.fenceable_token.validate()?;

        if self.mode == Mode::Savepoint {
            // Savepoint handles don't consume updates from persist; only the in-memory
            // upper moves.
            self.upper = max(self.upper, target_upper);
            return Ok(());
        }

        let mut updates: BTreeMap<_, Vec<_>> = BTreeMap::new();

        // Reset so that `maybe_consolidate` recomputes its threshold from the current
        // snapshot length.
        self.size_at_last_consolidation = None;

        while self.upper < target_upper {
            let listen_events = self.listen.fetch_next().await;
            for listen_event in listen_events {
                match listen_event {
                    ListenEvent::Progress(upper) => {
                        debug!("synced up to {upper:?}");
                        self.upper = antichain_to_timestamp(upper);
                        // Every buffered timestamp is now below the upper and thus complete.
                        // Apply the timestamps in order.
                        while let Some((ts, updates)) = updates.pop_first() {
                            assert!(ts < self.upper, "expected {} < {}", ts, self.upper);
                            let updates = updates.into_iter().map(
                                |update: StateUpdate<StateUpdateKindJson>| {
                                    let kind =
                                        T::try_from(update.kind).expect("kind decoding error");
                                    StateUpdate {
                                        kind,
                                        ts: update.ts,
                                        diff: update.diff,
                                    }
                                },
                            );
                            self.apply_updates(updates)?;
                            self.maybe_consolidate();
                        }
                    }
                    ListenEvent::Updates(batch_updates) => {
                        for update in batch_updates {
                            let update: StateUpdate<StateUpdateKindJson> = update.into();
                            updates.entry(update.ts).or_default().push(update);
                        }
                    }
                }
            }
        }
        assert_eq!(updates, BTreeMap::new(), "all updates should be applied");
        self.consolidate();
        Ok(())
    }

    /// Applies `updates` and then fully consolidates the snapshot.
    ///
    /// # Errors
    ///
    /// Returns a [`FenceError`] if any update shows this process is fenced.
    pub(crate) fn apply_updates_and_consolidate(
        &mut self,
        updates: impl IntoIterator<Item = StateUpdate<T>>,
    ) -> Result<(), FenceError> {
        self.apply_updates(updates)?;
        self.consolidate();
        Ok(())
    }

    /// Consolidates `updates` and hands each one to `update_applier`, in timestamp order
    /// with retractions before additions at the same timestamp. Updates the applier retains
    /// are appended to `self.snapshot`.
    ///
    /// Every update is applied even if some report fence errors; the smallest error (by
    /// `FenceError`'s ordering) is returned at the end.
    ///
    /// # Panics
    ///
    /// Panics if a consolidated update has a diff other than `1` or `-1`.
    #[mz_ore::instrument(level = "debug")]
    fn apply_updates(
        &mut self,
        updates: impl IntoIterator<Item = StateUpdate<T>>,
    ) -> Result<(), FenceError> {
        let mut updates: Vec<_> = updates
            .into_iter()
            .map(|StateUpdate { kind, ts, diff }| (kind, ts, diff))
            .collect();

        // Cancel out matching additions and retractions within this batch.
        differential_dataflow::consolidation::consolidate_updates(&mut updates);

        // Order by timestamp, then by diff, so that retractions (-1) precede additions (+1).
        updates.sort_by(|(_, ts1, diff1), (_, ts2, diff2)| ts1.cmp(ts2).then(diff1.cmp(diff2)));

        let mut errors = Vec::new();

        for (kind, ts, diff) in updates {
            if diff != Diff::ONE && diff != Diff::MINUS_ONE {
                panic!("invalid update in consolidated trace: ({kind:?}, {ts:?}, {diff:?})");
            }

            match self.update_applier.apply_update(
                StateUpdate { kind, ts, diff },
                &mut self.fenceable_token,
                &self.metrics,
            ) {
                Ok(Some(StateUpdate { kind, ts, diff })) => self.snapshot.push((kind, ts, diff)),
                Ok(None) => {}
                // Keep applying the remaining updates; errors are reported after the loop.
                Err(err) => errors.push(err),
            }
        }

        // Track the largest snapshot size observed.
        let len = i64::try_from(self.snapshot.len()).unwrap_or(i64::MAX);
        if len > self.metrics.snapshot_max_entries.get() {
            self.metrics.snapshot_max_entries.set(len);
        }

        errors.sort();
        if let Some(err) = errors.into_iter().next() {
            return Err(err);
        }

        Ok(())
    }

    /// Consolidates the snapshot once it has grown to at least twice its length at the last
    /// consolidation (a doubling threshold).
    ///
    /// The first call after a reset records a baseline of `max(len, 8)` and does not
    /// consolidate.
    fn maybe_consolidate(&mut self) {
        let threshold = *self
            .size_at_last_consolidation
            .get_or_insert_with(|| max(self.snapshot.len(), 8));
        if self.snapshot.len() >= threshold * 2 {
            self.consolidate();
            self.size_at_last_consolidation = Some(self.snapshot.len());
        }
    }

    /// Fully consolidates the snapshot. Every update is advanced to the snapshot's latest
    /// timestamp (or the minimum timestamp if it is empty), and diffs of identical updates
    /// are then summed.
    #[mz_ore::instrument]
    pub(crate) fn consolidate(&mut self) {
        self.metrics.snapshot_consolidations.inc();
        soft_assert_no_log!(
            self.snapshot
                .windows(2)
                .all(|updates| updates[0].1 <= updates[1].1),
            "snapshot should be sorted by timestamp, {:#?}",
            self.snapshot
        );

        // Relies on the snapshot being sorted, so that the last entry has the latest ts.
        let new_ts = self
            .snapshot
            .last()
            .map(|(_, ts, _)| *ts)
            .unwrap_or_else(Timestamp::minimum);
        for (_, ts, _) in &mut self.snapshot {
            *ts = new_ts;
        }
        differential_dataflow::consolidation::consolidate_updates(&mut self.snapshot);
    }

    /// Syncs to the current upper, then calls `f` with the in-memory snapshot.
    ///
    /// # Errors
    ///
    /// Returns a fence error from syncing, or whatever `f` returns.
    async fn with_trace<R>(
        &mut self,
        f: impl FnOnce(&Vec<(T, Timestamp, Diff)>) -> Result<R, CatalogError>,
    ) -> Result<R, CatalogError> {
        self.sync_to_current_upper().await?;
        f(&self.snapshot)
    }

    /// Opens a temporary leased reader on the catalog shard. Callers are expected to
    /// `expire` it when done.
    async fn read_handle(&self) -> ReadHandle<SourceData, (), Timestamp, StorageDiff> {
        self.persist_client
            .open_leased_reader(
                self.shard_id,
                Arc::new(persist_desc()),
                Arc::new(UnitSchema::default()),
                Diagnostics {
                    shard_name: CATALOG_SHARD_NAME.to_string(),
                    handle_purpose: "openable durable catalog state temporary reader".to_string(),
                },
                USE_CRITICAL_SINCE_CATALOG.get(self.persist_client.dyncfgs()),
            )
            .await
            .expect("invalid usage")
    }

    /// Expires the write handle and the listener. The critical since handle is not expired
    /// here.
    async fn expire(self: Box<Self>) {
        self.write_handle.expire().await;
        self.listen.expire().await;
    }
}
761
762impl<U: ApplyUpdate<StateUpdateKind>> PersistHandle<StateUpdateKind, U> {
763 async fn with_snapshot<T>(
767 &mut self,
768 f: impl FnOnce(Snapshot) -> Result<T, CatalogError>,
769 ) -> Result<T, CatalogError> {
770 fn apply<K, V>(map: &mut BTreeMap<K, V>, key: &K, value: &V, diff: Diff)
771 where
772 K: Ord + Clone,
773 V: Ord + Clone + Debug,
774 {
775 let key = key.clone();
776 let value = value.clone();
777 if diff == Diff::ONE {
778 let prev = map.insert(key, value);
779 assert_eq!(
780 prev, None,
781 "values must be explicitly retracted before inserting a new value"
782 );
783 } else if diff == Diff::MINUS_ONE {
784 let prev = map.remove(&key);
785 assert_eq!(
786 prev,
787 Some(value),
788 "retraction does not match existing value"
789 );
790 }
791 }
792
793 self.with_trace(|trace| {
794 let mut snapshot = Snapshot::empty();
795 for (kind, ts, diff) in trace {
796 let diff = *diff;
797 if diff != Diff::ONE && diff != Diff::MINUS_ONE {
798 panic!("invalid update in consolidated trace: ({kind:?}, {ts:?}, {diff:?})");
799 }
800
801 match kind {
802 StateUpdateKind::AuditLog(_key, ()) => {
803 }
805 StateUpdateKind::Cluster(key, value) => {
806 apply(&mut snapshot.clusters, key, value, diff);
807 }
808 StateUpdateKind::ClusterReplica(key, value) => {
809 apply(&mut snapshot.cluster_replicas, key, value, diff);
810 }
811 StateUpdateKind::Comment(key, value) => {
812 apply(&mut snapshot.comments, key, value, diff);
813 }
814 StateUpdateKind::Config(key, value) => {
815 apply(&mut snapshot.configs, key, value, diff);
816 }
817 StateUpdateKind::Database(key, value) => {
818 apply(&mut snapshot.databases, key, value, diff);
819 }
820 StateUpdateKind::DefaultPrivilege(key, value) => {
821 apply(&mut snapshot.default_privileges, key, value, diff);
822 }
823 StateUpdateKind::FenceToken(_token) => {
824 }
826 StateUpdateKind::IdAllocator(key, value) => {
827 apply(&mut snapshot.id_allocator, key, value, diff);
828 }
829 StateUpdateKind::IntrospectionSourceIndex(key, value) => {
830 apply(&mut snapshot.introspection_sources, key, value, diff);
831 }
832 StateUpdateKind::Item(key, value) => {
833 apply(&mut snapshot.items, key, value, diff);
834 }
835 StateUpdateKind::NetworkPolicy(key, value) => {
836 apply(&mut snapshot.network_policies, key, value, diff);
837 }
838 StateUpdateKind::Role(key, value) => {
839 apply(&mut snapshot.roles, key, value, diff);
840 }
841 StateUpdateKind::Schema(key, value) => {
842 apply(&mut snapshot.schemas, key, value, diff);
843 }
844 StateUpdateKind::Setting(key, value) => {
845 apply(&mut snapshot.settings, key, value, diff);
846 }
847 StateUpdateKind::SourceReferences(key, value) => {
848 apply(&mut snapshot.source_references, key, value, diff);
849 }
850 StateUpdateKind::SystemConfiguration(key, value) => {
851 apply(&mut snapshot.system_configurations, key, value, diff);
852 }
853 StateUpdateKind::ClusterSystemConfiguration(key, value) => {
854 apply(
855 &mut snapshot.cluster_system_configurations,
856 key,
857 value,
858 diff,
859 );
860 }
861 StateUpdateKind::ReplicaSystemConfiguration(key, value) => {
862 apply(
863 &mut snapshot.replica_system_configurations,
864 key,
865 value,
866 diff,
867 );
868 }
869 StateUpdateKind::SystemObjectMapping(key, value) => {
870 apply(&mut snapshot.system_object_mappings, key, value, diff);
871 }
872 StateUpdateKind::SystemPrivilege(key, value) => {
873 apply(&mut snapshot.system_privileges, key, value, diff);
874 }
875 StateUpdateKind::StorageCollectionMetadata(key, value) => {
876 apply(&mut snapshot.storage_collection_metadata, key, value, diff);
877 }
878 StateUpdateKind::UnfinalizedShard(key, ()) => {
879 apply(&mut snapshot.unfinalized_shards, key, &(), diff);
880 }
881 StateUpdateKind::TxnWalShard((), value) => {
882 apply(&mut snapshot.txn_wal_shard, &(), value, diff);
883 }
884 StateUpdateKind::RoleAuth(key, value) => {
885 apply(&mut snapshot.role_auth, key, value, diff);
886 }
887 }
888 }
889 f(snapshot)
890 })
891 .await
892 }
893
894 #[mz_ore::instrument(level = "debug")]
901 async fn persist_snapshot(&self) -> impl Iterator<Item = StateUpdate> + DoubleEndedIterator {
902 let mut read_handle = self.read_handle().await;
903 let as_of = as_of(&read_handle, self.upper);
904 let snapshot = snapshot_binary(&mut read_handle, as_of, &self.metrics)
905 .await
906 .map(|update| update.try_into().expect("kind decoding error"));
907 read_handle.expire().await;
908 snapshot
909 }
910}
911
/// In-memory state of a catalog that has not been opened yet. Only the `Config` and
/// `Setting` entries observed so far are tracked.
#[derive(Debug)]
pub(crate) struct UnopenedCatalogStateInner {
    /// Config values, keyed by config key.
    configs: BTreeMap<String, u64>,
    /// Setting values, keyed by setting name.
    settings: BTreeMap<String, String>,
}
920
921impl UnopenedCatalogStateInner {
922 fn new() -> UnopenedCatalogStateInner {
923 UnopenedCatalogStateInner {
924 configs: BTreeMap::new(),
925 settings: BTreeMap::new(),
926 }
927 }
928}
929
930impl ApplyUpdate<StateUpdateKindJson> for UnopenedCatalogStateInner {
931 fn apply_update(
932 &mut self,
933 update: StateUpdate<StateUpdateKindJson>,
934 current_fence_token: &mut FenceableToken,
935 _metrics: &Arc<Metrics>,
936 ) -> Result<Option<StateUpdate<StateUpdateKindJson>>, FenceError> {
937 if !update.kind.is_audit_log() && update.kind.is_always_deserializable() {
938 let kind = TryInto::try_into(&update.kind).expect("kind is known to be deserializable");
939 match (kind, update.diff) {
940 (StateUpdateKind::Config(key, value), Diff::ONE) => {
941 let prev = self.configs.insert(key.key, value.value);
942 assert_eq!(
943 prev, None,
944 "values must be explicitly retracted before inserting a new value"
945 );
946 }
947 (StateUpdateKind::Config(key, value), Diff::MINUS_ONE) => {
948 let prev = self.configs.remove(&key.key);
949 assert_eq!(
950 prev,
951 Some(value.value),
952 "retraction does not match existing value"
953 );
954 }
955 (StateUpdateKind::Setting(key, value), Diff::ONE) => {
956 let prev = self.settings.insert(key.name, value.value);
957 assert_eq!(
958 prev, None,
959 "values must be explicitly retracted before inserting a new value"
960 );
961 }
962 (StateUpdateKind::Setting(key, value), Diff::MINUS_ONE) => {
963 let prev = self.settings.remove(&key.name);
964 assert_eq!(
965 prev,
966 Some(value.value),
967 "retraction does not match existing value"
968 );
969 }
970 (StateUpdateKind::FenceToken(fence_token), Diff::ONE) => {
971 current_fence_token.maybe_fence(fence_token)?;
972 }
973 _ => {}
974 }
975 }
976
977 Ok(Some(update))
978 }
979}
980
/// A [`PersistHandle`] for a catalog that has not been opened yet.
///
/// It keeps updates as undecoded JSON and, through [`UnopenedCatalogStateInner`], tracks
/// only configs, settings, and fence tokens.
pub(crate) type UnopenedPersistCatalogState =
    PersistHandle<StateUpdateKindJson, UnopenedCatalogStateInner>;
990
991impl UnopenedPersistCatalogState {
    /// Creates a handle to the catalog shard for `organization_id`, without opening the
    /// catalog. The steps are:
    ///
    /// 1. Check that this code can write the shard's persist version.
    /// 2. Open the persist handles.
    /// 3. Make sure the shard's upper is past the minimum timestamp.
    /// 4. Read a snapshot and apply it.
    /// 5. Check the catalog content version.
    ///
    /// The handle starts in [`Mode::Writable`] with an initializing fence token for
    /// `deploy_generation`.
    ///
    /// # Errors
    ///
    /// * [`DurableCatalogError::IncompatiblePersistVersion`] if `version` cannot write data in
    ///   the shard's persist version.
    /// * [`DurableCatalogError::IncompatiblePersistVersion`] if the catalog content version
    ///   stored in the catalog has higher precedence than `version`. The comparison uses
    ///   `semver::Version::cmp_precedence`, which ignores build metadata.
    /// * Fence errors from applying the snapshot, and errors from reading the stored content
    ///   version.
    #[mz_ore::instrument]
    pub(crate) async fn new(
        persist_client: PersistClient,
        organization_id: Uuid,
        version: semver::Version,
        deploy_generation: Option<u64>,
        metrics: Arc<Metrics>,
    ) -> Result<UnopenedPersistCatalogState, DurableCatalogError> {
        let catalog_shard_id = shard_id(organization_id, CATALOG_SEED);
        debug!(?catalog_shard_id, "new persist backed catalog state");

        // Check the shard's persist version before opening any handles on it.
        let version_in_catalog_shard =
            fetch_catalog_shard_version(&persist_client, catalog_shard_id).await;
        if let Some(version_in_catalog_shard) = version_in_catalog_shard {
            if !mz_persist_client::cfg::code_can_write_data(&version, &version_in_catalog_shard) {
                return Err(DurableCatalogError::IncompatiblePersistVersion {
                    found_version: version_in_catalog_shard,
                    catalog_version: version,
                });
            }
        }

        let open_handles_start = Instant::now();
        info!("startup: envd serve: catalog init: open handles beginning");
        let since_handle = persist_client
            .open_critical_since(
                catalog_shard_id,
                CATALOG_CRITICAL_SINCE.clone(),
                Opaque::encode(&i64::MIN),
                Diagnostics {
                    shard_name: CATALOG_SHARD_NAME.to_string(),
                    handle_purpose: "durable catalog state critical since".to_string(),
                },
            )
            .await
            .expect("invalid usage");

        let (mut write_handle, mut read_handle) = persist_client
            .open(
                catalog_shard_id,
                Arc::new(persist_desc()),
                Arc::new(UnitSchema::default()),
                Diagnostics {
                    shard_name: CATALOG_SHARD_NAME.to_string(),
                    handle_purpose: "durable catalog state handles".to_string(),
                },
                USE_CRITICAL_SINCE_CATALOG.get(persist_client.dyncfgs()),
            )
            .await
            .expect("invalid usage");
        info!(
            "startup: envd serve: catalog init: open handles complete in {:?}",
            open_handles_start.elapsed()
        );

        // Ensure the upper is past the minimum timestamp by appending an empty batch from
        // `minimum` to `minimum + 1`. If the shard has already been written, the resulting
        // mismatch reports the shard's current upper.
        let upper = {
            const EMPTY_UPDATES: &[((SourceData, ()), Timestamp, StorageDiff)] = &[];
            let upper = Antichain::from_elem(Timestamp::minimum());
            let next_upper = Timestamp::minimum().step_forward();
            match write_handle
                .compare_and_append(EMPTY_UPDATES, upper, Antichain::from_elem(next_upper))
                .await
                .expect("invalid usage")
            {
                Ok(()) => next_upper,
                Err(mismatch) => antichain_to_timestamp(mismatch.current),
            }
        };

        // Read the snapshot, then start listening from the same as-of so that no update is
        // missed between the two.
        let snapshot_start = Instant::now();
        info!("startup: envd serve: catalog init: snapshot beginning");
        let as_of = as_of(&read_handle, upper);
        let snapshot: Vec<_> = snapshot_binary(&mut read_handle, as_of, &metrics)
            .await
            .map(|StateUpdate { kind, ts, diff }| (kind, ts, diff))
            .collect();
        let listen = read_handle
            .listen(Antichain::from_elem(as_of))
            .await
            .expect("invalid usage");
        info!(
            "startup: envd serve: catalog init: snapshot complete in {:?}",
            snapshot_start.elapsed()
        );

        let mut handle = UnopenedPersistCatalogState {
            // Start in writable mode; `open_inner` switches to the requested mode.
            mode: Mode::Writable,
            since_handle,
            write_handle,
            listen,
            persist_client,
            shard_id: catalog_shard_id,
            snapshot: Vec::new(),
            update_applier: UnopenedCatalogStateInner::new(),
            upper,
            fenceable_token: FenceableToken::new(deploy_generation),
            catalog_content_version: version,
            bootstrap_complete: false,
            metrics,
            size_at_last_consolidation: None,
        };
        // A snapshot taken at a single as-of is expected to contain only +1 diffs.
        soft_assert_no_log!(
            snapshot.iter().all(|(_, _, diff)| *diff == Diff::ONE),
            "snapshot should be consolidated: {snapshot:#?}"
        );

        let apply_start = Instant::now();
        info!("startup: envd serve: catalog init: apply updates beginning");
        let updates = snapshot
            .into_iter()
            .map(|(kind, ts, diff)| StateUpdate { kind, ts, diff });
        handle.apply_updates_and_consolidate(updates)?;
        info!(
            "startup: envd serve: catalog init: apply updates complete in {:?}",
            apply_start.elapsed()
        );

        // Refuse to proceed if the stored catalog content version has higher precedence
        // than this code's version.
        if let Some(found_version) = handle.get_catalog_content_version().await? {
            if handle
                .catalog_content_version
                .cmp_precedence(&found_version)
                == std::cmp::Ordering::Less
            {
                return Err(DurableCatalogError::IncompatiblePersistVersion {
                    found_version,
                    catalog_version: handle.catalog_content_version,
                });
            }
        }

        Ok(handle)
    }
1143
1144 #[mz_ore::instrument]
1145 async fn open_inner(
1146 mut self,
1147 mode: Mode,
1148 initial_ts: Timestamp,
1149 bootstrap_args: &BootstrapArgs,
1150 ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError> {
1151 let mut commit_ts = self.upper;
1154 self.mode = mode;
1155
1156 match (&self.mode, &self.fenceable_token) {
1158 (_, FenceableToken::Unfenced { .. } | FenceableToken::Fenced { .. }) => {
1159 return Err(DurableCatalogError::Internal(
1160 "catalog should not have fenced before opening".to_string(),
1161 )
1162 .into());
1163 }
1164 (
1165 Mode::Writable | Mode::Savepoint,
1166 FenceableToken::Initializing {
1167 current_deploy_generation: None,
1168 ..
1169 },
1170 ) => {
1171 return Err(DurableCatalogError::Internal(format!(
1172 "cannot open in mode '{:?}' without a deploy generation",
1173 self.mode,
1174 ))
1175 .into());
1176 }
1177 _ => {}
1178 }
1179
1180 let read_only = matches!(self.mode, Mode::Readonly);
1181
1182 loop {
1184 self.sync_to_current_upper().await?;
1185 commit_ts = max(commit_ts, self.upper);
1186 let (fence_updates, current_fenceable_token) = self
1187 .fenceable_token
1188 .generate_unfenced_token(self.mode)?
1189 .ok_or_else(|| {
1190 DurableCatalogError::Internal(
1191 "catalog should not have fenced before opening".to_string(),
1192 )
1193 })?;
1194 debug!(
1195 ?self.upper,
1196 ?self.fenceable_token,
1197 ?current_fenceable_token,
1198 "fencing previous catalogs"
1199 );
1200 if matches!(self.mode, Mode::Writable) {
1201 match self
1202 .compare_and_append(fence_updates.clone(), commit_ts)
1203 .await
1204 {
1205 Ok(upper) => {
1206 commit_ts = upper;
1207 }
1208 Err(CompareAndAppendError::Fence(e)) => return Err(e.into()),
1209 Err(e @ CompareAndAppendError::UpperMismatch { .. }) => {
1210 warn!("catalog write failed due to upper mismatch, retrying: {e:?}");
1211 continue;
1212 }
1213 }
1214 }
1215 self.fenceable_token = current_fenceable_token;
1216 break;
1217 }
1218
1219 if matches!(self.mode, Mode::Writable) {
1220 let mut controller_handle = self
1227 .persist_client
1228 .open_critical_since::<SourceData, (), Timestamp, StorageDiff>(
1229 self.shard_id,
1230 PersistClient::CONTROLLER_CRITICAL_SINCE,
1231 Opaque::encode(&i64::MIN),
1232 Diagnostics {
1233 shard_name: CATALOG_SHARD_NAME.to_string(),
1234 handle_purpose: "durable catalog state critical since (migration)"
1235 .to_string(),
1236 },
1237 )
1238 .await
1239 .expect("invalid usage");
1240
1241 let since = controller_handle.since().clone();
1242 let res = controller_handle
1243 .compare_and_downgrade_since(
1244 &Opaque::encode(&i64::MIN),
1245 (&Opaque::encode(&PersistEpoch::default()), &since),
1246 )
1247 .await;
1248 match res {
1249 Ok(_) => info!("migrated Opaque of catalog since handle"),
1250 Err(_) => { }
1251 }
1252 }
1253
1254 let is_initialized = self.is_initialized_inner();
1255 if !matches!(self.mode, Mode::Writable) && !is_initialized {
1256 return Err(CatalogError::Durable(DurableCatalogError::NotWritable(
1257 format!(
1258 "catalog tables do not exist; will not create in {:?} mode",
1259 self.mode
1260 ),
1261 )));
1262 }
1263 soft_assert_ne_or_log!(self.upper, Timestamp::minimum());
1264
1265 let (audit_logs, snapshot): (Vec<_>, Vec<_>) = self
1267 .snapshot
1268 .into_iter()
1269 .partition(|(update, _, _)| update.is_audit_log());
1270 self.snapshot = snapshot;
1271 let audit_log_count = audit_logs.iter().map(|(_, _, diff)| diff).sum::<Diff>();
1272 let audit_log_handle = AuditLogIterator::new(audit_logs);
1273
1274 if is_initialized && !read_only {
1276 commit_ts = upgrade(&mut self, commit_ts).await?;
1277 }
1278
1279 debug!(
1280 ?is_initialized,
1281 ?self.upper,
1282 "initializing catalog state"
1283 );
1284 let mut catalog = PersistCatalogState {
1285 mode: self.mode,
1286 since_handle: self.since_handle,
1287 write_handle: self.write_handle,
1288 listen: self.listen,
1289 persist_client: self.persist_client,
1290 shard_id: self.shard_id,
1291 upper: self.upper,
1292 fenceable_token: self.fenceable_token,
1293 snapshot: Vec::new(),
1295 update_applier: CatalogStateInner::new(),
1296 catalog_content_version: self.catalog_content_version,
1297 bootstrap_complete: false,
1298 metrics: self.metrics,
1299 size_at_last_consolidation: None,
1300 };
1301 catalog.metrics.collection_entries.reset();
1302 catalog
1305 .metrics
1306 .collection_entries
1307 .with_label_values(&[&CollectionType::AuditLog.to_string()])
1308 .add(audit_log_count.into_inner());
1309 let updates = self.snapshot.into_iter().map(|(kind, ts, diff)| {
1310 let kind = TryIntoStateUpdateKind::try_into(kind).expect("kind decoding error");
1311 StateUpdate { kind, ts, diff }
1312 });
1313 catalog.apply_updates_and_consolidate(updates)?;
1314
1315 let catalog_content_version = catalog.catalog_content_version.to_string();
1316 let txn = if is_initialized {
1317 let mut txn = catalog.transaction().await?;
1318
1319 if txn.get_setting("migration_version".into()).is_none() && mode != Mode::Readonly {
1326 let old_version = txn.get_catalog_content_version();
1327 txn.set_setting("migration_version".into(), old_version.map(Into::into))?;
1328 }
1329
1330 txn.set_catalog_content_version(catalog_content_version)?;
1331 txn
1332 } else {
1333 soft_assert_eq_no_log!(
1334 catalog
1335 .snapshot
1336 .iter()
1337 .filter(|(kind, _, _)| !matches!(kind, StateUpdateKind::FenceToken(_)))
1338 .count(),
1339 0,
1340 "trace should not contain any updates for an uninitialized catalog: {:#?}",
1341 catalog.snapshot
1342 );
1343
1344 let mut txn = catalog.transaction().await?;
1345 initialize::initialize(
1346 &mut txn,
1347 bootstrap_args,
1348 initial_ts.into(),
1349 catalog_content_version,
1350 )
1351 .await?;
1352 txn
1353 };
1354
1355 if read_only {
1356 let (txn_batch, _) = txn.into_parts();
1357 let updates = StateUpdate::from_txn_batch_ts(txn_batch, catalog.upper);
1359 catalog.apply_updates_and_consolidate(updates)?;
1360 } else {
1361 txn.commit_internal(commit_ts).await?;
1362 }
1363
1364 if matches!(catalog.mode, Mode::Writable) {
1365 let write_handle = catalog
1366 .persist_client
1367 .open_writer::<SourceData, (), Timestamp, i64>(
1368 catalog.write_handle.shard_id(),
1369 Arc::new(persist_desc()),
1370 Arc::new(UnitSchema::default()),
1371 Diagnostics {
1372 shard_name: CATALOG_SHARD_NAME.to_string(),
1373 handle_purpose: "compact catalog".to_string(),
1374 },
1375 )
1376 .await
1377 .expect("invalid usage");
1378 let fuel = CATALOG_FORCE_COMPACTION_FUEL.handle(catalog.persist_client.dyncfgs());
1379 let wait = CATALOG_FORCE_COMPACTION_WAIT.handle(catalog.persist_client.dyncfgs());
1380 let _task = mz_ore::task::spawn(|| "catalog::force_shard_compaction", async move {
1383 let () =
1384 mz_persist_client::cli::admin::dangerous_force_compaction_and_break_pushdown(
1385 &write_handle,
1386 || fuel.get(),
1387 || wait.get(),
1388 )
1389 .await;
1390 });
1391 }
1392
1393 Ok((Box::new(catalog), audit_log_handle))
1394 }
1395
1396 #[mz_ore::instrument]
1401 fn is_initialized_inner(&self) -> bool {
1402 !self.update_applier.configs.is_empty()
1403 }
1404
1405 #[mz_ore::instrument]
1409 async fn get_current_config(&mut self, key: &str) -> Result<Option<u64>, DurableCatalogError> {
1410 self.sync_to_current_upper().await?;
1411 Ok(self.update_applier.configs.get(key).cloned())
1412 }
1413
1414 #[mz_ore::instrument]
1418 pub(crate) async fn get_user_version(&mut self) -> Result<Option<u64>, DurableCatalogError> {
1419 self.get_current_config(USER_VERSION_KEY).await
1420 }
1421
1422 #[mz_ore::instrument]
1426 async fn get_current_setting(
1427 &mut self,
1428 name: &str,
1429 ) -> Result<Option<String>, DurableCatalogError> {
1430 self.sync_to_current_upper().await?;
1431 Ok(self.update_applier.settings.get(name).cloned())
1432 }
1433
1434 #[mz_ore::instrument]
1439 async fn get_catalog_content_version(
1440 &mut self,
1441 ) -> Result<Option<semver::Version>, DurableCatalogError> {
1442 let version = self
1443 .get_current_setting(CATALOG_CONTENT_VERSION_KEY)
1444 .await?;
1445 let version = version.map(|version| version.parse().expect("invalid version persisted"));
1446 Ok(version)
1447 }
1448}
1449
1450#[async_trait]
1451impl OpenableDurableCatalogState for UnopenedPersistCatalogState {
1452 #[mz_ore::instrument]
1453 async fn open_savepoint(
1454 mut self: Box<Self>,
1455 initial_ts: Timestamp,
1456 bootstrap_args: &BootstrapArgs,
1457 ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError> {
1458 self.open_inner(Mode::Savepoint, initial_ts, bootstrap_args)
1459 .boxed()
1460 .await
1461 }
1462
1463 #[mz_ore::instrument]
1464 async fn open_read_only(
1465 mut self: Box<Self>,
1466 bootstrap_args: &BootstrapArgs,
1467 ) -> Result<Box<dyn DurableCatalogState>, CatalogError> {
1468 self.open_inner(Mode::Readonly, EpochMillis::MIN.into(), bootstrap_args)
1469 .boxed()
1470 .await
1471 .map(|(catalog, _)| catalog)
1472 }
1473
1474 #[mz_ore::instrument]
1475 async fn open(
1476 mut self: Box<Self>,
1477 initial_ts: Timestamp,
1478 bootstrap_args: &BootstrapArgs,
1479 ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError> {
1480 self.open_inner(Mode::Writable, initial_ts, bootstrap_args)
1481 .boxed()
1482 .await
1483 }
1484
1485 #[mz_ore::instrument(level = "debug")]
1486 async fn open_debug(mut self: Box<Self>) -> Result<DebugCatalogState, CatalogError> {
1487 Ok(DebugCatalogState(*self))
1488 }
1489
1490 #[mz_ore::instrument]
1491 async fn is_initialized(&mut self) -> Result<bool, CatalogError> {
1492 self.sync_to_current_upper().await?;
1493 Ok(self.is_initialized_inner())
1494 }
1495
1496 #[mz_ore::instrument]
1497 async fn epoch(&mut self) -> Result<Epoch, CatalogError> {
1498 self.sync_to_current_upper().await?;
1499 self.fenceable_token
1500 .validate()?
1501 .map(|token| token.epoch)
1502 .ok_or(CatalogError::Durable(DurableCatalogError::Uninitialized))
1503 }
1504
1505 #[mz_ore::instrument]
1506 async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError> {
1507 self.sync_to_current_upper().await?;
1508 self.fenceable_token
1509 .token()
1510 .map(|token| token.deploy_generation)
1511 .ok_or(CatalogError::Durable(DurableCatalogError::Uninitialized))
1512 }
1513
1514 #[mz_ore::instrument(level = "debug")]
1515 async fn get_0dt_deployment_max_wait(&mut self) -> Result<Option<Duration>, CatalogError> {
1516 let value = self
1517 .get_current_config(WITH_0DT_DEPLOYMENT_MAX_WAIT)
1518 .await?;
1519 match value {
1520 None => Ok(None),
1521 Some(millis) => Ok(Some(Duration::from_millis(millis))),
1522 }
1523 }
1524
1525 #[mz_ore::instrument(level = "debug")]
1526 async fn get_0dt_deployment_ddl_check_interval(
1527 &mut self,
1528 ) -> Result<Option<Duration>, CatalogError> {
1529 let value = self
1530 .get_current_config(WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL)
1531 .await?;
1532 match value {
1533 None => Ok(None),
1534 Some(millis) => Ok(Some(Duration::from_millis(millis))),
1535 }
1536 }
1537
1538 #[mz_ore::instrument(level = "debug")]
1539 async fn get_enable_0dt_deployment_panic_after_timeout(
1540 &mut self,
1541 ) -> Result<Option<bool>, CatalogError> {
1542 let value = self
1543 .get_current_config(ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT)
1544 .await?;
1545 match value {
1546 None => Ok(None),
1547 Some(0) => Ok(Some(false)),
1548 Some(1) => Ok(Some(true)),
1549 Some(v) => Err(
1550 DurableCatalogError::from(TryFromProtoError::UnknownEnumVariant(format!(
1551 "{v} is not a valid boolean value"
1552 )))
1553 .into(),
1554 ),
1555 }
1556 }
1557
1558 #[mz_ore::instrument]
1559 async fn has_system_config_synced_once(&mut self) -> Result<bool, DurableCatalogError> {
1560 self.get_current_config(SYSTEM_CONFIG_SYNCED_KEY)
1561 .await
1562 .map(|value| value.map(|value| value > 0).unwrap_or(false))
1563 }
1564
1565 #[mz_ore::instrument]
1566 async fn trace_unconsolidated(&mut self) -> Result<Trace, CatalogError> {
1567 self.sync_to_current_upper().await?;
1568 if self.is_initialized_inner() {
1569 let snapshot = self.snapshot_unconsolidated().await;
1570 Ok(Trace::from_snapshot(snapshot))
1571 } else {
1572 Err(CatalogError::Durable(DurableCatalogError::Uninitialized))
1573 }
1574 }
1575
1576 #[mz_ore::instrument]
1577 async fn trace_consolidated(&mut self) -> Result<Trace, CatalogError> {
1578 self.sync_to_current_upper().await?;
1579 if self.is_initialized_inner() {
1580 let snapshot = self.current_snapshot().await?;
1581 Ok(Trace::from_snapshot(snapshot))
1582 } else {
1583 Err(CatalogError::Durable(DurableCatalogError::Uninitialized))
1584 }
1585 }
1586
1587 #[mz_ore::instrument(level = "debug")]
1588 async fn expire(self: Box<Self>) {
1589 self.expire().await
1590 }
1591}
1592
/// Update applier for an opened persist-backed catalog.
#[derive(Debug)]
struct CatalogStateInner {
    /// In-memory catalog updates queued by `apply_update` and drained by `sync_updates`.
    updates: VecDeque<memory::objects::StateUpdate>,
}
1599
1600impl CatalogStateInner {
1601 fn new() -> CatalogStateInner {
1602 CatalogStateInner {
1603 updates: VecDeque::new(),
1604 }
1605 }
1606}
1607
1608impl ApplyUpdate<StateUpdateKind> for CatalogStateInner {
1609 fn apply_update(
1610 &mut self,
1611 update: StateUpdate<StateUpdateKind>,
1612 current_fence_token: &mut FenceableToken,
1613 metrics: &Arc<Metrics>,
1614 ) -> Result<Option<StateUpdate<StateUpdateKind>>, FenceError> {
1615 if let Some(collection_type) = update.kind.collection_type() {
1616 metrics
1617 .collection_entries
1618 .with_label_values(&[&collection_type.to_string()])
1619 .add(update.diff.into_inner());
1620 }
1621
1622 {
1623 let update: Option<memory::objects::StateUpdate> = (&update)
1624 .try_into()
1625 .expect("invalid persisted update: {update:#?}");
1626 if let Some(update) = update {
1627 self.updates.push_back(update);
1628 }
1629 }
1630
1631 match (update.kind, update.diff) {
1632 (StateUpdateKind::AuditLog(_, ()), _) => Ok(None),
1633 (StateUpdateKind::FenceToken(_), Diff::MINUS_ONE) => Ok(None),
1635 (StateUpdateKind::FenceToken(token), Diff::ONE) => {
1636 current_fence_token.maybe_fence(token)?;
1637 Ok(None)
1638 }
1639 (kind, diff) => Ok(Some(StateUpdate {
1640 kind,
1641 ts: update.ts,
1642 diff,
1643 })),
1644 }
1645 }
1646}
1647
/// An opened persist-backed catalog: a [`PersistHandle`] over decoded [`StateUpdateKind`]s whose
/// updates are applied by [`CatalogStateInner`].
type PersistCatalogState = PersistHandle<StateUpdateKind, CatalogStateInner>;
1654
1655#[async_trait]
1656impl ReadOnlyDurableCatalogState for PersistCatalogState {
1657 fn epoch(&self) -> Epoch {
1658 self.fenceable_token
1659 .token()
1660 .expect("opened catalog state must have an epoch")
1661 .epoch
1662 }
1663
1664 fn metrics(&self) -> &Metrics {
1665 &self.metrics
1666 }
1667
1668 #[mz_ore::instrument(level = "debug")]
1669 async fn expire(self: Box<Self>) {
1670 self.expire().await
1671 }
1672
1673 fn is_bootstrap_complete(&self) -> bool {
1674 self.bootstrap_complete
1675 }
1676
1677 async fn get_audit_logs(&mut self) -> Result<Vec<VersionedEvent>, CatalogError> {
1678 self.sync_to_current_upper().await?;
1679 let audit_logs: Vec<_> = self
1680 .persist_snapshot()
1681 .await
1682 .filter_map(
1683 |StateUpdate {
1684 kind,
1685 ts: _,
1686 diff: _,
1687 }| match kind {
1688 StateUpdateKind::AuditLog(key, ()) => Some(key),
1689 _ => None,
1690 },
1691 )
1692 .collect();
1693 let mut audit_logs: Vec<_> = audit_logs
1694 .into_iter()
1695 .map(RustType::from_proto)
1696 .map_ok(|key: AuditLogKey| key.event)
1697 .collect::<Result<_, _>>()?;
1698 audit_logs.sort_by(|a, b| a.sortable_id().cmp(&b.sortable_id()));
1699 Ok(audit_logs)
1700 }
1701
1702 #[mz_ore::instrument(level = "debug")]
1703 async fn get_next_id(&mut self, id_type: &str) -> Result<u64, CatalogError> {
1704 self.with_trace(|trace| {
1705 Ok(trace
1706 .into_iter()
1707 .rev()
1708 .filter_map(|(kind, _, _)| match kind {
1709 StateUpdateKind::IdAllocator(key, value) if key.name == id_type => {
1710 Some(value.next_id)
1711 }
1712 _ => None,
1713 })
1714 .next()
1715 .expect("must exist"))
1716 })
1717 .await
1718 }
1719
1720 #[mz_ore::instrument(level = "debug")]
1721 async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError> {
1722 self.sync_to_current_upper().await?;
1723 Ok(self
1724 .fenceable_token
1725 .token()
1726 .expect("opened catalogs must have a token")
1727 .deploy_generation)
1728 }
1729
1730 #[mz_ore::instrument(level = "debug")]
1731 async fn snapshot(&mut self) -> Result<Snapshot, CatalogError> {
1732 self.with_snapshot(Ok).await
1733 }
1734
1735 #[mz_ore::instrument(level = "debug")]
1736 async fn sync_to_current_updates(
1737 &mut self,
1738 ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError> {
1739 let upper = self.current_upper().await;
1740 self.sync_updates(upper).await
1741 }
1742
1743 #[mz_ore::instrument(level = "debug")]
1744 async fn sync_updates(
1745 &mut self,
1746 target_upper: mz_repr::Timestamp,
1747 ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError> {
1748 self.sync(target_upper).await?;
1749 let mut updates = Vec::new();
1750 while let Some(update) = self.update_applier.updates.front() {
1751 if update.ts >= target_upper {
1752 break;
1753 }
1754
1755 let update = self
1756 .update_applier
1757 .updates
1758 .pop_front()
1759 .expect("peeked above");
1760 updates.push(update);
1761 }
1762 Ok(updates)
1763 }
1764
1765 async fn current_upper(&mut self) -> Timestamp {
1766 self.current_upper().await
1767 }
1768}
1769
/// Read-write operations for an opened persist-backed catalog.
///
/// Mutations depend on `self.mode`:
/// - `Writable` catalogs write to persist via `compare_and_append`.
/// - `Savepoint` catalogs only update in-memory state.
/// - `Readonly` catalogs reject non-empty commits and upper advancements.
#[async_trait]
// NOTE(review): presumably needed because `Transaction` carries a lifetime that is elided in the
// signatures below; confirm.
#[allow(mismatched_lifetime_syntaxes)]
impl DurableCatalogState for PersistCatalogState {
    /// Returns `true` if this catalog was opened in read-only mode.
    fn is_read_only(&self) -> bool {
        matches!(self.mode, Mode::Readonly)
    }

    /// Returns `true` if this catalog was opened in savepoint mode.
    fn is_savepoint(&self) -> bool {
        matches!(self.mode, Mode::Savepoint)
    }

    /// Records that bootstrap has finished.
    ///
    /// Writable catalogs also upgrade the version of their since handle.
    ///
    /// # Panics
    ///
    /// Panics if upgrading the since handle's version fails.
    async fn mark_bootstrap_complete(&mut self) {
        self.bootstrap_complete = true;
        if matches!(self.mode, Mode::Writable) {
            self.since_handle
                .upgrade_version()
                .await
                .expect("invalid usage")
        }
    }

    /// Starts a transaction over a fresh snapshot, passing the current upper to
    /// `Transaction::new` as the commit timestamp.
    #[mz_ore::instrument(level = "debug")]
    async fn transaction(&mut self) -> Result<Transaction, CatalogError> {
        self.metrics.transactions_started.inc();
        let snapshot = self.snapshot().await?;
        let commit_ts = self.upper.clone();
        Transaction::new(self, snapshot, commit_ts)
    }

    /// Starts a transaction over a caller-supplied `snapshot`, passing the current upper to
    /// `Transaction::new` as the commit timestamp.
    fn transaction_from_snapshot(
        &mut self,
        snapshot: Snapshot,
    ) -> Result<Transaction, CatalogError> {
        let commit_ts = self.upper.clone();
        Transaction::new(self, snapshot, commit_ts)
    }

    /// Commits `txn_batch` at `commit_ts` and returns the catalog's new upper.
    ///
    /// Behavior per mode:
    /// - `Readonly`: an empty batch is a no-op that returns the current upper; a non-empty batch
    ///   is rejected with `NotWritable`.
    /// - `Writable`: updates are written with `compare_and_append`.
    /// - `Savepoint`: updates are applied in memory at `commit_ts`, and the upper becomes
    ///   `commit_ts.step_forward()`.
    ///
    /// Commit count and latency are recorded in `self.metrics`.
    ///
    /// # Panics
    ///
    /// - Panics if the batch's upper differs from the catalog's upper (only one transaction at a
    ///   time is supported).
    /// - Panics if `commit_ts` is below the catalog's upper.
    #[mz_ore::instrument(level = "debug")]
    async fn commit_transaction(
        &mut self,
        txn_batch: TransactionBatch,
        commit_ts: Timestamp,
    ) -> Result<Timestamp, CatalogError> {
        async fn commit_transaction_inner(
            catalog: &mut PersistCatalogState,
            txn_batch: TransactionBatch,
            commit_ts: Timestamp,
        ) -> Result<Timestamp, CatalogError> {
            // Read-only catalogs tolerate empty batches but refuse any real write, and report
            // which collections the rejected updates touched.
            if catalog.mode == Mode::Readonly {
                let updates: Vec<_> = StateUpdate::from_txn_batch(txn_batch).collect();
                if !updates.is_empty() {
                    let collection_types: Vec<_> = updates
                        .iter()
                        .filter_map(|u| u.0.collection_type())
                        .collect();
                    return Err(DurableCatalogError::NotWritable(format!(
                        "cannot commit a transaction in a read-only catalog: \
                        {} updates across collections: {collection_types:?}",
                        updates.len(),
                    ))
                    .into());
                }
                return Ok(catalog.upper);
            }

            assert_eq!(
                catalog.upper, txn_batch.upper,
                "only one transaction at a time is supported"
            );

            assert!(
                commit_ts >= catalog.upper,
                "expected commit ts, {}, to be greater than or equal to upper, {}",
                commit_ts,
                catalog.upper
            );

            let updates = StateUpdate::from_txn_batch(txn_batch).collect();
            debug!("committing updates: {updates:?}");

            let next_upper = match catalog.mode {
                Mode::Writable => catalog
                    .compare_and_append(updates, commit_ts)
                    .await
                    .map_err(|e| e.unwrap_fence_error())?,
                Mode::Savepoint => {
                    // Savepoint writes never reach persist: apply them in memory and advance the
                    // in-memory upper past `commit_ts`.
                    let updates = updates.into_iter().map(|(kind, diff)| StateUpdate {
                        kind,
                        ts: commit_ts,
                        diff,
                    });
                    catalog.apply_updates_and_consolidate(updates)?;
                    catalog.upper = commit_ts.step_forward();
                    catalog.upper
                }
                Mode::Readonly => unreachable!("handled above"),
            };

            Ok(next_upper)
        }
        self.metrics.transaction_commits.inc();
        let counter = self.metrics.transaction_commit_latency_seconds.clone();
        commit_transaction_inner(self, txn_batch, commit_ts)
            .wall_time()
            .inc_by(counter)
            .await
    }

    /// Advances the catalog's upper to `new_upper`.
    ///
    /// If `new_upper` does not exceed the current upper, this soft-panics (or logs) and returns
    /// `Ok(())` without changing anything. Otherwise:
    /// - `Writable`: appends an empty batch at `new_upper`.
    /// - `Savepoint`: only moves the in-memory upper.
    /// - `Readonly`: returns `NotWritable`.
    #[mz_ore::instrument(level = "debug")]
    async fn advance_upper(&mut self, new_upper: Timestamp) -> Result<(), CatalogError> {
        if self.upper >= new_upper {
            soft_panic_or_log!(
                "new_upper ({new_upper}) not greater than current upper ({})",
                self.upper
            );
            return Ok(());
        }

        match self.mode {
            Mode::Writable => self
                .compare_and_append_inner([], new_upper)
                .await
                .map_err(|e| e.unwrap_fence_error())?,
            Mode::Savepoint => (),
            Mode::Readonly => {
                return Err(DurableCatalogError::NotWritable(
                    "cannot advance upper of a read-only catalog".into(),
                )
                .into());
            }
        }

        self.upper = new_upper;
        Ok(())
    }

    /// Returns the ID of the persist shard backing this catalog.
    fn shard_id(&self) -> ShardId {
        self.shard_id
    }
}
1926
1927pub fn shard_id(organization_id: Uuid, seed: usize) -> ShardId {
1929 let hash = sha2::Sha256::digest(format!("{organization_id}{seed}")).to_vec();
1930 soft_assert_eq_or_log!(hash.len(), 32, "SHA256 returns 32 bytes (256 bits)");
1931 let uuid = Uuid::from_slice(&hash[0..16]).expect("from_slice accepts exactly 16 bytes");
1932 ShardId::from_str(&format!("s{uuid}")).expect("known to be valid")
1933}
1934
1935fn as_of(
1938 read_handle: &ReadHandle<SourceData, (), Timestamp, StorageDiff>,
1939 upper: Timestamp,
1940) -> Timestamp {
1941 let since = read_handle.since().clone();
1942 let mut as_of = upper.checked_sub(1).unwrap_or_else(|| {
1943 panic!("catalog persist shard should be initialize, found upper: {upper:?}")
1944 });
1945 soft_assert_or_log!(
1948 since.less_equal(&as_of),
1949 "since={since:?}, as_of={as_of:?}; since must be less than or equal to as_of"
1950 );
1951 as_of.advance_by(since.borrow());
1954 as_of
1955}
1956
1957async fn fetch_catalog_shard_version(
1960 persist_client: &PersistClient,
1961 catalog_shard_id: ShardId,
1962) -> Option<semver::Version> {
1963 let shard_state = persist_client
1964 .inspect_shard::<Timestamp>(&catalog_shard_id)
1965 .await
1966 .ok()?;
1967 let json_state = serde_json::to_value(shard_state).expect("state serialization error");
1968 let json_version = json_state
1969 .get("applier_version")
1970 .cloned()
1971 .expect("missing applier_version");
1972 let version = serde_json::from_value(json_version).expect("version deserialization error");
1973 Some(version)
1974}
1975
1976#[mz_ore::instrument(level = "debug")]
1981async fn snapshot_binary(
1982 read_handle: &mut ReadHandle<SourceData, (), Timestamp, StorageDiff>,
1983 as_of: Timestamp,
1984 metrics: &Arc<Metrics>,
1985) -> impl Iterator<Item = StateUpdate<StateUpdateKindJson>> + DoubleEndedIterator + use<> {
1986 metrics.snapshots_taken.inc();
1987 let counter = metrics.snapshot_latency_seconds.clone();
1988 snapshot_binary_inner(read_handle, as_of)
1989 .wall_time()
1990 .inc_by(counter)
1991 .await
1992}
1993
1994#[mz_ore::instrument(level = "debug")]
1999async fn snapshot_binary_inner(
2000 read_handle: &mut ReadHandle<SourceData, (), Timestamp, StorageDiff>,
2001 as_of: Timestamp,
2002) -> impl Iterator<Item = StateUpdate<StateUpdateKindJson>> + DoubleEndedIterator + use<> {
2003 let snapshot = read_handle
2004 .snapshot_and_fetch(Antichain::from_elem(as_of))
2005 .await
2006 .expect("we have advanced the restart_as_of by the since");
2007 soft_assert_no_log!(
2008 snapshot.iter().all(|(_, _, diff)| *diff == 1),
2009 "snapshot_and_fetch guarantees a consolidated result: {snapshot:#?}"
2010 );
2011 snapshot
2012 .into_iter()
2013 .map(Into::<StateUpdate<StateUpdateKindJson>>::into)
2014 .sorted_by(|a, b| Ord::cmp(&b.ts, &a.ts))
2015}
2016
2017pub(crate) fn antichain_to_timestamp(antichain: Antichain<Timestamp>) -> Timestamp {
2022 antichain
2023 .into_option()
2024 .expect("we use a totally ordered time and never finalize the shard")
2025}
2026
2027impl Trace {
2030 fn from_snapshot(snapshot: impl IntoIterator<Item = StateUpdate>) -> Trace {
2032 let mut trace = Trace::new();
2033 for StateUpdate { kind, ts, diff } in snapshot {
2034 match kind {
2035 StateUpdateKind::AuditLog(k, v) => trace.audit_log.values.push(((k, v), ts, diff)),
2036 StateUpdateKind::Cluster(k, v) => trace.clusters.values.push(((k, v), ts, diff)),
2037 StateUpdateKind::ClusterReplica(k, v) => {
2038 trace.cluster_replicas.values.push(((k, v), ts, diff))
2039 }
2040 StateUpdateKind::Comment(k, v) => trace.comments.values.push(((k, v), ts, diff)),
2041 StateUpdateKind::Config(k, v) => trace.configs.values.push(((k, v), ts, diff)),
2042 StateUpdateKind::Database(k, v) => trace.databases.values.push(((k, v), ts, diff)),
2043 StateUpdateKind::DefaultPrivilege(k, v) => {
2044 trace.default_privileges.values.push(((k, v), ts, diff))
2045 }
2046 StateUpdateKind::FenceToken(_) => {
2047 }
2049 StateUpdateKind::IdAllocator(k, v) => {
2050 trace.id_allocator.values.push(((k, v), ts, diff))
2051 }
2052 StateUpdateKind::IntrospectionSourceIndex(k, v) => {
2053 trace.introspection_sources.values.push(((k, v), ts, diff))
2054 }
2055 StateUpdateKind::Item(k, v) => trace.items.values.push(((k, v), ts, diff)),
2056 StateUpdateKind::NetworkPolicy(k, v) => {
2057 trace.network_policies.values.push(((k, v), ts, diff))
2058 }
2059 StateUpdateKind::Role(k, v) => trace.roles.values.push(((k, v), ts, diff)),
2060 StateUpdateKind::Schema(k, v) => trace.schemas.values.push(((k, v), ts, diff)),
2061 StateUpdateKind::Setting(k, v) => trace.settings.values.push(((k, v), ts, diff)),
2062 StateUpdateKind::SourceReferences(k, v) => {
2063 trace.source_references.values.push(((k, v), ts, diff))
2064 }
2065 StateUpdateKind::SystemConfiguration(k, v) => {
2066 trace.system_configurations.values.push(((k, v), ts, diff))
2067 }
2068 StateUpdateKind::ClusterSystemConfiguration(k, v) => trace
2069 .cluster_system_configurations
2070 .values
2071 .push(((k, v), ts, diff)),
2072 StateUpdateKind::ReplicaSystemConfiguration(k, v) => trace
2073 .replica_system_configurations
2074 .values
2075 .push(((k, v), ts, diff)),
2076 StateUpdateKind::SystemObjectMapping(k, v) => {
2077 trace.system_object_mappings.values.push(((k, v), ts, diff))
2078 }
2079 StateUpdateKind::SystemPrivilege(k, v) => {
2080 trace.system_privileges.values.push(((k, v), ts, diff))
2081 }
2082 StateUpdateKind::StorageCollectionMetadata(k, v) => trace
2083 .storage_collection_metadata
2084 .values
2085 .push(((k, v), ts, diff)),
2086 StateUpdateKind::UnfinalizedShard(k, ()) => {
2087 trace.unfinalized_shards.values.push(((k, ()), ts, diff))
2088 }
2089 StateUpdateKind::TxnWalShard((), v) => {
2090 trace.txn_wal_shard.values.push((((), v), ts, diff))
2091 }
2092 StateUpdateKind::RoleAuth(k, v) => trace.role_auth.values.push(((k, v), ts, diff)),
2093 }
2094 }
2095 trace
2096 }
2097}
2098
2099impl UnopenedPersistCatalogState {
2100 #[mz_ore::instrument]
2102 pub(crate) async fn debug_edit<T: Collection>(
2103 &mut self,
2104 key: T::Key,
2105 value: T::Value,
2106 ) -> Result<Option<T::Value>, CatalogError>
2107 where
2108 T::Key: PartialEq + Eq + Debug + Clone,
2109 T::Value: Debug + Clone,
2110 {
2111 let prev_value = loop {
2112 let key = key.clone();
2113 let value = value.clone();
2114 let snapshot = self.current_snapshot().await?;
2115 let trace = Trace::from_snapshot(snapshot);
2116 let collection_trace = T::collection_trace(trace);
2117 let prev_values: Vec<_> = collection_trace
2118 .values
2119 .into_iter()
2120 .filter(|((k, _), _, diff)| {
2121 soft_assert_eq_or_log!(*diff, Diff::ONE, "trace is consolidated");
2122 &key == k
2123 })
2124 .collect();
2125
2126 let prev_value = match &prev_values[..] {
2127 [] => None,
2128 [((_, v), _, _)] => Some(v.clone()),
2129 prev_values => panic!("multiple values found for key {key:?}: {prev_values:?}"),
2130 };
2131
2132 let mut updates: Vec<_> = prev_values
2133 .into_iter()
2134 .map(|((k, v), _, _)| (T::update(k, v), Diff::MINUS_ONE))
2135 .collect();
2136 updates.push((T::update(key, value), Diff::ONE));
2137 match self.fenceable_token.generate_unfenced_token(self.mode)? {
2139 Some((fence_updates, current_fenceable_token)) => {
2140 updates.extend(fence_updates.clone());
2141 match self.compare_and_append(updates, self.upper).await {
2142 Ok(_) => {
2143 self.fenceable_token = current_fenceable_token;
2144 break prev_value;
2145 }
2146 Err(CompareAndAppendError::Fence(e)) => return Err(e.into()),
2147 Err(e @ CompareAndAppendError::UpperMismatch { .. }) => {
2148 warn!("catalog write failed due to upper mismatch, retrying: {e:?}");
2149 continue;
2150 }
2151 }
2152 }
2153 None => {
2154 self.compare_and_append(updates, self.upper)
2155 .await
2156 .map_err(|e| e.unwrap_fence_error())?;
2157 break prev_value;
2158 }
2159 }
2160 };
2161 Ok(prev_value)
2162 }
2163
2164 #[mz_ore::instrument]
2166 pub(crate) async fn debug_delete<T: Collection>(
2167 &mut self,
2168 key: T::Key,
2169 ) -> Result<(), CatalogError>
2170 where
2171 T::Key: PartialEq + Eq + Debug + Clone,
2172 T::Value: Debug,
2173 {
2174 loop {
2175 let key = key.clone();
2176 let snapshot = self.current_snapshot().await?;
2177 let trace = Trace::from_snapshot(snapshot);
2178 let collection_trace = T::collection_trace(trace);
2179 let mut retractions: Vec<_> = collection_trace
2180 .values
2181 .into_iter()
2182 .filter(|((k, _), _, diff)| {
2183 soft_assert_eq_or_log!(*diff, Diff::ONE, "trace is consolidated");
2184 &key == k
2185 })
2186 .map(|((k, v), _, _)| (T::update(k, v), Diff::MINUS_ONE))
2187 .collect();
2188
2189 match self.fenceable_token.generate_unfenced_token(self.mode)? {
2191 Some((fence_updates, current_fenceable_token)) => {
2192 retractions.extend(fence_updates.clone());
2193 match self.compare_and_append(retractions, self.upper).await {
2194 Ok(_) => {
2195 self.fenceable_token = current_fenceable_token;
2196 break;
2197 }
2198 Err(CompareAndAppendError::Fence(e)) => return Err(e.into()),
2199 Err(e @ CompareAndAppendError::UpperMismatch { .. }) => {
2200 warn!("catalog write failed due to upper mismatch, retrying: {e:?}");
2201 continue;
2202 }
2203 }
2204 }
2205 None => {
2206 self.compare_and_append(retractions, self.upper)
2207 .await
2208 .map_err(|e| e.unwrap_fence_error())?;
2209 break;
2210 }
2211 }
2212 }
2213 Ok(())
2214 }
2215
2216 async fn current_snapshot(
2221 &mut self,
2222 ) -> Result<impl IntoIterator<Item = StateUpdate> + '_, CatalogError> {
2223 self.sync_to_current_upper().await?;
2224 self.consolidate();
2225 Ok(self.snapshot.iter().cloned().map(|(kind, ts, diff)| {
2226 let kind = TryIntoStateUpdateKind::try_into(kind).expect("kind decoding error");
2227 StateUpdate { kind, ts, diff }
2228 }))
2229 }
2230}