1#[cfg(test)]
11mod tests;
12
13use std::cmp::max;
14use std::collections::{BTreeMap, VecDeque};
15use std::fmt::Debug;
16use std::str::FromStr;
17use std::sync::Arc;
18use std::time::{Duration, Instant};
19
20use async_trait::async_trait;
21use differential_dataflow::lattice::Lattice;
22use futures::{FutureExt, StreamExt};
23use itertools::Itertools;
24use mz_audit_log::VersionedEvent;
25use mz_ore::metrics::MetricsFutureExt;
26use mz_ore::now::EpochMillis;
27use mz_ore::{
28 soft_assert_eq_no_log, soft_assert_eq_or_log, soft_assert_ne_or_log, soft_assert_no_log,
29 soft_assert_or_log, soft_panic_or_log,
30};
31use mz_persist_client::cfg::USE_CRITICAL_SINCE_CATALOG;
32use mz_persist_client::cli::admin::{CATALOG_FORCE_COMPACTION_FUEL, CATALOG_FORCE_COMPACTION_WAIT};
33use mz_persist_client::critical::SinceHandle;
34use mz_persist_client::error::UpperMismatch;
35use mz_persist_client::read::{Listen, ListenEvent, ReadHandle};
36use mz_persist_client::write::WriteHandle;
37use mz_persist_client::{Diagnostics, PersistClient, ShardId};
38use mz_persist_types::codec_impls::UnitSchema;
39use mz_proto::{RustType, TryFromProtoError};
40use mz_repr::{Diff, RelationDesc, ScalarType};
41use mz_storage_types::StorageDiff;
42use mz_storage_types::sources::SourceData;
43use sha2::Digest;
44use timely::Container;
45use timely::progress::{Antichain, Timestamp as TimelyTimestamp};
46use tracing::{debug, info, warn};
47use uuid::Uuid;
48
49use crate::durable::debug::{Collection, CollectionType, DebugCatalogState, Trace};
50use crate::durable::error::FenceError;
51use crate::durable::initialize::{
52 ENABLE_0DT_DEPLOYMENT, ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, SYSTEM_CONFIG_SYNCED_KEY,
53 USER_VERSION_KEY, WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL, WITH_0DT_DEPLOYMENT_MAX_WAIT,
54};
55use crate::durable::metrics::Metrics;
56use crate::durable::objects::state_update::{
57 IntoStateUpdateKindJson, StateUpdate, StateUpdateKind, StateUpdateKindJson,
58 TryIntoStateUpdateKind,
59};
60use crate::durable::objects::{AuditLogKey, FenceToken, Snapshot};
61use crate::durable::transaction::TransactionBatch;
62use crate::durable::upgrade::upgrade;
63use crate::durable::{
64 AuditLogIterator, BootstrapArgs, CATALOG_CONTENT_VERSION_KEY, CatalogError,
65 DurableCatalogError, DurableCatalogState, Epoch, OpenableDurableCatalogState,
66 ReadOnlyDurableCatalogState, Transaction, initialize,
67};
68use crate::memory;
69
70pub(crate) type Timestamp = mz_repr::Timestamp;
72
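/// The lowest possible epoch. `Epoch` is non-zero, so the minimum is 1; a writable catalog
/// bumps the epoch past the largest durable value each time it fences out a previous writer.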
73const MIN_EPOCH: Epoch = unsafe { Epoch::new_unchecked(1) };
78
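// Human-readable shard names, used only for persist diagnostics.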
79const CATALOG_SHARD_NAME: &str = "catalog";
81const UPGRADE_SHARD_NAME: &str = "catalog_upgrade";
83
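// Seeds used to deterministically derive shard ids from the organization id (see `shard_id`).
// The underscore-prefixed seeds are unused here but kept so their values are never reassigned.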
84const CATALOG_SEED: usize = 1;
86const UPGRADE_SEED: usize = 2;
117pub const _BUILTIN_MIGRATION_SEED: usize = 3;
119pub const _EXPRESSION_CACHE_SEED: usize = 4;
121
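/// The mode in which a catalog handle operates.
///
/// - `Readonly`: never writes to the shard; transactions containing updates are rejected.
/// - `Savepoint`: applies writes to in-memory state only; nothing is made durable.
/// - `Writable`: durably commits writes to the catalog shard.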
122#[derive(Debug, Copy, Clone, Eq, PartialEq)]
124pub(crate) enum Mode {
125 Readonly,
127 Savepoint,
129 Writable,
131}
132
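/// Tracks the fence token this catalog handle operates under and whether a newer token has
/// fenced it out. A handle starts in `Initializing` while it discovers the largest durable
/// token, becomes `Unfenced` once it has generated its own token, and becomes `Fenced` when it
/// observes a token with a higher deploy generation (or the same generation and a higher
/// epoch).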
133#[derive(Debug)]
135pub(crate) enum FenceableToken {
136 Initializing {
139 durable_token: Option<FenceToken>,
141 current_deploy_generation: Option<u64>,
143 },
144 Unfenced { current_token: FenceToken },
146 Fenced {
148 current_token: FenceToken,
149 fence_token: FenceToken,
150 },
151}
152
153impl FenceableToken {
154 fn new(current_deploy_generation: Option<u64>) -> Self {
156 Self::Initializing {
157 durable_token: None,
158 current_deploy_generation,
159 }
160 }
161
162 fn validate(&self) -> Result<Option<FenceToken>, FenceError> {
164 match self {
165 FenceableToken::Initializing { durable_token, .. } => Ok(durable_token.clone()),
166 FenceableToken::Unfenced { current_token, .. } => Ok(Some(current_token.clone())),
167 FenceableToken::Fenced {
168 current_token,
169 fence_token,
170 } => {
171 assert!(
172 fence_token > current_token,
173 "must be fenced by higher token; current={current_token:?}, fence={fence_token:?}"
174 );
175 if fence_token.deploy_generation > current_token.deploy_generation {
176 Err(FenceError::DeployGeneration {
177 current_generation: current_token.deploy_generation,
178 fence_generation: fence_token.deploy_generation,
179 })
180 } else {
181 assert!(
182 fence_token.epoch > current_token.epoch,
183 "must be fenced by higher token; current={current_token:?}, fence={fence_token:?}"
184 );
185 Err(FenceError::Epoch {
186 current_epoch: current_token.epoch,
187 fence_epoch: fence_token.epoch,
188 })
189 }
190 }
191 }
192 }
193
194 fn token(&self) -> Option<FenceToken> {
196 match self {
197 FenceableToken::Initializing { durable_token, .. } => durable_token.clone(),
198 FenceableToken::Unfenced { current_token, .. } => Some(current_token.clone()),
199 FenceableToken::Fenced { current_token, .. } => Some(current_token.clone()),
200 }
201 }
202
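    /// Observes `token` and transitions to `Fenced`, returning the corresponding `FenceError`,
    /// if `token` is newer than the token this handle is operating under.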
203 fn maybe_fence(&mut self, token: FenceToken) -> Result<(), FenceError> {
205 match self {
206 FenceableToken::Initializing {
207 durable_token,
208 current_deploy_generation,
209 ..
210 } => {
211 match durable_token {
212 Some(durable_token) => {
213 *durable_token = max(durable_token.clone(), token.clone());
214 }
215 None => {
216 *durable_token = Some(token.clone());
217 }
218 }
219 if let Some(current_deploy_generation) = current_deploy_generation {
220 if *current_deploy_generation < token.deploy_generation {
221 *self = FenceableToken::Fenced {
222 current_token: FenceToken {
223 deploy_generation: *current_deploy_generation,
224 epoch: token.epoch,
225 },
226 fence_token: token,
227 };
228 self.validate()?;
229 }
230 }
231 }
232 FenceableToken::Unfenced { current_token } => {
233 if *current_token < token {
234 *self = FenceableToken::Fenced {
235 current_token: current_token.clone(),
236 fence_token: token,
237 };
238 self.validate()?;
239 }
240 }
241 FenceableToken::Fenced { .. } => {
242 self.validate()?;
243 }
244 }
245
246 Ok(())
247 }
248
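    /// Produces the updates that retract the largest durable fence token seen so far and
    /// insert this handle's own token, along with the resulting `Unfenced` state. Returns
    /// `Ok(None)` if a token has already been generated; only writable handles bump the epoch.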
249 fn generate_unfenced_token(
253 &self,
254 mode: Mode,
255 ) -> Result<Option<(Vec<(StateUpdateKind, Diff)>, FenceableToken)>, DurableCatalogError> {
256 let (durable_token, current_deploy_generation) = match self {
257 FenceableToken::Initializing {
258 durable_token,
259 current_deploy_generation,
260 } => (durable_token.clone(), current_deploy_generation.clone()),
261 FenceableToken::Unfenced { .. } | FenceableToken::Fenced { .. } => return Ok(None),
262 };
263
264 let mut fence_updates = Vec::with_capacity(2);
265
266 if let Some(durable_token) = &durable_token {
267 fence_updates.push((
268 StateUpdateKind::FenceToken(durable_token.clone()),
269 Diff::MINUS_ONE,
270 ));
271 }
272
273 let current_deploy_generation = current_deploy_generation
274 .or_else(|| durable_token.as_ref().map(|token| token.deploy_generation))
275 .ok_or(DurableCatalogError::Uninitialized)?;
277 let mut current_epoch = durable_token
278 .map(|token| token.epoch)
279 .unwrap_or(MIN_EPOCH)
280 .get();
281 if matches!(mode, Mode::Writable) {
            current_epoch += 1;
284 }
285 let current_epoch = Epoch::new(current_epoch).expect("known to be non-zero");
286 let current_token = FenceToken {
287 deploy_generation: current_deploy_generation,
288 epoch: current_epoch,
289 };
290
291 fence_updates.push((
292 StateUpdateKind::FenceToken(current_token.clone()),
293 Diff::ONE,
294 ));
295
296 let current_fenceable_token = FenceableToken::Unfenced { current_token };
297
298 Ok(Some((fence_updates, current_fenceable_token)))
299 }
300}
301
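/// Errors that a catalog `compare_and_append` can surface: either this writer has been fenced
/// out, or the expected upper no longer matches the shard (a benign race that callers handle
/// by syncing and retrying).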
302#[derive(Debug, thiserror::Error)]
304pub(crate) enum CompareAndAppendError {
305 #[error(transparent)]
306 Fence(#[from] FenceError),
307 #[error(
310 "expected catalog upper {expected_upper:?} did not match actual catalog upper {actual_upper:?}"
311 )]
312 UpperMismatch {
313 expected_upper: Timestamp,
314 actual_upper: Timestamp,
315 },
316}
317
318impl CompareAndAppendError {
319 pub(crate) fn unwrap_fence_error(self) -> FenceError {
320 match self {
321 CompareAndAppendError::Fence(e) => e,
322 e @ CompareAndAppendError::UpperMismatch { .. } => {
323 panic!("unexpected upper mismatch: {e:?}")
324 }
325 }
326 }
327}
328
329impl From<UpperMismatch<Timestamp>> for CompareAndAppendError {
330 fn from(upper_mismatch: UpperMismatch<Timestamp>) -> Self {
331 Self::UpperMismatch {
332 expected_upper: antichain_to_timestamp(upper_mismatch.expected),
333 actual_upper: antichain_to_timestamp(upper_mismatch.current),
334 }
335 }
336}
337
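/// A hook invoked for every update read from the catalog shard. Implementations may maintain
/// their own indexed state, report fencing through `current_fence_token`, and decide whether
/// the update is kept in the handle's in-memory snapshot (`Some`) or dropped (`None`).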
338pub(crate) trait ApplyUpdate<T: IntoStateUpdateKindJson> {
339 fn apply_update(
343 &mut self,
344 update: StateUpdate<T>,
345 current_fence_token: &mut FenceableToken,
346 metrics: &Arc<Metrics>,
347 ) -> Result<Option<StateUpdate<T>>, FenceError>;
348}
349
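/// A handle to the catalog's persist shard that maintains a consolidated in-memory snapshot of
/// the shard's contents up to `upper`, generic over the update representation `T` and the
/// `ApplyUpdate` implementation `U` used to filter updates into that snapshot.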
350#[derive(Debug)]
364pub(crate) struct PersistHandle<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> {
365 pub(crate) mode: Mode,
367 since_handle: SinceHandle<SourceData, (), Timestamp, StorageDiff, i64>,
369 write_handle: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
371 listen: Listen<SourceData, (), Timestamp, StorageDiff>,
373 persist_client: PersistClient,
375 shard_id: ShardId,
377 pub(crate) snapshot: Vec<(T, Timestamp, Diff)>,
381 update_applier: U,
383 pub(crate) upper: Timestamp,
385 fenceable_token: FenceableToken,
387 catalog_content_version: semver::Version,
389 bootstrap_complete: bool,
391 metrics: Arc<Metrics>,
393}
394
395impl<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> PersistHandle<T, U> {
396 async fn increment_catalog_upgrade_shard_version(&self, organization_id: Uuid) {
398 let upgrade_shard_id = shard_id(organization_id, UPGRADE_SEED);
399 let mut write_handle: WriteHandle<(), (), Timestamp, StorageDiff> = self
400 .persist_client
401 .open_writer(
402 upgrade_shard_id,
403 Arc::new(UnitSchema::default()),
404 Arc::new(UnitSchema::default()),
405 Diagnostics {
406 shard_name: UPGRADE_SHARD_NAME.to_string(),
407 handle_purpose: "increment durable catalog upgrade shard version".to_string(),
408 },
409 )
410 .await
411 .expect("invalid usage");
412 const EMPTY_UPDATES: &[(((), ()), Timestamp, StorageDiff)] = &[];
413 let mut upper = write_handle.fetch_recent_upper().await.clone();
414 loop {
415 let next_upper = upper
416 .iter()
417 .map(|timestamp| timestamp.step_forward())
418 .collect();
419 match write_handle
420 .compare_and_append(EMPTY_UPDATES, upper, next_upper)
421 .await
422 .expect("invalid usage")
423 {
424 Ok(()) => break,
425 Err(upper_mismatch) => {
426 upper = upper_mismatch.current;
427 }
428 }
429 }
430 }
431
432 #[mz_ore::instrument]
434 async fn current_upper(&mut self) -> Timestamp {
435 match self.mode {
436 Mode::Writable | Mode::Readonly => {
437 let upper = self.write_handle.fetch_recent_upper().await;
438 antichain_to_timestamp(upper.clone())
439 }
440 Mode::Savepoint => self.upper,
441 }
442 }
443
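    /// Appends `updates` to the catalog shard at `commit_ts`, downgrades the critical since to
    /// the newly committed time, and syncs this handle to the new upper, which is returned on
    /// success. An `UpperMismatch` error means another writer advanced the shard first.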
444 #[mz_ore::instrument]
448 pub(crate) async fn compare_and_append<S: IntoStateUpdateKindJson>(
449 &mut self,
450 updates: Vec<(S, Diff)>,
451 commit_ts: Timestamp,
452 ) -> Result<Timestamp, CompareAndAppendError> {
453 assert_eq!(self.mode, Mode::Writable);
454 assert!(
455 commit_ts >= self.upper,
456 "expected commit ts, {}, to be greater than or equal to upper, {}",
457 commit_ts,
458 self.upper
459 );
460
461 let contains_fence = if mz_ore::assert::soft_assertions_enabled() {
464 let updates: Vec<_> = updates.clone();
465 let parsed_updates: Vec<_> = updates
466 .clone()
467 .into_iter()
468 .map(|(update, diff)| {
469 let update: StateUpdateKindJson = update.into();
470 (update, diff)
471 })
472 .filter_map(|(update, diff)| {
473 <StateUpdateKindJson as TryIntoStateUpdateKind>::try_into(update)
474 .ok()
475 .map(|update| (update, diff))
476 })
477 .collect();
478 let contains_retraction = parsed_updates.iter().any(|(update, diff)| {
479 matches!(update, StateUpdateKind::FenceToken(..)) && *diff == Diff::MINUS_ONE
480 });
481 let contains_addition = parsed_updates.iter().any(|(update, diff)| {
482 matches!(update, StateUpdateKind::FenceToken(..)) && *diff == Diff::ONE
483 });
484 let contains_fence = contains_retraction && contains_addition;
485 Some((contains_fence, updates))
486 } else {
487 None
488 };
489
490 let updates = updates.into_iter().map(|(kind, diff)| {
491 let kind: StateUpdateKindJson = kind.into();
492 (
493 (Into::<SourceData>::into(kind), ()),
494 commit_ts,
495 diff.into_inner(),
496 )
497 });
498 let next_upper = commit_ts.step_forward();
499 let res = self
500 .write_handle
501 .compare_and_append(
502 updates,
503 Antichain::from_elem(self.upper),
504 Antichain::from_elem(next_upper),
505 )
506 .await
507 .expect("invalid usage");
508
509 if let Err(e @ UpperMismatch { .. }) = res {
514 self.sync_to_current_upper().await?;
515 if let Some((contains_fence, updates)) = contains_fence {
516 assert!(
517 contains_fence,
518 "updates were neither fenced nor fencing and encountered an upper mismatch: {updates:#?}"
519 )
520 }
521 return Err(e.into());
522 }
523
524 let downgrade_to = Antichain::from_elem(next_upper.saturating_sub(1));
526
527 let opaque = *self.since_handle.opaque();
532 let downgrade = self
533 .since_handle
534 .maybe_compare_and_downgrade_since(&opaque, (&opaque, &downgrade_to))
535 .await;
536
537 match downgrade {
538 None => {}
539 Some(Err(e)) => soft_panic_or_log!("found opaque value {e}, but expected {opaque}"),
540 Some(Ok(updated)) => soft_assert_or_log!(
541 updated == downgrade_to,
542 "updated bound should match expected"
543 ),
544 }
545 self.sync(next_upper).await?;
546 Ok(next_upper)
547 }
548
549 #[mz_ore::instrument]
552 async fn snapshot_unconsolidated(&mut self) -> Vec<StateUpdate<StateUpdateKind>> {
553 let current_upper = self.current_upper().await;
554
555 let mut snapshot = Vec::new();
556 let mut read_handle = self.read_handle().await;
557 let as_of = as_of(&read_handle, current_upper);
558 let mut stream = Box::pin(
559 read_handle
561 .snapshot_and_stream(Antichain::from_elem(as_of))
562 .await
563 .expect("we have advanced the restart_as_of by the since"),
564 );
565 while let Some(update) = stream.next().await {
566 snapshot.push(update)
567 }
568 read_handle.expire().await;
569 snapshot
570 .into_iter()
571 .map(Into::<StateUpdate<StateUpdateKindJson>>::into)
572 .map(|state_update| state_update.try_into().expect("kind decoding error"))
573 .collect()
574 }
575
576 #[mz_ore::instrument]
580 pub(crate) async fn sync_to_current_upper(&mut self) -> Result<(), FenceError> {
581 let upper = self.current_upper().await;
582 self.sync(upper).await
583 }
584
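    /// Applies all updates in the shard up to (but not including) `target_upper` to the
    /// in-memory snapshot, recording sync latency metrics.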
585 #[mz_ore::instrument(level = "debug")]
589 pub(crate) async fn sync(&mut self, target_upper: Timestamp) -> Result<(), FenceError> {
590 self.metrics.syncs.inc();
591 let counter = self.metrics.sync_latency_seconds.clone();
592 self.sync_inner(target_upper)
593 .wall_time()
594 .inc_by(counter)
595 .await
596 }
597
598 #[mz_ore::instrument(level = "debug")]
599 async fn sync_inner(&mut self, target_upper: Timestamp) -> Result<(), FenceError> {
600 self.fenceable_token.validate()?;
601
602 if self.mode == Mode::Savepoint {
605 self.upper = max(self.upper, target_upper);
606 return Ok(());
607 }
608
609 let mut updates: BTreeMap<_, Vec<_>> = BTreeMap::new();
610
611 while self.upper < target_upper {
612 let listen_events = self.listen.fetch_next().await;
613 for listen_event in listen_events {
614 match listen_event {
615 ListenEvent::Progress(upper) => {
616 debug!("synced up to {upper:?}");
617 self.upper = antichain_to_timestamp(upper);
618 while let Some((ts, updates)) = updates.pop_first() {
623 assert!(ts < self.upper, "expected {} < {}", ts, self.upper);
624 let updates = updates.into_iter().map(
625 |update: StateUpdate<StateUpdateKindJson>| {
626 let kind =
627 T::try_from(update.kind).expect("kind decoding error");
628 StateUpdate {
629 kind,
630 ts: update.ts,
631 diff: update.diff,
632 }
633 },
634 );
635 self.apply_updates(updates)?;
636 }
637 }
638 ListenEvent::Updates(batch_updates) => {
639 for update in batch_updates {
640 let update: StateUpdate<StateUpdateKindJson> = update.into();
641 updates.entry(update.ts).or_default().push(update);
642 }
643 }
644 }
645 }
646 }
647 assert_eq!(updates, BTreeMap::new(), "all updates should be applied");
648 Ok(())
649 }
650
651 #[mz_ore::instrument(level = "debug")]
652 pub(crate) fn apply_updates(
653 &mut self,
654 updates: impl IntoIterator<Item = StateUpdate<T>>,
655 ) -> Result<(), FenceError> {
656 let mut updates: Vec<_> = updates
657 .into_iter()
658 .map(|StateUpdate { kind, ts, diff }| (kind, ts, diff))
659 .collect();
660
661 differential_dataflow::consolidation::consolidate_updates(&mut updates);
665
666 updates.sort_by(|(_, ts1, diff1), (_, ts2, diff2)| ts1.cmp(ts2).then(diff1.cmp(diff2)));
669
670 let mut errors = Vec::new();
671
672 for (kind, ts, diff) in updates {
673 if diff != Diff::ONE && diff != Diff::MINUS_ONE {
674 panic!("invalid update in consolidated trace: ({kind:?}, {ts:?}, {diff:?})");
675 }
676
677 match self.update_applier.apply_update(
678 StateUpdate { kind, ts, diff },
679 &mut self.fenceable_token,
680 &self.metrics,
681 ) {
682 Ok(Some(StateUpdate { kind, ts, diff })) => self.snapshot.push((kind, ts, diff)),
683 Ok(None) => {}
684 Err(err) => errors.push(err),
687 }
688 }
689
690 errors.sort();
691 if let Some(err) = errors.into_iter().next() {
692 return Err(err);
693 }
694
695 self.consolidate();
696
697 Ok(())
698 }
699
700 #[mz_ore::instrument]
701 pub(crate) fn consolidate(&mut self) {
702 soft_assert_no_log!(
703 self.snapshot
704 .windows(2)
705 .all(|updates| updates[0].1 <= updates[1].1),
706 "snapshot should be sorted by timestamp, {:#?}",
707 self.snapshot
708 );
709
710 let new_ts = self
711 .snapshot
712 .last()
713 .map(|(_, ts, _)| *ts)
714 .unwrap_or_else(Timestamp::minimum);
715 for (_, ts, _) in &mut self.snapshot {
716 *ts = new_ts;
717 }
718 differential_dataflow::consolidation::consolidate_updates(&mut self.snapshot);
719 }
720
721 async fn with_trace<R>(
725 &mut self,
726 f: impl FnOnce(&Vec<(T, Timestamp, Diff)>) -> Result<R, CatalogError>,
727 ) -> Result<R, CatalogError> {
728 self.sync_to_current_upper().await?;
729 f(&self.snapshot)
730 }
731
732 async fn read_handle(&self) -> ReadHandle<SourceData, (), Timestamp, StorageDiff> {
734 self.persist_client
735 .open_leased_reader(
736 self.shard_id,
737 Arc::new(desc()),
738 Arc::new(UnitSchema::default()),
739 Diagnostics {
740 shard_name: CATALOG_SHARD_NAME.to_string(),
741 handle_purpose: "openable durable catalog state temporary reader".to_string(),
742 },
743 USE_CRITICAL_SINCE_CATALOG.get(self.persist_client.dyncfgs()),
744 )
745 .await
746 .expect("invalid usage")
747 }
748
749 async fn expire(self: Box<Self>) {
751 self.write_handle.expire().await;
752 self.listen.expire().await;
753 }
754}
755
756impl<U: ApplyUpdate<StateUpdateKind>> PersistHandle<StateUpdateKind, U> {
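    /// Builds a `Snapshot`, indexed by collection, from the current consolidated trace and
    /// passes it to `f`. Audit log events and fence tokens are not part of the snapshot.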
757 async fn with_snapshot<T>(
761 &mut self,
762 f: impl FnOnce(Snapshot) -> Result<T, CatalogError>,
763 ) -> Result<T, CatalogError> {
764 fn apply<K, V>(map: &mut BTreeMap<K, V>, key: &K, value: &V, diff: Diff)
765 where
766 K: Ord + Clone,
767 V: Ord + Clone + Debug,
768 {
769 let key = key.clone();
770 let value = value.clone();
771 if diff == Diff::ONE {
772 let prev = map.insert(key, value);
773 assert_eq!(
774 prev, None,
775 "values must be explicitly retracted before inserting a new value"
776 );
777 } else if diff == Diff::MINUS_ONE {
778 let prev = map.remove(&key);
779 assert_eq!(
780 prev,
781 Some(value),
782 "retraction does not match existing value"
783 );
784 }
785 }
786
787 self.with_trace(|trace| {
788 let mut snapshot = Snapshot::empty();
789 for (kind, ts, diff) in trace {
790 let diff = *diff;
791 if diff != Diff::ONE && diff != Diff::MINUS_ONE {
792 panic!("invalid update in consolidated trace: ({kind:?}, {ts:?}, {diff:?})");
793 }
794
795 match kind {
796 StateUpdateKind::AuditLog(_key, ()) => {
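                        // Audit log events are not included in `Snapshot`.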
797 }
799 StateUpdateKind::Cluster(key, value) => {
800 apply(&mut snapshot.clusters, key, value, diff);
801 }
802 StateUpdateKind::ClusterReplica(key, value) => {
803 apply(&mut snapshot.cluster_replicas, key, value, diff);
804 }
805 StateUpdateKind::Comment(key, value) => {
806 apply(&mut snapshot.comments, key, value, diff);
807 }
808 StateUpdateKind::Config(key, value) => {
809 apply(&mut snapshot.configs, key, value, diff);
810 }
811 StateUpdateKind::Database(key, value) => {
812 apply(&mut snapshot.databases, key, value, diff);
813 }
814 StateUpdateKind::DefaultPrivilege(key, value) => {
815 apply(&mut snapshot.default_privileges, key, value, diff);
816 }
817 StateUpdateKind::FenceToken(_token) => {
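                        // Fence tokens are tracked by the handle itself, not in `Snapshot`.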
818 }
820 StateUpdateKind::IdAllocator(key, value) => {
821 apply(&mut snapshot.id_allocator, key, value, diff);
822 }
823 StateUpdateKind::IntrospectionSourceIndex(key, value) => {
824 apply(&mut snapshot.introspection_sources, key, value, diff);
825 }
826 StateUpdateKind::Item(key, value) => {
827 apply(&mut snapshot.items, key, value, diff);
828 }
829 StateUpdateKind::NetworkPolicy(key, value) => {
830 apply(&mut snapshot.network_policies, key, value, diff);
831 }
832 StateUpdateKind::Role(key, value) => {
833 apply(&mut snapshot.roles, key, value, diff);
834 }
835 StateUpdateKind::Schema(key, value) => {
836 apply(&mut snapshot.schemas, key, value, diff);
837 }
838 StateUpdateKind::Setting(key, value) => {
839 apply(&mut snapshot.settings, key, value, diff);
840 }
841 StateUpdateKind::SourceReferences(key, value) => {
842 apply(&mut snapshot.source_references, key, value, diff);
843 }
844 StateUpdateKind::SystemConfiguration(key, value) => {
845 apply(&mut snapshot.system_configurations, key, value, diff);
846 }
847 StateUpdateKind::SystemObjectMapping(key, value) => {
848 apply(&mut snapshot.system_object_mappings, key, value, diff);
849 }
850 StateUpdateKind::SystemPrivilege(key, value) => {
851 apply(&mut snapshot.system_privileges, key, value, diff);
852 }
853 StateUpdateKind::StorageCollectionMetadata(key, value) => {
854 apply(&mut snapshot.storage_collection_metadata, key, value, diff);
855 }
856 StateUpdateKind::UnfinalizedShard(key, ()) => {
857 apply(&mut snapshot.unfinalized_shards, key, &(), diff);
858 }
859 StateUpdateKind::TxnWalShard((), value) => {
860 apply(&mut snapshot.txn_wal_shard, &(), value, diff);
861 }
862 StateUpdateKind::RoleAuth(key, value) => {
863 apply(&mut snapshot.role_auth, key, value, diff);
864 }
865 }
866 }
867 f(snapshot)
868 })
869 .await
870 }
871
872 #[mz_ore::instrument(level = "debug")]
879 async fn persist_snapshot(
880 &mut self,
881 ) -> impl Iterator<Item = StateUpdate> + DoubleEndedIterator {
882 let mut read_handle = self.read_handle().await;
883 let as_of = as_of(&read_handle, self.upper);
884 let snapshot = snapshot_binary(&mut read_handle, as_of, &self.metrics)
885 .await
886 .map(|update| update.try_into().expect("kind decoding error"));
887 read_handle.expire().await;
888 snapshot
889 }
890}
891
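/// `ApplyUpdate` implementation used before the catalog is opened. It indexes only the configs
/// and settings needed to answer pre-open questions (user version, deploy generation, and the
/// like), while every raw update remains in the handle's snapshot.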
892#[derive(Debug)]
894pub(crate) struct UnopenedCatalogStateInner {
895 organization_id: Uuid,
897 configs: BTreeMap<String, u64>,
899 settings: BTreeMap<String, String>,
901}
902
903impl UnopenedCatalogStateInner {
904 fn new(organization_id: Uuid) -> UnopenedCatalogStateInner {
905 UnopenedCatalogStateInner {
906 organization_id,
907 configs: BTreeMap::new(),
908 settings: BTreeMap::new(),
909 }
910 }
911}
912
913impl ApplyUpdate<StateUpdateKindJson> for UnopenedCatalogStateInner {
914 fn apply_update(
915 &mut self,
916 update: StateUpdate<StateUpdateKindJson>,
917 current_fence_token: &mut FenceableToken,
918 _metrics: &Arc<Metrics>,
919 ) -> Result<Option<StateUpdate<StateUpdateKindJson>>, FenceError> {
920 if !update.kind.is_audit_log() && update.kind.is_always_deserializable() {
921 let kind = TryInto::try_into(&update.kind).expect("kind is known to be deserializable");
922 match (kind, update.diff) {
923 (StateUpdateKind::Config(key, value), Diff::ONE) => {
924 let prev = self.configs.insert(key.key, value.value);
925 assert_eq!(
926 prev, None,
927 "values must be explicitly retracted before inserting a new value"
928 );
929 }
930 (StateUpdateKind::Config(key, value), Diff::MINUS_ONE) => {
931 let prev = self.configs.remove(&key.key);
932 assert_eq!(
933 prev,
934 Some(value.value),
935 "retraction does not match existing value"
936 );
937 }
938 (StateUpdateKind::Setting(key, value), Diff::ONE) => {
939 let prev = self.settings.insert(key.name, value.value);
940 assert_eq!(
941 prev, None,
942 "values must be explicitly retracted before inserting a new value"
943 );
944 }
945 (StateUpdateKind::Setting(key, value), Diff::MINUS_ONE) => {
946 let prev = self.settings.remove(&key.name);
947 assert_eq!(
948 prev,
949 Some(value.value),
950 "retraction does not match existing value"
951 );
952 }
953 (StateUpdateKind::FenceToken(fence_token), Diff::ONE) => {
954 current_fence_token.maybe_fence(fence_token)?;
955 }
956 _ => {}
957 }
958 }
959
960 Ok(Some(update))
961 }
962}
963
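/// A durable catalog handle that has not yet been opened. Updates are kept in their raw JSON
/// representation so that the state can be inspected before any upgrade or migration has run.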
964pub(crate) type UnopenedPersistCatalogState =
972 PersistHandle<StateUpdateKindJson, UnopenedCatalogStateInner>;
973
974impl UnopenedPersistCatalogState {
975 #[mz_ore::instrument]
981 pub(crate) async fn new(
982 persist_client: PersistClient,
983 organization_id: Uuid,
984 version: semver::Version,
985 deploy_generation: Option<u64>,
986 metrics: Arc<Metrics>,
987 ) -> Result<UnopenedPersistCatalogState, DurableCatalogError> {
988 let catalog_shard_id = shard_id(organization_id, CATALOG_SEED);
989 let upgrade_shard_id = shard_id(organization_id, UPGRADE_SEED);
990 debug!(
991 ?catalog_shard_id,
992 ?upgrade_shard_id,
993 "new persist backed catalog state"
994 );
995
996 let version_in_upgrade_shard =
998 fetch_catalog_upgrade_shard_version(&persist_client, upgrade_shard_id).await;
999 if let Some(version_in_upgrade_shard) = version_in_upgrade_shard {
1002 if mz_persist_client::cfg::check_data_version(&version_in_upgrade_shard, &version)
1010 .is_err()
1011 {
1012 return Err(DurableCatalogError::IncompatiblePersistVersion {
1013 found_version: version_in_upgrade_shard,
1014 catalog_version: version,
1015 });
1016 }
1017 }
1018
1019 let open_handles_start = Instant::now();
1020 info!("startup: envd serve: catalog init: open handles beginning");
1021 let since_handle = persist_client
1022 .open_critical_since(
1023 catalog_shard_id,
1024 PersistClient::CONTROLLER_CRITICAL_SINCE,
1027 Diagnostics {
1028 shard_name: CATALOG_SHARD_NAME.to_string(),
1029 handle_purpose: "durable catalog state critical since".to_string(),
1030 },
1031 )
1032 .await
1033 .expect("invalid usage");
1034 let (mut write_handle, mut read_handle) = persist_client
1035 .open(
1036 catalog_shard_id,
1037 Arc::new(desc()),
1038 Arc::new(UnitSchema::default()),
1039 Diagnostics {
1040 shard_name: CATALOG_SHARD_NAME.to_string(),
1041 handle_purpose: "durable catalog state handles".to_string(),
1042 },
1043 USE_CRITICAL_SINCE_CATALOG.get(persist_client.dyncfgs()),
1044 )
1045 .await
1046 .expect("invalid usage");
1047 info!(
1048 "startup: envd serve: catalog init: open handles complete in {:?}",
1049 open_handles_start.elapsed()
1050 );
1051
1052 let upper = {
1054 const EMPTY_UPDATES: &[((SourceData, ()), Timestamp, StorageDiff)] = &[];
1055 let upper = Antichain::from_elem(Timestamp::minimum());
1056 let next_upper = Timestamp::minimum().step_forward();
1057 match write_handle
1058 .compare_and_append(EMPTY_UPDATES, upper, Antichain::from_elem(next_upper))
1059 .await
1060 .expect("invalid usage")
1061 {
1062 Ok(()) => next_upper,
1063 Err(mismatch) => antichain_to_timestamp(mismatch.current),
1064 }
1065 };
1066
1067 let snapshot_start = Instant::now();
1068 info!("startup: envd serve: catalog init: snapshot beginning");
1069 let as_of = as_of(&read_handle, upper);
1070 let snapshot: Vec<_> = snapshot_binary(&mut read_handle, as_of, &metrics)
1071 .await
1072 .map(|StateUpdate { kind, ts, diff }| (kind, ts, diff))
1073 .collect();
1074 let listen = read_handle
1075 .listen(Antichain::from_elem(as_of))
1076 .await
1077 .expect("invalid usage");
1078 info!(
1079 "startup: envd serve: catalog init: snapshot complete in {:?}",
1080 snapshot_start.elapsed()
1081 );
1082
1083 let mut handle = UnopenedPersistCatalogState {
1084 mode: Mode::Writable,
1086 since_handle,
1087 write_handle,
1088 listen,
1089 persist_client,
1090 shard_id: catalog_shard_id,
1091 snapshot: Vec::new(),
1093 update_applier: UnopenedCatalogStateInner::new(organization_id),
1094 upper,
1095 fenceable_token: FenceableToken::new(deploy_generation),
1096 catalog_content_version: version,
1097 bootstrap_complete: false,
1098 metrics,
1099 };
1100 soft_assert_no_log!(
1103 snapshot.iter().all(|(_, _, diff)| *diff == Diff::ONE),
1104 "snapshot should be consolidated: {snapshot:#?}"
1105 );
1106
1107 let apply_start = Instant::now();
1108 info!("startup: envd serve: catalog init: apply updates beginning");
1109 let updates = snapshot
1110 .into_iter()
1111 .map(|(kind, ts, diff)| StateUpdate { kind, ts, diff });
1112 handle.apply_updates(updates)?;
1113 info!(
1114 "startup: envd serve: catalog init: apply updates complete in {:?}",
1115 apply_start.elapsed()
1116 );
1117
1118 if let Some(found_version) = handle.get_catalog_content_version().await? {
1124 if handle.catalog_content_version < found_version {
1125 return Err(DurableCatalogError::IncompatiblePersistVersion {
1126 found_version,
1127 catalog_version: handle.catalog_content_version,
1128 });
1129 }
1130 }
1131
1132 Ok(handle)
1133 }
1134
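    /// Opens the catalog in `mode`: fences out previous writers by committing a new fence
    /// token (retrying on upper mismatches), runs any necessary upgrades, bootstraps an
    /// uninitialized catalog, and returns the opened state together with an iterator over the
    /// audit log.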
1135 #[mz_ore::instrument]
1136 async fn open_inner(
1137 mut self,
1138 mode: Mode,
1139 initial_ts: Timestamp,
1140 bootstrap_args: &BootstrapArgs,
1141 ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError> {
1142 let mut commit_ts = self.upper;
1145 self.mode = mode;
1146
1147 match (&self.mode, &self.fenceable_token) {
1149 (_, FenceableToken::Unfenced { .. } | FenceableToken::Fenced { .. }) => {
1150 return Err(DurableCatalogError::Internal(
                    "catalog should not have been fenced before opening".to_string(),
1152 )
1153 .into());
1154 }
1155 (
1156 Mode::Writable | Mode::Savepoint,
1157 FenceableToken::Initializing {
1158 current_deploy_generation: None,
1159 ..
1160 },
1161 ) => {
1162 return Err(DurableCatalogError::Internal(format!(
1163 "cannot open in mode '{:?}' without a deploy generation",
1164 self.mode,
1165 ))
1166 .into());
1167 }
1168 _ => {}
1169 }
1170
1171 let read_only = matches!(self.mode, Mode::Readonly);
1172
1173 loop {
1175 self.sync_to_current_upper().await?;
1176 commit_ts = max(commit_ts, self.upper);
1177 let (fence_updates, current_fenceable_token) = self
1178 .fenceable_token
1179 .generate_unfenced_token(self.mode)?
1180 .ok_or_else(|| {
1181 DurableCatalogError::Internal(
                        "catalog should not have been fenced before opening".to_string(),
1183 )
1184 })?;
1185 debug!(
1186 ?self.upper,
1187 ?self.fenceable_token,
1188 ?current_fenceable_token,
1189 "fencing previous catalogs"
1190 );
1191 if matches!(self.mode, Mode::Writable) {
1192 match self
1193 .compare_and_append(fence_updates.clone(), commit_ts)
1194 .await
1195 {
1196 Ok(upper) => {
1197 commit_ts = upper;
1198 }
1199 Err(CompareAndAppendError::Fence(e)) => return Err(e.into()),
1200 Err(e @ CompareAndAppendError::UpperMismatch { .. }) => {
1201 warn!("catalog write failed due to upper mismatch, retrying: {e:?}");
1202 continue;
1203 }
1204 }
1205 }
1206 self.fenceable_token = current_fenceable_token;
1207 break;
1208 }
1209
1210 let is_initialized = self.is_initialized_inner();
1211 if !matches!(self.mode, Mode::Writable) && !is_initialized {
1212 return Err(CatalogError::Durable(DurableCatalogError::NotWritable(
1213 format!(
1214 "catalog tables do not exist; will not create in {:?} mode",
1215 self.mode
1216 ),
1217 )));
1218 }
1219 soft_assert_ne_or_log!(self.upper, Timestamp::minimum());
1220
1221 let (audit_logs, snapshot): (Vec<_>, Vec<_>) = self
1223 .snapshot
1224 .into_iter()
1225 .partition(|(update, _, _)| update.is_audit_log());
1226 self.snapshot = snapshot;
1227 let audit_log_count = audit_logs.iter().map(|(_, _, diff)| diff).sum::<Diff>();
1228 let audit_log_handle = AuditLogIterator::new(audit_logs);
1229
1230 if is_initialized && !read_only {
1232 commit_ts = upgrade(&mut self, commit_ts).await?;
1233 }
1234
1235 debug!(
1236 ?is_initialized,
1237 ?self.upper,
1238 "initializing catalog state"
1239 );
1240 let mut catalog = PersistCatalogState {
1241 mode: self.mode,
1242 since_handle: self.since_handle,
1243 write_handle: self.write_handle,
1244 listen: self.listen,
1245 persist_client: self.persist_client,
1246 shard_id: self.shard_id,
1247 upper: self.upper,
1248 fenceable_token: self.fenceable_token,
1249 snapshot: Vec::new(),
1251 update_applier: CatalogStateInner::new(),
1252 catalog_content_version: self.catalog_content_version,
1253 bootstrap_complete: false,
1254 metrics: self.metrics,
1255 };
1256 catalog.metrics.collection_entries.reset();
1257 catalog
1260 .metrics
1261 .collection_entries
1262 .with_label_values(&[&CollectionType::AuditLog.to_string()])
1263 .add(audit_log_count.into_inner());
1264 let updates = self.snapshot.into_iter().map(|(kind, ts, diff)| {
1265 let kind = TryIntoStateUpdateKind::try_into(kind).expect("kind decoding error");
1266 StateUpdate { kind, ts, diff }
1267 });
1268 catalog.apply_updates(updates)?;
1269
1270 let catalog_content_version = catalog.catalog_content_version.to_string();
1271 let txn = if is_initialized {
1272 let mut txn = catalog.transaction().await?;
1273 txn.set_catalog_content_version(catalog_content_version)?;
1274 txn
1275 } else {
1276 soft_assert_eq_no_log!(
1277 catalog
1278 .snapshot
1279 .iter()
1280 .filter(|(kind, _, _)| !matches!(kind, StateUpdateKind::FenceToken(_)))
1281 .count(),
1282 0,
1283 "trace should not contain any updates for an uninitialized catalog: {:#?}",
1284 catalog.snapshot
1285 );
1286
1287 let mut txn = catalog.transaction().await?;
1288 initialize::initialize(
1289 &mut txn,
1290 bootstrap_args,
1291 initial_ts.into(),
1292 catalog_content_version,
1293 )
1294 .await?;
1295 txn
1296 };
1297
1298 if read_only {
1299 let (txn_batch, _) = txn.into_parts();
1300 let updates = StateUpdate::from_txn_batch_ts(txn_batch, catalog.upper);
1302 catalog.apply_updates(updates)?;
1303 } else {
1304 txn.commit_internal(commit_ts).await?;
1305 }
1306
1307 if matches!(catalog.mode, Mode::Writable) {
1311 catalog
1312 .increment_catalog_upgrade_shard_version(self.update_applier.organization_id)
1313 .await;
1314 let write_handle = catalog
1315 .persist_client
1316 .open_writer::<SourceData, (), Timestamp, i64>(
1317 catalog.write_handle.shard_id(),
1318 Arc::new(desc()),
1319 Arc::new(UnitSchema::default()),
1320 Diagnostics {
1321 shard_name: CATALOG_SHARD_NAME.to_string(),
1322 handle_purpose: "compact catalog".to_string(),
1323 },
1324 )
1325 .await
1326 .expect("invalid usage");
1327 let fuel = CATALOG_FORCE_COMPACTION_FUEL.handle(catalog.persist_client.dyncfgs());
1328 let wait = CATALOG_FORCE_COMPACTION_WAIT.handle(catalog.persist_client.dyncfgs());
1329 let _task = mz_ore::task::spawn(|| "catalog::force_shard_compaction", async move {
1332 let () =
1333 mz_persist_client::cli::admin::dangerous_force_compaction_and_break_pushdown(
1334 &write_handle,
1335 || fuel.get(),
1336 || wait.get(),
1337 )
1338 .await;
1339 });
1340 }
1341
1342 Ok((Box::new(catalog), audit_log_handle))
1343 }
1344
1345 #[mz_ore::instrument]
1350 fn is_initialized_inner(&self) -> bool {
1351 !self.update_applier.configs.is_empty()
1352 }
1353
1354 #[mz_ore::instrument]
1358 async fn get_current_config(&mut self, key: &str) -> Result<Option<u64>, DurableCatalogError> {
1359 self.sync_to_current_upper().await?;
1360 Ok(self.update_applier.configs.get(key).cloned())
1361 }
1362
1363 #[mz_ore::instrument]
1367 pub(crate) async fn get_user_version(&mut self) -> Result<Option<u64>, DurableCatalogError> {
1368 self.get_current_config(USER_VERSION_KEY).await
1369 }
1370
1371 #[mz_ore::instrument]
1375 async fn get_current_setting(
1376 &mut self,
1377 name: &str,
1378 ) -> Result<Option<String>, DurableCatalogError> {
1379 self.sync_to_current_upper().await?;
1380 Ok(self.update_applier.settings.get(name).cloned())
1381 }
1382
1383 #[mz_ore::instrument]
1388 async fn get_catalog_content_version(
1389 &mut self,
1390 ) -> Result<Option<semver::Version>, DurableCatalogError> {
1391 let version = self
1392 .get_current_setting(CATALOG_CONTENT_VERSION_KEY)
1393 .await?;
1394 let version = version.map(|version| version.parse().expect("invalid version persisted"));
1395 Ok(version)
1396 }
1397}
1398
1399#[async_trait]
1400impl OpenableDurableCatalogState for UnopenedPersistCatalogState {
1401 #[mz_ore::instrument]
1402 async fn open_savepoint(
1403 mut self: Box<Self>,
1404 initial_ts: Timestamp,
1405 bootstrap_args: &BootstrapArgs,
1406 ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError> {
1407 self.open_inner(Mode::Savepoint, initial_ts, bootstrap_args)
1408 .boxed()
1409 .await
1410 }
1411
1412 #[mz_ore::instrument]
1413 async fn open_read_only(
1414 mut self: Box<Self>,
1415 bootstrap_args: &BootstrapArgs,
1416 ) -> Result<Box<dyn DurableCatalogState>, CatalogError> {
1417 self.open_inner(Mode::Readonly, EpochMillis::MIN.into(), bootstrap_args)
1418 .boxed()
1419 .await
1420 .map(|(catalog, _)| catalog)
1421 }
1422
1423 #[mz_ore::instrument]
1424 async fn open(
1425 mut self: Box<Self>,
1426 initial_ts: Timestamp,
1427 bootstrap_args: &BootstrapArgs,
1428 ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError> {
1429 self.open_inner(Mode::Writable, initial_ts, bootstrap_args)
1430 .boxed()
1431 .await
1432 }
1433
1434 #[mz_ore::instrument(level = "debug")]
1435 async fn open_debug(mut self: Box<Self>) -> Result<DebugCatalogState, CatalogError> {
1436 Ok(DebugCatalogState(*self))
1437 }
1438
1439 #[mz_ore::instrument]
1440 async fn is_initialized(&mut self) -> Result<bool, CatalogError> {
1441 self.sync_to_current_upper().await?;
1442 Ok(self.is_initialized_inner())
1443 }
1444
1445 #[mz_ore::instrument]
1446 async fn epoch(&mut self) -> Result<Epoch, CatalogError> {
1447 self.sync_to_current_upper().await?;
1448 self.fenceable_token
1449 .validate()?
1450 .map(|token| token.epoch)
1451 .ok_or(CatalogError::Durable(DurableCatalogError::Uninitialized))
1452 }
1453
1454 #[mz_ore::instrument]
1455 async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError> {
1456 self.sync_to_current_upper().await?;
1457 self.fenceable_token
1458 .token()
1459 .map(|token| token.deploy_generation)
1460 .ok_or(CatalogError::Durable(DurableCatalogError::Uninitialized))
1461 }
1462
1463 #[mz_ore::instrument(level = "debug")]
1464 async fn get_enable_0dt_deployment(&mut self) -> Result<Option<bool>, CatalogError> {
1465 let value = self.get_current_config(ENABLE_0DT_DEPLOYMENT).await?;
1466 match value {
1467 None => Ok(None),
1468 Some(0) => Ok(Some(false)),
1469 Some(1) => Ok(Some(true)),
1470 Some(v) => Err(
1471 DurableCatalogError::from(TryFromProtoError::UnknownEnumVariant(format!(
1472 "{v} is not a valid boolean value"
1473 )))
1474 .into(),
1475 ),
1476 }
1477 }
1478
1479 #[mz_ore::instrument(level = "debug")]
1480 async fn get_0dt_deployment_max_wait(&mut self) -> Result<Option<Duration>, CatalogError> {
1481 let value = self
1482 .get_current_config(WITH_0DT_DEPLOYMENT_MAX_WAIT)
1483 .await?;
1484 match value {
1485 None => Ok(None),
1486 Some(millis) => Ok(Some(Duration::from_millis(millis))),
1487 }
1488 }
1489
1490 #[mz_ore::instrument(level = "debug")]
1491 async fn get_0dt_deployment_ddl_check_interval(
1492 &mut self,
1493 ) -> Result<Option<Duration>, CatalogError> {
1494 let value = self
1495 .get_current_config(WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL)
1496 .await?;
1497 match value {
1498 None => Ok(None),
1499 Some(millis) => Ok(Some(Duration::from_millis(millis))),
1500 }
1501 }
1502
1503 #[mz_ore::instrument(level = "debug")]
1504 async fn get_enable_0dt_deployment_panic_after_timeout(
1505 &mut self,
1506 ) -> Result<Option<bool>, CatalogError> {
1507 let value = self
1508 .get_current_config(ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT)
1509 .await?;
1510 match value {
1511 None => Ok(None),
1512 Some(0) => Ok(Some(false)),
1513 Some(1) => Ok(Some(true)),
1514 Some(v) => Err(
1515 DurableCatalogError::from(TryFromProtoError::UnknownEnumVariant(format!(
1516 "{v} is not a valid boolean value"
1517 )))
1518 .into(),
1519 ),
1520 }
1521 }
1522
1523 #[mz_ore::instrument]
1524 async fn has_system_config_synced_once(&mut self) -> Result<bool, DurableCatalogError> {
1525 self.get_current_config(SYSTEM_CONFIG_SYNCED_KEY)
1526 .await
1527 .map(|value| value.map(|value| value > 0).unwrap_or(false))
1528 }
1529
1530 #[mz_ore::instrument]
1531 async fn trace_unconsolidated(&mut self) -> Result<Trace, CatalogError> {
1532 self.sync_to_current_upper().await?;
1533 if self.is_initialized_inner() {
1534 let snapshot = self.snapshot_unconsolidated().await;
1535 Ok(Trace::from_snapshot(snapshot))
1536 } else {
1537 Err(CatalogError::Durable(DurableCatalogError::Uninitialized))
1538 }
1539 }
1540
1541 #[mz_ore::instrument]
1542 async fn trace_consolidated(&mut self) -> Result<Trace, CatalogError> {
1543 self.sync_to_current_upper().await?;
1544 if self.is_initialized_inner() {
1545 let snapshot = self.current_snapshot().await?;
1546 Ok(Trace::from_snapshot(snapshot))
1547 } else {
1548 Err(CatalogError::Durable(DurableCatalogError::Uninitialized))
1549 }
1550 }
1551
1552 #[mz_ore::instrument(level = "debug")]
1553 async fn expire(self: Box<Self>) {
1554 self.expire().await
1555 }
1556}
1557
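/// `ApplyUpdate` implementation for an opened catalog. Each durable update is converted into
/// an in-memory `memory::objects::StateUpdate` (buffered for `sync_updates`), while audit log
/// and fence token updates are filtered out of the snapshot.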
1558#[derive(Debug)]
1560struct CatalogStateInner {
1561 updates: VecDeque<memory::objects::StateUpdate>,
1563}
1564
1565impl CatalogStateInner {
1566 fn new() -> CatalogStateInner {
1567 CatalogStateInner {
1568 updates: VecDeque::new(),
1569 }
1570 }
1571}
1572
1573impl ApplyUpdate<StateUpdateKind> for CatalogStateInner {
1574 fn apply_update(
1575 &mut self,
1576 update: StateUpdate<StateUpdateKind>,
1577 current_fence_token: &mut FenceableToken,
1578 metrics: &Arc<Metrics>,
1579 ) -> Result<Option<StateUpdate<StateUpdateKind>>, FenceError> {
1580 if let Some(collection_type) = update.kind.collection_type() {
1581 metrics
1582 .collection_entries
1583 .with_label_values(&[&collection_type.to_string()])
1584 .add(update.diff.into_inner());
1585 }
1586
1587 {
            let update: Option<memory::objects::StateUpdate> = (&update)
                .try_into()
                .unwrap_or_else(|e| panic!("invalid persisted update {update:#?}: {e:?}"));
1591 if let Some(update) = update {
1592 self.updates.push_back(update);
1593 }
1594 }
1595
1596 match (update.kind, update.diff) {
1597 (StateUpdateKind::AuditLog(_, ()), _) => Ok(None),
1598 (StateUpdateKind::FenceToken(_), Diff::MINUS_ONE) => Ok(None),
1600 (StateUpdateKind::FenceToken(token), Diff::ONE) => {
1601 current_fence_token.maybe_fence(token)?;
1602 Ok(None)
1603 }
1604 (kind, diff) => Ok(Some(StateUpdate {
1605 kind,
1606 ts: update.ts,
1607 diff,
1608 })),
1609 }
1610 }
1611}
1612
1613type PersistCatalogState = PersistHandle<StateUpdateKind, CatalogStateInner>;
1619
1620#[async_trait]
1621impl ReadOnlyDurableCatalogState for PersistCatalogState {
1622 fn epoch(&self) -> Epoch {
1623 self.fenceable_token
1624 .token()
1625 .expect("opened catalog state must have an epoch")
1626 .epoch
1627 }
1628
1629 #[mz_ore::instrument(level = "debug")]
1630 async fn expire(self: Box<Self>) {
1631 self.expire().await
1632 }
1633
1634 fn is_bootstrap_complete(&self) -> bool {
1635 self.bootstrap_complete
1636 }
1637
1638 async fn get_audit_logs(&mut self) -> Result<Vec<VersionedEvent>, CatalogError> {
1639 self.sync_to_current_upper().await?;
1640 let audit_logs: Vec<_> = self
1641 .persist_snapshot()
1642 .await
1643 .filter_map(
1644 |StateUpdate {
1645 kind,
1646 ts: _,
1647 diff: _,
1648 }| match kind {
1649 StateUpdateKind::AuditLog(key, ()) => Some(key),
1650 _ => None,
1651 },
1652 )
1653 .collect();
1654 let mut audit_logs: Vec<_> = audit_logs
1655 .into_iter()
1656 .map(RustType::from_proto)
1657 .map_ok(|key: AuditLogKey| key.event)
1658 .collect::<Result<_, _>>()?;
1659 audit_logs.sort_by(|a, b| a.sortable_id().cmp(&b.sortable_id()));
1660 Ok(audit_logs)
1661 }
1662
1663 #[mz_ore::instrument(level = "debug")]
1664 async fn get_next_id(&mut self, id_type: &str) -> Result<u64, CatalogError> {
1665 self.with_trace(|trace| {
1666 Ok(trace
1667 .into_iter()
1668 .rev()
1669 .filter_map(|(kind, _, _)| match kind {
1670 StateUpdateKind::IdAllocator(key, value) if key.name == id_type => {
1671 Some(value.next_id)
1672 }
1673 _ => None,
1674 })
1675 .next()
1676 .expect("must exist"))
1677 })
1678 .await
1679 }
1680
1681 #[mz_ore::instrument(level = "debug")]
1682 async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError> {
1683 self.sync_to_current_upper().await?;
1684 Ok(self
1685 .fenceable_token
1686 .token()
1687 .expect("opened catalogs must have a token")
1688 .deploy_generation)
1689 }
1690
1691 #[mz_ore::instrument(level = "debug")]
1692 async fn snapshot(&mut self) -> Result<Snapshot, CatalogError> {
1693 self.with_snapshot(Ok).await
1694 }
1695
1696 #[mz_ore::instrument(level = "debug")]
1697 async fn sync_to_current_updates(
1698 &mut self,
1699 ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError> {
1700 let upper = self.current_upper().await;
1701 self.sync_updates(upper).await
1702 }
1703
1704 #[mz_ore::instrument(level = "debug")]
1705 async fn sync_updates(
1706 &mut self,
1707 target_upper: mz_repr::Timestamp,
1708 ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError> {
1709 self.sync(target_upper).await?;
1710 let mut updates = Vec::new();
1711 while let Some(update) = self.update_applier.updates.front() {
1712 if update.ts >= target_upper {
1713 break;
1714 }
1715
1716 let update = self
1717 .update_applier
1718 .updates
1719 .pop_front()
1720 .expect("peeked above");
1721 updates.push(update);
1722 }
1723 Ok(updates)
1724 }
1725
1726 async fn current_upper(&mut self) -> Timestamp {
1727 self.current_upper().await
1728 }
1729}
1730
1731#[async_trait]
1732#[allow(elided_named_lifetimes)]
1733impl DurableCatalogState for PersistCatalogState {
1734 fn is_read_only(&self) -> bool {
1735 matches!(self.mode, Mode::Readonly)
1736 }
1737
1738 fn is_savepoint(&self) -> bool {
1739 matches!(self.mode, Mode::Savepoint)
1740 }
1741
1742 fn mark_bootstrap_complete(&mut self) {
1743 self.bootstrap_complete = true;
1744 }
1745
1746 #[mz_ore::instrument(level = "debug")]
1747 async fn transaction(&mut self) -> Result<Transaction, CatalogError> {
1748 self.metrics.transactions_started.inc();
1749 let snapshot = self.snapshot().await?;
        let commit_ts = self.upper;
1751 Transaction::new(self, snapshot, commit_ts)
1752 }
1753
1754 #[mz_ore::instrument(level = "debug")]
1755 async fn commit_transaction(
1756 &mut self,
1757 txn_batch: TransactionBatch,
1758 commit_ts: Timestamp,
1759 ) -> Result<Timestamp, CatalogError> {
1760 async fn commit_transaction_inner(
1761 catalog: &mut PersistCatalogState,
1762 txn_batch: TransactionBatch,
1763 commit_ts: Timestamp,
1764 ) -> Result<Timestamp, CatalogError> {
1765 assert_eq!(
1770 catalog.upper, txn_batch.upper,
1771 "only one transaction at a time is supported"
1772 );
1773
1774 assert!(
1775 commit_ts >= catalog.upper,
1776 "expected commit ts, {}, to be greater than or equal to upper, {}",
1777 commit_ts,
1778 catalog.upper
1779 );
1780
1781 let updates = StateUpdate::from_txn_batch(txn_batch).collect();
1782 debug!("committing updates: {updates:?}");
1783
1784 let next_upper = match catalog.mode {
1785 Mode::Writable => catalog
1786 .compare_and_append(updates, commit_ts)
1787 .await
1788 .map_err(|e| e.unwrap_fence_error())?,
1789 Mode::Savepoint => {
1790 let updates = updates.into_iter().map(|(kind, diff)| StateUpdate {
1791 kind,
1792 ts: commit_ts,
1793 diff,
1794 });
1795 catalog.apply_updates(updates)?;
1796 catalog.upper = commit_ts.step_forward();
1797 catalog.upper
1798 }
1799 Mode::Readonly => {
1800 if !updates.is_empty() {
1804 return Err(DurableCatalogError::NotWritable(format!(
1805 "cannot commit a transaction in a read-only catalog: {updates:#?}"
1806 ))
1807 .into());
1808 }
1809 catalog.upper
1810 }
1811 };
1812
1813 Ok(next_upper)
1814 }
1815 self.metrics.transaction_commits.inc();
1816 let counter = self.metrics.transaction_commit_latency_seconds.clone();
1817 commit_transaction_inner(self, txn_batch, commit_ts)
1818 .wall_time()
1819 .inc_by(counter)
1820 .await
1821 }
1822
1823 #[mz_ore::instrument(level = "debug")]
1824 async fn confirm_leadership(&mut self) -> Result<(), CatalogError> {
1825 if self.is_read_only() {
1827 return Ok(());
1828 }
1829 self.sync_to_current_upper().await?;
1830 Ok(())
1831 }
1832}
1833
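/// Deterministically derives the persist shard id for `organization_id` and `seed` by hashing
/// `"{organization_id}{seed}"` with SHA-256 and interpreting the first 16 bytes as a UUID.
///
/// A sketch of the intended usage:
///
/// ```ignore
/// let catalog_shard = shard_id(organization_id, CATALOG_SEED);
/// ```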
1834pub fn shard_id(organization_id: Uuid, seed: usize) -> ShardId {
1836 let hash = sha2::Sha256::digest(format!("{organization_id}{seed}")).to_vec();
1837 soft_assert_eq_or_log!(hash.len(), 32, "SHA256 returns 32 bytes (256 bits)");
1838 let uuid = Uuid::from_slice(&hash[0..16]).expect("from_slice accepts exactly 16 bytes");
1839 ShardId::from_str(&format!("s{uuid}")).expect("known to be valid")
1840}
1841
1842fn desc() -> RelationDesc {
1845 RelationDesc::builder()
1846 .with_column("data", ScalarType::Jsonb.nullable(false))
1847 .finish()
1848}
1849
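/// Computes the read timestamp to use against the shard: the largest time strictly less than
/// `upper`, advanced to the shard's since if the since has moved past it.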
1850fn as_of(
1853 read_handle: &ReadHandle<SourceData, (), Timestamp, StorageDiff>,
1854 upper: Timestamp,
1855) -> Timestamp {
1856 let since = read_handle.since().clone();
1857 let mut as_of = upper.checked_sub(1).unwrap_or_else(|| {
        panic!("catalog persist shard should be initialized, found upper: {upper:?}")
1859 });
1860 soft_assert_or_log!(
1863 since.less_equal(&as_of),
1864 "since={since:?}, as_of={as_of:?}; since must be less than or equal to as_of"
1865 );
1866 as_of.advance_by(since.borrow());
1869 as_of
1870}
1871
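/// Fetches the persist applier version recorded in the upgrade shard, if the shard exists;
/// this is effectively the version of the last writable binary to have opened the catalog.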
1872async fn fetch_catalog_upgrade_shard_version(
1875 persist_client: &PersistClient,
1876 upgrade_shard_id: ShardId,
1877) -> Option<semver::Version> {
1878 let shard_state = persist_client
1879 .inspect_shard::<Timestamp>(&upgrade_shard_id)
1880 .await
1881 .ok()?;
1882 let json_state = serde_json::to_value(shard_state).expect("state serialization error");
1883 let upgrade_version = json_state
1884 .get("applier_version")
1885 .cloned()
1886 .expect("missing applier_version");
1887 let upgrade_version =
1888 serde_json::from_value(upgrade_version).expect("version deserialization error");
1889 Some(upgrade_version)
1890}
1891
1892#[mz_ore::instrument(level = "debug")]
1897async fn snapshot_binary(
1898 read_handle: &mut ReadHandle<SourceData, (), Timestamp, StorageDiff>,
1899 as_of: Timestamp,
1900 metrics: &Arc<Metrics>,
1901) -> impl Iterator<Item = StateUpdate<StateUpdateKindJson>> + DoubleEndedIterator + use<> {
1902 metrics.snapshots_taken.inc();
1903 let counter = metrics.snapshot_latency_seconds.clone();
1904 snapshot_binary_inner(read_handle, as_of)
1905 .wall_time()
1906 .inc_by(counter)
1907 .await
1908}
1909
1910#[mz_ore::instrument(level = "debug")]
1915async fn snapshot_binary_inner(
1916 read_handle: &mut ReadHandle<SourceData, (), Timestamp, StorageDiff>,
1917 as_of: Timestamp,
1918) -> impl Iterator<Item = StateUpdate<StateUpdateKindJson>> + DoubleEndedIterator + use<> {
1919 let snapshot = read_handle
1920 .snapshot_and_fetch(Antichain::from_elem(as_of))
1921 .await
1922 .expect("we have advanced the restart_as_of by the since");
1923 soft_assert_no_log!(
1924 snapshot.iter().all(|(_, _, diff)| *diff == 1),
1925 "snapshot_and_fetch guarantees a consolidated result: {snapshot:#?}"
1926 );
1927 snapshot
1928 .into_iter()
1929 .map(Into::<StateUpdate<StateUpdateKindJson>>::into)
1930 .sorted_by(|a, b| Ord::cmp(&b.ts, &a.ts))
1931}
1932
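/// Extracts the single element of `antichain`. The catalog uses a totally ordered time and
/// never finalizes its shard, so its uppers always contain exactly one element.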
1933pub(crate) fn antichain_to_timestamp(antichain: Antichain<Timestamp>) -> Timestamp {
1938 antichain
1939 .into_option()
1940 .expect("we use a totally ordered time and never finalize the shard")
1941}
1942
1943impl Trace {
1946 fn from_snapshot(snapshot: impl IntoIterator<Item = StateUpdate>) -> Trace {
1948 let mut trace = Trace::new();
1949 for StateUpdate { kind, ts, diff } in snapshot {
1950 match kind {
1951 StateUpdateKind::AuditLog(k, v) => trace.audit_log.values.push(((k, v), ts, diff)),
1952 StateUpdateKind::Cluster(k, v) => trace.clusters.values.push(((k, v), ts, diff)),
1953 StateUpdateKind::ClusterReplica(k, v) => {
1954 trace.cluster_replicas.values.push(((k, v), ts, diff))
1955 }
1956 StateUpdateKind::Comment(k, v) => trace.comments.values.push(((k, v), ts, diff)),
1957 StateUpdateKind::Config(k, v) => trace.configs.values.push(((k, v), ts, diff)),
1958 StateUpdateKind::Database(k, v) => trace.databases.values.push(((k, v), ts, diff)),
1959 StateUpdateKind::DefaultPrivilege(k, v) => {
1960 trace.default_privileges.values.push(((k, v), ts, diff))
1961 }
1962 StateUpdateKind::FenceToken(_) => {
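                    // Fence tokens are not exposed in debug traces.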
1963 }
1965 StateUpdateKind::IdAllocator(k, v) => {
1966 trace.id_allocator.values.push(((k, v), ts, diff))
1967 }
1968 StateUpdateKind::IntrospectionSourceIndex(k, v) => {
1969 trace.introspection_sources.values.push(((k, v), ts, diff))
1970 }
1971 StateUpdateKind::Item(k, v) => trace.items.values.push(((k, v), ts, diff)),
1972 StateUpdateKind::NetworkPolicy(k, v) => {
1973 trace.network_policies.values.push(((k, v), ts, diff))
1974 }
1975 StateUpdateKind::Role(k, v) => trace.roles.values.push(((k, v), ts, diff)),
1976 StateUpdateKind::Schema(k, v) => trace.schemas.values.push(((k, v), ts, diff)),
1977 StateUpdateKind::Setting(k, v) => trace.settings.values.push(((k, v), ts, diff)),
1978 StateUpdateKind::SourceReferences(k, v) => {
1979 trace.source_references.values.push(((k, v), ts, diff))
1980 }
1981 StateUpdateKind::SystemConfiguration(k, v) => {
1982 trace.system_configurations.values.push(((k, v), ts, diff))
1983 }
1984 StateUpdateKind::SystemObjectMapping(k, v) => {
1985 trace.system_object_mappings.values.push(((k, v), ts, diff))
1986 }
1987 StateUpdateKind::SystemPrivilege(k, v) => {
1988 trace.system_privileges.values.push(((k, v), ts, diff))
1989 }
1990 StateUpdateKind::StorageCollectionMetadata(k, v) => trace
1991 .storage_collection_metadata
1992 .values
1993 .push(((k, v), ts, diff)),
1994 StateUpdateKind::UnfinalizedShard(k, ()) => {
1995 trace.unfinalized_shards.values.push(((k, ()), ts, diff))
1996 }
1997 StateUpdateKind::TxnWalShard((), v) => {
1998 trace.txn_wal_shard.values.push((((), v), ts, diff))
1999 }
2000 StateUpdateKind::RoleAuth(k, v) => trace.role_auth.values.push(((k, v), ts, diff)),
2001 }
2002 }
2003 trace
2004 }
2005}
2006
2007impl UnopenedPersistCatalogState {
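    /// Forcibly writes `(key, value)` into collection `T`, retracting any existing value for
    /// the key, and returns the previous value. Intended for debugging and repair: it writes
    /// directly to the shard (generating a fresh fence token when needed) rather than going
    /// through a transaction.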
2008 #[mz_ore::instrument]
2010 pub(crate) async fn debug_edit<T: Collection>(
2011 &mut self,
2012 key: T::Key,
2013 value: T::Value,
2014 ) -> Result<Option<T::Value>, CatalogError>
2015 where
2016 T::Key: PartialEq + Eq + Debug + Clone,
2017 T::Value: Debug + Clone,
2018 {
2019 let prev_value = loop {
2020 let key = key.clone();
2021 let value = value.clone();
2022 let snapshot = self.current_snapshot().await?;
2023 let trace = Trace::from_snapshot(snapshot);
2024 let collection_trace = T::collection_trace(trace);
2025 let prev_values: Vec<_> = collection_trace
2026 .values
2027 .into_iter()
2028 .filter(|((k, _), _, diff)| {
2029 soft_assert_eq_or_log!(*diff, Diff::ONE, "trace is consolidated");
2030 &key == k
2031 })
2032 .collect();
2033
2034 let prev_value = match &prev_values[..] {
2035 [] => None,
2036 [((_, v), _, _)] => Some(v.clone()),
2037 prev_values => panic!("multiple values found for key {key:?}: {prev_values:?}"),
2038 };
2039
2040 let mut updates: Vec<_> = prev_values
2041 .into_iter()
2042 .map(|((k, v), _, _)| (T::update(k, v), Diff::MINUS_ONE))
2043 .collect();
2044 updates.push((T::update(key, value), Diff::ONE));
2045 match self.fenceable_token.generate_unfenced_token(self.mode)? {
2047 Some((fence_updates, current_fenceable_token)) => {
2048 updates.extend(fence_updates.clone());
2049 match self.compare_and_append(updates, self.upper).await {
2050 Ok(_) => {
2051 self.fenceable_token = current_fenceable_token;
2052 break prev_value;
2053 }
2054 Err(CompareAndAppendError::Fence(e)) => return Err(e.into()),
2055 Err(e @ CompareAndAppendError::UpperMismatch { .. }) => {
2056 warn!("catalog write failed due to upper mismatch, retrying: {e:?}");
2057 continue;
2058 }
2059 }
2060 }
2061 None => {
2062 self.compare_and_append(updates, self.upper)
2063 .await
2064 .map_err(|e| e.unwrap_fence_error())?;
2065 break prev_value;
2066 }
2067 }
2068 };
2069 Ok(prev_value)
2070 }
2071
2072 #[mz_ore::instrument]
2074 pub(crate) async fn debug_delete<T: Collection>(
2075 &mut self,
2076 key: T::Key,
2077 ) -> Result<(), CatalogError>
2078 where
2079 T::Key: PartialEq + Eq + Debug + Clone,
2080 T::Value: Debug,
2081 {
2082 loop {
2083 let key = key.clone();
2084 let snapshot = self.current_snapshot().await?;
2085 let trace = Trace::from_snapshot(snapshot);
2086 let collection_trace = T::collection_trace(trace);
2087 let mut retractions: Vec<_> = collection_trace
2088 .values
2089 .into_iter()
2090 .filter(|((k, _), _, diff)| {
2091 soft_assert_eq_or_log!(*diff, Diff::ONE, "trace is consolidated");
2092 &key == k
2093 })
2094 .map(|((k, v), _, _)| (T::update(k, v), Diff::MINUS_ONE))
2095 .collect();
2096
2097 match self.fenceable_token.generate_unfenced_token(self.mode)? {
2099 Some((fence_updates, current_fenceable_token)) => {
2100 retractions.extend(fence_updates.clone());
2101 match self.compare_and_append(retractions, self.upper).await {
2102 Ok(_) => {
2103 self.fenceable_token = current_fenceable_token;
2104 break;
2105 }
2106 Err(CompareAndAppendError::Fence(e)) => return Err(e.into()),
2107 Err(e @ CompareAndAppendError::UpperMismatch { .. }) => {
2108 warn!("catalog write failed due to upper mismatch, retrying: {e:?}");
2109 continue;
2110 }
2111 }
2112 }
2113 None => {
2114 self.compare_and_append(retractions, self.upper)
2115 .await
2116 .map_err(|e| e.unwrap_fence_error())?;
2117 break;
2118 }
2119 }
2120 }
2121 Ok(())
2122 }
2123
2124 async fn current_snapshot(
2129 &mut self,
2130 ) -> Result<impl IntoIterator<Item = StateUpdate> + '_, CatalogError> {
2131 self.sync_to_current_upper().await?;
2132 self.consolidate();
2133 Ok(self.snapshot.iter().cloned().map(|(kind, ts, diff)| {
2134 let kind = TryIntoStateUpdateKind::try_into(kind).expect("kind decoding error");
2135 StateUpdate { kind, ts, diff }
2136 }))
2137 }
2138}