#[cfg(test)]
mod tests;

use std::cmp::max;
use std::collections::{BTreeMap, VecDeque};
use std::fmt::Debug;
use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, Instant};

use async_trait::async_trait;
use differential_dataflow::lattice::Lattice;
use futures::{FutureExt, StreamExt};
use itertools::Itertools;
use mz_audit_log::VersionedEvent;
use mz_ore::metrics::MetricsFutureExt;
use mz_ore::now::EpochMillis;
use mz_ore::{
    soft_assert_eq_no_log, soft_assert_eq_or_log, soft_assert_ne_or_log, soft_assert_no_log,
    soft_assert_or_log, soft_panic_or_log,
};
use mz_persist_client::cfg::USE_CRITICAL_SINCE_CATALOG;
use mz_persist_client::cli::admin::{CATALOG_FORCE_COMPACTION_FUEL, CATALOG_FORCE_COMPACTION_WAIT};
use mz_persist_client::critical::SinceHandle;
use mz_persist_client::error::UpperMismatch;
use mz_persist_client::read::{Listen, ListenEvent, ReadHandle};
use mz_persist_client::write::WriteHandle;
use mz_persist_client::{Diagnostics, PersistClient, ShardId};
use mz_persist_types::codec_impls::UnitSchema;
use mz_proto::{RustType, TryFromProtoError};
use mz_repr::{Diff, RelationDesc, SqlScalarType};
use mz_storage_types::StorageDiff;
use mz_storage_types::sources::SourceData;
use sha2::Digest;
use timely::progress::{Antichain, Timestamp as TimelyTimestamp};
use tracing::{debug, info, warn};
use uuid::Uuid;

use crate::durable::debug::{Collection, CollectionType, DebugCatalogState, Trace};
use crate::durable::error::FenceError;
use crate::durable::initialize::{
    ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, SYSTEM_CONFIG_SYNCED_KEY, USER_VERSION_KEY,
    WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL, WITH_0DT_DEPLOYMENT_MAX_WAIT,
};
use crate::durable::metrics::Metrics;
use crate::durable::objects::state_update::{
    IntoStateUpdateKindJson, StateUpdate, StateUpdateKind, StateUpdateKindJson,
    TryIntoStateUpdateKind,
};
use crate::durable::objects::{AuditLogKey, FenceToken, Snapshot};
use crate::durable::transaction::TransactionBatch;
use crate::durable::upgrade::upgrade;
use crate::durable::{
    AuditLogIterator, BootstrapArgs, CATALOG_CONTENT_VERSION_KEY, CatalogError,
    DurableCatalogError, DurableCatalogState, Epoch, OpenableDurableCatalogState,
    ReadOnlyDurableCatalogState, Transaction, initialize,
};
use crate::memory;

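/// Timestamp type used for the catalog persist shard.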
pub(crate) type Timestamp = mz_repr::Timestamp;

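/// The minimum value of an epoch.
///
/// Safety: `1` is known to be non-zero, so `new_unchecked` is sound here.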
const MIN_EPOCH: Epoch = unsafe { Epoch::new_unchecked(1) };

const CATALOG_SHARD_NAME: &str = "catalog";
const UPGRADE_SHARD_NAME: &str = "catalog_upgrade";

const CATALOG_SEED: usize = 1;
const UPGRADE_SEED: usize = 2;
pub const _BUILTIN_MIGRATION_SEED: usize = 3;
pub const _EXPRESSION_CACHE_SEED: usize = 4;

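/// Mode in which a durable catalog can be opened.
///
/// - `Readonly`: reads durable state but never writes to it.
/// - `Savepoint`: applies writes in memory only, without committing them durably.
/// - `Writable`: reads and durably writes catalog state.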
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub(crate) enum Mode {
    Readonly,
    Savepoint,
    Writable,
}

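/// A [`FenceToken`] plus the state needed to decide whether this catalog instance has been
/// fenced out by another instance with a higher token.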
#[derive(Debug)]
pub(crate) enum FenceableToken {
    Initializing {
        durable_token: Option<FenceToken>,
        current_deploy_generation: Option<u64>,
    },
    Unfenced { current_token: FenceToken },
    Fenced {
        current_token: FenceToken,
        fence_token: FenceToken,
    },
}

impl FenceableToken {
    fn new(current_deploy_generation: Option<u64>) -> Self {
        Self::Initializing {
            durable_token: None,
            current_deploy_generation,
        }
    }

    fn validate(&self) -> Result<Option<FenceToken>, FenceError> {
        match self {
            FenceableToken::Initializing { durable_token, .. } => Ok(durable_token.clone()),
            FenceableToken::Unfenced { current_token, .. } => Ok(Some(current_token.clone())),
            FenceableToken::Fenced {
                current_token,
                fence_token,
            } => {
                assert!(
                    fence_token > current_token,
                    "must be fenced by higher token; current={current_token:?}, fence={fence_token:?}"
                );
                if fence_token.deploy_generation > current_token.deploy_generation {
                    Err(FenceError::DeployGeneration {
                        current_generation: current_token.deploy_generation,
                        fence_generation: fence_token.deploy_generation,
                    })
                } else {
                    assert!(
                        fence_token.epoch > current_token.epoch,
                        "must be fenced by higher token; current={current_token:?}, fence={fence_token:?}"
                    );
                    Err(FenceError::Epoch {
                        current_epoch: current_token.epoch,
                        fence_epoch: fence_token.epoch,
                    })
                }
            }
        }
    }

    fn token(&self) -> Option<FenceToken> {
        match self {
            FenceableToken::Initializing { durable_token, .. } => durable_token.clone(),
            FenceableToken::Unfenced { current_token, .. } => Some(current_token.clone()),
            FenceableToken::Fenced { current_token, .. } => Some(current_token.clone()),
        }
    }

    fn maybe_fence(&mut self, token: FenceToken) -> Result<(), FenceError> {
        match self {
            FenceableToken::Initializing {
                durable_token,
                current_deploy_generation,
                ..
            } => {
                match durable_token {
                    Some(durable_token) => {
                        *durable_token = max(durable_token.clone(), token.clone());
                    }
                    None => {
                        *durable_token = Some(token.clone());
                    }
                }
                if let Some(current_deploy_generation) = current_deploy_generation {
                    if *current_deploy_generation < token.deploy_generation {
                        *self = FenceableToken::Fenced {
                            current_token: FenceToken {
                                deploy_generation: *current_deploy_generation,
                                epoch: token.epoch,
                            },
                            fence_token: token,
                        };
                        self.validate()?;
                    }
                }
            }
            FenceableToken::Unfenced { current_token } => {
                if *current_token < token {
                    *self = FenceableToken::Fenced {
                        current_token: current_token.clone(),
                        fence_token: token,
                    };
                    self.validate()?;
                }
            }
            FenceableToken::Fenced { .. } => {
                self.validate()?;
            }
        }

        Ok(())
    }

    fn generate_unfenced_token(
        &self,
        mode: Mode,
    ) -> Result<Option<(Vec<(StateUpdateKind, Diff)>, FenceableToken)>, DurableCatalogError> {
        let (durable_token, current_deploy_generation) = match self {
            FenceableToken::Initializing {
                durable_token,
                current_deploy_generation,
            } => (durable_token.clone(), current_deploy_generation.clone()),
            FenceableToken::Unfenced { .. } | FenceableToken::Fenced { .. } => return Ok(None),
        };

        let mut fence_updates = Vec::with_capacity(2);

        if let Some(durable_token) = &durable_token {
            fence_updates.push((
                StateUpdateKind::FenceToken(durable_token.clone()),
                Diff::MINUS_ONE,
            ));
        }

        let current_deploy_generation = current_deploy_generation
            .or_else(|| durable_token.as_ref().map(|token| token.deploy_generation))
            .ok_or(DurableCatalogError::Uninitialized)?;
        let mut current_epoch = durable_token
            .map(|token| token.epoch)
            .unwrap_or(MIN_EPOCH)
            .get();
        if matches!(mode, Mode::Writable) {
            current_epoch = current_epoch + 1;
        }
        let current_epoch = Epoch::new(current_epoch).expect("known to be non-zero");
        let current_token = FenceToken {
            deploy_generation: current_deploy_generation,
            epoch: current_epoch,
        };

        fence_updates.push((
            StateUpdateKind::FenceToken(current_token.clone()),
            Diff::ONE,
        ));

        let current_fenceable_token = FenceableToken::Unfenced { current_token };

        Ok(Some((fence_updates, current_fenceable_token)))
    }
}

#[derive(Debug, thiserror::Error)]
pub(crate) enum CompareAndAppendError {
    #[error(transparent)]
    Fence(#[from] FenceError),
    #[error(
        "expected catalog upper {expected_upper:?} did not match actual catalog upper {actual_upper:?}"
    )]
    UpperMismatch {
        expected_upper: Timestamp,
        actual_upper: Timestamp,
    },
}

impl CompareAndAppendError {
    pub(crate) fn unwrap_fence_error(self) -> FenceError {
        match self {
            CompareAndAppendError::Fence(e) => e,
            e @ CompareAndAppendError::UpperMismatch { .. } => {
                panic!("unexpected upper mismatch: {e:?}")
            }
        }
    }
}

impl From<UpperMismatch<Timestamp>> for CompareAndAppendError {
    fn from(upper_mismatch: UpperMismatch<Timestamp>) -> Self {
        Self::UpperMismatch {
            expected_upper: antichain_to_timestamp(upper_mismatch.expected),
            actual_upper: antichain_to_timestamp(upper_mismatch.current),
        }
    }
}

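/// Applies updates read from the catalog shard to some in-memory state, returning the updates
/// that should be retained in the handle's snapshot.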
pub(crate) trait ApplyUpdate<T: IntoStateUpdateKindJson> {
    fn apply_update(
        &mut self,
        update: StateUpdate<T>,
        current_fence_token: &mut FenceableToken,
        metrics: &Arc<Metrics>,
    ) -> Result<Option<StateUpdate<T>>, FenceError>;
}

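/// A handle to the catalog's persist shard that caches an in-memory snapshot of the shard's
/// contents and keeps it up to date via a persist `Listen`.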
#[derive(Debug)]
pub(crate) struct PersistHandle<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> {
    pub(crate) mode: Mode,
    since_handle: SinceHandle<SourceData, (), Timestamp, StorageDiff, i64>,
    write_handle: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
    listen: Listen<SourceData, (), Timestamp, StorageDiff>,
    persist_client: PersistClient,
    shard_id: ShardId,
    pub(crate) snapshot: Vec<(T, Timestamp, Diff)>,
    update_applier: U,
    pub(crate) upper: Timestamp,
    fenceable_token: FenceableToken,
    catalog_content_version: semver::Version,
    bootstrap_complete: bool,
    metrics: Arc<Metrics>,
}

impl<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> PersistHandle<T, U> {
    async fn increment_catalog_upgrade_shard_version(&self, organization_id: Uuid) {
        let upgrade_shard_id = shard_id(organization_id, UPGRADE_SEED);

        let () = self
            .persist_client
            .upgrade_version::<(), (), Timestamp, StorageDiff>(
                upgrade_shard_id,
                Diagnostics {
                    shard_name: UPGRADE_SHARD_NAME.to_string(),
                    handle_purpose: "durable catalog state upgrade".to_string(),
                },
            )
            .await
            .expect("invalid usage");
    }

    #[mz_ore::instrument]
    async fn current_upper(&mut self) -> Timestamp {
        match self.mode {
            Mode::Writable | Mode::Readonly => {
                let upper = self.write_handle.fetch_recent_upper().await;
                antichain_to_timestamp(upper.clone())
            }
            Mode::Savepoint => self.upper,
        }
    }

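    /// Appends `updates` to the catalog shard at `commit_ts` and downgrades the shard's since to
    /// stay just behind the new upper.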
    #[mz_ore::instrument]
    pub(crate) async fn compare_and_append<S: IntoStateUpdateKindJson>(
        &mut self,
        updates: Vec<(S, Diff)>,
        commit_ts: Timestamp,
    ) -> Result<Timestamp, CompareAndAppendError> {
        assert_eq!(self.mode, Mode::Writable);
        assert!(
            commit_ts >= self.upper,
            "expected commit ts, {}, to be greater than or equal to upper, {}",
            commit_ts,
            self.upper
        );

        let contains_fence = if mz_ore::assert::soft_assertions_enabled() {
            let updates: Vec<_> = updates.clone();
            let parsed_updates: Vec<_> = updates
                .clone()
                .into_iter()
                .map(|(update, diff)| {
                    let update: StateUpdateKindJson = update.into();
                    (update, diff)
                })
                .filter_map(|(update, diff)| {
                    <StateUpdateKindJson as TryIntoStateUpdateKind>::try_into(update)
                        .ok()
                        .map(|update| (update, diff))
                })
                .collect();
            let contains_retraction = parsed_updates.iter().any(|(update, diff)| {
                matches!(update, StateUpdateKind::FenceToken(..)) && *diff == Diff::MINUS_ONE
            });
            let contains_addition = parsed_updates.iter().any(|(update, diff)| {
                matches!(update, StateUpdateKind::FenceToken(..)) && *diff == Diff::ONE
            });
            let contains_fence = contains_retraction && contains_addition;
            Some((contains_fence, updates))
        } else {
            None
        };

        let updates = updates.into_iter().map(|(kind, diff)| {
            let kind: StateUpdateKindJson = kind.into();
            (
                (Into::<SourceData>::into(kind), ()),
                commit_ts,
                diff.into_inner(),
            )
        });
        let next_upper = commit_ts.step_forward();
        let res = self
            .write_handle
            .compare_and_append(
                updates,
                Antichain::from_elem(self.upper),
                Antichain::from_elem(next_upper),
            )
            .await
            .expect("invalid usage");

        if let Err(e @ UpperMismatch { .. }) = res {
            self.sync_to_current_upper().await?;
            if let Some((contains_fence, updates)) = contains_fence {
                assert!(
                    contains_fence,
                    "updates were neither fenced nor fencing and encountered an upper mismatch: {updates:#?}"
                )
            }
            return Err(e.into());
        }

        let downgrade_to = Antichain::from_elem(next_upper.saturating_sub(1));

        let opaque = *self.since_handle.opaque();
        let downgrade = self
            .since_handle
            .maybe_compare_and_downgrade_since(&opaque, (&opaque, &downgrade_to))
            .await;

        match downgrade {
            None => {}
            Some(Err(e)) => soft_panic_or_log!("found opaque value {e}, but expected {opaque}"),
            Some(Ok(updated)) => soft_assert_or_log!(
                updated == downgrade_to,
                "updated bound should match expected"
            ),
        }
        self.sync(next_upper).await?;
        Ok(next_upper)
    }

    #[mz_ore::instrument]
    async fn snapshot_unconsolidated(&mut self) -> Vec<StateUpdate<StateUpdateKind>> {
        let current_upper = self.current_upper().await;

        let mut snapshot = Vec::new();
        let mut read_handle = self.read_handle().await;
        let as_of = as_of(&read_handle, current_upper);
        let mut stream = Box::pin(
            read_handle
                .snapshot_and_stream(Antichain::from_elem(as_of))
                .await
                .expect("we have advanced the restart_as_of by the since"),
        );
        while let Some(update) = stream.next().await {
            snapshot.push(update)
        }
        read_handle.expire().await;
        snapshot
            .into_iter()
            .map(Into::<StateUpdate<StateUpdateKindJson>>::into)
            .map(|state_update| state_update.try_into().expect("kind decoding error"))
            .collect()
    }

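    /// Listens for and applies all updates up to the current upper of the catalog shard.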
    #[mz_ore::instrument]
    pub(crate) async fn sync_to_current_upper(&mut self) -> Result<(), FenceError> {
        let upper = self.current_upper().await;
        self.sync(upper).await
    }

    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn sync(&mut self, target_upper: Timestamp) -> Result<(), FenceError> {
        self.metrics.syncs.inc();
        let counter = self.metrics.sync_latency_seconds.clone();
        self.sync_inner(target_upper)
            .wall_time()
            .inc_by(counter)
            .await
    }

    #[mz_ore::instrument(level = "debug")]
    async fn sync_inner(&mut self, target_upper: Timestamp) -> Result<(), FenceError> {
        self.fenceable_token.validate()?;

        if self.mode == Mode::Savepoint {
            self.upper = max(self.upper, target_upper);
            return Ok(());
        }

        let mut updates: BTreeMap<_, Vec<_>> = BTreeMap::new();

        while self.upper < target_upper {
            let listen_events = self.listen.fetch_next().await;
            for listen_event in listen_events {
                match listen_event {
                    ListenEvent::Progress(upper) => {
                        debug!("synced up to {upper:?}");
                        self.upper = antichain_to_timestamp(upper);
                        while let Some((ts, updates)) = updates.pop_first() {
                            assert!(ts < self.upper, "expected {} < {}", ts, self.upper);
                            let updates = updates.into_iter().map(
                                |update: StateUpdate<StateUpdateKindJson>| {
                                    let kind =
                                        T::try_from(update.kind).expect("kind decoding error");
                                    StateUpdate {
                                        kind,
                                        ts: update.ts,
                                        diff: update.diff,
                                    }
                                },
                            );
                            self.apply_updates(updates)?;
                        }
                    }
                    ListenEvent::Updates(batch_updates) => {
                        for update in batch_updates {
                            let update: StateUpdate<StateUpdateKindJson> = update.into();
                            updates.entry(update.ts).or_default().push(update);
                        }
                    }
                }
            }
        }
        assert_eq!(updates, BTreeMap::new(), "all updates should be applied");
        Ok(())
    }

    #[mz_ore::instrument(level = "debug")]
    pub(crate) fn apply_updates(
        &mut self,
        updates: impl IntoIterator<Item = StateUpdate<T>>,
    ) -> Result<(), FenceError> {
        let mut updates: Vec<_> = updates
            .into_iter()
            .map(|StateUpdate { kind, ts, diff }| (kind, ts, diff))
            .collect();

        differential_dataflow::consolidation::consolidate_updates(&mut updates);

        updates.sort_by(|(_, ts1, diff1), (_, ts2, diff2)| ts1.cmp(ts2).then(diff1.cmp(diff2)));

        let mut errors = Vec::new();

        for (kind, ts, diff) in updates {
            if diff != Diff::ONE && diff != Diff::MINUS_ONE {
                panic!("invalid update in consolidated trace: ({kind:?}, {ts:?}, {diff:?})");
            }

            match self.update_applier.apply_update(
                StateUpdate { kind, ts, diff },
                &mut self.fenceable_token,
                &self.metrics,
            ) {
                Ok(Some(StateUpdate { kind, ts, diff })) => self.snapshot.push((kind, ts, diff)),
                Ok(None) => {}
                Err(err) => errors.push(err),
            }
        }

        errors.sort();
        if let Some(err) = errors.into_iter().next() {
            return Err(err);
        }

        self.consolidate();

        Ok(())
    }

    #[mz_ore::instrument]
    pub(crate) fn consolidate(&mut self) {
        soft_assert_no_log!(
            self.snapshot
                .windows(2)
                .all(|updates| updates[0].1 <= updates[1].1),
            "snapshot should be sorted by timestamp, {:#?}",
            self.snapshot
        );

        let new_ts = self
            .snapshot
            .last()
            .map(|(_, ts, _)| *ts)
            .unwrap_or_else(Timestamp::minimum);
        for (_, ts, _) in &mut self.snapshot {
            *ts = new_ts;
        }
        differential_dataflow::consolidation::consolidate_updates(&mut self.snapshot);
    }

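    /// Executes `f` over the current, synced contents of the catalog shard.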
    async fn with_trace<R>(
        &mut self,
        f: impl FnOnce(&Vec<(T, Timestamp, Diff)>) -> Result<R, CatalogError>,
    ) -> Result<R, CatalogError> {
        self.sync_to_current_upper().await?;
        f(&self.snapshot)
    }

    async fn read_handle(&self) -> ReadHandle<SourceData, (), Timestamp, StorageDiff> {
        self.persist_client
            .open_leased_reader(
                self.shard_id,
                Arc::new(desc()),
                Arc::new(UnitSchema::default()),
                Diagnostics {
                    shard_name: CATALOG_SHARD_NAME.to_string(),
                    handle_purpose: "openable durable catalog state temporary reader".to_string(),
                },
                USE_CRITICAL_SINCE_CATALOG.get(self.persist_client.dyncfgs()),
            )
            .await
            .expect("invalid usage")
    }

    async fn expire(self: Box<Self>) {
        self.write_handle.expire().await;
        self.listen.expire().await;
    }
}

impl<U: ApplyUpdate<StateUpdateKind>> PersistHandle<StateUpdateKind, U> {
    async fn with_snapshot<T>(
        &mut self,
        f: impl FnOnce(Snapshot) -> Result<T, CatalogError>,
    ) -> Result<T, CatalogError> {
        fn apply<K, V>(map: &mut BTreeMap<K, V>, key: &K, value: &V, diff: Diff)
        where
            K: Ord + Clone,
            V: Ord + Clone + Debug,
        {
            let key = key.clone();
            let value = value.clone();
            if diff == Diff::ONE {
                let prev = map.insert(key, value);
                assert_eq!(
                    prev, None,
                    "values must be explicitly retracted before inserting a new value"
                );
            } else if diff == Diff::MINUS_ONE {
                let prev = map.remove(&key);
                assert_eq!(
                    prev,
                    Some(value),
                    "retraction does not match existing value"
                );
            }
        }

        self.with_trace(|trace| {
            let mut snapshot = Snapshot::empty();
            for (kind, ts, diff) in trace {
                let diff = *diff;
                if diff != Diff::ONE && diff != Diff::MINUS_ONE {
                    panic!("invalid update in consolidated trace: ({kind:?}, {ts:?}, {diff:?})");
                }

                match kind {
                    StateUpdateKind::AuditLog(_key, ()) => {}
                    StateUpdateKind::Cluster(key, value) => {
                        apply(&mut snapshot.clusters, key, value, diff);
                    }
                    StateUpdateKind::ClusterReplica(key, value) => {
                        apply(&mut snapshot.cluster_replicas, key, value, diff);
                    }
                    StateUpdateKind::Comment(key, value) => {
                        apply(&mut snapshot.comments, key, value, diff);
                    }
                    StateUpdateKind::Config(key, value) => {
                        apply(&mut snapshot.configs, key, value, diff);
                    }
                    StateUpdateKind::Database(key, value) => {
                        apply(&mut snapshot.databases, key, value, diff);
                    }
                    StateUpdateKind::DefaultPrivilege(key, value) => {
                        apply(&mut snapshot.default_privileges, key, value, diff);
                    }
                    StateUpdateKind::FenceToken(_token) => {}
                    StateUpdateKind::IdAllocator(key, value) => {
                        apply(&mut snapshot.id_allocator, key, value, diff);
                    }
                    StateUpdateKind::IntrospectionSourceIndex(key, value) => {
                        apply(&mut snapshot.introspection_sources, key, value, diff);
                    }
                    StateUpdateKind::Item(key, value) => {
                        apply(&mut snapshot.items, key, value, diff);
                    }
                    StateUpdateKind::NetworkPolicy(key, value) => {
                        apply(&mut snapshot.network_policies, key, value, diff);
                    }
                    StateUpdateKind::Role(key, value) => {
                        apply(&mut snapshot.roles, key, value, diff);
                    }
                    StateUpdateKind::Schema(key, value) => {
                        apply(&mut snapshot.schemas, key, value, diff);
                    }
                    StateUpdateKind::Setting(key, value) => {
                        apply(&mut snapshot.settings, key, value, diff);
                    }
                    StateUpdateKind::SourceReferences(key, value) => {
                        apply(&mut snapshot.source_references, key, value, diff);
                    }
                    StateUpdateKind::SystemConfiguration(key, value) => {
                        apply(&mut snapshot.system_configurations, key, value, diff);
                    }
                    StateUpdateKind::SystemObjectMapping(key, value) => {
                        apply(&mut snapshot.system_object_mappings, key, value, diff);
                    }
                    StateUpdateKind::SystemPrivilege(key, value) => {
                        apply(&mut snapshot.system_privileges, key, value, diff);
                    }
                    StateUpdateKind::StorageCollectionMetadata(key, value) => {
                        apply(&mut snapshot.storage_collection_metadata, key, value, diff);
                    }
                    StateUpdateKind::UnfinalizedShard(key, ()) => {
                        apply(&mut snapshot.unfinalized_shards, key, &(), diff);
                    }
                    StateUpdateKind::TxnWalShard((), value) => {
                        apply(&mut snapshot.txn_wal_shard, &(), value, diff);
                    }
                    StateUpdateKind::RoleAuth(key, value) => {
                        apply(&mut snapshot.role_auth, key, value, diff);
                    }
                }
            }
            f(snapshot)
        })
        .await
    }

    #[mz_ore::instrument(level = "debug")]
    async fn persist_snapshot(&self) -> impl Iterator<Item = StateUpdate> + DoubleEndedIterator {
        let mut read_handle = self.read_handle().await;
        let as_of = as_of(&read_handle, self.upper);
        let snapshot = snapshot_binary(&mut read_handle, as_of, &self.metrics)
            .await
            .map(|update| update.try_into().expect("kind decoding error"));
        read_handle.expire().await;
        snapshot
    }
}

#[derive(Debug)]
pub(crate) struct UnopenedCatalogStateInner {
    organization_id: Uuid,
    configs: BTreeMap<String, u64>,
    settings: BTreeMap<String, String>,
}

impl UnopenedCatalogStateInner {
    fn new(organization_id: Uuid) -> UnopenedCatalogStateInner {
        UnopenedCatalogStateInner {
            organization_id,
            configs: BTreeMap::new(),
            settings: BTreeMap::new(),
        }
    }
}

impl ApplyUpdate<StateUpdateKindJson> for UnopenedCatalogStateInner {
    fn apply_update(
        &mut self,
        update: StateUpdate<StateUpdateKindJson>,
        current_fence_token: &mut FenceableToken,
        _metrics: &Arc<Metrics>,
    ) -> Result<Option<StateUpdate<StateUpdateKindJson>>, FenceError> {
        if !update.kind.is_audit_log() && update.kind.is_always_deserializable() {
            let kind = TryInto::try_into(&update.kind).expect("kind is known to be deserializable");
            match (kind, update.diff) {
                (StateUpdateKind::Config(key, value), Diff::ONE) => {
                    let prev = self.configs.insert(key.key, value.value);
                    assert_eq!(
                        prev, None,
                        "values must be explicitly retracted before inserting a new value"
                    );
                }
                (StateUpdateKind::Config(key, value), Diff::MINUS_ONE) => {
                    let prev = self.configs.remove(&key.key);
                    assert_eq!(
                        prev,
                        Some(value.value),
                        "retraction does not match existing value"
                    );
                }
                (StateUpdateKind::Setting(key, value), Diff::ONE) => {
                    let prev = self.settings.insert(key.name, value.value);
                    assert_eq!(
                        prev, None,
                        "values must be explicitly retracted before inserting a new value"
                    );
                }
                (StateUpdateKind::Setting(key, value), Diff::MINUS_ONE) => {
                    let prev = self.settings.remove(&key.name);
                    assert_eq!(
                        prev,
                        Some(value.value),
                        "retraction does not match existing value"
                    );
                }
                (StateUpdateKind::FenceToken(fence_token), Diff::ONE) => {
                    current_fence_token.maybe_fence(fence_token)?;
                }
                _ => {}
            }
        }

        Ok(Some(update))
    }
}

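/// A durable catalog state handle that has not yet been fully opened.
///
/// Tracks a subset of the catalog contents (configs, settings, and the fence token) and is used
/// to fence out previous catalog instances before opening the full catalog.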
pub(crate) type UnopenedPersistCatalogState =
    PersistHandle<StateUpdateKindJson, UnopenedCatalogStateInner>;

impl UnopenedPersistCatalogState {
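    /// Creates a new unopened catalog state handle backed by the persist shard derived from
    /// `organization_id`, fetching an initial snapshot of the shard's contents.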
    #[mz_ore::instrument]
    pub(crate) async fn new(
        persist_client: PersistClient,
        organization_id: Uuid,
        version: semver::Version,
        deploy_generation: Option<u64>,
        metrics: Arc<Metrics>,
    ) -> Result<UnopenedPersistCatalogState, DurableCatalogError> {
        let catalog_shard_id = shard_id(organization_id, CATALOG_SEED);
        let upgrade_shard_id = shard_id(organization_id, UPGRADE_SEED);
        debug!(
            ?catalog_shard_id,
            ?upgrade_shard_id,
            "new persist backed catalog state"
        );

        let version_in_upgrade_shard =
            fetch_catalog_upgrade_shard_version(&persist_client, upgrade_shard_id).await;
        if let Some(version_in_upgrade_shard) = version_in_upgrade_shard {
            if !mz_persist_client::cfg::code_can_write_data(&version, &version_in_upgrade_shard) {
                return Err(DurableCatalogError::IncompatiblePersistVersion {
                    found_version: version_in_upgrade_shard,
                    catalog_version: version,
                });
            }
        }

        let open_handles_start = Instant::now();
        info!("startup: envd serve: catalog init: open handles beginning");
        let since_handle = persist_client
            .open_critical_since(
                catalog_shard_id,
                PersistClient::CONTROLLER_CRITICAL_SINCE,
                Diagnostics {
                    shard_name: CATALOG_SHARD_NAME.to_string(),
                    handle_purpose: "durable catalog state critical since".to_string(),
                },
            )
            .await
            .expect("invalid usage");
        let (mut write_handle, mut read_handle) = persist_client
            .open(
                catalog_shard_id,
                Arc::new(desc()),
                Arc::new(UnitSchema::default()),
                Diagnostics {
                    shard_name: CATALOG_SHARD_NAME.to_string(),
                    handle_purpose: "durable catalog state handles".to_string(),
                },
                USE_CRITICAL_SINCE_CATALOG.get(persist_client.dyncfgs()),
            )
            .await
            .expect("invalid usage");
        info!(
            "startup: envd serve: catalog init: open handles complete in {:?}",
            open_handles_start.elapsed()
        );

        let upper = {
            const EMPTY_UPDATES: &[((SourceData, ()), Timestamp, StorageDiff)] = &[];
            let upper = Antichain::from_elem(Timestamp::minimum());
            let next_upper = Timestamp::minimum().step_forward();
            match write_handle
                .compare_and_append(EMPTY_UPDATES, upper, Antichain::from_elem(next_upper))
                .await
                .expect("invalid usage")
            {
                Ok(()) => next_upper,
                Err(mismatch) => antichain_to_timestamp(mismatch.current),
            }
        };

        let snapshot_start = Instant::now();
        info!("startup: envd serve: catalog init: snapshot beginning");
        let as_of = as_of(&read_handle, upper);
        let snapshot: Vec<_> = snapshot_binary(&mut read_handle, as_of, &metrics)
            .await
            .map(|StateUpdate { kind, ts, diff }| (kind, ts, diff))
            .collect();
        let listen = read_handle
            .listen(Antichain::from_elem(as_of))
            .await
            .expect("invalid usage");
        info!(
            "startup: envd serve: catalog init: snapshot complete in {:?}",
            snapshot_start.elapsed()
        );

        let mut handle = UnopenedPersistCatalogState {
            mode: Mode::Writable,
            since_handle,
            write_handle,
            listen,
            persist_client,
            shard_id: catalog_shard_id,
            snapshot: Vec::new(),
            update_applier: UnopenedCatalogStateInner::new(organization_id),
            upper,
            fenceable_token: FenceableToken::new(deploy_generation),
            catalog_content_version: version,
            bootstrap_complete: false,
            metrics,
        };
        soft_assert_no_log!(
            snapshot.iter().all(|(_, _, diff)| *diff == Diff::ONE),
            "snapshot should be consolidated: {snapshot:#?}"
        );

        let apply_start = Instant::now();
        info!("startup: envd serve: catalog init: apply updates beginning");
        let updates = snapshot
            .into_iter()
            .map(|(kind, ts, diff)| StateUpdate { kind, ts, diff });
        handle.apply_updates(updates)?;
        info!(
            "startup: envd serve: catalog init: apply updates complete in {:?}",
            apply_start.elapsed()
        );

        if let Some(found_version) = handle.get_catalog_content_version().await? {
            if handle
                .catalog_content_version
                .cmp_precedence(&found_version)
                == std::cmp::Ordering::Less
            {
                return Err(DurableCatalogError::IncompatiblePersistVersion {
                    found_version,
                    catalog_version: handle.catalog_content_version,
                });
            }
        }

        Ok(handle)
    }

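    /// Opens the catalog in `mode`, fencing out previous catalog instances first.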
    #[mz_ore::instrument]
    async fn open_inner(
        mut self,
        mode: Mode,
        initial_ts: Timestamp,
        bootstrap_args: &BootstrapArgs,
    ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError> {
        let mut commit_ts = self.upper;
        self.mode = mode;

        match (&self.mode, &self.fenceable_token) {
            (_, FenceableToken::Unfenced { .. } | FenceableToken::Fenced { .. }) => {
                return Err(DurableCatalogError::Internal(
                    "catalog should not have fenced before opening".to_string(),
                )
                .into());
            }
            (
                Mode::Writable | Mode::Savepoint,
                FenceableToken::Initializing {
                    current_deploy_generation: None,
                    ..
                },
            ) => {
                return Err(DurableCatalogError::Internal(format!(
                    "cannot open in mode '{:?}' without a deploy generation",
                    self.mode,
                ))
                .into());
            }
            _ => {}
        }

        let read_only = matches!(self.mode, Mode::Readonly);

        loop {
            self.sync_to_current_upper().await?;
            commit_ts = max(commit_ts, self.upper);
            let (fence_updates, current_fenceable_token) = self
                .fenceable_token
                .generate_unfenced_token(self.mode)?
                .ok_or_else(|| {
                    DurableCatalogError::Internal(
                        "catalog should not have fenced before opening".to_string(),
                    )
                })?;
            debug!(
                ?self.upper,
                ?self.fenceable_token,
                ?current_fenceable_token,
                "fencing previous catalogs"
            );
            if matches!(self.mode, Mode::Writable) {
                match self
                    .compare_and_append(fence_updates.clone(), commit_ts)
                    .await
                {
                    Ok(upper) => {
                        commit_ts = upper;
                    }
                    Err(CompareAndAppendError::Fence(e)) => return Err(e.into()),
                    Err(e @ CompareAndAppendError::UpperMismatch { .. }) => {
                        warn!("catalog write failed due to upper mismatch, retrying: {e:?}");
                        continue;
                    }
                }
            }
            self.fenceable_token = current_fenceable_token;
            break;
        }

        let is_initialized = self.is_initialized_inner();
        if !matches!(self.mode, Mode::Writable) && !is_initialized {
            return Err(CatalogError::Durable(DurableCatalogError::NotWritable(
                format!(
                    "catalog tables do not exist; will not create in {:?} mode",
                    self.mode
                ),
            )));
        }
        soft_assert_ne_or_log!(self.upper, Timestamp::minimum());

        let (audit_logs, snapshot): (Vec<_>, Vec<_>) = self
            .snapshot
            .into_iter()
            .partition(|(update, _, _)| update.is_audit_log());
        self.snapshot = snapshot;
        let audit_log_count = audit_logs.iter().map(|(_, _, diff)| diff).sum::<Diff>();
        let audit_log_handle = AuditLogIterator::new(audit_logs);

        if is_initialized && !read_only {
            commit_ts = upgrade(&mut self, commit_ts).await?;
        }

        debug!(
            ?is_initialized,
            ?self.upper,
            "initializing catalog state"
        );
        let mut catalog = PersistCatalogState {
            mode: self.mode,
            since_handle: self.since_handle,
            write_handle: self.write_handle,
            listen: self.listen,
            persist_client: self.persist_client,
            shard_id: self.shard_id,
            upper: self.upper,
            fenceable_token: self.fenceable_token,
            snapshot: Vec::new(),
            update_applier: CatalogStateInner::new(),
            catalog_content_version: self.catalog_content_version,
            bootstrap_complete: false,
            metrics: self.metrics,
        };
        catalog.metrics.collection_entries.reset();
        catalog
            .metrics
            .collection_entries
            .with_label_values(&[&CollectionType::AuditLog.to_string()])
            .add(audit_log_count.into_inner());
        let updates = self.snapshot.into_iter().map(|(kind, ts, diff)| {
            let kind = TryIntoStateUpdateKind::try_into(kind).expect("kind decoding error");
            StateUpdate { kind, ts, diff }
        });
        catalog.apply_updates(updates)?;

        let catalog_content_version = catalog.catalog_content_version.to_string();
        let txn = if is_initialized {
            let mut txn = catalog.transaction().await?;

            if txn.get_setting("migration_version".into()).is_none() && mode != Mode::Readonly {
                let old_version = txn.get_catalog_content_version();
                txn.set_setting("migration_version".into(), old_version.map(Into::into))?;
            }

            txn.set_catalog_content_version(catalog_content_version)?;
            txn
        } else {
            soft_assert_eq_no_log!(
                catalog
                    .snapshot
                    .iter()
                    .filter(|(kind, _, _)| !matches!(kind, StateUpdateKind::FenceToken(_)))
                    .count(),
                0,
                "trace should not contain any updates for an uninitialized catalog: {:#?}",
                catalog.snapshot
            );

            let mut txn = catalog.transaction().await?;
            initialize::initialize(
                &mut txn,
                bootstrap_args,
                initial_ts.into(),
                catalog_content_version,
            )
            .await?;
            txn
        };

        if read_only {
            let (txn_batch, _) = txn.into_parts();
            let updates = StateUpdate::from_txn_batch_ts(txn_batch, catalog.upper);
            catalog.apply_updates(updates)?;
        } else {
            txn.commit_internal(commit_ts).await?;
        }

        if matches!(catalog.mode, Mode::Writable) {
            catalog
                .increment_catalog_upgrade_shard_version(self.update_applier.organization_id)
                .await;

            let write_handle = catalog
                .persist_client
                .open_writer::<SourceData, (), Timestamp, i64>(
                    catalog.write_handle.shard_id(),
                    Arc::new(desc()),
                    Arc::new(UnitSchema::default()),
                    Diagnostics {
                        shard_name: CATALOG_SHARD_NAME.to_string(),
                        handle_purpose: "compact catalog".to_string(),
                    },
                )
                .await
                .expect("invalid usage");
            let fuel = CATALOG_FORCE_COMPACTION_FUEL.handle(catalog.persist_client.dyncfgs());
            let wait = CATALOG_FORCE_COMPACTION_WAIT.handle(catalog.persist_client.dyncfgs());
            let _task = mz_ore::task::spawn(|| "catalog::force_shard_compaction", async move {
                let () =
                    mz_persist_client::cli::admin::dangerous_force_compaction_and_break_pushdown(
                        &write_handle,
                        || fuel.get(),
                        || wait.get(),
                    )
                    .await;
            });
        }

        Ok((Box::new(catalog), audit_log_handle))
    }

    #[mz_ore::instrument]
    fn is_initialized_inner(&self) -> bool {
        !self.update_applier.configs.is_empty()
    }

    #[mz_ore::instrument]
    async fn get_current_config(&mut self, key: &str) -> Result<Option<u64>, DurableCatalogError> {
        self.sync_to_current_upper().await?;
        Ok(self.update_applier.configs.get(key).cloned())
    }

    #[mz_ore::instrument]
    pub(crate) async fn get_user_version(&mut self) -> Result<Option<u64>, DurableCatalogError> {
        self.get_current_config(USER_VERSION_KEY).await
    }

    #[mz_ore::instrument]
    async fn get_current_setting(
        &mut self,
        name: &str,
    ) -> Result<Option<String>, DurableCatalogError> {
        self.sync_to_current_upper().await?;
        Ok(self.update_applier.settings.get(name).cloned())
    }

    #[mz_ore::instrument]
    async fn get_catalog_content_version(
        &mut self,
    ) -> Result<Option<semver::Version>, DurableCatalogError> {
        let version = self
            .get_current_setting(CATALOG_CONTENT_VERSION_KEY)
            .await?;
        let version = version.map(|version| version.parse().expect("invalid version persisted"));
        Ok(version)
    }
}

#[async_trait]
impl OpenableDurableCatalogState for UnopenedPersistCatalogState {
    #[mz_ore::instrument]
    async fn open_savepoint(
        mut self: Box<Self>,
        initial_ts: Timestamp,
        bootstrap_args: &BootstrapArgs,
    ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError> {
        self.open_inner(Mode::Savepoint, initial_ts, bootstrap_args)
            .boxed()
            .await
    }

    #[mz_ore::instrument]
    async fn open_read_only(
        mut self: Box<Self>,
        bootstrap_args: &BootstrapArgs,
    ) -> Result<Box<dyn DurableCatalogState>, CatalogError> {
        self.open_inner(Mode::Readonly, EpochMillis::MIN.into(), bootstrap_args)
            .boxed()
            .await
            .map(|(catalog, _)| catalog)
    }

    #[mz_ore::instrument]
    async fn open(
        mut self: Box<Self>,
        initial_ts: Timestamp,
        bootstrap_args: &BootstrapArgs,
    ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError> {
        self.open_inner(Mode::Writable, initial_ts, bootstrap_args)
            .boxed()
            .await
    }

    #[mz_ore::instrument(level = "debug")]
    async fn open_debug(mut self: Box<Self>) -> Result<DebugCatalogState, CatalogError> {
        Ok(DebugCatalogState(*self))
    }

    #[mz_ore::instrument]
    async fn is_initialized(&mut self) -> Result<bool, CatalogError> {
        self.sync_to_current_upper().await?;
        Ok(self.is_initialized_inner())
    }

    #[mz_ore::instrument]
    async fn epoch(&mut self) -> Result<Epoch, CatalogError> {
        self.sync_to_current_upper().await?;
        self.fenceable_token
            .validate()?
            .map(|token| token.epoch)
            .ok_or(CatalogError::Durable(DurableCatalogError::Uninitialized))
    }

    #[mz_ore::instrument]
    async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError> {
        self.sync_to_current_upper().await?;
        self.fenceable_token
            .token()
            .map(|token| token.deploy_generation)
            .ok_or(CatalogError::Durable(DurableCatalogError::Uninitialized))
    }

    #[mz_ore::instrument(level = "debug")]
    async fn get_0dt_deployment_max_wait(&mut self) -> Result<Option<Duration>, CatalogError> {
        let value = self
            .get_current_config(WITH_0DT_DEPLOYMENT_MAX_WAIT)
            .await?;
        match value {
            None => Ok(None),
            Some(millis) => Ok(Some(Duration::from_millis(millis))),
        }
    }

    #[mz_ore::instrument(level = "debug")]
    async fn get_0dt_deployment_ddl_check_interval(
        &mut self,
    ) -> Result<Option<Duration>, CatalogError> {
        let value = self
            .get_current_config(WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL)
            .await?;
        match value {
            None => Ok(None),
            Some(millis) => Ok(Some(Duration::from_millis(millis))),
        }
    }

    #[mz_ore::instrument(level = "debug")]
    async fn get_enable_0dt_deployment_panic_after_timeout(
        &mut self,
    ) -> Result<Option<bool>, CatalogError> {
        let value = self
            .get_current_config(ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT)
            .await?;
        match value {
            None => Ok(None),
            Some(0) => Ok(Some(false)),
            Some(1) => Ok(Some(true)),
            Some(v) => Err(
                DurableCatalogError::from(TryFromProtoError::UnknownEnumVariant(format!(
                    "{v} is not a valid boolean value"
                )))
                .into(),
            ),
        }
    }

    #[mz_ore::instrument]
    async fn has_system_config_synced_once(&mut self) -> Result<bool, DurableCatalogError> {
        self.get_current_config(SYSTEM_CONFIG_SYNCED_KEY)
            .await
            .map(|value| value.map(|value| value > 0).unwrap_or(false))
    }

    #[mz_ore::instrument]
    async fn trace_unconsolidated(&mut self) -> Result<Trace, CatalogError> {
        self.sync_to_current_upper().await?;
        if self.is_initialized_inner() {
            let snapshot = self.snapshot_unconsolidated().await;
            Ok(Trace::from_snapshot(snapshot))
        } else {
            Err(CatalogError::Durable(DurableCatalogError::Uninitialized))
        }
    }

    #[mz_ore::instrument]
    async fn trace_consolidated(&mut self) -> Result<Trace, CatalogError> {
        self.sync_to_current_upper().await?;
        if self.is_initialized_inner() {
            let snapshot = self.current_snapshot().await?;
            Ok(Trace::from_snapshot(snapshot))
        } else {
            Err(CatalogError::Durable(DurableCatalogError::Uninitialized))
        }
    }

    #[mz_ore::instrument(level = "debug")]
    async fn expire(self: Box<Self>) {
        self.expire().await
    }
}

#[derive(Debug)]
struct CatalogStateInner {
    updates: VecDeque<memory::objects::StateUpdate>,
}

impl CatalogStateInner {
    fn new() -> CatalogStateInner {
        CatalogStateInner {
            updates: VecDeque::new(),
        }
    }
}

impl ApplyUpdate<StateUpdateKind> for CatalogStateInner {
    fn apply_update(
        &mut self,
        update: StateUpdate<StateUpdateKind>,
        current_fence_token: &mut FenceableToken,
        metrics: &Arc<Metrics>,
    ) -> Result<Option<StateUpdate<StateUpdateKind>>, FenceError> {
        if let Some(collection_type) = update.kind.collection_type() {
            metrics
                .collection_entries
                .with_label_values(&[&collection_type.to_string()])
                .add(update.diff.into_inner());
        }

        {
            let update: Option<memory::objects::StateUpdate> = (&update)
                .try_into()
                .expect("invalid persisted update: {update:#?}");
            if let Some(update) = update {
                self.updates.push_back(update);
            }
        }

        match (update.kind, update.diff) {
            (StateUpdateKind::AuditLog(_, ()), _) => Ok(None),
            (StateUpdateKind::FenceToken(_), Diff::MINUS_ONE) => Ok(None),
            (StateUpdateKind::FenceToken(token), Diff::ONE) => {
                current_fence_token.maybe_fence(token)?;
                Ok(None)
            }
            (kind, diff) => Ok(Some(StateUpdate {
                kind,
                ts: update.ts,
                diff,
            })),
        }
    }
}

type PersistCatalogState = PersistHandle<StateUpdateKind, CatalogStateInner>;

#[async_trait]
impl ReadOnlyDurableCatalogState for PersistCatalogState {
    fn epoch(&self) -> Epoch {
        self.fenceable_token
            .token()
            .expect("opened catalog state must have an epoch")
            .epoch
    }

    fn metrics(&self) -> &Metrics {
        &self.metrics
    }

    #[mz_ore::instrument(level = "debug")]
    async fn expire(self: Box<Self>) {
        self.expire().await
    }

    fn is_bootstrap_complete(&self) -> bool {
        self.bootstrap_complete
    }

    async fn get_audit_logs(&mut self) -> Result<Vec<VersionedEvent>, CatalogError> {
        self.sync_to_current_upper().await?;
        let audit_logs: Vec<_> = self
            .persist_snapshot()
            .await
            .filter_map(
                |StateUpdate {
                     kind,
                     ts: _,
                     diff: _,
                 }| match kind {
                    StateUpdateKind::AuditLog(key, ()) => Some(key),
                    _ => None,
                },
            )
            .collect();
        let mut audit_logs: Vec<_> = audit_logs
            .into_iter()
            .map(RustType::from_proto)
            .map_ok(|key: AuditLogKey| key.event)
            .collect::<Result<_, _>>()?;
        audit_logs.sort_by(|a, b| a.sortable_id().cmp(&b.sortable_id()));
        Ok(audit_logs)
    }

    #[mz_ore::instrument(level = "debug")]
    async fn get_next_id(&mut self, id_type: &str) -> Result<u64, CatalogError> {
        self.with_trace(|trace| {
            Ok(trace
                .into_iter()
                .rev()
                .filter_map(|(kind, _, _)| match kind {
                    StateUpdateKind::IdAllocator(key, value) if key.name == id_type => {
                        Some(value.next_id)
                    }
                    _ => None,
                })
                .next()
                .expect("must exist"))
        })
        .await
    }

    #[mz_ore::instrument(level = "debug")]
    async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError> {
        self.sync_to_current_upper().await?;
        Ok(self
            .fenceable_token
            .token()
            .expect("opened catalogs must have a token")
            .deploy_generation)
    }

    #[mz_ore::instrument(level = "debug")]
    async fn snapshot(&mut self) -> Result<Snapshot, CatalogError> {
        self.with_snapshot(Ok).await
    }

    #[mz_ore::instrument(level = "debug")]
    async fn sync_to_current_updates(
        &mut self,
    ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError> {
        let upper = self.current_upper().await;
        self.sync_updates(upper).await
    }

    #[mz_ore::instrument(level = "debug")]
    async fn sync_updates(
        &mut self,
        target_upper: mz_repr::Timestamp,
    ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError> {
        self.sync(target_upper).await?;
        let mut updates = Vec::new();
        while let Some(update) = self.update_applier.updates.front() {
            if update.ts >= target_upper {
                break;
            }

            let update = self
                .update_applier
                .updates
                .pop_front()
                .expect("peeked above");
            updates.push(update);
        }
        Ok(updates)
    }

    async fn current_upper(&mut self) -> Timestamp {
        self.current_upper().await
    }
}

#[async_trait]
#[allow(mismatched_lifetime_syntaxes)]
impl DurableCatalogState for PersistCatalogState {
    fn is_read_only(&self) -> bool {
        matches!(self.mode, Mode::Readonly)
    }

    fn is_savepoint(&self) -> bool {
        matches!(self.mode, Mode::Savepoint)
    }

    async fn mark_bootstrap_complete(&mut self) {
        self.bootstrap_complete = true;
        if matches!(self.mode, Mode::Writable) {
            self.since_handle
                .upgrade_version()
                .await
                .expect("invalid usage")
        }
    }

    #[mz_ore::instrument(level = "debug")]
    async fn transaction(&mut self) -> Result<Transaction, CatalogError> {
        self.metrics.transactions_started.inc();
        let snapshot = self.snapshot().await?;
        let commit_ts = self.upper.clone();
        Transaction::new(self, snapshot, commit_ts)
    }

    #[mz_ore::instrument(level = "debug")]
    async fn commit_transaction(
        &mut self,
        txn_batch: TransactionBatch,
        commit_ts: Timestamp,
    ) -> Result<Timestamp, CatalogError> {
        async fn commit_transaction_inner(
            catalog: &mut PersistCatalogState,
            txn_batch: TransactionBatch,
            commit_ts: Timestamp,
        ) -> Result<Timestamp, CatalogError> {
            assert_eq!(
                catalog.upper, txn_batch.upper,
                "only one transaction at a time is supported"
            );

            assert!(
                commit_ts >= catalog.upper,
                "expected commit ts, {}, to be greater than or equal to upper, {}",
                commit_ts,
                catalog.upper
            );

            let updates = StateUpdate::from_txn_batch(txn_batch).collect();
            debug!("committing updates: {updates:?}");

            let next_upper = match catalog.mode {
                Mode::Writable => catalog
                    .compare_and_append(updates, commit_ts)
                    .await
                    .map_err(|e| e.unwrap_fence_error())?,
                Mode::Savepoint => {
                    let updates = updates.into_iter().map(|(kind, diff)| StateUpdate {
                        kind,
                        ts: commit_ts,
                        diff,
                    });
                    catalog.apply_updates(updates)?;
                    catalog.upper = commit_ts.step_forward();
                    catalog.upper
                }
                Mode::Readonly => {
                    if !updates.is_empty() {
                        return Err(DurableCatalogError::NotWritable(format!(
                            "cannot commit a transaction in a read-only catalog: {updates:#?}"
                        ))
                        .into());
                    }
                    catalog.upper
                }
            };

            Ok(next_upper)
        }
        self.metrics.transaction_commits.inc();
        let counter = self.metrics.transaction_commit_latency_seconds.clone();
        commit_transaction_inner(self, txn_batch, commit_ts)
            .wall_time()
            .inc_by(counter)
            .await
    }

    #[mz_ore::instrument(level = "debug")]
    async fn confirm_leadership(&mut self) -> Result<(), CatalogError> {
        if self.is_read_only() {
            return Ok(());
        }
        self.sync_to_current_upper().await?;
        Ok(())
    }
}

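/// Deterministically generates a shard id from the given organization id and seed.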
pub fn shard_id(organization_id: Uuid, seed: usize) -> ShardId {
    let hash = sha2::Sha256::digest(format!("{organization_id}{seed}")).to_vec();
    soft_assert_eq_or_log!(hash.len(), 32, "SHA256 returns 32 bytes (256 bits)");
    let uuid = Uuid::from_slice(&hash[0..16]).expect("from_slice accepts exactly 16 bytes");
    ShardId::from_str(&format!("s{uuid}")).expect("known to be valid")
}

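/// Returns the schema of the data stored in the catalog persist shard: a single nullable-false
/// JSONB column named `data`.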
fn desc() -> RelationDesc {
    RelationDesc::builder()
        .with_column("data", SqlScalarType::Jsonb.nullable(false))
        .finish()
}

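/// Generates an as-of timestamp for reading from `read_handle` that is as fresh as possible,
/// given `upper`.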
fn as_of(
    read_handle: &ReadHandle<SourceData, (), Timestamp, StorageDiff>,
    upper: Timestamp,
) -> Timestamp {
    let since = read_handle.since().clone();
    let mut as_of = upper.checked_sub(1).unwrap_or_else(|| {
        panic!("catalog persist shard should be initialized, found upper: {upper:?}")
    });
    soft_assert_or_log!(
        since.less_equal(&as_of),
        "since={since:?}, as_of={as_of:?}; since must be less than or equal to as_of"
    );
    as_of.advance_by(since.borrow());
    as_of
}

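/// Fetches the `applier_version` recorded in the upgrade shard's state, i.e. the version of the
/// most recent writer to the upgrade shard, if the shard exists.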
async fn fetch_catalog_upgrade_shard_version(
    persist_client: &PersistClient,
    upgrade_shard_id: ShardId,
) -> Option<semver::Version> {
    let shard_state = persist_client
        .inspect_shard::<Timestamp>(&upgrade_shard_id)
        .await
        .ok()?;
    let json_state = serde_json::to_value(shard_state).expect("state serialization error");
    let upgrade_version = json_state
        .get("applier_version")
        .cloned()
        .expect("missing applier_version");
    let upgrade_version =
        serde_json::from_value(upgrade_version).expect("version deserialization error");
    Some(upgrade_version)
}

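/// Generates an iterator of [`StateUpdate`]s in the catalog shard at `as_of`, leaving the updates
/// in their raw JSON form.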
#[mz_ore::instrument(level = "debug")]
async fn snapshot_binary(
    read_handle: &mut ReadHandle<SourceData, (), Timestamp, StorageDiff>,
    as_of: Timestamp,
    metrics: &Arc<Metrics>,
) -> impl Iterator<Item = StateUpdate<StateUpdateKindJson>> + DoubleEndedIterator + use<> {
    metrics.snapshots_taken.inc();
    let counter = metrics.snapshot_latency_seconds.clone();
    snapshot_binary_inner(read_handle, as_of)
        .wall_time()
        .inc_by(counter)
        .await
}

#[mz_ore::instrument(level = "debug")]
async fn snapshot_binary_inner(
    read_handle: &mut ReadHandle<SourceData, (), Timestamp, StorageDiff>,
    as_of: Timestamp,
) -> impl Iterator<Item = StateUpdate<StateUpdateKindJson>> + DoubleEndedIterator + use<> {
    let snapshot = read_handle
        .snapshot_and_fetch(Antichain::from_elem(as_of))
        .await
        .expect("we have advanced the restart_as_of by the since");
    soft_assert_no_log!(
        snapshot.iter().all(|(_, _, diff)| *diff == 1),
        "snapshot_and_fetch guarantees a consolidated result: {snapshot:#?}"
    );
    snapshot
        .into_iter()
        .map(Into::<StateUpdate<StateUpdateKindJson>>::into)
        .sorted_by(|a, b| Ord::cmp(&b.ts, &a.ts))
}

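/// Converts an [`Antichain`] into a single [`Timestamp`].
///
/// The catalog uses a totally ordered time and never finalizes its shard, so the antichain always
/// contains exactly one element.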
pub(crate) fn antichain_to_timestamp(antichain: Antichain<Timestamp>) -> Timestamp {
    antichain
        .into_option()
        .expect("we use a totally ordered time and never finalize the shard")
}

impl Trace {
    fn from_snapshot(snapshot: impl IntoIterator<Item = StateUpdate>) -> Trace {
        let mut trace = Trace::new();
        for StateUpdate { kind, ts, diff } in snapshot {
            match kind {
                StateUpdateKind::AuditLog(k, v) => trace.audit_log.values.push(((k, v), ts, diff)),
                StateUpdateKind::Cluster(k, v) => trace.clusters.values.push(((k, v), ts, diff)),
                StateUpdateKind::ClusterReplica(k, v) => {
                    trace.cluster_replicas.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::Comment(k, v) => trace.comments.values.push(((k, v), ts, diff)),
                StateUpdateKind::Config(k, v) => trace.configs.values.push(((k, v), ts, diff)),
                StateUpdateKind::Database(k, v) => trace.databases.values.push(((k, v), ts, diff)),
                StateUpdateKind::DefaultPrivilege(k, v) => {
                    trace.default_privileges.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::FenceToken(_) => {}
                StateUpdateKind::IdAllocator(k, v) => {
                    trace.id_allocator.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::IntrospectionSourceIndex(k, v) => {
                    trace.introspection_sources.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::Item(k, v) => trace.items.values.push(((k, v), ts, diff)),
                StateUpdateKind::NetworkPolicy(k, v) => {
                    trace.network_policies.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::Role(k, v) => trace.roles.values.push(((k, v), ts, diff)),
                StateUpdateKind::Schema(k, v) => trace.schemas.values.push(((k, v), ts, diff)),
                StateUpdateKind::Setting(k, v) => trace.settings.values.push(((k, v), ts, diff)),
                StateUpdateKind::SourceReferences(k, v) => {
                    trace.source_references.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::SystemConfiguration(k, v) => {
                    trace.system_configurations.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::SystemObjectMapping(k, v) => {
                    trace.system_object_mappings.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::SystemPrivilege(k, v) => {
                    trace.system_privileges.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::StorageCollectionMetadata(k, v) => trace
                    .storage_collection_metadata
                    .values
                    .push(((k, v), ts, diff)),
                StateUpdateKind::UnfinalizedShard(k, ()) => {
                    trace.unfinalized_shards.values.push(((k, ()), ts, diff))
                }
                StateUpdateKind::TxnWalShard((), v) => {
                    trace.txn_wal_shard.values.push((((), v), ts, diff))
                }
                StateUpdateKind::RoleAuth(k, v) => trace.role_auth.values.push(((k, v), ts, diff)),
            }
        }
        trace
    }
}

impl UnopenedPersistCatalogState {
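    /// Manually updates the value of `key` in collection `T` to `value`, returning the previous
    /// value if one existed. Intended for debug tooling.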
    #[mz_ore::instrument]
    pub(crate) async fn debug_edit<T: Collection>(
        &mut self,
        key: T::Key,
        value: T::Value,
    ) -> Result<Option<T::Value>, CatalogError>
    where
        T::Key: PartialEq + Eq + Debug + Clone,
        T::Value: Debug + Clone,
    {
        let prev_value = loop {
            let key = key.clone();
            let value = value.clone();
            let snapshot = self.current_snapshot().await?;
            let trace = Trace::from_snapshot(snapshot);
            let collection_trace = T::collection_trace(trace);
            let prev_values: Vec<_> = collection_trace
                .values
                .into_iter()
                .filter(|((k, _), _, diff)| {
                    soft_assert_eq_or_log!(*diff, Diff::ONE, "trace is consolidated");
                    &key == k
                })
                .collect();

            let prev_value = match &prev_values[..] {
                [] => None,
                [((_, v), _, _)] => Some(v.clone()),
                prev_values => panic!("multiple values found for key {key:?}: {prev_values:?}"),
            };

            let mut updates: Vec<_> = prev_values
                .into_iter()
                .map(|((k, v), _, _)| (T::update(k, v), Diff::MINUS_ONE))
                .collect();
            updates.push((T::update(key, value), Diff::ONE));
            match self.fenceable_token.generate_unfenced_token(self.mode)? {
                Some((fence_updates, current_fenceable_token)) => {
                    updates.extend(fence_updates.clone());
                    match self.compare_and_append(updates, self.upper).await {
                        Ok(_) => {
                            self.fenceable_token = current_fenceable_token;
                            break prev_value;
                        }
                        Err(CompareAndAppendError::Fence(e)) => return Err(e.into()),
                        Err(e @ CompareAndAppendError::UpperMismatch { .. }) => {
                            warn!("catalog write failed due to upper mismatch, retrying: {e:?}");
                            continue;
                        }
                    }
                }
                None => {
                    self.compare_and_append(updates, self.upper)
                        .await
                        .map_err(|e| e.unwrap_fence_error())?;
                    break prev_value;
                }
            }
        };
        Ok(prev_value)
    }

    #[mz_ore::instrument]
    pub(crate) async fn debug_delete<T: Collection>(
        &mut self,
        key: T::Key,
    ) -> Result<(), CatalogError>
    where
        T::Key: PartialEq + Eq + Debug + Clone,
        T::Value: Debug,
    {
        loop {
            let key = key.clone();
            let snapshot = self.current_snapshot().await?;
            let trace = Trace::from_snapshot(snapshot);
            let collection_trace = T::collection_trace(trace);
            let mut retractions: Vec<_> = collection_trace
                .values
                .into_iter()
                .filter(|((k, _), _, diff)| {
                    soft_assert_eq_or_log!(*diff, Diff::ONE, "trace is consolidated");
                    &key == k
                })
                .map(|((k, v), _, _)| (T::update(k, v), Diff::MINUS_ONE))
                .collect();

            match self.fenceable_token.generate_unfenced_token(self.mode)? {
                Some((fence_updates, current_fenceable_token)) => {
                    retractions.extend(fence_updates.clone());
                    match self.compare_and_append(retractions, self.upper).await {
                        Ok(_) => {
                            self.fenceable_token = current_fenceable_token;
                            break;
                        }
                        Err(CompareAndAppendError::Fence(e)) => return Err(e.into()),
                        Err(e @ CompareAndAppendError::UpperMismatch { .. }) => {
                            warn!("catalog write failed due to upper mismatch, retrying: {e:?}");
                            continue;
                        }
                    }
                }
                None => {
                    self.compare_and_append(retractions, self.upper)
                        .await
                        .map_err(|e| e.unwrap_fence_error())?;
                    break;
                }
            }
        }
        Ok(())
    }

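    /// Generates a consolidated iterator of [`StateUpdate`]s over the current contents of the
    /// catalog shard, syncing to the current upper first.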
    async fn current_snapshot(
        &mut self,
    ) -> Result<impl IntoIterator<Item = StateUpdate> + '_, CatalogError> {
        self.sync_to_current_upper().await?;
        self.consolidate();
        Ok(self.snapshot.iter().cloned().map(|(kind, ts, diff)| {
            let kind = TryIntoStateUpdateKind::try_into(kind).expect("kind decoding error");
            StateUpdate { kind, ts, diff }
        }))
    }
}