#[cfg(test)]
mod tests;

use std::cmp::max;
use std::collections::{BTreeMap, VecDeque};
use std::fmt::Debug;
use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, Instant};

use async_trait::async_trait;
use differential_dataflow::lattice::Lattice;
use futures::{FutureExt, StreamExt};
use itertools::Itertools;
use mz_audit_log::VersionedEvent;
use mz_ore::metrics::MetricsFutureExt;
use mz_ore::now::EpochMillis;
use mz_ore::{
    soft_assert_eq_no_log, soft_assert_eq_or_log, soft_assert_ne_or_log, soft_assert_no_log,
    soft_assert_or_log, soft_panic_or_log,
};
use mz_persist_client::cfg::USE_CRITICAL_SINCE_CATALOG;
use mz_persist_client::cli::admin::{CATALOG_FORCE_COMPACTION_FUEL, CATALOG_FORCE_COMPACTION_WAIT};
use mz_persist_client::critical::SinceHandle;
use mz_persist_client::error::UpperMismatch;
use mz_persist_client::read::{Listen, ListenEvent, ReadHandle};
use mz_persist_client::write::WriteHandle;
use mz_persist_client::{Diagnostics, PersistClient, ShardId};
use mz_persist_types::codec_impls::UnitSchema;
use mz_proto::{RustType, TryFromProtoError};
use mz_repr::{Diff, RelationDesc, SqlScalarType};
use mz_storage_types::StorageDiff;
use mz_storage_types::sources::SourceData;
use sha2::Digest;
use timely::progress::{Antichain, Timestamp as TimelyTimestamp};
use tracing::{debug, info, warn};
use uuid::Uuid;

use crate::durable::debug::{Collection, CollectionType, DebugCatalogState, Trace};
use crate::durable::error::FenceError;
use crate::durable::initialize::{
    ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, SYSTEM_CONFIG_SYNCED_KEY, USER_VERSION_KEY,
    WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL, WITH_0DT_DEPLOYMENT_MAX_WAIT,
};
use crate::durable::metrics::Metrics;
use crate::durable::objects::state_update::{
    IntoStateUpdateKindJson, StateUpdate, StateUpdateKind, StateUpdateKindJson,
    TryIntoStateUpdateKind,
};
use crate::durable::objects::{AuditLogKey, FenceToken, Snapshot};
use crate::durable::transaction::TransactionBatch;
use crate::durable::upgrade::upgrade;
use crate::durable::{
    AuditLogIterator, BootstrapArgs, CATALOG_CONTENT_VERSION_KEY, CatalogError,
    DurableCatalogError, DurableCatalogState, Epoch, OpenableDurableCatalogState,
    ReadOnlyDurableCatalogState, Transaction, initialize,
};
use crate::memory;

pub(crate) type Timestamp = mz_repr::Timestamp;

const MIN_EPOCH: Epoch = unsafe { Epoch::new_unchecked(1) };

const CATALOG_SHARD_NAME: &str = "catalog";
const UPGRADE_SHARD_NAME: &str = "catalog_upgrade";

const CATALOG_SEED: usize = 1;
const UPGRADE_SEED: usize = 2;
pub const _BUILTIN_MIGRATION_SEED: usize = 3;
pub const _EXPRESSION_CACHE_SEED: usize = 4;

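/// The mode in which the durable catalog state is opened.
///
/// - `Readonly`: never writes to the catalog shard.
/// - `Savepoint`: applies writes in memory only and never commits them durably.
/// - `Writable`: commits writes to the catalog shard.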
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub(crate) enum Mode {
    Readonly,
    Savepoint,
    Writable,
}

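/// Tracks the most recent [`FenceToken`] observed in the catalog shard and whether it fences
/// out this handle.
///
/// A minimal sketch of the fencing rules (hypothetical values, not taken from production use):
///
/// ```ignore
/// let mut token = FenceableToken::new(Some(5));
/// // Observing a token from a newer deploy generation fences this handle out.
/// let err = token
///     .maybe_fence(FenceToken {
///         deploy_generation: 6,
///         epoch: Epoch::new(1).expect("non-zero"),
///     })
///     .unwrap_err();
/// assert!(matches!(err, FenceError::DeployGeneration { .. }));
/// ```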
#[derive(Debug)]
pub(crate) enum FenceableToken {
    Initializing {
        durable_token: Option<FenceToken>,
        current_deploy_generation: Option<u64>,
    },
    Unfenced { current_token: FenceToken },
    Fenced {
        current_token: FenceToken,
        fence_token: FenceToken,
    },
}

impl FenceableToken {
    fn new(current_deploy_generation: Option<u64>) -> Self {
        Self::Initializing {
            durable_token: None,
            current_deploy_generation,
        }
    }

    fn validate(&self) -> Result<Option<FenceToken>, FenceError> {
        match self {
            FenceableToken::Initializing { durable_token, .. } => Ok(durable_token.clone()),
            FenceableToken::Unfenced { current_token, .. } => Ok(Some(current_token.clone())),
            FenceableToken::Fenced {
                current_token,
                fence_token,
            } => {
                assert!(
                    fence_token > current_token,
                    "must be fenced by higher token; current={current_token:?}, fence={fence_token:?}"
                );
                if fence_token.deploy_generation > current_token.deploy_generation {
                    Err(FenceError::DeployGeneration {
                        current_generation: current_token.deploy_generation,
                        fence_generation: fence_token.deploy_generation,
                    })
                } else {
                    assert!(
                        fence_token.epoch > current_token.epoch,
                        "must be fenced by higher token; current={current_token:?}, fence={fence_token:?}"
                    );
                    Err(FenceError::Epoch {
                        current_epoch: current_token.epoch,
                        fence_epoch: fence_token.epoch,
                    })
                }
            }
        }
    }

    fn token(&self) -> Option<FenceToken> {
        match self {
            FenceableToken::Initializing { durable_token, .. } => durable_token.clone(),
            FenceableToken::Unfenced { current_token, .. } => Some(current_token.clone()),
            FenceableToken::Fenced { current_token, .. } => Some(current_token.clone()),
        }
    }

    fn maybe_fence(&mut self, token: FenceToken) -> Result<(), FenceError> {
        match self {
            FenceableToken::Initializing {
                durable_token,
                current_deploy_generation,
                ..
            } => {
                match durable_token {
                    Some(durable_token) => {
                        *durable_token = max(durable_token.clone(), token.clone());
                    }
                    None => {
                        *durable_token = Some(token.clone());
                    }
                }
                if let Some(current_deploy_generation) = current_deploy_generation {
                    if *current_deploy_generation < token.deploy_generation {
                        *self = FenceableToken::Fenced {
                            current_token: FenceToken {
                                deploy_generation: *current_deploy_generation,
                                epoch: token.epoch,
                            },
                            fence_token: token,
                        };
                        self.validate()?;
                    }
                }
            }
            FenceableToken::Unfenced { current_token } => {
                if *current_token < token {
                    *self = FenceableToken::Fenced {
                        current_token: current_token.clone(),
                        fence_token: token,
                    };
                    self.validate()?;
                }
            }
            FenceableToken::Fenced { .. } => {
                self.validate()?;
            }
        }

        Ok(())
    }

    fn generate_unfenced_token(
        &self,
        mode: Mode,
    ) -> Result<Option<(Vec<(StateUpdateKind, Diff)>, FenceableToken)>, DurableCatalogError> {
        let (durable_token, current_deploy_generation) = match self {
            FenceableToken::Initializing {
                durable_token,
                current_deploy_generation,
            } => (durable_token.clone(), current_deploy_generation.clone()),
            FenceableToken::Unfenced { .. } | FenceableToken::Fenced { .. } => return Ok(None),
        };

        let mut fence_updates = Vec::with_capacity(2);

        if let Some(durable_token) = &durable_token {
            fence_updates.push((
                StateUpdateKind::FenceToken(durable_token.clone()),
                Diff::MINUS_ONE,
            ));
        }

        let current_deploy_generation = current_deploy_generation
            .or_else(|| durable_token.as_ref().map(|token| token.deploy_generation))
            .ok_or(DurableCatalogError::Uninitialized)?;
        let mut current_epoch = durable_token
            .map(|token| token.epoch)
            .unwrap_or(MIN_EPOCH)
            .get();
        if matches!(mode, Mode::Writable) {
            current_epoch = current_epoch + 1;
        }
        let current_epoch = Epoch::new(current_epoch).expect("known to be non-zero");
        let current_token = FenceToken {
            deploy_generation: current_deploy_generation,
            epoch: current_epoch,
        };

        fence_updates.push((
            StateUpdateKind::FenceToken(current_token.clone()),
            Diff::ONE,
        ));

        let current_fenceable_token = FenceableToken::Unfenced { current_token };

        Ok(Some((fence_updates, current_fenceable_token)))
    }
}

#[derive(Debug, thiserror::Error)]
pub(crate) enum CompareAndAppendError {
    #[error(transparent)]
    Fence(#[from] FenceError),
    #[error(
        "expected catalog upper {expected_upper:?} did not match actual catalog upper {actual_upper:?}"
    )]
    UpperMismatch {
        expected_upper: Timestamp,
        actual_upper: Timestamp,
    },
}

impl CompareAndAppendError {
    pub(crate) fn unwrap_fence_error(self) -> FenceError {
        match self {
            CompareAndAppendError::Fence(e) => e,
            e @ CompareAndAppendError::UpperMismatch { .. } => {
                panic!("unexpected upper mismatch: {e:?}")
            }
        }
    }
}

impl From<UpperMismatch<Timestamp>> for CompareAndAppendError {
    fn from(upper_mismatch: UpperMismatch<Timestamp>) -> Self {
        Self::UpperMismatch {
            expected_upper: antichain_to_timestamp(upper_mismatch.expected),
            actual_upper: antichain_to_timestamp(upper_mismatch.current),
        }
    }
}

pub(crate) trait ApplyUpdate<T: IntoStateUpdateKindJson> {
    fn apply_update(
        &mut self,
        update: StateUpdate<T>,
        current_fence_token: &mut FenceableToken,
        metrics: &Arc<Metrics>,
    ) -> Result<Option<StateUpdate<T>>, FenceError>;
}

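/// A handle to the persist shard that backs the durable catalog state.
///
/// The handle caches an in-memory trace (`snapshot`) of the shard's contents up to `upper` and
/// funnels every incoming update through `update_applier`, which also watches for fence tokens
/// written by other catalog processes.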
#[derive(Debug)]
pub(crate) struct PersistHandle<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> {
    pub(crate) mode: Mode,
    since_handle: SinceHandle<SourceData, (), Timestamp, StorageDiff, i64>,
    write_handle: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
    listen: Listen<SourceData, (), Timestamp, StorageDiff>,
    persist_client: PersistClient,
    shard_id: ShardId,
    pub(crate) snapshot: Vec<(T, Timestamp, Diff)>,
    update_applier: U,
    pub(crate) upper: Timestamp,
    fenceable_token: FenceableToken,
    catalog_content_version: semver::Version,
    bootstrap_complete: bool,
    metrics: Arc<Metrics>,
}

impl<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> PersistHandle<T, U> {
    async fn increment_catalog_upgrade_shard_version(&self, organization_id: Uuid) {
        let upgrade_shard_id = shard_id(organization_id, UPGRADE_SEED);

        let () = self
            .persist_client
            .upgrade_version::<(), (), Timestamp, StorageDiff>(
                upgrade_shard_id,
                Diagnostics {
                    shard_name: UPGRADE_SHARD_NAME.to_string(),
                    handle_purpose: "durable catalog state upgrade".to_string(),
                },
            )
            .await
            .expect("invalid usage");
    }

    #[mz_ore::instrument]
    async fn current_upper(&mut self) -> Timestamp {
        match self.mode {
            Mode::Writable | Mode::Readonly => {
                let upper = self.write_handle.fetch_recent_upper().await;
                antichain_to_timestamp(upper.clone())
            }
            Mode::Savepoint => self.upper,
        }
    }

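    /// Appends `updates` to the catalog shard at `commit_ts`, expecting the shard's upper to
    /// still be `self.upper`.
    ///
    /// On success the critical since handle is downgraded (best effort) to just below the new
    /// upper and the in-memory state is synced to the new upper. On an upper mismatch, the
    /// handle first syncs to the current upper so that any fence token written by another
    /// process is observed before the error is returned.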
    #[mz_ore::instrument]
    pub(crate) async fn compare_and_append<S: IntoStateUpdateKindJson>(
        &mut self,
        updates: Vec<(S, Diff)>,
        commit_ts: Timestamp,
    ) -> Result<Timestamp, CompareAndAppendError> {
        assert_eq!(self.mode, Mode::Writable);
        assert!(
            commit_ts >= self.upper,
            "expected commit ts, {}, to be greater than or equal to upper, {}",
            commit_ts,
            self.upper
        );

        let contains_fence = if mz_ore::assert::soft_assertions_enabled() {
            let updates: Vec<_> = updates.clone();
            let parsed_updates: Vec<_> = updates
                .clone()
                .into_iter()
                .map(|(update, diff)| {
                    let update: StateUpdateKindJson = update.into();
                    (update, diff)
                })
                .filter_map(|(update, diff)| {
                    <StateUpdateKindJson as TryIntoStateUpdateKind>::try_into(update)
                        .ok()
                        .map(|update| (update, diff))
                })
                .collect();
            let contains_retraction = parsed_updates.iter().any(|(update, diff)| {
                matches!(update, StateUpdateKind::FenceToken(..)) && *diff == Diff::MINUS_ONE
            });
            let contains_addition = parsed_updates.iter().any(|(update, diff)| {
                matches!(update, StateUpdateKind::FenceToken(..)) && *diff == Diff::ONE
            });
            let contains_fence = contains_retraction && contains_addition;
            Some((contains_fence, updates))
        } else {
            None
        };

        let updates = updates.into_iter().map(|(kind, diff)| {
            let kind: StateUpdateKindJson = kind.into();
            (
                (Into::<SourceData>::into(kind), ()),
                commit_ts,
                diff.into_inner(),
            )
        });
        let next_upper = commit_ts.step_forward();
        let res = self
            .write_handle
            .compare_and_append(
                updates,
                Antichain::from_elem(self.upper),
                Antichain::from_elem(next_upper),
            )
            .await
            .expect("invalid usage");

        if let Err(e @ UpperMismatch { .. }) = res {
            self.sync_to_current_upper().await?;
            if let Some((contains_fence, updates)) = contains_fence {
                assert!(
                    contains_fence,
                    "updates were neither fenced nor fencing and encountered an upper mismatch: {updates:#?}"
                )
            }
            return Err(e.into());
        }

        let downgrade_to = Antichain::from_elem(next_upper.saturating_sub(1));

        let opaque = *self.since_handle.opaque();
        let downgrade = self
            .since_handle
            .maybe_compare_and_downgrade_since(&opaque, (&opaque, &downgrade_to))
            .await;

        match downgrade {
            None => {}
            Some(Err(e)) => soft_panic_or_log!("found opaque value {e}, but expected {opaque}"),
            Some(Ok(updated)) => soft_assert_or_log!(
                updated == downgrade_to,
                "updated bound should match expected"
            ),
        }
        self.sync(next_upper).await?;
        Ok(next_upper)
    }

    #[mz_ore::instrument]
    async fn snapshot_unconsolidated(&mut self) -> Vec<StateUpdate<StateUpdateKind>> {
        let current_upper = self.current_upper().await;

        let mut snapshot = Vec::new();
        let mut read_handle = self.read_handle().await;
        let as_of = as_of(&read_handle, current_upper);
        let mut stream = Box::pin(
            read_handle
                .snapshot_and_stream(Antichain::from_elem(as_of))
                .await
                .expect("we have advanced the restart_as_of by the since"),
        );
        while let Some(update) = stream.next().await {
            snapshot.push(update)
        }
        read_handle.expire().await;
        snapshot
            .into_iter()
            .map(Into::<StateUpdate<StateUpdateKindJson>>::into)
            .map(|state_update| state_update.try_into().expect("kind decoding error"))
            .collect()
    }

    #[mz_ore::instrument]
    pub(crate) async fn sync_to_current_upper(&mut self) -> Result<(), FenceError> {
        let upper = self.current_upper().await;
        self.sync(upper).await
    }

    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn sync(&mut self, target_upper: Timestamp) -> Result<(), FenceError> {
        self.metrics.syncs.inc();
        let counter = self.metrics.sync_latency_seconds.clone();
        self.sync_inner(target_upper)
            .wall_time()
            .inc_by(counter)
            .await
    }

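    /// Applies updates from the listen stream until the in-memory state reaches `target_upper`.
    /// In savepoint mode no durable reads happen; the local upper is simply advanced.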
    #[mz_ore::instrument(level = "debug")]
    async fn sync_inner(&mut self, target_upper: Timestamp) -> Result<(), FenceError> {
        self.fenceable_token.validate()?;

        if self.mode == Mode::Savepoint {
            self.upper = max(self.upper, target_upper);
            return Ok(());
        }

        let mut updates: BTreeMap<_, Vec<_>> = BTreeMap::new();

        while self.upper < target_upper {
            let listen_events = self.listen.fetch_next().await;
            for listen_event in listen_events {
                match listen_event {
                    ListenEvent::Progress(upper) => {
                        debug!("synced up to {upper:?}");
                        self.upper = antichain_to_timestamp(upper);
                        while let Some((ts, updates)) = updates.pop_first() {
                            assert!(ts < self.upper, "expected {} < {}", ts, self.upper);
                            let updates = updates.into_iter().map(
                                |update: StateUpdate<StateUpdateKindJson>| {
                                    let kind =
                                        T::try_from(update.kind).expect("kind decoding error");
                                    StateUpdate {
                                        kind,
                                        ts: update.ts,
                                        diff: update.diff,
                                    }
                                },
                            );
                            self.apply_updates(updates)?;
                        }
                    }
                    ListenEvent::Updates(batch_updates) => {
                        for update in batch_updates {
                            let update: StateUpdate<StateUpdateKindJson> = update.into();
                            updates.entry(update.ts).or_default().push(update);
                        }
                    }
                }
            }
        }
        assert_eq!(updates, BTreeMap::new(), "all updates should be applied");
        Ok(())
    }

    #[mz_ore::instrument(level = "debug")]
    pub(crate) fn apply_updates(
        &mut self,
        updates: impl IntoIterator<Item = StateUpdate<T>>,
    ) -> Result<(), FenceError> {
        let mut updates: Vec<_> = updates
            .into_iter()
            .map(|StateUpdate { kind, ts, diff }| (kind, ts, diff))
            .collect();

        differential_dataflow::consolidation::consolidate_updates(&mut updates);

        updates.sort_by(|(_, ts1, diff1), (_, ts2, diff2)| ts1.cmp(ts2).then(diff1.cmp(diff2)));

        let mut errors = Vec::new();

        for (kind, ts, diff) in updates {
            if diff != Diff::ONE && diff != Diff::MINUS_ONE {
                panic!("invalid update in consolidated trace: ({kind:?}, {ts:?}, {diff:?})");
            }

            match self.update_applier.apply_update(
                StateUpdate { kind, ts, diff },
                &mut self.fenceable_token,
                &self.metrics,
            ) {
                Ok(Some(StateUpdate { kind, ts, diff })) => self.snapshot.push((kind, ts, diff)),
                Ok(None) => {}
                Err(err) => errors.push(err),
            }
        }

        errors.sort();
        if let Some(err) = errors.into_iter().next() {
            return Err(err);
        }

        self.consolidate();

        Ok(())
    }

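    /// Advances every update in the in-memory snapshot to the latest timestamp present and then
    /// consolidates the result, so retractions cancel matching insertions.
    ///
    /// A minimal sketch of the effect (hypothetical updates):
    ///
    /// ```ignore
    /// // Before: [(a, 1, +1), (a, 2, -1), (b, 2, +1)]
    /// // After advancing all timestamps to 2 and consolidating: [(b, 2, +1)]
    /// ```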
    #[mz_ore::instrument]
    pub(crate) fn consolidate(&mut self) {
        soft_assert_no_log!(
            self.snapshot
                .windows(2)
                .all(|updates| updates[0].1 <= updates[1].1),
            "snapshot should be sorted by timestamp, {:#?}",
            self.snapshot
        );

        let new_ts = self
            .snapshot
            .last()
            .map(|(_, ts, _)| *ts)
            .unwrap_or_else(Timestamp::minimum);
        for (_, ts, _) in &mut self.snapshot {
            *ts = new_ts;
        }
        differential_dataflow::consolidation::consolidate_updates(&mut self.snapshot);
    }

    async fn with_trace<R>(
        &mut self,
        f: impl FnOnce(&Vec<(T, Timestamp, Diff)>) -> Result<R, CatalogError>,
    ) -> Result<R, CatalogError> {
        self.sync_to_current_upper().await?;
        f(&self.snapshot)
    }

    async fn read_handle(&self) -> ReadHandle<SourceData, (), Timestamp, StorageDiff> {
        self.persist_client
            .open_leased_reader(
                self.shard_id,
                Arc::new(desc()),
                Arc::new(UnitSchema::default()),
                Diagnostics {
                    shard_name: CATALOG_SHARD_NAME.to_string(),
                    handle_purpose: "openable durable catalog state temporary reader".to_string(),
                },
                USE_CRITICAL_SINCE_CATALOG.get(self.persist_client.dyncfgs()),
            )
            .await
            .expect("invalid usage")
    }

    async fn expire(self: Box<Self>) {
        self.write_handle.expire().await;
        self.listen.expire().await;
    }
}

impl<U: ApplyUpdate<StateUpdateKind>> PersistHandle<StateUpdateKind, U> {
    async fn with_snapshot<T>(
        &mut self,
        f: impl FnOnce(Snapshot) -> Result<T, CatalogError>,
    ) -> Result<T, CatalogError> {
        fn apply<K, V>(map: &mut BTreeMap<K, V>, key: &K, value: &V, diff: Diff)
        where
            K: Ord + Clone,
            V: Ord + Clone + Debug,
        {
            let key = key.clone();
            let value = value.clone();
            if diff == Diff::ONE {
                let prev = map.insert(key, value);
                assert_eq!(
                    prev, None,
                    "values must be explicitly retracted before inserting a new value"
                );
            } else if diff == Diff::MINUS_ONE {
                let prev = map.remove(&key);
                assert_eq!(
                    prev,
                    Some(value),
                    "retraction does not match existing value"
                );
            }
        }

        self.with_trace(|trace| {
            let mut snapshot = Snapshot::empty();
            for (kind, ts, diff) in trace {
                let diff = *diff;
                if diff != Diff::ONE && diff != Diff::MINUS_ONE {
                    panic!("invalid update in consolidated trace: ({kind:?}, {ts:?}, {diff:?})");
                }

                match kind {
                    StateUpdateKind::AuditLog(_key, ()) => {}
                    StateUpdateKind::Cluster(key, value) => {
                        apply(&mut snapshot.clusters, key, value, diff);
                    }
                    StateUpdateKind::ClusterReplica(key, value) => {
                        apply(&mut snapshot.cluster_replicas, key, value, diff);
                    }
                    StateUpdateKind::Comment(key, value) => {
                        apply(&mut snapshot.comments, key, value, diff);
                    }
                    StateUpdateKind::Config(key, value) => {
                        apply(&mut snapshot.configs, key, value, diff);
                    }
                    StateUpdateKind::Database(key, value) => {
                        apply(&mut snapshot.databases, key, value, diff);
                    }
                    StateUpdateKind::DefaultPrivilege(key, value) => {
                        apply(&mut snapshot.default_privileges, key, value, diff);
                    }
                    StateUpdateKind::FenceToken(_token) => {}
                    StateUpdateKind::IdAllocator(key, value) => {
                        apply(&mut snapshot.id_allocator, key, value, diff);
                    }
                    StateUpdateKind::IntrospectionSourceIndex(key, value) => {
                        apply(&mut snapshot.introspection_sources, key, value, diff);
                    }
                    StateUpdateKind::Item(key, value) => {
                        apply(&mut snapshot.items, key, value, diff);
                    }
                    StateUpdateKind::NetworkPolicy(key, value) => {
                        apply(&mut snapshot.network_policies, key, value, diff);
                    }
                    StateUpdateKind::Role(key, value) => {
                        apply(&mut snapshot.roles, key, value, diff);
                    }
                    StateUpdateKind::Schema(key, value) => {
                        apply(&mut snapshot.schemas, key, value, diff);
                    }
                    StateUpdateKind::Setting(key, value) => {
                        apply(&mut snapshot.settings, key, value, diff);
                    }
                    StateUpdateKind::SourceReferences(key, value) => {
                        apply(&mut snapshot.source_references, key, value, diff);
                    }
                    StateUpdateKind::SystemConfiguration(key, value) => {
                        apply(&mut snapshot.system_configurations, key, value, diff);
                    }
                    StateUpdateKind::SystemObjectMapping(key, value) => {
                        apply(&mut snapshot.system_object_mappings, key, value, diff);
                    }
                    StateUpdateKind::SystemPrivilege(key, value) => {
                        apply(&mut snapshot.system_privileges, key, value, diff);
                    }
                    StateUpdateKind::StorageCollectionMetadata(key, value) => {
                        apply(&mut snapshot.storage_collection_metadata, key, value, diff);
                    }
                    StateUpdateKind::UnfinalizedShard(key, ()) => {
                        apply(&mut snapshot.unfinalized_shards, key, &(), diff);
                    }
                    StateUpdateKind::TxnWalShard((), value) => {
                        apply(&mut snapshot.txn_wal_shard, &(), value, diff);
                    }
                    StateUpdateKind::RoleAuth(key, value) => {
                        apply(&mut snapshot.role_auth, key, value, diff);
                    }
                }
            }
            f(snapshot)
        })
        .await
    }

    #[mz_ore::instrument(level = "debug")]
    async fn persist_snapshot(&self) -> impl Iterator<Item = StateUpdate> + DoubleEndedIterator {
        let mut read_handle = self.read_handle().await;
        let as_of = as_of(&read_handle, self.upper);
        let snapshot = snapshot_binary(&mut read_handle, as_of, &self.metrics)
            .await
            .map(|update| update.try_into().expect("kind decoding error"));
        read_handle.expire().await;
        snapshot
    }
}

#[derive(Debug)]
pub(crate) struct UnopenedCatalogStateInner {
    organization_id: Uuid,
    configs: BTreeMap<String, u64>,
    settings: BTreeMap<String, String>,
}

impl UnopenedCatalogStateInner {
    fn new(organization_id: Uuid) -> UnopenedCatalogStateInner {
        UnopenedCatalogStateInner {
            organization_id,
            configs: BTreeMap::new(),
            settings: BTreeMap::new(),
        }
    }
}

impl ApplyUpdate<StateUpdateKindJson> for UnopenedCatalogStateInner {
    fn apply_update(
        &mut self,
        update: StateUpdate<StateUpdateKindJson>,
        current_fence_token: &mut FenceableToken,
        _metrics: &Arc<Metrics>,
    ) -> Result<Option<StateUpdate<StateUpdateKindJson>>, FenceError> {
        if !update.kind.is_audit_log() && update.kind.is_always_deserializable() {
            let kind = TryInto::try_into(&update.kind).expect("kind is known to be deserializable");
            match (kind, update.diff) {
                (StateUpdateKind::Config(key, value), Diff::ONE) => {
                    let prev = self.configs.insert(key.key, value.value);
                    assert_eq!(
                        prev, None,
                        "values must be explicitly retracted before inserting a new value"
                    );
                }
                (StateUpdateKind::Config(key, value), Diff::MINUS_ONE) => {
                    let prev = self.configs.remove(&key.key);
                    assert_eq!(
                        prev,
                        Some(value.value),
                        "retraction does not match existing value"
                    );
                }
                (StateUpdateKind::Setting(key, value), Diff::ONE) => {
                    let prev = self.settings.insert(key.name, value.value);
                    assert_eq!(
                        prev, None,
                        "values must be explicitly retracted before inserting a new value"
                    );
                }
                (StateUpdateKind::Setting(key, value), Diff::MINUS_ONE) => {
                    let prev = self.settings.remove(&key.name);
                    assert_eq!(
                        prev,
                        Some(value.value),
                        "retraction does not match existing value"
                    );
                }
                (StateUpdateKind::FenceToken(fence_token), Diff::ONE) => {
                    current_fence_token.maybe_fence(fence_token)?;
                }
                _ => {}
            }
        }

        Ok(Some(update))
    }
}

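/// A durable catalog state that has not been opened yet. Updates are kept in their raw JSON
/// representation; only the `Config` and `Setting` collections (plus fence tokens) are
/// interpreted, which is enough to answer pre-open questions such as the user version or the
/// deploy generation.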
pub(crate) type UnopenedPersistCatalogState =
    PersistHandle<StateUpdateKindJson, UnopenedCatalogStateInner>;

impl UnopenedPersistCatalogState {
    #[mz_ore::instrument]
    pub(crate) async fn new(
        persist_client: PersistClient,
        organization_id: Uuid,
        version: semver::Version,
        deploy_generation: Option<u64>,
        metrics: Arc<Metrics>,
    ) -> Result<UnopenedPersistCatalogState, DurableCatalogError> {
        let catalog_shard_id = shard_id(organization_id, CATALOG_SEED);
        let upgrade_shard_id = shard_id(organization_id, UPGRADE_SEED);
        debug!(
            ?catalog_shard_id,
            ?upgrade_shard_id,
            "new persist backed catalog state"
        );

        let version_in_upgrade_shard =
            fetch_catalog_upgrade_shard_version(&persist_client, upgrade_shard_id).await;
        if let Some(version_in_upgrade_shard) = version_in_upgrade_shard {
            if !mz_persist_client::cfg::code_can_write_data(&version, &version_in_upgrade_shard) {
                return Err(DurableCatalogError::IncompatiblePersistVersion {
                    found_version: version_in_upgrade_shard,
                    catalog_version: version,
                });
            }
        }

        let open_handles_start = Instant::now();
        info!("startup: envd serve: catalog init: open handles beginning");
        let since_handle = persist_client
            .open_critical_since(
                catalog_shard_id,
                PersistClient::CONTROLLER_CRITICAL_SINCE,
                Diagnostics {
                    shard_name: CATALOG_SHARD_NAME.to_string(),
                    handle_purpose: "durable catalog state critical since".to_string(),
                },
            )
            .await
            .expect("invalid usage");
        let (mut write_handle, mut read_handle) = persist_client
            .open(
                catalog_shard_id,
                Arc::new(desc()),
                Arc::new(UnitSchema::default()),
                Diagnostics {
                    shard_name: CATALOG_SHARD_NAME.to_string(),
                    handle_purpose: "durable catalog state handles".to_string(),
                },
                USE_CRITICAL_SINCE_CATALOG.get(persist_client.dyncfgs()),
            )
            .await
            .expect("invalid usage");
        info!(
            "startup: envd serve: catalog init: open handles complete in {:?}",
            open_handles_start.elapsed()
        );

        let upper = {
            const EMPTY_UPDATES: &[((SourceData, ()), Timestamp, StorageDiff)] = &[];
            let upper = Antichain::from_elem(Timestamp::minimum());
            let next_upper = Timestamp::minimum().step_forward();
            match write_handle
                .compare_and_append(EMPTY_UPDATES, upper, Antichain::from_elem(next_upper))
                .await
                .expect("invalid usage")
            {
                Ok(()) => next_upper,
                Err(mismatch) => antichain_to_timestamp(mismatch.current),
            }
        };

        let snapshot_start = Instant::now();
        info!("startup: envd serve: catalog init: snapshot beginning");
        let as_of = as_of(&read_handle, upper);
        let snapshot: Vec<_> = snapshot_binary(&mut read_handle, as_of, &metrics)
            .await
            .map(|StateUpdate { kind, ts, diff }| (kind, ts, diff))
            .collect();
        let listen = read_handle
            .listen(Antichain::from_elem(as_of))
            .await
            .expect("invalid usage");
        info!(
            "startup: envd serve: catalog init: snapshot complete in {:?}",
            snapshot_start.elapsed()
        );

        let mut handle = UnopenedPersistCatalogState {
            mode: Mode::Writable,
            since_handle,
            write_handle,
            listen,
            persist_client,
            shard_id: catalog_shard_id,
            snapshot: Vec::new(),
            update_applier: UnopenedCatalogStateInner::new(organization_id),
            upper,
            fenceable_token: FenceableToken::new(deploy_generation),
            catalog_content_version: version,
            bootstrap_complete: false,
            metrics,
        };
        soft_assert_no_log!(
            snapshot.iter().all(|(_, _, diff)| *diff == Diff::ONE),
            "snapshot should be consolidated: {snapshot:#?}"
        );

        let apply_start = Instant::now();
        info!("startup: envd serve: catalog init: apply updates beginning");
        let updates = snapshot
            .into_iter()
            .map(|(kind, ts, diff)| StateUpdate { kind, ts, diff });
        handle.apply_updates(updates)?;
        info!(
            "startup: envd serve: catalog init: apply updates complete in {:?}",
            apply_start.elapsed()
        );

        if let Some(found_version) = handle.get_catalog_content_version().await? {
            if handle.catalog_content_version < found_version {
                return Err(DurableCatalogError::IncompatiblePersistVersion {
                    found_version,
                    catalog_version: handle.catalog_content_version,
                });
            }
        }

        Ok(handle)
    }

    #[mz_ore::instrument]
    async fn open_inner(
        mut self,
        mode: Mode,
        initial_ts: Timestamp,
        bootstrap_args: &BootstrapArgs,
    ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError> {
        let mut commit_ts = self.upper;
        self.mode = mode;

        match (&self.mode, &self.fenceable_token) {
            (_, FenceableToken::Unfenced { .. } | FenceableToken::Fenced { .. }) => {
                return Err(DurableCatalogError::Internal(
                    "catalog should not have fenced before opening".to_string(),
                )
                .into());
            }
            (
                Mode::Writable | Mode::Savepoint,
                FenceableToken::Initializing {
                    current_deploy_generation: None,
                    ..
                },
            ) => {
                return Err(DurableCatalogError::Internal(format!(
                    "cannot open in mode '{:?}' without a deploy generation",
                    self.mode,
                ))
                .into());
            }
            _ => {}
        }

        let read_only = matches!(self.mode, Mode::Readonly);

        loop {
            self.sync_to_current_upper().await?;
            commit_ts = max(commit_ts, self.upper);
            let (fence_updates, current_fenceable_token) = self
                .fenceable_token
                .generate_unfenced_token(self.mode)?
                .ok_or_else(|| {
                    DurableCatalogError::Internal(
                        "catalog should not have fenced before opening".to_string(),
                    )
                })?;
            debug!(
                ?self.upper,
                ?self.fenceable_token,
                ?current_fenceable_token,
                "fencing previous catalogs"
            );
            if matches!(self.mode, Mode::Writable) {
                match self
                    .compare_and_append(fence_updates.clone(), commit_ts)
                    .await
                {
                    Ok(upper) => {
                        commit_ts = upper;
                    }
                    Err(CompareAndAppendError::Fence(e)) => return Err(e.into()),
                    Err(e @ CompareAndAppendError::UpperMismatch { .. }) => {
                        warn!("catalog write failed due to upper mismatch, retrying: {e:?}");
                        continue;
                    }
                }
            }
            self.fenceable_token = current_fenceable_token;
            break;
        }

        let is_initialized = self.is_initialized_inner();
        if !matches!(self.mode, Mode::Writable) && !is_initialized {
            return Err(CatalogError::Durable(DurableCatalogError::NotWritable(
                format!(
                    "catalog tables do not exist; will not create in {:?} mode",
                    self.mode
                ),
            )));
        }
        soft_assert_ne_or_log!(self.upper, Timestamp::minimum());

        let (audit_logs, snapshot): (Vec<_>, Vec<_>) = self
            .snapshot
            .into_iter()
            .partition(|(update, _, _)| update.is_audit_log());
        self.snapshot = snapshot;
        let audit_log_count = audit_logs.iter().map(|(_, _, diff)| diff).sum::<Diff>();
        let audit_log_handle = AuditLogIterator::new(audit_logs);

        if is_initialized && !read_only {
            commit_ts = upgrade(&mut self, commit_ts).await?;
        }

        debug!(
            ?is_initialized,
            ?self.upper,
            "initializing catalog state"
        );
        let mut catalog = PersistCatalogState {
            mode: self.mode,
            since_handle: self.since_handle,
            write_handle: self.write_handle,
            listen: self.listen,
            persist_client: self.persist_client,
            shard_id: self.shard_id,
            upper: self.upper,
            fenceable_token: self.fenceable_token,
            snapshot: Vec::new(),
            update_applier: CatalogStateInner::new(),
            catalog_content_version: self.catalog_content_version,
            bootstrap_complete: false,
            metrics: self.metrics,
        };
        catalog.metrics.collection_entries.reset();
        catalog
            .metrics
            .collection_entries
            .with_label_values(&[&CollectionType::AuditLog.to_string()])
            .add(audit_log_count.into_inner());
        let updates = self.snapshot.into_iter().map(|(kind, ts, diff)| {
            let kind = TryIntoStateUpdateKind::try_into(kind).expect("kind decoding error");
            StateUpdate { kind, ts, diff }
        });
        catalog.apply_updates(updates)?;

        let catalog_content_version = catalog.catalog_content_version.to_string();
        let txn = if is_initialized {
            let mut txn = catalog.transaction().await?;

            if txn.get_setting("migration_version".into()).is_none() && mode != Mode::Readonly {
                let old_version = txn.get_catalog_content_version();
                txn.set_setting("migration_version".into(), old_version.map(Into::into))?;
            }

            txn.set_catalog_content_version(catalog_content_version)?;
            txn
        } else {
            soft_assert_eq_no_log!(
                catalog
                    .snapshot
                    .iter()
                    .filter(|(kind, _, _)| !matches!(kind, StateUpdateKind::FenceToken(_)))
                    .count(),
                0,
                "trace should not contain any updates for an uninitialized catalog: {:#?}",
                catalog.snapshot
            );

            let mut txn = catalog.transaction().await?;
            initialize::initialize(
                &mut txn,
                bootstrap_args,
                initial_ts.into(),
                catalog_content_version,
            )
            .await?;
            txn
        };

        if read_only {
            let (txn_batch, _) = txn.into_parts();
            let updates = StateUpdate::from_txn_batch_ts(txn_batch, catalog.upper);
            catalog.apply_updates(updates)?;
        } else {
            txn.commit_internal(commit_ts).await?;
        }

        if matches!(catalog.mode, Mode::Writable) {
            catalog
                .increment_catalog_upgrade_shard_version(self.update_applier.organization_id)
                .await;

            let write_handle = catalog
                .persist_client
                .open_writer::<SourceData, (), Timestamp, i64>(
                    catalog.write_handle.shard_id(),
                    Arc::new(desc()),
                    Arc::new(UnitSchema::default()),
                    Diagnostics {
                        shard_name: CATALOG_SHARD_NAME.to_string(),
                        handle_purpose: "compact catalog".to_string(),
                    },
                )
                .await
                .expect("invalid usage");
            let fuel = CATALOG_FORCE_COMPACTION_FUEL.handle(catalog.persist_client.dyncfgs());
            let wait = CATALOG_FORCE_COMPACTION_WAIT.handle(catalog.persist_client.dyncfgs());
            let _task = mz_ore::task::spawn(|| "catalog::force_shard_compaction", async move {
                let () =
                    mz_persist_client::cli::admin::dangerous_force_compaction_and_break_pushdown(
                        &write_handle,
                        || fuel.get(),
                        || wait.get(),
                    )
                    .await;
            });
        }

        Ok((Box::new(catalog), audit_log_handle))
    }

    #[mz_ore::instrument]
    fn is_initialized_inner(&self) -> bool {
        !self.update_applier.configs.is_empty()
    }

    #[mz_ore::instrument]
    async fn get_current_config(&mut self, key: &str) -> Result<Option<u64>, DurableCatalogError> {
        self.sync_to_current_upper().await?;
        Ok(self.update_applier.configs.get(key).cloned())
    }

    #[mz_ore::instrument]
    pub(crate) async fn get_user_version(&mut self) -> Result<Option<u64>, DurableCatalogError> {
        self.get_current_config(USER_VERSION_KEY).await
    }

    #[mz_ore::instrument]
    async fn get_current_setting(
        &mut self,
        name: &str,
    ) -> Result<Option<String>, DurableCatalogError> {
        self.sync_to_current_upper().await?;
        Ok(self.update_applier.settings.get(name).cloned())
    }

    #[mz_ore::instrument]
    async fn get_catalog_content_version(
        &mut self,
    ) -> Result<Option<semver::Version>, DurableCatalogError> {
        let version = self
            .get_current_setting(CATALOG_CONTENT_VERSION_KEY)
            .await?;
        let version = version.map(|version| version.parse().expect("invalid version persisted"));
        Ok(version)
    }
}

#[async_trait]
impl OpenableDurableCatalogState for UnopenedPersistCatalogState {
    #[mz_ore::instrument]
    async fn open_savepoint(
        mut self: Box<Self>,
        initial_ts: Timestamp,
        bootstrap_args: &BootstrapArgs,
    ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError> {
        self.open_inner(Mode::Savepoint, initial_ts, bootstrap_args)
            .boxed()
            .await
    }

    #[mz_ore::instrument]
    async fn open_read_only(
        mut self: Box<Self>,
        bootstrap_args: &BootstrapArgs,
    ) -> Result<Box<dyn DurableCatalogState>, CatalogError> {
        self.open_inner(Mode::Readonly, EpochMillis::MIN.into(), bootstrap_args)
            .boxed()
            .await
            .map(|(catalog, _)| catalog)
    }

    #[mz_ore::instrument]
    async fn open(
        mut self: Box<Self>,
        initial_ts: Timestamp,
        bootstrap_args: &BootstrapArgs,
    ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError> {
        self.open_inner(Mode::Writable, initial_ts, bootstrap_args)
            .boxed()
            .await
    }

    #[mz_ore::instrument(level = "debug")]
    async fn open_debug(mut self: Box<Self>) -> Result<DebugCatalogState, CatalogError> {
        Ok(DebugCatalogState(*self))
    }

    #[mz_ore::instrument]
    async fn is_initialized(&mut self) -> Result<bool, CatalogError> {
        self.sync_to_current_upper().await?;
        Ok(self.is_initialized_inner())
    }

    #[mz_ore::instrument]
    async fn epoch(&mut self) -> Result<Epoch, CatalogError> {
        self.sync_to_current_upper().await?;
        self.fenceable_token
            .validate()?
            .map(|token| token.epoch)
            .ok_or(CatalogError::Durable(DurableCatalogError::Uninitialized))
    }

    #[mz_ore::instrument]
    async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError> {
        self.sync_to_current_upper().await?;
        self.fenceable_token
            .token()
            .map(|token| token.deploy_generation)
            .ok_or(CatalogError::Durable(DurableCatalogError::Uninitialized))
    }

    #[mz_ore::instrument(level = "debug")]
    async fn get_0dt_deployment_max_wait(&mut self) -> Result<Option<Duration>, CatalogError> {
        let value = self
            .get_current_config(WITH_0DT_DEPLOYMENT_MAX_WAIT)
            .await?;
        match value {
            None => Ok(None),
            Some(millis) => Ok(Some(Duration::from_millis(millis))),
        }
    }

    #[mz_ore::instrument(level = "debug")]
    async fn get_0dt_deployment_ddl_check_interval(
        &mut self,
    ) -> Result<Option<Duration>, CatalogError> {
        let value = self
            .get_current_config(WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL)
            .await?;
        match value {
            None => Ok(None),
            Some(millis) => Ok(Some(Duration::from_millis(millis))),
        }
    }

    #[mz_ore::instrument(level = "debug")]
    async fn get_enable_0dt_deployment_panic_after_timeout(
        &mut self,
    ) -> Result<Option<bool>, CatalogError> {
        let value = self
            .get_current_config(ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT)
            .await?;
        match value {
            None => Ok(None),
            Some(0) => Ok(Some(false)),
            Some(1) => Ok(Some(true)),
            Some(v) => Err(
                DurableCatalogError::from(TryFromProtoError::UnknownEnumVariant(format!(
                    "{v} is not a valid boolean value"
                )))
                .into(),
            ),
        }
    }

    #[mz_ore::instrument]
    async fn has_system_config_synced_once(&mut self) -> Result<bool, DurableCatalogError> {
        self.get_current_config(SYSTEM_CONFIG_SYNCED_KEY)
            .await
            .map(|value| value.map(|value| value > 0).unwrap_or(false))
    }

    #[mz_ore::instrument]
    async fn trace_unconsolidated(&mut self) -> Result<Trace, CatalogError> {
        self.sync_to_current_upper().await?;
        if self.is_initialized_inner() {
            let snapshot = self.snapshot_unconsolidated().await;
            Ok(Trace::from_snapshot(snapshot))
        } else {
            Err(CatalogError::Durable(DurableCatalogError::Uninitialized))
        }
    }

    #[mz_ore::instrument]
    async fn trace_consolidated(&mut self) -> Result<Trace, CatalogError> {
        self.sync_to_current_upper().await?;
        if self.is_initialized_inner() {
            let snapshot = self.current_snapshot().await?;
            Ok(Trace::from_snapshot(snapshot))
        } else {
            Err(CatalogError::Durable(DurableCatalogError::Uninitialized))
        }
    }

    #[mz_ore::instrument(level = "debug")]
    async fn expire(self: Box<Self>) {
        self.expire().await
    }
}

#[derive(Debug)]
struct CatalogStateInner {
    updates: VecDeque<memory::objects::StateUpdate>,
}

impl CatalogStateInner {
    fn new() -> CatalogStateInner {
        CatalogStateInner {
            updates: VecDeque::new(),
        }
    }
}

impl ApplyUpdate<StateUpdateKind> for CatalogStateInner {
    fn apply_update(
        &mut self,
        update: StateUpdate<StateUpdateKind>,
        current_fence_token: &mut FenceableToken,
        metrics: &Arc<Metrics>,
    ) -> Result<Option<StateUpdate<StateUpdateKind>>, FenceError> {
        if let Some(collection_type) = update.kind.collection_type() {
            metrics
                .collection_entries
                .with_label_values(&[&collection_type.to_string()])
                .add(update.diff.into_inner());
        }

        {
            let update: Option<memory::objects::StateUpdate> = (&update)
                .try_into()
                .expect("invalid persisted update: {update:#?}");
            if let Some(update) = update {
                self.updates.push_back(update);
            }
        }

        match (update.kind, update.diff) {
            (StateUpdateKind::AuditLog(_, ()), _) => Ok(None),
            (StateUpdateKind::FenceToken(_), Diff::MINUS_ONE) => Ok(None),
            (StateUpdateKind::FenceToken(token), Diff::ONE) => {
                current_fence_token.maybe_fence(token)?;
                Ok(None)
            }
            (kind, diff) => Ok(Some(StateUpdate {
                kind,
                ts: update.ts,
                diff,
            })),
        }
    }
}

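/// A durable catalog state that has been opened. Every update is deserialized into a
/// [`StateUpdateKind`], and in-memory updates are buffered for consumers of `sync_updates`.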
type PersistCatalogState = PersistHandle<StateUpdateKind, CatalogStateInner>;

#[async_trait]
impl ReadOnlyDurableCatalogState for PersistCatalogState {
    fn epoch(&self) -> Epoch {
        self.fenceable_token
            .token()
            .expect("opened catalog state must have an epoch")
            .epoch
    }

    fn metrics(&self) -> &Metrics {
        &self.metrics
    }

    #[mz_ore::instrument(level = "debug")]
    async fn expire(self: Box<Self>) {
        self.expire().await
    }

    fn is_bootstrap_complete(&self) -> bool {
        self.bootstrap_complete
    }

    async fn get_audit_logs(&mut self) -> Result<Vec<VersionedEvent>, CatalogError> {
        self.sync_to_current_upper().await?;
        let audit_logs: Vec<_> = self
            .persist_snapshot()
            .await
            .filter_map(
                |StateUpdate {
                     kind,
                     ts: _,
                     diff: _,
                 }| match kind {
                    StateUpdateKind::AuditLog(key, ()) => Some(key),
                    _ => None,
                },
            )
            .collect();
        let mut audit_logs: Vec<_> = audit_logs
            .into_iter()
            .map(RustType::from_proto)
            .map_ok(|key: AuditLogKey| key.event)
            .collect::<Result<_, _>>()?;
        audit_logs.sort_by(|a, b| a.sortable_id().cmp(&b.sortable_id()));
        Ok(audit_logs)
    }

    #[mz_ore::instrument(level = "debug")]
    async fn get_next_id(&mut self, id_type: &str) -> Result<u64, CatalogError> {
        self.with_trace(|trace| {
            Ok(trace
                .into_iter()
                .rev()
                .filter_map(|(kind, _, _)| match kind {
                    StateUpdateKind::IdAllocator(key, value) if key.name == id_type => {
                        Some(value.next_id)
                    }
                    _ => None,
                })
                .next()
                .expect("must exist"))
        })
        .await
    }

    #[mz_ore::instrument(level = "debug")]
    async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError> {
        self.sync_to_current_upper().await?;
        Ok(self
            .fenceable_token
            .token()
            .expect("opened catalogs must have a token")
            .deploy_generation)
    }

    #[mz_ore::instrument(level = "debug")]
    async fn snapshot(&mut self) -> Result<Snapshot, CatalogError> {
        self.with_snapshot(Ok).await
    }

    #[mz_ore::instrument(level = "debug")]
    async fn sync_to_current_updates(
        &mut self,
    ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError> {
        let upper = self.current_upper().await;
        self.sync_updates(upper).await
    }

    #[mz_ore::instrument(level = "debug")]
    async fn sync_updates(
        &mut self,
        target_upper: mz_repr::Timestamp,
    ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError> {
        self.sync(target_upper).await?;
        let mut updates = Vec::new();
        while let Some(update) = self.update_applier.updates.front() {
            if update.ts >= target_upper {
                break;
            }

            let update = self
                .update_applier
                .updates
                .pop_front()
                .expect("peeked above");
            updates.push(update);
        }
        Ok(updates)
    }

    async fn current_upper(&mut self) -> Timestamp {
        self.current_upper().await
    }
}

#[async_trait]
#[allow(mismatched_lifetime_syntaxes)]
impl DurableCatalogState for PersistCatalogState {
    fn is_read_only(&self) -> bool {
        matches!(self.mode, Mode::Readonly)
    }

    fn is_savepoint(&self) -> bool {
        matches!(self.mode, Mode::Savepoint)
    }

    async fn mark_bootstrap_complete(&mut self) {
        self.bootstrap_complete = true;
        if matches!(self.mode, Mode::Writable) {
            self.since_handle
                .upgrade_version()
                .await
                .expect("invalid usage")
        }
    }

    #[mz_ore::instrument(level = "debug")]
    async fn transaction(&mut self) -> Result<Transaction, CatalogError> {
        self.metrics.transactions_started.inc();
        let snapshot = self.snapshot().await?;
        let commit_ts = self.upper.clone();
        Transaction::new(self, snapshot, commit_ts)
    }

    #[mz_ore::instrument(level = "debug")]
    async fn commit_transaction(
        &mut self,
        txn_batch: TransactionBatch,
        commit_ts: Timestamp,
    ) -> Result<Timestamp, CatalogError> {
        async fn commit_transaction_inner(
            catalog: &mut PersistCatalogState,
            txn_batch: TransactionBatch,
            commit_ts: Timestamp,
        ) -> Result<Timestamp, CatalogError> {
            assert_eq!(
                catalog.upper, txn_batch.upper,
                "only one transaction at a time is supported"
            );

            assert!(
                commit_ts >= catalog.upper,
                "expected commit ts, {}, to be greater than or equal to upper, {}",
                commit_ts,
                catalog.upper
            );

            let updates = StateUpdate::from_txn_batch(txn_batch).collect();
            debug!("committing updates: {updates:?}");

            let next_upper = match catalog.mode {
                Mode::Writable => catalog
                    .compare_and_append(updates, commit_ts)
                    .await
                    .map_err(|e| e.unwrap_fence_error())?,
                Mode::Savepoint => {
                    let updates = updates.into_iter().map(|(kind, diff)| StateUpdate {
                        kind,
                        ts: commit_ts,
                        diff,
                    });
                    catalog.apply_updates(updates)?;
                    catalog.upper = commit_ts.step_forward();
                    catalog.upper
                }
                Mode::Readonly => {
                    if !updates.is_empty() {
                        return Err(DurableCatalogError::NotWritable(format!(
                            "cannot commit a transaction in a read-only catalog: {updates:#?}"
                        ))
                        .into());
                    }
                    catalog.upper
                }
            };

            Ok(next_upper)
        }
        self.metrics.transaction_commits.inc();
        let counter = self.metrics.transaction_commit_latency_seconds.clone();
        commit_transaction_inner(self, txn_batch, commit_ts)
            .wall_time()
            .inc_by(counter)
            .await
    }

    #[mz_ore::instrument(level = "debug")]
    async fn confirm_leadership(&mut self) -> Result<(), CatalogError> {
        if self.is_read_only() {
            return Ok(());
        }
        self.sync_to_current_upper().await?;
        Ok(())
    }
}

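/// Deterministically derives the persist shard id used for `seed` within `organization_id` by
/// hashing the two together.
///
/// A minimal usage sketch (hypothetical organization id):
///
/// ```ignore
/// let organization_id = Uuid::new_v4();
/// // The same inputs always map to the same shard.
/// assert_eq!(
///     shard_id(organization_id, CATALOG_SEED),
///     shard_id(organization_id, CATALOG_SEED)
/// );
/// ```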
pub fn shard_id(organization_id: Uuid, seed: usize) -> ShardId {
    let hash = sha2::Sha256::digest(format!("{organization_id}{seed}")).to_vec();
    soft_assert_eq_or_log!(hash.len(), 32, "SHA256 returns 32 bytes (256 bits)");
    let uuid = Uuid::from_slice(&hash[0..16]).expect("from_slice accepts exactly 16 bytes");
    ShardId::from_str(&format!("s{uuid}")).expect("known to be valid")
}

fn desc() -> RelationDesc {
    RelationDesc::builder()
        .with_column("data", SqlScalarType::Jsonb.nullable(false))
        .finish()
}

fn as_of(
    read_handle: &ReadHandle<SourceData, (), Timestamp, StorageDiff>,
    upper: Timestamp,
) -> Timestamp {
    let since = read_handle.since().clone();
    let mut as_of = upper.checked_sub(1).unwrap_or_else(|| {
        panic!("catalog persist shard should be initialized, found upper: {upper:?}")
    });
    soft_assert_or_log!(
        since.less_equal(&as_of),
        "since={since:?}, as_of={as_of:?}; since must be less than or equal to as_of"
    );
    as_of.advance_by(since.borrow());
    as_of
}

async fn fetch_catalog_upgrade_shard_version(
    persist_client: &PersistClient,
    upgrade_shard_id: ShardId,
) -> Option<semver::Version> {
    let shard_state = persist_client
        .inspect_shard::<Timestamp>(&upgrade_shard_id)
        .await
        .ok()?;
    let json_state = serde_json::to_value(shard_state).expect("state serialization error");
    let upgrade_version = json_state
        .get("applier_version")
        .cloned()
        .expect("missing applier_version");
    let upgrade_version =
        serde_json::from_value(upgrade_version).expect("version deserialization error");
    Some(upgrade_version)
}

#[mz_ore::instrument(level = "debug")]
async fn snapshot_binary(
    read_handle: &mut ReadHandle<SourceData, (), Timestamp, StorageDiff>,
    as_of: Timestamp,
    metrics: &Arc<Metrics>,
) -> impl Iterator<Item = StateUpdate<StateUpdateKindJson>> + DoubleEndedIterator + use<> {
    metrics.snapshots_taken.inc();
    let counter = metrics.snapshot_latency_seconds.clone();
    snapshot_binary_inner(read_handle, as_of)
        .wall_time()
        .inc_by(counter)
        .await
}

#[mz_ore::instrument(level = "debug")]
async fn snapshot_binary_inner(
    read_handle: &mut ReadHandle<SourceData, (), Timestamp, StorageDiff>,
    as_of: Timestamp,
) -> impl Iterator<Item = StateUpdate<StateUpdateKindJson>> + DoubleEndedIterator + use<> {
    let snapshot = read_handle
        .snapshot_and_fetch(Antichain::from_elem(as_of))
        .await
        .expect("we have advanced the restart_as_of by the since");
    soft_assert_no_log!(
        snapshot.iter().all(|(_, _, diff)| *diff == 1),
        "snapshot_and_fetch guarantees a consolidated result: {snapshot:#?}"
    );
    snapshot
        .into_iter()
        .map(Into::<StateUpdate<StateUpdateKindJson>>::into)
        .sorted_by(|a, b| Ord::cmp(&b.ts, &a.ts))
}

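/// Extracts the single element of a totally ordered antichain, such as the catalog shard's upper
/// or since frontier. Panics if the antichain is empty, i.e. if the shard were ever finalized.
///
/// A minimal sketch (hypothetical timestamp):
///
/// ```ignore
/// let ts = antichain_to_timestamp(Antichain::from_elem(Timestamp::from(42u64)));
/// assert_eq!(ts, Timestamp::from(42u64));
/// ```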
pub(crate) fn antichain_to_timestamp(antichain: Antichain<Timestamp>) -> Timestamp {
    antichain
        .into_option()
        .expect("we use a totally ordered time and never finalize the shard")
}

impl Trace {
    fn from_snapshot(snapshot: impl IntoIterator<Item = StateUpdate>) -> Trace {
        let mut trace = Trace::new();
        for StateUpdate { kind, ts, diff } in snapshot {
            match kind {
                StateUpdateKind::AuditLog(k, v) => trace.audit_log.values.push(((k, v), ts, diff)),
                StateUpdateKind::Cluster(k, v) => trace.clusters.values.push(((k, v), ts, diff)),
                StateUpdateKind::ClusterReplica(k, v) => {
                    trace.cluster_replicas.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::Comment(k, v) => trace.comments.values.push(((k, v), ts, diff)),
                StateUpdateKind::Config(k, v) => trace.configs.values.push(((k, v), ts, diff)),
                StateUpdateKind::Database(k, v) => trace.databases.values.push(((k, v), ts, diff)),
                StateUpdateKind::DefaultPrivilege(k, v) => {
                    trace.default_privileges.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::FenceToken(_) => {}
                StateUpdateKind::IdAllocator(k, v) => {
                    trace.id_allocator.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::IntrospectionSourceIndex(k, v) => {
                    trace.introspection_sources.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::Item(k, v) => trace.items.values.push(((k, v), ts, diff)),
                StateUpdateKind::NetworkPolicy(k, v) => {
                    trace.network_policies.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::Role(k, v) => trace.roles.values.push(((k, v), ts, diff)),
                StateUpdateKind::Schema(k, v) => trace.schemas.values.push(((k, v), ts, diff)),
                StateUpdateKind::Setting(k, v) => trace.settings.values.push(((k, v), ts, diff)),
                StateUpdateKind::SourceReferences(k, v) => {
                    trace.source_references.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::SystemConfiguration(k, v) => {
                    trace.system_configurations.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::SystemObjectMapping(k, v) => {
                    trace.system_object_mappings.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::SystemPrivilege(k, v) => {
                    trace.system_privileges.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::StorageCollectionMetadata(k, v) => trace
                    .storage_collection_metadata
                    .values
                    .push(((k, v), ts, diff)),
                StateUpdateKind::UnfinalizedShard(k, ()) => {
                    trace.unfinalized_shards.values.push(((k, ()), ts, diff))
                }
                StateUpdateKind::TxnWalShard((), v) => {
                    trace.txn_wal_shard.values.push((((), v), ts, diff))
                }
                StateUpdateKind::RoleAuth(k, v) => trace.role_auth.values.push(((k, v), ts, diff)),
            }
        }
        trace
    }
}

impl UnopenedPersistCatalogState {
    #[mz_ore::instrument]
    pub(crate) async fn debug_edit<T: Collection>(
        &mut self,
        key: T::Key,
        value: T::Value,
    ) -> Result<Option<T::Value>, CatalogError>
    where
        T::Key: PartialEq + Eq + Debug + Clone,
        T::Value: Debug + Clone,
    {
        let prev_value = loop {
            let key = key.clone();
            let value = value.clone();
            let snapshot = self.current_snapshot().await?;
            let trace = Trace::from_snapshot(snapshot);
            let collection_trace = T::collection_trace(trace);
            let prev_values: Vec<_> = collection_trace
                .values
                .into_iter()
                .filter(|((k, _), _, diff)| {
                    soft_assert_eq_or_log!(*diff, Diff::ONE, "trace is consolidated");
                    &key == k
                })
                .collect();

            let prev_value = match &prev_values[..] {
                [] => None,
                [((_, v), _, _)] => Some(v.clone()),
                prev_values => panic!("multiple values found for key {key:?}: {prev_values:?}"),
            };

            let mut updates: Vec<_> = prev_values
                .into_iter()
                .map(|((k, v), _, _)| (T::update(k, v), Diff::MINUS_ONE))
                .collect();
            updates.push((T::update(key, value), Diff::ONE));
            match self.fenceable_token.generate_unfenced_token(self.mode)? {
                Some((fence_updates, current_fenceable_token)) => {
                    updates.extend(fence_updates.clone());
                    match self.compare_and_append(updates, self.upper).await {
                        Ok(_) => {
                            self.fenceable_token = current_fenceable_token;
                            break prev_value;
                        }
                        Err(CompareAndAppendError::Fence(e)) => return Err(e.into()),
                        Err(e @ CompareAndAppendError::UpperMismatch { .. }) => {
                            warn!("catalog write failed due to upper mismatch, retrying: {e:?}");
                            continue;
                        }
                    }
                }
                None => {
                    self.compare_and_append(updates, self.upper)
                        .await
                        .map_err(|e| e.unwrap_fence_error())?;
                    break prev_value;
                }
            }
        };
        Ok(prev_value)
    }

    #[mz_ore::instrument]
    pub(crate) async fn debug_delete<T: Collection>(
        &mut self,
        key: T::Key,
    ) -> Result<(), CatalogError>
    where
        T::Key: PartialEq + Eq + Debug + Clone,
        T::Value: Debug,
    {
        loop {
            let key = key.clone();
            let snapshot = self.current_snapshot().await?;
            let trace = Trace::from_snapshot(snapshot);
            let collection_trace = T::collection_trace(trace);
            let mut retractions: Vec<_> = collection_trace
                .values
                .into_iter()
                .filter(|((k, _), _, diff)| {
                    soft_assert_eq_or_log!(*diff, Diff::ONE, "trace is consolidated");
                    &key == k
                })
                .map(|((k, v), _, _)| (T::update(k, v), Diff::MINUS_ONE))
                .collect();

            match self.fenceable_token.generate_unfenced_token(self.mode)? {
                Some((fence_updates, current_fenceable_token)) => {
                    retractions.extend(fence_updates.clone());
                    match self.compare_and_append(retractions, self.upper).await {
                        Ok(_) => {
                            self.fenceable_token = current_fenceable_token;
                            break;
                        }
                        Err(CompareAndAppendError::Fence(e)) => return Err(e.into()),
                        Err(e @ CompareAndAppendError::UpperMismatch { .. }) => {
                            warn!("catalog write failed due to upper mismatch, retrying: {e:?}");
                            continue;
                        }
                    }
                }
                None => {
                    self.compare_and_append(retractions, self.upper)
                        .await
                        .map_err(|e| e.unwrap_fence_error())?;
                    break;
                }
            }
        }
        Ok(())
    }

    async fn current_snapshot(
        &mut self,
    ) -> Result<impl IntoIterator<Item = StateUpdate> + '_, CatalogError> {
        self.sync_to_current_upper().await?;
        self.consolidate();
        Ok(self.snapshot.iter().cloned().map(|(kind, ts, diff)| {
            let kind = TryIntoStateUpdateKind::try_into(kind).expect("kind decoding error");
            StateUpdate { kind, ts, diff }
        }))
    }
}